Add Spark versions of walker classes #2256
Conversation
@lbergelson can you take a look?
Sorry for the slow turnaround. This looks really good overall.
I have a few nitpicks. I think it might be worthwhile to have wrapper types to avoid having to access fields with _1, _2, .... And it might be good to provide a more structured method for each tool so that extending them is slightly more obvious. (Or, if that's impracticable, a bit of documentation at the head of each class explaining how to structure the runTool().)
I'm also a bit skeptical that the Features are going to work smoothly. Have you tested that on a real cluster? Do you have to preload the feature files in HDFS?
I think there are a few places where we may end up converting flatMaps -> mapPartitions for performance reasons if there is some shared state, but we can wait to profile that since it makes things more complicated.
Also, if we could use streams instead of the Guava iterators it would be a bit nicer I think, but I didn't actually try the refactoring, so it may be more complicated than I'm thinking. If that's the case then there isn't any problem with leaving them.
/**
 * Collects common variant filters.
 */
public final class VariantFilterLibrary {
-    public static VariantFilter ALLOW_ALL_VARIANTS = variant -> true;
+    public static VariantFilter ALLOW_ALL_VARIANTS = (VariantFilter & Serializable) variant -> true;
Let's just make VariantFilter Serializable.
Done
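For reference, a minimal sketch of the resolved interface, assuming VariantFilter is a functional interface over htsjdk's VariantContext (treat the exact supertypes as an assumption):

import htsjdk.variant.variantcontext.VariantContext;
import java.io.Serializable;
import java.util.function.Predicate;

// Making the functional interface itself Serializable lets plain lambdas be
// serialized, removing the need for the (VariantFilter & Serializable) cast.
@FunctionalInterface
public interface VariantFilter extends Predicate<VariantContext>, Serializable {
    // test(VariantContext) is inherited from Predicate; no extra methods are needed.
}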
@@ -20,6 +20,7 @@
public final class ReadsContext implements Iterable<GATKRead> {

    private final ReadsDataSource dataSource;
    private final Iterable<GATKRead> iterable;
In general this sort of thing, where we have two paths to do everything within a class, is kind of gross. It makes me think we should either have two classes, or find a way of representing the internals that is agnostic to what the class was constructed from.
We could do that either by storing a query lambda or by creating an anonymous adaptor class from Shard -> GatkDataSource<Read>. Both of those may be more complicated than your solution, though.
I fixed this by creating an anonymous adaptor class.
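For illustration, a hedged sketch of that adaptor approach, assuming a shared query interface along the lines of GATKDataSource<GATKRead> (the field type and the overlap check are assumptions, not the actual implementation):

public ReadsContext(final Iterable<GATKRead> iterable, final SimpleInterval interval) {
    // Wrap the shard's in-memory reads in an anonymous adaptor that satisfies the
    // same query interface as ReadsDataSource, so the rest of the class has one path.
    this.dataSource = new GATKDataSource<GATKRead>() {
        @Override
        public Iterator<GATKRead> query(final SimpleInterval queryInterval) {
            // The shard is already restricted to this context's window; filter by overlap.
            return StreamSupport.stream(iterable.spliterator(), false)
                    .filter(read -> queryInterval.overlaps(read))
                    .iterator();
        }

        @Override
        public Iterator<GATKRead> iterator() {
            return iterable.iterator();
        }
    };
    this.interval = interval;
}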
private List<ShardBoundary> intervalShards;

@Override
protected List<SimpleInterval> editIntervals(List<SimpleInterval> rawIntervals) {
Could you add a comment to this method calling out the fact that it sets intervalShards as a side effect? People might gloss over that since editIntervals isn't usually expected to have any side effects.
Done
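The documented method might then look roughly like this (the body is illustrative; shardInterval is a hypothetical helper standing in for the real sharding logic):

@Override
protected List<SimpleInterval> editIntervals(final List<SimpleInterval> rawIntervals) {
    // NOTE: side effect! Besides returning the intervals, this sets intervalShards,
    // which is used later when building the sharded RDD. editIntervals isn't usually
    // expected to have side effects, so the comment calls it out explicitly.
    intervalShards = rawIntervals.stream()
            .flatMap(interval -> shardInterval(interval).stream()) // hypothetical helper
            .collect(Collectors.toList());
    return rawIntervals;
}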
 *
 * @return all assembly regions as a {@link JavaRDD}, bounded by intervals if specified.
 */
public JavaRDD<Tuple3<AssemblyRegion, ReferenceContext, FeatureContext>> getAssemblyRegions(JavaSparkContext ctx) {
Tuple3 is a pretty awkward class to work with. I think introducing a container object would be a good idea.
Done
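A minimal sketch of such a container (field and accessor names are illustrative):

// Replaces Tuple3<AssemblyRegion, ReferenceContext, FeatureContext> so callers can
// write context.getAssemblyRegion() instead of the opaque tuple._1().
public final class AssemblyRegionWalkerContext implements Serializable {
    private static final long serialVersionUID = 1L;

    private final AssemblyRegion assemblyRegion;
    private final ReferenceContext referenceContext;
    private final FeatureContext featureContext;

    public AssemblyRegionWalkerContext(final AssemblyRegion assemblyRegion,
                                       final ReferenceContext referenceContext,
                                       final FeatureContext featureContext) {
        this.assemblyRegion = assemblyRegion;
        this.referenceContext = referenceContext;
        this.featureContext = featureContext;
    }

    public AssemblyRegion getAssemblyRegion() { return assemblyRegion; }
    public ReferenceContext getReferenceContext() { return referenceContext; }
    public FeatureContext getFeatureContext() { return featureContext; }
}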
 * @return The evaluator to be used to determine whether each locus is active or not. Must be implemented by tool authors.
 *         The results of this per-locus evaluator are used to determine the bounds of each active and inactive region.
 */
public abstract AssemblyRegionEvaluator assemblyRegionEvaluator();
This might want to mention that it will be called once per shard, which may be costly if constructing the evaluator is expensive. In practice I found I had to reuse the assembly region evaluator, because initializing a HaplotypeCallerEngine is expensive. I ended up making the downstream call that used the assemblyRegionEvaluator be a mapPartitions in order to reduce the number of instantiations of the engine. Alternatively it could be serialized, but I ran into issues serializing the HaplotypeCallerEngine since it does its own file access.
As it stands it's being serialized for each task. I suggest leaving it, and addressing any problems when using this for the HaplotypeCaller.
That sounds fine. We'll measure it when we get to it.
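For reference, a hedged sketch of the mapPartitions pattern discussed above (Spark 2.x signature, where the function returns an Iterator; under Spark 1.x it would return an Iterable instead; processShard is a hypothetical helper):

// Build the expensive evaluator once per partition rather than once per element,
// amortizing e.g. HaplotypeCallerEngine construction across a whole partition.
JavaRDD<AssemblyRegionWalkerContext> results = shardedReads.mapPartitions(shards -> {
    final AssemblyRegionEvaluator evaluator = assemblyRegionEvaluator(); // one per partition
    final List<AssemblyRegionWalkerContext> out = new ArrayList<>();
    while (shards.hasNext()) {
        out.addAll(processShard(shards.next(), evaluator));
    }
    return out.iterator();
});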
 *
 * @return all reads as a {@link JavaRDD}, bounded by intervals if specified.
 */
public JavaRDD<Tuple3<GATKRead, ReferenceContext, FeatureContext>> getReads(JavaSparkContext ctx) {
Again, I think the wrapper type might be convenient.
Done
public JavaRDD<Tuple4<SimpleInterval, ReadsContext, ReferenceContext, FeatureContext>> getIntervals(JavaSparkContext ctx) {
    SAMSequenceDictionary sequenceDictionary = getBestAvailableSequenceDictionary();
    // don't shard the intervals themselves, since we want each interval to be processed by a single task
    final List<ShardBoundary> intervalShardBoundaries = getIntervals().stream()
Should we have some warning if there are large intervals specified? It works fine in the walker since everything is computed lazily, but here we're going to get OOM I think since we're aggressively loading things into shards?
Maybe. What would you consider to be large enough to need a warning?
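One possible shape for such a guard, sketched here with a placeholder threshold (the value and the logger usage are assumptions, not a recommendation):

// Unlike the lazy walker path, shards here are loaded eagerly per task, so a
// very large interval risks an out-of-memory error; warn the user up front.
private static final int LARGE_INTERVAL_WARNING_SIZE = 10_000_000; // placeholder

for (final SimpleInterval interval : getIntervals()) {
    if (interval.size() > LARGE_INTERVAL_WARNING_SIZE) {
        logger.warn(String.format("Interval %s spans %d bases and will be loaded eagerly; " +
                "this may require a large amount of memory per task.", interval, interval.size()));
    }
}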
new ReferenceMemorySource(bReferenceSource.getValue().getReferenceBases(null, paddedInterval), sequenceDictionary);
FeatureManager features = bFeatureManager == null ? null : bFeatureManager.getValue();

Iterator<Tuple3<GATKRead, ReferenceContext, FeatureContext>> transform = Iterators.transform(shard.iterator(), new Function<GATKRead, Tuple3<GATKRead, ReferenceContext, FeatureContext>>() {
Can we replace this with the more idiomatic stream, or is there something that Iterators does better?
Done
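The stream-based replacement might look something like this, keeping the Tuple3 signature from the quoted hunk (getReadInterval is a hypothetical helper for the read's query window):

// Replace Guava's Iterators.transform with a Java 8 stream over the shard.
Iterator<Tuple3<GATKRead, ReferenceContext, FeatureContext>> transform =
        StreamSupport.stream(shard.spliterator(), false)
                .map(read -> {
                    final SimpleInterval readInterval = getReadInterval(read); // hypothetical helper
                    return new Tuple3<>(read,
                            new ReferenceContext(reference, readInterval),
                            new FeatureContext(features, readInterval));
                })
                .iterator();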
        header, referenceContext, featureContext, evaluator,
        minAssemblyRegionSize, maxAssemblyRegionSize, assemblyRegionPadding, activeProbThreshold,
        maxProbPropagationDistance);
return Iterators.transform(assemblyRegions.iterator(), new Function<AssemblyRegion, Tuple3<AssemblyRegion, ReferenceContext, FeatureContext>>() {
Can we replace this with a stream, or is Iterators doing something more cleanly?
Done
/**
 * A Spark version of {@link ReadWalker}.
 */
public abstract class ReadWalkerSpark extends GATKSparkTool {
Could you add a simple ExampleReadWalker?
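A hedged sketch of what such an example tool might look like, assuming getReads returns a wrapper type analogous to the container sketched earlier (command-line annotations and output plumbing omitted):

public final class ExampleReadWalkerSpark extends ReadWalkerSpark {
    private static final long serialVersionUID = 1L;

    @Override
    protected void runTool(final JavaSparkContext ctx) {
        // Print each read's name; a real example tool would write to an output argument.
        getReads(ctx)
                .map(readContext -> readContext.getRead().getName())
                .collect()
                .forEach(System.out::println);
    }
}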
Thanks for the review @lbergelson. I've addressed most of the feedback, but still have a few more to do.
@lbergelson I've addressed your feedback now, so back to you.
👍
This adds ReadWalkerSpark, AssemblyRegionWalkerSpark, IntervalWalkerSpark, VariantWalkerSpark, and examples.