Skip to content

Commit ab9ddbe

Browse files
committed
Process Arrow record batch in chunks
1 parent f0a3f2e commit ab9ddbe

File tree

1 file changed

+29
-13
lines changed

1 file changed

+29
-13
lines changed

crates/duckdb/src/appender/arrow.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ use crate::{
66
Error,
77
};
88
use arrow::record_batch::RecordBatch;
9-
use ffi::duckdb_append_data_chunk;
9+
use ffi::{duckdb_append_data_chunk, duckdb_vector_size};
1010

1111
impl Appender<'_> {
12-
/// Append one record_batch
12+
/// Append one record batch
1313
///
1414
/// ## Example
1515
///
@@ -28,19 +28,35 @@ impl Appender<'_> {
2828
/// Will return `Err` if the number of appended columns does not match the table schema
2929
#[inline]
3030
pub fn append_record_batch(&mut self, record_batch: RecordBatch) -> Result<()> {
31-
let schema = record_batch.schema();
32-
let mut logical_type: Vec<LogicalTypeHandle> = vec![];
33-
for field in schema.fields() {
34-
let logical_t = to_duckdb_logical_type(field.data_type())
35-
.map_err(|_op| Error::ArrowTypeToDuckdbType(field.to_string(), field.data_type().clone()))?;
36-
logical_type.push(logical_t);
37-
}
31+
let logical_types: Vec<LogicalTypeHandle> = record_batch
32+
.schema()
33+
.fields()
34+
.iter()
35+
.map(|field| {
36+
to_duckdb_logical_type(field.data_type())
37+
.map_err(|_op| Error::ArrowTypeToDuckdbType(field.to_string(), field.data_type().clone()))
38+
})
39+
.collect::<Result<Vec<_>, _>>()?;
40+
41+
let vector_size = unsafe { duckdb_vector_size() } as usize;
42+
let num_rows = record_batch.num_rows();
43+
44+
// Process record batch in chunks that fit within DuckDB's vector size
45+
let mut offset = 0;
46+
while offset < num_rows {
47+
let slice_len = std::cmp::min(vector_size, num_rows - offset);
48+
let slice = record_batch.slice(offset, slice_len);
49+
50+
let mut data_chunk = DataChunkHandle::new(&logical_types);
51+
record_batch_to_duckdb_data_chunk(&slice, &mut data_chunk).map_err(|_op| Error::AppendError)?;
3852

39-
let mut data_chunk = DataChunkHandle::new(&logical_type);
40-
record_batch_to_duckdb_data_chunk(&record_batch, &mut data_chunk).map_err(|_op| Error::AppendError)?;
53+
let rc = unsafe { duckdb_append_data_chunk(self.app, data_chunk.get_ptr()) };
54+
result_from_duckdb_appender(rc, &mut self.app)?;
4155

42-
let rc = unsafe { duckdb_append_data_chunk(self.app, data_chunk.get_ptr()) };
43-
result_from_duckdb_appender(rc, &mut self.app)
56+
offset += slice_len;
57+
}
58+
59+
Ok(())
4460
}
4561
}
4662

0 commit comments

Comments
 (0)