Skip to content

Properly resolve component selectors in dataset index creation and search APIs #9854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/store/re_log_types/src/path/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use entity_path_filter::{
ResolvedEntityPathFilter, ResolvedEntityPathRule, RuleEffect,
};
pub use entity_path_part::EntityPathPart;
pub use parse_path::PathParseError;
pub use parse_path::{tokenize_by, PathParseError};

// ----------------------------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_log_types/src/path/parse_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ fn tokenize_data_path(path: &str) -> Vec<&str> {
tokenize_by(path, b"/[]:")
}

fn tokenize_by<'s>(path: &'s str, special_chars: &[u8]) -> Vec<&'s str> {
pub fn tokenize_by<'s>(path: &'s str, special_chars: &[u8]) -> Vec<&'s str> {
#![allow(clippy::unwrap_used)]

// We parse on bytes, and take care to only split on either side of a one-byte ASCII,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::sync::Arc;

use crate::{invalid_field, missing_field, TypeConversionError};
use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError};
use re_log_types::{external::re_types_core::ComponentDescriptor, TableId};

use crate::{invalid_field, missing_field, TypeConversionError};

// --- Arrow ---

impl TryFrom<&crate::common::v1alpha1::Schema> for ArrowSchema {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use arrow::{
error::ArrowError,
};

use re_chunk::TimelineName;
use re_log_types::EntityPath;

use super::rerun_manifest_registry_v1alpha1::VectorDistanceMetric;
use crate::common::v1alpha1::ComponentDescriptor;
use crate::manifest_registry::v1alpha1::{
CreatePartitionManifestsResponse, DataSourceKind, GetDatasetSchemaResponse,
};
use crate::{invalid_field, missing_field, TypeConversionError};

use super::rerun_manifest_registry_v1alpha1::VectorDistanceMetric;
use re_chunk::TimelineName;
use re_log_types::EntityPath;
use re_sorbet::ComponentColumnDescriptor;

// --- QueryDataset ---

Expand Down Expand Up @@ -509,3 +509,19 @@ impl From<IndexProperties> for crate::manifest_registry::v1alpha1::IndexProperti
}
}
}

// ---

impl From<ComponentColumnDescriptor> for crate::manifest_registry::v1alpha1::IndexColumn {
    /// Converts a component column descriptor into an `IndexColumn` message.
    ///
    /// The entity path and the component descriptor (archetype name, archetype field name and
    /// component name) are carried over; both message fields are always set to `Some`.
    fn from(value: ComponentColumnDescriptor) -> Self {
        // Optional parts of the descriptor stay optional in the message.
        let archetype_name = value.archetype_name.map(|name| name.full_name().to_owned());
        let archetype_field_name = value.archetype_field_name.map(|name| name.to_string());

        // The component name is always present on the descriptor.
        let component_name = Some(value.component_name.full_name().to_owned());

        let component = ComponentDescriptor {
            archetype_name,
            archetype_field_name,
            component_name,
        };

        Self {
            entity_path: Some(value.entity_path.into()),
            component: Some(component),
        }
    }
}
1 change: 1 addition & 0 deletions crates/store/re_sorbet/src/column_descriptor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnDescriptorRef<'a> {
RowId(&'a RowIdColumnDescriptor),
//TODO(ab): this should be renamed Index!
Time(&'a IndexColumnDescriptor),
Component(&'a ComponentColumnDescriptor),
}
Expand Down
6 changes: 4 additions & 2 deletions crates/store/re_sorbet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ pub use self::{
},
migration::migrate_record_batch,
row_id_column_descriptor::{RowIdColumnDescriptor, WrongDatatypeError},
selectors::{ColumnSelector, ComponentColumnSelector, TimeColumnSelector},
selectors::{
ColumnSelector, ColumnSelectorParseError, ComponentColumnSelector, TimeColumnSelector,
},
sorbet_batch::SorbetBatch,
sorbet_columns::SorbetColumnDescriptors,
sorbet_columns::{ColumnSelectorResolveError, SorbetColumnDescriptors},
sorbet_schema::SorbetSchema,
};

Expand Down
46 changes: 45 additions & 1 deletion crates/store/re_sorbet/src/selectors.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
use re_log_types::{EntityPath, Timeline, TimelineName};
use re_log_types::{ComponentPath, EntityPath, Timeline, TimelineName};
use re_types_core::ComponentName;

use crate::{ColumnDescriptor, ComponentColumnDescriptor, IndexColumnDescriptor};

/// Errors that can occur when parsing a column selector from a string.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ColumnSelectorParseError {
    /// The input string was empty.
    #[error("Expected column selector, found empty string")]
    EmptyString,

    /// The input string was not of the form `entity_path:component_name`.
    ///
    /// The payload is the offending input string.
    #[error("Expected string in the form of `entity_path:component_name`, got: {0}")]
    FormatError(String),
}

/// Describes a column selection to return as part of a query.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnSelector {
Expand Down Expand Up @@ -117,6 +126,41 @@ impl From<ComponentColumnDescriptor> for ComponentColumnSelector {
}
}

impl std::str::FromStr for ComponentColumnSelector {
    type Err = ColumnSelectorParseError;

    /// Builds a selector from a string of the form `entity_path:component_name`.
    ///
    /// The component name is stored verbatim: unlike `ComponentPath::from_str`, no
    /// `rerun.components.` prefix is prepended and no other interpretation is attempted.
    ///
    /// # Errors
    ///
    /// - [`ColumnSelectorParseError::EmptyString`] if `selector` is empty.
    /// - [`ColumnSelectorParseError::FormatError`] if tokenizing on `:` does not yield exactly
    ///   an entity path token, a `:` token and a component name token.
    fn from_str(selector: &str) -> Result<Self, Self::Err> {
        if selector.is_empty() {
            return Err(ColumnSelectorParseError::EmptyString);
        }

        let tokens = re_log_types::tokenize_by(selector, b":");

        // Anything other than `<entity_path> ":" <component_name>` is rejected.
        let &[entity_path_token, ":", component_name_token] = tokens.as_slice() else {
            return Err(ColumnSelectorParseError::FormatError(selector.to_owned()));
        };

        Ok(Self {
            entity_path: EntityPath::from(entity_path_token),
            component_name: component_name_token.to_owned(),
        })
    }
}

impl From<ComponentPath> for ComponentColumnSelector {
    /// Builds a selector from a component path, keeping the entity path and storing the
    /// component name as an owned string.
    #[inline]
    fn from(path: ComponentPath) -> Self {
        let component_name = path.component_name.as_str().to_owned();
        let entity_path = path.entity_path;

        Self {
            entity_path,
            component_name,
        }
    }
}

impl ComponentColumnSelector {
/// Select a component of a given type, based on its [`EntityPath`]
#[inline]
Expand Down
140 changes: 137 additions & 3 deletions crates/store/re_sorbet/src/sorbet_columns.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
use arrow::datatypes::{Field as ArrowField, Fields as ArrowFields};

use nohash_hasher::IntSet;
use re_log_types::EntityPath;

use re_log_types::{EntityPath, TimelineName};

use crate::{
ColumnDescriptor, ColumnDescriptorRef, ColumnKind, ComponentColumnDescriptor,
IndexColumnDescriptor, RowIdColumnDescriptor, SorbetError,
ColumnDescriptor, ColumnDescriptorRef, ColumnKind, ColumnSelector, ComponentColumnDescriptor,
ComponentColumnSelector, IndexColumnDescriptor, RowIdColumnDescriptor, SorbetError,
TimeColumnSelector,
};

/// Errors that can occur when resolving a column selector against a set of column descriptors.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
// NOTE(review): presumably needed because all variants share the `Found` suffix — confirm.
#[expect(clippy::enum_variant_names)]
pub enum ColumnSelectorResolveError {
    /// No component column matched the selector.
    ///
    /// The payload is the component name from the selector.
    #[error("Column for component '{0}' not found")]
    ComponentNotFound(String),

    /// More than one component column matched the selector.
    ///
    /// The payload is the component name from the selector.
    #[error(
        "Multiple columns were found for component '{0}'. Consider using a more specific selector."
    )]
    MultipleComponentColumnFound(String),

    /// No index column exists for the requested timeline.
    #[error("Index column for timeline '{0}' not found")]
    TimelineNotFound(TimelineName),
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SorbetColumnDescriptors {
/// The primary row id column.
Expand Down Expand Up @@ -96,6 +113,86 @@ impl SorbetColumnDescriptors {
}
}

/// Resolve the provided column selector to the column descriptor it designates.
///
/// Time selectors are delegated to [`Self::resolve_index_column_selector`] and component
/// selectors to [`Self::resolve_component_column_selector`].
///
/// # Errors
///
/// Returns the [`ColumnSelectorResolveError`] reported by the delegate when no corresponding
/// column is found (or, for component selectors, when the match is ambiguous).
pub fn resolve_selector(
    &self,
    column_selector: &ColumnSelector,
) -> Result<ColumnDescriptorRef<'_>, ColumnSelectorResolveError> {
    match column_selector {
        ColumnSelector::Time(selector) => self
            .resolve_index_column_selector(selector)
            .map(ColumnDescriptorRef::Time),

        ColumnSelector::Component(selector) => self
            .resolve_component_column_selector(selector)
            .map(ColumnDescriptorRef::Component),
    }
}

/// Resolve the provided index column selector.
///
/// Returns the first index column whose timeline name equals the selector's timeline.
///
/// # Errors
///
/// Returns [`ColumnSelectorResolveError::TimelineNotFound`] if no index column exists for the
/// selector's timeline.
pub fn resolve_index_column_selector(
    &self,
    index_column_selector: &TimeColumnSelector,
) -> Result<&IndexColumnDescriptor, ColumnSelectorResolveError> {
    self.indices
        .iter()
        .find(|column| column.timeline_name() == index_column_selector.timeline)
        .ok_or(ColumnSelectorResolveError::TimelineNotFound(
            index_column_selector.timeline,
        ))
}

/// Resolve the provided component column selector.
///
/// Matching strategy:
/// - The entity path must match exactly.
/// - The first component with a fully matching name is returned (there shouldn't be more than
///   one for now).
/// - If no exact match is found, a partial match is attempted using
///   [`re_types_core::ComponentName::matches`] and is returned only if it is unique.
///
/// # Errors
///
/// - [`ColumnSelectorResolveError::ComponentNotFound`] if neither an exact nor a partial match
///   exists.
/// - [`ColumnSelectorResolveError::MultipleComponentColumnFound`] if there is no exact match
///   and more than one column matches partially.
// TODO(#6889): this will need to be fully revisited with tagged components
// TODO(ab): this is related but different from `re_chunk_store::resolve_component_selector()`.
// It is likely that only one of these should eventually remain.
pub fn resolve_component_column_selector(
    &self,
    component_column_selector: &ComponentColumnSelector,
) -> Result<&ComponentColumnDescriptor, ColumnSelectorResolveError> {
    let ComponentColumnSelector {
        entity_path,
        component_name,
    } = component_column_selector;

    // happy path: exact component name match
    let exact_match = self.components.iter().find(|column| {
        column.component_name.as_str() == component_name && &column.entity_path == entity_path
    });

    if let Some(exact_match) = exact_match {
        return Ok(exact_match);
    }

    // fallback: use `ComponentName::match` and check that we have a single result
    let mut partial_matches = self.components.iter().filter(|column| {
        column.component_name.matches(component_name) && &column.entity_path == entity_path
    });

    // Note: non-unique partial match is highly unlikely for now, but will become more likely
    // with tagged components.
    //
    // The tuple is evaluated left to right, so this pulls the first two partial matches (if
    // any). The component name is only cloned on the error paths.
    match (partial_matches.next(), partial_matches.next()) {
        (Some(unique_match), None) => Ok(unique_match),

        (None, _) => Err(ColumnSelectorResolveError::ComponentNotFound(
            component_name.clone(),
        )),

        (Some(_), Some(_)) => Err(ColumnSelectorResolveError::MultipleComponentColumnFound(
            component_name.clone(),
        )),
    }
}

pub fn arrow_fields(&self, batch_type: crate::BatchType) -> Vec<ArrowField> {
let Self {
row_id,
Expand Down Expand Up @@ -176,4 +273,41 @@ impl SorbetColumnDescriptors {
components,
})
}

/// Builds the column descriptors from a list of arrow fields.
///
/// Each field is classified with `ColumnKind::try_from` and parsed into the matching
/// descriptor. Component columns are built with `ComponentColumnDescriptor::from_arrow_field`,
/// which receives the optional `chunk_entity_path`.
///
/// The resulting `row_id` is the last row-id column encountered, or `None` if there was none.
///
/// # Errors
///
/// Propagates any error raised while classifying a field or while parsing a row-id or index
/// column descriptor.
// TODO(#9855): Reconcile this with the above.
pub fn try_from_arrow_fields_forgiving(
    chunk_entity_path: Option<&EntityPath>,
    fields: &ArrowFields,
) -> Result<Self, SorbetError> {
    let mut row_id_columns = Vec::new();
    let mut index_columns = Vec::new();
    let mut component_columns = Vec::new();

    for field in fields {
        let field = field.as_ref();

        match ColumnKind::try_from(field)? {
            ColumnKind::RowId => {
                let descriptor = RowIdColumnDescriptor::try_from(field)?;
                row_id_columns.push(descriptor);
            }

            ColumnKind::Index => {
                let descriptor = IndexColumnDescriptor::try_from(field)?;
                index_columns.push(descriptor);
            }

            ColumnKind::Component => {
                let descriptor =
                    ComponentColumnDescriptor::from_arrow_field(chunk_entity_path, field);
                component_columns.push(descriptor);
            }
        }
    }

    Ok(Self {
        row_id: row_id_columns.pop(),
        indices: index_columns,
        components: component_columns,
    })
}
}
26 changes: 22 additions & 4 deletions rerun_py/rerun_bindings/rerun_bindings.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,24 @@ class Schema:

"""

def column_for_selector(self, selector: str | ComponentColumnSelector | ComponentColumnDescriptor) -> ComponentColumnDescriptor:
    """
    Look up the column descriptor for a specific selector.

    Parameters
    ----------
    selector: str | ComponentColumnSelector | ComponentColumnDescriptor
        The selector to look up.

        String arguments are expected to use the following format:
        `"<entity_path>:<component_name>"`

    Returns
    -------
    ComponentColumnDescriptor
        The column descriptor, if it exists. Raises an exception otherwise.
    """

class RecordingView:
"""
A view of a recording restricted to a given index, containing a specific set of entities and components.
Expand Down Expand Up @@ -1037,7 +1055,7 @@ class Dataset(Entry):
def create_fts_index(
self,
*,
column: ComponentColumnSelector,
column: str | ComponentColumnSelector | ComponentColumnDescriptor,
time_index: IndexColumnSelector,
store_position: bool = False,
base_tokenizer: str = "simple",
Expand All @@ -1047,7 +1065,7 @@ class Dataset(Entry):
def create_vector_index(
self,
*,
column: ComponentColumnSelector,
column: str | ComponentColumnSelector | ComponentColumnDescriptor,
time_index: IndexColumnSelector,
num_partitions: int = 5,
num_sub_vectors: int = 16,
Expand All @@ -1058,14 +1076,14 @@ class Dataset(Entry):
def search_fts(
self,
query: str,
column: ComponentColumnSelector,
column: str | ComponentColumnSelector | ComponentColumnDescriptor,
) -> DataFusionTable:
"""Search the dataset using a full-text search query."""

def search_vector(
self,
query: Any, # VectorLike
column: ComponentColumnSelector,
column: str | ComponentColumnSelector | ComponentColumnDescriptor,
top_k: int,
) -> DataFusionTable:
"""Search the dataset using a vector search query."""
Expand Down
Loading
Loading