Add spark versions of walker classes #2256

Merged
merged 3 commits into master from tw_spark_walkers
Jan 11, 2017
Conversation

tomwhite
Contributor

@tomwhite tomwhite commented Nov 4, 2016

This adds ReadWalkerSpark, AssemblyRegionWalkerSpark, IntervalWalkerSpark, VariantWalkerSpark, and examples.

@tomwhite
Contributor Author

tomwhite commented Nov 4, 2016

@lbergelson can you take a look?

@droazen droazen self-assigned this Nov 4, 2016
@tomwhite tomwhite force-pushed the tw_spark_walkers branch 2 times, most recently from 5f2653e to 679744f Compare November 8, 2016 09:56
@codecov-io

codecov-io commented Nov 8, 2016

Current coverage is 75.861% (diff: 84.830%)

Merging #2256 into master will increase coverage by 0.159%

@@             master      #2256   diff @@
==========================================
  Files           729        742    +13   
  Lines         38506      38954   +448   
  Methods           0          0          
  Messages          0          0          
  Branches       8039       8123    +84   
==========================================
+ Hits          29150      29551   +401   
- Misses         6848       6858    +10   
- Partials       2508       2545    +37   

Diff Coverage File Path
••• 33% ...er/engine/spark/datasources/VariantsSparkSource.java
••••••• 72% new ...tute/hellbender/engine/spark/VariantWalkerSpark.java
••••••• 74% new ...bender/tools/examples/ExampleVariantWalkerSpark.java
••••••• 75% new ...ute/hellbender/engine/spark/IntervalWalkerSpark.java
••••••• 77% new ...stitute/hellbender/engine/spark/ReadWalkerSpark.java
•••••••• 87% new ...itute/hellbender/engine/spark/ReadWalkerContext.java
•••••••• 87% ...titute/hellbender/engine/spark/LocusWalkerSpark.java
•••••••• 89% new ...ender/tools/examples/ExampleIntervalWalkerSpark.java
••••••••• 90% new ...llbender/engine/spark/AssemblyRegionWalkerSpark.java
••••••••• 92% new ...ls/examples/ExampleReadWalkerWithReferenceSpark.java

Review all 19 files changed

Powered by Codecov. Last update e1b4c8f...bdb69f8

Member

@lbergelson lbergelson left a comment

Sorry for the slow turnaround. This looks really good overall.
I have a few nitpicks. I think it might be worthwhile to have wrapper types to avoid having to access fields with _1, _2, etc. And it might be good to provide a more structured method for each tool so that extending them is slightly more obvious (or, if that's impracticable, a bit of documentation at the head of each class explaining how to structure the runTool()).

I'm also a bit skeptical that the Features are going to work smoothly. Have you tested this on a real cluster? Do you have to preload the feature files in HDFS?

I think there are a few places where we may end up converting flatMaps to mapPartitions for performance reasons if there is some shared state, but we can wait to profile that since it makes things more complicated.

Also, if we could use streams instead of the Guava iterators it would be a bit nicer, I think, but I didn't actually try the refactoring, so it may be more complicated than I expect. If that's the case, there's no problem with leaving them.

/**
* Collects common variant filters.
*/
public final class VariantFilterLibrary {
public static VariantFilter ALLOW_ALL_VARIANTS = variant -> true;
public static VariantFilter ALLOW_ALL_VARIANTS = (VariantFilter & Serializable) variant -> true;
Member

Let's just make VariantFilter Serializable.

Contributor Author

Done
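For illustration, a minimal self-contained sketch of the agreed fix: when the functional interface itself extends Serializable, every lambda implementing it is serializable, so call sites no longer need the (VariantFilter & Serializable) intersection cast. The VariantFilter here is a simplified stand-in (it tests a String rather than a VariantContext), and the serialization round-trip exists only to demonstrate the point:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableFilterDemo {
    // Hypothetical stand-in for GATK's VariantFilter: extending Serializable
    // makes the compiler emit serializable lambdas for this interface.
    @FunctionalInterface
    interface VariantFilter extends Serializable {
        boolean test(String variant); // String stands in for VariantContext
    }

    // No intersection cast needed any more.
    static final VariantFilter ALLOW_ALL_VARIANTS = variant -> true;

    // Round-trip through Java serialization to prove the lambda serializes.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (!isSerializable(ALLOW_ALL_VARIANTS)) throw new AssertionError("lambda not serializable");
        if (!ALLOW_ALL_VARIANTS.test("anything")) throw new AssertionError("filter should allow all");
        System.out.println("ok");
    }
}
```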

@@ -20,6 +20,7 @@
public final class ReadsContext implements Iterable<GATKRead> {

private final ReadsDataSource dataSource;
private final Iterable<GATKRead> iterable;
Member

In general this sort of thing where we have two paths to do everything within a class is kind of gross. It makes me think we should either have two classes, or find a way of representing the internals in a way that is agnostic to what it was constructed from.

We could do that by either storing a query lambda or by creating an anonymous adaptor class from Shard -> GatkDataSource<Read>. Both of those may be more complicated than your solution though.

Contributor Author

I fixed this by creating an anonymous adaptor class.
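A rough sketch of what that adaptor pattern looks like, with hypothetical simplified names (Strings stand in for GATKRead, ReadSource for the internal query interface): the Shard path is wrapped in an anonymous adaptor so ReadsContext keeps a single internal representation rather than branching between two fields:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class AdaptorDemo {
    // Single internal abstraction both construction paths are adapted to.
    interface ReadSource { Iterator<String> query(String interval); }

    // Minimal stand-in for a Spark shard of reads.
    static class Shard {
        final List<String> reads;
        Shard(List<String> reads) { this.reads = reads; }
        Iterator<String> iterator() { return reads.iterator(); }
    }

    static class ReadsContext implements Iterable<String> {
        private final ReadSource source;   // one representation, no dual fields
        private final String interval;

        ReadsContext(ReadSource source, String interval) {
            this.source = source;
            this.interval = interval;
        }

        // Spark path: wrap the Shard in an anonymous adaptor so the rest of
        // the class never needs to know which constructor was used.
        ReadsContext(Shard shard, String interval) {
            this(new ReadSource() {
                @Override public Iterator<String> query(String ignored) {
                    return shard.iterator();
                }
            }, interval);
        }

        @Override public Iterator<String> iterator() { return source.query(interval); }
    }

    public static void main(String[] args) {
        ReadsContext ctx = new ReadsContext(new Shard(Arrays.asList("read1", "read2")), "chr1:1-100");
        int n = 0;
        for (String r : ctx) n++;
        if (n != 2) throw new AssertionError("expected 2 reads, got " + n);
        System.out.println("ok");
    }
}
```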

private List<ShardBoundary> intervalShards;

@Override
protected List<SimpleInterval> editIntervals(List<SimpleInterval> rawIntervals) {
Member

Could you add a comment to this method calling out the fact that it sets intervalShards as a side effect? People might gloss over that since editIntervals isn't usually expected to have any side effects.

Contributor Author

Done

*
* @return all assembly regions as a {@link JavaRDD}, bounded by intervals if specified.
*/
public JavaRDD<Tuple3<AssemblyRegion, ReferenceContext, FeatureContext>> getAssemblyRegions(JavaSparkContext ctx) {
Member

Tuple3 is a pretty awkward class to work with. I think introducing a container object would be a good idea.

Contributor Author

Done
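A minimal sketch of such a container (names hypothetical; String fields stand in for the real VariantContext, ReferenceContext, and FeatureContext types), so call sites read getVariant() instead of Tuple3's opaque _1():

```java
import java.io.Serializable;

// Named container replacing Tuple3<VariantContext, ReferenceContext, FeatureContext>.
// Serializable so it can travel inside a Spark RDD.
public class VariantWalkerContext implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String variant;           // stand-in for VariantContext
    private final String referenceContext;  // stand-in for ReferenceContext
    private final String featureContext;    // stand-in for FeatureContext

    public VariantWalkerContext(String variant, String referenceContext, String featureContext) {
        this.variant = variant;
        this.referenceContext = referenceContext;
        this.featureContext = featureContext;
    }

    public String getVariant() { return variant; }
    public String getReferenceContext() { return referenceContext; }
    public String getFeatureContext() { return featureContext; }

    public static void main(String[] args) {
        VariantWalkerContext ctx = new VariantWalkerContext("chr1:100 A>T", "ref", "features");
        // Named accessors replace _1()/_2()/_3().
        if (!ctx.getVariant().equals("chr1:100 A>T")) throw new AssertionError();
        System.out.println("ok");
    }
}
```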

* @return The evaluator to be used to determine whether each locus is active or not. Must be implemented by tool authors.
* The results of this per-locus evaluator are used to determine the bounds of each active and inactive region.
*/
public abstract AssemblyRegionEvaluator assemblyRegionEvaluator();
Member

This might want to mention that it will be called once per shard, which may be costly if it is an expensive operation. In practice, I found I had to reuse the assembly region evaluator because initializing a HaplotypeCallerEngine is expensive. I ended up making the downstream call that used the assemblyRegionEvaluator a mapPartitions in order to reduce the number of instantiations of the engine. Alternatively it could be serialized, but I ran into issues serializing the HaplotypeCallerEngine since it does its own file access.

Contributor Author

As it stands, it's being serialized for each task. I suggest leaving it and addressing any problems when we use this for the HaplotypeCaller.

Member

That sounds fine. We'll measure it when we get to it.
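To make the trade-off concrete, here's a plain-Java simulation (no real Spark) of per-element map versus mapPartitions-style processing. The ExpensiveEngine is a hypothetical stand-in for something like HaplotypeCallerEngine; it counts its own constructions, showing one instantiation per record in the first case versus one per partition in the second:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MapPartitionsDemo {
    static int constructions = 0;

    static class ExpensiveEngine {               // stand-in for a costly engine
        ExpensiveEngine() { constructions++; }   // expensive setup happens here
        String evaluate(String region) { return region + ":evaluated"; }
    }

    // map-style: a new engine is built for every element
    static List<String> perElement(List<List<String>> partitions) {
        List<String> out = new ArrayList<>();
        for (List<String> part : partitions)
            for (String region : part)
                out.add(new ExpensiveEngine().evaluate(region));
        return out;
    }

    // mapPartitions-style: one engine shared by all elements of a partition
    static List<String> perPartition(List<List<String>> partitions) {
        List<String> out = new ArrayList<>();
        for (List<String> part : partitions) {
            ExpensiveEngine engine = new ExpensiveEngine();
            for (String region : part) out.add(engine.evaluate(region));
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("r1", "r2", "r3"), Arrays.asList("r4", "r5"));
        constructions = 0;
        perElement(partitions);
        int perElem = constructions;   // once per region: 5
        constructions = 0;
        perPartition(partitions);
        int perPart = constructions;   // once per partition: 2
        if (perElem != 5 || perPart != 2) throw new AssertionError(perElem + "/" + perPart);
        System.out.println("ok");
    }
}
```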

*
* @return all reads as a {@link JavaRDD}, bounded by intervals if specified.
*/
public JavaRDD<Tuple3<GATKRead, ReferenceContext, FeatureContext>> getReads(JavaSparkContext ctx) {
Member

Again, I think the wrapper type might be convenient.

Contributor Author

Done

public JavaRDD<Tuple4<SimpleInterval, ReadsContext, ReferenceContext, FeatureContext>> getIntervals(JavaSparkContext ctx) {
SAMSequenceDictionary sequenceDictionary = getBestAvailableSequenceDictionary();
// don't shard the intervals themselves, since we want each interval to be processed by a single task
final List<ShardBoundary> intervalShardBoundaries = getIntervals().stream()
Member

Should we have some warning if large intervals are specified? It works fine in the walker since everything is computed lazily, but here I think we're going to hit OOMs since we're eagerly loading things into shards.

Contributor Author

Maybe. What would you consider to be large enough to need a warning?

new ReferenceMemorySource(bReferenceSource.getValue().getReferenceBases(null, paddedInterval), sequenceDictionary);
FeatureManager features = bFeatureManager == null ? null : bFeatureManager.getValue();

Iterator<Tuple3<GATKRead, ReferenceContext, FeatureContext>> transform = Iterators.transform(shard.iterator(), new Function<GATKRead, Tuple3<GATKRead, ReferenceContext, FeatureContext>>() {
Member

Can we replace this with a more idiomatic stream, or is there something that Iterators does better?

Contributor Author

Done
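The stream refactoring looks roughly like this sketch: the Guava Iterators.transform call with an anonymous Function becomes a stream over the shard with a lambda map. Strings stand in for reads here, and the mapping lambda for the real read-to-(read, reference, features) transform:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.StreamSupport;

public class StreamTransformDemo {
    // Equivalent of Iterators.transform(shard.iterator(), anonymousFunction),
    // expressed as a JDK stream over the shard's elements.
    static Iterator<String> transformWithStream(Iterable<String> shard) {
        return StreamSupport.stream(shard.spliterator(), false)
                .map(read -> read + ":withContext")  // replaces the anonymous Guava Function
                .iterator();
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        transformWithStream(Arrays.asList("read1", "read2")).forEachRemaining(out::add);
        if (!out.equals(Arrays.asList("read1:withContext", "read2:withContext")))
            throw new AssertionError(out.toString());
        System.out.println("ok");
    }
}
```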

header, referenceContext, featureContext, evaluator,
minAssemblyRegionSize, maxAssemblyRegionSize, assemblyRegionPadding, activeProbThreshold,
maxProbPropagationDistance);
return Iterators.transform(assemblyRegions.iterator(), new Function<AssemblyRegion, Tuple3<AssemblyRegion, ReferenceContext, FeatureContext>>() {
Member

Can we replace this with a stream, or is Iterators doing something more cleanly?

Contributor Author

Done

/**
* A Spark version of {@link ReadWalker}.
*/
public abstract class ReadWalkerSpark extends GATKSparkTool {
Member

Could you add a simple ExampleReadWalker?

@tomwhite
Contributor Author

Thanks for the review @lbergelson. I've addressed most of the feedback, but still have a few more to do.

@tomwhite
Contributor Author

@lbergelson I've addressed your feedback now, so back to you.

@lbergelson
Member

👍

4 participants