From 6717f37cb134f15f15ac483e0e140970ef984fd3 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Wed, 30 Apr 2025 18:58:42 +0200 Subject: [PATCH 1/5] Properly resolve component selectors in dataset index creation and search APIs --- .../src/v1alpha1/rerun.common.v1alpha1.ext.rs | 3 +- .../rerun.manifest_registry.v1alpha1.ext.rs | 26 ++++-- .../re_sorbet/src/column_descriptor_ref.rs | 1 + crates/store/re_sorbet/src/sorbet_columns.rs | 79 ++++++++++++++++- rerun_py/src/catalog/dataset.rs | 85 ++++++++++--------- rerun_py/src/catalog/errors.rs | 7 ++ rerun_py/src/dataframe.rs | 22 ++++- 7 files changed, 170 insertions(+), 53 deletions(-) diff --git a/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.ext.rs index 1720c741a7dc..aee90146ef93 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.ext.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.common.v1alpha1.ext.rs @@ -1,10 +1,9 @@ use std::sync::Arc; +use crate::{invalid_field, missing_field, TypeConversionError}; use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError}; use re_log_types::{external::re_types_core::ComponentDescriptor, TableId}; -use crate::{invalid_field, missing_field, TypeConversionError}; - // --- Arrow --- impl TryFrom<&crate::common::v1alpha1::Schema> for ArrowSchema { diff --git a/crates/store/re_protos/src/v1alpha1/rerun.manifest_registry.v1alpha1.ext.rs b/crates/store/re_protos/src/v1alpha1/rerun.manifest_registry.v1alpha1.ext.rs index 2a7c7313fe6b..f233968cee15 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.manifest_registry.v1alpha1.ext.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.manifest_registry.v1alpha1.ext.rs @@ -6,15 +6,15 @@ use arrow::{ error::ArrowError, }; -use re_chunk::TimelineName; -use re_log_types::EntityPath; - +use super::rerun_manifest_registry_v1alpha1::VectorDistanceMetric; +use crate::common::v1alpha1::ComponentDescriptor; use crate::manifest_registry::v1alpha1::{ CreatePartitionManifestsResponse, DataSourceKind, GetDatasetSchemaResponse, }; use crate::{invalid_field, missing_field, TypeConversionError}; - -use super::rerun_manifest_registry_v1alpha1::VectorDistanceMetric; +use re_chunk::TimelineName; +use re_log_types::EntityPath; +use re_sorbet::ComponentColumnDescriptor; // --- QueryDataset --- @@ -509,3 +509,19 @@ impl From for crate::manifest_registry::v1alpha1::IndexProperti } } } + +// --- + +impl From for crate::manifest_registry::v1alpha1::IndexColumn { + fn from(value: ComponentColumnDescriptor) -> Self { + Self { + entity_path: Some(value.entity_path.into()), + + component: Some(ComponentDescriptor { + archetype_name: value.archetype_name.map(|n| n.full_name().to_owned()), + archetype_field_name: value.archetype_field_name.map(|n| n.to_string()), + component_name: Some(value.component_name.full_name().to_owned()), + }), + } + } +} diff --git a/crates/store/re_sorbet/src/column_descriptor_ref.rs b/crates/store/re_sorbet/src/column_descriptor_ref.rs index bd87668500f7..2bace3f6dcba 100644 --- a/crates/store/re_sorbet/src/column_descriptor_ref.rs +++ b/crates/store/re_sorbet/src/column_descriptor_ref.rs @@ -5,6 +5,7 @@ use crate::{ #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum ColumnDescriptorRef<'a> { RowId(&'a RowIdColumnDescriptor), + //TODO(ab): this should be renamed Index! Time(&'a IndexColumnDescriptor), Component(&'a ComponentColumnDescriptor), } diff --git a/crates/store/re_sorbet/src/sorbet_columns.rs b/crates/store/re_sorbet/src/sorbet_columns.rs index 3f50d8c5d2a9..cd5366018b3b 100644 --- a/crates/store/re_sorbet/src/sorbet_columns.rs +++ b/crates/store/re_sorbet/src/sorbet_columns.rs @@ -4,8 +4,9 @@ use nohash_hasher::IntSet; use re_log_types::EntityPath; use crate::{ - ColumnDescriptor, ColumnDescriptorRef, ColumnKind, ComponentColumnDescriptor, - IndexColumnDescriptor, RowIdColumnDescriptor, SorbetError, + ColumnDescriptor, ColumnDescriptorRef, ColumnKind, ColumnSelector, ComponentColumnDescriptor, + ComponentColumnSelector, IndexColumnDescriptor, RowIdColumnDescriptor, SorbetError, + TimeColumnSelector, }; #[derive(Debug, Clone, PartialEq, Eq)] @@ -96,6 +97,80 @@ impl SorbetColumnDescriptors { } } + /// Resolve the provided column selector. Returns `None` if no corresponding column was found. + pub fn resolve_selector( + &self, + column_selector: &ColumnSelector, + ) -> Option> { + match column_selector { + ColumnSelector::Time(selector) => self + .resolve_index_column_selector(selector) + .map(ColumnDescriptorRef::Time), + + ColumnSelector::Component(selector) => self + .resolve_component_column_selector(selector) + .map(ColumnDescriptorRef::Component), + } + } + + /// Resolve the provided index column selector. Returns `None` if no corresponding column was + /// found. + pub fn resolve_index_column_selector( + &self, + index_column_selector: &TimeColumnSelector, + ) -> Option<&IndexColumnDescriptor> { + self.indices + .iter() + .find(|column| column.timeline_name() == index_column_selector.timeline) + } + + /// Resolve the provided component column selector. Returns `None` if no corresponding column + /// was found. + /// + /// Matching strategy: + /// - The entity path must match exactly. + /// - The first component with a fully matching name is returned (there shouldn't be more than + /// one for now). + /// - If no exact match is found, a partial match is attempted using + /// [`re_types_core::ComponentName::matches`] and is returned only if it is unique. + // TODO(#6889): this will need to be fully revisited with tagged components + // TODO(ab): this is related but different from `re_chunk_store::resolve_component_selector()`. + // It is likely that only one of these should eventually remain. + // TODO: return a result? + pub fn resolve_component_column_selector( + &self, + component_column_selector: &ComponentColumnSelector, + ) -> Option<&ComponentColumnDescriptor> { + let ComponentColumnSelector { + entity_path, + component_name, + } = component_column_selector; + + // happy path: exact component name match + let exact_match = self.components.iter().find(|column| { + column.component_name.as_str() == component_name && &column.entity_path == entity_path + }); + + if exact_match.is_some() { + return exact_match; + } + + // fallback: use `ComponentName::match` and check that we have a single result + let mut partial_match = self.components.iter().filter(|column| { + column.component_name.matches(component_name) && &column.entity_path == entity_path + }); + + let first_match = partial_match.next(); + + // Note: non-unique partial match is highly unlikely for now, but will become more likely + // with tagged components. + if partial_match.next().is_none() { + first_match + } else { + None + } + } + pub fn arrow_fields(&self, batch_type: crate::BatchType) -> Vec { let Self { row_id, diff --git a/rerun_py/src/catalog/dataset.rs b/rerun_py/src/catalog/dataset.rs index 7d89e4e20fb4..62b9d7022d43 100644 --- a/rerun_py/src/catalog/dataset.rs +++ b/rerun_py/src/catalog/dataset.rs @@ -16,17 +16,15 @@ use re_protos::common::v1alpha1::IfDuplicateBehavior; use re_protos::frontend::v1alpha1::{CreateIndexRequest, GetChunksRequest, SearchDatasetRequest}; use re_protos::manifest_registry::v1alpha1::ext::IndexProperties; use re_protos::manifest_registry::v1alpha1::{ - index_query_properties, IndexColumn, IndexConfig, IndexQueryProperties, InvertedIndexQuery, - VectorIndexQuery, + index_query_properties, IndexConfig, IndexQueryProperties, InvertedIndexQuery, VectorIndexQuery, }; -use re_sdk::{ComponentDescriptor, ComponentName}; -use re_sorbet::{ComponentColumnSelector, TimeColumnSelector}; +use re_sorbet::{SorbetColumnDescriptors, TimeColumnSelector}; use crate::catalog::{ dataframe_query::PyDataframeQueryView, to_py_err, PyEntry, VectorDistanceMetricLike, VectorLike, }; use crate::dataframe::{ - PyComponentColumnSelector, PyDataFusionTable, PyIndexColumnSelector, PyRecording, + PyComponentColumnSelector, PyDataFusionTable, PyIndexColumnSelector, PyRecording, PySchema, }; use crate::utils::wait_for_future; @@ -48,12 +46,14 @@ impl PyDataset { /// Return the Arrow schema of the data contained in the dataset. //TODO(#9457): there should be another `schema` method which returns a `PySchema` fn arrow_schema(self_: PyRef<'_, Self>) -> PyResult> { - let super_ = self_.as_super(); - let mut connection = super_.client.borrow_mut(self_.py()).connection().clone(); + let arrow_schema = Self::fetch_arrow_schema(&self_)?; - let schema = connection.get_dataset_schema(self_.py(), super_.details.id)?; + Ok(arrow_schema.into()) + } - Ok(schema.into()) + /// Return the schema of the data contained in the dataset. + fn schema(self_: PyRef<'_, Self>) -> PyResult { + Self::fetch_schema(&self_) } /// Return the partition table as a Datafusion table provider. @@ -218,18 +218,10 @@ impl PyDataset { let super_ = self_.as_super(); let connection = super_.client.borrow(self_.py()).connection().clone(); let dataset_id = super_.details.id; - let time_selector: TimeColumnSelector = time_index.into(); - let column_selector: ComponentColumnSelector = column.into(); - let mut component_descriptor = - ComponentDescriptor::new(column_selector.component_name.clone()); - - // TODO(jleibs): get rid of this hack - if component_descriptor.component_name == ComponentName::from("rerun.components.Text") { - component_descriptor = component_descriptor - .or_with_archetype_name(|| "rerun.archetypes.TextLog".into()) - .or_with_archetype_field_name(|| "text".into()); - } + + let schema = Self::fetch_schema(&self_)?; + let component_descriptor = schema.resolve_component_column_selector(&column)?; let properties = IndexProperties::Inverted { store_position, @@ -243,10 +235,7 @@ impl PyDataset { config: Some(IndexConfig { properties: Some(properties.into()), - column: Some(IndexColumn { - entity_path: Some(column_selector.entity_path.into()), - component: Some(component_descriptor.into()), - }), + column: Some(component_descriptor.0.into()), time_index: Some(time_selector.timeline.into()), }), @@ -286,8 +275,9 @@ impl PyDataset { let dataset_id = super_.details.id; let time_selector: TimeColumnSelector = time_index.into(); - let column_selector: ComponentColumnSelector = column.into(); - let component_descriptor = ComponentDescriptor::new(column_selector.component_name.clone()); + + let schema = Self::fetch_schema(&self_)?; + let component_descriptor = schema.resolve_component_column_selector(&column)?; let distance_metric: re_protos::manifest_registry::v1alpha1::VectorDistanceMetric = distance_metric.try_into()?; @@ -305,10 +295,7 @@ impl PyDataset { config: Some(IndexConfig { properties: Some(properties.into()), - column: Some(IndexColumn { - entity_path: Some(column_selector.entity_path.into()), - component: Some(component_descriptor.into()), - }), + column: Some(component_descriptor.0.into()), time_index: Some(time_selector.timeline.into()), }), @@ -336,8 +323,8 @@ impl PyDataset { let connection = super_.client.borrow(self_.py()).connection().clone(); let dataset_id = super_.details.id; - let column_selector: ComponentColumnSelector = column.into(); - let component_descriptor = ComponentDescriptor::new(column_selector.component_name.clone()); + let schema = Self::fetch_schema(&self_)?; + let component_descriptor = schema.resolve_component_column_selector(&column)?; let schema = arrow::datatypes::Schema::new_with_metadata( vec![Field::new("items", arrow::datatypes::DataType::Utf8, false)], @@ -352,10 +339,7 @@ impl PyDataset { let request = SearchDatasetRequest { dataset_id: Some(dataset_id.into()), - column: Some(IndexColumn { - entity_path: Some(column_selector.entity_path.into()), - component: Some(component_descriptor.into()), - }), + column: Some(component_descriptor.0.into()), properties: Some(IndexQueryProperties { props: Some( re_protos::manifest_registry::v1alpha1::index_query_properties::Props::Inverted( @@ -400,17 +384,14 @@ impl PyDataset { let connection = super_.client.borrow(self_.py()).connection().clone(); let dataset_id = super_.details.id; - let column_selector: ComponentColumnSelector = column.into(); - let component_descriptor = ComponentDescriptor::new(column_selector.component_name.clone()); + let schema = Self::fetch_schema(&self_)?; + let component_descriptor = schema.resolve_component_column_selector(&column)?; let query = query.to_record_batch()?; let request = SearchDatasetRequest { dataset_id: Some(dataset_id.into()), - column: Some(IndexColumn { - entity_path: Some(column_selector.entity_path.into()), - component: Some(component_descriptor.into()), - }), + column: Some(component_descriptor.0.into()), properties: Some(IndexQueryProperties { props: Some(index_query_properties::Props::Vector(VectorIndexQuery { top_k: Some(top_k), @@ -442,3 +423,23 @@ impl PyDataset { }) } } + +impl PyDataset { + fn fetch_arrow_schema(self_: &PyRef<'_, Self>) -> PyResult { + let super_ = self_.as_super(); + let mut connection = super_.client.borrow_mut(self_.py()).connection().clone(); + + let schema = connection.get_dataset_schema(self_.py(), super_.details.id)?; + + Ok(schema) + } + + fn fetch_schema(self_: &PyRef<'_, Self>) -> PyResult { + Self::fetch_arrow_schema(self_).and_then(|arrow_schema| { + let descs = SorbetColumnDescriptors::try_from_arrow_fields(None, arrow_schema.fields()) + .map_err(to_py_err)?; + + Ok(PySchema { schema: descs }) + }) + } +} diff --git a/rerun_py/src/catalog/errors.rs b/rerun_py/src/catalog/errors.rs index efcd91ac57b0..03702c6a5fcc 100644 --- a/rerun_py/src/catalog/errors.rs +++ b/rerun_py/src/catalog/errors.rs @@ -55,6 +55,9 @@ enum ExternalError { #[error(transparent)] CodecError(#[from] re_log_encoding::codec::CodecError), + + #[error(transparent)] + SorbetError(#[from] re_sorbet::SorbetError), } impl From @@ -112,6 +115,10 @@ impl From for PyErr { } ExternalError::CodecError(err) => PyValueError::new_err(format!("Codec error: {err}")), + + ExternalError::SorbetError(err) => { + PyValueError::new_err(format!("Sorbet error: {err}")) + } } } } diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs index 54e41cc08871..756b32ca9f42 100644 --- a/rerun_py/src/dataframe.rs +++ b/rerun_py/src/dataframe.rs @@ -157,7 +157,7 @@ impl From for TimeColumnSelector { /// column, use [`ComponentColumnSelector`][rerun.dataframe.ComponentColumnSelector]. #[pyclass(frozen, name = "ComponentColumnDescriptor")] #[derive(Clone)] -pub struct PyComponentColumnDescriptor(ComponentColumnDescriptor); +pub struct PyComponentColumnDescriptor(pub ComponentColumnDescriptor); impl From for PyComponentColumnDescriptor { fn from(desc: ComponentColumnDescriptor) -> Self { @@ -222,7 +222,7 @@ impl From for ComponentColumnDescriptor { /// The component to select #[pyclass(frozen, name = "ComponentColumnSelector")] #[derive(Clone)] -pub struct PyComponentColumnSelector(ComponentColumnSelector); +pub struct PyComponentColumnSelector(pub ComponentColumnSelector); #[pymethods] impl PyComponentColumnSelector { @@ -309,6 +309,7 @@ impl AnyColumn { } /// A type alias for any component-column-like object. +//TODO(#9853): rename to `ComponentColumnLike` #[derive(FromPyObject)] pub(crate) enum AnyComponentColumn { #[pyo3(transparent, annotation = "name")] @@ -560,6 +561,23 @@ impl PySchema { } }) } + + pub fn resolve_component_column_selector( + &self, + column_selector: &PyComponentColumnSelector, + ) -> PyResult { + let desc = self + .schema + .resolve_component_column_selector(&column_selector.0) + .ok_or_else(|| { + PyValueError::new_err(format!( + "The column selector {} was not found in the schema.", + column_selector.0 + )) + })?; + + Ok(PyComponentColumnDescriptor(desc.clone())) + } } /// A single Rerun recording. From 8a80899ca5c04c28a673caec6851db8f036778c4 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 30 Apr 2025 14:13:53 -0400 Subject: [PATCH 2/5] Use a forgiving version of SorbetColumnDescriptors::try_from_arrow_fields --- crates/store/re_sorbet/src/sorbet_columns.rs | 37 ++++++++++++++++++++ rerun_py/src/catalog/dataset.rs | 7 ++-- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/crates/store/re_sorbet/src/sorbet_columns.rs b/crates/store/re_sorbet/src/sorbet_columns.rs index cd5366018b3b..bf2404aab554 100644 --- a/crates/store/re_sorbet/src/sorbet_columns.rs +++ b/crates/store/re_sorbet/src/sorbet_columns.rs @@ -251,4 +251,41 @@ impl SorbetColumnDescriptors { components, }) } + + // TODO(#9855): Reconcile this with the above. + pub fn try_from_arrow_fields_forgiving( + chunk_entity_path: Option<&EntityPath>, + fields: &ArrowFields, + ) -> Result { + let mut row_ids = Vec::new(); + let mut indices = Vec::new(); + let mut components = Vec::new(); + + for field in fields { + let field = field.as_ref(); + let column_kind = ColumnKind::try_from(field)?; + match column_kind { + ColumnKind::RowId => { + row_ids.push(RowIdColumnDescriptor::try_from(field)?); + } + + ColumnKind::Index => { + indices.push(IndexColumnDescriptor::try_from(field)?); + } + + ColumnKind::Component => { + components.push(ComponentColumnDescriptor::from_arrow_field( + chunk_entity_path, + field, + )); + } + } + } + + Ok(Self { + row_id: row_ids.pop(), + indices, + components, + }) + } } diff --git a/rerun_py/src/catalog/dataset.rs b/rerun_py/src/catalog/dataset.rs index 62b9d7022d43..8682e38ea1b5 100644 --- a/rerun_py/src/catalog/dataset.rs +++ b/rerun_py/src/catalog/dataset.rs @@ -436,8 +436,11 @@ impl PyDataset { fn fetch_schema(self_: &PyRef<'_, Self>) -> PyResult { Self::fetch_arrow_schema(self_).and_then(|arrow_schema| { - let descs = SorbetColumnDescriptors::try_from_arrow_fields(None, arrow_schema.fields()) - .map_err(to_py_err)?; + let descs = SorbetColumnDescriptors::try_from_arrow_fields_forgiving( + None, + arrow_schema.fields(), + ) + .map_err(to_py_err)?; Ok(PySchema { schema: descs }) }) From 4829c5ba55943eb4148097e1f8b2c222acf408a7 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 1 May 2025 11:52:48 +0200 Subject: [PATCH 3/5] Accept AnyColumn everywhere, better error mgmt, cleanup --- crates/store/re_log_types/src/path/mod.rs | 2 +- .../store/re_log_types/src/path/parse_path.rs | 2 +- crates/store/re_sorbet/src/lib.rs | 6 ++- crates/store/re_sorbet/src/selectors.rs | 46 ++++++++++++++++++- crates/store/re_sorbet/src/sorbet_columns.rs | 40 ++++++++++++---- rerun_py/src/catalog/dataset.rs | 18 ++++---- rerun_py/src/catalog/errors.rs | 12 +++++ rerun_py/src/dataframe.rs | 44 ++++++++++++++---- 8 files changed, 137 insertions(+), 33 deletions(-) diff --git a/crates/store/re_log_types/src/path/mod.rs b/crates/store/re_log_types/src/path/mod.rs index f740cac14965..8815e57605de 100644 --- a/crates/store/re_log_types/src/path/mod.rs +++ b/crates/store/re_log_types/src/path/mod.rs @@ -19,7 +19,7 @@ pub use entity_path_filter::{ ResolvedEntityPathFilter, ResolvedEntityPathRule, RuleEffect, }; pub use entity_path_part::EntityPathPart; -pub use parse_path::PathParseError; +pub use parse_path::{tokenize_by, PathParseError}; // ---------------------------------------------------------------------------- diff --git a/crates/store/re_log_types/src/path/parse_path.rs b/crates/store/re_log_types/src/path/parse_path.rs index 07a7ffd8f4b1..59dc158cc627 100644 --- a/crates/store/re_log_types/src/path/parse_path.rs +++ b/crates/store/re_log_types/src/path/parse_path.rs @@ -286,7 +286,7 @@ fn tokenize_data_path(path: &str) -> Vec<&str> { tokenize_by(path, b"/[]:") } -fn tokenize_by<'s>(path: &'s str, special_chars: &[u8]) -> Vec<&'s str> { +pub fn tokenize_by<'s>(path: &'s str, special_chars: &[u8]) -> Vec<&'s str> { #![allow(clippy::unwrap_used)] // We parse on bytes, and take care to only split on either side of a one-byte ASCII, diff --git a/crates/store/re_sorbet/src/lib.rs b/crates/store/re_sorbet/src/lib.rs index 2394f8fc966d..e515104e82fb 100644 --- a/crates/store/re_sorbet/src/lib.rs +++ b/crates/store/re_sorbet/src/lib.rs @@ -48,9 +48,11 @@ pub use self::{ }, migration::migrate_record_batch, row_id_column_descriptor::{RowIdColumnDescriptor, WrongDatatypeError}, - selectors::{ColumnSelector, ComponentColumnSelector, TimeColumnSelector}, + selectors::{ + ColumnSelector, ColumnSelectorParseError, ComponentColumnSelector, TimeColumnSelector, + }, sorbet_batch::SorbetBatch, - sorbet_columns::SorbetColumnDescriptors, + sorbet_columns::{ColumnSelectorResolveError, SorbetColumnDescriptors}, sorbet_schema::SorbetSchema, }; diff --git a/crates/store/re_sorbet/src/selectors.rs b/crates/store/re_sorbet/src/selectors.rs index 0ac1492189ca..7928e697df89 100644 --- a/crates/store/re_sorbet/src/selectors.rs +++ b/crates/store/re_sorbet/src/selectors.rs @@ -1,8 +1,17 @@ -use re_log_types::{EntityPath, Timeline, TimelineName}; +use re_log_types::{ComponentPath, EntityPath, Timeline, TimelineName}; use re_types_core::ComponentName; use crate::{ColumnDescriptor, ComponentColumnDescriptor, IndexColumnDescriptor}; +#[derive(thiserror::Error, Debug, PartialEq, Eq)] +pub enum ColumnSelectorParseError { + #[error("Expected column selector, found empty string")] + EmptyString, + + #[error("Expected string in the form of `entity_path:component_name`, got: {0}")] + FormatError(String), +} + /// Describes a column selection to return as part of a query. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum ColumnSelector { @@ -117,6 +126,41 @@ impl From for ComponentColumnSelector { } } +impl std::str::FromStr for ComponentColumnSelector { + type Err = ColumnSelectorParseError; + + /// Parses a string in the form of `entity_path:component_name`. + /// + /// Note that no attempt is made to interpret `component_name`. In particular, we don't attempt + /// to prepend a `rerun.components.` prefix like [`ComponentPath::from_str`] does. + fn from_str(selector: &str) -> Result { + if selector.is_empty() { + return Err(ColumnSelectorParseError::EmptyString); + } + + let tokens = re_log_types::tokenize_by(selector, b":"); + + match tokens.as_slice() { + &[entity_path_token, ":", component_name_token] => Ok(Self { + entity_path: EntityPath::from(entity_path_token), + component_name: component_name_token.to_owned(), + }), + + _ => Err(ColumnSelectorParseError::FormatError(selector.to_owned())), + } + } +} + +impl From for ComponentColumnSelector { + #[inline] + fn from(path: ComponentPath) -> Self { + Self { + entity_path: path.entity_path, + component_name: path.component_name.as_str().to_owned(), + } + } +} + impl ComponentColumnSelector { /// Select a component of a given type, based on its [`EntityPath`] #[inline] diff --git a/crates/store/re_sorbet/src/sorbet_columns.rs b/crates/store/re_sorbet/src/sorbet_columns.rs index bf2404aab554..38aa2d58639d 100644 --- a/crates/store/re_sorbet/src/sorbet_columns.rs +++ b/crates/store/re_sorbet/src/sorbet_columns.rs @@ -1,7 +1,8 @@ use arrow::datatypes::{Field as ArrowField, Fields as ArrowFields}; use nohash_hasher::IntSet; -use re_log_types::EntityPath; + +use re_log_types::{EntityPath, TimelineName}; use crate::{ ColumnDescriptor, ColumnDescriptorRef, ColumnKind, ColumnSelector, ComponentColumnDescriptor, @@ -9,6 +10,21 @@ use crate::{ TimeColumnSelector, }; +#[derive(thiserror::Error, Debug, PartialEq, Eq)] +#[expect(clippy::enum_variant_names)] +pub enum ColumnSelectorResolveError { + #[error("Column for component '{0}' not found")] + ComponentNotFound(String), + + #[error( + "Multiple columns were found for component '{0}'. Consider using a more specific selector." + )] + MultipleComponentColumnFound(String), + + #[error("Index column for timeline '{0}' not found")] + TimelineNotFound(TimelineName), +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct SorbetColumnDescriptors { /// The primary row id column. @@ -101,7 +117,7 @@ impl SorbetColumnDescriptors { pub fn resolve_selector( &self, column_selector: &ColumnSelector, - ) -> Option> { + ) -> Result, ColumnSelectorResolveError> { match column_selector { ColumnSelector::Time(selector) => self .resolve_index_column_selector(selector) @@ -118,10 +134,13 @@ impl SorbetColumnDescriptors { pub fn resolve_index_column_selector( &self, index_column_selector: &TimeColumnSelector, - ) -> Option<&IndexColumnDescriptor> { + ) -> Result<&IndexColumnDescriptor, ColumnSelectorResolveError> { self.indices .iter() .find(|column| column.timeline_name() == index_column_selector.timeline) + .ok_or(ColumnSelectorResolveError::TimelineNotFound( + index_column_selector.timeline, + )) } /// Resolve the provided component column selector. Returns `None` if no corresponding column @@ -136,11 +155,10 @@ impl SorbetColumnDescriptors { // TODO(#6889): this will need to be fully revisited with tagged components // TODO(ab): this is related but different from `re_chunk_store::resolve_component_selector()`. // It is likely that only one of these should eventually remain. - // TODO: return a result? pub fn resolve_component_column_selector( &self, component_column_selector: &ComponentColumnSelector, - ) -> Option<&ComponentColumnDescriptor> { + ) -> Result<&ComponentColumnDescriptor, ColumnSelectorResolveError> { let ComponentColumnSelector { entity_path, component_name, @@ -151,8 +169,8 @@ impl SorbetColumnDescriptors { column.component_name.as_str() == component_name && &column.entity_path == entity_path }); - if exact_match.is_some() { - return exact_match; + if let Some(exact_match) = exact_match { + return Ok(exact_match); } // fallback: use `ComponentName::match` and check that we have a single result @@ -165,9 +183,13 @@ impl SorbetColumnDescriptors { // Note: non-unique partial match is highly unlikely for now, but will become more likely // with tagged components. if partial_match.next().is_none() { - first_match + first_match.ok_or(ColumnSelectorResolveError::ComponentNotFound( + component_name.clone(), + )) } else { - None + Err(ColumnSelectorResolveError::MultipleComponentColumnFound( + component_name.clone(), + )) } } diff --git a/rerun_py/src/catalog/dataset.rs b/rerun_py/src/catalog/dataset.rs index 8682e38ea1b5..7cfa54c22be7 100644 --- a/rerun_py/src/catalog/dataset.rs +++ b/rerun_py/src/catalog/dataset.rs @@ -24,7 +24,7 @@ use crate::catalog::{ dataframe_query::PyDataframeQueryView, to_py_err, PyEntry, VectorDistanceMetricLike, VectorLike, }; use crate::dataframe::{ - PyComponentColumnSelector, PyDataFusionTable, PyIndexColumnSelector, PyRecording, PySchema, + AnyComponentColumn, PyDataFusionTable, PyIndexColumnSelector, PyRecording, PySchema, }; use crate::utils::wait_for_future; @@ -210,7 +210,7 @@ impl PyDataset { ))] fn create_fts_index( self_: PyRef<'_, Self>, - column: PyComponentColumnSelector, + column: AnyComponentColumn, time_index: PyIndexColumnSelector, store_position: bool, base_tokenizer: &str, @@ -221,7 +221,7 @@ impl PyDataset { let time_selector: TimeColumnSelector = time_index.into(); let schema = Self::fetch_schema(&self_)?; - let component_descriptor = schema.resolve_component_column_selector(&column)?; + let component_descriptor = schema.column_for_selector(column)?; let properties = IndexProperties::Inverted { store_position, @@ -264,7 +264,7 @@ impl PyDataset { ))] fn create_vector_index( self_: PyRef<'_, Self>, - column: PyComponentColumnSelector, + column: AnyComponentColumn, time_index: PyIndexColumnSelector, num_partitions: usize, num_sub_vectors: usize, @@ -277,7 +277,7 @@ impl PyDataset { let time_selector: TimeColumnSelector = time_index.into(); let schema = Self::fetch_schema(&self_)?; - let component_descriptor = schema.resolve_component_column_selector(&column)?; + let component_descriptor = schema.column_for_selector(column)?; let distance_metric: re_protos::manifest_registry::v1alpha1::VectorDistanceMetric = distance_metric.try_into()?; @@ -317,14 +317,14 @@ impl PyDataset { fn search_fts( self_: PyRef<'_, Self>, query: String, - column: PyComponentColumnSelector, + column: AnyComponentColumn, ) -> PyResult { let super_ = self_.as_super(); let connection = super_.client.borrow(self_.py()).connection().clone(); let dataset_id = super_.details.id; let schema = Self::fetch_schema(&self_)?; - let component_descriptor = schema.resolve_component_column_selector(&column)?; + let component_descriptor = schema.column_for_selector(column)?; let schema = arrow::datatypes::Schema::new_with_metadata( vec![Field::new("items", arrow::datatypes::DataType::Utf8, false)], @@ -377,7 +377,7 @@ impl PyDataset { fn search_vector( self_: PyRef<'_, Self>, query: VectorLike<'_>, - column: PyComponentColumnSelector, + column: AnyComponentColumn, top_k: u32, ) -> PyResult { let super_ = self_.as_super(); @@ -385,7 +385,7 @@ impl PyDataset { let dataset_id = super_.details.id; let schema = Self::fetch_schema(&self_)?; - let component_descriptor = schema.resolve_component_column_selector(&column)?; + let component_descriptor = schema.column_for_selector(column)?; let query = query.to_record_batch()?; diff --git a/rerun_py/src/catalog/errors.rs b/rerun_py/src/catalog/errors.rs index 03702c6a5fcc..48899eb2fc2b 100644 --- a/rerun_py/src/catalog/errors.rs +++ b/rerun_py/src/catalog/errors.rs @@ -58,6 +58,12 @@ enum ExternalError { #[error(transparent)] SorbetError(#[from] re_sorbet::SorbetError), + + #[error(transparent)] + ColumnSelectorParseError(#[from] re_sorbet::ColumnSelectorParseError), + + #[error(transparent)] + ColumnSelectorResolveError(#[from] re_sorbet::ColumnSelectorResolveError), } impl From @@ -119,6 +125,12 @@ impl From for PyErr { ExternalError::SorbetError(err) => { PyValueError::new_err(format!("Sorbet error: {err}")) } + + ExternalError::ColumnSelectorParseError(err) => PyValueError::new_err(format!("{err}")), + + ExternalError::ColumnSelectorResolveError(err) => { + PyValueError::new_err(format!("{err}")) + } } } } diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs index 756b32ca9f42..2d29f6848135 100644 --- a/rerun_py/src/dataframe.rs +++ b/rerun_py/src/dataframe.rs @@ -35,6 +35,7 @@ use re_sorbet::{ ColumnSelector, ComponentColumnSelector, SorbetColumnDescriptors, TimeColumnSelector, }; +use crate::catalog::to_py_err; use crate::{catalog::PyCatalogClient, utils::get_tokio_runtime}; /// Register the `rerun.dataframe` module. @@ -311,7 +312,7 @@ impl AnyColumn { /// A type alias for any component-column-like object. //TODO(#9853): rename to `ComponentColumnLike` #[derive(FromPyObject)] -pub(crate) enum AnyComponentColumn { +pub enum AnyComponentColumn { #[pyo3(transparent, annotation = "name")] Name(String), #[pyo3(transparent, annotation = "component_descriptor")] @@ -322,7 +323,7 @@ pub(crate) enum AnyComponentColumn { impl AnyComponentColumn { #[allow(dead_code)] - pub(crate) fn into_selector(self) -> PyResult { + pub fn into_selector(self) -> PyResult { match self { Self::Name(name) => { let component_path = @@ -562,19 +563,42 @@ impl PySchema { }) } + /// Look up the column descriptor for a specific selector. + /// + /// Parameters + /// ---------- + /// component_column_selector: str | ComponentColumnDescriptor | ComponentColumnSelector + /// The selector to look up. + /// + /// String arguments are expected to follow the following format: + /// `":"` + pub fn column_for_selector( + &self, + component_column_selector: AnyComponentColumn, + ) -> PyResult { + match component_column_selector { + AnyComponentColumn::Name(name) => self.resolve_component_column_selector( + &ComponentColumnSelector::from_str(&name).map_err(to_py_err)?, + ), + + AnyComponentColumn::ComponentDescriptor(desc) => Ok(desc), + + AnyComponentColumn::ComponentSelector(selector) => { + self.resolve_component_column_selector(&selector.0) + } + } + } +} + +impl PySchema { pub fn resolve_component_column_selector( &self, - column_selector: &PyComponentColumnSelector, + column_selector: &ComponentColumnSelector, ) -> PyResult { let desc = self .schema - .resolve_component_column_selector(&column_selector.0) - .ok_or_else(|| { - PyValueError::new_err(format!( - "The column selector {} was not found in the schema.", - column_selector.0 - )) - })?; + .resolve_component_column_selector(column_selector) + .map_err(to_py_err)?; Ok(PyComponentColumnDescriptor(desc.clone())) } From f25c1e348d23084d35fe8bd61b75dba55da9cfe3 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 1 May 2025 11:59:43 +0200 Subject: [PATCH 4/5] Cleanup stub --- rerun_py/rerun_bindings/rerun_bindings.pyi | 26 ++++++++++++++++++---- rerun_py/src/dataframe.rs | 11 ++++++--- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/rerun_py/rerun_bindings/rerun_bindings.pyi b/rerun_py/rerun_bindings/rerun_bindings.pyi index c75c9359c2c8..c6ee159d03aa 100644 --- a/rerun_py/rerun_bindings/rerun_bindings.pyi +++ b/rerun_py/rerun_bindings/rerun_bindings.pyi @@ -180,6 +180,24 @@ class Schema: """ + def column_for_selector(self, selector: str | ComponentColumnSelector | ComponentColumnDescriptor) -> ComponentColumnDescriptor: + """ + Look up the column descriptor for a specific selector. + + Parameters + ---------- + selector: str | ComponentColumnDescriptor | ComponentColumnSelector + The selector to look up. + + String arguments are expected to follow the following format: + `":"` + + Returns + ------- + ComponentColumnDescriptor + The column descriptor, if it exists. Raise an exception otherwise. + """ + class RecordingView: """ A view of a recording restricted to a given index, containing a specific set of entities and components. @@ -1037,7 +1055,7 @@ class Dataset(Entry): def create_fts_index( self, *, - column: ComponentColumnSelector, + column: str | ComponentColumnSelector | ComponentColumnDescriptor, time_index: IndexColumnSelector, store_position: bool = False, base_tokenizer: str = "simple", @@ -1047,7 +1065,7 @@ class Dataset(Entry): def create_vector_index( self, *, - column: ComponentColumnSelector, + column: str | ComponentColumnSelector | ComponentColumnDescriptor, time_index: IndexColumnSelector, num_partitions: int = 5, num_sub_vectors: int = 16, @@ -1058,14 +1076,14 @@ class Dataset(Entry): def search_fts( self, query: str, - column: ComponentColumnSelector, + column: str | ComponentColumnSelector | ComponentColumnDescriptor, ) -> DataFusionTable: """Search the dataset using a full-text search query.""" def search_vector( self, query: Any, # VectorLike - column: ComponentColumnSelector, + column: str | ComponentColumnSelector | ComponentColumnDescriptor, top_k: int, ) -> DataFusionTable: """Search the dataset using a vector search query.""" diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs index 2d29f6848135..60d0dac4385c 100644 --- a/rerun_py/src/dataframe.rs +++ b/rerun_py/src/dataframe.rs @@ -567,16 +567,21 @@ impl PySchema { /// /// Parameters /// ---------- - /// component_column_selector: str | ComponentColumnDescriptor | ComponentColumnSelector + /// selector: str | ComponentColumnDescriptor | ComponentColumnSelector /// The selector to look up. /// /// String arguments are expected to follow the following format: /// `":"` + /// + /// Returns + /// ------- + /// ComponentColumnDescriptor + /// The column descriptor, if it exists. Raise an exception otherwise. pub fn column_for_selector( &self, - component_column_selector: AnyComponentColumn, + selector: AnyComponentColumn, ) -> PyResult { - match component_column_selector { + match selector { AnyComponentColumn::Name(name) => self.resolve_component_column_selector( &ComponentColumnSelector::from_str(&name).map_err(to_py_err)?, ), From 7cd60ee3b9ad2e37a286c37d21518d0501c96203 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 1 May 2025 12:18:44 +0200 Subject: [PATCH 5/5] fix doclink --- crates/store/re_sorbet/src/selectors.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/store/re_sorbet/src/selectors.rs b/crates/store/re_sorbet/src/selectors.rs index 7928e697df89..5c6c19002ed8 100644 --- a/crates/store/re_sorbet/src/selectors.rs +++ b/crates/store/re_sorbet/src/selectors.rs @@ -132,7 +132,7 @@ impl std::str::FromStr for ComponentColumnSelector { /// Parses a string in the form of `entity_path:component_name`. /// /// Note that no attempt is made to interpret `component_name`. In particular, we don't attempt - /// to prepend a `rerun.components.` prefix like [`ComponentPath::from_str`] does. + /// to prepend a `rerun.components.` prefix like `ComponentPath::from_str` does. fn from_str(selector: &str) -> Result { if selector.is_empty() { return Err(ColumnSelectorParseError::EmptyString);