
Commit 4004cb0

spark 3.0.0 with hdp 3.2
1 parent d0c91e2 commit 4004cb0


87 files changed: +10561 -13 lines changed


.travis.yml

Lines changed: 8 additions & 3 deletions
@@ -1,12 +1,17 @@
 language: java
 jdk:
-  - oraclejdk8
-
+  - openjdk12
 
 dist: trusty
 
+addons:
+  apt:
+    packages:
+      - oracle-java8-installer
+
 before_install:
   - "chmod +x gradlew"
   - export JAVA_OPTS="-Xmx2048m -XX:MaxPermSize=386m"
+  - export JAVA8_HOME=$(update-alternatives --list java | grep java-8-oracle | sed 's|/bin/java$||' | sed 's|/jre$||')
 
-install: ./gradlew assemble -Pskip.signing
+install: ./gradlew assemble -Pskip.signing updateSHAs

buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/BaseBuildPlugin.groovy

Lines changed: 7 additions & 0 deletions
@@ -124,6 +124,13 @@ class BaseBuildPlugin implements Plugin<Project> {
                 project.rootProject.ext.hadoopVersion = project.hadoop22Version
                 println "Using Apache Hadoop [$project.hadoop22Version]"
                 break
+            // Hadoop YARN/3.2.x
+            case "hadoopYarn3":
+                String version = project.hadoop32Version
+                project.rootProject.ext.hadoopVersion = version
+                project.rootProject.ext.hadoopClient = ["org.apache.hadoop:hadoop-client:$version"]
+                println "Using Apache Hadoop on YARN [$version]"
+                break
             default:
                 throw new GradleException("Invalid [hadoopDistro] setting: [$project.rootProject.ext.hadoopDistro]")
         }

gradle.properties

Lines changed: 10 additions & 0 deletions
@@ -8,6 +8,13 @@ log4jVersion = 2.6.2
 # Hadoop versions
 hadoop2Version = 2.7.6
 hadoop22Version = 2.2.0
+hadoop32Version = 3.2.0
+
+# `distro` property can be one of the following: [hadoopYarn, hadoopYarn3, hadoopStable]
+# default: hadoopYarn => build with hadoop2Version
+# hadoopStable => build with hadoop22Version
+# hadoopYarn3 => build with hadoop32Version
+# distro = hadoopYarn3
 
 # Common libraries
 hiveVersion = 1.2.1
@@ -19,11 +26,14 @@ jacksonVersion = 1.8.8
 # Spark
 spark13Version = 1.6.2
 spark20Version = 2.3.0
+spark30Version = 3.0.0
 # same as Spark's
 scala210Version = 2.10.7
 scala210MajorVersion = 2.10
 scala211Version = 2.11.12
 scala211MajorVersion = 2.11
+scala212Version = 2.12.10
+scala212MajorVersion = 2.12
 
 stormVersion = 1.0.6

hive/src/test/java/org/elasticsearch/hadoop/serialization/handler/write/impl/HiveSerializationEventConverterTest.java

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ public void generateEventHiveRecordLimited() throws Exception {
         SerializationFailure iaeFailure = new SerializationFailure(new IllegalArgumentException("garbage"), tuple, new ArrayList<String>());
 
         String rawEvent = eventConverter.getRawEvent(iaeFailure);
-        assertThat(rawEvent, startsWith("HiveType{object=org.apache.hadoop.io.MapWritable@"));
+        assertTrue(rawEvent.matches("HiveType\\{object=\\{three=3, one=1, two=2\\}.*|^HiveType\\{object=org.apache.hadoop.io.MapWritable@.*"));
         String timestamp = eventConverter.getTimestamp(iaeFailure);
         assertTrue(StringUtils.hasText(timestamp));
         assertTrue(DateUtils.parseDate(timestamp).getTime().getTime() > 1L);

mr/src/main/java/org/elasticsearch/hadoop/serialization/dto/NodeInfo.java

Lines changed: 3 additions & 3 deletions
@@ -52,13 +52,13 @@ public NodeInfo(String id, Map<String, Object> map) {
                 this.isData = true;
             } else {
                 String data = (String) attributes.get("data");
-                this.isClient = data == null ? true : !Boolean.parseBoolean(data);
-                this.isData = data == null ? true : Boolean.parseBoolean(data);
+                this.isClient = data == null || !Boolean.parseBoolean(data);
+                this.isData = data == null || Boolean.parseBoolean(data);
             }
             this.isIngest = false;
         } else {
             List<String> roles = (List<String>) map.get("roles");
-            this.isClient = roles.contains("data") == false;
+            this.isClient = !roles.contains("data");
             this.isData = roles.contains("data");
             this.isIngest = roles.contains("ingest");
         }
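
Note on the change above: the rewritten assignments are behavior-preserving. `data == null ? true : !Boolean.parseBoolean(data)` and `data == null || !Boolean.parseBoolean(data)` evaluate identically for every input; the commit only simplifies the expressions. A minimal Scala sketch (illustrative only, not part of the commit) that checks the equivalence:

// Illustrative only: verifies the new short-circuit form matches the old ternary form.
object NodeInfoBooleanCheck {
  def oldIsClient(data: String): Boolean =
    if (data == null) true else !java.lang.Boolean.parseBoolean(data)

  def newIsClient(data: String): Boolean =
    data == null || !java.lang.Boolean.parseBoolean(data)

  def main(args: Array[String]): Unit = {
    for (data <- Seq(null, "true", "false"))
      assert(oldIsClient(data) == newIsClient(data), s"mismatch for input: $data")
    println("old and new forms agree for all inputs")
  }
}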

mr/src/test/java/org/elasticsearch/hadoop/serialization/handler/write/impl/SerializationEventConverterTest.java

Lines changed: 1 addition & 1 deletion
@@ -69,7 +69,7 @@ public void generateEventWritable() throws Exception {
         SerializationFailure iaeFailure = new SerializationFailure(new IllegalArgumentException("garbage"), document, new ArrayList<String>());
 
         String rawEvent = eventConverter.getRawEvent(iaeFailure);
-        assertThat(rawEvent, Matchers.startsWith("org.apache.hadoop.io.MapWritable@"));
+        assertTrue(rawEvent.matches("\\{field=value\\}|^org.apache.hadoop.io.MapWritable@.*"));
        String timestamp = eventConverter.getTimestamp(iaeFailure);
        assertTrue(StringUtils.hasText(timestamp));
        assertTrue(DateUtils.parseDate(timestamp).getTime().getTime() > 1L);
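
Note on this change and the matching Hive test change above: both assertions are relaxed from `startsWith` to a regex with two alternatives, so the test passes whether `MapWritable` renders as a readable map (as it appears to on newer Hadoop versions, e.g. `{field=value}`) or as the older identity `toString` (`org.apache.hadoop.io.MapWritable@...`). A small Scala sketch (illustrative values) of how the alternation behaves with `String.matches`:

// Illustrative only: the relaxed pattern accepts either rendering of the failed document.
object RawEventRegexCheck {
  val pattern = "\\{field=value\\}|^org.apache.hadoop.io.MapWritable@.*"

  def main(args: Array[String]): Unit = {
    assert("{field=value}".matches(pattern))                              // readable toString (newer Hadoop)
    assert("org.apache.hadoop.io.MapWritable@4f2b503c".matches(pattern))  // identity toString (older Hadoop)
    println("both renderings match the relaxed assertion")
  }
}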

settings.gradle

Lines changed: 4 additions & 0 deletions
@@ -15,6 +15,10 @@ include 'sql-20'
 project(":sql-20").projectDir = new File(settingsDir, "spark/sql-20")
 project(":sql-20").name = "elasticsearch-spark-20"
 
+include 'sql-30'
+project(":sql-30").projectDir = new File(settingsDir, "spark/sql-30")
+project(":sql-30").name = "elasticsearch-spark-30"
+
 include 'storm'
 project(":storm").name = "elasticsearch-storm"

spark/core/main/scala/org/elasticsearch/spark/rdd/AbstractEsRDD.scala

Lines changed: 3 additions & 4 deletions
@@ -18,8 +18,7 @@
  */
 package org.elasticsearch.spark.rdd;
 
-import scala.collection.JavaConversions.collectionAsScalaIterable
-import scala.collection.JavaConversions.mapAsJavaMap
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.Partition
@@ -45,7 +44,7 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](
   @transient protected lazy val logger = LogFactory.getLog(this.getClass())
 
   override def getPartitions: Array[Partition] = {
-    esPartitions.zipWithIndex.map { case(esPartition, idx) =>
+    esPartitions.asScala.zipWithIndex.map { case(esPartition, idx) =>
       new EsPartition(id, idx, esPartition)
     }.toArray
   }
@@ -70,7 +69,7 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](
 
   @transient private[spark] lazy val esCfg = {
     val cfg = new SparkSettingsManager().load(sc.getConf).copy();
-    cfg.merge(params)
+    cfg.merge(params.asJava)
     InitializationUtils.setUserProviderIfNotSet(cfg, classOf[HadoopUserProvider], logger)
     cfg
   }
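
Note on the change above: the implicit `scala.collection.JavaConversions` imports are replaced with `scala.collection.JavaConverters`, whose explicit `asScala`/`asJava` calls are the supported style on Scala 2.12 (where `JavaConversions` is deprecated). A minimal Scala sketch (illustrative values, not part of the commit) of the converter style used in `getPartitions` and `esCfg`:

// Minimal sketch of the explicit-converter style adopted above.
import scala.collection.JavaConverters._

object ConvertersSketch {
  def main(args: Array[String]): Unit = {
    // asScala: wrap a Java collection so Scala combinators (zipWithIndex, map, ...) apply
    val javaList: java.util.List[String] = java.util.Arrays.asList("a", "b", "c")
    val indexed = javaList.asScala.zipWithIndex.map { case (v, i) => s"$i:$v" }
    println(indexed.mkString(", "))

    // asJava: hand a Scala Map to an API that expects java.util.Map (as cfg.merge does)
    val params: java.util.Map[String, String] = Map("es.nodes" -> "localhost").asJava
    println(params)
  }
}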

spark/core/main/scala/org/elasticsearch/spark/rdd/EsRDDWriter.scala

Lines changed: 5 additions & 1 deletion
@@ -21,6 +21,7 @@ package org.elasticsearch.spark.rdd
 import org.apache.commons.logging.Log
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.TaskContext
+import org.apache.spark.util.TaskCompletionListener
 import org.elasticsearch.hadoop.cfg.PropertiesSettings
 import org.elasticsearch.hadoop.cfg.Settings
 import org.elasticsearch.hadoop.mr.security.HadoopUserProvider
@@ -64,7 +65,10 @@ private[spark] class EsRDDWriter[T: ClassTag](val serializedSettings: String,
   def write(taskContext: TaskContext, data: Iterator[T]): Unit = {
     val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)
 
-    taskContext.addTaskCompletionListener((TaskContext) => writer.close())
+    val taskCompletionListener = new TaskCompletionListener {
+      override def onTaskCompletion(context: TaskContext): Unit = writer.close()
+    }
+    taskContext.addTaskCompletionListener(taskCompletionListener)
 
     if (runtimeMetadata) {
       writer.repository.addRuntimeFieldExtractor(metaExtractor)
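
Note on the change above: `TaskContext.addTaskCompletionListener` is overloaded (one overload takes a `TaskCompletionListener`, another takes a function), so the old lambda argument likely becomes ambiguous under Scala 2.12's SAM conversion; instantiating the listener trait explicitly keeps the call unambiguous. A self-contained Scala sketch of the pattern (toy types, no Spark required):

// Self-contained illustration of the overload pattern behind the change above.
object OverloadAmbiguitySketch {
  trait Listener { def onCompletion(id: Int): Unit }

  class Registry {
    def addListener(l: Listener): Unit = l.onCompletion(0)
    def addListener(f: Int => Unit): Unit = f(0)
  }

  def main(args: Array[String]): Unit = {
    val registry = new Registry
    // registry.addListener(id => println(id))  // a bare lambda here can be reported as ambiguous by Scala 2.12
    registry.addListener(new Listener {          // explicit anonymous class: resolves to one overload everywhere
      override def onCompletion(id: Int): Unit = println(s"completed $id")
    })
  }
}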

spark/sql-30/build.gradle

Lines changed: 220 additions & 0 deletions
@@ -0,0 +1,220 @@
+
+description = "Elasticsearch Spark (for Spark 3.X)"
+
+evaluationDependsOn(':elasticsearch-hadoop-mr')
+
+apply plugin: 'java-library'
+apply plugin: 'scala'
+apply plugin: 'es.hadoop.build.integration'
+apply plugin: 'scala.variants'
+
+variants {
+    defaultVersion '2.12.10'
+    targetVersions '2.12.10'
+}
+
+configurations {
+    embedded {
+        transitive = false
+        canBeResolved = true
+    }
+    implementation {
+        extendsFrom project.configurations.embedded
+    }
+    if (project.ext.scalaMajorVersion != '2.10') {
+        scalaCompilerPlugin {
+            defaultDependencies { dependencies ->
+                dependencies.add(project.dependencies.create( "com.typesafe.genjavadoc:genjavadoc-plugin_${scalaVersion}:0.16"))
+            }
+        }
+    }
+}
+
+println "Compiled using Scala ${project.ext.scalaMajorVersion} [${project.ext.scalaVersion}]"
+String sparkVersion = spark30Version
+
+// Revert to spark 2.2.0 for scala 2.10 as 2.3+ does not support scala 2.10
+if (project.ext.scalaMajorVersion == '2.10') {
+    sparkVersion = '2.2.0'
+}
+
+tasks.withType(ScalaCompile) { ScalaCompile task ->
+    task.sourceCompatibility = project.ext.minimumRuntimeVersion
+    task.targetCompatibility = project.ext.minimumRuntimeVersion
+    task.options.forkOptions.executable = new File(project.ext.runtimeJavaHome, 'bin/java').absolutePath
+}
+
+compileScala {
+    configure(scalaCompileOptions.forkOptions) {
+        memoryMaximumSize = '1g'
+        jvmArgs = ['-XX:MaxPermSize=512m']
+    }
+    scalaCompileOptions.additionalParameters = [
+        "-feature",
+        "-unchecked",
+        "-deprecation",
+        "-Xfuture",
+        "-Yno-adapted-args",
+        "-Ywarn-dead-code",
+        "-Ywarn-numeric-widen",
+        "-Xfatal-warnings"
+    ]
+}
+
+String coreSrc = file("$projectDir/../core").absolutePath.replace('\\','/')
+
+sourceSets {
+    main.scala.srcDirs += "$coreSrc/main/scala"
+    test.scala.srcDirs += "$coreSrc/test/scala"
+    itest.java.srcDirs += "$coreSrc/itest/java"
+    itest.scala.srcDirs += "$coreSrc/itest/scala"
+    itest.resources.srcDirs += "$coreSrc/itest/resources"
+}
+
+def javaFilesOnly = { FileTreeElement spec ->
+    spec.file.name.endsWith('.java') || spec.isDirectory()
+}
+
+artifacts {
+    sourceElements(project.file("$coreSrc/main/scala"))
+    // Add java files from core source to javadocElements.
+    project.fileTree("$coreSrc/main/scala").include(javaFilesOnly).each {
+        javadocElements(it)
+    }
+    project.fileTree("src/main/scala").include(javaFilesOnly).each {
+        javadocElements(it)
+    }
+}
+
+// currently the outside project folders are transformed into linked resources however
+// Gradle only supports one so the project will be invalid as not all sources will be in there
+// as such, they are setup here manually for Eclipse. IntelliJ probably needs a similar approach
+eclipse {
+    project.file.whenMerged { pj ->
+        // eliminated resources created by gradle
+
+        linkedResources.clear()
+        linkedResources.add(new org.gradle.plugins.ide.eclipse.model.Link("core/main/scala", "2", "$coreSrc/main/scala", null))
+        linkedResources.add(new org.gradle.plugins.ide.eclipse.model.Link("core/test/scala", "2", "$coreSrc/test/scala", null))
+        linkedResources.add(new org.gradle.plugins.ide.eclipse.model.Link("core/itest/java", "2", "$coreSrc/itest/java", null))
+        linkedResources.add(new org.gradle.plugins.ide.eclipse.model.Link("core/itest/scala", "2", "$coreSrc/itest/scala", null))
+        linkedResources.add(new org.gradle.plugins.ide.eclipse.model.Link("core/itest/resources","2", "$coreSrc/itest/resources", null))
+
+    }
+    classpath.file {
+        whenMerged { cp ->
+            entries.removeAll { entry ->
+                entry.kind == 'src' && (entry.path in ["scala", "java", "resources"] || entry.path.startsWith("itest-") || entry.path.endsWith("-scala"))
+            }
+
+            entries.add(new org.gradle.plugins.ide.eclipse.model.SourceFolder("core/main/scala", null))
+            entries.add(new org.gradle.plugins.ide.eclipse.model.SourceFolder("core/test/scala", null))
+            entries.add(new org.gradle.plugins.ide.eclipse.model.SourceFolder("core/itest/java", null))
+            entries.add(new org.gradle.plugins.ide.eclipse.model.SourceFolder("core/itest/scala", null))
+            entries.add(new org.gradle.plugins.ide.eclipse.model.SourceFolder("core/itest/resources", null))
+        }
+    }
+}
+
+dependencies {
+    embedded(project(":elasticsearch-hadoop-mr"))
+
+    api("org.scala-lang:scala-library:$scalaVersion")
+    api("org.scala-lang:scala-reflect:$scalaVersion")
+    api("org.apache.spark:spark-core_${project.ext.scalaMajorVersion}:$sparkVersion") {
+        exclude group: 'javax.servlet'
+        exclude group: 'org.apache.hadoop'
+    }
+
+    implementation("org.apache.spark:spark-sql_${project.ext.scalaMajorVersion}:$sparkVersion") {
+        exclude group: 'org.apache.hadoop'
+    }
+    implementation("org.apache.spark:spark-streaming_${project.ext.scalaMajorVersion}:$sparkVersion") {
+        exclude group: 'org.apache.hadoop'
+    }
+    implementation("org.slf4j:slf4j-api:1.7.6") {
+        because 'spark exposes slf4j components in traits that we need to extend'
+    }
+    implementation("commons-logging:commons-logging:1.1.1")
+    implementation("javax.xml.bind:jaxb-api:2.3.1")
+    implementation("org.apache.spark:spark-catalyst_${project.ext.scalaMajorVersion}:$sparkVersion")
+    implementation("org.apache.spark:spark-yarn_${project.ext.scalaMajorVersion}:$sparkVersion") {
+        exclude group: 'org.apache.hadoop'
+    }
+
+    // Scala compiler needs these for arcane reasons, but they are not used in the api nor the runtime
+    compileOnly("com.fasterxml.jackson.core:jackson-annotations:2.6.7")
+    compileOnly("org.json4s:json4s-jackson_${project.ext.scalaMajorVersion}:3.2.11")
+    compileOnly("org.json4s:json4s-ast_${project.ext.scalaMajorVersion}:3.2.11")
+    compileOnly("org.apache.spark:spark-tags_${project.ext.scalaMajorVersion}:$sparkVersion")
+
+    if ('2.10' == scalaMajorVersion) {
+        implementation("org.apache.spark:spark-unsafe_${project.ext.scalaMajorVersion}:$sparkVersion")
+        implementation("org.apache.avro:avro:1.7.7")
+        implementation("log4j:log4j:1.2.17")
+        implementation("com.google.code.findbugs:jsr305:2.0.1")
+        implementation("org.json4s:json4s-ast_2.10:3.2.10")
+        implementation("com.esotericsoftware.kryo:kryo:2.21")
+        compileOnly("org.apache.hadoop:hadoop-annotations:${project.ext.hadoopVersion}")
+        compileOnly("org.codehaus.jackson:jackson-core-asl:${project.ext.jacksonVersion}")
+        compileOnly("org.codehaus.jackson:jackson-mapper-asl:${project.ext.jacksonVersion}")
+    }
+
+    testImplementation(project(":test:shared"))
+    testImplementation(project.ext.hadoopClient)
+    testImplementation("org.elasticsearch:securemock:1.2")
+    testImplementation("org.apache.spark:spark-core_${project.ext.scalaMajorVersion}:$sparkVersion") {
+        exclude group: 'javax.servlet'
+        exclude group: 'org.apache.hadoop'
+    }
+    testImplementation("org.apache.spark:spark-sql_${project.ext.scalaMajorVersion}:$sparkVersion") {
+        exclude group: 'org.apache.hadoop'
+    }
+
+    itestImplementation(project(":test:shared"))
+    itestImplementation("org.apache.spark:spark-yarn_${project.ext.scalaMajorVersion}:$sparkVersion") {
+        exclude group: 'org.apache.hadoop'
+    }
+    itestImplementation("org.apache.spark:spark-streaming_${project.ext.scalaMajorVersion}:$sparkVersion") {
+        exclude group: 'org.apache.hadoop'
+    }
+
+    additionalSources(project(":elasticsearch-hadoop-mr"))
+    javadocSources(project(":elasticsearch-hadoop-mr"))
+}
+
+// Export generated Java code from the genjavadoc compiler plugin
+artifacts {
+    javadocElements(project.file("$buildDir/generated/java")) {
+        builtBy compileScala
+    }
+}
+
+jar {
+    dependsOn(project.configurations.embedded)
+    from(project.configurations.embedded.collect { it.isDirectory() ? it : zipTree(it)}) {
+        include "org/elasticsearch/hadoop/**"
+        include "esh-build.properties"
+        include "META-INF/services/*"
+    }
+}
+
+javadoc {
+    dependsOn compileScala
+    source += "$buildDir/generated/java"
+}
+
+scaladoc {
+    title = "${rootProject.description} ${version} API"
+}
+
+if (project.ext.scalaMajorVersion != '2.10') {
+    tasks.withType(ScalaCompile) {
+        scalaCompileOptions.with {
+            additionalParameters = [
+                "-Xplugin:" + configurations.scalaCompilerPlugin.asPath,
+                "-P:genjavadoc:out=$buildDir/generated/java".toString()
+            ]
+        }
+    }
+}

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+5043bfebc3db072ed80fbd362e7caf00e885d8ae
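
With the new `sql-30` module in place, Spark 3.x jobs can use the same connector API that the existing `sql-20` module exposes, now compiled for Scala 2.12. A minimal usage sketch (assumptions: the `elasticsearch-spark-30` artifact built by this commit is on the classpath, an Elasticsearch node listens on `localhost:9200`, and the index name is invented):

// Minimal sketch of writing a DataFrame to Elasticsearch from Spark 3.x.
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._   // adds saveToEs to DataFrame

object Spark3SaveToEs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("es-spark-30-sketch")
      .master("local[*]")
      .config("es.nodes", "localhost:9200")
      .getOrCreate()

    import spark.implicits._
    val df = Seq(("1", "hello"), ("2", "world")).toDF("id", "message")

    // Same connector API as the sql-20 module, now targeting Spark 3.x / Scala 2.12
    df.saveToEs("spark-30-demo")

    spark.stop()
  }
}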
