
[feature request][spark connector] support authentication to pinot api #15300

Open
tsekityam opened this issue Mar 17, 2025 · 0 comments · May be fixed by #15315

tsekityam (Contributor) commented Mar 17, 2025

Hi team,

When I try to use pinot-spark-3-connector to read one of our Pinot tables into Databricks with the following code:

var df = spark
        .read
        .format("pinot")
        .option("controller", "xxxx")
        .option("broker", "xxxx")
        .option("table", "xxxx")
        .option("tableType", "hybrid")
        .option("useGrpcServer", "true")
        .option("usePushDownFilters", "true")
        .load()

display(df)

I get the following error:

com.databricks.backend.daemon.driver.scalakernel.SparkException: An error occurred while getting Pinot schema for table 'ord_activity_v3'
  org.apache.pinot.connector.spark.common.PinotClusterClient$.getTableSchema(PinotClusterClient.scala:54)
  org.apache.pinot.connector.spark.v3.datasource.PinotDataSource.inferSchema(PinotDataSource.scala:43)
  org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:100)
  org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:241)
  org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:349)
  scala.Option.flatMap(Option.scala:271)
  org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:347)
  org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:236)
  org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformReadRel(SparkConnectPlanner.scala:1428)
  org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:176)
  org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:493)
  scala.Option.getOrElse(Option.scala:189)
  org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:492)
  org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:171)
  org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:158)
  com.databricks.backend.daemon.driver.DataFrameUtil$.convertRawPlanToDataFrame(DataFrameUtil.scala:66)
  com.databricks.backend.daemon.driver.DataFrameUtil$.displayRawDfPlanWithSparkConnectSession(DataFrameUtil.scala:40)
  com.databricks.backend.daemon.driver.ScalaJupyterDriverLocal.processDisplay(ScalaJupyterDriverLocal.scala:618)
  com.databricks.backend.daemon.driver.ScalaJupyterDriverLocal$DisplayHandler.$anonfun$handle$1(ScalaJupyterDriverLocal.scala:315)
  com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
  com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
  com.databricks.backend.daemon.driver.ScalaJupyterDriverLocal$DisplayHandler.handle(ScalaJupyterDriverLocal.scala:314)
  com.databricks.backend.daemon.driver.CommMessageHandler$.$anonfun$register$1(CommMessageHandler.scala:29)
  com.databricks.backend.daemon.driver.CommMessageHandler$.$anonfun$register$1$adapted(CommMessageHandler.scala:24)
  com.databricks.backend.daemon.driver.JupyterKernelListener$CommMessageAsyncHandler.$anonfun$submit$2(JupyterKernelListener.scala:890)
  com.databricks.backend.daemon.driver.JupyterKernelListener$CommMessageAsyncHandler$$anon$1.run(JupyterKernelListener.scala:906)
  java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
  java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  java.base/java.lang.Thread.run(Thread.java:840)

I checked the source code of getTableSchema(); it looks like it doesn't use any authentication when it fetches the table schema from the Pinot /tables/${xxxx}/schema API. In our deployment that endpoint is protected by token authentication, so the request fails. Can we add authentication support to the Spark connector?
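For illustration only (the connector itself is Scala, and none of this is connector code): the difference between what the connector does today and what an authenticated schema fetch would need is just an Authorization header on the controller request. A minimal Python sketch, where `CONTROLLER`, `TABLE`, and `TOKEN` are all placeholder values I've made up:

```python
import urllib.request

# Placeholders -- substitute your own controller URL, table name, and token.
CONTROLLER = "http://localhost:9000"
TABLE = "myTable"
TOKEN = "my-secret-token"

# What the connector effectively does now: a plain GET with no credentials,
# which a token-protected controller rejects.
unauthenticated = urllib.request.Request(f"{CONTROLLER}/tables/{TABLE}/schema")

# What an auth-aware connector would need to send: the same GET, but
# carrying the token in an Authorization header.
authenticated = urllib.request.Request(
    f"{CONTROLLER}/tables/{TABLE}/schema",
    headers={"Authorization": f"Bearer {TOKEN}"},
)

# We only build the requests here; urllib.request.urlopen(...) would send them.
print(unauthenticated.get_header("Authorization"))  # None -- no credentials attached
print(authenticated.get_header("Authorization"))
```

A natural way to surface this would be an extra reader option (e.g. something like `.option("authToken", ...)` alongside `controller`/`broker`), but the option name here is purely hypothetical until the linked PR settles the actual API.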

Thanks.

@tsekityam tsekityam changed the title [spark connector] [feature request][spark connector] support authentication to pinot api Mar 17, 2025