From df78adf7848f0278a7eee33e25b77ac1ab8fde47 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 12 Jun 2026 15:14:50 +0000 Subject: [PATCH 1/5] Improve BigQueryIO Storage Write API error messages * Surface the root cause in the RuntimeException when AppendRows retries are exhausted. * Elevate the final failure logging to ERROR level. * Provide actionable advice for PERMISSION_DENIED and NOT_FOUND errors, suggesting to check if the destination table exists and if the service account has the TABLES_UPDATE_DATA permission. * Add a unit test to verify the improved error messages on retry exhaustion. --- .../StorageApiWriteUnshardedRecords.java | 13 +++++- .../StorageApiWritesShardedRecords.java | 22 +++++++++- .../io/gcp/testing/FakeDatasetService.java | 10 +++++ .../io/gcp/bigquery/BigQueryIOWriteTest.java | 42 +++++++++++++++++++ 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 2dfc8b2f1c00..5895a332f330 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -792,9 +792,18 @@ long flush( // Maximum number of times we retry before we fail the work item. if (failedContext.failureCount > allowedRetry) { - throw new RuntimeException( + String errorMessage = String.format( - "More than %d attempts to call AppendRows failed.", allowedRetry)); + "More than %d attempts to call AppendRows failed. Last encountered error: %s", + allowedRetry, error != null ? error.toString() : "unknown"); + if (statusCode.equals(Status.Code.PERMISSION_DENIED) + || statusCode.equals(Status.Code.NOT_FOUND)) { + errorMessage += + ". Please check if the destination table exists and if the service account has the " + + "TABLES_UPDATE_DATA permission."; + } + LOG.error("{}", errorMessage, error); + throw new RuntimeException(errorMessage, error); } // The following errors are known to be persistent, so always fail the work item in diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index b644d7aa752c..4b753d74fa1d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -1066,7 +1066,27 @@ public void process( if (numAppends > 0) { initializeContexts.accept(contexts); - retryManager.run(true); + try { + retryManager.run(true); + } catch (Exception e) { + Throwable cause = e.getCause(); + if (cause == null) { + cause = e; + } + Status.Code statusCode = Status.fromThrowable(cause).getCode(); + String errorMessage = + String.format( + "More than %d attempts to call AppendRows failed. Last encountered error: %s", + maxRetries, cause.toString()); + if (statusCode.equals(Status.Code.PERMISSION_DENIED) + || statusCode.equals(Status.Code.NOT_FOUND)) { + errorMessage += + ". Please check if the destination table exists and if the service account has the " + + "TABLES_UPDATE_DATA permission."; + } + LOG.error("{}", errorMessage, cause); + throw new RuntimeException(errorMessage, cause); + } appendSplitDistribution.update(numAppends); if (autoUpdateSchema) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 814f4eec421f..6c0d97fbc6b8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -238,6 +238,13 @@ void commit() { Function shouldFailRow = (Function & Serializable) tr -> false; + + private volatile Throwable appendRowsError = null; + + public void setAppendRowsError(Throwable t) { + this.appendRowsError = t; + } + Map> insertErrors = Maps.newHashMap(); // The counter for the number of insertions performed. @@ -801,6 +808,9 @@ Exceptions.StorageException tryInitialize() throws Exception { @Override public ApiFuture appendRows(long offset, ProtoRows rows) throws Exception { + if (appendRowsError != null) { + return ApiFutures.immediateFailedFuture(appendRowsError); + } // The BigQuery client returns stream-open errors when the first append is called, so we // duplicate that here. Exceptions.StorageException storageException = tryInitialize(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 601ed71473ed..2a03567e6528 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -1659,6 +1659,48 @@ public void testStreamingStorageApiWriteWithErrorHandling() throws Exception { storageWriteWithErrorHandling(false); } + @Test + public void testStorageApiWriteFailureExhaustedRetries() throws Exception { + assumeTrue(useStorageApi); + + // Set up fake dataset service to return PERMISSION_DENIED for appendRows + fakeDatasetService.setAppendRowsError( + new io.grpc.StatusRuntimeException( + io.grpc.Status.PERMISSION_DENIED.withDescription("Missing permissions"))); + + List elements = Lists.newArrayList(1, 2, 3); + + BigQueryIO.Write write = + BigQueryIO.write() + .to("project-id:dataset-id.table-id") + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) + .withFormatFunction( + (SerializableFunction) + input -> new TableRow().set("number", input)) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withoutValidation(); + + if (useStreaming) { + write = write.withTriggeringFrequency(Duration.standardSeconds(30)); + } + + PCollection input = p.apply(Create.of(elements).withCoder(BigEndianIntegerCoder.of())); + input.apply("WriteToBQ", write); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("More than"); + thrown.expectMessage("attempts to call AppendRows failed"); + thrown.expectMessage("PERMISSION_DENIED"); + thrown.expectMessage("TABLES_UPDATE_DATA"); + + p.run().waitUntilFinish(); + } + @Test public void testStreamingStorageApiWriteWithAutoShardingWithErrorHandling() throws Exception { assumeTrue(useStreaming); From 99890ecbfeaa30445e27ac2d9da0c8604ca60eba Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 12 Jun 2026 16:11:40 +0000 Subject: [PATCH 2/5] Address PR feedback: Use null-safe == operator for enum comparison --- .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 4 ++-- .../sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 5895a332f330..7835837e1d38 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -796,8 +796,8 @@ long flush( String.format( "More than %d attempts to call AppendRows failed. Last encountered error: %s", allowedRetry, error != null ? error.toString() : "unknown"); - if (statusCode.equals(Status.Code.PERMISSION_DENIED) - || statusCode.equals(Status.Code.NOT_FOUND)) { + if (statusCode == Status.Code.PERMISSION_DENIED + || statusCode == Status.Code.NOT_FOUND) { errorMessage += ". Please check if the destination table exists and if the service account has the " + "TABLES_UPDATE_DATA permission."; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 4b753d74fa1d..e06fbcce33b6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -1078,8 +1078,8 @@ public void process( String.format( "More than %d attempts to call AppendRows failed. Last encountered error: %s", maxRetries, cause.toString()); - if (statusCode.equals(Status.Code.PERMISSION_DENIED) - || statusCode.equals(Status.Code.NOT_FOUND)) { + if (statusCode == Status.Code.PERMISSION_DENIED + || statusCode == Status.Code.NOT_FOUND) { errorMessage += ". Please check if the destination table exists and if the service account has the " + "TABLES_UPDATE_DATA permission."; From a375825078d9cf6c53564bffc17f504acd98d61d Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 12 Jun 2026 16:49:49 +0000 Subject: [PATCH 3/5] Address PR feedback: remove duplicate logging, simplify status extraction, and preserve exception chain --- .../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 1 - .../gcp/bigquery/StorageApiWritesShardedRecords.java | 11 +++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 7835837e1d38..c830346b3bf7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -802,7 +802,6 @@ long flush( ". Please check if the destination table exists and if the service account has the " + "TABLES_UPDATE_DATA permission."; } - LOG.error("{}", errorMessage, error); throw new RuntimeException(errorMessage, error); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index e06fbcce33b6..f368bc4786a6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -1069,23 +1069,18 @@ public void process( try { retryManager.run(true); } catch (Exception e) { - Throwable cause = e.getCause(); - if (cause == null) { - cause = e; - } - Status.Code statusCode = Status.fromThrowable(cause).getCode(); + Status.Code statusCode = Status.fromThrowable(e).getCode(); String errorMessage = String.format( "More than %d attempts to call AppendRows failed. Last encountered error: %s", - maxRetries, cause.toString()); + maxRetries, e.toString()); if (statusCode == Status.Code.PERMISSION_DENIED || statusCode == Status.Code.NOT_FOUND) { errorMessage += ". Please check if the destination table exists and if the service account has the " + "TABLES_UPDATE_DATA permission."; } - LOG.error("{}", errorMessage, cause); - throw new RuntimeException(errorMessage, cause); + throw new RuntimeException(errorMessage, e); } appendSplitDistribution.update(numAppends); From a71f69ff7985383e3855cd7cb7c066766e0ef75f Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 12 Jun 2026 17:13:33 +0000 Subject: [PATCH 4/5] Address PR feedback: handle InterruptedException properly and use official IAM permission name --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 2a03567e6528..0581e00fe558 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -1696,7 +1696,7 @@ public void testStorageApiWriteFailureExhaustedRetries() throws Exception { thrown.expectMessage("More than"); thrown.expectMessage("attempts to call AppendRows failed"); thrown.expectMessage("PERMISSION_DENIED"); - thrown.expectMessage("TABLES_UPDATE_DATA"); + thrown.expectMessage("bigquery.tables.updateData"); p.run().waitUntilFinish(); } From c8f0a3c69c266415e6066db1fd3f18569392cca2 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 12 Jun 2026 17:14:57 +0000 Subject: [PATCH 5/5] Address PR feedback: handle InterruptedException and use official permission name in source files --- .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 2 +- .../sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index c830346b3bf7..e639b5812d24 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -800,7 +800,7 @@ long flush( || statusCode == Status.Code.NOT_FOUND) { errorMessage += ". Please check if the destination table exists and if the service account has the " - + "TABLES_UPDATE_DATA permission."; + + "bigquery.tables.updateData permission."; } throw new RuntimeException(errorMessage, error); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index f368bc4786a6..188f393dbe8c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -1068,6 +1068,9 @@ public void process( initializeContexts.accept(contexts); try { retryManager.run(true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; } catch (Exception e) { Status.Code statusCode = Status.fromThrowable(e).getCode(); String errorMessage = @@ -1078,7 +1081,7 @@ public void process( || statusCode == Status.Code.NOT_FOUND) { errorMessage += ". Please check if the destination table exists and if the service account has the " - + "TABLES_UPDATE_DATA permission."; + + "bigquery.tables.updateData permission."; } throw new RuntimeException(errorMessage, e); }