Skip to content

Commit e18f243

Browse files
authored
feat(connector): unified csv parser (risingwavelabs#8463)
1 parent 2ae019d commit e18f243

File tree

9 files changed

+466
-286
lines changed

9 files changed

+466
-286
lines changed

Cargo.lock

Lines changed: 13 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ci/scripts/s3-source-test.sh

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ set -euo pipefail
44

55
source ci/scripts/common.env.sh
66

7-
while getopts 'p:' opt; do
7+
while getopts 'p:s:' opt; do
88
case ${opt} in
99
p )
1010
profile=$OPTARG
1111
;;
12+
s )
13+
script=$OPTARG
14+
;;
1215
\? )
1316
echo "Invalid Option: -$OPTARG" 1>&2
1417
exit 1
@@ -20,6 +23,8 @@ while getopts 'p:' opt; do
2023
done
2124
shift $((OPTIND -1))
2225

26+
27+
2328
echo "--- Download artifacts"
2429
mkdir -p target/debug
2530
buildkite-agent artifact download risingwave-"$profile" target/debug/
@@ -44,7 +49,7 @@ cargo make ci-start ci-1cn-1fe
4449

4550
echo "--- Run test"
4651
python3 -m pip install minio psycopg2-binary
47-
python3 e2e_test/s3/run.py
52+
python3 e2e_test/s3/$script.py
4853

4954
echo "--- Kill cluster"
5055
cargo make ci-kill

ci/workflows/main-cron.yml

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,8 @@ steps:
184184
timeout_in_minutes: 5
185185
retry: *auto-retry
186186

187-
- label: "S3 source check on AWS"
188-
command: "ci/scripts/s3-source-test.sh -p ci-release"
187+
- label: "S3 source check on AWS (json parser)"
188+
command: "ci/scripts/s3-source-test.sh -p ci-release -s run"
189189
depends_on: build
190190
plugins:
191191
- seek-oss/aws-sm#v2.3.1:
@@ -200,8 +200,8 @@ steps:
200200
timeout_in_minutes: 20
201201
retry: *auto-retry
202202

203-
- label: "S3 source check on lyvecloud.seagate.com"
204-
command: "ci/scripts/s3-source-test.sh -p ci-release"
203+
- label: "S3 source check on lyvecloud.seagate.com (json parser)"
204+
command: "ci/scripts/s3-source-test.sh -p ci-release -s run"
205205
depends_on: build
206206
plugins:
207207
- seek-oss/aws-sm#v2.3.1:
@@ -215,3 +215,18 @@ steps:
215215
- S3_SOURCE_TEST_CONF
216216
timeout_in_minutes: 20
217217
retry: *auto-retry
218+
- label: "S3 source check on AWS (csv parser)"
219+
command: "ci/scripts/s3-source-test.sh -p ci-release -s run_csv"
220+
depends_on: build
221+
plugins:
222+
- seek-oss/aws-sm#v2.3.1:
223+
env:
224+
S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
225+
- docker-compose#v4.9.0:
226+
run: rw-build-env
227+
config: ci/docker-compose.yml
228+
mount-buildkite-agent: true
229+
environment:
230+
- S3_SOURCE_TEST_CONF
231+
timeout_in_minutes: 20
232+
retry: *auto-retry

e2e_test/s3/run_csv.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import os
2+
import string
3+
import json
4+
import string
5+
from time import sleep
6+
from minio import Minio
7+
import psycopg2
8+
import random
9+
10+
11+
def do_test(config, N, n, prefix):
    """End-to-end check of the S3 CSV source.

    Creates two tables backed by the uploaded CSV objects (one file with a
    header row, one without), waits until both have ingested every row,
    verifies the per-column sums against the generated data, then drops
    both tables.

    Args:
        config: dict with the S3_* connection settings (region, bucket, keys,
            endpoint) taken from the CI secret.
        N: number of distinct rows in one generated data block.
        n: number of times the data block is repeated in each file.
        prefix: run id used to prefix the S3 object names.
    """
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )

    # Open a cursor to execute SQL statements
    cur = conn.cursor()

    # Headerless file: columns map positionally, so a=j, b=j%2, c=+/-1.
    cur.execute(f'''CREATE TABLE s3_test_csv_without_headers(
        a int,
        b int,
        c int,
    ) WITH (
        connector = 's3',
        match_pattern = '{prefix}_data_without_headers.csv',
        s3.region_name = '{config['S3_REGION']}',
        s3.bucket_name = '{config['S3_BUCKET']}',
        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
    ) ROW FORMAT CSV WITHOUT HEADER DELIMITED BY ',';''')

    # File with header "c,b,a": the parser maps by column name, reversing
    # the positional order, so a=+/-1, b=j%2, c=j.
    cur.execute(f'''CREATE TABLE s3_test_csv_with_headers(
        a int,
        b int,
        c int,
    ) WITH (
        connector = 's3',
        match_pattern = '{prefix}_data_with_headers.csv',
        s3.region_name = '{config['S3_REGION']}',
        s3.bucket_name = '{config['S3_BUCKET']}',
        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
    ) ROW FORMAT CSV DELIMITED BY ',';''')

    total_row = int(N * n)
    # Give the source some time to discover and start reading the objects,
    # then poll until both tables are fully ingested.
    sleep(60)
    while True:
        sleep(60)
        cur.execute('select count(*) from s3_test_csv_with_headers')
        result_with_headers = cur.fetchone()
        cur.execute('select count(*) from s3_test_csv_without_headers')
        result_without_headers = cur.fetchone()
        if result_with_headers[0] == total_row and result_without_headers[0] == total_row:
            break
        # Report progress for BOTH tables; the original message only showed
        # the with-headers count, which was misleading when the headerless
        # table was the one lagging behind.
        print(
            f"Now got {result_with_headers[0]} (with headers) and "
            f"{result_without_headers[0]} (without headers) rows in tables, "
            f"{total_row} expected, wait 60s")

    cur.execute(
        'select count(*), sum(a), sum(b), sum(c) from s3_test_csv_with_headers')
    result_with_headers = cur.fetchone()

    cur.execute(
        'select count(*), sum(a), sum(b), sum(c) from s3_test_csv_without_headers')
    s3_test_csv_without_headers = cur.fetchone()

    print(result_with_headers, s3_test_csv_without_headers,
          int(((N - 1) * N / 2) * n), int(N*n / 2))

    # Positional mapping: a=j (sum is n * N(N-1)/2), b=j%2, c alternates +1/-1.
    assert s3_test_csv_without_headers[0] == total_row
    assert s3_test_csv_without_headers[1] == int(((N - 1) * N / 2) * n)
    assert s3_test_csv_without_headers[2] == int(N*n / 2)
    assert s3_test_csv_without_headers[3] == 0

    # Name-based mapping via the "c,b,a" header: a and c swap roles.
    assert result_with_headers[0] == total_row
    assert result_with_headers[1] == 0
    assert result_with_headers[2] == int(N*n / 2)
    assert result_with_headers[3] == int(((N - 1) * N / 2) * n)

    cur.execute('drop table s3_test_csv_with_headers')
    cur.execute('drop table s3_test_csv_without_headers')

    cur.close()
    conn.close()
88+
89+
90+
if __name__ == "__main__":
    # CI secret with the S3 connection settings.
    config = json.loads(os.environ["S3_SOURCE_TEST_CONF"])
    # Random prefix so concurrent CI runs don't clobber each other's objects.
    run_id = str(random.randint(1000, 9999))
    N = 10000   # distinct rows per data block
    n = 10      # copies of the data block written to each file

    # Each row is "j,j%2,+/-1" so every column sum is easy to predict.
    items = [",".join([str(j), str(j % 2), str(-1 if j % 2 else 1)])
             for j in range(N)
             ]

    data = "\n".join(items) + "\n"
    # Write `n` copies of the block (was a hard-coded range(10) that could
    # silently drift apart from `n`).
    with open("data_without_headers.csv", "w") as f:
        for _ in range(n):
            f.write(data)
        os.fsync(f.fileno())

    with open("data_with_headers.csv", "w") as f:
        # Header deliberately reverses the column order to exercise
        # name-based column mapping in the CSV parser.
        f.write("c,b,a\n")
        for _ in range(n):
            f.write(data)
        os.fsync(f.fileno())

    client = Minio(
        config["S3_ENDPOINT"],
        access_key=config["S3_ACCESS_KEY"],
        secret_key=config["S3_SECRET_KEY"],
        secure=True
    )

    try:
        client.fput_object(
            config["S3_BUCKET"],
            f"{run_id}_data_without_headers.csv",
            "data_without_headers.csv"
        )
        client.fput_object(
            config["S3_BUCKET"],
            f"{run_id}_data_with_headers.csv",
            "data_with_headers.csv"
        )
        # Fixed message: it previously named the with-headers file twice.
        print(
            f"Uploaded {run_id}_data_with_headers.csv & {run_id}_data_without_headers.csv to S3")
        os.remove("data_with_headers.csv")
        os.remove("data_without_headers.csv")
    except Exception as e:
        # Include the exception so upload failures are diagnosable in CI logs.
        print(f"Error uploading test files: {e}")

    return_code = 0
    try:
        do_test(config, N, n, run_id)
    except Exception as e:
        print("Test failed", e)
        return_code = 1

    # Clean up: retry removal up to 20 times, stopping as soon as it
    # succeeds (the original loop had no break and re-removed every time).
    for _ in range(20):
        try:
            client.remove_object(
                config["S3_BUCKET"], f"{run_id}_data_with_headers.csv")
            client.remove_object(
                config["S3_BUCKET"], f"{run_id}_data_without_headers.csv")
            break
        except Exception as e:
            print(f"Error removing testing files {e}")

    exit(return_code)

src/connector/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ bincode = "1"
2626
byteorder = "1"
2727
bytes = { version = "1", features = ["serde"] }
2828
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
29-
csv-core = "0.1.10"
29+
csv = "1.2"
3030
duration-str = "0.5.0"
3131
enum-as-inner = "0.5"
3232
futures = { version = "0.3", default-features = false, features = ["alloc"] }

src/connector/src/macros.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,9 @@ macro_rules! impl_connector_properties {
163163
macro_rules! impl_common_parser_logic {
164164
($parser_name:ty) => {
165165
impl $parser_name {
166+
#[allow(unused_mut)]
166167
#[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = RwError)]
167-
async fn into_chunk_stream(self, data_stream: $crate::source::BoxSourceStream) {
168+
async fn into_chunk_stream(mut self, data_stream: $crate::source::BoxSourceStream) {
168169
#[for_await]
169170
for batch in data_stream {
170171
let batch = batch?;

0 commit comments

Comments
 (0)