Skip to content

Commit cdedd93

Browse files
authored
[source-postgres] fix on final state (#44119)
1 parent b5c27eb commit cdedd93

File tree

5 files changed

+296
-286
lines changed

5 files changed

+296
-286
lines changed

airbyte-integrations/connectors/source-postgres/integration_tests/seed/hook.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from typing import List, Tuple
1111

1212
import psycopg2
13+
import pytz
1314
from psycopg2 import extensions, sql
1415

1516
catalog_write_file = "/connector/integration_tests/temp/configured_catalog_copy.json"
@@ -24,6 +25,8 @@
2425
secret_config_cdc_file = '/connector/secrets/config_cdc.json'
2526
secret_active_config_cdc_file = '/connector/integration_tests/temp/config_cdc_active.json'
2627

28+
la_timezone = pytz.timezone('America/Los_Angeles')
29+
2730
def connect_to_db() -> extensions.connection:
2831
with open(secret_config_file) as f:
2932
secret = json.load(f)
@@ -124,7 +127,7 @@ def create_table(conn: extensions.connection, schema_name: str, table_name: str)
124127
cursor.close()
125128

126129
def generate_schema_date_with_suffix() -> str:
127-
current_date = datetime.datetime.now().strftime("%Y%m%d")
130+
current_date = datetime.datetime.now(la_timezone).strftime("%Y%m%d")
128131
suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
129132
return f"{current_date}_{suffix}"
130133

@@ -251,7 +254,7 @@ def delete_schemas_with_prefix(conn, date_prefix):
251254

252255
def teardown() -> None:
253256
conn = connect_to_db()
254-
today = datetime.datetime.now()
257+
today = datetime.datetime.now(la_timezone)
255258
yesterday = today - timedelta(days=1)
256259
formatted_yesterday = yesterday.strftime('%Y%m%d')
257260
delete_schemas_with_prefix(conn, formatted_yesterday)
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
psycopg2==2.9.9
2+
pytz

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.6.15
12+
dockerImageTag: 3.6.16
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespa
106106
streamsThatHaveCompletedSnapshot.forEach(stream -> {
107107
final DbStreamState state = getFinalState(stream);
108108
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(state)));
109-
110109
});
111110

112111
resumableFullRefreshStreams.forEach(stream -> {
@@ -159,14 +158,20 @@ public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamesp
159158
if (isIncrementalStream(pair)) {
160159
streamsThatHaveCompletedSnapshot.add(pair);
161160
}
161+
162+
if (resumableFullRefreshStreams.contains(pair)) {
163+
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(pair);
164+
pairToCtidStatus.put(pair, ctidStatusForFullRefreshStream);
165+
}
166+
162167
final List<AirbyteStreamState> streamStates = new ArrayList<>();
163168
streamsThatHaveCompletedSnapshot.forEach(stream -> {
164169
final DbStreamState state = getFinalState(stream);
165170
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(state)));
166171
});
167172

168173
resumableFullRefreshStreams.forEach(stream -> {
169-
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(pair);
174+
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(stream);
170175
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ctidStatusForFullRefreshStream)));
171176
});
172177

0 commit comments

Comments
 (0)