-
Notifications
You must be signed in to change notification settings - Fork 4.6k
🐛 Source S3: Loading of files' metadata #8252
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
3f5fe85
a4ce4dd
00cd291
75fb824
5749ce9
a84d5bb
7a9d3a0
1aface7
fb5daa0
c221569
86b8640
88b153c
1f28602
4819e23
ac1f33f
1bc0c3b
247eb1d
4aa028d
150f98e
9f39aa2
05e6641
4a26bde
1e2efaf
c9e4d3a
29d9dc3
6f07a58
80ff530
a1108e7
e20d0cd
17542b1
7779233
7f8b106
6d2af0c
a4dc642
c98c46c
9858cc0
bc1dae9
7873cd6
4a08a2b
df836a9
7782cba
e4b0f3d
89e80e3
2a42bf2
6a7b01d
9e6d5b0
c321fe4
abb6044
a07ee66
a4cc4c8
ababf7d
c93f1d3
5b48a94
8c933d6
c7f227f
2efb604
f64d8e3
e2ecb0c
b753296
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# | ||
# Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import json | ||
import os | ||
from pathlib import Path | ||
from typing import Mapping | ||
|
||
import pytest | ||
from airbyte_cdk import AirbyteLogger | ||
from source_s3.source import SourceS3 | ||
from unit_tests.abstract_test_parser import memory_limit | ||
from unit_tests.test_csv_parser import generate_big_file | ||
|
||
from .acceptance import TMP_FOLDER | ||
|
||
HERE = Path(__file__).resolve().parent | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def credentials() -> Mapping: | ||
filename = HERE / "config_minio.json" | ||
with open(filename) as f: | ||
return json.load(f) | ||
|
||
|
||
class TestIntegrationCsvFiles: | ||
logger = AirbyteLogger() | ||
|
||
@memory_limit(20) # max used memory should be less than 20Mb | ||
def read_source(self, credentials, catalog): | ||
read_count = 0 | ||
for msg in SourceS3().read(logger=self.logger, config=credentials, catalog=catalog): | ||
if msg.record: | ||
read_count += 1 | ||
return read_count | ||
|
||
@pytest.mark.order(1) | ||
def test_big_file(self, credentials): | ||
"""tests a big csv file (>= 1.0G records)""" | ||
# generates a big CSV files separately | ||
big_file_folder = os.path.join(TMP_FOLDER, "minio_data", "test-bucket", "big_files") | ||
os.makedirs(big_file_folder) | ||
filepath = os.path.join(big_file_folder, "file.csv") | ||
|
||
# please change this value if you need to test another file size | ||
future_file_size = 1.0 # in gigabytes | ||
_, file_size = generate_big_file(filepath, future_file_size, 500) | ||
expected_count = sum(1 for _ in open(filepath)) - 1 | ||
self.logger.info(f"generated file {filepath} with size {file_size}Gb, lines: {expected_count}") | ||
|
||
credentials["path_pattern"] = "big_files/*.csv" | ||
credentials["format"]["block_size"] = 5 * 1024 ** 2 | ||
source = SourceS3() | ||
catalog = source.read_catalog(HERE / "configured_catalog.json") | ||
|
||
assert self.read_source(credentials, catalog) == expected_count |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
# | ||
# Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import pytest | ||
|
||
from .acceptance import minio_setup | ||
|
||
|
||
@pytest.fixture(scope="session", autouse=True) | ||
def connector_setup(): | ||
yield from minio_setup() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# | ||
# Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import os | ||
from dataclasses import dataclass | ||
from datetime import datetime | ||
from functools import total_ordering | ||
|
||
|
||
@total_ordering | ||
@dataclass | ||
class FileInfo: | ||
"""Class for sharing of metadate""" | ||
antixar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
key: str | ||
size: int | ||
last_modified: datetime | ||
|
||
@property | ||
def size_in_megabytes(self): | ||
return self.size / 1024 ** 2 | ||
|
||
def __str__(self): | ||
return "Key: %s, LastModified: %s, Size: %.4fMb" % (self.key, self.last_modified.isoformat(), self.size_in_megabytes) | ||
|
||
def __repr__(self): | ||
return self.__str__() | ||
|
||
@classmethod | ||
def create_by_local_file(cls, filepath: str) -> "FileInfo": | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like we're only using this method in testing... doesn't make sense as a method of the class imo, should rather be a separate private testing method to instantiate an instance of FileInfo if we need it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved to test folders |
||
"Generates a FileInfo instance. This function can be used for tests" | ||
if not os.path.exists(filepath): | ||
return cls(key=filepath, size=0, last_modified=datetime.now()) | ||
return cls(key=filepath, size=os.stat(filepath).st_size, last_modified=datetime.fromtimestamp(os.path.getmtime(filepath))) | ||
|
||
def __eq__(self, other): | ||
if isinstance(other, str): | ||
return self.key == other | ||
return self.key == other.key | ||
|
||
def __lt__(self, other): | ||
if isinstance(other, str): | ||
return self.key < other | ||
return self.key < other.key | ||
antixar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def __hash__(self): | ||
return self.key.__hash__() |
Uh oh!
There was an error while loading. Please reload this page.