Skip to content

CDK: Fix typing errors #9037

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.47
Fix typing errors.

## 0.1.45
Integrate Sentry for performance and errors tracking.

Expand Down
6 changes: 3 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@


import json
import logging
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification


Expand Down Expand Up @@ -48,7 +48,7 @@ def write_config(config: Mapping[str, Any], config_path: str):
with open(config_path, "w") as fh:
fh.write(json.dumps(config))

def spec(self, logger: AirbyteLogger) -> ConnectorSpecification:
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g. username and password)
required to run this integration.
Expand All @@ -59,7 +59,7 @@ def spec(self, logger: AirbyteLogger) -> ConnectorSpecification:
return ConnectorSpecification.parse_obj(json.loads(raw_spec))

@abstractmethod
def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the integration, e.g. if a provided Stripe API token can be used to connect
to the Stripe API.
Expand Down
15 changes: 8 additions & 7 deletions airbyte-cdk/python/airbyte_cdk/destinations/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@

import argparse
import io
import logging
import sys
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.connector import Connector
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
from pydantic import ValidationError

logger = logging.getLogger("airbyte")


class Destination(Connector, ABC):
logger = AirbyteLogger()
VALID_CMDS = {"spec", "check", "write"}

@abstractmethod
Expand All @@ -26,7 +27,7 @@ def write(
"""Implement to define how the connector writes data to the destination"""

def _run_check(self, config: Mapping[str, Any]) -> AirbyteMessage:
check_result = self.check(self.logger, config)
check_result = self.check(logger, config)
return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)

def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
Expand All @@ -35,16 +36,16 @@ def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[Airbyt
try:
yield AirbyteMessage.parse_raw(line)
except ValidationError:
self.logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}")
logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}")

def _run_write(
self, config: Mapping[str, Any], configured_catalog_path: str, input_stream: io.TextIOWrapper
) -> Iterable[AirbyteMessage]:
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
input_messages = self._parse_input_stream(input_stream)
self.logger.info("Begin writing to the destination...")
logger.info("Begin writing to the destination...")
yield from self.write(config=config, configured_catalog=catalog, input_messages=input_messages)
self.logger.info("Writing complete.")
logger.info("Writing complete.")

def parse_args(self, args: List[str]) -> argparse.Namespace:
"""
Expand Down Expand Up @@ -86,7 +87,7 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
if cmd not in self.VALID_CMDS:
raise Exception(f"Unrecognized command: {cmd}")

spec = self.spec(self.logger)
spec = self.spec(logger)
if cmd == "spec":
yield AirbyteMessage(type=Type.SPEC, spec=spec)
return
Expand Down
53 changes: 14 additions & 39 deletions airbyte-cdk/python/airbyte_cdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging.config
import sys
import traceback
from typing import List
from typing import List, Tuple

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage

Expand Down Expand Up @@ -49,7 +49,6 @@ def hook_fn(exception_type, exception_value, traceback_):

def init_logger(name: str = None):
"""Initial set up of logger"""
logging.setLoggerClass(AirbyteNativeLogger)
logging.addLevelName(TRACE_LEVEL_NUM, "TRACE")
logger = logging.getLogger(name)
logger.setLevel(TRACE_LEVEL_NUM)
Expand All @@ -61,7 +60,7 @@ def init_logger(name: str = None):
class AirbyteLogFormatter(logging.Formatter):
"""Output log records using AirbyteMessage"""

_secrets = []
_secrets: List[str] = []

@classmethod
def update_secrets(cls, secrets: List[str]):
Expand All @@ -88,46 +87,22 @@ def format(self, record: logging.LogRecord) -> str:
return log_message.json(exclude_unset=True)


class AirbyteNativeLogger(logging.Logger):
"""Using native logger with implementing all AirbyteLogger features"""
def log_by_prefix(msg: str, default_level: str) -> Tuple[int, str]:
"""Take the log level from the first word of the message (falling back to default_level) and return it with the message to log"""
valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]
split_line = msg.split()
first_word = next(iter(split_line), None)
if first_word in valid_log_types:
log_level = logging.getLevelName(first_word)
rendered_message = " ".join(split_line[1:])
else:
log_level = logging.getLevelName(default_level)
rendered_message = msg

def __init__(self, name):
super().__init__(name)
self.valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]

def log_by_prefix(self, msg, default_level):
"""Custom method, which takes log level from first word of message"""
split_line = msg.split()
first_word = next(iter(split_line), None)
if first_word in self.valid_log_types:
log_level = logging.getLevelName(first_word)
rendered_message = " ".join(split_line[1:])
else:
default_level = default_level if default_level in self.valid_log_types else "INFO"
log_level = logging.getLevelName(default_level)
rendered_message = msg
self.log(log_level, rendered_message)

def trace(self, msg, *args, **kwargs):
self._log(TRACE_LEVEL_NUM, msg, args, **kwargs)
return log_level, rendered_message


class AirbyteLogger:
def __init__(self):
self.valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]

def log_by_prefix(self, message, default_level):
"""Custom method, which takes log level from first word of message"""
split_line = message.split()
first_word = next(iter(split_line), None)
if first_word in self.valid_log_types:
log_level = first_word
rendered_message = " ".join(split_line[1:])
else:
log_level = default_level
rendered_message = message
self.log(log_level, rendered_message)

def log(self, level, message):
log_record = AirbyteLogMessage(level=level, message=message)
log_message = AirbyteMessage(type="LOG", log=log_record)
Expand Down
24 changes: 12 additions & 12 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@


import copy
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Status,
Expand All @@ -38,8 +37,9 @@ class AbstractSource(Source, ABC):
"""

@abstractmethod
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
"""
:param logger: source logger
:param config: The user-provided configuration as specified by the source's spec.
This usually contains information required to check connection e.g. tokens, secrets and keys etc.
:return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
Expand All @@ -57,19 +57,19 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""

# Stream name to instance map for applying output object transformation
_stream_to_instance_map: Dict[str, AirbyteStream] = {}
_stream_to_instance_map: Dict[str, Stream] = {}

@property
def name(self) -> str:
"""Source name"""
return self.__class__.__name__

def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog:
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
"""Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
return AirbyteCatalog(streams=streams)

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
try:
check_succeeded, error = self.check_connection(logger, config)
Expand All @@ -81,7 +81,7 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def read(
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
) -> Iterator[AirbyteMessage]:
"""Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
connector_state = copy.deepcopy(state or {})
Expand Down Expand Up @@ -118,7 +118,7 @@ def read(

def _read_stream(
self,
logger: AirbyteLogger,
logger: logging.Logger,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
connector_state: MutableMapping[str, Any],
Expand Down Expand Up @@ -160,7 +160,7 @@ def _limit_reached(internal_config: InternalConfig, records_counter: int) -> boo

def _read_incremental(
self,
logger: AirbyteLogger,
logger: logging.Logger,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
connector_state: MutableMapping[str, Any],
Expand Down Expand Up @@ -222,15 +222,15 @@ def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=connector_state))

@lru_cache(maxsize=None)
def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, dict]:
def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, Mapping[str, Any]]:
"""
Look up a stream's transformer object and JSON schema based on the stream name.
This function is called a lot, so caching is used to save on the costly
get_json_schema operation.
:param stream_name: name of the stream from the catalog.
:return: tuple with the stream transformer object and the discover JSON schema.
"""
stream_instance = self._stream_to_instance_map.get(stream_name)
stream_instance = self._stream_to_instance_map[stream_name]
return stream_instance.transformer, stream_instance.get_json_schema()

def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
Expand All @@ -240,6 +240,6 @@ def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
# need it to normalize values against json schema. By default no action
# taken unless configured. See
# docs/connector-development/cdk-python/schemas.md for details.
transformer.transform(data, schema)
transformer.transform(data, schema) # type: ignore
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
return AirbyteMessage(type=MessageType.RECORD, record=message)
4 changes: 2 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ class BaseConfig(BaseModel):
"""

@classmethod
def schema(cls, **kwargs) -> Dict[str, Any]:
def schema(cls, *args, **kwargs) -> Dict[str, Any]:
"""We're overriding the schema classmethod to enable some post-processing"""
schema = super().schema(**kwargs)
schema = super().schema(*args, **kwargs)
rename_key(schema, old_key="anyOf", new_key="oneOf") # UI supports only oneOf
expand_refs(schema)
schema.pop("description", None) # description added from the docstring
Expand Down
10 changes: 5 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/sources/deprecated/base_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@


import copy
import logging
from datetime import datetime
from typing import Any, Iterable, Mapping, MutableMapping, Type

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
Expand Down Expand Up @@ -39,13 +39,13 @@ def _get_client(self, config: Mapping):
"""Construct client"""
return self.client_class(**config)

def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog:
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
"""Discover streams"""
client = self._get_client(config)

return AirbyteCatalog(streams=[stream for stream in client.streams])

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Check connection"""
client = self._get_client(config)
alive, error = client.health_check()
Expand All @@ -55,7 +55,7 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def read(
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
) -> Iterable[AirbyteMessage]:
state = state or {}
client = self._get_client(config)
Expand All @@ -73,7 +73,7 @@ def read(
logger.info(f"Finished syncing {self.name}")

def _read_stream(
self, logger: AirbyteLogger, client: BaseClient, configured_stream: ConfiguredAirbyteStream, state: MutableMapping[str, Any]
self, logger: logging.Logger, client: BaseClient, configured_stream: ConfiguredAirbyteStream, state: MutableMapping[str, Any]
):
stream_name = configured_stream.stream.name
use_incremental = configured_stream.sync_mode == SyncMode.incremental and client.stream_has_state(stream_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from io import TextIOWrapper
from typing import Any, DefaultDict, Dict, Iterator, List, Mapping, Optional, Tuple

from airbyte_cdk.logger import log_by_prefix
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteMessage,
Expand Down Expand Up @@ -138,7 +139,7 @@ def _read_singer_catalog(logger, shell_command: str) -> Mapping[str, Any]:
shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
)
for line in completed_process.stderr.splitlines():
logger.log_by_prefix(line, "ERROR")
logger.log(*log_by_prefix(line, "ERROR"))

return json.loads(completed_process.stdout)

Expand Down Expand Up @@ -169,9 +170,9 @@ def read(logger, shell_command, is_message=(lambda x: True)) -> Iterator[Airbyte
if message_data is not None:
yield message_data
else:
logger.log_by_prefix(line, "INFO")
logger.log(*log_by_prefix(line, "INFO"))
else:
logger.log_by_prefix(line, "ERROR")
logger.log(*log_by_prefix(line, "ERROR"))

@staticmethod
def _read_lines(process: subprocess.Popen) -> Iterator[Tuple[str, TextIOWrapper]]:
Expand Down
Loading