Commit 573e8f7

✨Source US Census - Migrate Python CDK to Low-code CDK (#43521)
1 parent 167403c commit 573e8f7

10 files changed

+747
-509
lines changed

airbyte-integrations/connectors/source-us-census/README.md

+21-9
@@ -1,45 +1,52 @@
-# Us-Census source connector
+# Us Census Source

-
-This is the repository for the Us-Census source connector, written in Python.
+This is the repository for the Us Census configuration based source connector.
 For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/sources/us-census).

 ## Local development

 ### Prerequisites
-* Python (~=3.9)
-* Poetry (~=1.7) - installation instructions [here](https://python-poetry.org/docs/#installation)
+
+* Python (`^3.9`)
+* Poetry (`^1.7`) - installation instructions [here](https://python-poetry.org/docs/#installation)
+


 ### Installing the connector
+
 From this connector directory, run:
 ```bash
 poetry install --with dev
 ```


 ### Create credentials
+
 **If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/sources/us-census)
-to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_us_census/spec.yaml` file.
+to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `src/source_us_census/spec.yaml` file.
 Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information.
 See `sample_files/sample_config.json` for a sample config file.


 ### Locally running the connector
+
 ```
 poetry run source-us-census spec
 poetry run source-us-census check --config secrets/config.json
 poetry run source-us-census discover --config secrets/config.json
 poetry run source-us-census read --config secrets/config.json --catalog sample_files/configured_catalog.json
 ```

-### Running unit tests
-To run unit tests locally, from the connector directory run:
+### Running tests
+
+To run tests locally, from the connector directory run:
+
 ```
-poetry run pytest unit_tests
+poetry run pytest tests
 ```

 ### Building the docker image
+
 1. Install [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md)
 2. Run the following command to build the docker image:
 ```bash
@@ -50,6 +57,7 @@ An image will be available on your host with the tag `airbyte/source-us-census:d


 ### Running as a docker container
+
 Then run any of the connector commands as follows:
 ```
 docker run --rm airbyte/source-us-census:dev spec
@@ -59,16 +67,19 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat
 ```

 ### Running our CI test suite
+
 You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
 ```bash
 airbyte-ci connectors --name=source-us-census test
 ```

 ### Customizing acceptance Tests
+
 Customize `acceptance-test-config.yml` file to configure acceptance tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
 If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.

 ### Dependency Management
+
 All of your dependencies should be managed via Poetry.
 To add a new dependency, run:
 ```bash
@@ -78,6 +89,7 @@ poetry add <package-name>
 Please commit the changes to `pyproject.toml` and `poetry.lock` files.

 ## Publishing a new version of the connector
+
 You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
 1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=source-us-census test`
 2. Bump the connector version (please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors)):

airbyte-integrations/connectors/source-us-census/metadata.yaml

+2-2
@@ -2,7 +2,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: c4cfaeda-c757-489a-8aba-859fb08b6970
-  dockerImageTag: 0.1.16
+  dockerImageTag: 0.2.0
   dockerRepository: airbyte/source-us-census
   githubIssueLabel: source-us-census
   icon: uscensus.svg
@@ -21,7 +21,7 @@ data:
   documentationUrl: https://docs.airbyte.com/integrations/sources/us-census
   tags:
     - language:python
-    - cdk:python
+    - cdk:low-code
   ab_internal:
     sl: 100
     ql: 100

airbyte-integrations/connectors/source-us-census/poetry.lock

+474-118
Some generated files are not rendered by default.

airbyte-integrations/connectors/source-us-census/pyproject.toml

+6-7
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"

 [tool.poetry]
-version = "0.1.16"
+version = "0.2.0"
 name = "source-us-census"
 description = "Source implementation for Us Census."
 authors = [ "Airbyte <[email protected]>",]
@@ -16,14 +16,13 @@ repository = "https://github.com/airbytehq/airbyte"
 include = "source_us_census"

 [tool.poetry.dependencies]
-python = "^3.9,<3.12"
-airbyte-cdk = "0.80.0"
+python = "^3.10"
+airbyte-cdk = "^4.0.1"

 [tool.poetry.scripts]
 source-us-census = "source_us_census.run:run"

 [tool.poetry.group.dev.dependencies]
-requests-mock = "^1.9.3"
-responses = "^0.13"
-pytest-mock = "^3.6.1"
-pytest = "^6.1"
+requests-mock = "*"
+pytest-mock = "*"
+pytest = "*"
@@ -0,0 +1,135 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+
+from typing import List, Optional, Union
+
+import requests
+from airbyte_cdk.models import FailureType
+from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
+from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
+from airbyte_cdk.sources.declarative.requesters.error_handlers.default_http_response_filter import DefaultHttpResponseFilter
+from airbyte_cdk.sources.declarative.types import Record
+from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
+    DEFAULT_ERROR_RESOLUTION,
+    SUCCESS_RESOLUTION,
+    ErrorResolution,
+    ResponseAction,
+)
+
+
+class USCensusRecordExtractor(RecordExtractor):
+    """
+    Parses the response from the us census website.
+
+    The US Census provides data in an atypical format,
+    which motivated the creation of this source rather
+    than using a generic http source.
+
+    * Data are represented in a two-dimensional array
+    * Square brackets [ ] hold arrays
+    * Values are separated by a , (comma).
+
+    e.g.
+    [["STNAME","POP","DATE_","state"],
+
+    ["Alabama","4849377","7","01"],
+
+    ["Alaska","736732","7","02"],
+
+    ["Arizona","6731484","7","04"],
+
+    ["Arkansas","2966369","7","05"],
+
+    ["California","38802500","7","06"]]
+    """
+
+    def extract_records(self, response: requests.Response) -> List[Record]:  # type: ignore
+        # Where we accumulate a "row" of data until we encounter ']'
+        buffer = ""
+        # The response is in a tabular format where the first list of strings
+        # is the "header" of the table which we use as keys in the final dictionary
+        # we produce
+        header = []
+        # Characters with special meanings which should not be added to the buffer
+        # of values
+        ignore_chars = [
+            "[",
+            "\n",
+        ]
+        # Context: save if previous value is an escape character
+        is_prev_escape = False
+        # Context: save if we are currently in double quotes
+        is_in_quotes = False
+        # Placeholder used to save position of commas that are
+        # within values, so the .split(',') call does not produce
+        # erroneous values
+        comma_placeholder = "||comma_placeholder||"
+
+        for response_chunk in response.iter_content(decode_unicode=True):
+            if response_chunk == "\\":
+                is_prev_escape = True
+                continue
+            elif response_chunk == '"' and not is_prev_escape:
+                # If we are in quotes and encounter
+                # closing quotes, we are not within quotes anymore
+                # otherwise we are within quotes.
+                is_in_quotes = not is_in_quotes
+            elif response_chunk == "," and is_in_quotes:
+                buffer += comma_placeholder
+            elif response_chunk in ignore_chars and not is_prev_escape:
+                pass
+            elif response_chunk == "]":
+                if not header:
+                    header = buffer.split(",")
+                elif buffer:
+                    # Remove the first character from the values since
+                    # it's a comma.
+                    split_values = buffer.split(",")[1:]
+                    # Add back commas originally embedded in values
+                    split_values = map(
+                        lambda x: x.replace(comma_placeholder, ","),
+                        split_values,
+                    )
+                    # Zip the values we found with the "header"
+                    yield dict(
+                        zip(
+                            header,
+                            split_values,
+                        )
+                    )
+                buffer = ""
+            else:
+                buffer += response_chunk
+            is_prev_escape = False
+
+
+class USCensusErrorHandler(DefaultErrorHandler):
+    """
+    This Custom Error Handler raises an error when an invalid API key is used.
+
+    In such cases, the status code is 200, so airbyte doesn't raise an error.
+    """
+
+    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
+
+        if self.response_filters:
+            for response_filter in self.response_filters:
+                matched_error_resolution = response_filter.matches(response_or_exception=response_or_exception)
+                if matched_error_resolution:
+                    return matched_error_resolution
+        if isinstance(response_or_exception, requests.Response):
+            if response_or_exception.ok:
+                if "Invalid Key" in response_or_exception.text:
+                    return ErrorResolution(
+                        response_action=ResponseAction.FAIL,
+                        failure_type=FailureType.config_error,
+                        error_message="Failed to perform request. Error: Valid API Key needed.",
+                    )
+                return SUCCESS_RESOLUTION
+
+        default_reponse_filter = DefaultHttpResponseFilter(parameters={}, config=self.config)
+        default_response_filter_resolution = default_reponse_filter.matches(response_or_exception)
+
+        return default_response_filter_resolution if default_response_filter_resolution else DEFAULT_ERROR_RESOLUTION
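
The `USCensusRecordExtractor` docstring describes a response whose first row is a header and whose remaining rows are data. As a minimal sketch of that header-to-row mapping only, the snippet below parses a fully buffered body with `json.loads` and zips each row with the header. This is not what the commit does (the extractor above walks the response chunk by chunk), and the function name `rows_to_records` is hypothetical.

```python
import json
from typing import Dict, Iterator, List


def rows_to_records(body: str) -> Iterator[Dict[str, str]]:
    """Yield one dict per data row of a Census-style two-dimensional array.

    Hypothetical helper for illustration: it assumes the whole body is
    valid JSON and fits in memory, unlike the streaming extractor.
    """
    table: List[List[str]] = json.loads(body)
    if not table:
        return
    # The first row holds the column names; every later row holds values.
    header, *rows = table
    for row in rows:
        yield dict(zip(header, row))


# Sample taken from the docstring example (first two data rows only).
sample = (
    '[["STNAME","POP","DATE_","state"],\n'
    '["Alabama","4849377","7","01"],\n'
    '["Alaska","736732","7","02"]]'
)
records = list(rows_to_records(sample))
```

Each element of `records` is keyed by the header row, so the first one maps `STNAME` to `Alabama` and `state` to `01`. Values stay strings, as they are in the response.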
@@ -0,0 +1,97 @@
+version: 4.3.2
+
+type: DeclarativeSource
+
+check:
+  type: CheckStream
+  stream_names:
+    - us_census_stream
+
+definitions:
+  streams:
+    us_census_stream:
+      type: DeclarativeStream
+      name: us_census_stream
+      retriever:
+        type: SimpleRetriever
+        requester:
+          $ref: "#/definitions/base_requester"
+          path: "{{ config['query_path'] }}?{{ config['query_params'] }}"
+          http_method: GET
+        record_selector:
+          type: RecordSelector
+          extractor:
+            type: CustomRecordExtractor
+            class_name: source_us_census.components.USCensusRecordExtractor
+      schema_loader:
+        type: InlineSchemaLoader
+        schema:
+          $ref: "#/schemas/us_census_stream"
+  base_requester:
+    type: HttpRequester
+    url_base: https://api.census.gov/
+    authenticator:
+      type: ApiKeyAuthenticator
+      inject_into:
+        type: RequestOption
+        inject_into: request_parameter
+        field_name: key
+      api_token: '{{ config["api_key"] }}'
+    error_handler:
+      type: CustomErrorHandler
+      class_name: source_us_census.components.USCensusErrorHandler
+      response_filters:
+        - type: HttpResponseFilter
+          action: FAIL
+          error_message: "Failed to perform request. Error: Valid API Key needed."
+
+streams:
+  - $ref: "#/definitions/streams/us_census_stream"
+
+spec:
+  type: Spec
+  connection_specification:
+    type: object
+    $schema: http://json-schema.org/draft-07/schema#
+    required:
+      - query_path
+      - api_key
+    properties:
+      query_params:
+        type: string
+        description: The query parameters portion of the GET request, without the api key
+        order: 0
+        pattern: ^\w+=[\w,:*]+(&(?!key)\w+=[\w,:*]+)*$
+        examples:
+          - >-
+            get=NAME,NAICS2017_LABEL,LFO_LABEL,EMPSZES_LABEL,ESTAB,PAYANN,PAYQTR1,EMP&for=us:*&NAICS2017=72&LFO=001&EMPSZES=001
+          - >-
+            get=MOVEDIN,GEOID1,GEOID2,MOVEDOUT,FULL1_NAME,FULL2_NAME,MOVEDNET&for=county:*
+      query_path:
+        type: string
+        description: The path portion of the GET request
+        order: 1
+        pattern: ^data(\/[\w\d]+)+$
+        examples:
+          - data/2019/cbp
+          - data/2018/acs
+          - data/timeseries/healthins/sahie
+      api_key:
+        type: string
+        description: >-
+          Your API Key. Get your key <a
+          href="https://api.census.gov/data/key_signup.html">here</a>.
+        order: 2
+        airbyte_secret: true
+    additionalProperties: true
+
+metadata:
+  autoImportSchema:
+    us_census_stream: true
+
+schemas:
+  us_census_stream:
+    type: object
+    $schema: http://json-schema.org/draft-07/schema#
+    additionalProperties: true
+    properties: {}
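
The spec above constrains `query_path` and `query_params` with regular expressions, and the requester joins them as `path?params` under `url_base`. A minimal sketch of how those two patterns behave, assuming Python's `re` module is an acceptable stand-in for the JSON Schema regex dialect; the helper name `build_census_url` is hypothetical, and the `key` parameter that the authenticator injects is deliberately left out.

```python
import re

# Patterns copied verbatim from the connection specification.
QUERY_PARAMS_PATTERN = re.compile(r"^\w+=[\w,:*]+(&(?!key)\w+=[\w,:*]+)*$")
QUERY_PATH_PATTERN = re.compile(r"^data(\/[\w\d]+)+$")
URL_BASE = "https://api.census.gov/"


def build_census_url(query_path: str, query_params: str) -> str:
    """Validate both config values, then join them the way the manifest's
    path template does. Hypothetical helper, not part of the connector."""
    if not QUERY_PATH_PATTERN.match(query_path):
        raise ValueError(f"query_path does not match the spec pattern: {query_path!r}")
    if not QUERY_PARAMS_PATTERN.match(query_params):
        raise ValueError(f"query_params does not match the spec pattern: {query_params!r}")
    return f"{URL_BASE}{query_path}?{query_params}"


url = build_census_url("data/2019/cbp", "get=NAME&for=us:*")
```

The negative lookahead `(?!key)` rejects a later `&key=...` parameter, which is consistent with the description saying the value should be given without the API key.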
