diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 2dfc8b2f1c00..e639b5812d24 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -792,9 +792,17 @@ long flush(
 
               // Maximum number of times we retry before we fail the work item.
               if (failedContext.failureCount > allowedRetry) {
-                throw new RuntimeException(
+                String errorMessage =
                     String.format(
-                        "More than %d attempts to call AppendRows failed.", allowedRetry));
+                        "More than %d attempts to call AppendRows failed. Last encountered error: %s",
+                        allowedRetry, error != null ? error.toString() : "unknown");
+                if (statusCode == Status.Code.PERMISSION_DENIED
+                    || statusCode == Status.Code.NOT_FOUND) {
+                  errorMessage +=
+                      ". Please check if the destination table exists and if the service account has the "
+                          + "bigquery.tables.updateData permission.";
+                }
+                throw new RuntimeException(errorMessage, error);
               }
 
               // The following errors are known to be persistent, so always fail the work item in
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index b644d7aa752c..188f393dbe8c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -1066,7 +1066,25 @@ public void process(
 
       if (numAppends > 0) {
         initializeContexts.accept(contexts);
-        retryManager.run(true);
+        try {
+          retryManager.run(true);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw e;
+        } catch (Exception e) {
+          Status.Code statusCode = Status.fromThrowable(e).getCode();
+          String errorMessage =
+              String.format(
+                  "More than %d attempts to call AppendRows failed. Last encountered error: %s",
+                  maxRetries, e.toString());
+          if (statusCode == Status.Code.PERMISSION_DENIED
+              || statusCode == Status.Code.NOT_FOUND) {
+            errorMessage +=
+                ". Please check if the destination table exists and if the service account has the "
+                    + "bigquery.tables.updateData permission.";
+          }
+          throw new RuntimeException(errorMessage, e);
+        }
         appendSplitDistribution.update(numAppends);
 
         if (autoUpdateSchema) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 814f4eec421f..6c0d97fbc6b8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -238,6 +238,13 @@ void commit() {
 
   Function<TableRow, Boolean> shouldFailRow =
       (Function<TableRow, Boolean> & Serializable) tr -> false;
+
+  private volatile Throwable appendRowsError = null;
+
+  public void setAppendRowsError(Throwable t) {
+    this.appendRowsError = t;
+  }
+
   Map<String, List<String>> insertErrors = Maps.newHashMap();
 
   // The counter for the number of insertions performed.
@@ -801,6 +808,9 @@ Exceptions.StorageException tryInitialize() throws Exception {
       @Override
       public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
           throws Exception {
+        if (appendRowsError != null) {
+          return ApiFutures.immediateFailedFuture(appendRowsError);
+        }
         // The BigQuery client returns stream-open errors when the first append is called, so we
         // duplicate that here.
         Exceptions.StorageException storageException = tryInitialize();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 601ed71473ed..0581e00fe558 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1659,6 +1659,48 @@ public void testStreamingStorageApiWriteWithErrorHandling() throws Exception {
     storageWriteWithErrorHandling(false);
   }
 
+  @Test
+  public void testStorageApiWriteFailureExhaustedRetries() throws Exception {
+    assumeTrue(useStorageApi);
+
+    // Set up fake dataset service to return PERMISSION_DENIED for appendRows
+    fakeDatasetService.setAppendRowsError(
+        new io.grpc.StatusRuntimeException(
+            io.grpc.Status.PERMISSION_DENIED.withDescription("Missing permissions")));
+
+    List<Integer> elements = Lists.newArrayList(1, 2, 3);
+
+    BigQueryIO.Write<Integer> write =
+        BigQueryIO.<Integer>write()
+            .to("project-id:dataset-id.table-id")
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+            .withFormatFunction(
+                (SerializableFunction<Integer, TableRow>)
+                    input -> new TableRow().set("number", input))
+            .withSchema(
+                new TableSchema()
+                    .setFields(
+                        ImmutableList.of(
+                            new TableFieldSchema().setName("number").setType("INTEGER"))))
+            .withTestServices(fakeBqServices)
+            .withoutValidation();
+
+    if (useStreaming) {
+      write = write.withTriggeringFrequency(Duration.standardSeconds(30));
+    }
+
+    PCollection<Integer> input = p.apply(Create.of(elements).withCoder(BigEndianIntegerCoder.of()));
+    input.apply("WriteToBQ", write);
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("More than");
+    thrown.expectMessage("attempts to call AppendRows failed");
+    thrown.expectMessage("PERMISSION_DENIED");
+    thrown.expectMessage("bigquery.tables.updateData");
+
+    p.run().waitUntilFinish();
+  }
+
   @Test
   public void testStreamingStorageApiWriteWithAutoShardingWithErrorHandling() throws Exception {
     assumeTrue(useStreaming);