Commit c163907

Cloud Spanner DML & PartitionedDML support (googleapis#3703)
* Support for Cloud Spanner DML & PDML
* Fix for DML/PDML - manual error from sync
* Codacybot & Stylecheck fixes.
* Fix merge conflict
* PR pull/3703 fixes
1 parent 626e4d1 commit c163907

10 files changed: +552 -64 lines changed

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java

+48 -1
@@ -135,7 +135,7 @@ public interface DatabaseClient {
   ReadOnlyTransaction singleUseReadOnlyTransaction();
 
   /**
-   * Returns a read-only transaction context in which a single read or query can be performed at the
+   * Returns a read-only transaction context in which a single read or query can be performed at
    * given timestamp bound. This method differs from {@link #singleUse(TimestampBound)} in that the
    * read timestamp used may be inspected after the read has returned data or finished successfully.
    *
@@ -269,4 +269,51 @@ public interface DatabaseClient {
    *
    */
   TransactionManager transactionManager();
+
+  /**
+   * Returns the lower bound of rows modified by this DML statement.
+   *
+   * <p>The method will block until the update is complete. Running a DML statement with this method
+   * does not offer exactly once semantics, and therfore the DML statement should be idempotent. The
+   * DML statement must be fully-partitionable. Specifically, the statement must be expressible as
+   * the union of many statements which each access only a single row of the table. This is a
+   * Partitioned DML transaction in which a single Partitioned DML statement is executed.
+   * Partitioned DML partitions the key space and runs the DML statement over each partition in
+   * parallel using separate, internal transactions that commit independently. Partitioned DML
+   * transactions do not need to be committed.
+   *
+   * <p>Partitioned DML updates are used to execute a single DML statement with a different
+   * execution strategy that provides different, and often better, scalability properties for large,
+   * table-wide operations than DML in a {@link #readWriteTransaction()} transaction. Smaller scoped
+   * statements, such as an OLTP workload, should prefer using {@link
+   * TransactionContext#executeUpdate(Statement)} with {@link #readWriteTransaction()}.
+   *
+   * <ul>
+   * That said, Partitioned DML is not a drop-in replacement for standard DML used in {@link
+   * #readWriteTransaction()}.
+   * <li>The DML statement must be fully-partitionable. Specifically, the statement must be
+   * expressible as the union of many statements which each access only a single row of the
+   * table.
+   * <li>The statement is not applied atomically to all rows of the table. Rather, the statement
+   * is applied atomically to partitions of the table, in independent internal transactions.
+   * Secondary index rows are updated atomically with the base table rows.
+   * <li>Partitioned DML does not guarantee exactly-once execution semantics against a partition.
+   * The statement will be applied at least once to each partition. It is strongly recommended
+   * that the DML statement should be idempotent to avoid unexpected results. For instance, it
+   * is potentially dangerous to run a statement such as `UPDATE table SET column = column +
+   * 1` as it could be run multiple times against some rows.
+   * <li>The partitions are committed automatically - there is no support for Commit or Rollback.
+   * If the call returns an error, or if the client issuing the DML statement dies, it is
+   * possible that some rows had the statement executed on them successfully. It is also
+   * possible that statement was never executed against other rows.
+   * <li>If any error is encountered during the execution of the partitioned DML operation (for
+   * instance, a UNIQUE INDEX violation, division by zero, or a value that cannot be stored
+   * due to schema constraints), then the operation is stopped at that point and an error is
+   * returned. It is possible that at this point, some partitions have been committed (or even
+   * committed multiple times), and other partitions have not been run at all.
+   *
+   * <p>Given the above, Partitioned DML is good fit for large, database-wide, operations that are
+   * idempotent, such as deleting old rows from a very large table.
+   */
+  long executePartitionedUpdate(Statement stmt);
 }
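
The at-least-once caveat in the `executePartitionedUpdate` Javadoc can be illustrated without a Spanner instance. The block below is a minimal, hypothetical simulation, not code from this commit or from the Spanner client: the class `PartitionedDmlSketch`, the method `applyAtLeastOnce`, and the fixed-size partitioning of an `int[]` are all assumptions made for illustration. It only models the claim that a statement may run more than once against a partition, and shows why a statement like `SET column = column + 1` is unsafe while an idempotent one is not.

```java
import java.util.Arrays;
import java.util.function.IntUnaryOperator;

// Hypothetical sketch: models at-least-once execution per partition.
// Nothing here calls the real Cloud Spanner API.
class PartitionedDmlSketch {

  /**
   * Applies {@code statement} to every row, one partition at a time.
   * {@code runsPerPartition[p]} is how many times partition p is executed;
   * values below 1 are treated as 1, since each partition runs at least once.
   * The array must have one entry per partition.
   */
  public static int[] applyAtLeastOnce(
      int[] rows, int partitionSize, IntUnaryOperator statement, int[] runsPerPartition) {
    int[] result = rows.clone();
    int partitions = (rows.length + partitionSize - 1) / partitionSize;
    for (int p = 0; p < partitions; p++) {
      int start = p * partitionSize;
      int end = Math.min(start + partitionSize, rows.length);
      int runs = Math.max(1, runsPerPartition[p]);
      // A retried partition re-applies the statement to rows it already changed.
      for (int run = 0; run < runs; run++) {
        for (int i = start; i < end; i++) {
          result[i] = statement.applyAsInt(result[i]);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    int[] rows = {10, 20, 30, 40};
    int[] runs = {1, 2}; // assume the second partition happens to be executed twice

    // Non-idempotent, like UPDATE t SET c = c + 1: retried rows drift.
    int[] incremented = applyAtLeastOnce(rows, 2, v -> v + 1, runs);
    // Idempotent, like UPDATE t SET c = 0: retries are harmless.
    int[] reset = applyAtLeastOnce(rows, 2, v -> 0, runs);

    System.out.println(Arrays.toString(incremented)); // [11, 21, 32, 42]
    System.out.println(Arrays.toString(reset)); // [0, 0, 0, 0]
  }
}
```

In the simulated run, the rows in the retried partition end up incremented twice, which is the outcome the Javadoc warns about; the idempotent statement gives the same result regardless of how many times a partition is executed.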

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

+13 -1
@@ -28,10 +28,11 @@
 class DatabaseClientImpl implements DatabaseClient {
   private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
   private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
+  private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
   private static final Tracer tracer = Tracing.getTracer();
 
   static {
-    TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION);
+    TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION, PARTITION_DML_TRANSACTION);
   }
 
   private final SessionPool pool;
@@ -155,6 +156,17 @@ public TransactionManager transactionManager() {
     }
   }
 
+  @Override
+  public long executePartitionedUpdate(Statement stmt) {
+    Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
+    try (Scope s = tracer.withSpan(span)) {
+      return pool.getReadWriteSession().executePartitionedUpdate(stmt);
+    } catch (RuntimeException e) {
+      TraceUtil.endSpanWithFailure(span, e);
+      throw e;
+    }
+  }
+
   ListenableFuture<Void> closeAsync() {
     return pool.closeAsync();
   }

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java

+9 -4
@@ -17,6 +17,7 @@
 package com.google.cloud.spanner;
 
 import com.google.spanner.v1.ResultSetStats;
+import javax.annotation.Nullable;
 
 /**
  * Provides access to the data returned by a Cloud Spanner read or query. {@code ResultSet} allows a
@@ -59,13 +60,17 @@ public interface ResultSet extends AutoCloseable, StructReader {
   @Override
   void close();
 
+
   /**
    * Returns the {@link ResultSetStats} for the query only if the query was executed in either the
    * {@code PLAN} or the {@code PROFILE} mode via the {@link ReadContext#analyzeQuery(Statement,
-   * com.google.cloud.spanner.ReadContext.QueryAnalyzeMode)} method. Attempts to call this method on
-   * a {@code ResultSet} not obtained from {@code analyzeQuery} result in an {@code
-   * UnsupportedOperationException}. This method must be called after {@link #next()} has
-   * returned @{code false}. Calling it before that will result in an {@code IllegalStateException}.
+   * com.google.cloud.spanner.ReadContext.QueryAnalyzeMode)} method or for DML statements in
+   * {@link ReadContext#executeQuery(Statement, QueryOption)}. Attempts to call this method on
+   * a {@code ResultSet} not obtained from {@code analyzeQuery} or {@code executeQuery} will return
+   * a {@code null} {@code ResultSetStats}. This method must be called after {@link #next()} has
+   * returned @{code false}. Calling it before that will result in {@code null}
+   * {@code ResultSetStats} too.
    */
+  @Nullable
   ResultSetStats getStats();
 }

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

+12
@@ -312,6 +312,18 @@ public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
       }
     }
 
+    @Override
+    public long executePartitionedUpdate(Statement stmt) throws SpannerException {
+      try {
+        markUsed();
+        return delegate.executePartitionedUpdate(stmt);
+      } catch (SpannerException e) {
+        throw lastException = e;
+      } finally {
+        close();
+      }
+    }
+
     @Override
     public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
       try {
