Skip to content

feat: add support for tagging to Connection API #623

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add support for tagging in Connection API
  • Loading branch information
olavloite committed Apr 13, 2021
commit a87dfdf0a66936504c50a8fd239891b8c544ab3a
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
abstract class AbstractBaseUnitOfWork implements UnitOfWork {
private final StatementExecutor statementExecutor;
private final StatementTimeout statementTimeout;
protected final String transactionTag;

/** Class for keeping track of the stacktrace of the caller of an async statement. */
static final class SpannerAsyncExecutionException extends RuntimeException {
Expand Down Expand Up @@ -83,6 +84,7 @@ enum InterceptorsUsage {
abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUnitOfWork> {
private StatementExecutor statementExecutor;
private StatementTimeout statementTimeout = new StatementTimeout();
private String transactionTag;

Builder() {}

Expand All @@ -103,13 +105,19 @@ B setStatementTimeout(StatementTimeout timeout) {
return self();
}

B setTransactionTag(@Nullable String tag) {
this.transactionTag = tag;
return self();
}

abstract T build();
}

AbstractBaseUnitOfWork(Builder<?, ?> builder) {
Preconditions.checkState(builder.statementExecutor != null, "No statement executor specified");
this.statementExecutor = builder.statementExecutor;
this.statementTimeout = builder.statementTimeout;
this.transactionTag = builder.transactionTag;
}

StatementExecutor getStatementExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.ReadContext;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -88,7 +89,7 @@ ResultSet internalExecuteQuery(
}

@Override
public ApiFuture<long[]> runBatchAsync() {
public ApiFuture<long[]> runBatchAsync(UpdateOption... options) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Run batch is not supported for transactions");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
* Internal connection API for Google Cloud Spanner. This interface may introduce breaking changes
Expand Down Expand Up @@ -325,6 +326,39 @@ public interface Connection extends AutoCloseable {
*/
TransactionMode getTransactionMode();

/**
* Sets the transaction tag to use for the current transaction. This method may only be called
* when in a transaction, and before the transaction is actually started, i.e. before any
* statements have been executed in the transaction.
*
* <p>The tag will be set as the transaction tag of all statements during the transaction, and as
* the transaction tag of the commit.
*
* <p>The transaction tag will automatically be cleared after the transaction has ended.
*
* @param tag The tag to use.
*/
void setTransactionTag(String tag);

/** @return The transaction tag of the current transaction. */
String getTransactionTag();

/**
* Sets the statement tag to use for the next statement that will be executed. The tag is
* automatically cleared after the statement is executed. Statement tags can be used both with
* autocommit=true and autocommit=false, and can be used for partitioned DML.
*
* @param tag The statement tag to use with the next statement that will be executed on this
* connection.
*/
void setStatementTag(String tag);

/**
* @return The statement tag that will be used with the next statement that is executed on this
* connection.
*/
String getStatementTag();

/**
* @return <code>true</code> if this connection will automatically retry read/write transactions
* that abort. This method may only be called when the connection is in read/write
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.ResultSets;
Expand All @@ -46,6 +48,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -208,6 +211,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private TimestampBound readOnlyStaleness = TimestampBound.strong();
private QueryOptions queryOptions = QueryOptions.getDefaultInstance();

private String transactionTag;
private String statementTag;

/** Create a connection and register it in the SpannerPool. */
ConnectionImpl(ConnectionOptions options) {
Preconditions.checkNotNull(options);
Expand Down Expand Up @@ -515,6 +521,42 @@ public void setTransactionMode(TransactionMode transactionMode) {
this.unitOfWorkType = UnitOfWorkType.of(transactionMode);
}

@Override
public String getTransactionTag() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(!isDdlBatchActive(), "This connection is in a DDL batch");
return transactionTag;
}

@Override
public void setTransactionTag(String tag) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "Cannot set transaction tag while in a batch");
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction");
ConnectionPreconditions.checkState(
!isTransactionStarted(),
"The transaction tag cannot be set after the transaction has started");

this.transactionBeginMarked = true;
this.transactionTag = tag;
}

@Override
public String getStatementTag() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(!isDdlBatchActive(), "This connection is in a DDL batch");
return statementTag;
}

@Override
public void setStatementTag(String tag) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, could we prevent the calling of this for a COMMIT? The user should not be able to do a statement tag in that case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. We can't prevent it before the COMMIT, but we can throw an exception if COMMIT is called after a statement tag has been set, and I think that makes sense as we are quite strict in checking the order of other statements (e.g. you are only allowed to get a commit timestamp if you actually committed etc.).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed the implementation to check for this, and to throw an error if the application tries to set a statement tag for a COMMIT or ROLLBACK statement. This change also adds a change relating to DML batches: DML batches can only include one statement tag, as the statements are sent as one ExecuteBatchDmlRequest, which only allows one statement tag. This statement tag must be set before calling START BATCH DML, e.g.

SET STATEMENT_TAG = 'tag-1';
START BATCH DML;
INSERT INTO Singers (SingerId, Name) VALUES (1, 'Morrison');
INSERT INTO Singers (SingerId, Name) VALUES (2, 'Pieterson');
RUN BATCH;

ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(!isDdlBatchActive(), "This connection is in a DDL batch");

this.statementTag = tag;
}

/**
* Throws an {@link SpannerException} with code {@link ErrorCode#FAILED_PRECONDITION} if the
* current state of this connection does not allow changing the setting for retryAbortsInternally.
Expand Down Expand Up @@ -646,6 +688,7 @@ private void setDefaultTransactionOptions() {
? UnitOfWorkType.READ_ONLY_TRANSACTION
: UnitOfWorkType.READ_WRITE_TRANSACTION;
batchMode = BatchMode.NONE;
transactionTag = null;
} else {
popUnitOfWorkFromTransactionStack();
}
Expand Down Expand Up @@ -957,14 +1000,43 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<Statement> updates) {
return internalExecuteBatchUpdateAsync(parsedStatements);
}

private QueryOption[] mergeQueryStatementTag(QueryOption... options) {
if (this.statementTag != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new QueryOption[] {Options.tag(statementTag)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.tag(statementTag);
}
this.statementTag = null;
}
return options;
}

private UpdateOption[] mergeUpdateStatementTag(UpdateOption... options) {
if (this.statementTag != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new UpdateOption[] {Options.tag(statementTag)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.tag(statementTag);
}
this.statementTag = null;
}
return options;
}

private ResultSet internalExecuteQuery(
final ParsedStatement statement,
final AnalyzeMode analyzeMode,
final QueryOption... options) {
Preconditions.checkArgument(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return get(transaction.executeQueryAsync(statement, analyzeMode, options));
return get(
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)));
}

private AsyncResultSet internalExecuteQueryAsync(
Expand All @@ -975,21 +1047,23 @@ private AsyncResultSet internalExecuteQueryAsync(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return ResultSets.toAsyncResultSet(
transaction.executeQueryAsync(statement, analyzeMode, options),
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)),
spanner.getAsyncExecutorProvider(),
options);
}

private ApiFuture<Long> internalExecuteUpdateAsync(final ParsedStatement update) {
private ApiFuture<Long> internalExecuteUpdateAsync(
final ParsedStatement update, UpdateOption... options) {
Preconditions.checkArgument(
update.getType() == StatementType.UPDATE, "Statement must be an update");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return transaction.executeUpdateAsync(update);
return transaction.executeUpdateAsync(update, mergeUpdateStatementTag(options));
}

private ApiFuture<long[]> internalExecuteBatchUpdateAsync(List<ParsedStatement> updates) {
private ApiFuture<long[]> internalExecuteBatchUpdateAsync(
List<ParsedStatement> updates, UpdateOption... options) {
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return transaction.executeBatchUpdateAsync(updates);
return transaction.executeBatchUpdateAsync(updates, mergeUpdateStatementTag(options));
}

/**
Expand All @@ -1004,7 +1078,8 @@ UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() {
return this.currentUnitOfWork;
}

private UnitOfWork createNewUnitOfWork() {
@VisibleForTesting
UnitOfWork createNewUnitOfWork() {
if (isAutocommit() && !isInTransaction() && !isInBatch()) {
return SingleUseTransaction.newBuilder()
.setDdlClient(ddlClient)
Expand All @@ -1024,6 +1099,7 @@ private UnitOfWork createNewUnitOfWork() {
.setReadOnlyStaleness(readOnlyStaleness)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will this behave if the user never set a transaction tag?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is nullable, so that is not a problem. It is verified by this test case.

.build();
case READ_WRITE_TRANSACTION:
return ReadWriteTransaction.newBuilder()
Expand All @@ -1033,6 +1109,7 @@ private UnitOfWork createNewUnitOfWork() {
.setTransactionRetryListeners(transactionRetryListeners)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
.build();
case DML_BATCH:
// A DML batch can run inside the current transaction. It should therefore only
Expand Down Expand Up @@ -1159,7 +1236,7 @@ public ApiFuture<long[]> runBatchAsync() {
ConnectionPreconditions.checkState(isBatchActive(), "This connection has no active batch");
try {
if (this.currentUnitOfWork != null) {
return this.currentUnitOfWork.runBatchAsync();
return this.currentUnitOfWork.runBatchAsync(mergeUpdateStatementTag());
}
return ApiFutures.immediateFuture(new long[0]);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ interface ConnectionStatementExecutor {
StatementResult statementSetReturnCommitStats(Boolean returnCommitStats);

StatementResult statementShowReturnCommitStats();

StatementResult statementSetStatementTag(String tag);

StatementResult statementShowStatementTag();

StatementResult statementSetTransactionTag(String tag);

StatementResult statementShowTransactionTag();

StatementResult statementBeginTransaction();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_READ_ONLY_STALENESS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_RETRY_ABORTS_INTERNALLY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_RETURN_COMMIT_STATS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_STATEMENT_TAG;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_STATEMENT_TIMEOUT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_TRANSACTION_MODE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_TRANSACTION_TAG;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_AUTOCOMMIT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_AUTOCOMMIT_DML_MODE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_COMMIT_RESPONSE;
Expand All @@ -40,7 +42,9 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_READ_TIMESTAMP;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_RETRY_ABORTS_INTERNALLY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_RETURN_COMMIT_STATS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_STATEMENT_TAG;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_STATEMENT_TIMEOUT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_TRANSACTION_TAG;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.START_BATCH_DDL;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.START_BATCH_DML;
import static com.google.cloud.spanner.connection.StatementResultImpl.noResult;
Expand All @@ -55,6 +59,7 @@
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Type.StructField;
import com.google.cloud.spanner.connection.ReadOnlyStalenessUtil.DurationValueGetter;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.protobuf.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -241,6 +246,34 @@ public StatementResult statementShowReturnCommitStats() {
return resultSet(
"RETURN_COMMIT_STATS", getConnection().isReturnCommitStats(), SHOW_RETURN_COMMIT_STATS);
}

@Override
public StatementResult statementSetStatementTag(String tag) {
getConnection().setStatementTag("".equals(tag) ? null : tag);
return noResult(SET_STATEMENT_TAG);
}

@Override
public StatementResult statementShowStatementTag() {
return resultSet(
"STATEMENT_TAG",
MoreObjects.firstNonNull(getConnection().getStatementTag(), ""),
SHOW_STATEMENT_TAG);
}

@Override
public StatementResult statementSetTransactionTag(String tag) {
getConnection().setTransactionTag("".equals(tag) ? null : tag);
return noResult(SET_TRANSACTION_TAG);
}

@Override
public StatementResult statementShowTransactionTag() {
return resultSet(
"TRANSACTION_TAG",
MoreObjects.firstNonNull(getConnection().getTransactionTag(), ""),
SHOW_TRANSACTION_TAG);
}

@Override
public StatementResult statementBeginTransaction() {
Expand Down
Loading