Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5154679
[Dataflow Streaming][Multikey] Support MultiKey commits in windmill c…
arunpandianp Jun 2, 2026
73faa68
[Dataflow Streaming] [Multi Key] StreamingModeExecutionContext refact…
arunpandianp Jun 4, 2026
53bc9a6
trigger postsubmit tests
arunpandianp Jun 4, 2026
7720cf8
fix tests
arunpandianp Jun 4, 2026
72740d2
fix tests
arunpandianp Jun 4, 2026
0f96e72
improve work synchronization
arunpandianp Jun 4, 2026
51b9257
cleanup logic
arunpandianp Jun 5, 2026
9a4e7be
cleanup logic
arunpandianp Jun 5, 2026
3f36afd
address comments
arunpandianp Jun 8, 2026
f3cc628
improve WindowingWindmillReader
arunpandianp Jun 8, 2026
58e0ef9
spotless fix
arunpandianp Jun 8, 2026
3dceab0
[Dataflow Streaming] Fix nullness supression in StreamingModeExecutio…
arunpandianp Jun 8, 2026
e199438
make windmillTagEncoding final
arunpandianp Jun 8, 2026
700dfbc
address comments
arunpandianp Jun 8, 2026
bc5bee2
Move SideInputStateFetcherFactory from start to constructor
arunpandianp Jun 8, 2026
6d7f28e
Merge branch 'contextnullness' into multikey_context_review
arunpandianp Jun 8, 2026
c4a52c1
Merge remote-tracking branch 'beam/master' into multikey_context_review
arunpandianp Jun 8, 2026
1107a60
Merge beam/master into multikey_context_review
arunpandianp Jun 8, 2026
47eb7d6
Address comment
arunpandianp Jun 8, 2026
24505dd
Address comment
arunpandianp Jun 8, 2026
4e0d174
Fix UnderInitialization
arunpandianp Jun 9, 2026
5f9fef5
Merge branch 'multikey_commit' into tmp
arunpandianp Jun 11, 2026
0e8157f
Merge branch 'multikey_context_review' into tmp
arunpandianp Jun 11, 2026
93de23a
Multikey commit failure handling and integration
arunpandianp Jun 11, 2026
9de3d84
Fix tests and add sink byte limit for batching
arunpandianp Jun 11, 2026
0df9f54
spotless
arunpandianp Jun 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ boolean isSinkFullHintSet() {
// the state size might grow unbounded.
}

protected final long getBytesSinked() {
return bytesSinked;
}

/**
* Sets a flag to indicate that a sink has enough data written to it. This hint is read by
* upstream producers to stop producing if they can. Mainly used in streaming.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public final class StreamingDataflowWorker {
"windmill_bounded_queue_executor_use_fair_monitor";
// Don't use. Experiment guarding multi key bundles. The feature is work in progress and
// incomplete.
private static final String UNSTABLE_ENABLE_MULTI_KEY_BUNDLE = "unstable_enable_multi_key_bundle";
public static final String UNSTABLE_ENABLE_MULTI_KEY_BUNDLE = "unstable_enable_multi_key_bundle";

private final WindmillStateCache stateCache;
private AtomicReference<StreamingWorkerStatusPages> statusPages = new AtomicReference<>();
Expand Down Expand Up @@ -257,6 +257,7 @@ private StreamingDataflowWorker(
this.streamingWorkScheduler =
StreamingWorkScheduler.create(
options,
DataflowRunner.hasExperiment(options, UNSTABLE_ENABLE_MULTI_KEY_BUNDLE),
clock,
readerCache,
mapTaskExecutorFactory,
Expand Down Expand Up @@ -1198,9 +1199,14 @@ private void onCompleteCommit(CompleteCommit completeCommit) {
computationStateCache
.getIfPresent(completeCommit.computationId())
.ifPresent(
state ->
state -> {
if (completeCommit.retryableFailure()) {
state.reExecuteActiveWork(completeCommit.shardedKey(), completeCommit.workId());
} else {
state.completeWorkAndScheduleNextWorkForKey(
completeCommit.shardedKey(), completeCommit.workId()));
completeCommit.shardedKey(), completeCommit.workId());
}
});
}

@AutoValue
Expand Down
Loading
Loading