✨Source US Census - Migrate Python CDK to Low-code CDK #43521

Merged
merged 4 commits into from
Aug 15, 2024
30 changes: 21 additions & 9 deletions airbyte-integrations/connectors/source-us-census/README.md
@@ -1,45 +1,52 @@
# Us-Census source connector
# Us Census Source


This is the repository for the Us-Census source connector, written in Python.
This is the repository for the Us Census configuration based source connector.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/sources/us-census).

## Local development

### Prerequisites
* Python (~=3.9)
* Poetry (~=1.7) - installation instructions [here](https://python-poetry.org/docs/#installation)

* Python (`^3.9`)
* Poetry (`^1.7`) - installation instructions [here](https://python-poetry.org/docs/#installation)



### Installing the connector

From this connector directory, run:
```bash
poetry install --with dev
```


### Create credentials

**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/sources/us-census)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_us_census/spec.yaml` file.
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `src/source_us_census/spec.yaml` file.
Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information.
See `sample_files/sample_config.json` for a sample config file.
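
As a rough illustration of what such a config might contain, the sketch below renders one from the three properties declared in the connector spec in this PR (`query_path`, `query_params`, `api_key`). The helper name `render_config` is hypothetical, the path and parameter values are copied from the spec's `examples`, and it is an assumption that those two examples belong together.

```python
import json


def render_config(api_key: str) -> str:
    """Return JSON text for a config using the property names from the spec."""
    config = {
        # Example values taken from the spec; pairing them is an assumption.
        "query_path": "data/2019/cbp",
        "query_params": (
            "get=NAME,NAICS2017_LABEL,LFO_LABEL,EMPSZES_LABEL,ESTAB,PAYANN,PAYQTR1,EMP"
            "&for=us:*&NAICS2017=72&LFO=001&EMPSZES=001"
        ),
        # The key is kept out of query_params; the connector sends it separately.
        "api_key": api_key,
    }
    return json.dumps(config, indent=2)


config_text = render_config("<your-api-key>")
```

Saving `config_text` as `secrets/config.json` should give a file in the shape the commands in the next section expect, provided a real API key is substituted.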


### Locally running the connector

```
poetry run source-us-census spec
poetry run source-us-census check --config secrets/config.json
poetry run source-us-census discover --config secrets/config.json
poetry run source-us-census read --config secrets/config.json --catalog sample_files/configured_catalog.json
```

### Running unit tests
To run unit tests locally, from the connector directory run:
### Running tests

To run tests locally, from the connector directory run:

```
poetry run pytest unit_tests
poetry run pytest tests
```

### Building the docker image

1. Install [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md)
2. Run the following command to build the docker image:
```bash
@@ -50,6 +57,7 @@ An image will be available on your host with the tag `airbyte/source-us-census:d


### Running as a docker container

Then run any of the connector commands as follows:
```
docker run --rm airbyte/source-us-census:dev spec
@@ -59,16 +67,19 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat
```

### Running our CI test suite

You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
```bash
airbyte-ci connectors --name=source-us-census test
```

### Customizing acceptance Tests

Customize `acceptance-test-config.yml` file to configure acceptance tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.

### Dependency Management

All of your dependencies should be managed via Poetry.
To add a new dependency, run:
```bash
@@ -78,6 +89,7 @@ poetry add <package-name>
Please commit the changes to `pyproject.toml` and `poetry.lock` files.

## Publishing a new version of the connector

You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=source-us-census test`
2. Bump the connector version (please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors)):
@@ -2,7 +2,7 @@ data:
  connectorSubtype: api
  connectorType: source
  definitionId: c4cfaeda-c757-489a-8aba-859fb08b6970
  dockerImageTag: 0.1.16
  dockerImageTag: 0.2.0
  dockerRepository: airbyte/source-us-census
  githubIssueLabel: source-us-census
  icon: uscensus.svg
@@ -21,7 +21,7 @@ data:
  documentationUrl: https://docs.airbyte.com/integrations/sources/us-census
  tags:
    - language:python
    - cdk:python
    - cdk:low-code
  ab_internal:
    sl: 100
    ql: 100
592 changes: 474 additions & 118 deletions airbyte-integrations/connectors/source-us-census/poetry.lock

Large diffs are not rendered by default.

13 changes: 6 additions & 7 deletions airbyte-integrations/connectors/source-us-census/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.1.16"
version = "0.2.0"
name = "source-us-census"
description = "Source implementation for Us Census."
authors = [ "Airbyte <[email protected]>",]
@@ -16,14 +16,13 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_us_census"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.80.0"
python = "^3.10"
airbyte-cdk = "^4.0.1"

[tool.poetry.scripts]
source-us-census = "source_us_census.run:run"

[tool.poetry.group.dev.dependencies]
requests-mock = "^1.9.3"
responses = "^0.13"
pytest-mock = "^3.6.1"
pytest = "^6.1"
requests-mock = "*"
pytest-mock = "*"
pytest = "*"
@@ -0,0 +1,135 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from typing import List, Optional, Union

import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_http_response_filter import DefaultHttpResponseFilter
from airbyte_cdk.sources.declarative.types import Record
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
    DEFAULT_ERROR_RESOLUTION,
    SUCCESS_RESOLUTION,
    ErrorResolution,
    ResponseAction,
)


class USCensusRecordExtractor(RecordExtractor):
    """
    Parses the response from the us census website.

    The US Census provides data in an atypical format,
    which motivated the creation of this source rather
    than using a generic http source.

    * Data are represented in a two-dimensional array
    * Square brackets [ ] hold arrays
    * Values are separated by a , (comma).

    e.g.
    [["STNAME","POP","DATE_","state"],

    ["Alabama","4849377","7","01"],

    ["Alaska","736732","7","02"],

    ["Arizona","6731484","7","04"],

    ["Arkansas","2966369","7","05"],

    ["California","38802500","7","06"]]
    """

    def extract_records(self, response: requests.Response) -> List[Record]:  # type: ignore
        # Where we accumulate a "row" of data until we encounter ']'
        buffer = ""
        # The response is in a tabular format where the first list of strings
        # is the "header" of the table which we use as keys in the final dictionary
        # we produce
        header = []
        # Characters with special meanings which should not be added to the buffer
        # of values
        ignore_chars = [
            "[",
            "\n",
        ]
        # Context: save if previous value is an escape character
        is_prev_escape = False
        # Context: save if we are currently in double quotes
        is_in_quotes = False
        # Placeholder used to save position of commas that are
        # within values, so the .split(',') call does not produce
        # erroneous values
        comma_placeholder = "||comma_placeholder||"

        for response_chunk in response.iter_content(decode_unicode=True):
            if response_chunk == "\\":
                is_prev_escape = True
                continue
            elif response_chunk == '"' and not is_prev_escape:
                # If we are in quotes and encounter
                # closing quotes, we are not within quotes anymore
                # otherwise we are within quotes.
                is_in_quotes = not is_in_quotes
            elif response_chunk == "," and is_in_quotes:
                buffer += comma_placeholder
            elif response_chunk in ignore_chars and not is_prev_escape:
                pass
            elif response_chunk == "]":
                if not header:
                    header = buffer.split(",")
                elif buffer:
                    # Remove the first character from the values since
                    # it's a comma.
                    split_values = buffer.split(",")[1:]
                    # Add back commas originally embedded in values
                    split_values = map(
                        lambda x: x.replace(comma_placeholder, ","),
                        split_values,
                    )
                    # Zip the values we found with the "header"
                    yield dict(
                        zip(
                            header,
                            split_values,
                        )
                    )
                buffer = ""
            else:
                buffer += response_chunk
            is_prev_escape = False
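
For intuition, the header-plus-rows mapping described in the docstring can be sketched without the CDK. This is an illustration only, not the connector's method: it assumes the whole body fits in memory and is valid JSON (the docstring sample is), whereas the extractor above walks the body chunk by chunk. The helper name `rows_to_records` is hypothetical.

```python
import json
from typing import Dict, Iterator, List


def rows_to_records(body: str) -> Iterator[Dict[str, str]]:
    """Yield one dict per data row, keyed by the first (header) row."""
    rows: List[List[str]] = json.loads(body)
    if not rows:
        return
    # The first inner array is the header; every later array is a data row.
    header, *data_rows = rows
    for row in data_rows:
        yield dict(zip(header, row))


sample = (
    '[["STNAME","POP","DATE_","state"],\n'
    '["Alabama","4849377","7","01"],\n'
    '["Alaska","736732","7","02"]]'
)
records = list(rows_to_records(sample))
```

Each record pairs a header name with the value in the same column, so the first record maps `STNAME` to `Alabama` and `state` to `01`.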


class USCensusErrorHandler(DefaultErrorHandler):
    """
    This Custom Error Handler raises an error when an invalid API key is used.

    In such cases, the status code is 200, so airbyte doesn't raise an error.
    """

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:

        if self.response_filters:
            for response_filter in self.response_filters:
                matched_error_resolution = response_filter.matches(response_or_exception=response_or_exception)
                if matched_error_resolution:
                    return matched_error_resolution
        if isinstance(response_or_exception, requests.Response):
            if response_or_exception.ok:
                if "Invalid Key" in response_or_exception.text:
                    return ErrorResolution(
                        response_action=ResponseAction.FAIL,
                        failure_type=FailureType.config_error,
                        error_message="Failed to perform request. Error: Valid API Key needed.",
                    )
                return SUCCESS_RESOLUTION

        default_reponse_filter = DefaultHttpResponseFilter(parameters={}, config=self.config)
        default_response_filter_resolution = default_reponse_filter.matches(response_or_exception)

        return default_response_filter_resolution if default_response_filter_resolution else DEFAULT_ERROR_RESOLUTION
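
The core decision in the handler (a successful status code can still carry an "Invalid Key" body) can be sketched in isolation. The names `Action`, `FakeResponse`, and `classify` below are hypothetical stand-ins rather than CDK types, and the sketch assumes, as `requests` does, that a response counts as `ok` when its status code is below 400.

```python
from dataclasses import dataclass
from enum import Enum


class Action(Enum):
    SUCCESS = "success"
    FAIL = "fail"
    DEFAULT = "default"  # fall through to whatever the default handling decides


@dataclass
class FakeResponse:
    """Minimal stand-in for an HTTP response (hypothetical, for illustration)."""

    status_code: int
    text: str

    @property
    def ok(self) -> bool:
        return self.status_code < 400


def classify(response: FakeResponse) -> Action:
    """Mirror the handler's branch order for an HTTP response."""
    if response.ok:
        # A 200 whose body mentions "Invalid Key" is treated as a config error.
        if "Invalid Key" in response.text:
            return Action.FAIL
        return Action.SUCCESS
    return Action.DEFAULT


outcomes = [
    classify(FakeResponse(200, '[["NAME"],["Alabama"]]')),
    classify(FakeResponse(200, "<html>Invalid Key</html>")),
    classify(FakeResponse(500, "server error")),
]
```

The sketch leaves out the configured response filters, which the real handler consults first.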
@@ -0,0 +1,97 @@
version: 4.3.2

type: DeclarativeSource

check:
  type: CheckStream
  stream_names:
    - us_census_stream

definitions:
  streams:
    us_census_stream:
      type: DeclarativeStream
      name: us_census_stream
      retriever:
        type: SimpleRetriever
        requester:
          $ref: "#/definitions/base_requester"
          path: "{{ config['query_path'] }}?{{ config['query_params'] }}"
          http_method: GET
        record_selector:
          type: RecordSelector
          extractor:
            type: CustomRecordExtractor
            class_name: source_us_census.components.USCensusRecordExtractor
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: "#/schemas/us_census_stream"
  base_requester:
    type: HttpRequester
    url_base: https://api.census.gov/
    authenticator:
      type: ApiKeyAuthenticator
      inject_into:
        type: RequestOption
        inject_into: request_parameter
        field_name: key
      api_token: '{{ config["api_key"] }}'
    error_handler:
      type: CustomErrorHandler
      class_name: source_us_census.components.USCensusErrorHandler
      response_filters:
        - type: HttpResponseFilter
          action: FAIL
          error_message: "Failed to perform request. Error: Valid API Key needed."

streams:
  - $ref: "#/definitions/streams/us_census_stream"

spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required:
      - query_path
      - api_key
    properties:
      query_params:
        type: string
        description: The query parameters portion of the GET request, without the api key
        order: 0
        pattern: ^\w+=[\w,:*]+(&(?!key)\w+=[\w,:*]+)*$
        examples:
          - >-
            get=NAME,NAICS2017_LABEL,LFO_LABEL,EMPSZES_LABEL,ESTAB,PAYANN,PAYQTR1,EMP&for=us:*&NAICS2017=72&LFO=001&EMPSZES=001
          - >-
            get=MOVEDIN,GEOID1,GEOID2,MOVEDOUT,FULL1_NAME,FULL2_NAME,MOVEDNET&for=county:*
      query_path:
        type: string
        description: The path portion of the GET request
        order: 1
        pattern: ^data(\/[\w\d]+)+$
        examples:
          - data/2019/cbp
          - data/2018/acs
          - data/timeseries/healthins/sahie
      api_key:
        type: string
        description: >-
          Your API Key. Get your key <a
          href="https://api.census.gov/data/key_signup.html">here</a>.
        order: 2
        airbyte_secret: true
    additionalProperties: true

metadata:
  autoImportSchema:
    us_census_stream: true

schemas:
  us_census_stream:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    additionalProperties: true
    properties: {}
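
To see how the two `pattern` constraints and the `path` template in the manifest fit together, here is a small sketch that checks both config values and joins them the way the template does. The regular expressions and base URL are copied from the manifest; the function name `build_url` is hypothetical, and the API key is left out because the manifest injects it separately as the `key` request parameter.

```python
import re

# Patterns copied from the spec in the manifest.
QUERY_PARAMS_PATTERN = re.compile(r"^\w+=[\w,:*]+(&(?!key)\w+=[\w,:*]+)*$")
QUERY_PATH_PATTERN = re.compile(r"^data(\/[\w\d]+)+$")
URL_BASE = "https://api.census.gov/"


def build_url(query_path: str, query_params: str) -> str:
    """Check both values against the spec patterns, then join them."""
    if not QUERY_PATH_PATTERN.match(query_path):
        raise ValueError(f"query_path does not match the spec pattern: {query_path!r}")
    if not QUERY_PARAMS_PATTERN.match(query_params):
        raise ValueError(f"query_params does not match the spec pattern: {query_params!r}")
    # Same shape as the manifest's path template: "<query_path>?<query_params>".
    return f"{URL_BASE}{query_path}?{query_params}"


url = build_url("data/2019/cbp", "get=NAME,ESTAB&for=us:*")
```

A `query_params` value such as `get=NAME&key=abc` is rejected here, because the negative lookahead `(?!key)` blocks any later parameter whose name starts with `key`.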