Skip to content

[source-postgres] State counter on postgres #34724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 252 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 242 commits
Commits
Show all changes
252 commits
Select commit Hold shift + click to select a range
f8b2b84
add comment on interface
xiaohansong Dec 11, 2023
27e3713
delete unrelated merge change
xiaohansong Dec 11, 2023
f9a270e
add unit test for sourceStateIterator
xiaohansong Dec 11, 2023
e65713f
cdk change - support ctid
xiaohansong Dec 12, 2023
34999c7
mysql related change, and support ctid
xiaohansong Dec 12, 2023
140246e
save work
xiaohansong Dec 12, 2023
4be3cc8
:wq
xiaohansong Jan 30, 2024
2f3c0c6
merge
xiaohansong Jan 30, 2024
78e2fff
revert unnecessary changes
xiaohansong Jan 30, 2024
f7efbe8
xmin tested
xiaohansong Jan 31, 2024
86e59a5
add/edit test for xmin
xiaohansong Feb 5, 2024
e5cc585
Source Faker: Declare primary keys (#34644)
aaronsteers Jan 30, 2024
e31c6d4
Destination Redshift - Bump CDK version to 0.16.3 (#34680)
jbfbell Jan 30, 2024
592040b
Destination Teradata: make connector avaialble on Airbyte Cloud (#28667)
marcosmarxm Jan 30, 2024
cef20c1
Support resuming initial snapshot when id type is String, Int, Long (…
rodireich Jan 30, 2024
901be76
Publish to pypi batch4 (#34666)
Jan 31, 2024
04ce62c
airbyte-ci: Test pypi published properly (#34689)
Jan 31, 2024
0043b0d
Publish to pypi batch5 (#34668)
Jan 31, 2024
9c4a67d
Publish to pypi batch6 (#34672)
Jan 31, 2024
2eb8fc6
Publish to pypi batch7 (#34673)
Jan 31, 2024
aa8a439
Kubernetes docs: update instructions to use external database (#34604)
marcosmarxm Jan 31, 2024
5e63c11
Update file-based connectors for compatibility with concurrent CDK (#…
clnoll Jan 31, 2024
b05031a
🚨🚨🐛Source Hubspot: update marketing_emails stream schema (#34492)
Jan 31, 2024
e9d9b6d
Publish to pypi batch8 (#34690)
Jan 31, 2024
7443c5c
Source Azure Table Storage: CDK Update (#34576)
ChristoGrab Jan 31, 2024
9a9913e
Publish to pypi batch9 (#34691)
Jan 31, 2024
b803c87
Build a resume token with a pipeline consisting of selected streams (…
rodireich Jan 31, 2024
8f4a9ab
Publish to pypi batch10 (#34692)
Jan 31, 2024
362aa55
Destination Postgres: Upgrade CDK with fixed dependency and unpin clo…
gisripa Jan 31, 2024
dc5efe3
CAT: fix NoAdditionalPropertiesValidator (#34709)
artem1205 Jan 31, 2024
c264473
airbyte-lib: Use connector metadata (#34697)
Jan 31, 2024
3ad32b8
remove alloydb once again (#34642)
erohmensing Feb 1, 2024
4c0ad45
Publish to pypi batch11 (#34694)
Feb 1, 2024
2cc43d1
Publish to pypi batch12 (#34664)
Feb 1, 2024
f795787
:bug: Source Bing Ads: fix duplicates in report-based streams (#34712)
darynaishchenko Feb 1, 2024
a89b189
🐛Source Amazon Seller Partner: fix date format in state message for s…
Feb 1, 2024
1e5fd15
airbyte-lib: Hidden documentation (#34702)
Feb 1, 2024
88ca308
Publish to pypi batch13 (#34658)
Feb 1, 2024
c2d1133
Publish to pypi batch14 (#34662)
Feb 1, 2024
e322517
🐛 Source Recharge: Added the UI toggle `Use 'Orders' Deprecated API` …
Feb 1, 2024
3defd6a
Bump source-mysql and source-postgres to cdk version 0.16.3 (#34723)
akashkulk Feb 1, 2024
07ebba0
Source S3: pin CDK and moto (#34665)
clnoll Feb 1, 2024
e95363e
Source Kyriba: Updates CDK, Increases Testing Coverage, Fixes Accepta…
pnilan Feb 1, 2024
fd584ec
CDK destinations: Future based output reader for T+D test (#34727)
gisripa Feb 1, 2024
0150119
Destination BigQuery: CDK updates for latest changes (#34728)
gisripa Feb 1, 2024
84414ff
🐛 Source Hubspot: Fix pagination for companies_properties_history str…
lazebnyi Feb 1, 2024
ddc5931
airbyte-ci: enable connectors tests in draft prs (#34756)
Feb 1, 2024
2ab4932
🐛 bump cdk versions for java sources (#34453)
chandlerprall Feb 1, 2024
55a20bf
Bump Airbyte version from 0.50.45 to 0.50.46
pmossman Feb 1, 2024
ad5b8b2
:bug: Source Google Analytics Data API: Add rounding integer values t…
tolik0 Feb 2, 2024
3e07333
🐛 Source GitHub: Continue Sync on Stream failure (#34700)
artem1205 Feb 2, 2024
6afffa8
Source Recharge: Fix airbyte-lib integration (#34772)
Feb 2, 2024
aeb02c3
add log4jConfig to docs (#34777)
colesnodgrass Feb 2, 2024
93c0636
AirbyteLib: fix examples in getting started (#34762)
aaronsteers Feb 2, 2024
17e1643
AirbyteLib: Add len() support on SQL datasets and Mapping behaviors f…
aaronsteers Feb 2, 2024
2834cb1
🎉 New Source: Microsoft SharePoint (#33537)
lazebnyi Feb 2, 2024
bfcb989
AirbyteLib: Treat error trace as logs (#34771)
Feb 2, 2024
b8a1d9b
Docs: Make sure reference section is always added (#34770)
Feb 2, 2024
43eb37f
AirbyteLib: Ignore unused Airbyte Protocol message types (#34779)
aaronsteers Feb 2, 2024
39d8ce3
📘Docs: Update source Linnworks page (#34788)
ChristoGrab Feb 2, 2024
430258e
Bump Airbyte version from 0.50.46 to 0.50.47
bgroff Feb 3, 2024
b6804df
AirbyteLib: Fix column count mismatch bug (#34783)
aaronsteers Feb 3, 2024
40d0d0e
AirbyteLib: friendly install and post-install messaging (#34816)
aaronsteers Feb 3, 2024
2f31ed9
AirbyteLib: Add pip_url helpers to streamline connector debugging (#…
aaronsteers Feb 5, 2024
94dc2c7
fix bullet style
aaronsteers Feb 5, 2024
da384f2
🐛 Source Coin API: Fix catalog typos (#34826)
aaronsteers Feb 5, 2024
826a0dd
Source Salesforce: concurrent incremental syncs (#33522)
clnoll Feb 5, 2024
dfe4431
Source S3: updates for compatibility with the concurrent CDK (#34591)
clnoll Feb 5, 2024
7c32974
:bug: Source Google Ads: Add ignore fields for multiple fields (#34844)
tolik0 Feb 5, 2024
48cf32a
Source Zoom: Disable pypi (#34848)
Feb 5, 2024
23afdf0
Source Gong: Adjust schemas (#34847)
Feb 5, 2024
4a83013
Kubernetes docs: external logs with S3 (#34621)
marcosmarxm Feb 5, 2024
aaae2cc
[source-mongodb-v2] : Fail sync if initial snapshot for any stream fa…
akashkulk Feb 5, 2024
e7d52cb
Docs: update pg13 requirement for external db (#34858)
marcosmarxm Feb 5, 2024
3f5d3ca
Improve error messages for concurrent CDK (#34754)
maxi297 Feb 5, 2024
d8bc92a
Update http-streams.md typo (#34861)
rwask Feb 5, 2024
09e38bf
🤖 Bump patch version of Python CDK
maxi297 Feb 5, 2024
54f75cb
rollback source-github to 1.5.7 (#34870)
pedroslopez Feb 5, 2024
b5d4d02
AirbyteLib: Support write strategies: 'merge' and 'auto' (#34592)
aaronsteers Feb 6, 2024
1ba0314
AirbyteLib: detect REPL and disable Rich.Live if so (#34782)
aaronsteers Feb 6, 2024
0c7d1b7
AirbyteLib: Add basic secrets management (#34822)
aaronsteers Feb 6, 2024
d8e481a
airbyte-lib: Add testing to connectors (#34044)
Feb 6, 2024
ff0eaaf
✨ Source S3: Add region to S3 source (#34842)
tolik0 Feb 6, 2024
980c44a
airbyte-lib: Fix processed records counter (#34857)
Feb 6, 2024
94ff050
airbyte-lib: Improve source factory (#34849)
Feb 6, 2024
691151e
airbyte-lib: Use proper segment key (#34863)
Feb 6, 2024
4d37be3
airbyte-ci: Make pypi publish enabled for certified connectors (#34836)
Feb 6, 2024
d7cb3e1
airbyte-lib: Stream state (#34778)
Feb 6, 2024
73bf03e
Source Zendesk Support: integration tests for empty streams (#34840)
roman-yermilov-gl Feb 6, 2024
d184939
✨ Source Stripe: Events stream concurrent on incremental syncs (#34619)
maxi297 Feb 6, 2024
a271cb2
S3 and Google Analytics v4: Enable pypi publishing (#34903)
Feb 6, 2024
451e4a3
✨ Source Google Analytics Data API: Add spec parameter to convert `co…
tolik0 Feb 6, 2024
551d604
Connector templates: Publish by default (#34766)
Feb 6, 2024
9c9af8c
File-based CDK: log warning on no sync mode instead of raising except…
clnoll Feb 6, 2024
4e32a1d
Remove connector ops team (#34867)
evantahler Feb 6, 2024
bb17282
🤖 Bump minor version of Python CDK
clnoll Feb 6, 2024
9cb99f0
Destination Postgres: Remove varchar limit of 64k, defaults to 10MiB …
gisripa Feb 6, 2024
1f2cae7
source-s3: pin to version 4.4.1 (#34926)
pedroslopez Feb 6, 2024
4be0a32
AirbyteLib: Case insensitive missing column checks, deterministic col…
aaronsteers Feb 6, 2024
fe911c0
CDK: fix flaky scenario-based tests by sorting on k & v (#34912)
clnoll Feb 6, 2024
4fd5cbb
File based sources fix sync mode bug (#34936)
clnoll Feb 7, 2024
e580413
🐛Source Amazon Seller Partner: fix date formatting for ledger reports…
Feb 7, 2024
124e97b
:bug: Source Facebook Marketing: Add missing fields (#34845)
tolik0 Feb 7, 2024
79a65cb
Source Hubspot: add notes about property history stream (#34915)
roman-yermilov-gl Feb 7, 2024
6856e0a
Source S3: bump CDK version to fix issue when SyncMode is missing fro…
clnoll Feb 7, 2024
d0b2aa2
internal poetry packages: declare poe tasks and airbyte-ci sections i…
alafanechere Feb 7, 2024
0a73648
Update reset.md (#34855)
evantahler Feb 7, 2024
538ed97
airbyte-ci: run poe tasks declared in pyproject.toml file of internal…
alafanechere Feb 7, 2024
acc78ac
✨ Source Google Analytics Data API: Replace convert_conversions_purch…
tolik0 Feb 7, 2024
73aacfa
fix version of airbyte-cdk in Dockerfile pip install (#34941)
lmossman Feb 7, 2024
83a053f
🤖 Bump patch version of Python CDK
lmossman Feb 7, 2024
504684c
Emit multiple error trace messages and continue syncs by default (#34…
brianjlai Feb 7, 2024
72c17f2
AirbyteLib: Improved progress print, especially in the terminal (#34973)
aaronsteers Feb 7, 2024
996dbd3
airbyte-ci: embed junit xml reports into user-facing html report (#3…
Feb 7, 2024
bf0b2c7
db-sources-java-cdk: fix logic to calculate the state stats count in …
subodh1810 Feb 7, 2024
7ac4e94
[ISSUE #34755] do not propagate parameters on InlineSchemaLoader (#34…
maxi297 Feb 7, 2024
47eb7c7
🤖 Bump patch version of Python CDK
maxi297 Feb 7, 2024
789f7e2
✨ Source Hubspot: Add contacts form submissions stream (#34829)
lazebnyi Feb 7, 2024
cd25015
make exclusive containers first class citizens (#34892)
stephane-airbyte Feb 7, 2024
b87268c
remove useLocalCdk for source-postgres (#34981)
stephane-airbyte Feb 7, 2024
4580069
source-postgres/mysql: fix wrong tags in metadata (#34980)
alafanechere Feb 7, 2024
0532d98
AirbyteLib: Require stream selection (#34979)
aaronsteers Feb 7, 2024
b67ea56
AirbyteLib: Add 'get_available_connectors()' option (#34982)
aaronsteers Feb 8, 2024
9febaa1
Revert "Emit multiple error trace messages and continue syncs by defa…
brianjlai Feb 8, 2024
4f740d9
File-based CDK: make incremental syncs concurrent (#34540)
clnoll Feb 8, 2024
325bf3a
AirbyteLib: Use case-insensitive method of finding column objects (#3…
aaronsteers Feb 8, 2024
b160ca1
🤖 Bump minor version of Python CDK
clnoll Feb 8, 2024
50d7ef9
improve startup performance of airbyte-ci (#34430)
stephane-airbyte Feb 8, 2024
a9eebec
airbyte-ci: only install main dependencies when calling poetry instal…
alafanechere Feb 8, 2024
0cf385a
source-google-sheets: use poetry for dependency management (#34944)
alafanechere Feb 8, 2024
8af1c62
Revert "source-google-sheets: use poetry for dependency management (#…
alafanechere Feb 8, 2024
c79b992
airbyte-ci: poetry install --no-root in builder (#35010)
alafanechere Feb 8, 2024
5c02e48
publish-workflow: expose airbyte-ci-binary-url input (#35011)
alafanechere Feb 8, 2024
776431b
source-google-sheets: use poetry for dependency management [2] (#35008)
alafanechere Feb 8, 2024
f6cfed7
✨ Source Amazon Ads: Add missing field to `sponsored_display_budget_r…
tolik0 Feb 8, 2024
c8899a1
CAT: Validate connector documentation (#34380)
darynaishchenko Feb 8, 2024
cf675ac
CDK: allow ConnectorStateManager stream_instance_map to take Configur…
clnoll Feb 8, 2024
400d9bc
🐛 Source Freshservice - add backoff policy to requested_items stream …
sajarin Feb 8, 2024
75b708e
Add Information about Snowflake Column Case change to V2 Doc (#34989)
jbfbell Feb 8, 2024
d06e932
fix low-code tags on source-sendgrid/sentry/intercom (#35028)
alafanechere Feb 8, 2024
a473372
Source Klaviyo: Add missing fields to stream schemas (#34998)
ChristoGrab Feb 8, 2024
f091ed0
Source Linnworks: CDK update (#34717)
ChristoGrab Feb 8, 2024
acf26df
Remove sources with LEGACY STATE from registry (#35038)
evantahler Feb 8, 2024
08a4711
🐛 Destination snowflake: use 200MB batches (#34502)
edgao Feb 8, 2024
eb90e4f
[source-postgres] : Provide option to advance LSN (#34781)
akashkulk Feb 8, 2024
b575b9d
airbyte-ci: fix missing test reports (#35039)
Feb 8, 2024
d16bcce
CAT: add validation for stream statuses (#34675)
artem1205 Feb 8, 2024
e20cc48
async-destination-framework: make GlobalAsyncStateManager entirely th…
subodh1810 Feb 8, 2024
d0176aa
Checkpointing source-mssql (#34182)
rodireich Feb 8, 2024
8211736
AirbyteLib: Show list of actually available connectors (#35018)
Feb 9, 2024
e8708ef
airbyte-lib: Clean up test schema in Snowflake (#35015)
Feb 9, 2024
76cb392
airbyte-lib: Fix telemetry for streaming (#34955)
Feb 9, 2024
63a19ad
Docs reference: Fix bug with hidden array item properties (#34946)
Feb 9, 2024
2f39ab5
Bump Airbyte version from 0.50.47 to 0.50.48
bgroff Feb 9, 2024
9ff2dc5
java CDK: clean up dependencies, refactor modules (#34745)
Feb 9, 2024
0152f54
undo useLocalCdk = true from #34745 (#35054)
Feb 9, 2024
aef2ba5
[Docs] January 2024 release notes (#34753)
nataliekwong Feb 9, 2024
361a49d
Update documentation for aws secret manager configs (#35057)
terencecho Feb 9, 2024
cbcbd23
✨ Source Bing Ads: Add ignore fields for ads stream (#35019)
tolik0 Feb 9, 2024
4787f1d
Delete resources/example/airflow (#35056)
evantahler Feb 9, 2024
b76c159
Source Monday: add integration tests (#35016)
roman-yermilov-gl Feb 9, 2024
1bc07d7
✨ Source Intercom: Add missing fields (#35063)
tolik0 Feb 9, 2024
6ab732a
:bug: Source Amazon Seller Partner: Fix check command to check access…
tolik0 Feb 9, 2024
74c8e82
[ISSUE #34910] add headers to HttpResponse for test framework (#35105)
maxi297 Feb 9, 2024
c9dc40d
🤖 Bump patch version of Python CDK
maxi297 Feb 9, 2024
e8bd7ac
✨ source-github: migrate to poetry (#35087)
alafanechere Feb 9, 2024
b9e1260
✨ source-greenhouse: migrate to poetry (#35077)
alafanechere Feb 9, 2024
bee7d1e
✨ source-stripe: migrate to poetry (#35068)
alafanechere Feb 9, 2024
39cd8ba
[Docs] Create homepage for Sources/Destinations (#34391)
nataliekwong Feb 9, 2024
2f47ca0
Destination redshift: Switch back to jooq execution; add e2e special …
edgao Feb 9, 2024
36b838f
destination-snowflake: adopt cleaned-up cdk (#34747)
Feb 9, 2024
bdfcac0
source-mongodb-v2: adopt cleaned-up cdk (#34748)
Feb 9, 2024
28ae2b1
source-postgres: adopt cleaned-up cdk (#34751)
Feb 9, 2024
9cf9722
source-mssql: adopt cleaned-up cdk (#34749)
Feb 9, 2024
48290df
source-mysql: adopt cleaned-up cdk (#34750)
Feb 9, 2024
d44aead
✨ source-linkedin-ads: migrate to poetry (#35086)
alafanechere Feb 9, 2024
a3634fa
Destination bigquery: update test fixtures (#34575)
edgao Feb 9, 2024
9b85af4
Source Google Ads: add test for empty streams (#34983)
artem1205 Feb 9, 2024
2b23970
[Source-mysql] Add soft link in mysql test db (#35045)
xiaohansong Feb 9, 2024
36df5eb
Destination snowflake: update test fixtures (#34574)
edgao Feb 9, 2024
dd408ab
airbyte-lib: Escape column names (#34969)
Feb 9, 2024
9a2eb85
Update docs reference from Java 17 to 21 (#34418)
ambirdsall Feb 9, 2024
1650d43
migrate everything from java 17 to java 21 (#35103)
Feb 10, 2024
9e56d7c
Update airbyte-protocol.md (#35123)
evantahler Feb 10, 2024
db06bf7
[docs] On final table re-creation (#35124)
evantahler Feb 10, 2024
e863ed9
✨ source-google-analytics-v4: migrate to poetry (#35101)
alafanechere Feb 12, 2024
c7b0c99
✨ source-mailchimp: migrate to poetry (#35092)
alafanechere Feb 12, 2024
38763e2
✨ source-klaviyo: migrate to poetry (#35088)
alafanechere Feb 12, 2024
cdab3b3
✨ source-zendesk-support: migrate to poetry (#35083)
alafanechere Feb 12, 2024
fc1461f
✨ source-marketo: migrate to poetry (#35078)
alafanechere Feb 12, 2024
bfd6df2
✨ source-paypal-transaction: migrate to poetry (#35075)
alafanechere Feb 12, 2024
78f2e06
✨ source-google-analytics-data-api: migrate to poetry (#35073)
alafanechere Feb 12, 2024
64f4669
destination-async-framework: use the value from stats counter for glo…
subodh1810 Feb 12, 2024
a16e2dd
source-stripe: fix license in `pyproject.toml` (#35137)
alafanechere Feb 12, 2024
64c0191
source-google-sheets: fix license in pyproject.toml (#35136)
alafanechere Feb 12, 2024
6138875
✨ Source Amazon Seller Partner: Add logs for the failed check command…
tolik0 Feb 12, 2024
40a0717
✨ source-surveymonkey: migrate to poetry (#35168)
alafanechere Feb 12, 2024
0811e5e
✨ source-monday: migrate to poetry (#35146)
alafanechere Feb 12, 2024
69b4097
✨ source-salesforce: migrate to poetry (#35147)
alafanechere Feb 12, 2024
df93630
✨ source-intercom: migrate to poetry (#35148)
alafanechere Feb 12, 2024
5dfe997
✨ source-iterable: migrate to poetry (#35150)
alafanechere Feb 12, 2024
e4288b8
✨ source-mixpanel: migrate to poetry (#35151)
alafanechere Feb 12, 2024
1bd45b5
✨ source-typeform: migrate to poetry (#35152)
alafanechere Feb 12, 2024
85da269
✨ source-twilio: migrate to poetry (#35153)
alafanechere Feb 12, 2024
534b75e
✨ source-notion: migrate to poetry (#35155)
alafanechere Feb 12, 2024
be14d24
✨ source-zendesk-talk: migrate to poetry (#35156)
alafanechere Feb 12, 2024
9f062ec
✨ source-amplitude: migrate to poetry (#35162)
alafanechere Feb 12, 2024
b10a72f
✨ source-jira: migrate to poetry (#35160)
alafanechere Feb 12, 2024
8b53a06
✨ source-google-ads: migrate to poetry (#35158)
alafanechere Feb 12, 2024
77f0a37
🐛 Source Slack: Join to the channels while `read` instead of `discove…
Feb 12, 2024
32f652a
✨ source-hubspot: migrate to poetry (#35165)
alafanechere Feb 12, 2024
2ea7924
✨ source-pinterest: migrate to poetry (#35159)
alafanechere Feb 12, 2024
dde42bb
✨ source-sentry: migrate to poetry (#35145)
alafanechere Feb 12, 2024
9f30a20
✨ source-chargebee: migrate to poetry (#35169)
alafanechere Feb 12, 2024
895ffae
source-snapchat-marketing: adopt our base image (#35170)
alafanechere Feb 12, 2024
72598e1
✨ source-snapchat-marketing: migrate to poetry (#35171)
alafanechere Feb 12, 2024
3104017
source-faker: adopt our base image (#35172)
alafanechere Feb 12, 2024
a7e5eb6
✨ source-faker: migrate to poetry (#35174)
alafanechere Feb 12, 2024
c6fef7a
✨ source-amazon-ads: migrate to poetry (#35180)
alafanechere Feb 12, 2024
c979226
Source Github: add integration tests (#34933)
artem1205 Feb 12, 2024
f98e6c2
✨ source-bing-ads: migrate to poetry (#35179)
alafanechere Feb 12, 2024
0d0b03b
✨ source-instagram: migrate to poetry (#35177)
alafanechere Feb 12, 2024
df5e8b9
rename
xiaohansong Feb 13, 2024
dab133e
✨ source-facebook-marketing: migrate to poetry (#35178)
alafanechere Feb 12, 2024
ef65f42
destination-async-framework: make emission of state from FlushWorkers…
subodh1810 Feb 12, 2024
8475729
Merge remote-tracking branch 'origin/master' into xiaohan/ctiditerator
xiaohansong Feb 13, 2024
f8dc6fa
save work
xiaohansong Feb 13, 2024
52ec1b1
fix a test bug
xiaohansong Feb 14, 2024
c397bba
save work
xiaohansong Feb 14, 2024
28c389c
Merge remote-tracking branch 'origin/master' into xiaohan/ctiditerator
xiaohansong Feb 21, 2024
ae60b6f
format
xiaohansong Feb 21, 2024
8c809a0
fix some tests
xiaohansong Feb 21, 2024
66e0a2d
Merge branch 'master' into xiaohan/ctiditerator
xiaohansong Feb 21, 2024
08a45ec
Merge remote-tracking branch 'origin/master' into xiaohan/ctiditerator
xiaohansong Feb 27, 2024
d66ef87
merge state manager together
xiaohansong Feb 27, 2024
75d3d97
Merge branch 'master' into xiaohan/ctiditerator
xiaohansong Feb 27, 2024
4db2b89
postgres use stream in iterator
xiaohansong Feb 28, 2024
2bc1fcb
Merge remote-tracking branch 'origin/master' into xiaohan/ctiditerator
xiaohansong Feb 29, 2024
a62850a
conform to new format, and auto-formatting
xiaohansong Feb 29, 2024
ca3da03
format, bump
xiaohansong Feb 29, 2024
6a6533f
more test
xiaohansong Mar 5, 2024
5d51662
xmin iterator follow state iterator
xiaohansong Mar 5, 2024
dd8e352
add todo
xiaohansong Mar 5, 2024
2f04c55
Merge branch 'master' into xiaohan/ctiditerator
xiaohansong Mar 6, 2024
e87adbf
Merge remote-tracking branch 'origin/master' into xiaohan/ctiditerator
xiaohansong Mar 6, 2024
c82d61a
bypass deprecation check to uncover other issues
xiaohansong Mar 6, 2024
ced4a8c
extend test for postgres to 5m
xiaohansong Mar 6, 2024
e7073cf
Merge remote-tracking branch 'origin/master' into xiaohan/ctiditerator
xiaohansong Mar 7, 2024
ddcda7b
upgrade cdk
xiaohansong Mar 7, 2024
d0ecd23
upgrade
xiaohansong Mar 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -141,6 +142,15 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {

protected abstract void assertExpectedStateMessages(final List<AirbyteStateMessage> stateMessages);

// TODO: this assertion should be added into test cases in this class, we will need to implement
// corresponding iterator for other connectors before
// doing so.
protected void assertExpectedStateMessageCountMatches(final List<AirbyteStateMessage> stateMessages, long totalCount) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we enable it for the relevant paths (Postgres/MySQL CDC) and disable it for others? That way we have coverage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call!

AtomicLong count = new AtomicLong(0L);
stateMessages.stream().forEach(stateMessage -> count.addAndGet(stateMessage.getSourceStats().getRecordCount().longValue()));
assertEquals(totalCount, count.get());
}

@BeforeEach
protected void setup() {
testdb = createTestDatabase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.6'
cdkVersionRequired = '0.20.8'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
useLocalCdk = true
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.13
dockerImageTag: 3.3.14
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,32 @@
package io.airbyte.integrations.source.postgres.ctid;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CtidStateManager {
public abstract class CtidStateManager implements SourceStateMessageProducer<AirbyteMessageWithCtid> {

private static final Logger LOGGER = LoggerFactory.getLogger(CtidStateManager.class);

public static final long CTID_STATUS_VERSION = 2;
public static final String STATE_TYPE_KEY = "state_type";

protected final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus;
private Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;

private String lastCtid;
private FileNodeHandler fileNodeHandler;

protected CtidStateManager(final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus) {
this.pairToCtidStatus = pairToCtidStatus;
Expand All @@ -41,4 +55,65 @@ public static boolean validateRelationFileNode(final CtidStatus ctidstatus,

public abstract AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun);

public void setStreamStateIteratorFields(Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier,
FileNodeHandler fileNodeHandler) {
this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier;
this.fileNodeHandler = fileNodeHandler;
}

@Override
public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) {
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(),
stream.getStream().getNamespace());
final Long fileNode = fileNodeHandler.getFileNode(pair);
assert fileNode != null;
final CtidStatus ctidStatus = new CtidStatus()
.withVersion(CTID_STATUS_VERSION)
.withStateType(StateType.CTID)
.withCtid(lastCtid)
.withIncrementalState(getStreamState(pair))
.withRelationFilenode(fileNode);
LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus);
return createCtidStateMessage(pair, ctidStatus);
}

/**
* Stores the latest CTID.
*/
@Override
public AirbyteMessage processRecordMessage(final ConfiguredAirbyteStream stream, AirbyteMessageWithCtid message) {
if (Objects.nonNull(message.ctid())) {
this.lastCtid = message.ctid();
}
return message.recordMessage();
}

/**
* Creates a final state message for the stream.
*/
@Override
public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream stream) {
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(),
stream.getStream().getNamespace());

final AirbyteStateMessage finalStateMessage = createFinalStateMessage(pair, getStreamState(pair));
LOGGER.info("Finished initial sync of stream {}, Emitting final state, state is {}", pair, finalStateMessage);
return finalStateMessage;
}

/**
* Extra criteria(besides checking frequency) to check if we should emit state message.
*/
@Override
public boolean shouldEmitStateMessage(final ConfiguredAirbyteStream stream) {
return Objects.nonNull(lastCtid)
&& StringUtils.isNotBlank(lastCtid);
}

private JsonNode getStreamState(final AirbyteStreamNameNamespacePair pair) {
final CtidStatus currentCtidStatus = getCtidStatus(pair);
return (currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair)
: currentCtidStatus.getIncrementalState();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants;
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.commons.stream.AirbyteStreamUtils;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.postgres.PostgresQueryUtils.TableBlockSize;
import io.airbyte.integrations.source.postgres.PostgresType;
import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations.RowDataWithCtid;
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -109,7 +111,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getInitialSyncCtidIterator(
tablesMaxTuple.orElseGet(() -> Map.of(pair, -1)).get(pair));
final AutoCloseableIterator<AirbyteMessageWithCtid> recordIterator =
getRecordIterator(queryStream, streamName, namespace, emmitedAt.toEpochMilli());
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, pair);
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream);
final AutoCloseableIterator<AirbyteMessage> logAugmented = augmentWithLogs(recordAndMessageIterator, pair, streamName);
iteratorList.add(logAugmented);

Expand Down Expand Up @@ -165,21 +167,20 @@ private AutoCloseableIterator<AirbyteMessage> augmentWithLogs(final AutoCloseabl
}

private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseableIterator<AirbyteMessageWithCtid> recordIterator,
final AirbyteStreamNameNamespacePair pair) {
final ConfiguredAirbyteStream airbyteStream) {

final CtidStatus currentCtidStatus = ctidStateManager.getCtidStatus(pair);
final JsonNode incrementalState =
(currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair)
: currentCtidStatus.getIncrementalState();
final Duration syncCheckpointDuration =
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: CtidStateIterator.SYNC_CHECKPOINT_DURATION;
: DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit : perhaps we should create a new InitialLoadConstants.SYNC_CHECKPOINT_DURATIONS to have the ability to differentiate between different checkpointing constants?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a difference for doing initial load vs regular incremental load? Feels like we should do that only if we found that would be required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No difference right now. I think we don't need to create additional ones. But maybe a todo or something to rename the file in DebeziumIteratorConstants to just IteratorConstants

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a to do. Will move after I checked in other iterators.

final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: CtidStateIterator.SYNC_CHECKPOINT_RECORDS;
: DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS;

ctidStateManager.setStreamStateIteratorFields(streamStateForIncrementalRunSupplier, fileNodeHandler);

final AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
return AutoCloseableIterators.transformIterator(
r -> new CtidStateIterator(r, pair, fileNodeHandler, ctidStateManager, incrementalState,
syncCheckpointDuration, syncCheckpointRecords),
r -> new SourceStateIterator(r, airbyteStream, ctidStateManager, new StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)),
recordIterator, pair);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final AutoCloseableIterator<JsonNode> queryStream = queryTableXmin(selectedDatabaseFields, table.getNameSpace(), table.getName());
final AutoCloseableIterator<AirbyteMessage> recordIterator =
getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, pair);
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream, pair);

iteratorList.add(augmentWithLogs(recordAndMessageIterator, pair, streamName));
}
Expand Down Expand Up @@ -233,12 +233,14 @@ private AutoCloseableIterator<AirbyteMessage> augmentWithLogs(final AutoCloseabl
}

private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseableIterator<AirbyteMessage> recordIterator,
final ConfiguredAirbyteStream airbyteStream,
final AirbyteStreamNameNamespacePair pair) {
xminStateManager.setCurrentXminStatus(currentXminStatus);
return AutoCloseableIterators.transform(
autoCloseableIterator -> new XminStateIterator(
autoCloseableIterator,
pair,
currentXminStatus),
airbyteStream,
xminStateManager),
recordIterator,
AirbyteStreamUtils.convertFromNameAndNamespace(pair.getName(), pair.getNamespace()));
}
Expand Down
Loading