Skip to content

Commit 12c22fb

Browse files
fix: Create your own source connector tutorial (#21985)
* First commit * Fix some required parts and added extra information. two tests failing: ``` 2 failed - ../actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py:699 TestBasicRead.test_airbyte_trace_message_on_failure[inputs0] - ../actions-runner/_work/airbyte/airbyte/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_incremental.py:196 TestIncremental.test_read_sequential_slices[inputs0] ``` * Improve message * Remove copy-paste line :D
1 parent e136e3c commit 12c22fb

File tree

2 files changed

+76
-15
lines changed

2 files changed

+76
-15
lines changed

docs/connector-development/tutorials/adding-incremental-sync.md

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,38 @@ def discover():
3737
print(json.dumps(airbyte_message))
3838
```
3939

40+
Also, create a file called `incremental_configured_catalog.json` with the following content:
41+
```javascript
42+
{
43+
"streams": [
44+
{
45+
"stream": {
46+
"name": "stock_prices",
47+
"supported_sync_modes": [
48+
"full_refresh",
49+
"incremental"
50+
],
51+
"json_schema": {
52+
"properties": {
53+
"date": {
54+
"type": "string"
55+
},
56+
"price": {
57+
"type": "number"
58+
},
59+
"stock_ticker": {
60+
"type": "string"
61+
}
62+
}
63+
}
64+
},
65+
"sync_mode": "full_refresh",
66+
"destination_sync_mode": "overwrite"
67+
}
68+
]
69+
}
70+
```
71+
4072
## Update `read`
4173

4274
Next we will adapt the `read` method that we wrote previously. We need to change three things.
@@ -63,7 +95,7 @@ Here's what our updated `read` method would look like.
6395
def read(config, catalog, state):
6496
# Assert required configuration was provided
6597
if "api_key" not in config or "stock_ticker" not in config:
66-
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
98+
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
6799
sys.exit(1)
68100

69101
# Find the stock_prices stream if it is present in the input catalog
@@ -73,7 +105,7 @@ def read(config, catalog, state):
73105
stock_prices_stream = configured_stream
74106

75107
if stock_prices_stream is None:
76-
log("No streams selected")
108+
log_error("No streams selected")
77109
return
78110

79111
# By default we fetch stock prices for the 7 day period ending with today
@@ -93,7 +125,7 @@ def read(config, catalog, state):
93125
response = _call_api(ticker=config["stock_ticker"], token = config["api_key"], from_day=from_day, to_day=cursor_value)
94126
if response.status_code != 200:
95127
# In a real scenario we'd handle this error better :)
96-
log("Failure occurred when calling Polygon.io API")
128+
log_error("Failure occurred when calling Polygon.io API")
97129
sys.exit(1)
98130
else:
99131
# Stock prices are returned sorted by date in ascending order
@@ -117,6 +149,11 @@ def read(config, catalog, state):
117149
print(json.dumps(output_message))
118150
```
119151

152+
That code requires adding a new import to the `source.py` file:
153+
```python
154+
from datetime import timezone
155+
```
156+
120157
We will also need to parse the `state` argument in the `run` method. In order to do that, we will modify the code that
121158
calls `read` method from `run` method:
122159
```python
@@ -209,6 +246,23 @@ You will also need to create an `abnormal_state.json` file with a date in the fu
209246
{"stock_prices": {"date": "2121-01-01"}}
210247
```
211248

249+
Lastly, update the `_call_api` call inside the `check` function in `source.py` so that it passes the new `from_day` and `to_day` parameters:
250+
```python
251+
def check(config):
252+
# Validate input configuration by attempting to get the daily closing prices of the input stock ticker
253+
response = _call_api(ticker=config["stock_ticker"], token=config["api_key"], from_day=datetime.now().date()-timedelta(days=1), to_day=datetime.now().date())
254+
if response.status_code == 200:
255+
result = {"status": "SUCCEEDED"}
256+
elif response.status_code == 403:
257+
# HTTP code 403 means authorization failed so the API key is incorrect
258+
result = {"status": "FAILED", "message": "API Key is incorrect."}
259+
else:
260+
result = {"status": "FAILED", "message": "Input configuration is incorrect. Please verify the input stock ticker and API key."}
261+
262+
output_message = {"type": "CONNECTION_STATUS", "connectionStatus": result}
263+
print(json.dumps(output_message))
264+
```
265+
212266
Run the tests once again:
213267

214268
```bash

docs/connector-development/tutorials/build-a-connector-the-hard-way.md

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ Python 3.9.11
3333

3434
On some systems, `python` points to a Python 2 installation and `python3` points to Python 3. If this is the case on your machine, substitute all `python` commands in this guide with `python3`. Otherwise, make sure to install Python 3 before beginning.
3535

36+
You also need to install the `requests` Python library:
37+
````bash
38+
pip install requests
39+
````
40+
3641
## Our connector: a stock ticker API
3742

3843
Our connector will output the daily price of a stock since a given date. We'll leverage the free [Polygon.io API](https://polygon.io/pricing) for this. We'll use Python to implement the connector because its syntax is accessible to most programmers, but the process described here can be applied to any language.
@@ -208,7 +213,7 @@ def run(args):
208213
else:
209214
# If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
210215
# had a failure
211-
log("Invalid command. Allowable commands: [spec]")
216+
log_error("Invalid command. Allowable commands: [spec]")
212217
sys.exit(1)
213218

214219
# A zero exit code means the process successfully completed
@@ -228,6 +233,8 @@ Some notes on the above code:
228233

229234
1. As described in the [specification](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#key-takeaways), Airbyte connectors are CLIs which communicate via stdout, so the output of the command is simply a JSON string formatted according to the Airbyte Specification. So to "return" a value we use `print` to output the return value to stdout
230235
2. All Airbyte commands can output log messages that take the form `{"type":"LOG", "log":"message"}`, so we create a helper method `log(message)` to allow logging
236+
3. All Airbyte commands can output error messages that take the form `{"type":"TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}`, so we create a helper method `log_error(message)` to allow emitting error messages
237+
231238

232239
Now if we run `python source.py spec` we should see the specification printed out:
233240

@@ -262,8 +269,8 @@ Then we'll add the `check_method`:
262269

263270
```python
264271
import requests
265-
import datetime
266272
from datetime import date
273+
from datetime import datetime
267274
from datetime import timedelta
268275

269276
def _call_api(ticker, token):
@@ -352,7 +359,7 @@ def run(args):
352359
else:
353360
# If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
354361
# had a failure
355-
log("Invalid command. Allowable commands: [spec, check]")
362+
log_error("Invalid command. Allowable commands: [spec, check]")
356363
sys.exit(1)
357364

358365
# A zero exit code means the process successfully completed
@@ -471,7 +478,7 @@ def run(args):
471478
else:
472479
# If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
473480
# had a failure
474-
log("Invalid command. Allowable commands: [spec, check, discover]")
481+
log_error("Invalid command. Allowable commands: [spec, check, discover]")
475482
sys.exit(1)
476483

477484
# A zero exit code means the process successfully completed
@@ -545,7 +552,7 @@ Then we'll define the `read` method in `source.py`:
545552

546553
```python
547554
def log_error(error_message):
548-
current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
555+
current_time_in_ms = int(datetime.now().timestamp()) * 1000
549556
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
550557
print(json.dumps(log_json))
551558

@@ -583,7 +590,7 @@ def read(config, catalog):
583590
results = response.json()["results"]
584591
for result in results:
585592
data = {"date": date.fromtimestamp(result["t"]/1000).isoformat(), "stock_ticker": config["stock_ticker"], "price": result["c"]}
586-
record = {"stream": "stock_prices", "data": data, "emitted_at": int(datetime.datetime.now().timestamp()) * 1000}
593+
record = {"stream": "stock_prices", "data": data, "emitted_at": int(datetime.now().timestamp()) * 1000}
587594
output_message = {"type": "RECORD", "record": record}
588595
print(json.dumps(output_message))
589596
```
@@ -615,7 +622,7 @@ elif command == "read":
615622
and:
616623

617624
```python
618-
log("Invalid command. Allowable commands: [spec, check, discover, read]")
625+
log_error("Invalid command. Allowable commands: [spec, check, discover, read]")
619626
```
620627

621628
this yields the following `run` method:
@@ -666,7 +673,7 @@ def run(args):
666673
else:
667674
# If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
668675
# had a failure
669-
log("Invalid command. Allowable commands: [spec, check, discover, read]")
676+
log_error("Invalid command. Allowable commands: [spec, check, discover, read]")
670677
sys.exit(1)
671678

672679
# A zero exit code means the process successfully completed
@@ -717,8 +724,8 @@ import json
717724
import sys
718725
import os
719726
import requests
720-
import datetime
721727
from datetime import date
728+
from datetime import datetime
722729
from datetime import timedelta
723730

724731
def read(config, catalog):
@@ -754,7 +761,7 @@ def read(config, catalog):
754761
results = response.json()["results"]
755762
for result in results:
756763
data = {"date": date.fromtimestamp(result["t"]/1000).isoformat(), "stock_ticker": config["stock_ticker"], "price": result["c"]}
757-
record = {"stream": "stock_prices", "data": data, "emitted_at": int(datetime.datetime.now().timestamp()) * 1000}
764+
record = {"stream": "stock_prices", "data": data, "emitted_at": int(datetime.now().timestamp()) * 1000}
758765
output_message = {"type": "RECORD", "record": record}
759766
print(json.dumps(output_message))
760767

@@ -799,7 +806,7 @@ def log(message):
799806

800807

801808
def log_error(error_message):
802-
current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
809+
current_time_in_ms = int(datetime.now().timestamp()) * 1000
803810
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
804811
print(json.dumps(log_json))
805812

@@ -892,7 +899,7 @@ def run(args):
892899
else:
893900
# If we don't recognize the command log the problem and exit with an error code greater than 0 to indicate the process
894901
# had a failure
895-
log("Invalid command. Allowable commands: [spec, check, discover, read]")
902+
log_error("Invalid command. Allowable commands: [spec, check, discover, read]")
896903
sys.exit(1)
897904

898905
# A zero exit code means the process successfully completed

0 commit comments

Comments
 (0)