
Commit abe8021

spark 3.0.0 with hdp 3.2
1 parent: d0c91e2

92 files changed (+10555, -24 lines)


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -20,3 +20,4 @@ metastore_db
 /spark/sql-13/with_meta_*
 out/
 localRepo/
+*.sha1

.travis.yml

Lines changed: 8 additions & 3 deletions
@@ -1,12 +1,17 @@
 language: java
 jdk:
-  - oraclejdk8
-
+  - openjdk12

 dist: trusty

+addons:
+  apt:
+    packages:
+      - oracle-java8-installer
+
 before_install:
   - "chmod +x gradlew"
   - export JAVA_OPTS="-Xmx2048m -XX:MaxPermSize=386m"
+  - export JAVA8_HOME=$(update-alternatives --list java | grep java-8-oracle | sed 's|/bin/java$||' | sed 's|/jre$||')

-install: ./gradlew assemble -Pskip.signing
+install: ./gradlew assemble -Pskip.signing updateSHAs

buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/BaseBuildPlugin.groovy

Lines changed: 7 additions & 0 deletions
@@ -124,6 +124,13 @@ class BaseBuildPlugin implements Plugin<Project> {
                 project.rootProject.ext.hadoopVersion = project.hadoop22Version
                 println "Using Apache Hadoop [$project.hadoop22Version]"
                 break
+            // Hadoop YARN/3.2.x
+            case "hadoopYarn3":
+                String version = project.hadoop32Version
+                project.rootProject.ext.hadoopVersion = version
+                project.rootProject.ext.hadoopClient = ["org.apache.hadoop:hadoop-client:$version"]
+                println "Using Apache Hadoop on YARN [$version]"
+                break
             default:
                 throw new GradleException("Invalid [hadoopDistro] setting: [$project.rootProject.ext.hadoopDistro]")
         }

dist/licenses/hadoop-client-2.7.6.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

dist/licenses/hadoop-common-2.7.6.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

dist/licenses/hadoop-mapreduce-client-core-2.7.6.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

gradle.properties

Lines changed: 10 additions & 0 deletions
@@ -8,6 +8,13 @@ log4jVersion = 2.6.2
 # Hadoop versions
 hadoop2Version = 2.7.6
 hadoop22Version = 2.2.0
+hadoop32Version = 3.2.0
+
+# `distro` property can be one of the following: [hadoopYarn, hadoopYarn3, hadoopStable]
+# default: hadoopYarn => build with hadoop2Version
+# hadoopStable => build with hadoop22Version
+# hadoopYarn3 => build with hadoop32Version
+# distro = hadoopYarn3

 # Common libraries
 hiveVersion = 1.2.1
@@ -19,11 +26,14 @@ jacksonVersion = 1.8.8
 # Spark
 spark13Version = 1.6.2
 spark20Version = 2.3.0
+spark30Version = 3.0.0
 # same as Spark's
 scala210Version = 2.10.7
 scala210MajorVersion = 2.10
 scala211Version = 2.11.12
 scala211MajorVersion = 2.11
+scala212Version = 2.12.10
+scala212MajorVersion = 2.12

 stormVersion = 1.0.6

hive/src/test/java/org/elasticsearch/hadoop/serialization/handler/write/impl/HiveSerializationEventConverterTest.java

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ public void generateEventHiveRecordLimited() throws Exception {
         SerializationFailure iaeFailure = new SerializationFailure(new IllegalArgumentException("garbage"), tuple, new ArrayList<String>());

         String rawEvent = eventConverter.getRawEvent(iaeFailure);
-        assertThat(rawEvent, startsWith("HiveType{object=org.apache.hadoop.io.MapWritable@"));
+        assertTrue(rawEvent.matches("HiveType\\{object=\\{three=3, one=1, two=2\\}.*|^HiveType\\{object=org.apache.hadoop.io.MapWritable@.*"));
         String timestamp = eventConverter.getTimestamp(iaeFailure);
         assertTrue(StringUtils.hasText(timestamp));
         assertTrue(DateUtils.parseDate(timestamp).getTime().getTime() > 1L);

licenses/hadoop-client-2.7.6.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

licenses/hadoop-common-2.7.6.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

licenses/hadoop-mapreduce-client-core-2.7.6.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

licenses/scala-library-2.11.12.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

licenses/scala-reflect-2.11.12.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

mr/licenses/hadoop-client-2.7.6.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

mr/licenses/hadoop-common-2.7.6.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

mr/licenses/hadoop-mapreduce-client-core-2.7.6.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

mr/src/main/java/org/elasticsearch/hadoop/serialization/dto/NodeInfo.java

Lines changed: 3 additions & 3 deletions
@@ -52,13 +52,13 @@ public NodeInfo(String id, Map<String, Object> map) {
                 this.isData = true;
             } else {
                 String data = (String) attributes.get("data");
-                this.isClient = data == null ? true : !Boolean.parseBoolean(data);
-                this.isData = data == null ? true : Boolean.parseBoolean(data);
+                this.isClient = data == null || !Boolean.parseBoolean(data);
+                this.isData = data == null || Boolean.parseBoolean(data);
             }
             this.isIngest = false;
         } else {
             List<String> roles = (List<String>) map.get("roles");
-            this.isClient = roles.contains("data") == false;
+            this.isClient = !roles.contains("data");
             this.isData = roles.contains("data");
             this.isIngest = roles.contains("ingest");
         }

mr/src/test/java/org/elasticsearch/hadoop/serialization/handler/write/impl/SerializationEventConverterTest.java

Lines changed: 1 addition & 1 deletion
@@ -69,7 +69,7 @@ public void generateEventWritable() throws Exception {
         SerializationFailure iaeFailure = new SerializationFailure(new IllegalArgumentException("garbage"), document, new ArrayList<String>());

         String rawEvent = eventConverter.getRawEvent(iaeFailure);
-        assertThat(rawEvent, Matchers.startsWith("org.apache.hadoop.io.MapWritable@"));
+        assertTrue(rawEvent.matches("\\{field=value\\}|^org.apache.hadoop.io.MapWritable@.*"));
         String timestamp = eventConverter.getTimestamp(iaeFailure);
         assertTrue(StringUtils.hasText(timestamp));
         assertTrue(DateUtils.parseDate(timestamp).getTime().getTime() > 1L);

settings.gradle

Lines changed: 4 additions & 0 deletions
@@ -15,6 +15,10 @@ include 'sql-20'
 project(":sql-20").projectDir = new File(settingsDir, "spark/sql-20")
 project(":sql-20").name = "elasticsearch-spark-20"

+include 'sql-30'
+project(":sql-30").projectDir = new File(settingsDir, "spark/sql-30")
+project(":sql-30").name = "elasticsearch-spark-30"
+
 include 'storm'
 project(":storm").name = "elasticsearch-storm"

spark/core/main/scala/org/elasticsearch/spark/rdd/AbstractEsRDD.scala

Lines changed: 3 additions & 4 deletions
@@ -18,8 +18,7 @@
  */
 package org.elasticsearch.spark.rdd;

-import scala.collection.JavaConversions.collectionAsScalaIterable
-import scala.collection.JavaConversions.mapAsJavaMap
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.Partition
@@ -45,7 +44,7 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](
   @transient protected lazy val logger = LogFactory.getLog(this.getClass())

   override def getPartitions: Array[Partition] = {
-    esPartitions.zipWithIndex.map { case(esPartition, idx) =>
+    esPartitions.asScala.zipWithIndex.map { case(esPartition, idx) =>
       new EsPartition(id, idx, esPartition)
     }.toArray
   }
@@ -70,7 +69,7 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](

   @transient private[spark] lazy val esCfg = {
     val cfg = new SparkSettingsManager().load(sc.getConf).copy();
-    cfg.merge(params)
+    cfg.merge(params.asJava)
     InitializationUtils.setUserProviderIfNotSet(cfg, classOf[HadoopUserProvider], logger)
     cfg
   }
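
Note: this change swaps `scala.collection.JavaConversions`, whose implicit conversions were deprecated in Scala 2.12 and removed in 2.13, for `scala.collection.JavaConverters`, which requires an explicit `asScala`/`asJava` at each Java/Scala boundary. A minimal, self-contained sketch of the same pattern (the object and value names here are illustrative, not from the repo):

import scala.collection.JavaConverters._

object ConvertersSketch extends App {
  // Java -> Scala: asScala gives a Scala view over the Java collection,
  // mirroring esPartitions.asScala.zipWithIndex in AbstractEsRDD
  val javaList: java.util.List[String] = java.util.Arrays.asList("a", "b", "c")
  val indexed = javaList.asScala.zipWithIndex.map { case (v, i) => s"$i:$v" }.toArray

  // Scala -> Java: asJava adapts a Scala map for APIs expecting java.util.Map,
  // mirroring cfg.merge(params.asJava)
  val javaMap: java.util.Map[String, String] = Map("es.nodes" -> "localhost").asJava

  println(indexed.mkString(", ")) // 0:a, 1:b, 2:c
  println(javaMap)                // {es.nodes=localhost}
}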

spark/core/main/scala/org/elasticsearch/spark/rdd/EsRDDWriter.scala

Lines changed: 5 additions & 1 deletion
@@ -21,6 +21,7 @@ package org.elasticsearch.spark.rdd
 import org.apache.commons.logging.Log
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.TaskContext
+import org.apache.spark.util.TaskCompletionListener
 import org.elasticsearch.hadoop.cfg.PropertiesSettings
 import org.elasticsearch.hadoop.cfg.Settings
 import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
@@ -64,7 +65,10 @@ private[spark] class EsRDDWriter[T: ClassTag](val serializedSettings: String,
   def write(taskContext: TaskContext, data: Iterator[T]): Unit = {
     val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

-    taskContext.addTaskCompletionListener((TaskContext) => writer.close())
+    val taskCompletionListener = new TaskCompletionListener {
+      override def onTaskCompletion(context: TaskContext): Unit = writer.close()
+    }
+    taskContext.addTaskCompletionListener(taskCompletionListener)

     if (runtimeMetadata) {
       writer.repository.addRuntimeFieldExtractor(metaExtractor)
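
Note: under Scala 2.12, SAM conversion lets the old lambda match both `addTaskCompletionListener` overloads (one takes a function, the other a `TaskCompletionListener`), so the listener is now instantiated explicitly to pick the overload unambiguously. A standalone sketch of the pattern (the helper name is hypothetical):

import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

// Hypothetical helper: release a per-task resource when the task finishes.
// The explicit anonymous class selects the TaskCompletionListener overload,
// which a bare lambda no longer does unambiguously under Scala 2.12.
def closeOnTaskEnd(ctx: TaskContext, resource: AutoCloseable): Unit = {
  ctx.addTaskCompletionListener(new TaskCompletionListener {
    override def onTaskCompletion(context: TaskContext): Unit = resource.close()
  })
}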
