🚀 Add files source to CDK + generator template #14410

Closed
wants to merge 25 commits
5 changes: 3 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/__init__.py
@@ -1,9 +1,10 @@
 #
-# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #

+from .abstract_files_source import AbstractFilesSource
 from .abstract_source import AbstractSource
 from .config import BaseConfig
 from .source import Source

-__all__ = ["AbstractSource", "BaseConfig", "Source"]
+__all__ = ["AbstractFilesSource", "AbstractSource", "BaseConfig", "Source"]
106 changes: 106 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_files_source.py
@@ -0,0 +1,106 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import logging
from abc import ABC, abstractmethod
from traceback import format_exc
from typing import Any, List, Mapping, Optional, Tuple

from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.streams import Stream
from wcmatch.glob import GLOBSTAR, SPLIT, globmatch

# ideas on extending this to handle multiple streams:
Contributor:

Given our inability to evolve schemas, we either do this after schema migration is available or we do it now. Feels like a non-trivial lift to do it now, so maybe let's keep it as is? WDYT?

Contributor:

actually, is it that big of a lift to take that whole part of the config and put it inside a list?

Contributor:

I guess wrt S3 it leaves us with the following options:

  • don't port S3 to the CDK-based abstract files source
  • port S3 and introduce a shim spec.json and convert between that and this new AbstractFilesSource's spec
  • don't do it for now

# - "dataset" is currently the name of the single table/stream. We could allow comma-split table names in this string for many streams.
# - "path_pattern" currently uses https://facelessuser.github.io/wcmatch/glob/ to match a single string pattern (can be multiple | separated)
# we could change this to a JSON string in format {"stream_name": "pattern(s)"} to allow many streams and match to names in dataset.
# - "format" I think we'd have to enforce like-for-like formats across streams otherwise the UI would become chaotic imo.
# - "schema" could become a nested object such as {"stream_name": {schema}} allowing specifying schema for one/all/none/some tables.


class AbstractFilesSource(AbstractSource, ABC):
@property
@abstractmethod
def stream_class(self) -> type:
"""
:return: reference to the relevant FileStream class e.g. IncrementalFileStreamS3
"""

@property
@abstractmethod
def spec_class(self) -> type:
"""
:return: reference to the relevant pydantic spec class e.g. SourceS3Spec
"""

@property
@abstractmethod
def documentation_url(self) -> str:
"""
:return: link to docs page for this source e.g. "https://docs.airbyte.io/integrations/sources/s3"
"""

def read_config(self, config_path: str) -> Mapping[str, Any]:
config: Mapping[str, Any] = super().read_config(config_path)
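# a delimiter entered as the literal two-character string "\t" (e.g. typed into the UI) is converted to a real tab character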
if config.get("format", {}).get("delimiter") == r"\t":
config["format"]["delimiter"] = "\t"
return config

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
"""
:param logger: source logger
:param config: The user-provided configuration as specified by the source's spec.
This usually contains information required to check connection e.g. tokens, secrets and keys etc.
:return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
and we can connect to the underlying data source using the provided configuration.
Otherwise, the input config cannot be used to connect to the underlying data source,
and the "error" object should describe what went wrong.
The error object will be cast to string to display the problem to the user.
"""
try:
for file_info in self.stream_class(**config).file_iterator():
# TODO: will need to split config.get("path_pattern") up by stream once supporting multiple streams
# test that matching on the pattern doesn't error
globmatch(file_info.key, config.get("path_pattern"), flags=GLOBSTAR | SPLIT)
# just need first file here to test connection and valid patterns
return True, None

except Exception as e:
logger.error(format_exc())
return False, e

logger.warning("Found 0 files (but connection is valid).")
return True, None

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
We just have a single stream per source so construct that here

:param config: The user-provided configuration as specified by the source's spec.
:return: A list of the streams in this source connector.
"""
return [self.stream_class(**config)]

def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
"""
Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
required to run this integration.
"""
# make dummy instance of stream_class in order to get 'supports_incremental' property
incremental = self.stream_class(dataset="", provider="", format="", path_pattern="").supports_incremental

supported_dest_sync_modes = [DestinationSyncMode.overwrite]
if incremental:
supported_dest_sync_modes.extend([DestinationSyncMode.append, DestinationSyncMode.append_dedup])

return ConnectorSpecification(
documentationUrl=self.documentation_url,
changelogUrl=self.documentation_url,
supportsIncremental=incremental,
supported_destination_sync_modes=supported_dest_sync_modes,
connectionSpecification=self.spec_class.schema(), # type: ignore[attr-defined]
)
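For context, a concrete connector only has to supply the three abstract properties above. A minimal sketch, reusing the hypothetical provider-specific classes named in the docstrings (module paths invented for illustration):

from airbyte_cdk.sources import AbstractFilesSource

from .spec import SourceS3Spec  # hypothetical FilesSpec subclass
from .stream import IncrementalFileStreamS3  # hypothetical FilesStream subclass


class SourceS3(AbstractFilesSource):
    @property
    def stream_class(self) -> type:
        return IncrementalFileStreamS3

    @property
    def spec_class(self) -> type:
        return SourceS3Spec

    @property
    def documentation_url(self) -> str:
        return "https://docs.airbyte.io/integrations/sources/s3"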
10 changes: 10 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/files/__init__.py
@@ -0,0 +1,10 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from .file_info import FileInfo
from .files_spec import FilesSpec
from .files_stream import FilesStream, IncrementalFilesStream
from .storage_file import StorageFile

__all__ = ["FileInfo", "FilesStream", "IncrementalFilesStream", "FilesSpec", "StorageFile"]
43 changes: 43 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/files/file_info.py
@@ -0,0 +1,43 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from datetime import datetime
from functools import total_ordering


@total_ordering
@dataclass
class FileInfo:
"""Class for sharing of metadata"""

key: str
size: int
last_modified: datetime

@property
def size_in_megabytes(self) -> float:
"""technically size in 'mebibytes'"""
return self.size / 1024**2

def __str__(self) -> str:
return "Key: %s, LastModified: %s, Size: %.4fMb" % (self.key, self.last_modified.isoformat(), self.size_in_megabytes)

def __repr__(self) -> str:
return self.__str__()

def __eq__(self, other: object) -> bool:
if isinstance(other, FileInfo):
return self.key == other.key and self.last_modified == other.last_modified and self.size == other.size
return False

def __lt__(self, other: object) -> bool:
# comparison of less than / greater than for FileInfos is based on last modified
if isinstance(other, FileInfo):
return self.last_modified < other.last_modified
else:
raise TypeError(f"Can't compare {self.__class__} with {other.__class__}")

def __hash__(self) -> int:
return self.key.__hash__()
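As a quick illustration of the semantics above (not part of the diff): FileInfo instances compare equal on key, last_modified and size, order by last_modified thanks to @total_ordering, and hash by key alone.

from datetime import datetime

from airbyte_cdk.sources.streams.files import FileInfo

older = FileInfo(key="data/a.csv", size=1024, last_modified=datetime(2022, 1, 1))
newer = FileInfo(key="data/b.csv", size=2048, last_modified=datetime(2022, 6, 1))

assert older < newer                       # ordering follows last_modified
assert sorted([newer, older])[0] is older  # so file listings sort oldest-first
print(older)  # Key: data/a.csv, LastModified: 2022-01-01T00:00:00, Size: 0.0010Mb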
107 changes: 107 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/files/files_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import json
import re
from typing import Any, Dict, Union

from jsonschema import RefResolver
from pydantic import BaseModel, Field

from .formats import AvroFormat, CsvFormat, ParquetFormat

# To implement your provider specific spec, inherit from FilesSpec and add provider-specific settings e.g.:

# class SourceS3Spec(FilesSpec, BaseModel):
# class Config:
# title="S3 Source Spec"

# class S3Provider(BaseModel):
# class Config:
# title = "S3: Amazon Web Services"

# bucket: str = Field(description="Name of the S3 bucket where the file(s) exist.")
# aws_access_key_id: Optional[str] = Field(
# default=None,
# description="...",
# airbyte_secret=True
# )
# aws_secret_access_key: Optional[str] = Field(
# default=None,
# description="...",
# airbyte_secret=True
# )
# path_prefix: str = Field(
# default="",
# description="..."
# )
# provider: S3Provider = Field(...) # leave this as Field(...), just change type to relevant class


class FilesSpec(BaseModel):
dataset: str = Field(
pattern=r"^([A-Za-z0-9-_]+)$",
description="The name of the stream you would like this source to output. Can contain letters, numbers, or underscores.",
order=0,
title="Output Stream Name",
)

path_pattern: str = Field(
title="Pattern of files to replicate",
description="A regular expression which tells the connector which files to replicate. All files which match this pattern will be "
'replicated. Use | to separate multiple patterns. See <a href="https://facelessuser.github.io/wcmatch/glob/" target="_'
'blank">this page</a> to understand pattern syntax (GLOBSTAR and SPLIT flags are enabled). '
"Use pattern <strong>**</strong> to pick up all files.",
examples=["**", "myFolder/myTableFiles/*.csv|myFolder/myOtherTableFiles/*.csv"],
order=10,
)

format: Union[CsvFormat, ParquetFormat, AvroFormat] = Field(
default="csv", title="File Format", description="The format of the files you'd like to replicate", order=20
)

user_schema: str = Field(
title="Manually enforced data schema (Optional)",
alias="schema",
default="{}",
description="Optionally provide a schema to enforce, as a valid JSON string. Ensure this is a mapping of "
'<strong>{ "column" : "type" }</strong>, where types are valid '
'<a href="https://json-schema.org/understanding-json-schema/reference/type.html" target="_blank">JSON Schema '
"datatypes</a>. Leave as {} to auto-infer the schema.",
examples=['{"column_1": "number", "column_2": "string", "column_3": "array", "column_4": "object", "column_5": "boolean"}'],
order=30,
)

def _change_format_to_oneOf(schema: dict) -> dict:
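"""pydantic renders a Union field as anyOf; rewrite the "format" property to the oneOf form Airbyte connector specs use for selectable sub-configs"""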
props_to_change = ["format"]
for prop in props_to_change:
schema["properties"][prop]["type"] = "object"
if "oneOf" in schema["properties"][prop]:
continue
schema["properties"][prop]["oneOf"] = schema["properties"][prop].pop("anyOf")
return schema

def _check_provider_added(schema: dict) -> None:
if "provider" not in schema["properties"]:
raise RuntimeError("You must add the 'provider' property in your child spec class")

def _resolve_refs(schema: dict) -> dict:
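"""inline every {"$ref": "#/definitions/..."} occurrence into the schema, then drop the now-unneeded definitions section"""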
json_schema_ref_resolver = RefResolver.from_schema(schema)
str_schema = json.dumps(schema)
for ref_block in re.findall(r'{"\$ref": "#\/definitions\/.+?(?="})"}', str_schema):
ref = json.loads(ref_block)["$ref"]
str_schema = str_schema.replace(ref_block, json.dumps(json_schema_ref_resolver.resolve(ref)[1]))
pyschema: dict = json.loads(str_schema)
del pyschema["definitions"]
return pyschema

@classmethod
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
"""we're overriding the schema classmethod to enable some post-processing"""
schema = super().schema(*args, **kwargs)
cls._check_provider_added(schema)
schema = cls._change_format_to_oneOf(schema)
schema = cls._resolve_refs(schema)
return schema
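A usage sketch (class names invented for illustration, assuming the pydantic v1 semantics the CDK relies on): a provider spec subclasses FilesSpec, adds the required provider object, and gets the post-processed JSON schema from .schema().

from pydantic import BaseModel, Field

from airbyte_cdk.sources.streams.files import FilesSpec


class LocalProvider(BaseModel):
    class Config:
        title = "Local: local filesystem"

    base_path: str = Field(description="Directory containing the files to replicate.")


class SourceLocalSpec(FilesSpec, BaseModel):
    class Config:
        title = "Local Files Source Spec"

    provider: LocalProvider = Field(...)  # required; FilesSpec.schema() raises without it


spec_schema = SourceLocalSpec.schema()
assert "oneOf" in spec_schema["properties"]["format"]  # anyOf rewritten to oneOf
assert "definitions" not in spec_schema                # $refs resolved inline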