Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -792,9 +792,18 @@ long flush(

// Maximum number of times we retry before we fail the work item.
if (failedContext.failureCount > allowedRetry) {
throw new RuntimeException(
String errorMessage =
String.format(
"More than %d attempts to call AppendRows failed.", allowedRetry));
"More than %d attempts to call AppendRows failed. Last encountered error: %s",
allowedRetry, error != null ? error.toString() : "unknown");
if (statusCode.equals(Status.Code.PERMISSION_DENIED)
|| statusCode.equals(Status.Code.NOT_FOUND)) {
Comment thread
damccorm marked this conversation as resolved.
Outdated
errorMessage +=
". Please check if the destination table exists and if the service account has the "
+ "TABLES_UPDATE_DATA permission.";
}
Comment thread
damccorm marked this conversation as resolved.
Comment thread
damccorm marked this conversation as resolved.
LOG.error("{}", errorMessage, error);
throw new RuntimeException(errorMessage, error);
Comment thread
damccorm marked this conversation as resolved.
}

// The following errors are known to be persistent, so always fail the work item in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,27 @@ public void process(

if (numAppends > 0) {
initializeContexts.accept(contexts);
retryManager.run(true);
try {
retryManager.run(true);
} catch (Exception e) {
Throwable cause = e.getCause();
if (cause == null) {
cause = e;
}
Status.Code statusCode = Status.fromThrowable(cause).getCode();
String errorMessage =
String.format(
"More than %d attempts to call AppendRows failed. Last encountered error: %s",
maxRetries, cause.toString());
if (statusCode.equals(Status.Code.PERMISSION_DENIED)
|| statusCode.equals(Status.Code.NOT_FOUND)) {
Comment thread
damccorm marked this conversation as resolved.
Outdated
errorMessage +=
". Please check if the destination table exists and if the service account has the "
+ "TABLES_UPDATE_DATA permission.";
}
Comment thread
damccorm marked this conversation as resolved.
LOG.error("{}", errorMessage, cause);
throw new RuntimeException(errorMessage, cause);
}
Comment thread
damccorm marked this conversation as resolved.
Comment thread
damccorm marked this conversation as resolved.

appendSplitDistribution.update(numAppends);
if (autoUpdateSchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ void commit() {

Function<TableRow, Boolean> shouldFailRow =
(Function<TableRow, Boolean> & Serializable) tr -> false;

private volatile Throwable appendRowsError = null;

public void setAppendRowsError(Throwable t) {
this.appendRowsError = t;
}

Map<String, List<String>> insertErrors = Maps.newHashMap();

// The counter for the number of insertions performed.
Expand Down Expand Up @@ -801,6 +808,9 @@ Exceptions.StorageException tryInitialize() throws Exception {
@Override
public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
throws Exception {
if (appendRowsError != null) {
return ApiFutures.immediateFailedFuture(appendRowsError);
}
// The BigQuery client returns stream-open errors when the first append is called, so we
// duplicate that here.
Exceptions.StorageException storageException = tryInitialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1659,6 +1659,48 @@ public void testStreamingStorageApiWriteWithErrorHandling() throws Exception {
storageWriteWithErrorHandling(false);
}

@Test
public void testStorageApiWriteFailureExhaustedRetries() throws Exception {
assumeTrue(useStorageApi);

// Set up fake dataset service to return PERMISSION_DENIED for appendRows
fakeDatasetService.setAppendRowsError(
new io.grpc.StatusRuntimeException(
io.grpc.Status.PERMISSION_DENIED.withDescription("Missing permissions")));

List<Integer> elements = Lists.newArrayList(1, 2, 3);

BigQueryIO.Write<Integer> write =
BigQueryIO.<Integer>write()
.to("project-id:dataset-id.table-id")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withFormatFunction(
(SerializableFunction<Integer, TableRow>)
input -> new TableRow().set("number", input))
.withSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("number").setType("INTEGER"))))
.withTestServices(fakeBqServices)
.withoutValidation();

if (useStreaming) {
write = write.withTriggeringFrequency(Duration.standardSeconds(30));
}

PCollection<Integer> input = p.apply(Create.of(elements).withCoder(BigEndianIntegerCoder.of()));
input.apply("WriteToBQ", write);

thrown.expect(RuntimeException.class);
thrown.expectMessage("More than");
thrown.expectMessage("attempts to call AppendRows failed");
Comment thread
damccorm marked this conversation as resolved.
thrown.expectMessage("PERMISSION_DENIED");
thrown.expectMessage("TABLES_UPDATE_DATA");
Comment thread
damccorm marked this conversation as resolved.
Outdated

p.run().waitUntilFinish();
}

@Test
public void testStreamingStorageApiWriteWithAutoShardingWithErrorHandling() throws Exception {
assumeTrue(useStreaming);
Expand Down
Loading