feat(ibis): add OpenTelemetry tracing logs #1080


Merged — 6 commits merged on Mar 6, 2025
47 changes: 47 additions & 0 deletions ibis-server/Metrics.md
Thanks @douenergy. It looks great!

@@ -0,0 +1,47 @@
# Ibis Server Traced Metrics

The ibis-server codebase uses OpenTelemetry for tracing. The following spans are traced across different components:

## Rewriter Module
- `transpile` - Internal span for SQL transpilation operations
- `rewrite` - Internal span for SQL rewriting operations
- `extract_manifest` - Internal span for manifest extraction from SQL
- `external_rewrite` - Client span for external engine rewriting operations
- `embedded_rewrite` - Internal span for embedded engine rewriting operations

## Substitute Module
- `substitute` - Internal span for model substitution operations

## Connector Module
- `connector_init` - Internal span for connector initialization
- `connector_query` - Client span for executing queries
- `connector_dry_run` - Client span for dry-running queries
- `describe_sql_for_error_message` - Client span for generating SQL error messages
- `get_schema` - Client span for retrieving schema information
- `duckdb_query` - Internal span for DuckDB queries
- `duckdb_dry_run` - Internal span for DuckDB dry runs

## API Endpoints (v2)
- `v2_query_{data_source}` - Server span for query operations
- `v2_query_{data_source}_dry_run` - Server span for dry run query operations
- `v2_validate_{data_source}` - Server span for validation operations
- `v2_metadata_tables_{data_source}` - Server span for metadata table listing
- `v2_metadata_constraints_{data_source}` - Server span for metadata constraint listing
- `dry_plan` - Server span for dry planning operations
- `v2_dry_plan_{data_source}` - Server span for data source specific dry planning
- `v2_model_substitute_{data_source}` - Server span for model substitution operations

## API Endpoints (v3)
- `v3_query_{data_source}` - Server span for query operations
- `v3_query_{data_source}_dry_run` - Server span for dry run query operations
- `v3_dry_plan_{data_source}` - Server span for data source specific dry planning
- `v3_validate_{data_source}` - Server span for validation operations
- `v3_functions_{data_source}` - Server span for function listing
- `v3_model-substitute_{data_source}` - Server span for model substitution operations

## Utility Functions
- `base64_to_dict` - Internal span for base64 to dictionary conversion
- `to_json` - Internal span for DataFrame to JSON conversion

## Trace Context
- Each endpoint accepts request headers and properly propagates trace context using the `build_context` function.
10 changes: 10 additions & 0 deletions ibis-server/README.md
@@ -78,5 +78,15 @@ Run the server
just run
```

### Enable Tracing
The ibis-server uses OpenTelemetry as its tracing framework. Refer to the OpenTelemetry zero-code instrumentation documentation to install the required dependencies.
Then, use the following `just` command to start the Ibis server and export tracing logs to the console:
```
just run-trace-console
```
OpenTelemetry zero-code instrumentation is highly configurable. Configure the exporters you need to send traces to your tracing backend.
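
Zero-code instrumentation is driven by environment variables. A hypothetical setup for exporting to an OTLP collector instead of the console (the endpoint, service name, and `uvicorn` entry point below are placeholders, not project defaults) might look like:

```shell
# Install the zero-code instrumentation tooling and auto-detect
# instrumentation packages for the installed libraries.
pip install opentelemetry-distro opentelemetry-exporter-otlp
opentelemetry-bootstrap -a install

# Export traces to an OTLP collector (placeholder endpoint/service name).
export OTEL_SERVICE_NAME=ibis-server
export OTEL_TRACES_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317

# Wrap the server process so instrumentation is injected at startup.
opentelemetry-instrument uvicorn app.main:app
```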

[Metrics we are tracing right now](./Metrics.md)

## Contributing
Please see [CONTRIBUTING.md](docs/CONTRIBUTING.md) for more information.
8 changes: 8 additions & 0 deletions ibis-server/app/mdl/rewriter.py
@@ -4,6 +4,7 @@
import sqlglot
from anyio import to_thread
from loguru import logger
from opentelemetry import trace

from app.config import get_config
from app.mdl.core import (
@@ -21,6 +22,8 @@
# Register custom dialects
importlib.import_module("app.custom_sqlglot.dialects")

tracer = trace.get_tracer(__name__)


class Rewriter:
def __init__(
@@ -39,11 +42,13 @@ def __init__(
else:
self._rewriter = ExternalEngineRewriter(java_engine_connector)

@tracer.start_as_current_span("transpile", kind=trace.SpanKind.INTERNAL)
def _transpile(self, planned_sql: str) -> str:
read = self._get_read_dialect(self.experiment)
write = self._get_write_dialect(self.data_source)
return sqlglot.transpile(planned_sql, read=read, write=write)[0]

@tracer.start_as_current_span("rewrite", kind=trace.SpanKind.INTERNAL)
async def rewrite(self, sql: str) -> str:
manifest_str = (
self._extract_manifest(self.manifest_str, sql) or self.manifest_str
@@ -55,6 +60,7 @@ async def rewrite(self, sql: str) -> str:
logger.debug("Dialect SQL: {}", dialect_sql)
return dialect_sql

@tracer.start_as_current_span("extract_manifest", kind=trace.SpanKind.INTERNAL)
def _extract_manifest(self, manifest_str: str, sql: str) -> str:
try:
extractor = get_manifest_extractor(manifest_str)
@@ -86,6 +92,7 @@ class ExternalEngineRewriter:
def __init__(self, java_engine_connector: JavaEngineConnector):
self.java_engine_connector = java_engine_connector

@tracer.start_as_current_span("external_rewrite", kind=trace.SpanKind.CLIENT)
async def rewrite(self, manifest_str: str, sql: str) -> str:
try:
return await self.java_engine_connector.dry_plan(manifest_str, sql)
@@ -105,6 +112,7 @@ class EmbeddedEngineRewriter:
def __init__(self, function_path: str):
self.function_path = function_path

@tracer.start_as_current_span("embedded_rewrite", kind=trace.SpanKind.INTERNAL)
async def rewrite(self, manifest_str: str, sql: str) -> str:
try:
session_context = get_session_context(manifest_str, self.function_path)
4 changes: 4 additions & 0 deletions ibis-server/app/mdl/substitute.py
@@ -1,17 +1,21 @@
from opentelemetry import trace
from sqlglot import exp, parse_one
from sqlglot.optimizer.scope import build_scope

from app.model import UnprocessableEntityError
from app.model.data_source import DataSource
from app.util import base64_to_dict

tracer = trace.get_tracer(__name__)


class ModelSubstitute:
def __init__(self, data_source: DataSource, manifest_str: str):
self.data_source = data_source
self.manifest = base64_to_dict(manifest_str)
self.model_dict = self._build_model_dict(self.manifest["models"])

@tracer.start_as_current_span("substitute", kind=trace.SpanKind.INTERNAL)
def substitute(self, sql: str, write: str | None = None) -> str:
ast = parse_one(sql, dialect=self.data_source.value)
root = build_scope(ast)
55 changes: 36 additions & 19 deletions ibis-server/app/model/connector.py
@@ -15,6 +15,7 @@
from google.oauth2 import service_account
from ibis import BaseBackend
from ibis.backends.sql.compilers.postgres import compiler as postgres_compiler
from opentelemetry import trace

from app.model import (
ConnectionInfo,
@@ -30,8 +31,11 @@
# Override datatypes of ibis
importlib.import_module("app.custom_ibis.backends.sql.datatypes")

tracer = trace.get_tracer(__name__)


class Connector:
@tracer.start_as_current_span("connector_init", kind=trace.SpanKind.INTERNAL)
def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
if data_source == DataSource.mssql:
self._connector = MSSqlConnector(connection_info)
@@ -64,9 +68,11 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
self.data_source = data_source
self.connection = self.data_source.get_connection(connection_info)

@tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT)
def query(self, sql: str, limit: int) -> pd.DataFrame:
return self.connection.sql(sql).limit(limit).to_pandas()

@tracer.start_as_current_span("connector_dry_run", kind=trace.SpanKind.CLIENT)
def dry_run(self, sql: str) -> None:
self.connection.sql(sql)

@@ -85,6 +91,9 @@ def dry_run(self, sql: str) -> None:
raise QueryDryRunError(f"The sql dry run failed. {error_message}.")
raise UnknownIbisError(e)

@tracer.start_as_current_span(
"describe_sql_for_error_message", kind=trace.SpanKind.CLIENT
)
def _describe_sql_for_error_message(self, sql: str) -> str:
tsql = sge.convert(sql).sql("mssql")
describe_sql = f"SELECT error_message FROM sys.dm_exec_describe_first_result_set({tsql}, NULL, 0)"
@@ -99,15 +108,18 @@ class CannerConnector:
def __init__(self, connection_info: ConnectionInfo):
self.connection = DataSource.canner.get_connection(connection_info)

@tracer.start_as_current_span("connector_query", kind=trace.SpanKind.CLIENT)
def query(self, sql: str, limit: int) -> pd.DataFrame:
# Canner enterprise does not support `CREATE TEMPORARY VIEW` for getting schema
schema = self._get_schema(sql)
return self.connection.sql(sql, schema=schema).limit(limit).to_pandas()

@tracer.start_as_current_span("connector_dry_run", kind=trace.SpanKind.CLIENT)
def dry_run(self, sql: str) -> Any:
# Canner enterprise does not support dry-run, so we have to query with limit zero
return self.connection.raw_sql(f"SELECT * FROM ({sql}) LIMIT 0")

@tracer.start_as_current_span("get_schema", kind=trace.SpanKind.CLIENT)
def _get_schema(self, sql: str) -> sch.Schema:
cur = self.dry_run(sql)
type_names = _get_pg_type_names(self.connection)
@@ -143,25 +155,28 @@ def query(self, sql: str, limit: int) -> pd.DataFrame:
# - https://github.com/Canner/wren-engine/issues/909
# - https://github.com/ibis-project/ibis/issues/10612
if "Must pass schema" in str(e):
credits_json = loads(
base64.b64decode(
self.connection_info.credentials.get_secret_value()
).decode("utf-8")
)
credentials = service_account.Credentials.from_service_account_info(
credits_json
)
credentials = credentials.with_scopes(
[
"https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/cloud-platform",
]
)
client = bigquery.Client(credentials=credentials)
ibis_schema_mapper = ibis.backends.bigquery.BigQuerySchema()
bq_fields = client.query(sql).result()
ibis_fields = ibis_schema_mapper.to_ibis(bq_fields.schema)
return pd.DataFrame(columns=ibis_fields.names)
with tracer.start_as_current_span(
"get_schema", kind=trace.SpanKind.CLIENT
):
credits_json = loads(
base64.b64decode(
self.connection_info.credentials.get_secret_value()
).decode("utf-8")
)
credentials = service_account.Credentials.from_service_account_info(
credits_json
)
credentials = credentials.with_scopes(
[
"https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/cloud-platform",
]
)
client = bigquery.Client(credentials=credentials)
ibis_schema_mapper = ibis.backends.bigquery.BigQuerySchema()
bq_fields = client.query(sql).result()
ibis_fields = ibis_schema_mapper.to_ibis(bq_fields.schema)
return pd.DataFrame(columns=ibis_fields.names)
else:
raise e

@@ -178,6 +193,7 @@ def __init__(self, connection_info: ConnectionInfo):
if isinstance(connection_info, GcsFileConnectionInfo):
init_duckdb_gcs(self.connection, connection_info)

@tracer.start_as_current_span("duckdb_query", kind=trace.SpanKind.INTERNAL)
def query(self, sql: str, limit: int) -> pd.DataFrame:
try:
return self.connection.execute(sql).fetch_df().head(limit)
Expand All @@ -186,6 +202,7 @@ def query(self, sql: str, limit: int) -> pd.DataFrame:
except HTTPException as e:
raise UnprocessableEntityError(f"Failed to execute query: {e!s}")

@tracer.start_as_current_span("duckdb_dry_run", kind=trace.SpanKind.INTERNAL)
def dry_run(self, sql: str) -> None:
try:
self.connection.execute(sql)
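
The BigQuery fallback wrapped in the new `get_schema` span decodes a base64-encoded service-account key from the connection info before building a client. That decoding step on its own is plain stdlib; a sketch with a fabricated dummy payload (not a real credential) shows the round trip:

```python
# Decode a base64-encoded service-account JSON blob, as the BigQuery
# fallback does before calling
# service_account.Credentials.from_service_account_info(...).
import base64
import json


def decode_service_account(encoded: str) -> dict:
    """Base64-decode and JSON-parse a service-account key."""
    return json.loads(base64.b64decode(encoded).decode("utf-8"))


# Round-trip with a dummy payload (fabricated values, for illustration):
dummy = {"type": "service_account", "project_id": "demo-project"}
encoded = base64.b64encode(json.dumps(dummy).encode("utf-8")).decode("ascii")
```

`decode_service_account(encoded)` recovers the original dict, which is what the connector then hands to the Google credentials factory.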