Skip to content

Commit f88c0dc

Browse files
Christophe Di Primacpcloud
Christophe Di Prima
authored andcommitted
feat(risingwave): add support for includes in create_source
1 parent 1666e33 commit f88c0dc

File tree

1 file changed

+18
-10
lines changed

1 file changed

+18
-10
lines changed

ibis/backends/risingwave/__init__.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -583,9 +583,7 @@ def create_table(
583583
create_stmt = sge.Create(
584584
kind="TABLE",
585585
this=target,
586-
properties=sge.Properties(
587-
expressions=sge.Properties.from_dict(connector_properties)
588-
),
586+
properties=sge.Properties.from_dict(connector_properties),
589587
)
590588
create_stmt = create_stmt.sql(self.dialect) + data_and_encode_format(
591589
data_format, encode_format, encode_properties
@@ -744,6 +742,7 @@ def create_source(
744742
data_format: str,
745743
encode_format: str,
746744
encode_properties: dict | None = None,
745+
includes: dict[str, str] | None = None,
747746
) -> ir.Table:
748747
"""Creating a source.
749748
@@ -764,23 +763,32 @@ def create_source(
764763
The encode format for the new source, e.g., "JSON". data_format and encode_format must be specified at the same time.
765764
encode_properties
766765
The properties of encode format, providing information like schema registry url. Refer https://docs.risingwave.com/docs/current/sql-create-source/ for more details.
766+
includes
767+
A dict of `INCLUDE` clauses of the form `{field: alias, ...}`.
768+
Set value(s) to `None` if no alias is needed. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details.
767769
768770
Returns
769771
-------
770772
Table
771773
Table expression
772774
"""
773-
table = sg.table(name, db=database, quoted=self.compiler.quoted)
775+
quoted = self.compiler.quoted
776+
table = sg.table(name, db=database, quoted=quoted)
774777
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
775778

776-
create_stmt = sge.Create(
777-
kind="SOURCE",
778-
this=target,
779-
properties=sge.Properties(
780-
expressions=sge.Properties.from_dict(connector_properties)
781-
),
779+
properties = sge.Properties.from_dict(connector_properties)
780+
properties.expressions.extend(
781+
sge.IncludeProperty(
782+
this=sg.to_identifier(include_type),
783+
alias=sg.to_identifier(column_name, quoted=quoted)
784+
if column_name
785+
else None,
786+
)
787+
for include_type, column_name in (includes or {}).items()
782788
)
783789

790+
create_stmt = sge.Create(kind="SOURCE", this=target, properties=properties)
791+
784792
create_stmt = create_stmt.sql(self.dialect) + data_and_encode_format(
785793
data_format, encode_format, encode_properties
786794
)

0 commit comments

Comments
 (0)