Skip to content

SotwConfigSourceSubscriptionFactory is a SnapshotStream of DiscoveryResponse#6795

Draft
jrhee17 wants to merge 3 commits into
line:mainfrom
jrhee17:pr/config-source
Draft

SotwConfigSourceSubscriptionFactory is a SnapshotStream of DiscoveryResponse#6795
jrhee17 wants to merge 3 commits into
line:mainfrom
jrhee17:pr/config-source

Conversation

@jrhee17
Copy link
Copy Markdown
Contributor

@jrhee17 jrhee17 commented Jun 5, 2026

#6796 should be reviewed before this PR
The changes for this PR is a subset of #6781

Motivation:

The existing SotwConfigSourceSubscriptionFactory SPI requires implementors to manage threading, interest tracking, and response parsing via callbacks (SotwSubscriptionCallbacks, ConfigSourceSubscription). This makes custom config source implementations (file-based, KV-store-backed, etc.) unnecessarily complex.

This PR reshapes the factory into a reactive stream-based API where create() simply returns a SnapshotStream<DiscoveryResponse> — making custom implementations straightforward:

public SnapshotStream<DiscoveryResponse> create(
        ConfigSource configSource,
        FactoryContext factoryContext,
        SnapshotStream<InterestedResources> interestedResources) {
    return new RefCountedStream<DiscoveryResponse>() {
        @Override
        protected Subscription onStart(SnapshotWatcher<DiscoveryResponse> watcher) {
            // Watch the external source and call emit(response, null)
            ...
            return () -> { /* cleanup */ };
        }
    };
}

Modifications:

  • Added configsource.SotwConfigSourceSubscriptionFactory — new public factory interface returning SnapshotStream<DiscoveryResponse>
  • Added configsource.InterestedResources — value type representing currently subscribed resource names, delivered as a SnapshotStream
  • Added InterestPublisher — stream-based replacement for the updateInterests callback
  • Added CompositeSnapshotWatcher — fan-out adapter for notifying multiple SnapshotWatchers
  • Renamed ParsedResourcesHolder to ParsedResources with SotwParsedResources / DeltaParsedResources subtypes
  • Rewrote ConfigSourceHandler to subscribe to SnapshotStream<DiscoveryResponse> instead of receiving callbacks
  • Rewrote GrpcConfigSourceStreamFactory.create() to return a ConfigSourceHandler wired via streams
  • Changed ResourceNode / ResourceNodeAdapter / SubscriberStorage / StateCoordinator to use SnapshotWatcher instead of ResourceWatcher
  • Passed defaultWatcher to ControlPlaneClientManager for error propagation on stream failures
  • Deleted ResourceWatcher, XdsStreamSubscriber, ConfigSourceSubscription, SotwSubscriptionCallbacks, and the old package-private SotwConfigSourceSubscriptionFactory

Result:

  • Custom config source implementations only need to return a SnapshotStream<DiscoveryResponse> — Armeria handles parsing, storage, and subscriber notification
  • No breaking changes to existing public API
  • All existing xDS unit tests pass

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jun 5, 2026

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 13282d9a-8f5f-4253-85a0-b64590b80a6e

📥 Commits

Reviewing files that changed from the base of the PR and between 514555d and ee063aa.

📒 Files selected for processing (1)
  • xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java
🚧 Files skipped from review as they are similar to previous changes (1)
  • xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java

📝 Walkthrough

Walkthrough

Migrates xDS from callback-driven ResourceWatcher/ConfigSourceSubscription to a snapshot-stream model with RefCountedStream lifecycle, ParsedResources holders (SOTW/Delta), interest publishing (InterestedResources), CompositeSnapshotWatcher fanout, and corresponding wiring in factories, manager, and tests.

Changes

xDS Snapshot Stream Migration

Layer / File(s) Summary
Parsed resources data types and public API contracts
xds/src/main/java/com/linecorp/armeria/xds/ParsedResources.java, xds/src/main/java/com/linecorp/armeria/xds/configsource/InterestedResources.java, xds/src/main/java/com/linecorp/armeria/xds/configsource/SotwConfigSourceSubscriptionFactory.java, xds/src/main/java/com/linecorp/armeria/xds/configsource/package-info.java
ParsedResources is now an abstract base with SotwParsedResources and DeltaParsedResources; new public InterestedResources type and SotwConfigSourceSubscriptionFactory interface added.
Stream lifecycle with ref-counted initialization
xds/src/main/java/com/linecorp/armeria/xds/XdsStream.java, xds/src/main/java/com/linecorp/armeria/xds/InterestPublisher.java, xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java, xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java
XdsStream now extends SnapshotStream<ParsedResources>; InterestPublisher emits interested resource sets; AdsXdsStream and CompositeXdsStream extend RefCountedStream and implement onStart(...) to subscribe to interest streams and manage lifecycle.
Composite snapshot watcher and ResourceNode transition
xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java, xds/src/main/java/com/linecorp/armeria/xds/ResourceNode.java, xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java
CompositeSnapshotWatcher fans out onUpdate(value,error) to child SnapshotWatchers with per-watcher isolation and optional absent-on-timeout; ResourceNode and ResourceNodeAdapter move to the SnapshotWatcher contract and add factoryContext().
State coordination with snapshot watchers
xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java
StateCoordinator registers SnapshotWatchers, replays cached state via onUpdate, and dispatches updates/missing/errors through CompositeSnapshotWatcher.
Subscriber storage uses CompositeSnapshotWatcher
xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java
SubscriberStorage now stores CompositeSnapshotWatcher per (type, resource) and register/unregister operate on SnapshotWatcher instances, closing watchers when empty.
Actual stream handlers emit ParsedResources
xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java, xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java, xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java
ResourceParser returns typed ParsedResources; SOTW/Delta streams parse into holders and emit(holder, null) for downstream handling; NACK/ACK timing adjusted to occur after emit.
ConfigSourceHandler pipeline with interest publishing
xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java
ConfigSourceHandler.of(...) maps DiscoveryResponse -> ParsedResources, subscribes to the parsed stream, applies SOTW/Delta updates via shared apply logic, and publishes InterestedResources via InterestPublisher when subscribers change.
SOTW factory refactored to snapshot streams
xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java
SOTW factories now produce SnapshotStream<DiscoveryResponse> implemented as RefCountedStream; parsing/watch lifecycle uses onStart/emit(response,null) and emit(null,error) on failure.
Manager and factory orchestration
xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java, xds/src/main/java/com/linecorp/armeria/xds/GrpcConfigSourceStreamFactory.java, xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java, xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java
ControlPlaneClientManager and Grpc factory now pass InterestPublisher and defaultWatcher into constructed handlers/streams; Grpc factory inlines stream construction and returns ConfigSourceHandler.
Tests updated to snapshot watcher API
xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java, xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java
Tests refactored to use SnapshotWatcher lambdas with AtomicReference; added duplicateRegisterReturnsFalse and updated timeout/missing tests.

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs:

  • line/armeria#6608: Introduced SnapshotWatcher callback shape and related refactor pieces used here.
  • line/armeria#6610: Added SnapshotStream/RefCountedStream primitives leveraged in this change.
  • line/armeria#6709: Overlaps in AdsXdsStream/ADS update-subscription wiring and actual-stream handling.

Suggested reviewers:

  • trustin
  • ikhoon
  • minwoox

"A rabbit hops through streams and snapshots bright,
Callbacks hush as watchers wake in light,
Holders carry resources, ACKs trail behind,
Interests bloom and composite fans unwind,
xDS reborn — subscriptions leap in flight." 🐰✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 6.59% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The PR title directly captures the core architectural change: converting SotwConfigSourceSubscriptionFactory from a callback-based SPI to a stream-based API that returns SnapshotStream. This aligns with the main modification throughout the changeset.
Description check ✅ Passed The PR description comprehensively covers the motivation, modifications, and results of the changeset, including specific details about new types, interface changes, and the shift from callbacks to reactive streams.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java`:
- Around line 79-90: The onUpdate method in CompositeSnapshotWatcher can throw
ConcurrentModificationException if a watcher’s onUpdate() synchronously calls
addWatcher() or removeWatcher(); fix by iterating over a snapshot copy of the
watchers collection instead of the original. In
CompositeSnapshotWatcher.onUpdate, call maybeCancelAbsentTimer() as before, then
make a defensive copy (e.g., new ArrayList<>(watchers)) and iterate that copy
when invoking each watcher.onUpdate(value, error); keep the existing try/catch
and logging and ensure addWatcher/removeWatcher semantics remain unchanged.

In `@xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java`:
- Around line 59-85: The current apply(...) methods call stateCoordinator
methods unconditionally and can mutate state on NACKed ParsedResources; before
applying updates or marking missing resources, guard with the ParsedResources
acceptance flag (e.g., check parsed.isAccepted() or parsed.accepted()) so that
apply(ParsedResources.SotwParsedResources),
apply(ParsedResources.DeltaParsedResources) and
applyUpdatesAndErrors(ParsedResources) only invoke
stateCoordinator.onResourceUpdated(...) and
stateCoordinator.onResourceMissing(...) when the parsed snapshot is accepted;
for rejected snapshots, skip updates/missing notifications (you may still
surface errors from parsed.invalidResources() differently if desired) so the
last accepted config remains unchanged.

In `@xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java`:
- Around line 100-125: The test currently never seeds the coordinator cache, so
onResourceMissing() doesn't exercise eviction; before calling
coordinator.onResourceMissing(XdsType.CLUSTER, CLUSTER_NAME) add a call to
populate the state (e.g. call coordinator.onResourceUpdated(XdsType.CLUSTER,
CLUSTER_NAME, /* a non-null XdsResource snapshot */) or use the existing
register + an explicit update path) so the resource exists in StateCoordinator's
stateStore, then call onResourceMissing, unregister, and re-register the new
watcher to assert no replay; ensure you reference
StateCoordinator.onResourceUpdated, StateCoordinator.onResourceMissing,
register/unregister, XdsType.CLUSTER and CLUSTER_NAME when making the change.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 89ce7eb7-ebfd-487d-b681-8b19c3ef8d3e

📥 Commits

Reviewing files that changed from the base of the PR and between 423d994 and 70aaae8.

📒 Files selected for processing (29)
  • xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java
  • xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java
  • xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.java
  • xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java
  • xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/GrpcConfigSourceStreamFactory.java
  • xds/src/main/java/com/linecorp/armeria/xds/InterestPublisher.java
  • xds/src/main/java/com/linecorp/armeria/xds/ParsedResources.java
  • xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java
  • xds/src/main/java/com/linecorp/armeria/xds/ResourceNode.java
  • xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java
  • xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java
  • xds/src/main/java/com/linecorp/armeria/xds/ResourceWatcher.java
  • xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.java
  • xds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.java
  • xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java
  • xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java
  • xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java
  • xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java
  • xds/src/main/java/com/linecorp/armeria/xds/XdsStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java
  • xds/src/main/java/com/linecorp/armeria/xds/configsource/InterestedResources.java
  • xds/src/main/java/com/linecorp/armeria/xds/configsource/SotwConfigSourceSubscriptionFactory.java
  • xds/src/main/java/com/linecorp/armeria/xds/configsource/package-info.java
  • xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java
  • xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java
💤 Files with no reviewable changes (5)
  • xds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.java
  • xds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.java
  • xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.java
  • xds/src/main/java/com/linecorp/armeria/xds/ResourceWatcher.java
  • xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java

Comment thread xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java
@codecov
Copy link
Copy Markdown

codecov Bot commented Jun 5, 2026

Codecov Report

❌ Patch coverage is 90.08264% with 24 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.99%. Comparing base (8150425) to head (ee063aa).
⚠️ Report is 482 commits behind head on main.

Files with missing lines Patch % Lines
.../com/linecorp/armeria/xds/ConfigSourceHandler.java 81.48% 5 Missing and 5 partials ⚠️
...orp/armeria/xds/GrpcConfigSourceStreamFactory.java 90.00% 1 Missing and 4 partials ⚠️
...linecorp/armeria/xds/CompositeSnapshotWatcher.java 90.00% 2 Missing and 1 partial ⚠️
...a/xds/PathSotwConfigSourceSubscriptionFactory.java 75.00% 2 Missing and 1 partial ⚠️
...in/java/com/linecorp/armeria/xds/AdsXdsStream.java 92.85% 0 Missing and 1 partial ⚠️
.../com/linecorp/armeria/xds/ResourceNodeAdapter.java 87.50% 0 Missing and 1 partial ⚠️
.../armeria/xds/configsource/InterestedResources.java 83.33% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #6795      +/-   ##
============================================
+ Coverage     74.46%   74.99%   +0.53%     
- Complexity    22234    24942    +2708     
============================================
  Files          1963     2216     +253     
  Lines         82437    92703   +10266     
  Branches      10764    12107    +1343     
============================================
+ Hits          61385    69521    +8136     
- Misses        15918    17396    +1478     
- Partials       5134     5786     +652     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant