Skip to content

Commit 8d3ad7f

Browse files
authored
Add partition_id to ChunkBatch (#9993)
### Related * Somewhat related: #8776 * Part of #9981. ### What In the future, we have decided that each chunk should know its partition id, here is a first step into that direction (right now we use ad-hoc code in the dataplatform for this). The only thing unclear to me is how that will work with blueprints in the future.
1 parent c4b613c commit 8d3ad7f

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

crates/store/re_sorbet/src/chunk_schema.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ impl ChunkSchema {
6161
)
6262
.collect(),
6363
},
64+
partition_id: None, // TODO(#9977): This should be required in the future.
6465
chunk_id: Some(chunk_id),
6566
entity_path: Some(entity_path.clone()),
6667
heap_size_bytes: None,

crates/store/re_sorbet/src/sorbet_schema.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ pub struct SorbetSchema {
2222
/// Which entity is this chunk for?
2323
pub entity_path: Option<EntityPath>,
2424

25+
/// The partition id that this chunk belongs to.
26+
pub partition_id: Option<String>,
27+
2528
/// The heap size of this batch in bytes, if known.
2629
pub heap_size_bytes: Option<u64>,
2730
}
@@ -50,12 +53,20 @@ impl SorbetSchema {
5053
("rerun.entity_path".to_owned(), entity_path.to_string())
5154
}
5255

56+
pub fn partition_id_metadata(partition_id: impl AsRef<str>) -> (String, String) {
57+
(
58+
"rerun.partition_id".to_owned(),
59+
partition_id.as_ref().to_owned(),
60+
)
61+
}
62+
5363
pub fn arrow_batch_metadata(&self) -> ArrowBatchMetadata {
5464
let Self {
5565
columns: _,
5666
chunk_id,
5767
entity_path,
5868
heap_size_bytes,
69+
partition_id,
5970
} = self;
6071

6172
[
@@ -65,6 +76,7 @@ impl SorbetSchema {
6576
)),
6677
chunk_id.as_ref().map(Self::chunk_id_metadata),
6778
entity_path.as_ref().map(Self::entity_path_metadata),
79+
partition_id.as_ref().map(Self::partition_id_metadata),
6880
heap_size_bytes.as_ref().map(|heap_size_bytes| {
6981
(
7082
"rerun.heap_size_bytes".to_owned(),
@@ -129,6 +141,8 @@ impl TryFrom<&ArrowSchema> for SorbetSchema {
129141
None
130142
};
131143

144+
let partition_id = metadata.get("rerun.partition_id").map(|s| s.to_owned());
145+
132146
// Verify version
133147
if let Some(batch_version) = metadata.get(Self::METADATA_KEY_VERSION) {
134148
if batch_version != Self::METADATA_VERSION {
@@ -143,6 +157,7 @@ impl TryFrom<&ArrowSchema> for SorbetSchema {
143157
columns,
144158
chunk_id,
145159
entity_path,
160+
partition_id,
146161
heap_size_bytes,
147162
})
148163
}

0 commit comments

Comments
 (0)