
Commit 37ca5bb

Merge pull request #2220 from broadinstitute/tw_spark2
Upgrade to Spark 2
2 parents 6d884c6 + 8f3255a

37 files changed: +159 -308 lines

build.gradle

+5 -5
@@ -55,7 +55,7 @@ repositories {
 }
 
 final htsjdkVersion = System.getProperty('htsjdk.version','2.6.1')
-final hadoopBamVersion = System.getProperty('hadoopBam.version','7.7.0')
+final hadoopBamVersion = System.getProperty('hadoopBam.version','7.7.1')
 
 configurations.all {
     resolutionStrategy {
@@ -99,7 +99,7 @@ dependencies {
     // Using the shaded version to avoid conflicts between its protobuf dependency
     // and that of Hadoop/Spark (either the one we reference explicitly, or the one
     // provided by dataproc).
-    compile 'com.google.cloud:gcloud-java-nio:0.2.8:shaded'
+    compile 'com.google.cloud:google-cloud-nio:0.5.1:shaded'
     compile 'com.google.cloud.genomics:google-genomics-dataflow:v1beta2-0.15'
     compile 'com.google.cloud.genomics:gatk-tools-java:1.1'
     compile 'org.apache.logging.log4j:log4j-api:2.3'
@@ -117,15 +117,15 @@ dependencies {
     compile 'com.google.cloud.genomics:google-genomics-utils:v1beta2-0.30'
 
     compile 'org.ojalgo:ojalgo:39.0'
-    compile ('org.apache.spark:spark-mllib_2.10:1.6.1') {
+    compile ('org.apache.spark:spark-mllib_2.10:2.0.0') {
         // JUL is used by Google Dataflow as the backend logger, so exclude jul-to-slf4j to avoid a loop
         exclude module: 'jul-to-slf4j'
         exclude module: 'javax.servlet'
         exclude module: 'servlet-api'
     }
 
     compile 'org.bdgenomics.bdg-formats:bdg-formats:0.5.0'
-    compile('org.bdgenomics.adam:adam-core_2.10:0.18.0') {
+    compile('org.bdgenomics.adam:adam-core_2.10:0.20.0') {
         exclude group: 'org.slf4j'
         exclude group: 'org.apache.hadoop'
         exclude group: 'org.scala-lang'
@@ -142,7 +142,7 @@ dependencies {
         exclude module: 'htsjdk'
     }
     compile('org.apache.hadoop:hadoop-client:2.7.2') // should be a 'provided' dependency
-    compile('com.github.jsr203hadoop:jsr203hadoop:1.0.2')
+    compile('com.github.jsr203hadoop:jsr203hadoop:1.0.3')
 
     compile('de.javakaffee:kryo-serializers:0.37') {
         exclude module: 'kryo' // use Spark's version

gatk-launch

+20 -9
@@ -86,6 +86,7 @@ def main(args):
     print(" LOCAL: run using the in-memory spark runner")
     print(" SPARK: run using spark-submit on an existing cluster ")
     print(" --sparkMaster must be specified")
+    print(" --sparkSubmitCommand may be specified to control the Spark submit command")
     print(" arguments to spark-submit may optionally be specified after -- ")
     print(" GCS: run using Google cloud dataproc")
     print(" commands after the -- will be passed to dataproc")
@@ -109,6 +110,12 @@ def main(args):
             del args[i] #remove spark target
             del args[i] #and its parameter
 
+        sparkSubmitCommand = getValueForArgument(args, "--sparkSubmitCommand")
+        if sparkSubmitCommand is not None:
+            i = args.index("--sparkSubmitCommand")
+            del args[i] #remove sparkSubmitCommand target
+            del args[i] #and its parameter
+
         (gatkArgs, sparkArgs) = getSplitArgs(args)
 
         sparkMaster = getValueForArgument(sparkArgs, "--sparkMaster")
@@ -118,19 +125,22 @@ def main(args):
             del sparkArgs[i] #and its parameter
             gatkArgs += ["--sparkMaster", sparkMaster]
 
-        runGATK(sparkRunner, dryRun, gatkArgs, sparkArgs)
+        runGATK(sparkRunner, sparkSubmitCommand, dryRun, gatkArgs, sparkArgs)
     except GATKLaunchException as e:
         sys.stderr.write(str(e)+"\n")
         sys.exit(3)
     except CalledProcessError as e:
         sys.exit(e.returncode)
 
-def getSparkSubmit():
-    sparkhome = os.environ.get("SPARK_HOME")
-    if sparkhome is not None:
-        return sparkhome +"/bin/spark-submit"
+def getSparkSubmitCommand(sparkSubmitCommand):
+    if sparkSubmitCommand is None:
+        sparkhome = os.environ.get("SPARK_HOME")
+        if sparkhome is not None:
+            return sparkhome +"/bin/spark-submit"
+        else:
+            return "spark-submit"
     else:
-        return "spark-submit"
+        return sparkSubmitCommand
 
 def getLocalGatkRunCommand():
     localJarFromEnv = getJarFromEnv(GATK_LOCAL_JAR_ENV_VARIABLE)
@@ -251,12 +261,13 @@ def cacheJarOnGCS(jar, dryRun):
     return jar
 
 
-def runGATK(sparkRunner, dryrun, gatkArgs, sparkArgs):
+def runGATK(sparkRunner, suppliedSparkSubmitCommand, dryrun, gatkArgs, sparkArgs):
     if sparkRunner is None or sparkRunner == "LOCAL":
         cmd = getLocalGatkRunCommand() + gatkArgs + sparkArgs
         runCommand(cmd, dryrun)
     elif sparkRunner == "SPARK":
-        cmd = [ getSparkSubmit(),
+        sparkSubmitCmd = getSparkSubmitCommand(suppliedSparkSubmitCommand)
+        cmd = [ sparkSubmitCmd,
                 "--master", getSparkMasterSpecified(gatkArgs)] \
               + DEFAULT_SPARK_ARGS \
               + sparkArgs \
@@ -265,7 +276,7 @@ def runGATK(sparkRunner, dryrun, gatkArgs, sparkArgs):
         try:
             runCommand(cmd, dryrun)
         except OSError:
-            raise GATKLaunchException("Tried to run spark-submit but failed.\nMake sure spark-submit is available in your path")
+            raise GATKLaunchException("Tried to run %s but failed.\nMake sure %s is available in your path" % (sparkSubmitCmd, sparkSubmitCmd))
     elif sparkRunner == "GCS":
         jarPath = cacheJarOnGCS(getSparkJar(), dryrun)
         dataprocargs = convertSparkSubmitToDataprocArgs(DEFAULT_SPARK_ARGS + sparkArgs)

src/main/java/org/broadinstitute/hellbender/cmdline/CommandLineProgram.java

+1 -8
@@ -17,7 +17,6 @@
 import org.broadinstitute.hellbender.utils.Utils;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetAddress;
 import java.text.DecimalFormat;
@@ -45,13 +44,7 @@
  *
  */
 public abstract class CommandLineProgram {
-    protected transient Logger logger = LogManager.getLogger(this.getClass());
-
-    private void readObject(java.io.ObjectInputStream in)
-            throws IOException, ClassNotFoundException {
-        in.defaultReadObject();
-        logger = LogManager.getLogger(this.getClass()); // Logger is not serializable (even by Kryo)
-    }
+    protected final Logger logger = LogManager.getLogger(this.getClass());
 
     @Argument(common=true, optional=true)
     public List<File> TMP_DIR = new ArrayList<>();
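
Note: the deleted readObject hook was the usual Java-serialization idiom for rebuilding a transient, non-serializable field after deserialization; with the logger now a plain final field, that hook goes away. A minimal, generic sketch of the idiom it implemented, using java.util.logging and a hypothetical LoggingHolder class rather than GATK's Log4j setup:

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.Serializable;
    import java.util.logging.Logger;

    // Generic illustration (not GATK code): a non-serializable field is marked
    // transient and rebuilt after Java deserialization, which is what the removed
    // readObject method did for the Log4j logger.
    public class LoggingHolder implements Serializable {
        private static final long serialVersionUID = 1L;

        private transient Logger logger = Logger.getLogger(LoggingHolder.class.getName());

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();                                    // restore non-transient state
            logger = Logger.getLogger(LoggingHolder.class.getName()); // transient fields come back null
        }
    }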

src/main/java/org/broadinstitute/hellbender/engine/FeatureManager.java

+6 -6
@@ -77,10 +77,10 @@ public final class FeatureManager implements AutoCloseable {
     }
 
     /**
-     * The tool instance containing the FeatureInput argument values that will form the basis of our
+     * The simple class name of the tool instance containing the FeatureInput argument values that will form the basis of our
      * pool of FeatureDataSources
      */
-    private final CommandLineProgram toolInstance;
+    private final String toolInstanceSimpleClassName;
 
     /**
      * Mapping from FeatureInput argument to query-able FeatureDataSource for that source of Features
@@ -110,10 +110,10 @@ public FeatureManager( final CommandLineProgram toolInstance ) {
      * the end of query intervals in anticipation of future queries (>= 0).
      */
     public FeatureManager( final CommandLineProgram toolInstance, final int featureQueryLookahead ) {
-        this.toolInstance = toolInstance;
+        this.toolInstanceSimpleClassName = toolInstance.getClass().getSimpleName();
         featureSources = new LinkedHashMap<>();
 
-        initializeFeatureSources(featureQueryLookahead);
+        initializeFeatureSources(featureQueryLookahead, toolInstance);
     }
 
     /**
@@ -124,7 +124,7 @@ public FeatureManager( final CommandLineProgram toolInstance, final int featureQ
      * the end of query intervals in anticipation of future queries (>= 0).
      */
     @SuppressWarnings({"unchecked", "rawtypes"})
-    private void initializeFeatureSources( final int featureQueryLookahead ) {
+    private void initializeFeatureSources( final int featureQueryLookahead, final CommandLineProgram toolInstance ) {
 
         // Discover all arguments of type FeatureInput (or Collections thereof) in our tool's class hierarchy
         // (and associated ArgumentCollections). Arguments not specified by the user on the command line will
@@ -316,7 +316,7 @@ private <T extends Feature> FeatureDataSource<T> lookupDataSource( final Feature
                     "In order to be detected, FeatureInputs must be declared in the tool class " +
                     "itself, a superclass of the tool class, or an @ArgumentCollection declared " +
                     "in the tool class or a superclass. They must also be annotated as an @Argument.",
-                    featureDescriptor.getName(), toolInstance.getClass().getSimpleName()));
+                    featureDescriptor.getName(), toolInstanceSimpleClassName));
         }
 
         return dataSource;
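
Note: the net effect of these hunks is that FeatureManager now stores only the tool's simple class name, which is all the error message in the last hunk needs; the full CommandLineProgram instance is passed into initializeFeatureSources just for the duration of construction. A minimal, generic sketch of that capture-only-the-name pattern (the NameOnlyReporter class is illustrative, not GATK code):

    // Generic illustration (not GATK code): capturing just a name instead of the
    // whole owning object, so retaining or serializing this reporter does not
    // drag the owner along with it.
    public final class NameOnlyReporter implements java.io.Serializable {
        private static final long serialVersionUID = 1L;

        private final String ownerSimpleClassName;

        public NameOnlyReporter(final Object owner) {
            this.ownerSimpleClassName = owner.getClass().getSimpleName();
        }

        public String describeMissingArgument(final String argumentName) {
            return String.format("Argument %s was not found in %s", argumentName, ownerSimpleClassName);
        }
    }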

src/main/java/org/broadinstitute/hellbender/engine/spark/AddContextDataToReadSpark.java

+2 -2
@@ -111,7 +111,7 @@ private static JavaPairRDD<GATKRead, ReadContextData> addUsingOverlapsPartitioni
         return shardedReads.flatMapToPair(new PairFlatMapFunction<Shard<GATKRead>, GATKRead, ReadContextData>() {
             private static final long serialVersionUID = 1L;
             @Override
-            public Iterable<Tuple2<GATKRead, ReadContextData>> call(Shard<GATKRead> shard) throws Exception {
+            public Iterator<Tuple2<GATKRead, ReadContextData>> call(Shard<GATKRead> shard) throws Exception {
                 // get reference bases for this shard (padded)
                 SimpleInterval paddedInterval = shard.getInterval().expandWithinContig(shardPadding, sequenceDictionary);
                 ReferenceBases referenceBases = bReferenceSource.getValue().getReferenceBases(null, paddedInterval);
@@ -132,7 +132,7 @@ public Tuple2<GATKRead, ReadContextData> apply(@Nullable GATKRead r) {
                     }
                 });
                 // only include reads that start in the shard
-                return () -> Iterators.filter(transform, r -> r._1().getStart() >= shard.getStart()
+                return Iterators.filter(transform, r -> r._1().getStart() >= shard.getStart()
                         && r._1().getStart() <= shard.getEnd());
             }
         });
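
Note: the signature change above follows the Spark 2.0 Java API, in which FlatMapFunction and PairFlatMapFunction return an Iterator from call() rather than an Iterable, so the lambda that previously wrapped the filtered iterator in an Iterable is no longer needed. A minimal, generic sketch of the new contract (the SplitWords class is illustrative, not GATK code):

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.spark.api.java.function.FlatMapFunction;

    // Under Spark 2.x, call() hands the elements back directly as an Iterator;
    // the Spark 1.x version of this interface returned an Iterable instead.
    public class SplitWords implements FlatMapFunction<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<String> call(String line) {
            return Arrays.asList(line.split(" ")).iterator();
        }
    }

The same change drives the edits to AddContextDataToReadSparkOptimized and LocusWalkerSpark below.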

src/main/java/org/broadinstitute/hellbender/engine/spark/AddContextDataToReadSparkOptimized.java

+2 -11
@@ -108,17 +108,8 @@ public static FlatMapFunction<ContextShard,ContextShard> subdivideAndFillReads(S
         return new FlatMapFunction<ContextShard, ContextShard>() {
             private static final long serialVersionUID = 1L;
             @Override
-            public Iterable<ContextShard> call(ContextShard contextShard) throws Exception {
-                return new Iterable<ContextShard>() {
-                    @Override
-                    public Iterator<ContextShard> iterator() {
-                        try {
-                            return new SubdivideAndFillReadsIterator(bam, auth, outputShardSize, margin, optFilter, contextShard);
-                        } catch (Exception x) {
-                            throw new RuntimeException(x);
-                        }
-                    }
-                };
+            public Iterator<ContextShard> call(ContextShard contextShard) throws Exception {
+                return new SubdivideAndFillReadsIterator(bam, auth, outputShardSize, margin, optFilter, contextShard);
             }
         };
     }

src/main/java/org/broadinstitute/hellbender/engine/spark/CoalescedRDD.java

-96
This file was deleted.

src/main/java/org/broadinstitute/hellbender/engine/spark/CoalescedRDDPartition.java

-62
This file was deleted.

src/main/java/org/broadinstitute/hellbender/engine/spark/LocusWalkerSpark.java

+1 -1
@@ -130,7 +130,7 @@ public Tuple3<AlignmentContext, ReferenceContext, FeatureContext> apply(@Nullabl
                     return new Tuple3<>(alignmentContext, new ReferenceContext(reference, alignmentInterval), new FeatureContext(fm, alignmentInterval));
                 }
             });
-            return () -> transform;
+            return transform;
         };
     }
 }
