Skip to content

Commit dc63916

Browse files
teh-cmcjprochazk
authored andcommitted
Partition manifests basics (#9434)
Basic definitions to get the ball rolling on the new partition manifests. * For the interesting stuff, see rerun-io/dataplatform#487
1 parent de902c7 commit dc63916

File tree

10 files changed

+286
-20
lines changed

10 files changed

+286
-20
lines changed

Cargo.lock

+1
Original file line numberDiff line numberDiff line change
@@ -6374,6 +6374,7 @@ dependencies = [
63746374
"re_tuid",
63756375
"thiserror 1.0.65",
63766376
"tonic",
6377+
"url",
63776378
]
63786379

63796380
[[package]]

crates/store/re_grpc_client/src/redap/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use re_log_encoding::codec::wire::decoder::Decode as _;
55
use re_log_types::{LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};
66
use re_protos::catalog::v1alpha1::ext::ReadDatasetEntryResponse;
77
use re_protos::catalog::v1alpha1::ReadDatasetEntryRequest;
8-
use re_protos::common::v1alpha1::ext::PartitionId;
98
use re_protos::frontend::v1alpha1::frontend_service_client::FrontendServiceClient;
109
use re_protos::frontend::v1alpha1::FetchPartitionRequest;
1110

@@ -184,7 +183,7 @@ pub async fn stream_partition_async(
184183
//TODO(rerun-io/dataplatform#474): filter chunks by time range
185184
.fetch_partition(FetchPartitionRequest {
186185
dataset_id: Some(endpoint.dataset_id.into()),
187-
partition_id: Some(PartitionId::new(endpoint.partition_id.clone()).into()),
186+
partition_id: Some(endpoint.partition_id.clone().into()),
188187
})
189188
.await?
190189
.into_inner();

crates/store/re_log_encoding/src/protobuf_conversions.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// TODO(#9430): this belongs in re_protos::ext
2+
13
impl From<re_protos::log_msg::v1alpha1::Compression> for crate::Compression {
24
fn from(value: re_protos::log_msg::v1alpha1::Compression) -> Self {
35
match value {

crates/store/re_protos/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ prost-types.workspace = true
3434
prost.workspace = true
3535
pyo3 = { workspace = true, optional = true }
3636
thiserror.workspace = true
37+
url.workspace = true
3738

3839
# Native dependencies:
3940
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]

crates/store/re_protos/proto/rerun/v1alpha1/manifest_registry.proto

+14
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ service ManifestRegistryService {
106106
//
107107
// This is normally automatically done as part of the registration process.
108108
rpc CreatePartitionManifests(CreatePartitionManifestsRequest) returns (CreatePartitionManifestsResponse) {}
109+
110+
// Fetch the internal state of a Partition Manifest.
111+
rpc FetchPartitionManifest(FetchPartitionManifestRequest) returns (stream FetchPartitionManifestResponse) {}
109112
}
110113

111114
// --- Write data ---
@@ -430,6 +433,17 @@ message CreatePartitionManifestsResponse {
430433
rerun.common.v1alpha1.DataframePart data = 1;
431434
}
432435

436+
message FetchPartitionManifestRequest {
437+
rerun.common.v1alpha1.DatasetHandle entry = 1;
438+
rerun.common.v1alpha1.PartitionId id = 2;
439+
rerun.common.v1alpha1.ScanParameters scan_parameters = 3;
440+
}
441+
442+
// TODO(cmc): this should have response extensions too.
443+
message FetchPartitionManifestResponse {
444+
rerun.common.v1alpha1.DataframePart data = 1;
445+
}
446+
433447
message FetchPartitionRequest {
434448
rerun.common.v1alpha1.DatasetHandle entry = 1;
435449
rerun.common.v1alpha1.PartitionId partition_id = 2;

crates/store/re_protos/src/lib.rs

+13-3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ mod v1alpha1 {
4141
#[path = "./rerun.manifest_registry.v1alpha1.rs"]
4242
pub mod rerun_manifest_registry_v1alpha1;
4343

44+
#[path = "./rerun.manifest_registry.v1alpha1.ext.rs"]
45+
pub mod rerun_manifest_registry_v1alpha1_ext;
46+
4447
#[path = "./rerun.frontend.v1alpha1.rs"]
4548
pub mod rerun_frontend_v1alpha1;
4649

@@ -64,18 +67,25 @@ pub mod log_msg {
6467
}
6568

6669
pub mod manifest_registry {
70+
#[rustfmt::skip] // keep these constants single line for easy sorting
6771
pub mod v1alpha1 {
6872
pub use crate::v1alpha1::rerun_manifest_registry_v1alpha1::*;
73+
#[expect(unused_imports)]
74+
pub mod ext {
75+
pub use crate::v1alpha1::rerun_manifest_registry_v1alpha1_ext::*;
76+
}
6977

7078
/// `DatasetManifest` mandatory field names. All mandatory metadata fields are prefixed
7179
/// with "rerun_" to avoid conflicts with user-defined fields.
72-
pub const DATASET_MANIFEST_ID_FIELD_NAME: &str = "rerun_partition_id";
7380
pub const DATASET_MANIFEST_APP_ID_FIELD_NAME: &str = "rerun_application_id";
74-
pub const DATASET_MANIFEST_START_TIME_FIELD_NAME: &str = "rerun_start_time";
81+
pub const DATASET_MANIFEST_ID_FIELD_NAME: &str = "rerun_partition_id";
82+
pub const DATASET_MANIFEST_PARTITION_MANIFEST_UPDATED_AT_FIELD_NAME: &str = "rerun_partition_manifest_updated_at";
83+
pub const DATASET_MANIFEST_PARTITION_MANIFEST_URL_FIELD_NAME: &str = "rerun_partition_manifest_url";
7584
pub const DATASET_MANIFEST_RECORDING_TYPE_FIELD_NAME: &str = "rerun_partition_type";
76-
pub const DATASET_MANIFEST_STORAGE_URL_FIELD_NAME: &str = "rerun_storage_url";
7785
pub const DATASET_MANIFEST_REGISTRATION_TIME_FIELD_NAME: &str = "rerun_registration_time";
7886
pub const DATASET_MANIFEST_ROW_ID_FIELD_NAME: &str = "rerun_row_id";
87+
pub const DATASET_MANIFEST_START_TIME_FIELD_NAME: &str = "rerun_start_time";
88+
pub const DATASET_MANIFEST_STORAGE_URL_FIELD_NAME: &str = "rerun_storage_url";
7989
}
8090
}
8191

crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.ext.rs

+54-14
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ impl TryFrom<crate::common::v1alpha1::Tuid> for crate::common::v1alpha1::EntryId
104104

105105
// --- PartitionId ---
106106

107-
#[derive(Clone, PartialEq, Eq, Debug, Hash)]
107+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
108108
pub struct PartitionId {
109109
pub id: String,
110110
}
@@ -116,21 +116,49 @@ impl PartitionId {
116116
}
117117
}
118118

119-
impl From<PartitionId> for crate::common::v1alpha1::PartitionId {
120-
fn from(value: PartitionId) -> Self {
121-
Self { id: Some(value.id) }
119+
impl From<String> for PartitionId {
120+
fn from(id: String) -> Self {
121+
Self { id }
122+
}
123+
}
124+
125+
impl From<&str> for PartitionId {
126+
fn from(id: &str) -> Self {
127+
Self { id: id.to_owned() }
128+
}
129+
}
130+
131+
impl std::fmt::Display for PartitionId {
132+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133+
self.id.fmt(f)
122134
}
123135
}
124136

125137
impl TryFrom<crate::common::v1alpha1::PartitionId> for PartitionId {
126138
type Error = TypeConversionError;
127139

128140
fn try_from(value: crate::common::v1alpha1::PartitionId) -> Result<Self, Self::Error> {
129-
let id = value
130-
.id
131-
.ok_or(missing_field!(crate::common::v1alpha1::PartitionId, "id"))?;
141+
Ok(Self {
142+
id: value
143+
.id
144+
.ok_or(missing_field!(crate::common::v1alpha1::PartitionId, "id"))?,
145+
})
146+
}
147+
}
148+
149+
// shortcuts
132150

133-
Ok(Self { id })
151+
impl From<String> for crate::common::v1alpha1::PartitionId {
152+
fn from(id: String) -> Self {
153+
Self { id: Some(id) }
154+
}
155+
}
156+
157+
impl From<&str> for crate::common::v1alpha1::PartitionId {
158+
fn from(id: &str) -> Self {
159+
Self {
160+
id: Some(id.to_owned()),
161+
}
134162
}
135163
}
136164

@@ -139,7 +167,13 @@ impl TryFrom<crate::common::v1alpha1::PartitionId> for PartitionId {
139167
#[derive(Debug, Clone)]
140168
pub struct DatasetHandle {
141169
pub id: Option<EntryId>,
142-
pub url: String,
170+
pub url: url::Url,
171+
}
172+
173+
impl DatasetHandle {
174+
pub fn new(url: url::Url) -> Self {
175+
Self { id: None, url }
176+
}
143177
}
144178

145179
impl TryFrom<crate::common::v1alpha1::DatasetHandle> for DatasetHandle {
@@ -148,10 +182,16 @@ impl TryFrom<crate::common::v1alpha1::DatasetHandle> for DatasetHandle {
148182
fn try_from(value: crate::common::v1alpha1::DatasetHandle) -> Result<Self, Self::Error> {
149183
Ok(Self {
150184
id: value.entry_id.map(|id| id.try_into()).transpose()?,
151-
url: value.dataset_url.ok_or(missing_field!(
152-
crate::common::v1alpha1::DatasetHandle,
153-
"dataset_url"
154-
))?,
185+
url: value
186+
.dataset_url
187+
.ok_or(missing_field!(
188+
crate::common::v1alpha1::DatasetHandle,
189+
"dataset_url"
190+
))?
191+
.parse()
192+
.map_err(|err| {
193+
invalid_field!(crate::common::v1alpha1::DatasetHandle, "dataset_url", err)
194+
})?,
155195
})
156196
}
157197
}
@@ -160,7 +200,7 @@ impl From<DatasetHandle> for crate::common::v1alpha1::DatasetHandle {
160200
fn from(value: DatasetHandle) -> Self {
161201
Self {
162202
entry_id: value.id.map(Into::into),
163-
dataset_url: Some(value.url),
203+
dataset_url: Some(value.url.to_string()),
164204
}
165205
}
166206
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use std::sync::Arc;
2+
3+
use arrow::{
4+
array::{ArrayRef, RecordBatch, StringArray, TimestampNanosecondArray},
5+
datatypes::{DataType, Field, Schema, TimeUnit},
6+
};
7+
8+
use crate::manifest_registry::v1alpha1::CreatePartitionManifestsResponse;
9+
10+
// --- CreatePartitionManifestsResponse ---
11+
12+
impl CreatePartitionManifestsResponse {
13+
pub const FIELD_ID: &str = "id";
14+
pub const FIELD_UPDATED_AT: &str = "updated_at";
15+
pub const FIELD_URL: &str = "url";
16+
pub const FIELD_ERROR: &str = "error";
17+
18+
/// The Arrow schema of the dataframe in [`Self::data`].
19+
pub fn schema() -> Schema {
20+
Schema::new(vec![
21+
Field::new(Self::FIELD_ID, DataType::Utf8, false),
22+
Field::new(
23+
Self::FIELD_UPDATED_AT,
24+
DataType::Timestamp(TimeUnit::Nanosecond, None),
25+
true,
26+
),
27+
Field::new(Self::FIELD_URL, DataType::Utf8, true),
28+
Field::new(Self::FIELD_ERROR, DataType::Utf8, true),
29+
])
30+
}
31+
32+
/// Helper to simplify instantiation of the dataframe in [`Self::data`].
33+
pub fn create_dataframe(
34+
partition_ids: Vec<String>,
35+
updated_ats: Vec<Option<jiff::Timestamp>>,
36+
partition_manifest_urls: Vec<Option<String>>,
37+
errors: Vec<Option<String>>,
38+
) -> arrow::error::Result<RecordBatch> {
39+
let updated_ats = updated_ats
40+
.into_iter()
41+
.map(|ts| ts.map(|ts| ts.as_nanosecond() as i64)) // ~300 years should be fine
42+
.collect::<Vec<_>>();
43+
44+
let schema = Arc::new(Self::schema());
45+
let columns: Vec<ArrayRef> = vec![
46+
Arc::new(StringArray::from(partition_ids)),
47+
Arc::new(TimestampNanosecondArray::from(updated_ats)),
48+
Arc::new(StringArray::from(partition_manifest_urls)),
49+
Arc::new(StringArray::from(errors)),
50+
];
51+
52+
RecordBatch::try_new(schema, columns)
53+
}
54+
}
55+
56+
// TODO(#9430): I'd love if I could do this, but this creates a nasty circular dep with `re_log_encoding`.
57+
#[cfg(all(unix, windows))] // always statically false
58+
impl TryFrom<RecordBatch> for CreatePartitionManifestsResponse {
59+
type Error = tonic::Status;
60+
61+
fn try_from(batch: RecordBatch) -> Result<Self, Self::Error> {
62+
if !Self::schema().contains(batch.schema()) {
63+
let typ = std::any::type_name::<Self>();
64+
return Err(tonic::Status::internal(format!(
65+
"invalid schema for {typ}: expected {:?} but got {:?}",
66+
Self::schema(),
67+
batch.schema(),
68+
)));
69+
}
70+
71+
use re_log_encoding::codec::wire::encoder::Encode as _;
72+
batch
73+
.encode()
74+
.map(|data| Self { data: Some(data) })
75+
.map_err(|err| tonic::Status::internal(format!("failed to encode chunk: {err}")))?;
76+
}
77+
}
78+
79+
// TODO(#9430): the other way around would be nice too, but same problem.

0 commit comments

Comments
 (0)