Skip to content

Commit 10c7108

Browse files
evantahleralafanechereoctavia-squidington-iii
authored andcommitted
Faker emits AirbyteEstimateTraceMessage (#19197)
* Faker emits `AirbyteEstimateTraceMessage` * update pr * fix SAT * auto-bump connector version Co-authored-by: alafanechere <[email protected]> Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 02b05f8 commit 10c7108

File tree

8 files changed

+75
-16
lines changed

8 files changed

+75
-16
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@
423423
- name: Faker
424424
sourceDefinitionId: dfd88b22-b603-4c3d-aad7-3701784586b1
425425
dockerRepository: airbyte/source-faker
426-
dockerImageTag: 0.2.0
426+
dockerImageTag: 0.2.1
427427
documentationUrl: https://docs.airbyte.com/integrations/sources/faker
428428
sourceType: api
429429
releaseStage: alpha

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3655,7 +3655,7 @@
36553655
oauthFlowInitParameters: []
36563656
oauthFlowOutputParameters:
36573657
- - "access_token"
3658-
- dockerImage: "airbyte/source-faker:0.2.0"
3658+
- dockerImage: "airbyte/source-faker:0.2.1"
36593659
spec:
36603660
documentationUrl: "https://docs.airbyte.com/integrations/sources/faker"
36613661
connectionSpecification:

airbyte-integrations/connectors/source-faker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,5 @@ COPY source_faker ./source_faker
3434
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
3535
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
3636

37-
LABEL io.airbyte.version=0.2.0
37+
LABEL io.airbyte.version=0.2.1
3838
LABEL io.airbyte.name=airbyte/source-faker

airbyte-integrations/connectors/source-faker/acceptance-test-config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ tests:
88
- config_path: "secrets/config.json"
99
status: "succeed"
1010
- config_path: "integration_tests/invalid_config.json"
11-
status: "exception"
11+
status: "failed"
1212
discovery:
1313
- config_path: "secrets/config.json"
1414
basic_read:

airbyte-integrations/connectors/source-faker/setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55

66
from setuptools import find_packages, setup
77

8-
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "mimesis==6.1.1"]
8+
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "mimesis==6.1.1"]
99

1010
TEST_REQUIREMENTS = [
11-
"pytest~=6.1",
11+
"pytest~=7.0",
1212
"source-acceptance-test",
1313
]
1414

airbyte-integrations/connectors/source-faker/source_faker/source.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@
1313
from airbyte_cdk.models import (
1414
AirbyteCatalog,
1515
AirbyteConnectionStatus,
16+
AirbyteEstimateTraceMessage,
1617
AirbyteLogMessage,
1718
AirbyteMessage,
1819
AirbyteRecordMessage,
1920
AirbyteStateMessage,
2021
AirbyteStream,
22+
AirbyteTraceMessage,
2123
ConfiguredAirbyteCatalog,
24+
EstimateType,
2225
Status,
26+
TraceType,
2327
Type,
2428
)
2529
from airbyte_cdk.sources import Source
@@ -42,7 +46,10 @@ def check(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteConnect
4246
"""
4347

4448
# As this is an in-memory source, it always succeeds
45-
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
49+
if type(config["count"]) == int or type(config["count"]) == float:
50+
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
51+
else:
52+
return AirbyteConnectionStatus(status=Status.FAILED)
4653

4754
def discover(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteCatalog:
4855
"""
@@ -136,6 +143,10 @@ def read(
136143
records_in_sync = 0
137144
records_in_page = 0
138145

146+
users_estimate = count - cursor
147+
yield generate_estimate(stream.stream.name, users_estimate, 450)
148+
yield generate_estimate("Purchases", users_estimate * 1.5, 230) # a fuzzy guess, some users have purchases, some don't
149+
139150
for i in range(cursor, count):
140151
user = generate_user(person, dt, i)
141152
yield generate_record(stream, user)
@@ -162,6 +173,7 @@ def read(
162173

163174
elif stream.stream.name == "Products":
164175
products = generate_products()
176+
yield generate_estimate(stream.stream.name, len(products), 180)
165177
for p in products:
166178
yield generate_record(stream, p)
167179
yield generate_state(state, stream, {"product_count": len(products)})
@@ -204,6 +216,14 @@ def log_stream(stream_name: str):
204216
)
205217

206218

219+
def generate_estimate(stream_name: str, total: int, bytes_per_row: int):
220+
emitted_at = int(datetime.datetime.now().timestamp() * 1000)
221+
estimate = AirbyteEstimateTraceMessage(
222+
type=EstimateType.STREAM, name=stream_name, row_estimate=round(total), byte_estimate=round(total * bytes_per_row)
223+
)
224+
return AirbyteMessage(type=Type.TRACE, trace=AirbyteTraceMessage(type=TraceType.ESTIMATE, emitted_at=emitted_at, estimate=estimate))
225+
226+
207227
def generate_state(state: Dict[str, any], stream: any, data: any):
208228
state[
209229
stream.stream.name

airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,13 @@ def test_read_small_random_data():
4444
logger = None
4545
config = {"count": 10}
4646
catalog = ConfiguredAirbyteCatalog(
47-
streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}]
47+
streams=[
48+
{
49+
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
50+
"sync_mode": "full_refresh",
51+
"destination_sync_mode": "overwrite",
52+
}
53+
]
4854
)
4955
state = {}
5056
iterator = source.read(logger, config, catalog, state)
@@ -70,8 +76,16 @@ def test_read_big_random_data():
7076
config = {"count": 1000, "records_per_slice": 100, "records_per_sync": 1000}
7177
catalog = ConfiguredAirbyteCatalog(
7278
streams=[
73-
{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
74-
{"stream": {"name": "Products", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
79+
{
80+
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
81+
"sync_mode": "full_refresh",
82+
"destination_sync_mode": "overwrite",
83+
},
84+
{
85+
"stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
86+
"sync_mode": "full_refresh",
87+
"destination_sync_mode": "overwrite",
88+
},
7589
]
7690
)
7791
state = {}
@@ -98,9 +112,21 @@ def test_with_purchases():
98112
config = {"count": 1000, "records_per_sync": 1000}
99113
catalog = ConfiguredAirbyteCatalog(
100114
streams=[
101-
{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
102-
{"stream": {"name": "Products", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
103-
{"stream": {"name": "Purchases", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"},
115+
{
116+
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
117+
"sync_mode": "full_refresh",
118+
"destination_sync_mode": "overwrite",
119+
},
120+
{
121+
"stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
122+
"sync_mode": "full_refresh",
123+
"destination_sync_mode": "overwrite",
124+
},
125+
{
126+
"stream": {"name": "Purchases", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
127+
"sync_mode": "full_refresh",
128+
"destination_sync_mode": "overwrite",
129+
},
104130
]
105131
)
106132
state = {}
@@ -128,7 +154,13 @@ def test_sync_ends_with_limit():
128154
logger = None
129155
config = {"count": 100, "records_per_sync": 5}
130156
catalog = ConfiguredAirbyteCatalog(
131-
streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}]
157+
streams=[
158+
{
159+
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
160+
"sync_mode": "full_refresh",
161+
"destination_sync_mode": "overwrite",
162+
}
163+
]
132164
)
133165
state = {}
134166
iterator = source.read(logger, config, catalog, state)
@@ -157,7 +189,13 @@ def test_read_with_seed():
157189
logger = None
158190
config = {"count": 1, "seed": 100}
159191
catalog = ConfiguredAirbyteCatalog(
160-
streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}]
192+
streams=[
193+
{
194+
"stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]},
195+
"sync_mode": "full_refresh",
196+
"destination_sync_mode": "overwrite",
197+
}
198+
]
161199
)
162200
state = {}
163201
iterator = source.read(logger, config, catalog, state)

docs/integrations/sources/faker.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ N/A
4141

4242
| Version | Date | Pull Request | Subject |
4343
| :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------- |
44-
| 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! |
44+
| 0.2.1 | 2022-10-14 | [19197](https://github.com/airbytehq/airbyte/pull/19197) | Emit `AirbyteEstimateTraceMessage` |
45+
| 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! |
4546
| 0.1.8 | 2022-10-12 | [17889](https://github.com/airbytehq/airbyte/pull/17889) | Bump to test publish command (2) |
4647
| 0.1.7 | 2022-10-11 | [17848](https://github.com/airbytehq/airbyte/pull/17848) | Bump to test publish command |
4748
| 0.1.6 | 2022-09-07 | [16418](https://github.com/airbytehq/airbyte/pull/16418) | Log start of each stream |

0 commit comments

Comments
 (0)