Commit 474e8c4

🎉 New Destination: SQLite (#15018)
* sqlite destination skeleton
* Add doc
* Add destination to general docs
* clean unused files
* add seed and icon
* sqlite destination skeleton
* Add doc
* Add destination to general docs
* add seed and icon
* correct config file
* remove unused import
* run seed spec file

Co-authored-by: marcosmarxm <[email protected]>
Co-authored-by: Marcos Marx <[email protected]>
1 parent d057555 commit 474e8c4

File tree: 22 files changed (+701 −0 lines)


airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 7 additions & 0 deletions

```diff
@@ -308,3 +308,10 @@
   documentationUrl: https://docs.airbyte.io/integrations/destinations/google-sheets
   icon: google-sheets.svg
   releaseStage: alpha
+- name: Local SQLite
+  destinationDefinitionId: b76be0a6-27dc-4560-95f6-2623da0bd7b6
+  dockerRepository: airbyte/destination-sqlite
+  dockerImageTag: 0.1.0
+  documentationUrl: https://docs.airbyte.io/integrations/destinations/local-sqlite
+  icon: sqlite.svg
+  releaseStage: alpha
```

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

Lines changed: 23 additions & 0 deletions

```diff
@@ -5158,3 +5158,26 @@
     - - "client_secret"
     oauthFlowOutputParameters:
     - - "refresh_token"
+- dockerImage: "airbyte/destination-sqlite:0.1.0"
+  spec:
+    documentationUrl: "https://docs.airbyte.io/integrations/destinations/sqlite"
+    connectionSpecification:
+      $schema: "http://json-schema.org/draft-07/schema#"
+      title: "Destination Sqlite"
+      type: "object"
+      required:
+      - "destination_path"
+      additionalProperties: false
+      properties:
+        destination_path:
+          type: "string"
+          description: "Path to the sqlite.db file. The file will be placed inside\
+            \ that local mount. For more information check out our <a href=\"https://docs.airbyte.io/integrations/destinations/sqlite\"\
+            >docs</a>"
+          example: "/local/sqlite.db"
+    supportsIncremental: true
+    supportsNormalization: false
+    supportsDBT: false
+    supported_destination_sync_modes:
+    - "overwrite"
+    - "append"
```
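Concretely, a `secrets/config.json` satisfying the spec above needs only the single required field. The checks below are an illustrative sketch of what the spec enforces (required key, string type, `additionalProperties: false`) — not connector code, and `/local/sqlite.db` is just the spec's example value:

```python
import json

# Hypothetical config matching the spec; "/local/sqlite.db" is the
# spec's example value, not a mandated location.
config = json.loads('{"destination_path": "/local/sqlite.db"}')

# Minimal checks mirroring the connection specification.
assert "destination_path" in config                  # required
assert isinstance(config["destination_path"], str)   # type: string
assert set(config) <= {"destination_path"}           # additionalProperties: false
print("valid config:", config)
```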
.dockerignore

Lines changed: 5 additions & 0 deletions

```
*
!Dockerfile
!main.py
!destination_sqlite
!setup.py
```
Dockerfile

Lines changed: 38 additions & 0 deletions

```dockerfile
FROM python:3.9.11-alpine3.15 as base

# build and load all requirements
FROM base as builder
WORKDIR /airbyte/integration_code

# upgrade pip to the latest version
RUN apk --no-cache upgrade \
    && pip install --upgrade pip \
    && apk --no-cache add tzdata build-base


COPY setup.py ./
# install necessary packages to a temporary folder
RUN pip install --prefix=/install .

# build a clean environment
FROM base
WORKDIR /airbyte/integration_code

# copy all loaded and built libraries to a pure basic image
COPY --from=builder /install /usr/local
# add default timezone settings
COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
RUN echo "Etc/UTC" > /etc/timezone

# bash is installed for more convenient debugging.
RUN apk --no-cache add bash

# copy payload code only
COPY main.py ./
COPY destination_sqlite ./destination_sqlite

ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-sqlite
```
README.md

Lines changed: 123 additions & 0 deletions

# Sqlite Destination

This is the repository for the Sqlite destination connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/destinations/sqlite).

## Local development

### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**

#### Minimum Python version required `= 3.7.0`

#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```

This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.

Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. If this is mumbo jumbo to you, don't worry about it: just put your deps in `setup.py` but install using `pip install -r requirements.txt`, and everything should work as you expect.

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-sqlite:build
```

#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/destinations/sqlite) to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_sqlite/spec.json` file. Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. See `integration_tests/sample_config.json` for a sample config file.

**If you are an Airbyte core member**, copy the credentials from Lastpass under the secret name `destination sqlite test creds` and place them into `secrets/config.json`.

### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```

### Locally running the connector docker image

#### Build
First, make sure you build the latest Docker image:
```
docker build . -t airbyte/destination-sqlite:dev
```

You can also build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:destination-sqlite:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in the Dockerfile.

#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-sqlite:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-sqlite:dev check --config /secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-sqlite:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```

## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
```

### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```

### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).

#### Custom Integration tests
Place custom tests inside the `integration_tests/` folder, then run the following from the connector root:
```
python -m pytest integration_tests
```

#### Acceptance Tests
Coming soon.

### Using gradle to run tests
All commands should be run from the Airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:destination-sqlite:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:destination-sqlite:integrationTest
```

## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups:
* dependencies required for your connector to work go in the `MAIN_REQUIREMENTS` list.
* dependencies required only for testing go in the `TEST_REQUIREMENTS` list.
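In a connector `setup.py`, that split usually takes the following shape. This is a sketch under assumptions: the package names and version pins below are illustrative, not copied from this commit's `setup.py`.

```python
from setuptools import find_packages

MAIN_REQUIREMENTS = ["airbyte-cdk"]   # runtime deps -> install_requires (assumed pin)
TEST_REQUIREMENTS = ["pytest~=6.1"]   # test-only deps -> extras_require["tests"] (assumed pin)

# Keyword arguments for setuptools.setup(); the real file ends with
# setup(**setup_kwargs) — shown as a dict here so the sketch is inert.
setup_kwargs = dict(
    name="destination_sqlite",
    description="Destination implementation for Sqlite.",
    packages=find_packages(),
    install_requires=MAIN_REQUIREMENTS,
    extras_require={"tests": TEST_REQUIREMENTS},
)
print(sorted(setup_kwargs["extras_require"]))
```

With this layout, `pip install .` pulls in `MAIN_REQUIREMENTS`, while `pip install .[tests]` (as the Testing section above uses) additionally installs `TEST_REQUIREMENTS`.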
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
build.gradle

Lines changed: 8 additions & 0 deletions

```groovy
plugins {
    id 'airbyte-python'
    id 'airbyte-docker'
}

airbytePython {
    moduleDirectory 'destination_sqlite'
}
```
destination_sqlite/__init__.py

Lines changed: 8 additions & 0 deletions

```python
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


from .destination import DestinationSqlite

__all__ = ["DestinationSqlite"]
```
destination_sqlite/destination.py

Lines changed: 151 additions & 0 deletions

```python
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime
import json
import os
import sqlite3
import uuid
from asyncio.log import logger
from collections import defaultdict
from typing import Any, Iterable, Mapping

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type


class DestinationSqlite(Destination):
    @staticmethod
    def _get_destination_path(destination_path: str) -> str:
        """
        Get a normalized version of the destination path.
        Automatically appends /local/ to the start of the path.
        """
        if not destination_path.startswith("/local"):
            destination_path = os.path.join("/local", destination_path)

        destination_path = os.path.normpath(destination_path)
        if not destination_path.startswith("/local"):
            raise ValueError(
                f"destination_path={destination_path} is not a valid path. A valid path shall start with /local or no / prefix"
            )

        return destination_path
```
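The path handling in `_get_destination_path` can be exercised on its own. This standalone sketch reimplements the same logic (it is not the connector class) to show which inputs it accepts and which it rejects:

```python
import os

def get_destination_path(destination_path: str) -> str:
    # Same logic as DestinationSqlite._get_destination_path:
    # prefix bare paths with /local, normalize, then reject anything
    # that escapes the /local mount after normalization.
    if not destination_path.startswith("/local"):
        destination_path = os.path.join("/local", destination_path)
    destination_path = os.path.normpath(destination_path)
    if not destination_path.startswith("/local"):
        raise ValueError(f"destination_path={destination_path} is not a valid path")
    return destination_path

print(get_destination_path("sqlite.db"))         # /local/sqlite.db
print(get_destination_path("/local/sqlite.db"))  # /local/sqlite.db
# get_destination_path("../etc/passwd") normalizes to /etc/passwd -> ValueError
```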
destination_sqlite/destination.py (continued):

```python
    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:
        """
        Reads the input stream of messages, config, and catalog to write data to the destination.

        This method returns an iterable (typically a generator of AirbyteMessages via yield) containing the state messages received
        in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been
        successfully persisted to the destination. This is used to ensure fault tolerance: if a sync fails before fully completing,
        the source is given the last state message output from this method as the starting point of the next sync.

        :param config: dict of JSON configuration matching the configuration declared in spec.json
        :param configured_catalog: The Configured Catalog describing the schema of the data being received and how it should be
            persisted in the destination
        :param input_messages: The stream of input messages received from the source
        :return: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs
        """
        streams = {s.stream.name for s in configured_catalog.streams}
        path = config.get("destination_path")
        path = self._get_destination_path(path)
        con = sqlite3.connect(path)
        with con:
            # create the tables if needed
            for configured_stream in configured_catalog.streams:
                name = configured_stream.stream.name
                table_name = f"_airbyte_raw_{name}"
                if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
                    # drop the table so an overwrite sync starts from a clean slate
                    con.execute(f"DROP TABLE IF EXISTS {table_name}")
                # create the table if needed
                con.execute(
                    f"""
                    CREATE TABLE IF NOT EXISTS {table_name} (
                        _airbyte_ab_id TEXT PRIMARY KEY,
                        _airbyte_emitted_at TEXT,
                        _airbyte_data TEXT
                    )
                    """
                )

            buffer = defaultdict(list)

            for message in input_messages:
                if message.type == Type.STATE:
                    # flush the buffer
                    for stream_name in buffer.keys():
                        con.executemany(f"INSERT INTO _airbyte_raw_{stream_name} VALUES (?,?,?)", buffer[stream_name])
                    con.commit()
                    buffer = defaultdict(list)

                    yield message
                elif message.type == Type.RECORD:
                    data = message.record.data
                    stream = message.record.stream
                    if stream not in streams:
                        logger.debug(f"Stream {stream} was not present in configured streams, skipping")
                        continue

                    # add to buffer
                    buffer[stream].append((str(uuid.uuid4()), datetime.datetime.now().isoformat(), json.dumps(data)))

            # flush any remaining messages
            for stream_name in buffer.keys():
                con.executemany(f"INSERT INTO _airbyte_raw_{stream_name} VALUES (?,?,?)", buffer[stream_name])
            con.commit()

    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Tests if the input configuration can be used to successfully connect to the destination with the needed permissions,
        e.g. if a provided API token or password can be used to connect and write to the destination.

        :param logger: Logging object to display debug/info/error to the logs
            (logs will not be accessible via airbyte UI if they are not passed to this logger)
        :param config: Json object containing the configuration of this destination, content of this json is as specified in
            the properties of the spec.json file

        :return: AirbyteConnectionStatus indicating a Success or Failure
        """
        try:
            # parse the destination path
            path = config.get("destination_path")
            path = self._get_destination_path(path)

            os.makedirs(os.path.dirname(path), exist_ok=True)
            con = sqlite3.connect(path)
            con.execute("SELECT 1;")

            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")
```
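The raw-table layout that `write` produces can be reproduced with nothing but the standard library. This sketch (test scaffolding, not connector code; the stream name "users" is hypothetical) writes one buffered record tuple the way `write` does and reads it back from an in-memory database:

```python
import datetime
import json
import sqlite3
import uuid

con = sqlite3.connect(":memory:")  # stand-in for the /local/*.db file
table = "_airbyte_raw_users"       # hypothetical stream named "users"

# Same schema write() creates for each configured stream.
con.execute(
    f"""CREATE TABLE IF NOT EXISTS {table} (
        _airbyte_ab_id TEXT PRIMARY KEY,
        _airbyte_emitted_at TEXT,
        _airbyte_data TEXT
    )"""
)

# One record shaped like the tuples write() buffers per stream.
record = (str(uuid.uuid4()), datetime.datetime.now().isoformat(), json.dumps({"id": 1, "name": "ada"}))
con.executemany(f"INSERT INTO {table} VALUES (?,?,?)", [record])
con.commit()

rows = con.execute(f"SELECT _airbyte_data FROM {table}").fetchall()
data = json.loads(rows[0][0])
print(data)
```

Note that the record payload is stored as a JSON string in `_airbyte_data`, so reading it back requires `json.loads`, exactly as Airbyte's normalization step would do downstream.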
