Skip to content

Commit 172498d

Browse files
committed
initial plumbing
1 parent 40c1aac commit 172498d

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstopubsub/BigtableChangeStreamsToPubSub.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ private static PubsubMessage createTestChangeMessage(PubSubUtils pubSub) throws
560560
case PROTOCOL_BUFFERS:
561561
return pubSub.mapChangeJsonStringToPubSubMessageAsProto(mod.getChangeJson());
562562
case JSON:
563-
return pubSub.mapChangeJsonStringToPubSubMessageAsJson(mod.getChangeJson());
563+
return pubSub.mapChangeJsonStringToPubSubMessageAsJson(mod.getChangeJson(), null);
564564
default:
565565
throw new IllegalArgumentException(
566566
"Unexpected message format: " + pubSub.getDestination().getMessageFormat());

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstopubsub/FailsafePublisher.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,15 @@ public void tearDown() {
112112
}
113113

114114
@ProcessElement
115-
public void processElement(ProcessContext context) {
115+
public void processElement(ProcessContext context, OutputReceiver<String> dlq) {
116116
FailsafeElement<String, String> failsafeModJsonString = context.element();
117117

118118
try {
119-
PubsubMessage pubSubMessage = newPubsubMessage(failsafeModJsonString.getPayload());
119+
PubsubMessage pubSubMessage = newPubsubMessage(failsafeModJsonString.getPayload(), dlq);
120+
if (pubSubMessage == null) {
121+
return;
122+
}
123+
120124
throttled.success(LOG, publisher.publish(pubSubMessage).get());
121125
} catch (Exception e) {
122126
throttled.failure(LOG, e);
@@ -128,7 +132,8 @@ public void processElement(ProcessContext context) {
128132
}
129133

130134
/* Schema Details: */
131-
private PubsubMessage newPubsubMessage(String modJsonString) throws Exception {
135+
private PubsubMessage newPubsubMessage(String modJsonString, OutputReceiver<String> dlq)
136+
throws Exception {
132137
String changeJsonString = Mod.fromJson(modJsonString).getChangeJson();
133138
MessageFormat messageFormat = pubSubUtils.getDestination().getMessageFormat();
134139

@@ -138,7 +143,7 @@ private PubsubMessage newPubsubMessage(String modJsonString) throws Exception {
138143
case PROTOCOL_BUFFERS:
139144
return pubSubUtils.mapChangeJsonStringToPubSubMessageAsProto(changeJsonString);
140145
case JSON:
141-
return pubSubUtils.mapChangeJsonStringToPubSubMessageAsJson(changeJsonString);
146+
return pubSubUtils.mapChangeJsonStringToPubSubMessageAsJson(changeJsonString, dlq);
142147
default:
143148
final String errorMessage =
144149
"Invalid message format:"

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstopubsub/schemautils/PubSubUtils.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.avro.io.BinaryEncoder;
3838
import org.apache.avro.io.Encoder;
3939
import org.apache.avro.io.EncoderFactory;
40+
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
4041
import org.apache.commons.lang3.Validate;
4142
import org.json.JSONObject;
4243

@@ -256,8 +257,8 @@ public PubsubMessage mapChangeJsonStringToPubSubMessageAsAvro(String changeJsonS
256257
return PubsubMessage.newBuilder().setData(data).build();
257258
}
258259

259-
public PubsubMessage mapChangeJsonStringToPubSubMessageAsJson(String changeJsonSting)
260-
throws Exception {
260+
public PubsubMessage mapChangeJsonStringToPubSubMessageAsJson(
261+
String changeJsonSting, OutputReceiver<String> dlq) throws Exception {
261262
JSONObject changeJsonParsed = new JSONObject(changeJsonSting);
262263

263264
var changelogEntryTextBuilder =
@@ -310,7 +311,10 @@ public PubsubMessage mapChangeJsonStringToPubSubMessageAsJson(String changeJsonS
310311
value = (String) FORMATTERS.get(PubSubFields.VALUE_STRING).format(this, changeJsonParsed);
311312
}
312313
if (value == null) {
313-
// somehow go to DLQ
314+
if (dlq != null) {
315+
dlq.output(changeJsonSting);
316+
return null;
317+
}
314318
}
315319
changelogEntryTextBuilder.setValue(value);
316320
}

0 commit comments

Comments
 (0)