[Dataflow Streaming] [Multi Key] MultiKey failure handling + Integration #38919
[Dataflow Streaming] [Multi Key] MultiKey failure handling + Integration #38919arunpandianp wants to merge 26 commits into
Conversation
…lients - Add MultiKeyWorkItemCommitRequest to windmill.proto. - Support MultiKey commits in Commit model and StreamingEngineWorkCommitter. - Update GrpcCommitWorkStream to batch and stream MultiKey commit requests.
…oring for multi-key execution.
# Conflicts: # runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
Resolved conflicts in StreamingModeExecutionContext.java and StreamingModeExecutionContextTest.java. Fixed compilation error in Work.java by removing duplicate getComputationId() method. TAG=agy CONV=143daaa5-e902-4d26-820d-cf1af2babb84
|
R: @scwhittle This change is on top of #38814 and #38768 PTAL |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces multi-key commit support for Dataflow streaming, allowing multiple work items to be committed in a single transaction. It enhances failure handling by enabling re-execution of specific work items within a batch if a retryable failure occurs. Additionally, it optimizes the execution context to pull more work items from the BoundedWorkQueue, thereby increasing processing efficiency for multi-key bundles. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces multi-key bundle support for streaming in the Google Cloud Dataflow Java worker, enabling batching and advancing through multiple keys within a key group, as well as transactionally committing multi-key work batches. Feedback suggests adding robust error handling when parsing experimental options to prevent worker crashes, and implementing defensive null and bounds checks when accessing read operation receivers to avoid potential runtime exceptions.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| String batchSizeStr = | ||
| ExperimentalOptions.getExperimentValue(options, WINDMILL_MAX_KEY_GROUP_BATCH_SIZE); | ||
| this.maxKeyGroupBatchSize = batchSizeStr != null ? Integer.parseInt(batchSizeStr) : 100; | ||
|
|
||
| String batchTimeStr = | ||
| ExperimentalOptions.getExperimentValue(options, WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS); | ||
| this.maxKeyGroupBatchTimeNanos = | ||
| TimeUnit.MILLISECONDS.toNanos(batchTimeStr != null ? Long.parseLong(batchTimeStr) : 100); |
There was a problem hiding this comment.
Parsing user-provided experimental options directly using Integer.parseInt and Long.parseLong without error handling can cause the worker to crash if the values are malformed. It is safer to wrap these in a try-catch block and fall back to the default values with a warning log.
String batchSizeStr =
ExperimentalOptions.getExperimentValue(options, WINDMILL_MAX_KEY_GROUP_BATCH_SIZE);
int batchSize = 100;
if (batchSizeStr != null) {
try {
batchSize = Integer.parseInt(batchSizeStr);
} catch (NumberFormatException e) {
LOG.warn("Failed to parse {} as integer, using default of 100", WINDMILL_MAX_KEY_GROUP_BATCH_SIZE, e);
}
}
this.maxKeyGroupBatchSize = batchSize;
String batchTimeStr =
ExperimentalOptions.getExperimentValue(options, WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS);
long batchTimeMs = 100;
if (batchTimeStr != null) {
try {
batchTimeMs = Long.parseLong(batchTimeStr);
} catch (NumberFormatException e) {
LOG.warn("Failed to parse {} as long, using default of 100", WINDMILL_MAX_KEY_GROUP_BATCH_TIME_MS, e);
}
}
this.maxKeyGroupBatchTimeNanos = TimeUnit.MILLISECONDS.toNanos(batchTimeMs);| HashMap<String, ElementCounter> counters = | ||
| ((DataflowMapTaskExecutor) workExecutor) | ||
| .getReadOperation() | ||
| .receivers[0] | ||
| .getOutputCounters(); |
There was a problem hiding this comment.
Defensive programming: Accessing receivers[0] directly without checking if getReadOperation() is null, or if receivers is null or empty, can lead to NullPointerException or ArrayIndexOutOfBoundsException. Adding appropriate guards ensures robust execution.
DataflowMapTaskExecutor mapTaskExecutor = (DataflowMapTaskExecutor) workExecutor;
if (mapTaskExecutor.getReadOperation() == null
|| mapTaskExecutor.getReadOperation().receivers == null
|| mapTaskExecutor.getReadOperation().receivers.length == 0) {
return 0L;
}
HashMap<String, ElementCounter> counters =
mapTaskExecutor.getReadOperation().receivers[0].getOutputCounters();
if (counters == null) {
return 0L;
}
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #38919 +/- ##
=============================================
- Coverage 59.33% 55.39% -3.95%
+ Complexity 16593 2243 -14350
=============================================
Files 2845 1104 -1741
Lines 291334 171355 -119979
Branches 14421 1437 -12984
=============================================
- Hits 172859 94915 -77944
+ Misses 111065 74000 -37065
+ Partials 7410 2440 -4970
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
The change connects adds failure handling for multi key commits.
Integrates StreamingWorkScheduler and multikey commit methods.
Updates StreamingModeExecutionContext::advance to pull in more items from BoundedWorkQueue
All changes are behind the experiment
unstable_enable_multi_key_bundleand does not affect default logic.