🚀 Add files source to CDK + generator template #14410
Closed
Changes from 19 commits (25 commits total)
24fd9e2 initial adding abstract files source to cdk (Phlair)
7202c4b start migrating the tests (Phlair)
7423201 remove files (Phlair)
178bb59 add test_utils (Phlair)
e201a40 more tests across (Phlair)
315c4c9 finish unit tests (Phlair)
7a8f0ed start moving integration test template (Phlair)
b9fae5b Merge branch 'master' into george/files-source-abstract (Phlair)
29b12c0 dev deps includes everything (Phlair)
7a03a7a init new generator template for files source (Phlair)
fe9d586 Merge branch 'master' into george/files-source-abstract (Phlair)
f52367c some generator stuff (Phlair)
374f4bd generator and cdk changes (Phlair)
d20d8ac Merge branch 'master' into george/files-source-abstract (Phlair)
76676bf custom integration test (Phlair)
7d1d14f change (Phlair)
db0f0af oops .py (Phlair)
777f1d2 spec template fix (Phlair)
aa604ee add iterator import to stream template (Phlair)
5ac34ce Merge branch 'master' into george/files-source-abstract (Phlair)
61ac2fd generate csvs for integration tests (Phlair)
2f52b87 minor fixes to abstract (Phlair)
9a8822b remove test files (now being generated on fly) (Phlair)
3c79d36 add conftest (Phlair)
7942507 s3 & gcs implementations (Phlair)
@@ -1,9 +1,10 @@
 #
-# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #

+from .abstract_files_source import AbstractFilesSource
 from .abstract_source import AbstractSource
 from .config import BaseConfig
 from .source import Source

-__all__ = ["AbstractSource", "BaseConfig", "Source"]
+__all__ = ["AbstractFilesSource", "AbstractSource", "BaseConfig", "Source"]
airbyte-cdk/python/airbyte_cdk/sources/abstract_files_source.py (106 additions, 0 deletions)
@@ -0,0 +1,106 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import logging
from abc import ABC, abstractmethod
from traceback import format_exc
from typing import Any, List, Mapping, Optional, Tuple

from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.streams import Stream
from wcmatch.glob import GLOBSTAR, SPLIT, globmatch

# ideas on extending this to handle multiple streams:
# - "dataset" is currently the name of the single table/stream. We could allow comma-split table names in this string for many streams.
# - "path_pattern" currently uses https://facelessuser.github.io/wcmatch/glob/ to match a single string pattern (can be multiple | separated)
#   we could change this to a JSON string in format {"stream_name": "pattern(s)"} to allow many streams and match to names in dataset.
# - "format" I think we'd have to enforce like-for-like formats across streams otherwise the UI would become chaotic imo.
# - "schema" could become a nested object such as {"stream_name": {schema}} allowing specifying schema for one/all/none/some tables.


class AbstractFilesSource(AbstractSource, ABC):
    @property
    @abstractmethod
    def stream_class(self) -> type:
        """
        :return: reference to the relevant FileStream class e.g. IncrementalFileStreamS3
        """

    @property
    @abstractmethod
    def spec_class(self) -> type:
        """
        :return: reference to the relevant pydantic spec class e.g. SourceS3Spec
        """

    @property
    @abstractmethod
    def documentation_url(self) -> str:
        """
        :return: link to docs page for this source e.g. "https://docs.airbyte.io/integrations/sources/s3"
        """

    def read_config(self, config_path: str) -> Mapping[str, Any]:
        config: Mapping[str, Any] = super().read_config(config_path)
        if config.get("format", {}).get("delimiter") == r"\t":
            config["format"]["delimiter"] = "\t"
        return config

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """
        :param logger: source logger
        :param config: The user-provided configuration as specified by the source's spec.
            This usually contains information required to check connection e.g. tokens, secrets and keys etc.
        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
            and we can connect to the underlying data source using the provided configuration.
            Otherwise, the input config cannot be used to connect to the underlying data source,
            and the "error" object should describe what went wrong.
            The error object will be cast to string to display the problem to the user.
        """
        try:
            for file_info in self.stream_class(**config).file_iterator():
                # TODO: will need to split config.get("path_pattern") up by stream once supporting multiple streams
                # test that matching on the pattern doesn't error
                globmatch(file_info.key, config.get("path_pattern"), flags=GLOBSTAR | SPLIT)
                # just need first file here to test connection and valid patterns
                return True, None

        except Exception as e:
            logger.error(format_exc())
            return False, e

        logger.warning("Found 0 files (but connection is valid).")
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        We just have a single stream per source so construct that here

        :param config: The user-provided configuration as specified by the source's spec.
        :return: A list of the streams in this source connector.
        """
        return [self.stream_class(**config)]

    def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
        """
        Returns the spec for this integration. The spec is a JSON-Schema object describing the required
        configurations (e.g: username and password) required to run this integration.
        """
        # make dummy instance of stream_class in order to get 'supports_incremental' property
        incremental = self.stream_class(dataset="", provider="", format="", path_pattern="").supports_incremental

        supported_dest_sync_modes = [DestinationSyncMode.overwrite]
        if incremental:
            supported_dest_sync_modes.extend([DestinationSyncMode.append, DestinationSyncMode.append_dedup])

        return ConnectorSpecification(
            documentationUrl=self.documentation_url,
            changelogUrl=self.documentation_url,
            supportsIncremental=incremental,
            supported_destination_sync_modes=supported_dest_sync_modes,
            connectionSpecification=self.spec_class.schema(),  # type: ignore[attr-defined]
        )
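To make the contract concrete, here is a minimal sketch (not part of this PR) of what a provider-specific subclass might look like. IncrementalFileStreamS3 and SourceS3Spec are the example names from the docstrings above and are assumed to be defined elsewhere; everything else is a hypothetical placeholder.

# Sketch only: IncrementalFileStreamS3 and SourceS3Spec are assumed to exist elsewhere
# (names taken from the docstrings above); nothing here is part of the PR itself.
from airbyte_cdk.sources import AbstractFilesSource


class SourceS3(AbstractFilesSource):
    @property
    def stream_class(self) -> type:
        # the FilesStream subclass that knows how to list and read this provider's files
        return IncrementalFileStreamS3

    @property
    def spec_class(self) -> type:
        # the pydantic FilesSpec subclass carrying the provider-specific 'provider' block
        return SourceS3Spec

    @property
    def documentation_url(self) -> str:
        return "https://docs.airbyte.io/integrations/sources/s3"

With those three properties supplied, the check_connection, streams and spec implementations above work unchanged.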
airbyte-cdk/python/airbyte_cdk/sources/streams/files/__init__.py (10 additions, 0 deletions)
@@ -0,0 +1,10 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from .file_info import FileInfo
from .files_spec import FilesSpec
from .files_stream import FilesStream, IncrementalFilesStream
from .storage_file import StorageFile

__all__ = ["FileInfo", "FilesStream", "IncrementalFilesStream", "FilesSpec", "StorageFile"]
airbyte-cdk/python/airbyte_cdk/sources/streams/files/file_info.py (43 additions, 0 deletions)
@@ -0,0 +1,43 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from datetime import datetime
from functools import total_ordering


@total_ordering
@dataclass
class FileInfo:
    """Class for sharing of metadata"""

    key: str
    size: int
    last_modified: datetime

    @property
    def size_in_megabytes(self) -> float:
        """technically size in 'mebibytes'"""
        return self.size / 1024**2

    def __str__(self) -> str:
        return "Key: %s, LastModified: %s, Size: %.4fMb" % (self.key, self.last_modified.isoformat(), self.size_in_megabytes)

    def __repr__(self) -> str:
        return self.__str__()

    def __eq__(self, other: object) -> bool:
        if isinstance(other, FileInfo):
            return self.key == other.key and self.last_modified == other.last_modified and self.size == other.size
        return False

    def __lt__(self, other: object) -> bool:
        # comparison of less than / greater than for FileInfos is based on last modified
        if isinstance(other, FileInfo):
            return self.last_modified < other.last_modified
        else:
            raise TypeError(f"Can't compare {self.__class__} with {other.__class__}")

    def __hash__(self) -> int:
        return self.key.__hash__()
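A small usage sketch (not part of the diff) showing what the dunder methods above give you: ordering by last_modified, equality on the full metadata, and hashing by key.

from datetime import datetime

from airbyte_cdk.sources.streams.files import FileInfo

older = FileInfo(key="data/2022-01.csv", size=1024, last_modified=datetime(2022, 1, 31))
newer = FileInfo(key="data/2022-02.csv", size=2048, last_modified=datetime(2022, 2, 28))

# @total_ordering derives <=, > and >= from __lt__ and __eq__, so sorting is by last_modified
assert sorted([newer, older]) == [older, newer]
assert newer > older

# equality compares key, last_modified and size; hashing uses only the key
duplicate = FileInfo(key="data/2022-01.csv", size=1024, last_modified=datetime(2022, 1, 31))
assert older == duplicate and hash(older) == hash(duplicate)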
airbyte-cdk/python/airbyte_cdk/sources/streams/files/files_spec.py (107 additions, 0 deletions)
@@ -0,0 +1,107 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import json
import re
from typing import Any, Dict, Union

from jsonschema import RefResolver
from pydantic import BaseModel, Field

from .formats import AvroFormat, CsvFormat, ParquetFormat

# To implement your provider specific spec, inherit from FilesSpec and add provider-specific settings e.g.:

# class SourceS3Spec(FilesSpec, BaseModel):
#     class Config:
#         title="S3 Source Spec"

#     class S3Provider(BaseModel):
#         class Config:
#             title = "S3: Amazon Web Services"

#         bucket: str = Field(description="Name of the S3 bucket where the file(s) exist.")
#         aws_access_key_id: Optional[str] = Field(
#             default=None,
#             description="...",
#             airbyte_secret=True
#         )
#         aws_secret_access_key: Optional[str] = Field(
#             default=None,
#             description="...",
#             airbyte_secret=True
#         )
#         path_prefix: str = Field(
#             default="",
#             description="..."
#         )
#     provider: S3Provider = Field(...)  # leave this as Field(...), just change type to relevant class


class FilesSpec(BaseModel):
    dataset: str = Field(
        pattern=r"^([A-Za-z0-9-_]+)$",
        description="The name of the stream you would like this source to output. Can contain letters, numbers, or underscores.",
        order=0,
        title="Output Stream Name",
    )

    path_pattern: str = Field(
        title="Pattern of files to replicate",
        description="A regular expression which tells the connector which files to replicate. All files which match this pattern will be "
        'replicated. Use | to separate multiple patterns. See <a href="https://facelessuser.github.io/wcmatch/glob/" target="_'
        'blank">this page</a> to understand pattern syntax (GLOBSTAR and SPLIT flags are enabled). '
        "Use pattern <strong>**</strong> to pick up all files.",
        examples=["**", "myFolder/myTableFiles/*.csv|myFolder/myOtherTableFiles/*.csv"],
        order=10,
    )

    format: Union[CsvFormat, ParquetFormat, AvroFormat] = Field(
        default="csv", title="File Format", description="The format of the files you'd like to replicate", order=20
    )

    user_schema: str = Field(
        title="Manually enforced data schema (Optional)",
        alias="schema",
        default="{}",
        description="Optionally provide a schema to enforce, as a valid JSON string. Ensure this is a mapping of "
        '<strong>{ "column" : "type" }</strong>, where types are valid '
        '<a href="https://json-schema.org/understanding-json-schema/reference/type.html" target="_blank">JSON Schema '
        "datatypes</a>. Leave as {} to auto-infer the schema.",
        examples=['{"column_1": "number", "column_2": "string", "column_3": "array", "column_4": "object", "column_5": "boolean"}'],
        order=30,
    )

    def _change_format_to_oneOf(schema: dict) -> dict:
        props_to_change = ["format"]
        for prop in props_to_change:
            schema["properties"][prop]["type"] = "object"
            if "oneOf" in schema["properties"][prop]:
                continue
            schema["properties"][prop]["oneOf"] = schema["properties"][prop].pop("anyOf")
        return schema

    def _check_provider_added(schema: dict) -> None:
        if "provider" not in schema["properties"]:
            raise RuntimeError("You must add the 'provider' property in your child spec class")

    def _resolve_refs(schema: dict) -> dict:
        json_schema_ref_resolver = RefResolver.from_schema(schema)
        str_schema = json.dumps(schema)
        for ref_block in re.findall(r'{"\$ref": "#\/definitions\/.+?(?="})"}', str_schema):
            ref = json.loads(ref_block)["$ref"]
            str_schema = str_schema.replace(ref_block, json.dumps(json_schema_ref_resolver.resolve(ref)[1]))
        pyschema: dict = json.loads(str_schema)
        del pyschema["definitions"]
        return pyschema

    @classmethod
    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """we're overriding the schema classmethod to enable some post-processing"""
        schema = super().schema(*args, **kwargs)
        cls._check_provider_added(schema)
        schema = cls._change_format_to_oneOf(schema)
        schema = cls._resolve_refs(schema)
        return schema
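As a rough sketch of how this is meant to be used (following the commented SourceS3Spec example near the top of the file), a child spec adds a provider block and then gets the post-processed connector schema from schema(). The provider fields below are illustrative placeholders, not a real connector.

from typing import Optional

from pydantic import BaseModel, Field

from airbyte_cdk.sources.streams.files import FilesSpec


class ExampleProvider(BaseModel):
    """Hypothetical provider block; a real connector defines its own fields."""

    class Config:
        title = "Example Storage Provider"

    bucket: str = Field(description="Name of the bucket where the file(s) exist.")
    access_key: Optional[str] = Field(default=None, description="Access key for the bucket.", airbyte_secret=True)


class ExampleFilesSpec(FilesSpec, BaseModel):
    class Config:
        title = "Example Files Source Spec"

    # 'provider' is required: FilesSpec.schema() raises if a child spec leaves it out
    provider: ExampleProvider = Field(...)


# schema() applies the post-processing above: 'format' is rewritten as a oneOf object
# and $ref definitions are inlined, yielding a self-contained connectionSpecification
connection_spec = ExampleFilesSpec.schema()
assert "provider" in connection_spec["properties"]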
Given our inability to evolve schemas, we either do this after schema migration is available or we do it now. Feels like a non-trivial lift to do it now, so maybe let's keep it as is? WDYT?
actually, is it that big of a lift to take that whole part of the config and put it inside a list?
I guess wrt S3 it leaves us with the following options:
AbstractFilesSource's spec
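Purely to illustrate the "put it inside a list" idea being discussed (this is not something the PR implements), the per-stream part of the spec could be repeated inside a list field roughly like this:

from typing import List

from pydantic import BaseModel, Field


class StreamConfig(BaseModel):
    # hypothetical per-stream block: roughly the dataset/path_pattern/schema trio from FilesSpec
    dataset: str = Field(description="Name of this output stream.")
    path_pattern: str = Field(description="wcmatch glob pattern(s) selecting this stream's files.")
    user_schema: str = Field(default="{}", alias="schema", description="Optional enforced JSON schema.")


class MultiStreamFilesSpec(BaseModel):
    # one entry per stream; provider and format could stay top-level or move in here
    streams: List[StreamConfig] = Field(description="One entry per output stream.")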