Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class StreamWriter implements AutoCloseable {
private final String streamName;
private final String tableName;

private final String traceId;

private final BatchingSettings batchingSettings;
private final RetrySettings retrySettings;
private BigQueryWriteSettings stubSettings;
Expand Down Expand Up @@ -151,6 +153,7 @@ private StreamWriter(Builder builder)
tableName = matcher.group(1);
}

this.traceId = builder.traceId;
this.batchingSettings = builder.batchingSettings;
this.retrySettings = builder.retrySettings;
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this);
Expand Down Expand Up @@ -477,6 +480,11 @@ private AppendRowsRequest getMergedRequest() throws IllegalStateException {
"The first message on the connection must have writer schema set");
}
requestBuilder.setWriteStream(streamName);
if (!inflightRequests.get(0).message.getTraceId().isEmpty()) {
requestBuilder.setTraceId(inflightRequests.get(0).message.getTraceId());
} else if (streamWriter.traceId != null) {
requestBuilder.setTraceId(streamWriter.traceId);
}
}
return requestBuilder.setProtoRows(data.build()).build();
}
Expand Down Expand Up @@ -660,6 +668,8 @@ public static final class Builder {
private String streamOrTableName;
private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();

private String traceId;

private BigQueryWriteClient client = null;

// Batching options
Expand Down Expand Up @@ -814,6 +824,12 @@ public Builder createDefaultStream() {
return this;
}

/** Mark the request as coming from Dataflow. */
public Builder setDataflowTraceId() {
this.traceId = "Dataflow";
return this;
}

/** Builds the {@code StreamWriter}. */
public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException {
return new StreamWriter(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,4 +993,26 @@ public void testFlushAllFailed() throws Exception {

writer.close();
}

@Test
public void testDatasetTraceId() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(1L)
.build())
.setDataflowTraceId()
.build();
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
appendFuture1.get();
appendFuture2.get();
assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId());
assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId());
}
}