Skip to content

Commit f37a266

Browse files
abey79 and jprochazk
authored and committed
Implement Dataset.download_partition() to download a partition as a Recording (#9451)
Title. This enables tying the new Python API to the existing dataframe API: <img width="1010" alt="image" src="https://github.com/user-attachments/assets/fb623700-f5f3-431c-8ab0-9d17f35bb5d6" />
1 parent b08cf5d commit f37a266

File tree

4 files changed

+115
-16
lines changed

4 files changed

+115
-16
lines changed

crates/store/re_grpc_client/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ pub enum StreamError {
7070

7171
#[error(transparent)]
7272
TypeConversionError(#[from] re_protos::TypeConversionError),
73+
74+
#[error("Chunk data missing in response")]
75+
MissingChunkData,
7376
}
7477

7578
impl From<tonic::Status> for StreamError {

crates/store/re_grpc_client/src/redap/mod.rs

+20-14
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use tokio_stream::StreamExt as _;
1+
use tokio_stream::{Stream, StreamExt as _};
22

33
use re_chunk::Chunk;
44
use re_log_encoding::codec::wire::decoder::Decode as _;
@@ -7,7 +7,7 @@ use re_protos::catalog::v1alpha1::ext::ReadDatasetEntryResponse;
77
use re_protos::catalog::v1alpha1::ReadDatasetEntryRequest;
88
use re_protos::frontend::v1alpha1::frontend_service_client::FrontendServiceClient;
99
use re_protos::frontend::v1alpha1::FetchPartitionRequest;
10-
10+
use re_protos::manifest_registry::v1alpha1::FetchPartitionResponse;
1111
use re_uri::{DatasetDataEndpoint, Origin};
1212

1313
use crate::{spawn_future, StreamError, MAX_DECODING_MESSAGE_SIZE};
@@ -154,6 +154,20 @@ pub async fn client_with_interceptor<I: tonic::service::Interceptor>(
154154
)
155155
}
156156

157+
/// Converts a `FetchPartitionResponse` stream into a stream of `Chunk`s.
158+
//TODO(#9430): ideally this should be factored as a nice helper in `re_proto`
159+
pub fn fetch_partition_response_to_chunk(
160+
response: tonic::Streaming<FetchPartitionResponse>,
161+
) -> impl Stream<Item = Result<Chunk, StreamError>> {
162+
response.map(|resp| {
163+
resp.map_err(Into::into).and_then(|r| {
164+
let batch = r.chunk.ok_or(StreamError::MissingChunkData)?.decode()?;
165+
166+
Chunk::from_record_batch(&batch).map_err(Into::into)
167+
})
168+
})
169+
}
170+
157171
pub async fn stream_partition_async(
158172
tx: re_smart_channel::Sender<LogMsg>,
159173
endpoint: re_uri::DatasetDataEndpoint,
@@ -188,15 +202,6 @@ pub async fn stream_partition_async(
188202
.await?
189203
.into_inner();
190204

191-
let mut chunk_stream = catalog_chunk_stream.map(|resp| {
192-
resp.and_then(|r| {
193-
r.chunk
194-
.ok_or_else(|| tonic::Status::internal("missing chunk in FetchPartitionResponse"))?
195-
.decode()
196-
.map_err(|err| tonic::Status::internal(err.to_string()))
197-
})
198-
});
199-
200205
drop(client);
201206

202207
let store_id = StoreId::from_string(StoreKind::Recording, endpoint.partition_id.clone());
@@ -227,9 +232,10 @@ pub async fn stream_partition_async(
227232
return Ok(());
228233
}
229234

230-
while let Some(result) = chunk_stream.next().await {
231-
let batch = result?;
232-
let chunk = Chunk::from_record_batch(&batch)?;
235+
let mut chunk_stream = fetch_partition_response_to_chunk(catalog_chunk_stream);
236+
237+
while let Some(chunk) = chunk_stream.next().await {
238+
let chunk = chunk?;
233239

234240
if tx
235241
.send(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?))

rerun_py/src/catalog/dataset.rs

+61-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
1-
use pyo3::{pyclass, pymethods};
1+
use pyo3::{pyclass, pymethods, PyRef, PyResult};
2+
use tokio_stream::StreamExt as _;
23

4+
use re_chunk_store::{ChunkStore, ChunkStoreHandle};
5+
use re_grpc_client::redap::fetch_partition_response_to_chunk;
6+
use re_log_types::{StoreId, StoreInfo, StoreKind, StoreSource};
37
use re_protos::common::v1alpha1::ext::DatasetHandle;
8+
use re_protos::frontend::v1alpha1::FetchPartitionRequest;
49

5-
use crate::catalog::PyEntry;
10+
use crate::catalog::{to_py_err, PyEntry};
11+
use crate::dataframe::PyRecording;
12+
use crate::utils::wait_for_future;
613

714
#[pyclass(name = "Dataset", extends=PyEntry)]
815
pub struct PyDataset {
@@ -15,4 +22,56 @@ impl PyDataset {
1522
fn manifest_url(&self) -> String {
1623
self.dataset_handle.url.to_string()
1724
}
25+
26+
fn download_partition(self_: PyRef<'_, Self>, partition_id: String) -> PyResult<PyRecording> {
27+
let super_ = self_.as_super();
28+
let mut client = super_.client.borrow(self_.py()).connection().client();
29+
30+
let dataset_id = super_.details.id;
31+
let dataset_name = super_.details.name.clone();
32+
33+
let store: PyResult<ChunkStore> = wait_for_future(self_.py(), async move {
34+
let catalog_chunk_stream = client
35+
.fetch_partition(FetchPartitionRequest {
36+
dataset_id: Some(dataset_id.into()),
37+
partition_id: Some(partition_id.clone().into()),
38+
})
39+
.await
40+
.map_err(to_py_err)?
41+
.into_inner();
42+
43+
let store_id = StoreId::from_string(StoreKind::Recording, partition_id);
44+
let store_info = StoreInfo {
45+
application_id: dataset_name.into(),
46+
store_id: store_id.clone(),
47+
cloned_from: None,
48+
store_source: StoreSource::Unknown,
49+
store_version: None,
50+
};
51+
52+
let mut store = ChunkStore::new(store_id, Default::default());
53+
store.set_info(store_info);
54+
55+
let mut chunk_stream = fetch_partition_response_to_chunk(catalog_chunk_stream);
56+
57+
while let Some(chunk) = chunk_stream.next().await {
58+
let chunk = chunk.map_err(to_py_err)?;
59+
store
60+
.insert_chunk(&std::sync::Arc::new(chunk))
61+
.map_err(to_py_err)?;
62+
}
63+
64+
Ok(store)
65+
});
66+
67+
let handle = ChunkStoreHandle::new(store?);
68+
69+
let cache =
70+
re_dataframe::QueryCacheHandle::new(re_dataframe::QueryCache::new(handle.clone()));
71+
72+
Ok(PyRecording {
73+
store: handle,
74+
cache,
75+
})
76+
}
1877
}

rerun_py/src/catalog/errors.rs

+31
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ enum ExternalError {
2929
ConnectionError(ConnectionError),
3030
TonicStatusError(tonic::Status),
3131
UriError(re_uri::Error),
32+
ChunkError(re_chunk::ChunkError),
33+
ChunkStoreError(re_chunk_store::ChunkStoreError),
34+
StreamError(re_grpc_client::StreamError),
3235
}
3336

3437
impl From<ConnectionError> for ExternalError {
@@ -49,6 +52,24 @@ impl From<re_uri::Error> for ExternalError {
4952
}
5053
}
5154

55+
impl From<re_chunk::ChunkError> for ExternalError {
56+
fn from(value: re_chunk::ChunkError) -> Self {
57+
Self::ChunkError(value)
58+
}
59+
}
60+
61+
impl From<re_chunk_store::ChunkStoreError> for ExternalError {
62+
fn from(value: re_chunk_store::ChunkStoreError) -> Self {
63+
Self::ChunkStoreError(value)
64+
}
65+
}
66+
67+
impl From<re_grpc_client::StreamError> for ExternalError {
68+
fn from(value: re_grpc_client::StreamError) -> Self {
69+
Self::StreamError(value)
70+
}
71+
}
72+
5273
impl From<ExternalError> for PyErr {
5374
fn from(err: ExternalError) -> Self {
5475
match err {
@@ -69,6 +90,16 @@ impl From<ExternalError> for PyErr {
6990
}
7091

7192
ExternalError::UriError(err) => PyValueError::new_err(format!("Invalid URI: {err}")),
93+
94+
ExternalError::ChunkError(err) => PyValueError::new_err(format!("Chunk error: {err}")),
95+
96+
ExternalError::ChunkStoreError(err) => {
97+
PyValueError::new_err(format!("Chunk store error: {err}"))
98+
}
99+
100+
ExternalError::StreamError(err) => {
101+
PyValueError::new_err(format!("Data streaming error: {err}"))
102+
}
72103
}
73104
}
74105
}

0 commit comments

Comments
 (0)