Skip to content

Commit e6f1c6b

Browse files
jean-philippe-martindroazen
authored andcommitted
Enable Requester Pays support (#5140)
* Enable Requester Pays support Google Cloud Storage has "requester pays" buckets. When reading from those buckets, the requester is billed for the bandwidth (normally, it's the bucket owner who is billed). This pull request enables this feature with GATK. By default it is turned off (so there are no unexpected charges). To turn it on, use the command line argument: "--gcs-project-for-requester-pays" Example: $ ./gatk PrintReads --input $INPUT --output /tmp/reads.bam fails with: com.google.cloud.storage.StorageException: Bucket is requester pays bucket but no user project provided. $ ./gatk PrintReads --input $INPUT --output /tmp/reads.bam --gcs-project-for-requester-pays $PROJECT works This PR also removes the argumentless version of setGlobalNIODefaultOptions() because it was confusing (it uses the default values, not those indicated by the user - usually we'd expect the user's values to be taken into account).
1 parent 01d5ea2 commit e6f1c6b

File tree

5 files changed

+56
-18
lines changed

5 files changed

+56
-18
lines changed

src/main/java/org/broadinstitute/hellbender/cmdline/CommandLineProgram.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.broadinstitute.hellbender.cmdline;
22

33
import com.google.common.annotations.VisibleForTesting;
4+
import com.google.common.base.Strings;
45
import com.intel.gkl.compression.IntelDeflaterFactory;
56
import com.intel.gkl.compression.IntelInflaterFactory;
67
import htsjdk.samtools.Defaults;
@@ -82,6 +83,9 @@ public abstract class CommandLineProgram implements CommandLinePluginProvider {
8283
@Argument(fullName = StandardArgumentDefinitions.NIO_MAX_REOPENS_LONG_NAME, shortName = StandardArgumentDefinitions.NIO_MAX_REOPENS_SHORT_NAME, doc = "If the GCS bucket channel errors out, how many times it will attempt to re-initiate the connection", optional = true)
8384
public int NIO_MAX_REOPENS = ConfigFactory.getInstance().getGATKConfig().gcsMaxRetries();
8485

86+
@Argument(fullName = StandardArgumentDefinitions.NIO_PROJECT_FOR_REQUESTER_PAYS_LONG_NAME, doc = "Project to bill when accessing \"requester pays\" buckets. If unset, these buckets cannot be accessed.", optional = true)
87+
public String NIO_PROJECT_FOR_REQUESTER_PAYS = ConfigFactory.getInstance().getGATKConfig().gcsProjectForRequesterPays();
88+
8589
// This option is here for documentation completeness.
8690
// This is actually parsed out in Main to initialize configuration files because
8791
// we need to have the configuration completely set up before we create our CommandLinePrograms.
@@ -176,7 +180,7 @@ public Object instanceMainPostParseArgs() {
176180
BlockGunzipper.setDefaultInflaterFactory(new IntelInflaterFactory());
177181
}
178182

179-
BucketUtils.setGlobalNIODefaultOptions(NIO_MAX_REOPENS);
183+
BucketUtils.setGlobalNIODefaultOptions(NIO_MAX_REOPENS, NIO_PROJECT_FOR_REQUESTER_PAYS);
180184

181185
if (!QUIET) {
182186
printStartupMessage(startDateTime);
@@ -435,7 +439,12 @@ protected void printSettings() {
435439
final boolean usingIntelInflater = (BlockGunzipper.getDefaultInflaterFactory() instanceof IntelInflaterFactory && ((IntelInflaterFactory)BlockGunzipper.getDefaultInflaterFactory()).usingIntelInflater());
436440
logger.info("Inflater: " + (usingIntelInflater ? "IntelInflater": "JdkInflater"));
437441

438-
logger.info("GCS max retries/reopens: " + BucketUtils.getCloudStorageConfiguration(NIO_MAX_REOPENS).maxChannelReopens());
442+
logger.info("GCS max retries/reopens: " + BucketUtils.getCloudStorageConfiguration(NIO_MAX_REOPENS, "").maxChannelReopens());
443+
if (Strings.isNullOrEmpty(NIO_PROJECT_FOR_REQUESTER_PAYS)) {
444+
logger.info("Requester pays: disabled");
445+
} else {
446+
logger.info("Requester pays: enabled. Billed to: " + NIO_PROJECT_FOR_REQUESTER_PAYS);
447+
}
439448
}
440449

441450
/**

src/main/java/org/broadinstitute/hellbender/cmdline/StandardArgumentDefinitions.java

+1
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,5 @@ private StandardArgumentDefinitions(){}
9292
public static final String USE_JDK_INFLATER_SHORT_NAME = "jdk-inflater";
9393
public static final String NIO_MAX_REOPENS_LONG_NAME = "gcs-max-retries";
9494
public static final String NIO_MAX_REOPENS_SHORT_NAME = "gcs-retries";
95+
public static final String NIO_PROJECT_FOR_REQUESTER_PAYS_LONG_NAME = "gcs-project-for-requester-pays";
9596
}

src/main/java/org/broadinstitute/hellbender/utils/config/GATKConfig.java

+3
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,9 @@ public interface GATKConfig extends Mutable, Accessible {
157157
@DefaultValue("20")
158158
int gcsMaxRetries();
159159

160+
@DefaultValue("")
161+
String gcsProjectForRequesterPays();
162+
160163
@DefaultValue("true")
161164
boolean createOutputBamIndex();
162165
}

src/main/java/org/broadinstitute/hellbender/utils/gcs/BucketUtils.java

+27-15
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
import com.google.cloud.http.HttpTransportOptions;
44
import com.google.cloud.storage.StorageOptions;
55
import com.google.cloud.storage.contrib.nio.CloudStorageConfiguration;
6+
import com.google.cloud.storage.contrib.nio.CloudStorageConfiguration.Builder;
67
import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem;
78
import com.google.cloud.storage.contrib.nio.CloudStorageFileSystemProvider;
9+
import com.google.common.base.Strings;
810
import com.google.common.io.ByteStreams;
911
import htsjdk.samtools.util.IOUtil;
1012
import htsjdk.samtools.util.RuntimeIOException;
11-
import htsjdk.tribble.AbstractFeatureReader;
1213
import htsjdk.tribble.Tribble;
1314
import htsjdk.tribble.util.TabixUtils;
1415
import org.apache.hadoop.conf.Configuration;
@@ -19,7 +20,6 @@
1920
import org.apache.logging.log4j.Logger;
2021
import org.broadinstitute.hellbender.exceptions.UserException;
2122
import org.broadinstitute.hellbender.utils.Utils;
22-
import org.broadinstitute.hellbender.utils.config.ConfigFactory;
2323
import org.broadinstitute.hellbender.utils.io.IOUtils;
2424
import shaded.cloud_nio.com.google.api.gax.retrying.RetrySettings;
2525
import shaded.cloud_nio.com.google.auth.oauth2.GoogleCredentials;
@@ -343,20 +343,20 @@ public static String getBucket(String path) {
343343
*/
344344
public static String getPathWithoutBucket(String path) {
345345
final String[] split = path.split("/");
346-
final String BUCKET = split[2];
347346
return String.join("/", Arrays.copyOfRange(split, 3, split.length));
348-
349347
}
350348

351349
/**
352-
* Sets NIO_MAX_REOPENS and generous timeouts as the global default.
350+
* Sets max_reopens, requester_pays, and generous timeouts as the global default.
353351
* These will apply even to library code that creates its own paths to access with NIO.
352+
*
353+
* @param maxReopens If the GCS bucket channel errors out, how many times it will attempt to
354+
* re-initiate the connection.
355+
* @param requesterProject Project to bill when accessing "requester pays" buckets. If unset,
356+
* these buckets cannot be accessed.
354357
*/
355-
public static void setGlobalNIODefaultOptions() {
356-
setGlobalNIODefaultOptions(ConfigFactory.getInstance().getGATKConfig().gcsMaxRetries());
357-
}
358-
public static void setGlobalNIODefaultOptions(int maxReopens) {
359-
CloudStorageFileSystemProvider.setDefaultCloudStorageConfiguration(getCloudStorageConfiguration(maxReopens));
358+
public static void setGlobalNIODefaultOptions(int maxReopens, String requesterProject) {
359+
CloudStorageFileSystemProvider.setDefaultCloudStorageConfiguration(getCloudStorageConfiguration(maxReopens, requesterProject));
360360
CloudStorageFileSystemProvider.setStorageOptions(setGenerousTimeouts(StorageOptions.newBuilder()).build());
361361
}
362362

@@ -373,12 +373,24 @@ public static java.nio.file.Path getPathOnGcs(String gcsUrl) {
373373
return CloudStorageFileSystem.forBucket(BUCKET).getPath(pathWithoutBucket);
374374
}
375375

376-
/** The config we want to use. **/
377-
public static CloudStorageConfiguration getCloudStorageConfiguration(int maxReopens) {
378-
return CloudStorageConfiguration.builder()
376+
/**
377+
* The config we want to use.
378+
*
379+
* @param maxReopens If the GCS bucket channel errors out, how many times it will attempt to
380+
* re-initiate the connection.
381+
* @param requesterProject Project to bill when accessing "requester pays" buckets. If unset,
382+
* these buckets cannot be accessed.
383+
*
384+
**/
385+
public static CloudStorageConfiguration getCloudStorageConfiguration(int maxReopens, String requesterProject) {
386+
Builder builder = CloudStorageConfiguration.builder()
379387
// if the channel errors out, re-open up to this many times
380-
.maxChannelReopens(maxReopens)
381-
.build();
388+
.maxChannelReopens(maxReopens);
389+
if (!Strings.isNullOrEmpty(requesterProject)) {
390+
// enable requester pays and indicate who pays
391+
builder = builder.autoDetectRequesterPays(true).userProject(requesterProject);
392+
}
393+
return builder.build();
382394
}
383395

384396
private static StorageOptions.Builder setGenerousTimeouts(StorageOptions.Builder builder) {

src/test/java/org/broadinstitute/hellbender/utils/gcs/BucketUtilsTest.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package org.broadinstitute.hellbender.utils.gcs;
22

3+
import com.google.cloud.storage.contrib.nio.CloudStorageConfiguration;
34
import htsjdk.samtools.util.IOUtil;
45
import java.net.URI;
56
import java.nio.file.Path;
67
import java.nio.file.Paths;
78
import org.broadinstitute.hellbender.GATKBaseTest;
89
import org.broadinstitute.hellbender.testutils.MiniClusterUtils;
10+
import org.broadinstitute.hellbender.utils.config.ConfigFactory;
911
import org.testng.Assert;
1012
import org.testng.annotations.Test;
1113

@@ -17,7 +19,9 @@
1719
public final class BucketUtilsTest extends GATKBaseTest {
1820

1921
static {
20-
BucketUtils.setGlobalNIODefaultOptions();
22+
BucketUtils.setGlobalNIODefaultOptions(
23+
ConfigFactory.getInstance().getGATKConfig().gcsMaxRetries(),
24+
ConfigFactory.getInstance().getGATKConfig().gcsProjectForRequesterPays());
2125
}
2226

2327
@Test(groups={"bucket"})
@@ -36,6 +40,15 @@ public void testIsCloudStorageURL(){
3640
String x = "" + null + "://";
3741
}
3842

43+
@Test
44+
public void testGetCloudStorageConfiguration() {
45+
String mockProject = "yes";
46+
int mockReopens = 100;
47+
CloudStorageConfiguration config = BucketUtils.getCloudStorageConfiguration(mockReopens, mockProject);
48+
Assert.assertEquals(config.maxChannelReopens(), mockReopens);
49+
Assert.assertEquals(config.userProject(), mockProject);
50+
}
51+
3952
@Test
4053
public void testIsHadoopURL(){
4154
Assert.assertFalse(BucketUtils.isHadoopUrl("gs://abucket/bucket"));

0 commit comments

Comments
 (0)