
Commit aaaf12e

[file-based cdk] add excel file type support (#43346)
1 parent 7bb7a74 commit aaaf12e

File tree

11 files changed (+1712 -758 lines)
airbyte-cdk/python/airbyte_cdk/sources/file_based/config/excel_format.py (new file)

@@ -0,0 +1,17 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic.v1 import BaseModel, Field


class ExcelFormat(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Excel Format"
        discriminator = "filetype"

    filetype: str = Field(
        "excel",
        const=True,
    )
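
For reference, a minimal sketch (not part of this commit) of how the new model behaves: the filetype field defaults to the constant "excel", so the discriminator is populated without any caller input.

# Illustrative sketch, not part of the commit.
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat

fmt = ExcelFormat()    # "excel" is the const default, so no arguments are needed
print(fmt.filetype)    # -> "excel"
# ExcelFormat(filetype="csv") would raise a pydantic ValidationError (const mismatch).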

airbyte-cdk/python/airbyte_cdk/sources/file_based/config/file_based_stream_config.py

+3 -2 lines

@@ -1,12 +1,13 @@
 #
-# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 #

 from enum import Enum
 from typing import Any, List, Mapping, Optional, Union

 from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
 from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
+from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
 from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
 from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
 from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
@@ -55,7 +56,7 @@ class FileBasedStreamConfig(BaseModel):
         description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
         default=3,
     )
-    format: Union[AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat] = Field(
+    format: Union[AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat] = Field(
         title="Format",
         description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
     )
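
A hedged sketch of how the widened Union might be exercised; it assumes name and format are the only fields that must be supplied to FileBasedStreamConfig (everything else falling back to defaults), which this diff does not show.

# Illustrative sketch, not part of the commit; assumes name and format suffice
# to instantiate FileBasedStreamConfig.
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig

stream_config = FileBasedStreamConfig(name="excel_stream", format=ExcelFormat())
print(type(stream_config.format).__name__)    # -> "ExcelFormat"
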
airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/__init__.py

@@ -1,13 +1,15 @@
 from typing import Any, Mapping, Type

 from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
+from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
 from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
 from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
 from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
 from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat

 from .avro_parser import AvroParser
 from .csv_parser import CsvParser
+from .excel_parser import ExcelParser
 from .file_type_parser import FileTypeParser
 from .jsonl_parser import JsonlParser
 from .parquet_parser import ParquetParser
@@ -16,9 +18,10 @@
 default_parsers: Mapping[Type[Any], FileTypeParser] = {
     AvroFormat: AvroParser(),
     CsvFormat: CsvParser(),
+    ExcelFormat: ExcelParser(),
     JsonlFormat: JsonlParser(),
     ParquetFormat: ParquetParser(),
     UnstructuredFormat: UnstructuredParser(),
 }

-__all__ = ["AvroParser", "CsvParser", "JsonlParser", "ParquetParser", "UnstructuredParser", "default_parsers"]
+__all__ = ["AvroParser", "CsvParser", "ExcelParser", "JsonlParser", "ParquetParser", "UnstructuredParser", "default_parsers"]
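
A short sketch (not part of the commit) of the lookup this registration enables: the configured format class keys directly into default_parsers. It assumes the package path airbyte_cdk.sources.file_based.file_types, consistent with the imports elsewhere in this commit.

# Illustrative sketch, not part of the commit.
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
from airbyte_cdk.sources.file_based.file_types import default_parsers

parser = default_parsers[ExcelFormat]
print(type(parser).__name__)    # -> "ExcelParser"
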
airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/excel_parser.py (new file)

@@ -0,0 +1,168 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from io import IOBase
from pathlib import Path
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union

import pandas as pd
from airbyte_cdk.sources.file_based.config.file_based_stream_config import ExcelFormat, FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
from numpy import datetime64
from numpy import dtype as dtype_
from numpy import issubdtype
from pydantic.v1 import BaseModel


class ExcelParser(FileTypeParser):
    ENCODING = None

    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        ExcelParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None

    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        """
        Infers the schema of the Excel file by examining its contents.

        Args:
            config (FileBasedStreamConfig): Configuration for the file-based stream.
            file (RemoteFile): The remote file to be read.
            stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
            logger (logging.Logger): Logger for logging information and errors.

        Returns:
            SchemaType: Inferred schema of the Excel file.
        """

        # Validate the format of the config
        self.validate_format(config.format, logger)

        fields: Dict[str, str] = {}

        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
            df = self.open_and_parse_file(fp)
            for column, df_type in df.dtypes.items():
                # Choose the broadest data type if the column's data type differs in dataframes
                prev_frame_column_type = fields.get(column)
                fields[column] = self.dtype_to_json_type(prev_frame_column_type, df_type)

        schema = {
            field: ({"type": "string", "format": "date-time"} if fields[field] == "date-time" else {"type": fields[field]})
            for field in fields
        }
        return schema

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]] = None,
    ) -> Iterable[Dict[str, Any]]:
        """
        Parses records from an Excel file based on the provided configuration.

        Args:
            config (FileBasedStreamConfig): Configuration for the file-based stream.
            file (RemoteFile): The remote file to be read.
            stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
            logger (logging.Logger): Logger for logging information and errors.
            discovered_schema (Optional[Mapping[str, SchemaType]]): Discovered schema for validation.

        Yields:
            Iterable[Dict[str, Any]]: Parsed records from the Excel file.
        """

        # Validate the format of the config
        self.validate_format(config.format, logger)

        try:
            # Open and parse the file using the stream reader
            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
                df = self.open_and_parse_file(fp)
                # Yield records as dictionaries
                yield from df.to_dict(orient="records")

        except Exception as exc:
            # Raise a RecordParseError if any exception occurs during parsing
            raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri) from exc

    @property
    def file_read_mode(self) -> FileReadMode:
        """
        Returns the file read mode for the Excel file.

        Returns:
            FileReadMode: The file read mode (binary).
        """
        return FileReadMode.READ_BINARY

    @staticmethod
    def dtype_to_json_type(current_type: Optional[str], dtype: dtype_) -> str:
        """
        Convert Pandas DataFrame types to Airbyte Types.

        Args:
            current_type (Optional[str]): One of the previous types based on earlier dataframes.
            dtype: Pandas DataFrame type.

        Returns:
            str: Corresponding Airbyte Type.
        """
        number_types = ("int64", "float64")
        if current_type == "string":
            # Previous column values were of the string type, no need to look further.
            return current_type
        if dtype == object:
            return "string"
        if dtype in number_types and (not current_type or current_type == "number"):
            return "number"
        if dtype == "bool" and (not current_type or current_type == "boolean"):
            return "boolean"
        if issubdtype(dtype, datetime64):
            return "date-time"
        return "string"

    @staticmethod
    def validate_format(excel_format: BaseModel, logger: logging.Logger) -> None:
        """
        Validates if the given format is of type ExcelFormat.

        Args:
            excel_format (Any): The format to be validated.

        Raises:
            ConfigValidationError: If the format is not ExcelFormat.
        """
        if not isinstance(excel_format, ExcelFormat):
            logger.info(f"Expected ExcelFormat, got {excel_format}")
            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)

    @staticmethod
    def open_and_parse_file(fp: Union[IOBase, str, Path]) -> pd.DataFrame:
        """
        Opens and parses the Excel file.

        Args:
            fp: File pointer to the Excel file.

        Returns:
            pd.DataFrame: Parsed data from the Excel file.
        """
        return pd.ExcelFile(fp, engine="calamine").parse()
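
To illustrate the dtype mapping above, a small self-contained sketch (not part of the commit) that feeds pandas dtypes through ExcelParser.dtype_to_json_type. It assumes pandas and numpy are installed and that the module lives at airbyte_cdk.sources.file_based.file_types.excel_parser, as registered in the package __init__ above.

# Illustrative sketch, not part of the commit.
import pandas as pd

from airbyte_cdk.sources.file_based.file_types.excel_parser import ExcelParser

df = pd.DataFrame(
    {
        "name": ["a", "b"],                                       # object         -> "string"
        "count": [1, 2],                                          # int64          -> "number"
        "ratio": [0.5, 1.5],                                      # float64        -> "number"
        "active": [True, False],                                  # bool           -> "boolean"
        "seen_at": pd.to_datetime(["2024-01-01", "2024-01-02"]),  # datetime64[ns] -> "date-time"
    }
)

json_types = {column: ExcelParser.dtype_to_json_type(None, dtype) for column, dtype in df.dtypes.items()}
print(json_types)
# {'name': 'string', 'count': 'number', 'ratio': 'number', 'active': 'boolean', 'seen_at': 'date-time'}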
