Skip to content

Make SnapshotStream public#6784

Merged
jrhee17 merged 2 commits into
line:mainfrom
jrhee17:refactor/xds-stream
Jun 1, 2026
Merged

Make SnapshotStream public#6784
jrhee17 merged 2 commits into
line:mainfrom
jrhee17:refactor/xds-stream

Conversation

@jrhee17
Copy link
Copy Markdown
Contributor

@jrhee17 jrhee17 commented May 27, 2026

This PR is a subset of changes for #6781

Motivation:

The SnapshotStream reactive infrastructure in the xds module is currently package-private, which prevents xDS extension factories (e.g. HttpFilterFactory) from using reactive streams to model filters that depend on external resources. Making this API public is a prerequisite for upcoming features like CredentialInjectorFilterFactory that need to subscribe to SDS secrets reactively.

Modifications:

  • Moved the SnapshotStream reactive stream infrastructure from package-private com.linecorp.armeria.xds to a new public com.linecorp.armeria.xds.stream package.
    • SnapshotStream, RefCountedStream, Subscription (formerly SnapshotStream.Subscription), and all operators (MapStream, SwitchMapEagerStream, CombineLatest2Stream, CombineLatest3Stream, CombineNLatestStream, StaticSnapshotStream, TriFunction).
  • Made SnapshotWatcher public so it can be used as the subscriber type in the public stream API.
  • Updated all internal stream consumers (ClusterStream, EndpointStream, ListenerStream, RouteStream, SecretStream, DataSourceStream, TransportSocketStream, etc.) to import from the new xds.stream package.
  • Simplified ResourceNodeMeterBinderFactory.ResourceNodeMeterBinder to no longer implement ResourceWatcher, using plain methods instead.
  • Moved stream tests to the com.linecorp.armeria.xds.stream package.

Result:

  • The SnapshotStream API is now public and available for xDS extension factories to build reactive filter pipelines.
  • No behavioral changes — this is a pure refactoring with no changes to existing functionality.

@jrhee17 jrhee17 added this to the 1.40.0 milestone May 27, 2026
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 27, 2026

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 18d33cba-fc95-4c60-82a9-c5cb029b5cb4

📥 Commits

Reviewing files that changed from the base of the PR and between b25a6ef and efa1ad9.

📒 Files selected for processing (1)
  • xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java

📝 Walkthrough

Walkthrough

Extracts XDS stream primitives into com.linecorp.armeria.xds.stream (new interfaces and package annotations), introduces public SnapshotStream APIs and exposes RefCountedStream, moves concrete stream implementations and tests into the new package, updates consumer imports/signatures, and refactors resource meter-binder callbacks.

Changes

XDS Stream Infrastructure Refactoring

Layer / File(s) Summary
Stream package contracts and core interfaces
xds/src/main/java/com/linecorp/armeria/xds/stream/package-info.java, xds/src/main/java/com/linecorp/armeria/xds/stream/Subscription.java, xds/src/main/java/com/linecorp/armeria/xds/stream/TriFunction.java
Adds com.linecorp.armeria.xds.stream package with @NonNullByDefault and @UnstableApi, introduces Subscription and TriFunction interfaces.
SnapshotStream interface and RefCountedStream public API
xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java, xds/src/main/java/com/linecorp/armeria/xds/stream/RefCountedStream.java
Adds public SnapshotStream<T> functional interface with subscribe/map/switchMapEager/combinators and factories; exposes RefCountedStream as public with emit() and hasWatchers() and Javadoc.
Stream implementation classes moved to stream package
xds/src/main/java/com/linecorp/armeria/xds/stream/MapStream.java, .../CombineLatest2Stream.java, .../CombineLatest3Stream.java, .../CombineNLatestStream.java, .../StaticSnapshotStream.java, .../SwitchMapEagerStream.java
Move concrete stream implementations into com.linecorp.armeria.xds.stream and adjust imports; remove nested TriFunction in CombineLatest3Stream in favor of top-level TriFunction.
Update XDS package classes to use new stream package locations
xds/src/main/java/com/linecorp/armeria/xds/*.java (multiple files)
Add explicit imports for RefCountedStream, SnapshotStream, and Subscription from com.linecorp.armeria.xds.stream across XDS consumers; adjust a few watcher generic types and clarify SnapshotWatcher Javadoc.
Resource meter-binder and error callback refactoring
xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.java, xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java, xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java
ResourceNodeMeterBinder no longer implements ResourceWatcher<XdsResource>; meter callbacks changed to parameterless onError()/onResourceDoesNotExist(); adapter and cluster manager wiring updated (use ClusterResourceParser.parse).
Test class migrations to stream package
xds/src/test/java/com/linecorp/armeria/xds/stream/*
Move tests into the stream package, update imports, and remove redundant emit() overrides so tests use RefCountedStream.emit().

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • trustin
  • ikhoon
  • minwoox

Poem

🐰 A rabbit hops through packages with cheer,
Streams split and shuffled so contracts are clear,
Subscription and Snapshot now find a new home,
Tests follow gladly, no more roam,
Emit and watchers sing—refactor, hooray—let’s roam!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 58.62% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely describes the main change: making the SnapshotStream API public, which aligns with the primary objective of the refactoring.
Description check ✅ Passed The description comprehensively explains the motivation, modifications, and results of the PR, directly relating to the changeset of moving stream infrastructure to a public package.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Warning

Review ran into problems

🔥 Problems

Git: Failed to clone repository. Please run the @coderabbitai full review command to re-trigger a full review. If the issue persists, set path_filters to include or exclude specific files.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
xds/src/main/java/com/linecorp/armeria/xds/stream/RefCountedStream.java (1)

37-50: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Fail fast on null watcher in public subscription API.

Line 48 should explicitly reject null watcher; currently null can flow into callbacks and fail later with a less clear NPE.

Suggested fix
+import static java.util.Objects.requireNonNull;
+
@@
     `@Override`
     public final Subscription subscribe(SnapshotWatcher<? super T> watcher) {
+        requireNonNull(watcher, "watcher");
         if (latestValue != null) {
             watcher.onUpdate(latestValue, null);
         }

As per coding guidelines, "for any user-facing public methods, add explicit null checks; use Objects.requireNonNull(param, "name") for required params".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@xds/src/main/java/com/linecorp/armeria/xds/stream/RefCountedStream.java`
around lines 37 - 50, The public subscribe(SnapshotWatcher<? super T> watcher)
method in RefCountedStream should immediately validate its watcher parameter to
fail fast on nulls; add Objects.requireNonNull(watcher, "watcher") at the top of
RefCountedStream.subscribe and ensure java.util.Objects is imported so a clear
NPE is thrown immediately rather than letting null flow into watcher.onUpdate or
later callbacks.
xds/src/main/java/com/linecorp/armeria/xds/SnapshotWatcher.java (1)

2-2: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Update this modified file to the LY copyright header.

Line 2 still uses LINE Corporation; this file was modified and should use the LY header format.

As per coding guidelines, "every modified source file must start with the LY copyright header".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@xds/src/main/java/com/linecorp/armeria/xds/SnapshotWatcher.java` at line 2,
Replace the existing copyright header in the modified SnapshotWatcher.java with
the standard LY header format: update the top-of-file header (the one above the
SnapshotWatcher class definition) so it uses the LY copyright text required by
the project guidelines, ensuring the new header replaces the current "LINE
Corporation" line and matches the project's canonical LY header exactly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java`:
- Around line 67-167: All public/default API methods should validate their
parameters and fail fast: add Objects.requireNonNull(...) checks at the start of
map (check mapper), switchMapEager (check mapper), combineNLatest (check streams
and iterate to requireNonNull each element), combineLatest(a,b,combiner) (check
a, b, combiner), combineLatest(a,b,c,combiner) (check a, b, c, combiner), just
(check value), and error (check error); use Objects.requireNonNull(param,
"paramName") and throw early rather than relying on downstream NPEs.

---

Outside diff comments:
In `@xds/src/main/java/com/linecorp/armeria/xds/SnapshotWatcher.java`:
- Line 2: Replace the existing copyright header in the modified
SnapshotWatcher.java with the standard LY header format: update the top-of-file
header (the one above the SnapshotWatcher class definition) so it uses the LY
copyright text required by the project guidelines, ensuring the new header
replaces the current "LINE Corporation" line and matches the project's canonical
LY header exactly.

In `@xds/src/main/java/com/linecorp/armeria/xds/stream/RefCountedStream.java`:
- Around line 37-50: The public subscribe(SnapshotWatcher<? super T> watcher)
method in RefCountedStream should immediately validate its watcher parameter to
fail fast on nulls; add Objects.requireNonNull(watcher, "watcher") at the top of
RefCountedStream.subscribe and ensure java.util.Objects is imported so a clear
NPE is thrown immediately rather than letting null flow into watcher.onUpdate or
later callbacks.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 3b37515e-e436-414c-a75c-0ac8bea5cc1c

📥 Commits

Reviewing files that changed from the base of the PR and between fc75bbb and b25a6ef.

📒 Files selected for processing (34)
  • xds/src/main/java/com/linecorp/armeria/xds/CertificateValidationContextStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/ClusterRoot.java
  • xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/DataSourceStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/EndpointStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java
  • xds/src/main/java/com/linecorp/armeria/xds/ListenerRoot.java
  • xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/RawBufferTransportSocketFactory.java
  • xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java
  • xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.java
  • xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/SecretStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/SnapshotWatcher.java
  • xds/src/main/java/com/linecorp/armeria/xds/TlsCertificateStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/TransportSocketFactory.java
  • xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/UpstreamTlsTransportSocketFactory.java
  • xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest2Stream.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest3Stream.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/CombineNLatestStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/MapStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/RefCountedStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/StaticSnapshotStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/Subscription.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/SwitchMapEagerStream.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/TriFunction.java
  • xds/src/main/java/com/linecorp/armeria/xds/stream/package-info.java
  • xds/src/test/java/com/linecorp/armeria/xds/stream/CombineNLatestStreamTest.java
  • xds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java
  • xds/src/test/java/com/linecorp/armeria/xds/stream/StreamSwitchMapEagerTest.java
💤 Files with no reviewable changes (1)
  • xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java

@jrhee17 jrhee17 marked this pull request as ready for review May 27, 2026 09:34
@codecov
Copy link
Copy Markdown

codecov Bot commented May 27, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 0.00%. Comparing base (8150425) to head (efa1ad9).
⚠️ Report is 473 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #6784       +/-   ##
============================================
- Coverage     74.46%       0   -74.47%     
============================================
  Files          1963       0     -1963     
  Lines         82437       0    -82437     
  Branches      10764       0    -10764     
============================================
- Hits          61385       0    -61385     
+ Misses        15918       0    -15918     
+ Partials       5134       0     -5134     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown
Contributor

@ikhoon ikhoon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 👍

Copy link
Copy Markdown
Contributor

@minwoox minwoox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@jrhee17 jrhee17 merged commit eb8520c into line:main Jun 1, 2026
17 of 19 checks passed
jrhee17 added a commit that referenced this pull request Jun 4, 2026
This PR should be reviewed after #6784
This PR is a subset of #6781 

Motivation:

xDS extension factories (e.g. `HttpFilterFactory`) currently receive
only an `XdsResourceValidator`, which is insufficient for filters that
depend on external xDS resources like SDS secrets. The
`CredentialInjectorFilterFactory`
(`envoy.filters.http.credential_injector`) needs to reactively subscribe
to generic secrets via SDS and inject credentials into outgoing
requests, requiring access to the event loop, metrics, and secret
streams at construction time.

Modifications:

- Added `FactoryContext` interface providing runtime infrastructure to
extension factories (`eventLoop()`, `meterRegistry()`,
`meterIdPrefix()`, `validator()`, `genericSecretStream()`).
- Changed `HttpFilterFactory.create()` to accept `FactoryContext`
instead of `XdsResourceValidator`.
- Added `HttpFilterFactory.createStream()` that returns a
`SnapshotStream<XdsHttpFilter>` for reactive filter lifecycle
management.
- Added `XdsHttpFilter.NOOP` static constant for no-op filter instances.
- Added `GenericSecretSnapshot` and `GenericSecretStream` for resolving
generic secrets via SDS.
- Made the filter pipeline reactive: `FilterUtil` now returns
`SnapshotStream<ClientPreprocessors>` /
`SnapshotStream<ClientDecoration>`, and `RouteStream` composes cluster,
downstream, and upstream filter streams via `combineLatest`.
- Simplified `RouteEntry` to accept pre-built filter configs instead of
building them internally.
- Updated `ListenerResourceParser` / `ListenerXdsResource` to parse and
carry the `Router` HTTP filter for upstream filter extraction.
- Made `SubscriptionContext` extend `FactoryContext` for internal use.
- Added `CredentialInjectorFilterFactory` implementing
`envoy.filters.http.credential_injector`, registered via service loader.
- Annotated `TypedExtensionConfig`, `CredentialInjector`, `Generic`, and
`Secret.generic_secret` proto fields as supported.
- Updated `RouterFilterFactory` and `IstioFilterFactories` to use
`FactoryContext`.

Result:

- xDS extension factories can now build reactive filter pipelines that
depend on external resources like SDS secrets.
- The `CredentialInjectorFilterFactory` injects credentials from
SDS-backed generic secrets into outgoing HTTP requests, supporting the
`envoy.filters.http.credential_injector` filter type.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants