Skip to content

Commit c6d83b3

Browse files
authored
Fix per stream state protocol backward compatibility (#14032)
* rename state type field to fix backwards compatibility issue * replace usages of stateType with type
1 parent cb90d7b commit c6d83b3

File tree

22 files changed

+75
-75
lines changed

22 files changed

+75
-75
lines changed

airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ class AirbyteStateMessage(BaseModel):
281281
class Config:
282282
extra = Extra.allow
283283

284-
state_type: Optional[AirbyteStateType] = None
284+
type: Optional[AirbyteStateType] = None
285285
stream: Optional[AirbyteStreamState] = None
286286
global_: Optional[AirbyteGlobalState] = Field(None, alias="global")
287287
data: Optional[Dict[str, Any]] = Field(None, description="(Deprecated) the state data")

airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/StateMessageHelper.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ public static Optional<StateWrapper> getTypedState(final JsonNode state) {
3838
if (stateMessages.stream().anyMatch(streamMessage -> !streamMessage.getAdditionalProperties().isEmpty())) {
3939
return Optional.of(getLegacyStateWrapper(state));
4040
}
41-
if (stateMessages.size() == 1 && stateMessages.get(0).getStateType() == AirbyteStateType.GLOBAL) {
41+
if (stateMessages.size() == 1 && stateMessages.get(0).getType() == AirbyteStateType.GLOBAL) {
4242
return Optional.of(new StateWrapper()
4343
.withStateType(StateType.GLOBAL)
4444
.withGlobal(stateMessages.get(0)));
4545
} else if (stateMessages.size() >= 1
46-
&& stateMessages.stream().allMatch(stateMessage -> stateMessage.getStateType() == AirbyteStateType.STREAM)) {
46+
&& stateMessages.stream().allMatch(stateMessage -> stateMessage.getType() == AirbyteStateType.STREAM)) {
4747
return Optional.of(new StateWrapper()
4848
.withStateType(StateType.STREAM)
4949
.withStateMessages(stateMessages));

airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/StateMessageHelperTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void testLegacyInList() {
4545
@Test
4646
public void testGlobal() {
4747
final AirbyteStateMessage stateMessage = new AirbyteStateMessage()
48-
.withStateType(AirbyteStateType.GLOBAL)
48+
.withType(AirbyteStateType.GLOBAL)
4949
.withGlobal(
5050
new AirbyteGlobalState()
5151
.withSharedState(Jsons.emptyObject())
@@ -61,11 +61,11 @@ public void testGlobal() {
6161
@Test
6262
public void testStream() {
6363
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
64-
.withStateType(AirbyteStateType.STREAM)
64+
.withType(AirbyteStateType.STREAM)
6565
.withStream(
6666
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()));
6767
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
68-
.withStateType(AirbyteStateType.STREAM)
68+
.withType(AirbyteStateType.STREAM)
6969
.withStream(
7070
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()));
7171
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2)));
@@ -77,11 +77,11 @@ public void testStream() {
7777
@Test
7878
public void testInvalidMixedState() {
7979
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
80-
.withStateType(AirbyteStateType.STREAM)
80+
.withType(AirbyteStateType.STREAM)
8181
.withStream(
8282
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()));
8383
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
84-
.withStateType(AirbyteStateType.GLOBAL)
84+
.withType(AirbyteStateType.GLOBAL)
8585
.withGlobal(
8686
new AirbyteGlobalState()
8787
.withSharedState(Jsons.emptyObject())
@@ -95,15 +95,15 @@ public void testInvalidMixedState() {
9595
@Test
9696
public void testDuplicatedGlobalState() {
9797
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
98-
.withStateType(AirbyteStateType.GLOBAL)
98+
.withType(AirbyteStateType.GLOBAL)
9999
.withGlobal(
100100
new AirbyteGlobalState()
101101
.withSharedState(Jsons.emptyObject())
102102
.withStreamStates(Lists.newArrayList(
103103
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
104104
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
105105
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
106-
.withStateType(AirbyteStateType.GLOBAL)
106+
.withType(AirbyteStateType.GLOBAL)
107107
.withGlobal(
108108
new AirbyteGlobalState()
109109
.withSharedState(Jsons.emptyObject())

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/dest_state_lifecycle_manager/DefaultDestStateLifecycleManager.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public DefaultDestStateLifecycleManager() {
5151
@Override
5252
public void addState(final AirbyteMessage message) {
5353
Preconditions.checkArgument(message.getType() == Type.STATE, "Messages passed to State Manager must be of type STATE.");
54-
Preconditions.checkArgument(isStateTypeCompatible(stateType, message.getState().getStateType()));
54+
Preconditions.checkArgument(isStateTypeCompatible(stateType, message.getState().getType()));
5555

5656
setManagerStateTypeIfNotSet(message);
5757

@@ -83,10 +83,10 @@ private static boolean isStateTypeCompatible(final AirbyteStateType previousStat
8383
private void setManagerStateTypeIfNotSet(final AirbyteMessage message) {
8484
// detect and set state type.
8585
if (stateType == null) {
86-
if (message.getState().getStateType() == null) {
86+
if (message.getState().getType() == null) {
8787
stateType = AirbyteStateType.LEGACY;
8888
} else {
89-
stateType = message.getState().getStateType();
89+
stateType = message.getState().getType();
9090
}
9191
}
9292
}

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/dest_state_lifecycle_manager/DestStreamStateLifecycleManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public DestStreamStateLifecycleManager() {
4141

4242
@Override
4343
public void addState(final AirbyteMessage message) {
44-
Preconditions.checkArgument(message.getState().getStateType() == AirbyteStateType.STREAM);
44+
Preconditions.checkArgument(message.getState().getType() == AirbyteStateType.STREAM);
4545
streamToLastPendingState.put(message.getState().getStream().getStreamDescriptor(), message);
4646
}
4747

airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/dest_state_lifecycle_manager/DefaultDestStateLifecycleManagerTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ class DefaultDestStateLifecycleManagerTest {
2424
.withState(new AirbyteStateMessage());
2525
private static final AirbyteMessage LEGACY_MESSAGE = new AirbyteMessage()
2626
.withType(Type.STATE)
27-
.withState(new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY));
27+
.withState(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY));
2828
private static final AirbyteMessage GLOBAL_MESSAGE = new AirbyteMessage()
2929
.withType(Type.STATE)
30-
.withState(new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL));
30+
.withState(new AirbyteStateMessage().withType(AirbyteStateType.GLOBAL));
3131
private static final AirbyteMessage STREAM_MESSAGE = new AirbyteMessage()
3232
.withType(Type.STATE)
3333
.withState(new AirbyteStateMessage()
34-
.withStateType(AirbyteStateType.STREAM)
34+
.withType(AirbyteStateType.STREAM)
3535
.withStream(new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("users"))));
3636

3737
private DestStateLifecycleManager mgr1;

airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/dest_state_lifecycle_manager/DestSingleStateLifecycleManagerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ class DestSingleStateLifecycleManagerTest {
1919

2020
private static final AirbyteMessage MESSAGE1 = new AirbyteMessage()
2121
.withType(Type.STATE)
22-
.withState(new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withData(Jsons.jsonNode("a")));
22+
.withState(new AirbyteStateMessage().withType(AirbyteStateType.GLOBAL).withData(Jsons.jsonNode("a")));
2323
private static final AirbyteMessage MESSAGE2 = new AirbyteMessage()
2424
.withType(Type.STATE)
25-
.withState(new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withData(Jsons.jsonNode("b")));
25+
.withState(new AirbyteStateMessage().withType(AirbyteStateType.GLOBAL).withData(Jsons.jsonNode("b")));
2626

2727
private DestSingleStateLifecycleManager mgr;
2828

airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/dest_state_lifecycle_manager/DestStreamStateLifecycleManagerTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,17 @@ class DestStreamStateLifecycleManagerTest {
2323
private static final AirbyteMessage STREAM1_MESSAGE1 = new AirbyteMessage()
2424
.withType(Type.STATE)
2525
.withState(new AirbyteStateMessage()
26-
.withStateType(AirbyteStateType.STREAM)
26+
.withType(AirbyteStateType.STREAM)
2727
.withStream(new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("apples")).withStreamState(Jsons.jsonNode("a"))));
2828
private static final AirbyteMessage STREAM1_MESSAGE2 = new AirbyteMessage()
2929
.withType(Type.STATE)
3030
.withState(new AirbyteStateMessage()
31-
.withStateType(AirbyteStateType.STREAM)
31+
.withType(AirbyteStateType.STREAM)
3232
.withStream(new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("apples")).withStreamState(Jsons.jsonNode("b"))));
3333
private static final AirbyteMessage STREAM2_MESSAGE1 = new AirbyteMessage()
3434
.withType(Type.STATE)
3535
.withState(new AirbyteStateMessage()
36-
.withStateType(AirbyteStateType.STREAM)
36+
.withType(AirbyteStateType.STREAM)
3737
.withStream(
3838
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("bananas")).withStreamState(Jsons.jsonNode("10"))));
3939

airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,10 @@ protected List<AirbyteStateMessage> generateEmptyInitialState(final JsonNode con
137137
final AirbyteGlobalState globalState = new AirbyteGlobalState()
138138
.withSharedState(Jsons.jsonNode(new CdcState()))
139139
.withStreamStates(List.of());
140-
return List.of(new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withGlobal(globalState));
140+
return List.of(new AirbyteStateMessage().withType(AirbyteStateType.GLOBAL).withGlobal(globalState));
141141
} else {
142142
return List.of(new AirbyteStateMessage()
143-
.withStateType(AirbyteStateType.STREAM)
143+
.withType(AirbyteStateType.STREAM)
144144
.withStream(new AirbyteStreamState()));
145145
}
146146
}

airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -885,27 +885,27 @@ protected List<AirbyteMessage> createExpectedTestMessages(final List<DbStreamSta
885885
? states.stream()
886886
.map(s -> new AirbyteMessage().withType(Type.STATE)
887887
.withState(
888-
new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM)
888+
new AirbyteStateMessage().withType(AirbyteStateType.STREAM)
889889
.withStream(new AirbyteStreamState()
890890
.withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName()))
891891
.withStreamState(Jsons.jsonNode(s)))
892892
.withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(states)))))
893893
.collect(
894894
Collectors.toList())
895-
: List.of(new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY)
895+
: List.of(new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY)
896896
.withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(states)))));
897897
}
898898

899899
protected List<AirbyteStateMessage> createState(final List<DbStreamState> states) {
900900
return supportsPerStream()
901901
? states.stream()
902-
.map(s -> new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM)
902+
.map(s -> new AirbyteStateMessage().withType(AirbyteStateType.STREAM)
903903
.withStream(new AirbyteStreamState()
904904
.withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName()))
905905
.withStreamState(Jsons.jsonNode(s))))
906906
.collect(
907907
Collectors.toList())
908-
: List.of(new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY).withData(Jsons.jsonNode(new DbState().withStreams(states))));
908+
: List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY).withData(Jsons.jsonNode(new DbState().withStreams(states))));
909909
}
910910

911911
protected ConfiguredAirbyteStream createTableWithSpaces() throws SQLException {
@@ -1020,7 +1020,7 @@ protected boolean supportsPerStream() {
10201020
protected JsonNode createEmptyState(final String streamName, final String streamNamespace) {
10211021
if (supportsPerStream()) {
10221022
final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage()
1023-
.withStateType(AirbyteStateType.STREAM)
1023+
.withType(AirbyteStateType.STREAM)
10241024
.withStream(new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName(streamName).withNamespace(streamNamespace)));
10251025
return Jsons.jsonNode(List.of(airbyteStateMessage));
10261026
} else {
@@ -1049,14 +1049,14 @@ protected AirbyteMessage createStateMessage(final DbStreamState dbStreamState, f
10491049
if (supportsPerStream()) {
10501050
return new AirbyteMessage().withType(Type.STATE)
10511051
.withState(
1052-
new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM)
1052+
new AirbyteStateMessage().withType(AirbyteStateType.STREAM)
10531053
.withStream(new AirbyteStreamState()
10541054
.withStreamDescriptor(new StreamDescriptor().withNamespace(dbStreamState.getStreamNamespace())
10551055
.withName(dbStreamState.getStreamName()))
10561056
.withStreamState(Jsons.jsonNode(dbStreamState)))
10571057
.withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(legacyStates))));
10581058
} else {
1059-
return new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY)
1059+
return new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY)
10601060
.withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(legacyStates))));
10611061
}
10621062
}

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -417,10 +417,10 @@ protected List<AirbyteStateMessage> generateEmptyInitialState(final JsonNode con
417417
final AirbyteGlobalState globalState = new AirbyteGlobalState()
418418
.withSharedState(Jsons.jsonNode(new CdcState()))
419419
.withStreamStates(List.of());
420-
return List.of(new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withGlobal(globalState));
420+
return List.of(new AirbyteStateMessage().withType(AirbyteStateType.GLOBAL).withGlobal(globalState));
421421
} else {
422422
return List.of(new AirbyteStateMessage()
423-
.withStateType(AirbyteStateType.STREAM)
423+
.withType(AirbyteStateType.STREAM)
424424
.withStream(new AirbyteStreamState()));
425425
}
426426
}

airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ protected List<AirbyteStateMessage> deserializeInitialState(final JsonNode initi
528528
return Jsons.object(initialStateJson, new AirbyteStateMessageListTypeReference());
529529
} catch (final IllegalArgumentException e) {
530530
LOGGER.warn("Defaulting to legacy state object...");
531-
return List.of(new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY).withData(initialStateJson));
531+
return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY).withData(initialStateJson));
532532
}
533533
}
534534
}
@@ -541,7 +541,7 @@ protected List<AirbyteStateMessage> deserializeInitialState(final JsonNode initi
541541
*/
542542
protected List<AirbyteStateMessage> generateEmptyInitialState(final JsonNode config) {
543543
// For backwards compatibility with existing connectors
544-
return List.of(new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY).withData(Jsons.jsonNode(new DbState())));
544+
return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY).withData(Jsons.jsonNode(new DbState())));
545545
}
546546

547547
/**

airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public AirbyteStateMessage toState(final Optional<AirbyteStreamNameNamespacePair
7575
.withCdcState(getCdcStateManager().getCdcState());
7676

7777
return new AirbyteStateMessage()
78-
.withStateType(AirbyteStateType.GLOBAL)
78+
.withType(AirbyteStateType.GLOBAL)
7979
// Temporarily include legacy state for backwards compatibility with the platform
8080
.withData(Jsons.jsonNode(dbState))
8181
.withGlobal(globalState);
@@ -91,7 +91,7 @@ public AirbyteStateMessage toState(final Optional<AirbyteStreamNameNamespacePair
9191
* but may be empty.
9292
*/
9393
private CdcState extractCdcState(final AirbyteStateMessage airbyteStateMessage) {
94-
if (airbyteStateMessage.getStateType() == AirbyteStateType.GLOBAL) {
94+
if (airbyteStateMessage.getType() == AirbyteStateType.GLOBAL) {
9595
return Jsons.object(airbyteStateMessage.getGlobal().getSharedState(), CdcState.class);
9696
} else {
9797
return Jsons.object(airbyteStateMessage.getData(), DbState.class).getCdcState();
@@ -113,7 +113,7 @@ private static Supplier<Collection<AirbyteStreamState>> getStreamsSupplier(final
113113
* storing state in the legacy "data" field.
114114
*/
115115
return () -> {
116-
if (airbyteStateMessage.getStateType() == AirbyteStateType.GLOBAL) {
116+
if (airbyteStateMessage.getType() == AirbyteStateType.GLOBAL) {
117117
return airbyteStateMessage.getGlobal().getStreamStates();
118118
} else if (airbyteStateMessage.getData() != null) {
119119
return Jsons.object(airbyteStateMessage.getData(), DbState.class).getStreams().stream()

airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public AirbyteStateMessage toState(final Optional<AirbyteStreamNameNamespacePair
9494
.withCdcState(getCdcStateManager().getCdcState());
9595

9696
LOGGER.info("Generated legacy state for {} streams", dbState.getStreams().size());
97-
return new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY).withData(Jsons.jsonNode(dbState));
97+
return new AirbyteStateMessage().withType(AirbyteStateType.LEGACY).withData(Jsons.jsonNode(dbState));
9898
}
9999

100100
@Override

airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public static AirbyteStateMessage convertLegacyStateToGlobalState(final AirbyteS
180180
.withStreamState(Jsons.jsonNode(s)))
181181
.collect(
182182
Collectors.toList()));
183-
return new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withGlobal(globalState);
183+
return new AirbyteStateMessage().withType(AirbyteStateType.GLOBAL).withGlobal(globalState);
184184
}
185185

186186
/**
@@ -192,7 +192,7 @@ public static AirbyteStateMessage convertLegacyStateToGlobalState(final AirbyteS
192192
*/
193193
public static List<AirbyteStateMessage> convertGlobalStateToStreamState(final AirbyteStateMessage airbyteStateMessage) {
194194
return airbyteStateMessage.getGlobal().getStreamStates().stream()
195-
.map(s -> new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM)
195+
.map(s -> new AirbyteStateMessage().withType(AirbyteStateType.STREAM)
196196
.withStream(new AirbyteStreamState().withStreamDescriptor(s.getStreamDescriptor()).withStreamState(s.getStreamState())))
197197
.collect(Collectors.toList());
198198
}
@@ -206,7 +206,7 @@ public static List<AirbyteStateMessage> convertGlobalStateToStreamState(final Ai
206206
*/
207207
public static List<AirbyteStateMessage> convertLegacyStateToStreamState(final AirbyteStateMessage airbyteStateMessage) {
208208
return Jsons.object(airbyteStateMessage.getData(), DbState.class).getStreams().stream()
209-
.map(s -> new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM)
209+
.map(s -> new AirbyteStateMessage().withType(AirbyteStateType.STREAM)
210210
.withStream(new AirbyteStreamState()
211211
.withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName()))
212212
.withStreamState(Jsons.jsonNode(s))))

0 commit comments

Comments
 (0)