-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[source-postgres] State counter on postgres #34724
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 242 commits
f8b2b84
27e3713
f9a270e
e65713f
34999c7
140246e
4be3cc8
2f3c0c6
78e2fff
f7efbe8
86e59a5
e5cc585
e31c6d4
592040b
cef20c1
901be76
04ce62c
0043b0d
9c4a67d
2eb8fc6
aa8a439
5e63c11
b05031a
e9d9b6d
7443c5c
9a9913e
b803c87
8f4a9ab
362aa55
dc5efe3
c264473
3ad32b8
4c0ad45
2cc43d1
f795787
a89b189
1e5fd15
88ca308
c2d1133
e322517
3defd6a
07ebba0
e95363e
fd584ec
0150119
84414ff
ddc5931
2ab4932
55a20bf
ad5b8b2
3e07333
6afffa8
aeb02c3
93c0636
17e1643
2834cb1
bfcb989
b8a1d9b
43eb37f
39d8ce3
430258e
b6804df
40d0d0e
2f31ed9
94dc2c7
da384f2
826a0dd
dfe4431
7c32974
48cf32a
23afdf0
4a83013
aaae2cc
e7d52cb
3f5d3ca
d8bc92a
09e38bf
54f75cb
b5d4d02
1ba0314
0c7d1b7
d8e481a
ff0eaaf
980c44a
94ff050
691151e
4d37be3
d7cb3e1
73bf03e
d184939
a271cb2
451e4a3
551d604
9c9af8c
4e32a1d
bb17282
9cb99f0
1f2cae7
4be0a32
fe911c0
4fd5cbb
e580413
124e97b
79a65cb
6856e0a
d0b2aa2
0a73648
538ed97
acc78ac
73aacfa
83a053f
504684c
72c17f2
996dbd3
bf0b2c7
7ac4e94
47eb7c7
789f7e2
cd25015
b87268c
4580069
0532d98
b67ea56
9febaa1
4f740d9
325bf3a
b160ca1
50d7ef9
a9eebec
0cf385a
8af1c62
c79b992
5c02e48
776431b
f6cfed7
c8899a1
cf675ac
400d9bc
75b708e
d06e932
a473372
f091ed0
acf26df
08a4711
eb90e4f
b575b9d
d16bcce
e20cc48
d0176aa
8211736
e8708ef
76cb392
63a19ad
2f39ab5
9ff2dc5
0152f54
aef2ba5
361a49d
cbcbd23
4787f1d
b76c159
1bc07d7
6ab732a
74c8e82
c9dc40d
e8bd7ac
b9e1260
bee7d1e
39cd8ba
2f47ca0
36b838f
bdfcac0
28ae2b1
9cf9722
48290df
d44aead
a3634fa
9b85af4
2b23970
36df5eb
dd408ab
9a2eb85
1650d43
9e56d7c
db06bf7
e863ed9
c7b0c99
38763e2
cdab3b3
fc1461f
bfd6df2
78f2e06
64f4669
a16e2dd
64c0191
6138875
40a0717
0811e5e
69b4097
df93630
5dfe997
e4288b8
1bd45b5
85da269
534b75e
be14d24
9f062ec
b10a72f
8b53a06
77f0a37
32f652a
2ea7924
dde42bb
9f30a20
895ffae
72598e1
3104017
a7e5eb6
c6fef7a
c979226
f98e6c2
0d0b03b
df5e8b9
dab133e
ef65f42
8475729
f8dc6fa
52ec1b1
c397bba
28c389c
ae60b6f
8c809a0
66e0a2d
08a45ec
d66ef87
75d3d97
4db2b89
2bc1fcb
a62850a
ca3da03
6a6533f
5d51662
dd8e352
2f04c55
e87adbf
c82d61a
ced4a8c
e7073cf
ddcda7b
d0ecd23
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,15 +10,17 @@ | |
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.cdk.db.jdbc.JdbcDatabase; | ||
import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants; | ||
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil; | ||
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; | ||
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator; | ||
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency; | ||
import io.airbyte.commons.stream.AirbyteStreamUtils; | ||
import io.airbyte.commons.util.AutoCloseableIterator; | ||
import io.airbyte.commons.util.AutoCloseableIterators; | ||
import io.airbyte.integrations.source.postgres.PostgresQueryUtils.TableBlockSize; | ||
import io.airbyte.integrations.source.postgres.PostgresType; | ||
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations.RowDataWithCtid; | ||
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus; | ||
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.protocol.models.CommonField; | ||
import io.airbyte.protocol.models.v0.AirbyteMessage; | ||
|
@@ -109,7 +111,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getInitialSyncCtidIterator( | |
tablesMaxTuple.orElseGet(() -> Map.of(pair, -1)).get(pair)); | ||
final AutoCloseableIterator<AirbyteMessageWithCtid> recordIterator = | ||
getRecordIterator(queryStream, streamName, namespace, emmitedAt.toEpochMilli()); | ||
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, pair); | ||
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream); | ||
final AutoCloseableIterator<AirbyteMessage> logAugmented = augmentWithLogs(recordAndMessageIterator, pair, streamName); | ||
iteratorList.add(logAugmented); | ||
|
||
|
@@ -165,21 +167,20 @@ private AutoCloseableIterator<AirbyteMessage> augmentWithLogs(final AutoCloseabl | |
} | ||
|
||
private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseableIterator<AirbyteMessageWithCtid> recordIterator, | ||
final AirbyteStreamNameNamespacePair pair) { | ||
final ConfiguredAirbyteStream airbyteStream) { | ||
|
||
final CtidStatus currentCtidStatus = ctidStateManager.getCtidStatus(pair); | ||
final JsonNode incrementalState = | ||
(currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair) | ||
: currentCtidStatus.getIncrementalState(); | ||
final Duration syncCheckpointDuration = | ||
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong()) | ||
: CtidStateIterator.SYNC_CHECKPOINT_DURATION; | ||
: DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit : perhaps we should create a new InitialLoadConstants.SYNC_CHECKPOINT_DURATIONS to have the ability to differentiate between different checkpointing constants? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a difference for doing initial load vs regular incremental load? Feels like we should do that only if we found that would be required. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No difference right now. I think we don't need to create additional ones. But maybe a todo or something to rename the file in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added a to do. Will move after I checked in other iterators. |
||
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong() | ||
: CtidStateIterator.SYNC_CHECKPOINT_RECORDS; | ||
: DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS; | ||
|
||
ctidStateManager.setStreamStateIteratorFields(streamStateForIncrementalRunSupplier, fileNodeHandler); | ||
|
||
final AirbyteStreamNameNamespacePair pair = | ||
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); | ||
return AutoCloseableIterators.transformIterator( | ||
r -> new CtidStateIterator(r, pair, fileNodeHandler, ctidStateManager, incrementalState, | ||
syncCheckpointDuration, syncCheckpointRecords), | ||
r -> new SourceStateIterator(r, airbyteStream, ctidStateManager, new StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)), | ||
recordIterator, pair); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we enable it for the relevant paths (Postgres/MySQL CDC) and disable it for others? That way we have coverage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good call!