
Spark 3.0 scala.None$ is not a valid external type for schema of string error on read #1635


Closed
1 of 2 tasks
bfalk-phr opened this issue Mar 29, 2021 · 9 comments · Fixed by #1816

Comments

@bfalk-phr

bfalk-phr commented Mar 29, 2021

What kind of issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

When using the elasticsearch-spark-30_2.12-7.12.0.jar connector, found here, I'm getting the following error when trying to read from an index:

Error while encoding: java.lang.RuntimeException: scala.None$ is not a valid external type for schema of string

The issue seems to stem from having an empty string in a column. Notice the empty description column in the second row of the code example. The read is successful if I fill in that column.

Writes/Deletes work fine.

This same issue seems to have been reported by others in this thread #1412 (comment). I didn't see a new issue created by anyone yet.

Steps to reproduce

Code:

from pyspark.sql import Row

departmentRows = (
  [Row(id='123456', name='Computer Science', description='test'), 
   Row(id='789012', name='Mechanical Engineering', description=''), 
   Row(id='345678', name='Theater and Drama', description='test 2'), 
   Row(id='901234', name='Indoor Recreation', description='test 3')]
)

df = spark.createDataFrame(departmentRows)

(df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port", "<port>")
  .option("es.net.ssl","true")
  .option("es.net.http.auth.user", "<user>")
  .option("es.net.http.auth.pass", "<pass>")
  .option("es.nodes", "<endpoint>")
  .option("es.write.operation", "upsert")
  .option("es.mapping.id", "id")
  .mode("append")
  .save("my_connector_test")
)

reader = (spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port", "<port>")
  .option("es.net.ssl","true")
  .option("es.net.http.auth.user", "<user>")
  .option("es.net.http.auth.pass", "<pass>")
  .option("es.nodes", "<endpoint>")
)

df_elastic = reader.load("my_connector_test")
display(df_elastic)

Stack trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 4 times, most recent failure: Lost task 0.3 in stage 20.0 (TID 72, 10.139.64.4, executor 1): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.None$ is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, desc), StringType), true, false) AS desc#165
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, id), StringType), true, false) AS id#166
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, name), StringType), true, false) AS name#167
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:239)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
	at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
	at org.apache.spark.scheduler.Task.run(Task.scala:117)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:640)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: scala.None$ is not a valid external type for schema of string
	at com.databricks.sql.expressions.codegen.RuntimeUtils.assertTrue(RuntimeUtils.java:75)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:235)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333)
	at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:298)
	at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:308)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:82)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:58)
	at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2994)
	at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:2985)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3709)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:836)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3707)
	at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2984)
	at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:194)
	at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:57)
	at com.databricks.backend.daemon.driver.PythonDriverLocal.generateTableResult(PythonDriverLocal.scala:1156)
	at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:1068)
	at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:855)
	at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:937)
	at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:538)
	at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:897)
	at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$8(PythonDriverLocal.scala:382)
	at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:855)
	at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:369)
	at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$10(DriverLocal.scala:431)
	at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:48)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:48)
	at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:408)
	at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:653)
	at scala.util.Try$.apply(Try.scala:213)
	at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:645)
	at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:486)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:598)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
	at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.None$ is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, desc), StringType), true, false) AS desc#165
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, id), StringType), true, false) AS id#166
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, name), StringType), true, false) AS name#167
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:239)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
	at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
	at org.apache.spark.scheduler.Task.run(Task.scala:117)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:640)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.RuntimeException: scala.None$ is not a valid external type for schema of string
	at com.databricks.sql.expressions.codegen.RuntimeUtils.assertTrue(RuntimeUtils.java:75)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:235)
	... 16 more

Version Info

OS:
JVM:
Hadoop/Spark: Databricks Runtime Version 7.3 LTS (includes Apache Spark 3.0.1, Scala 2.12)
ES-Hadoop: elasticsearch-spark-30_2.12-7.12.0
ES: Elastic Cloud v7.8.1

@dlubomski

dlubomski commented Apr 20, 2021

I have the same problem:

Hadoop/Spark: Spark 3.1.1
ES-Hadoop: elasticsearch-spark-30_2.12-7.13.0-20210303.042227-17.jar
ES: 7.7.1

Edit: I get the same error with this version as well: https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-20_2.12/7.12.0/elasticsearch-spark-20_2.12-7.12.0.jar

@shivajimutkule

I am also facing the same issue.

Spark: 3.1.1
ES-Hadoop: elasticsearch-spark-30_2.12-7.12.0
ES: 2.3.4

@dipeshchheda-drashta

Can you please confirm whether this is a valid issue?
If yes, is the fix planned for version 7.12.1?

Any workaround?

Env:
Spark: 3.1.1
ES: 2.3.4
ES-Hadoop: elasticsearch-spark-30_2.12-7.12.0

@shivajimutkule

I was able to get it working by setting the elasticsearch-hadoop property es.field.read.empty.as.null to no:

.option("es.field.read.empty.as.null", "no")

@dlubomski

Able to get it working by setting elasticsearch-hadoop property es.field.read.empty.as.null = no

.option("es.field.read.empty.as.null", "no")

Confirmed, it helps. Thanks!

@iercan

iercan commented May 31, 2021

Able to get it working by setting elasticsearch-hadoop property es.field.read.empty.as.null = no

.option("es.field.read.empty.as.null", "no")

This config didn't work for me. I'm using sparklyr with spark 3.0.0

Edit: Looks like I was importing the wrong jar. Setting this option also worked for me.

@montgomery1944

montgomery1944 commented Jul 13, 2021

I have the same problem on:
Spark 3.1.1
ES 2.4.4
elasticsearch-spark-30_2.12-7.12.1

@jbaiera, are there any plans to fix this? It prevents me from migrating from Spark 2 to 3 in my system, since using .option("es.field.read.empty.as.null", "no") is not an option for us, as we do not want to store empty strings in the destination.

@masseyke
Member

masseyke commented Dec 6, 2021

I'm just starting to take a look at this one (and I've been able to reproduce it with the code in the initial post). @montgomery1944, what behavior are you expecting to see? When I set "es.field.read.empty.as.null" to "no", the behavior I get on read is what I would expect the default to be (an empty string was written, and an empty string is read back), but it sounds like you expect something different?

@masseyke
Member

masseyke commented Dec 9, 2021

Sorry, I misunderstood @montgomery1944's comment. The attached PR will now pull these fields in as nulls by default (rather than throwing an exception). They can still be read as empty strings by setting "es.field.read.empty.as.null" to "no".
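
As a rough illustration of the behavior described above (a sketch only, not the PR itself; assuming the same test index and connection options as the original repro):

# After the fix, an empty source field comes back as null by default.
df_nulls = (spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "<endpoint>")
  .load("my_connector_test"))   # empty 'description' is read as null

# Setting the option discussed in this thread restores the empty-string behavior.
df_empty = (spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "<endpoint>")
  .option("es.field.read.empty.as.null", "no")
  .load("my_connector_test"))   # empty 'description' is read as ''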

masseyke added a commit that referenced this issue Dec 13, 2021
…ion (#1816)

By default we intend to treat empty fields as nulls when being read in through spark sql. However we actually
turn them into None objects, which causes spark-sql to blow up in spark 2 and 3. This commit treats them
as nulls, which works for all versions of spark we currently support.
Closes #1635
masseyke added a commit to masseyke/elasticsearch-hadoop that referenced this issue Dec 20, 2021
…ion (elastic#1816)

By default we intend to treat empty fields as nulls when being read in through spark sql. However we actually
turn them into None objects, which causes spark-sql to blow up in spark 2 and 3. This commit treats them
as nulls, which works for all versions of spark we currently support.
Closes elastic#1635
masseyke added a commit to masseyke/elasticsearch-hadoop that referenced this issue Dec 20, 2021
…ion (elastic#1816)

By default we intend to treat empty fields as nulls when being read in through spark sql. However we actually
turn them into None objects, which causes spark-sql to blow up in spark 2 and 3. This commit treats them
as nulls, which works for all versions of spark we currently support.
Closes elastic#1635
masseyke added a commit that referenced this issue Dec 20, 2021
…ion (#1816) (#1831)

By default we intend to treat empty fields as nulls when being read in through spark sql. However we actually
turn them into None objects, which causes spark-sql to blow up in spark 2 and 3. This commit treats them
as nulls, which works for all versions of spark we currently support.
Closes #1635
masseyke added a commit that referenced this issue Dec 20, 2021
…ion (#1816) (#1832)

By default we intend to treat empty fields as nulls when being read in through spark sql. However we actually
turn them into None objects, which causes spark-sql to blow up in spark 2 and 3. This commit treats them
as nulls, which works for all versions of spark we currently support.
Closes #1635