Skip to content

Commit 4eee33a

Browse files
erohmensingakashkulk
authored andcommitted
Add improvements to "Building a connector the hard way" (#19093)
* Add improvements to "Building a connector the hard way" * add log_error() formatting to pass SAT tests * Update to new version of acceptance-test-config.yml * Edit tutorial directory to match tutorial * Change permissions on acceptance-test-docker.sh This reverts commit 40b2d98.
1 parent 2928869 commit 4eee33a

File tree

3 files changed

+82
-36
lines changed

3 files changed

+82
-36
lines changed

airbyte-integrations/connector-templates/source_acceptance_test_files/acceptance-test-docker.sh

100644100755
File mode changed.

airbyte-integrations/connectors/source-stock-ticker-api-tutorial/source.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
def read(config, catalog):
3434
# Assert required configuration was provided
3535
if "api_key" not in config or "stock_ticker" not in config:
36-
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
36+
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
3737
sys.exit(1)
3838

3939
# Find the stock_prices stream if it is present in the input catalog
@@ -43,22 +43,22 @@ def read(config, catalog):
4343
stock_prices_stream = configured_stream
4444

4545
if stock_prices_stream is None:
46-
log("No streams selected")
46+
log_error("No streams selected")
4747
return
4848

4949
# We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
5050
if stock_prices_stream["sync_mode"] != "full_refresh":
51-
log("This connector only supports full refresh syncs! (for now)")
51+
log_error("This connector only supports full refresh syncs! (for now)")
5252
sys.exit(1)
5353

5454
# If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
5555
response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
5656
if response.status_code != 200:
5757
# In a real scenario we'd handle this error better :)
58-
log("Failure occurred when calling Polygon.io API")
58+
log_error("Failure occurred when calling Polygon.io API")
5959
sys.exit(1)
6060
else:
61-
# Stock prices are returned sorted by by date in ascending order
61+
# Stock prices are returned sorted by date in ascending order
6262
# We want to output them one by one as AirbyteMessages
6363
results = response.json()["results"]
6464
for result in results:
@@ -83,7 +83,7 @@ def _call_api(ticker, token):
8383
def check(config):
8484
# Assert required configuration was provided
8585
if "api_key" not in config or "stock_ticker" not in config:
86-
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
86+
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
8787
sys.exit(1)
8888
else:
8989
# Validate input configuration by attempting to get the daily closing prices of the input stock ticker
@@ -107,6 +107,12 @@ def log(message):
107107
print(json.dumps(log_json))
108108

109109

110+
def log_error(error_message):
111+
current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
112+
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
113+
print(json.dumps(log_json))
114+
115+
110116
def discover():
111117
catalog = {
112118
"streams": [{

docs/connector-development/tutorials/build-a-connector-the-hard-way.md

Lines changed: 70 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ README.md
156156
acceptance-test-config.yml
157157
acceptance-test-docker.sh
158158
build.gradle
159+
source.py
159160
spec.json
160161
```
161162

@@ -261,6 +262,9 @@ Then we'll add the `check_method`:
261262

262263
```python
263264
import requests
265+
import datetime
266+
from datetime import date
267+
from datetime import timedelta
264268

265269
def _call_api(ticker, token):
266270
today = date.today()
@@ -314,6 +318,12 @@ elif command == "check":
314318
check(config)
315319
```
316320

321+
Then we need to update our list of available commands:
322+
323+
```python
324+
log("Invalid command. Allowable commands: [spec, check]")
325+
```
326+
317327
This results in the following `run` method.
318328

319329
```python
@@ -349,7 +359,9 @@ def run(args):
349359
sys.exit(0)
350360
```
351361

352-
and that should be it. Let's test our new method:
362+
and that should be it.
363+
364+
Let's test our new method:
353365

354366
```bash
355367
$ python source.py check --config secrets/valid_config.json
@@ -416,6 +428,12 @@ elif command == "discover":
416428
discover()
417429
```
418430

431+
We need to update our list of available commands:
432+
433+
```python
434+
log("Invalid command. Allowable commands: [spec, check, discover]")
435+
```
436+
419437
You may be wondering why `config` is a required input to `discover` if it's not used. This is done for consistency: the Airbyte Specification requires `--config` as an input to `discover` because many sources require it \(e.g: to discover the tables available in a Postgres database, you must supply a password\). So instead of guessing whether the flag is required depending on the connector, we always assume it is required, and the connector can choose whether to use it.
420438

421439
The full run method is now below:
@@ -526,14 +544,16 @@ First, let's create a configured catalog `fullrefresh_configured_catalog.json` t
526544
Then we'll define the `read` method in `source.py`:
527545

528546
```python
529-
import datetime
530-
from datetime import date
531-
from datetime import timedelta
547+
def log_error(error_message):
548+
current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
549+
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
550+
print(json.dumps(log_json))
532551

552+
533553
def read(config, catalog):
534554
# Assert required configuration was provided
535555
if "api_key" not in config or "stock_ticker" not in config:
536-
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
556+
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
537557
sys.exit(1)
538558

539559
# Find the stock_prices stream if it is present in the input catalog
@@ -543,19 +563,19 @@ def read(config, catalog):
543563
stock_prices_stream = configured_stream
544564

545565
if stock_prices_stream is None:
546-
log("No streams selected")
566+
log_error("No stream selected.")
547567
return
548568

549569
# We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
550570
if stock_prices_stream["sync_mode"] != "full_refresh":
551-
log("This connector only supports full refresh syncs! (for now)")
571+
log_error("This connector only supports full refresh syncs! (for now)")
552572
sys.exit(1)
553573

554574
# If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
555575
response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
556576
if response.status_code != 200:
557577
# In a real scenario we'd handle this error better :)
558-
log("Failure occurred when calling Polygon.io API")
578+
log_error("Failure occurred when calling Polygon.io API")
559579
sys.exit(1)
560580
else:
561581
# Stock prices are returned sorted by date in ascending order
@@ -568,6 +588,8 @@ def read(config, catalog):
568588
print(json.dumps(output_message))
569589
```
570590

591+
Note we've added a `log_error()` function to simplify formatting error messages from within connector functions as [`AirbyteTraceMessage`](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#airbytetracemessage)s, specifically `AirbyteErrorTraceMessage`s.
592+
571593
After doing some input validation, the code above calls the API to obtain daily prices for the input stock ticker, then outputs the prices. As always, our output is formatted according to the Airbyte Specification. Let's update our args parser with the following blocks:
572594

573595
```python
@@ -590,6 +612,12 @@ elif command == "read":
590612
read(config, configured_catalog)
591613
```
592614

615+
and:
616+
617+
```python
618+
log("Invalid command. Allowable commands: [spec, check, discover, read]")
619+
```
620+
593621
this yields the following `run` method:
594622

595623
```python
@@ -696,7 +724,7 @@ from datetime import timedelta
696724
def read(config, catalog):
697725
# Assert required configuration was provided
698726
if "api_key" not in config or "stock_ticker" not in config:
699-
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
727+
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
700728
sys.exit(1)
701729

702730
# Find the stock_prices stream if it is present in the input catalog
@@ -706,19 +734,19 @@ def read(config, catalog):
706734
stock_prices_stream = configured_stream
707735

708736
if stock_prices_stream is None:
709-
log("No streams selected")
737+
log_error("No streams selected")
710738
return
711739

712740
# We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
713741
if stock_prices_stream["sync_mode"] != "full_refresh":
714-
log("This connector only supports full refresh syncs! (for now)")
742+
log_error("This connector only supports full refresh syncs! (for now)")
715743
sys.exit(1)
716744

717745
# If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
718746
response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
719747
if response.status_code != 200:
720748
# In a real scenario we'd handle this error better :)
721-
log("Failure occurred when calling Polygon.io API")
749+
log_error("Failure occurred when calling Polygon.io API")
722750
sys.exit(1)
723751
else:
724752
# Stock prices are returned sorted by date in ascending order
@@ -746,7 +774,7 @@ def _call_api(ticker, token):
746774
def check(config):
747775
# Assert required configuration was provided
748776
if "api_key" not in config or "stock_ticker" not in config:
749-
log("Input config must contain the properties 'api_key' and 'stock_ticker'")
777+
log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
750778
sys.exit(1)
751779
else:
752780
# Validate input configuration by attempting to get the daily closing prices of the input stock ticker
@@ -770,6 +798,12 @@ def log(message):
770798
print(json.dumps(log_json))
771799

772800

801+
def log_error(error_message):
802+
current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
803+
log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
804+
print(json.dumps(log_json))
805+
806+
773807
def discover():
774808
catalog = {
775809
"streams": [{
@@ -915,7 +949,7 @@ Then we can run the image using:
915949
docker run airbyte/source-stock-ticker-api:dev
916950
```
917951

918-
to run any of our commands, we'll need to mount all the inputs into the Docker container first, then refer to their _mounted_ paths when invoking the connector. For example, we'd run `check` or `read` as follows:
952+
To run any of our commands, we'll need to mount all the inputs into the Docker container first, then refer to their _mounted_ paths when invoking the connector. This allows the connector to access your secrets without having to build them into the container. For example, we'd run `check` or `read` as follows:
919953

920954
```bash
921955
$ docker run airbyte/source-stock-ticker-api:dev spec
@@ -948,25 +982,31 @@ The code generator should have already generated a YAML file which configures th
948982
# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference)
949983
# for more information about how to configure these tests
950984
connector_image: airbyte/source-stock-ticker-api:dev
951-
tests:
952-
spec:
953-
- spec_path: "spec.json"
954-
config_path: "secrets/valid_config.json"
955-
connection:
956-
- config_path: "secrets/valid_config.json"
957-
status: "succeed"
958-
- config_path: "secrets/invalid_config.json"
959-
status: "failed"
960-
discovery:
961-
- config_path: "secrets/valid_config.json"
985+
acceptance_tests:
962986
basic_read:
963-
- config_path: "secrets/valid_config.json"
964-
configured_catalog_path: "fullrefresh_configured_catalog.json"
987+
tests:
988+
- config_path: secrets/valid_config.json
989+
configured_catalog_path: fullrefresh_configured_catalog.json
965990
empty_streams: []
991+
connection:
992+
tests:
993+
- config_path: secrets/valid_config.json
994+
status: succeed
995+
- config_path: secrets/invalid_config.json
996+
status: failed
997+
discovery:
998+
tests:
999+
- config_path: secrets/valid_config.json
9661000
full_refresh:
967-
- config_path: "secrets/valid_config.json"
968-
configured_catalog_path: "fullrefresh_configured_catalog.json"
1001+
tests:
1002+
- config_path: secrets/valid_config.json
1003+
configured_catalog_path: fullrefresh_configured_catalog.json
1004+
spec:
1005+
tests:
1006+
- config_path: secrets/valid_config.json
1007+
spec_path: spec.json
9691008
# incremental: # TODO uncomment this once you implement incremental sync in part 2 of the tutorial
1009+
# tests:
9701010
# - config_path: "secrets/config.json"
9711011
# configured_catalog_path: "integration_tests/configured_catalog.json"
9721012
# future_state_path: "integration_tests/abnormal_state.json"
@@ -1058,7 +1098,7 @@ airbyte-server | Version: dev
10581098
airbyte-server |
10591099
```
10601100

1061-
After you see the above banner printed out in the terminal window where you are running `docker-compose up`, visit [http://localhost:8000](http://localhost:8000) in your browser.
1101+
After you see the above banner printed out in the terminal window where you are running `docker-compose up`, visit [http://localhost:8000](http://localhost:8000) in your browser and log in with the default credentials: username `airbyte` and password `password`.
10621102

10631103
If this is the first time using the Airbyte UI, then you will be prompted to go through a first-time wizard. To skip it, click the "Skip Onboarding" button.
10641104

0 commit comments

Comments
 (0)