Skip to content

Commit d08d3de

Browse files
authored
feat: inline begin transaction (#325)
* feat: inline begin tx with first statement * feat: support inlining BeginTransaction * fix: invalid dml statement can still return tx id * bench: add benchmarks for inline begin * feat: add inline begin for async runner * test: add additional tests and ITs * test: add tests for error during tx * test: use statement with same error code on emulator * test: skip test on emulator * test: constraint error causes transaction to be invalidated * fix: retry transaction if first statements fails and had BeginTransaction option * fix: handle aborted exceptions * test: add additional tests for corner cases * feat: use single-use tx for idem-potent mutations * fix: remove check for idempotent mutations * chore: remove commented code * feat!: remove session pool preparing (#515) * feat: remove session pool preparing * fix: fix integration tests * test: fix malformed retry loop in test case * fix: review comments * chore: run formatter * test: fix integration test that relied on data from other test case
1 parent 659719d commit d08d3de

37 files changed

+3105
-1953
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
633633
return stream;
634634
}
635635
};
636-
return new GrpcResultSet(stream, this);
636+
return new GrpcResultSet(
637+
stream, this, request.hasTransaction() && request.getTransaction().hasBegin());
637638
}
638639

639640
/**
@@ -672,14 +673,20 @@ public void close() {
672673
}
673674
}
674675

676+
/**
677+
* Returns the {@link TransactionSelector} that should be used for a statement that is executed on
678+
* this read context. This could be a reference to an existing transaction ID, or it could be a
679+
* BeginTransaction option that should be included with the statement.
680+
*/
675681
@Nullable
676682
abstract TransactionSelector getTransactionSelector();
677683

684+
/** This method is called when a statement returned a new transaction as part of its results. */
678685
@Override
679686
public void onTransactionMetadata(Transaction transaction) {}
680687

681688
@Override
682-
public void onError(SpannerException e) {}
689+
public void onError(SpannerException e, boolean withBeginTransaction) {}
683690

684691
@Override
685692
public void onDone() {}
@@ -740,7 +747,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
740747
return stream;
741748
}
742749
};
743-
GrpcResultSet resultSet = new GrpcResultSet(stream, this);
750+
GrpcResultSet resultSet =
751+
new GrpcResultSet(stream, this, selector != null && selector.hasBegin());
744752
return resultSet;
745753
}
746754

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ interface Listener {
8181
void onTransactionMetadata(Transaction transaction) throws SpannerException;
8282

8383
/** Called when the read finishes with an error. */
84-
void onError(SpannerException e);
84+
void onError(SpannerException e, boolean withBeginTransaction);
8585

8686
/** Called when the read finishes normally. */
8787
void onDone();
@@ -91,14 +91,17 @@ interface Listener {
9191
static class GrpcResultSet extends AbstractResultSet<List<Object>> {
9292
private final GrpcValueIterator iterator;
9393
private final Listener listener;
94+
private final boolean beginTransaction;
9495
private GrpcStruct currRow;
9596
private SpannerException error;
9697
private ResultSetStats statistics;
9798
private boolean closed;
9899

99-
GrpcResultSet(CloseableIterator<PartialResultSet> iterator, Listener listener) {
100+
GrpcResultSet(
101+
CloseableIterator<PartialResultSet> iterator, Listener listener, boolean beginTransaction) {
100102
this.iterator = new GrpcValueIterator(iterator);
101103
this.listener = listener;
104+
this.beginTransaction = beginTransaction;
102105
}
103106

104107
@Override
@@ -127,7 +130,7 @@ public boolean next() throws SpannerException {
127130
}
128131
return hasNext;
129132
} catch (SpannerException e) {
130-
throw yieldError(e);
133+
throw yieldError(e, beginTransaction && currRow == null);
131134
}
132135
}
133136

@@ -149,9 +152,9 @@ public Type getType() {
149152
return currRow.getType();
150153
}
151154

152-
private SpannerException yieldError(SpannerException e) {
155+
private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
153156
close();
154-
listener.onError(e);
157+
listener.onError(e, beginTransaction);
155158
throw e;
156159
}
157160
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import com.google.api.core.ApiAsyncFunction;
1920
import com.google.api.core.ApiFuture;
2021
import com.google.api.core.ApiFutureCallback;
2122
import com.google.api.core.ApiFutures;
@@ -27,6 +28,7 @@
2728
import com.google.common.base.MoreObjects;
2829
import com.google.common.base.Preconditions;
2930
import com.google.common.util.concurrent.MoreExecutors;
31+
import com.google.protobuf.Empty;
3032
import io.opencensus.trace.Span;
3133
import io.opencensus.trace.Tracer;
3234
import io.opencensus.trace.Tracing;
@@ -76,14 +78,19 @@ public TransactionContextFutureImpl beginAsync() {
7678
return begin;
7779
}
7880

79-
private ApiFuture<TransactionContext> internalBeginAsync(boolean setActive) {
81+
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
8082
txnState = TransactionState.STARTED;
8183
txn = session.newTransaction();
82-
if (setActive) {
84+
if (firstAttempt) {
8385
session.setActive(this);
8486
}
8587
final SettableApiFuture<TransactionContext> res = SettableApiFuture.create();
86-
final ApiFuture<Void> fut = txn.ensureTxnAsync();
88+
final ApiFuture<Void> fut;
89+
if (firstAttempt) {
90+
fut = ApiFutures.immediateFuture(null);
91+
} else {
92+
fut = txn.ensureTxnAsync();
93+
}
8794
ApiFutures.addCallback(
8895
fut,
8996
new ApiFutureCallback<Void>() {
@@ -149,7 +156,15 @@ public ApiFuture<Void> rollbackAsync() {
149156
txnState == TransactionState.STARTED,
150157
"rollback can only be called if the transaction is in progress");
151158
try {
152-
return txn.rollbackAsync();
159+
return ApiFutures.transformAsync(
160+
txn.rollbackAsync(),
161+
new ApiAsyncFunction<Empty, Void>() {
162+
@Override
163+
public ApiFuture<Void> apply(Empty input) throws Exception {
164+
return ApiFutures.immediateFuture(null);
165+
}
166+
},
167+
MoreExecutors.directExecutor());
153168
} finally {
154169
txnState = TransactionState.ROLLED_BACK;
155170
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,6 @@ class DatabaseClientImpl implements DatabaseClient {
3434
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
3535
private static final Tracer tracer = Tracing.getTracer();
3636

37-
private enum SessionMode {
38-
READ,
39-
READ_WRITE
40-
}
41-
4237
@VisibleForTesting final String clientId;
4338
@VisibleForTesting final SessionPool pool;
4439

@@ -53,21 +48,15 @@ private enum SessionMode {
5348
}
5449

5550
@VisibleForTesting
56-
PooledSessionFuture getReadSession() {
57-
return pool.getReadSession();
58-
}
59-
60-
@VisibleForTesting
61-
PooledSessionFuture getReadWriteSession() {
62-
return pool.getReadWriteSession();
51+
PooledSessionFuture getSession() {
52+
return pool.getSession();
6353
}
6454

6555
@Override
6656
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
6757
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
6858
try (Scope s = tracer.withSpan(span)) {
6959
return runWithSessionRetry(
70-
SessionMode.READ_WRITE,
7160
new Function<Session, Timestamp>() {
7261
@Override
7362
public Timestamp apply(Session session) {
@@ -94,7 +83,6 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
9483
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
9584
try (Scope s = tracer.withSpan(span)) {
9685
return runWithSessionRetry(
97-
SessionMode.READ_WRITE,
9886
new Function<Session, Timestamp>() {
9987
@Override
10088
public Timestamp apply(Session session) {
@@ -120,7 +108,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
120108
public ReadContext singleUse() {
121109
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
122110
try (Scope s = tracer.withSpan(span)) {
123-
return getReadSession().singleUse();
111+
return getSession().singleUse();
124112
} catch (RuntimeException e) {
125113
TraceUtil.endSpanWithFailure(span, e);
126114
throw e;
@@ -131,7 +119,7 @@ public ReadContext singleUse() {
131119
public ReadContext singleUse(TimestampBound bound) {
132120
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
133121
try (Scope s = tracer.withSpan(span)) {
134-
return getReadSession().singleUse(bound);
122+
return getSession().singleUse(bound);
135123
} catch (RuntimeException e) {
136124
TraceUtil.endSpanWithFailure(span, e);
137125
throw e;
@@ -142,7 +130,7 @@ public ReadContext singleUse(TimestampBound bound) {
142130
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
143131
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
144132
try (Scope s = tracer.withSpan(span)) {
145-
return getReadSession().singleUseReadOnlyTransaction();
133+
return getSession().singleUseReadOnlyTransaction();
146134
} catch (RuntimeException e) {
147135
TraceUtil.endSpanWithFailure(span, e);
148136
throw e;
@@ -153,7 +141,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
153141
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
154142
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
155143
try (Scope s = tracer.withSpan(span)) {
156-
return getReadSession().singleUseReadOnlyTransaction(bound);
144+
return getSession().singleUseReadOnlyTransaction(bound);
157145
} catch (RuntimeException e) {
158146
TraceUtil.endSpanWithFailure(span, e);
159147
throw e;
@@ -164,7 +152,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
164152
public ReadOnlyTransaction readOnlyTransaction() {
165153
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
166154
try (Scope s = tracer.withSpan(span)) {
167-
return getReadSession().readOnlyTransaction();
155+
return getSession().readOnlyTransaction();
168156
} catch (RuntimeException e) {
169157
TraceUtil.endSpanWithFailure(span, e);
170158
throw e;
@@ -175,7 +163,7 @@ public ReadOnlyTransaction readOnlyTransaction() {
175163
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
176164
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
177165
try (Scope s = tracer.withSpan(span)) {
178-
return getReadSession().readOnlyTransaction(bound);
166+
return getSession().readOnlyTransaction(bound);
179167
} catch (RuntimeException e) {
180168
TraceUtil.endSpanWithFailure(span, e);
181169
throw e;
@@ -186,9 +174,9 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
186174
public TransactionRunner readWriteTransaction() {
187175
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
188176
try (Scope s = tracer.withSpan(span)) {
189-
return getReadWriteSession().readWriteTransaction();
177+
return getSession().readWriteTransaction();
190178
} catch (RuntimeException e) {
191-
TraceUtil.setWithFailure(span, e);
179+
TraceUtil.endSpanWithFailure(span, e);
192180
throw e;
193181
} finally {
194182
span.end(TraceUtil.END_SPAN_OPTIONS);
@@ -199,7 +187,7 @@ public TransactionRunner readWriteTransaction() {
199187
public TransactionManager transactionManager() {
200188
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
201189
try (Scope s = tracer.withSpan(span)) {
202-
return getReadWriteSession().transactionManager();
190+
return getSession().transactionManager();
203191
} catch (RuntimeException e) {
204192
TraceUtil.endSpanWithFailure(span, e);
205193
throw e;
@@ -210,7 +198,7 @@ public TransactionManager transactionManager() {
210198
public AsyncRunner runAsync() {
211199
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
212200
try (Scope s = tracer.withSpan(span)) {
213-
return getReadWriteSession().runAsync();
201+
return getSession().runAsync();
214202
} catch (RuntimeException e) {
215203
TraceUtil.endSpanWithFailure(span, e);
216204
throw e;
@@ -221,7 +209,7 @@ public AsyncRunner runAsync() {
221209
public AsyncTransactionManager transactionManagerAsync() {
222210
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
223211
try (Scope s = tracer.withSpan(span)) {
224-
return getReadWriteSession().transactionManagerAsync();
212+
return getSession().transactionManagerAsync();
225213
} catch (RuntimeException e) {
226214
TraceUtil.endSpanWithFailure(span, e);
227215
throw e;
@@ -232,10 +220,7 @@ public AsyncTransactionManager transactionManagerAsync() {
232220
public long executePartitionedUpdate(final Statement stmt) {
233221
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
234222
try (Scope s = tracer.withSpan(span)) {
235-
// A partitioned update transaction does not need a prepared write session, as the transaction
236-
// object will start a new transaction with specific options anyway.
237223
return runWithSessionRetry(
238-
SessionMode.READ,
239224
new Function<Session, Long>() {
240225
@Override
241226
public Long apply(Session session) {
@@ -248,17 +233,13 @@ public Long apply(Session session) {
248233
}
249234
}
250235

251-
private <T> T runWithSessionRetry(SessionMode mode, Function<Session, T> callable) {
252-
PooledSessionFuture session =
253-
mode == SessionMode.READ_WRITE ? getReadWriteSession() : getReadSession();
236+
private <T> T runWithSessionRetry(Function<Session, T> callable) {
237+
PooledSessionFuture session = getSession();
254238
while (true) {
255239
try {
256240
return callable.apply(session);
257241
} catch (SessionNotFoundException e) {
258-
session =
259-
mode == SessionMode.READ_WRITE
260-
? pool.replaceReadWriteSession(e, session)
261-
: pool.replaceReadSession(e, session);
242+
session = pool.replaceSession(e, session);
262243
}
263244
}
264245
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MetricRegistryConstants.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,22 @@ class MetricRegistryConstants {
3636
private static final LabelValue UNSET_LABEL = LabelValue.create(null);
3737

3838
static final LabelValue NUM_IN_USE_SESSIONS = LabelValue.create("num_in_use_sessions");
39+
40+
/**
41+
* The session pool no longer prepares a fraction of the sessions with a read/write transaction.
42+
* This metric will therefore always be zero and may be removed in the future.
43+
*/
44+
@Deprecated
3945
static final LabelValue NUM_SESSIONS_BEING_PREPARED =
4046
LabelValue.create("num_sessions_being_prepared");
47+
4148
static final LabelValue NUM_READ_SESSIONS = LabelValue.create("num_read_sessions");
49+
50+
/**
51+
* The session pool no longer prepares a fraction of the sessions with a read/write transaction.
52+
* This metric will therefore always be zero and may be removed in the future.
53+
*/
54+
@Deprecated
4255
static final LabelValue NUM_WRITE_SESSIONS = LabelValue.create("num_write_prepared_sessions");
4356

4457
static final ImmutableList<LabelKey> SPANNER_LABEL_KEYS =

0 commit comments

Comments
 (0)