
Commit 4fcd46c

add a bulk delete example (#266)
1 parent: 5cad7b8

File tree

7 files changed (+231, -1 lines)

java/dataflow-bulk-delete/README.md (new file, +10 lines)

# Bulk Delete

## An example of how to delete a list of row prefixes.

This demonstrates how to delete a lot of data to work around the rate limiting of DropRowRanges.
The general idea (see the sketch after this list) is to:

* read a set of key prefixes
* use a Bigtable connection directly to fetch the keys that start with each of the prefixes
* create a delete mutation for each key
* use CloudBigtableIO to apply the deletes using internal batching
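
A minimal sketch of that flow, mirroring the Main.java added below in this commit; the `options`, `config`, and `tableId` variables and the prefix values are placeholders:

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Create.of(Lists.newArrayList("prefix1", "prefix2")))           // read key prefixes
        .apply("Scan prefix", ParDo.of(new ScanPrefixDoFn(config, tableId)))  // fetch keys per prefix
        .apply("Create mutations", ParDo.of(new DeleteKeyDoFn()))             // one Delete per key
        .apply("Delete keys", CloudBigtableIO.writeToTable(config));          // batched writes
    pipeline.run().waitUntilFinish();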

java/dataflow-bulk-delete/pom.xml (new file, +56 lines)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>bigtable-samples</artifactId>
    <groupId>com.google.cloud</groupId>
    <version>1.0.0</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>dataflow-bulk-delete</artifactId>

  <properties>
    <!-- dependency versions -->
    <beam.version>2.1.0</beam.version>
    <bigtable.version>1.0.0-pre4</bigtable.version>
    <slf4j.version>1.7.21</slf4j.version>
    <bigtable.hbase.version>${bigtable.version}</bigtable.hbase.version>

    <!-- options -->
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.bigtable</groupId>
      <artifactId>bigtable-hbase-beam</artifactId>
      <version>${bigtable.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
  </dependencies>
</project>
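
The beam-runners-direct-java dependency lets the pipeline run locally, while beam-runners-google-cloud-dataflow-java provides the Dataflow runner; the runner can also be selected in code. A sketch, assuming the JobOptions instance parsed in Main.java below:

    // DirectRunner is Beam's default; this forces the Dataflow runner instead.
    // DataflowRunner comes from beam-runners-google-cloud-dataflow-java above.
    options.setRunner(org.apache.beam.runners.dataflow.DataflowRunner.class);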
java/dataflow-bulk-delete/src/main/java/com/google/cloud/bigtable/example/bulk_delete/JobOptions.java (new file; path inferred from the package declaration)
/**
 * Copyright 2017 Google Inc. All Rights Reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.cloud.bigtable.example.bulk_delete;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation.Required;

/**
 * Command-line options for the bulk-delete pipeline.
 */
interface JobOptions extends DataflowPipelineOptions {
  @Required
  @Description("The Google Cloud Bigtable instance ID.")
  String getBigtableInstanceId();
  void setBigtableInstanceId(String instanceId);

  @Required
  @Description("The Cloud Bigtable table ID in the instance.")
  String getBigtableTableId();
  void setBigtableTableId(String bigtableTableId);
}
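
Beam derives flag names from these getters, so the options arrive on the command line as --bigtableInstanceId and --bigtableTableId (plus --project inherited from DataflowPipelineOptions). A usage sketch matching the parsing done in Main.java; the flag values are placeholders:

    // Example launch args:
    //   --project=my-project --bigtableInstanceId=my-instance --bigtableTableId=my-table
    JobOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()   // fails fast if a @Required option is missing
        .as(JobOptions.class);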
java/dataflow-bulk-delete/src/main/java/com/google/cloud/bigtable/example/bulk_delete/Main.java (new file; path inferred from the package declaration)
/**
 * Copyright 2017 Google Inc. All Rights Reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.cloud.bigtable.example.bulk_delete;

import com.google.cloud.bigtable.beam.AbstractCloudBigtableTableDoFn;
import com.google.cloud.bigtable.beam.CloudBigtableConfiguration;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;

public class Main {
  public static void main(String[] args) {
    JobOptions jobOptions = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(JobOptions.class);

    // Connection settings shared by the prefix scans and the batched writes.
    CloudBigtableTableConfiguration bigtableConfig = new CloudBigtableTableConfiguration.Builder()
        .withProjectId(jobOptions.getProject())
        .withInstanceId(jobOptions.getBigtableInstanceId())
        .withTableId(jobOptions.getBigtableTableId())
        .build();

    ArrayList<String> prefixes = Lists.newArrayList("prefix1", "prefix2", "prefix3");

    // Randomize the prefixes to avoid hotspotting a region.
    Collections.shuffle(prefixes);

    Pipeline pipeline = Pipeline.create(jobOptions);

    pipeline.apply(Create.of(prefixes))
        .apply("Scan prefix", ParDo.of(new ScanPrefixDoFn(bigtableConfig, bigtableConfig.getTableId())))
        .apply("Create mutations", ParDo.of(new DeleteKeyDoFn()))
        .apply("Delete keys", CloudBigtableIO.writeToTable(bigtableConfig));

    pipeline.run().waitUntilFinish();
  }

  /**
   * Queries Bigtable for all of the keys that start with the given prefix.
   */
  static class ScanPrefixDoFn extends AbstractCloudBigtableTableDoFn<String, byte[]> {
    private final String tableId;

    public ScanPrefixDoFn(CloudBigtableConfiguration config, String tableId) {
      super(config);
      this.tableId = tableId;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
      // Fetch only the row keys, not the cell values.
      Scan scan = new Scan()
          .setRowPrefixFilter(c.element().getBytes())
          .setFilter(new KeyOnlyFilter());

      Table table = getConnection().getTable(TableName.valueOf(tableId));

      for (Result result : table.getScanner(scan)) {
        c.output(result.getRow());
      }
    }
  }

  /**
   * Converts a row key into a delete mutation to be written to Bigtable.
   */
  static class DeleteKeyDoFn extends DoFn<byte[], Mutation> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(new Delete(c.element()));
    }
  }
}
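
Two design notes: the prefixes are shuffled so sequential prefixes don't hotspot a single region, and ScanPrefixDoFn sets a KeyOnlyFilter so the scan streams back row keys without transferring cell values. DeleteKeyDoFn deletes whole rows; if only part of each row should go, the HBase Delete API can be narrowed. A sketch, where the "cf" family name and the org.apache.hadoop.hbase.util.Bytes helper are assumptions, not part of this sample:

    // Hypothetical variation on DeleteKeyDoFn#processElement: delete only
    // the "cf" column family of each row instead of the entire row.
    Delete delete = new Delete(c.element());
    delete.addFamily(Bytes.toBytes("cf"));
    c.output(delete);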
java/dataflow-bulk-delete/src/main/resources/log4j.properties (new file, +23 lines; path inferred from Maven conventions)
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Root logger option
log4j.rootLogger=INFO, stderr

# Direct log messages to stderr
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.Target=System.err
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

java/dataflow-coinbase/frontend/README.md (+1, -1 lines)

@@ -6,7 +6,7 @@ This is based on the simpler [App Engine Flexible example](https://github.com/Go
 
 # Prerequisites
 
-1. Follow all the steps to start the Dataflow/Bigtable backend pipeline
+1. Follow all the com.google.cloud.bigtable.example.bulk_delete.steps to start the Dataflow/Bigtable backend pipeline
 1. Install [Bower](http://bower.io/)
 1. `bower install`

java/pom.xml (+1 line)

@@ -39,5 +39,6 @@ limitations under the License.
     <module>metric-scaler</module>
     <module>simple-cli</module>
     <module>simple-performance-test</module>
+    <module>dataflow-bulk-delete</module>
   </modules>
 </project>
