[POC] Allow more flexible factories via FactoryContext#6781
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThis PR refactors the xDS (Envoy service discovery) system from callback-based resource watcher contracts to a reactive ChangesxDS Streaming Architecture Refactor
🎯 4 (Complex) | ⏱️ ~60 minutes Possibly Related PRs
Suggested Reviewers
✨ Finishing Touches🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java (1)
106-109:⚠️ Potential issue | 🟠 Major | ⚡ Quick winPropagate watcher registration failures to stream subscribers.
If watch registration fails, the stream never emits an error, so consumers can hang waiting for updates. Emit the failure into the stream before closing lifecycle resources.
Proposed fix
} catch (Exception e) { - lifecycleObserver.fileParseError(e); - lifecycleObserver.close(); + eventLoop.execute(() -> { + if (!closed) { + lifecycleObserver.fileParseError(e); + emit(null, e); + } + lifecycleObserver.close(); + }); return; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java` around lines 106 - 109, When catching registration failures in PathSotwConfigSourceSubscriptionFactory, before calling lifecycleObserver.close(), propagate the exception into the stream so subscribers don't hang: call the stream-facing error emission routine (e.g. invoke the subscription/stream emitter or subscribers' onError method) with the caught Exception (replace with the actual symbol in your code, e.g. subscription.onError(e) or streamEmitter.error(e) or iterate subscribers and call s.onError(e)); only after that call lifecycleObserver.fileParseError(e) and lifecycleObserver.close().
🧹 Nitpick comments (3)
xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java (1)
62-64: ⚡ Quick winConstrain
setFactoryContext()to event-loop writes.
factoryContextis read from event-loop-confined paths (e.g., Line 95), but Line 62 currently permits unsynchronized writes from any thread.Based on learnings: "code paths that run on the event loop ... are single-threaded ... restrict access to code outside the event loop or document the event-loop assumption."Proposed patch
+import static java.util.Objects.requireNonNull; @@ void setFactoryContext(FactoryContext factoryContext) { - this.factoryContext = factoryContext; + if (!eventLoop.inEventLoop()) { + eventLoop.execute(() -> setFactoryContext(factoryContext)); + return; + } + checkState(this.factoryContext == null, "factoryContext is already set"); + this.factoryContext = requireNonNull(factoryContext, "factoryContext"); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java` around lines 62 - 64, setFactoryContext currently writes factoryContext from any thread while readers assume event-loop confinement; change setFactoryContext(FactoryContext) so the assignment only happens on the event loop: either assert the current thread is the event loop (e.g., check inEventLoop()/isEventLoopThread and throw/require caller to call from the event loop) or, if called off-loop, schedule the assignment onto the correct event loop using that event loop's execute()/submit() so the field write occurs on the same event loop that the readers use. Update the method setFactoryContext, reference the FactoryContext field factoryContext, and ensure the event-loop readers (the existing event-loop-confined paths) continue to see consistent state.xds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.java (1)
116-147: ⚡ Quick winDeduplicate the credential-injection flow shared by preprocessor and decorator.
Both paths have identical branching and header mutation logic; extracting one helper will reduce drift risk.
♻️ Proposed refactor
@@ `@Override` public HttpPreprocessor httpPreprocessor() { - return (delegate, ctx, req) -> { - if (credential == null) { - if (!allowWithoutCredential) { - return HttpResponse.of(HttpStatus.UNAUTHORIZED); - } - return delegate.execute(ctx, req); - } - if (!overwrite && headerExists(ctx, req)) { - return delegate.execute(ctx, req); - } - ctx.setAdditionalRequestHeader(header, credential); - return delegate.execute(ctx, req); - }; + return (delegate, ctx, req) -> executeWithCredential(ctx, req, delegate::execute); } @@ public DecoratingHttpClientFunction httpDecorator() { - return (delegate, ctx, req) -> { - if (credential == null) { - if (!allowWithoutCredential) { - return HttpResponse.of(HttpStatus.UNAUTHORIZED); - } - return delegate.execute(ctx, req); - } - if (!overwrite && headerExists(ctx, req)) { - return delegate.execute(ctx, req); - } - ctx.setAdditionalRequestHeader(header, credential); - return delegate.execute(ctx, req); - }; + return (delegate, ctx, req) -> executeWithCredential(ctx, req, delegate::execute); } + + `@FunctionalInterface` + private interface RequestExecutor { + HttpResponse execute(com.linecorp.armeria.client.ClientRequestContext ctx, + com.linecorp.armeria.common.HttpRequest req); + } + + private HttpResponse executeWithCredential( + com.linecorp.armeria.client.ClientRequestContext ctx, + com.linecorp.armeria.common.HttpRequest req, + RequestExecutor executor) { + if (credential == null) { + return allowWithoutCredential ? executor.execute(ctx, req) + : HttpResponse.of(HttpStatus.UNAUTHORIZED); + } + if (!overwrite && headerExists(ctx, req)) { + return executor.execute(ctx, req); + } + ctx.setAdditionalRequestHeader(header, credential); + return executor.execute(ctx, req); + }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.java` around lines 116 - 147, Extract the duplicated credential-injection branching from httpPreprocessor() and httpDecorator() into a single private helper (e.g., injectCredentialOrRespond) that accepts (delegate, ctx, req) or the minimal pieces (delegate, ServiceRequestContext ctx, HttpRequest req) and implements the logic using credential, allowWithoutCredential, overwrite, headerExists(ctx, req), ctx.setAdditionalRequestHeader(header, credential) and then calls delegate.execute(ctx, req) or returns HttpResponse.of(HttpStatus.UNAUTHORIZED) as appropriate; replace the bodies of httpPreprocessor() and httpDecorator() to simply call this helper so both reuse the exact same code path.xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java (1)
75-82: 💤 Low valueConsider using a static sentinel and documenting the null-handling behavior.
Two minor suggestions:
Documentation clarity: The Javadoc doesn't mention what happens when
create()returns null. Sincecreate()documents that returning null means "skip this filter entirely," it would help to clarify thatcreateStream()returns a no-op sentinel filter in that case.Avoid repeated allocation: Creating a new anonymous
XdsHttpFilterinstance on each null return incurs minor allocation overhead. A static sentinel could be reused.♻️ Optional: use a static sentinel and clarify Javadoc
+ /** + * A no-op sentinel instance returned when {`@link` `#create`} yields {`@code` null}. + */ + XdsHttpFilter EMPTY_FILTER = new XdsHttpFilter() {}; + /** * Creates a {`@link` SnapshotStream} of {`@link` XdsHttpFilter} for the given filter and its raw typed config. * * <p>The default implementation delegates to {`@link` `#create`} and wraps the result in - * {`@link` SnapshotStream#just}. Override this method for filters that depend on external - * xDS resources and need reactive lifecycle management. + * {`@link` SnapshotStream#just}. If {`@link` `#create`} returns {`@code` null}, a no-op sentinel filter + * is emitted instead. Override this method for filters that depend on external xDS resources + * and need reactive lifecycle management. * ... */ default SnapshotStream<XdsHttpFilter> createStream(HttpFilter httpFilter, Any config, FactoryContext context) { final XdsHttpFilter filter = create(httpFilter, config, context); if (filter == null) { - return SnapshotStream.just(new XdsHttpFilter() {}); + return SnapshotStream.just(EMPTY_FILTER); } return SnapshotStream.just(filter); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java` around lines 75 - 82, Update createStream(HttpFilter,httpFilter,Any,FactoryContext) to reuse a single static sentinel XdsHttpFilter instance instead of allocating a new anonymous instance on each null return and document the behavior: when create(...) returns null (meaning "skip this filter entirely") createStream(...) will return SnapshotStream.just(SENTINEL) where SENTINEL is a private static final XdsHttpFilter no-op instance; also update the method/class Javadoc to explicitly state this null-handling behavior so callers know a reused no-op sentinel is returned.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In @.claude/skills/xds-dev/SKILL.md:
- Around line 14-16: Update the fenced code block containing the gradlew command
so it includes a language tag (e.g., add "bash" after the opening ```), e.g.,
change the block that wraps "./gradlew --parallel -Pretry=true :xds:test
:xds-api:test ..." to start with "```bash" to satisfy MD040 and enable proper
syntax highlighting.
In
`@it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.java`:
- Around line 181-190: The HTTP request callback attached to
client.get(...).aggregate() only handles successful responses and exceptions
thrown while parsing, so network or fetch failures never notify the watcher;
update the call site that creates the CompletableFuture (the
client.get("/xds/%s/%s".formatted(type, name)).aggregate() chain) to handle
terminal exceptions (e.g., using whenComplete/handle/exceptionally on the
returned future) and ensure any failure invokes watcher.onUpdate(null,
throwable), while keeping the existing success path that parses with
XdsResourceReader.fromJson and calls watcher.onUpdate(discoveryResponse, null).
In `@xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java`:
- Around line 79-90: The current CompositeSnapshotWatcher.onUpdate calls
maybeCancelAbsentTimer() then iterates directly over the mutable watchers set,
which can throw ConcurrentModificationException if a watcher callback calls
addWatcher() or removeWatcher(); fix it by taking a snapshot copy of the
watchers collection before iterating (e.g., new ArrayList<>(watchers) or
similar) and iterate over that copy so callbacks can safely modify the original
set; ensure the change is applied inside the onUpdate method and still logs
exceptions the same way.
In `@xds/src/main/java/com/linecorp/armeria/xds/GenericSecretSnapshot.java`:
- Around line 58-73: The equals and hashCode in GenericSecretSnapshot only
consider the resource field, causing instances with different credential values
to be treated as equal; update equals(Object) to also compare this.credential
with that.credential (e.g., include Objects.equal(credential, that.credential)
alongside the resource comparison) and update hashCode() to include both
resource and credential (e.g., compute hash from resource and credential) so
equality and hashing reflect both fields.
- Around line 82-85: The toDebugString() in GenericSecretSnapshot currently adds
the entire protobuf resource (resource) which can leak secret material; change
it to only include non-sensitive identifying fields instead of the full object —
e.g. replace .add("genericSecret", resource) with something like .add("name",
resource.getName() /* or resource.getMetadata().getName() */).add("version",
resource.getVersion() /* if available */) or .add("id", resource.getId()) so
that no inline secret contents from the GenericSecret protobuf are logged.
In `@xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java`:
- Around line 68-72: The code in findRouter uses reference identity (typedConfig
== Any.getDefaultInstance()) which can fail for an empty Any; replace that check
with a semantic one such as Any.getDefaultInstance().equals(typedConfig) or
check typedConfig.getTypeUrl().isEmpty() so an explicitly-empty Any is detected
correctly and you still return Router.getDefaultInstance(); keep the subsequent
type-url guard using ROUTER_TYPE_URL and typedConfig.getTypeUrl() unchanged.
In `@xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java`:
- Around line 205-210: rescheduleEventsOn currently requeues watcher.onUpdate
via eventLoop.execute but does not prevent those runnables from calling the
watcher after the returned StreamSubscription is closed; change
rescheduleEventsOn (SnapshotStream.rescheduleEventsOn) to capture the
StreamSubscription returned by self.subscribe, maintain a thread-safe "closed"
flag (or delegate to subscription.isClosed if available), and return a
StreamSubscription whose close sets that flag and cancels/ignores pending
callbacks; before invoking watcher.onUpdate inside the eventLoop.execute
runnable check the closed flag (or subscription state) and skip calling watcher
if closed so no post-close callbacks occur.
---
Outside diff comments:
In
`@xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java`:
- Around line 106-109: When catching registration failures in
PathSotwConfigSourceSubscriptionFactory, before calling
lifecycleObserver.close(), propagate the exception into the stream so
subscribers don't hang: call the stream-facing error emission routine (e.g.
invoke the subscription/stream emitter or subscribers' onError method) with the
caught Exception (replace with the actual symbol in your code, e.g.
subscription.onError(e) or streamEmitter.error(e) or iterate subscribers and
call s.onError(e)); only after that call lifecycleObserver.fileParseError(e) and
lifecycleObserver.close().
---
Nitpick comments:
In `@xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java`:
- Around line 62-64: setFactoryContext currently writes factoryContext from any
thread while readers assume event-loop confinement; change
setFactoryContext(FactoryContext) so the assignment only happens on the event
loop: either assert the current thread is the event loop (e.g., check
inEventLoop()/isEventLoopThread and throw/require caller to call from the event
loop) or, if called off-loop, schedule the assignment onto the correct event
loop using that event loop's execute()/submit() so the field write occurs on the
same event loop that the readers use. Update the method setFactoryContext,
reference the FactoryContext field factoryContext, and ensure the event-loop
readers (the existing event-loop-confined paths) continue to see consistent
state.
In
`@xds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.java`:
- Around line 116-147: Extract the duplicated credential-injection branching
from httpPreprocessor() and httpDecorator() into a single private helper (e.g.,
injectCredentialOrRespond) that accepts (delegate, ctx, req) or the minimal
pieces (delegate, ServiceRequestContext ctx, HttpRequest req) and implements the
logic using credential, allowWithoutCredential, overwrite, headerExists(ctx,
req), ctx.setAdditionalRequestHeader(header, credential) and then calls
delegate.execute(ctx, req) or returns HttpResponse.of(HttpStatus.UNAUTHORIZED)
as appropriate; replace the bodies of httpPreprocessor() and httpDecorator() to
simply call this helper so both reuse the exact same code path.
In `@xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java`:
- Around line 75-82: Update
createStream(HttpFilter,httpFilter,Any,FactoryContext) to reuse a single static
sentinel XdsHttpFilter instance instead of allocating a new anonymous instance
on each null return and document the behavior: when create(...) returns null
(meaning "skip this filter entirely") createStream(...) will return
SnapshotStream.just(SENTINEL) where SENTINEL is a private static final
XdsHttpFilter no-op instance; also update the method/class Javadoc to explicitly
state this null-handling behavior so callers know a reused no-op sentinel is
returned.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 17b0cc7b-6832-4501-bad8-731e03ce6fac
📒 Files selected for processing (90)
.claude/skills/xds-dev/SKILL.mdit/xds-client/src/test/java/com/linecorp/armeria/xds/it/AuthTokenFilterTest.javait/xds-client/src/test/java/com/linecorp/armeria/xds/it/CredentialInjectorFilterTest.javait/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.javait/xds-client/src/test/java/com/linecorp/armeria/xds/it/FilterFactoryEventLoopTest.javait/xds-istio/src/test/java/com/linecorp/armeria/it/xds/filter/IstioFilterFactories.javait/xds-istio/src/test/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactoryxds-api/src/main/proto/envoy/config/core/v3/extension.protoxds-api/src/main/proto/envoy/extensions/filters/http/credential_injector/v3/credential_injector.protoxds-api/src/main/proto/envoy/extensions/http/injected_credentials/generic/v3/generic.protoxds-api/src/main/proto/envoy/extensions/transport_sockets/tls/v3/secret.protoxds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.javaxds/src/main/java/com/linecorp/armeria/xds/CertificateValidationContextStream.javaxds/src/main/java/com/linecorp/armeria/xds/ClusterRoot.javaxds/src/main/java/com/linecorp/armeria/xds/ClusterSnapshot.javaxds/src/main/java/com/linecorp/armeria/xds/ClusterStream.javaxds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.javaxds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.javaxds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.javaxds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.javaxds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.javaxds/src/main/java/com/linecorp/armeria/xds/DataSourceStream.javaxds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.javaxds/src/main/java/com/linecorp/armeria/xds/EndpointStream.javaxds/src/main/java/com/linecorp/armeria/xds/FactoryContext.javaxds/src/main/java/com/linecorp/armeria/xds/FilterUtil.javaxds/src/main/java/com/linecorp/armeria/xds/GenericSecretSnapshot.javaxds/src/main/java/com/linecorp/armeria/xds/GenericSecretStream.javaxds/src/main/java/com/linecorp/armeria/xds/GrpcConfigSourceStreamFactory.javaxds/src/main/java/com/linecorp/armeria/xds/GrpcServicesPreprocessor.javaxds/src/main/java/com/linecorp/armeria/xds/InterestPublisher.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerManager.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerRoot.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerStream.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.javaxds/src/main/java/com/linecorp/armeria/xds/ParsedResources.javaxds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.javaxds/src/main/java/com/linecorp/armeria/xds/RawBufferTransportSocketFactory.javaxds/src/main/java/com/linecorp/armeria/xds/ResourceNode.javaxds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.javaxds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.javaxds/src/main/java/com/linecorp/armeria/xds/ResourceParser.javaxds/src/main/java/com/linecorp/armeria/xds/RouteEntry.javaxds/src/main/java/com/linecorp/armeria/xds/RouteStream.javaxds/src/main/java/com/linecorp/armeria/xds/SecretStream.javaxds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.javaxds/src/main/java/com/linecorp/armeria/xds/SnapshotWatcher.javaxds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.javaxds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.javaxds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.javaxds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.javaxds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.javaxds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.javaxds/src/main/java/com/linecorp/armeria/xds/TlsCertificateStream.javaxds/src/main/java/com/linecorp/armeria/xds/TransportSocketFactory.javaxds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.javaxds/src/main/java/com/linecorp/armeria/xds/UpstreamTlsTransportSocketFactory.javaxds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapBuilder.javaxds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.javaxds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.javaxds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.javaxds/src/main/java/com/linecorp/armeria/xds/XdsStream.javaxds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.javaxds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.javaxds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.javaxds/src/main/java/com/linecorp/armeria/xds/configsource/InterestedResources.javaxds/src/main/java/com/linecorp/armeria/xds/configsource/SotwConfigSourceSubscriptionFactory.javaxds/src/main/java/com/linecorp/armeria/xds/configsource/package-info.javaxds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.javaxds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.javaxds/src/main/java/com/linecorp/armeria/xds/internal/XdsCommonUtil.javaxds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest2Stream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest3Stream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/CombineNLatestStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/MapStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/RefCountedStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/StaticSnapshotStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/StreamSubscription.javaxds/src/main/java/com/linecorp/armeria/xds/stream/SwitchMapEagerStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/TriFunction.javaxds/src/main/java/com/linecorp/armeria/xds/stream/package-info.javaxds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactoryxds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.javaxds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.javaxds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.javaxds/src/test/java/com/linecorp/armeria/xds/stream/CombineNLatestStreamTest.javaxds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.javaxds/src/test/java/com/linecorp/armeria/xds/stream/StreamSwitchMapEagerTest.java
💤 Files with no reviewable changes (5)
- xds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.java
- xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java
- xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java
| ``` | ||
| ./gradlew --parallel -Pretry=true :xds:test :xds-api:test :xds-validator:test :it:xds-client:test :it:xds-controlplane-api:test :it:xds-no-validation:test :it:xds-istio:test | ||
| ``` |
There was a problem hiding this comment.
Add a language tag to the fenced code block.
The code fence is missing a language specifier, which triggers MD040 and can reduce syntax highlighting/tooling quality.
Suggested fix
-```
+```bash
./gradlew --parallel -Pretry=true :xds:test :xds-api:test :xds-validator:test :it:xds-client:test :it:xds-controlplane-api:test :it:xds-no-validation:test :it:xds-istio:test</details>
<!-- suggestion_start -->
<details>
<summary>📝 Committable suggestion</summary>
> ‼️ **IMPORTANT**
> Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)
[warning] 14-14: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In @.claude/skills/xds-dev/SKILL.md around lines 14 - 16, Update the fenced code
block containing the gradlew command so it includes a language tag (e.g., add
"bash" after the opening ```), e.g., change the block that wraps "./gradlew
--parallel -Pretry=true :xds:test :xds-api:test ..." to start with "```bash" to
satisfy MD040 and enable proper syntax highlighting.
| @Override | ||
| public void onUpdate(@Nullable T value, @Nullable Throwable error) { | ||
| maybeCancelAbsentTimer(); | ||
| for (SnapshotWatcher<? super T> watcher : watchers) { | ||
| try { | ||
| watcher.onUpdate(value, error); | ||
| } catch (Exception e) { | ||
| logger.warn("Unexpected exception while invoking onUpdate() with ({}, {}).", | ||
| type, resource, e); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Potential ConcurrentModificationException if a watcher callback modifies the watcher set.
If any watcher.onUpdate() callback invokes addWatcher() or removeWatcher() on this same CompositeSnapshotWatcher, iterating directly over watchers will throw ConcurrentModificationException.
Consider copying the set before iteration:
Proposed fix
`@Override`
public void onUpdate(`@Nullable` T value, `@Nullable` Throwable error) {
maybeCancelAbsentTimer();
- for (SnapshotWatcher<? super T> watcher : watchers) {
+ for (SnapshotWatcher<? super T> watcher : new ArrayList<>(watchers)) {
try {
watcher.onUpdate(value, error);
} catch (Exception e) {🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java`
around lines 79 - 90, The current CompositeSnapshotWatcher.onUpdate calls
maybeCancelAbsentTimer() then iterates directly over the mutable watchers set,
which can throw ConcurrentModificationException if a watcher callback calls
addWatcher() or removeWatcher(); fix it by taking a snapshot copy of the
watchers collection before iterating (e.g., new ArrayList<>(watchers) or
similar) and iterate over that copy so callbacks can safely modify the original
set; ensure the change is applied inside the onUpdate method and still logs
exceptions the same way.
| @Override | ||
| public boolean equals(Object object) { | ||
| if (this == object) { | ||
| return true; | ||
| } | ||
| if (object == null || getClass() != object.getClass()) { | ||
| return false; | ||
| } | ||
| final GenericSecretSnapshot that = (GenericSecretSnapshot) object; | ||
| return Objects.equal(resource, that.resource); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hashCode(resource); | ||
| } |
There was a problem hiding this comment.
Include credential in equality and hash semantics.
Two snapshots with different resolved credentials currently compare equal. That can suppress credential-rotation updates anywhere equality-based deduping/caching is used.
Proposed fix
@@
`@Override`
public boolean equals(Object object) {
@@
}
final GenericSecretSnapshot that = (GenericSecretSnapshot) object;
- return Objects.equal(resource, that.resource);
+ return Objects.equal(resource, that.resource) &&
+ Objects.equal(credential, that.credential);
}
@@
`@Override`
public int hashCode() {
- return Objects.hashCode(resource);
+ return Objects.hashCode(resource, credential);
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@xds/src/main/java/com/linecorp/armeria/xds/GenericSecretSnapshot.java` around
lines 58 - 73, The equals and hashCode in GenericSecretSnapshot only consider
the resource field, causing instances with different credential values to be
treated as equal; update equals(Object) to also compare this.credential with
that.credential (e.g., include Objects.equal(credential, that.credential)
alongside the resource comparison) and update hashCode() to include both
resource and credential (e.g., compute hash from resource and credential) so
equality and hashing reflect both fields.
| public String toDebugString() { | ||
| return MoreObjects.toStringHelper(this) | ||
| .add("genericSecret", resource) | ||
| .toString(); |
There was a problem hiding this comment.
Avoid logging raw GenericSecret contents in debug output.
toDebugString() currently includes the full protobuf object, which may contain inline secret material and leak credentials to logs.
Proposed fix
@@
`@Override`
public String toDebugString() {
return MoreObjects.toStringHelper(this)
- .add("genericSecret", resource)
+ .add("hasSecret", resource.hasSecret())
+ .add("secretsCount", resource.getSecretsCount())
+ .add("credentialPresent", credential != null)
.toString();
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public String toDebugString() { | |
| return MoreObjects.toStringHelper(this) | |
| .add("genericSecret", resource) | |
| .toString(); | |
| public String toDebugString() { | |
| return MoreObjects.toStringHelper(this) | |
| .add("hasSecret", resource.hasSecret()) | |
| .add("secretsCount", resource.getSecretsCount()) | |
| .add("credentialPresent", credential != null) | |
| .toString(); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@xds/src/main/java/com/linecorp/armeria/xds/GenericSecretSnapshot.java` around
lines 82 - 85, The toDebugString() in GenericSecretSnapshot currently adds the
entire protobuf resource (resource) which can leak secret material; change it to
only include non-sensitive identifying fields instead of the full object — e.g.
replace .add("genericSecret", resource) with something like .add("name",
resource.getName() /* or resource.getMetadata().getName() */).add("version",
resource.getVersion() /* if available */) or .add("id", resource.getId()) so
that no inline secret contents from the GenericSecret protobuf are logged.
| if (typedConfig == Any.getDefaultInstance()) { | ||
| return Router.getDefaultInstance(); | ||
| } | ||
| if (!ROUTER_TYPE_URL.equals(typedConfig.getTypeUrl())) { | ||
| return null; |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In protobuf-java, is ==againstAny.getDefaultInstance()guaranteed to detect all logically emptyAnyvalues (including explicitly set empty messages), or shouldhasTypedConfig()/equals(defaultInstance) be used instead?
💡 Result:
Do not rely on == against Any.getDefaultInstance() to detect “logically empty” Any values. In protobuf-java, Any.getDefaultInstance() is documented as “an instance of the type with no fields set” (i.e., type_url empty and value empty) [1]. But == is Java reference identity, not semantic equality—so it is only guaranteed to work if you have the same object instance. Even the protobuf docs for getDefaultInstance()/getDefaultInstanceForType() explicitly note that the default instance “may or may not be a singleton” [1][2], which means object identity is not a reliable way to detect logical emptiness. Instead, use semantic equality: - Prefer any.equals(Any.getDefaultInstance()) (or any.equals(defaultInstance) where defaultInstance is the default-prototype you’re passed around) to check whether the payload is empty in the sense of “no fields set” [1]. - If you specifically care whether an embedded typed payload exists, use presence-style checks like hasTypedConfig() (if your generated code has such a method for your enclosing message) rather than comparing the container Any by identity. The official guidance for checking default vs set values in generated code is to use the appropriate presence/checking mechanisms rather than relying on object reference behavior [3]. Bottom line: use equals(defaultInstance) (or the higher-level presence method such as hasTypedConfig() in your containing message), not == with Any.getDefaultInstance().
Citations:
- 1: https://protobuf.dev/reference/java/api-docs/com/google/protobuf/Any.html
- 2: https://protobuf.dev/reference/java/api-docs/com/google/protobuf/Any.Builder.html
- 3: https://protobuf.dev/reference/java/java-generated/
Replace Any identity comparison with semantic checks in findRouter
findRouter(...) uses typedConfig == Any.getDefaultInstance(), which relies on Java reference identity; protobuf’s default instance may not be a singleton, so an explicitly-empty Any can fail the check and incorrectly return null. Use protobuf presence/value semantics instead.
♻️ Proposed fix
- final Any typedConfig = last.getTypedConfig();
- if (typedConfig == Any.getDefaultInstance()) {
+ if (!last.hasTypedConfig()) {
return Router.getDefaultInstance();
}
+ final Any typedConfig = last.getTypedConfig();
+ if (typedConfig.equals(Any.getDefaultInstance())) {
+ return Router.getDefaultInstance();
+ }
if (!ROUTER_TYPE_URL.equals(typedConfig.getTypeUrl())) {
return null;
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java`
around lines 68 - 72, The code in findRouter uses reference identity
(typedConfig == Any.getDefaultInstance()) which can fail for an empty Any;
replace that check with a semantic one such as
Any.getDefaultInstance().equals(typedConfig) or check
typedConfig.getTypeUrl().isEmpty() so an explicitly-empty Any is detected
correctly and you still return Router.getDefaultInstance(); keep the subsequent
type-url guard using ROUTER_TYPE_URL and typedConfig.getTypeUrl() unchanged.
4da75eb to
8473f79
Compare
🔍 Build Scan® (commit: 0342d9e) |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@xds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.java`:
- Line 30: The non-volatile field `closed` in class RescheduleSubscription can
lead to visibility issues between threads; change the declaration of `closed` to
a volatile boolean (i.e., `private volatile boolean closed;`) so writes in
`close()` are visible to the lambda scheduled in `onUpdate()` (and any other
readers) and no further synchronization is needed for that flag.
- Around line 37-39: In RescheduleSubscription, validate the required upstream
parameter in the setUpstream(StreamSubscription upstream) method by calling
Objects.requireNonNull(upstream, "upstream") before assigning to this.upstream;
also add or ensure the import java.util.Objects is present. This replaces the
plain assignment in setUpstream and guarantees the upstream reference is
non-null per coding guidelines.
- Around line 32-35: The RescheduleSubscription constructor currently doesn't
validate its inputs; update the constructor
(RescheduleSubscription(SnapshotWatcher<? super T> downstream, EventExecutor
eventLoop)) to call Objects.requireNonNull(downstream, "downstream") and
Objects.requireNonNull(eventLoop, "eventLoop") before assigning to
this.downstream and this.eventLoop so both parameters are validated per coding
guidelines.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 4229bace-7253-4aad-a4b0-a6c8ad727c72
📒 Files selected for processing (4)
it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.javaxds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.javaxds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.javaxds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java
🚧 Files skipped from review as they are similar to previous changes (3)
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.java
- xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java
| private final EventExecutor eventLoop; | ||
| @Nullable | ||
| private StreamSubscription upstream; | ||
| private boolean closed; |
There was a problem hiding this comment.
Make closed volatile to ensure visibility across threads.
The closed field is written by close() (potentially from any thread) and read by the lambda scheduled in onUpdate() (executed on the eventLoop thread). Without volatile, the eventLoop thread may not observe the updated value, causing the subscription to deliver updates after close() has been called.
🔒 Proposed fix
- private boolean closed;
+ private volatile boolean closed;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| private boolean closed; | |
| private volatile boolean closed; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@xds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.java`
at line 30, The non-volatile field `closed` in class RescheduleSubscription can
lead to visibility issues between threads; change the declaration of `closed` to
a volatile boolean (i.e., `private volatile boolean closed;`) so writes in
`close()` are visible to the lambda scheduled in `onUpdate()` (and any other
readers) and no further synchronization is needed for that flag.
| RescheduleSubscription(SnapshotWatcher<? super T> downstream, EventExecutor eventLoop) { | ||
| this.downstream = downstream; | ||
| this.eventLoop = eventLoop; | ||
| } |
There was a problem hiding this comment.
Add null checks for constructor parameters.
Per coding guidelines, validate non-null parameters using Objects.requireNonNull. Both downstream and eventLoop should be validated.
✅ Proposed fix
+import static java.util.Objects.requireNonNull;
+
final class RescheduleSubscription<T> implements SnapshotWatcher<T>, StreamSubscription {
private final SnapshotWatcher<? super T> downstream;
private final EventExecutor eventLoop;
`@Nullable`
private StreamSubscription upstream;
private volatile boolean closed;
RescheduleSubscription(SnapshotWatcher<? super T> downstream, EventExecutor eventLoop) {
- this.downstream = downstream;
- this.eventLoop = eventLoop;
+ this.downstream = requireNonNull(downstream, "downstream");
+ this.eventLoop = requireNonNull(eventLoop, "eventLoop");
}As per coding guidelines: "null-handling via Objects.requireNonNull(x, "param")".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| RescheduleSubscription(SnapshotWatcher<? super T> downstream, EventExecutor eventLoop) { | |
| this.downstream = downstream; | |
| this.eventLoop = eventLoop; | |
| } | |
| import static java.util.Objects.requireNonNull; | |
| final class RescheduleSubscription<T> implements SnapshotWatcher<T>, StreamSubscription { | |
| private final SnapshotWatcher<? super T> downstream; | |
| private final EventExecutor eventLoop; | |
| `@Nullable` | |
| private StreamSubscription upstream; | |
| private volatile boolean closed; | |
| RescheduleSubscription(SnapshotWatcher<? super T> downstream, EventExecutor eventLoop) { | |
| this.downstream = requireNonNull(downstream, "downstream"); | |
| this.eventLoop = requireNonNull(eventLoop, "eventLoop"); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@xds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.java`
around lines 32 - 35, The RescheduleSubscription constructor currently doesn't
validate its inputs; update the constructor
(RescheduleSubscription(SnapshotWatcher<? super T> downstream, EventExecutor
eventLoop)) to call Objects.requireNonNull(downstream, "downstream") and
Objects.requireNonNull(eventLoop, "eventLoop") before assigning to
this.downstream and this.eventLoop so both parameters are validated per coding
guidelines.
| void setUpstream(StreamSubscription upstream) { | ||
| this.upstream = upstream; | ||
| } |
There was a problem hiding this comment.
Add null check for upstream parameter.
The parameter should be validated with Objects.requireNonNull per coding guidelines, since it represents a required upstream subscription reference.
✅ Proposed fix
void setUpstream(StreamSubscription upstream) {
- this.upstream = upstream;
+ this.upstream = requireNonNull(upstream, "upstream");
}As per coding guidelines: "null-handling via Objects.requireNonNull(x, "param")".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| void setUpstream(StreamSubscription upstream) { | |
| this.upstream = upstream; | |
| } | |
| void setUpstream(StreamSubscription upstream) { | |
| this.upstream = Objects.requireNonNull(upstream, "upstream"); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@xds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.java`
around lines 37 - 39, In RescheduleSubscription, validate the required upstream
parameter in the setUpstream(StreamSubscription upstream) method by calling
Objects.requireNonNull(upstream, "upstream") before assigning to this.upstream;
also add or ensure the import java.util.Objects is present. This replaces the
plain assignment in setUpstream and guarantees the upstream reference is
non-null per coding guidelines.
This PR is a subset of changes for #6781 Motivation: The `SnapshotStream` reactive infrastructure in the `xds` module is currently package-private, which prevents xDS extension factories (e.g. `HttpFilterFactory`) from using reactive streams to model filters that depend on external resources. Making this API public is a prerequisite for upcoming features like `CredentialInjectorFilterFactory` that need to subscribe to SDS secrets reactively. Modifications: - Moved the `SnapshotStream` reactive stream infrastructure from package-private `com.linecorp.armeria.xds` to a new public `com.linecorp.armeria.xds.stream` package. - `SnapshotStream`, `RefCountedStream`, `Subscription` (formerly `SnapshotStream.Subscription`), and all operators (`MapStream`, `SwitchMapEagerStream`, `CombineLatest2Stream`, `CombineLatest3Stream`, `CombineNLatestStream`, `StaticSnapshotStream`, `TriFunction`). - Made `SnapshotWatcher` public so it can be used as the subscriber type in the public stream API. - Updated all internal stream consumers (`ClusterStream`, `EndpointStream`, `ListenerStream`, `RouteStream`, `SecretStream`, `DataSourceStream`, `TransportSocketStream`, etc.) to import from the new `xds.stream` package. - Simplified `ResourceNodeMeterBinderFactory.ResourceNodeMeterBinder` to no longer implement `ResourceWatcher`, using plain methods instead. - Moved stream tests to the `com.linecorp.armeria.xds.stream` package. Result: - The `SnapshotStream` API is now public and available for xDS extension factories to build reactive filter pipelines. - No behavioral changes — this is a pure refactoring with no changes to existing functionality.
86b76f7 to
5686281
Compare
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java (1)
99-109:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winKeep watch-registration failures on
eventLoop.The success path reschedules back onto
eventLoop, but thecatchbranch callsemit(null, e)andlifecycleObserver.close()directly from the blocking executor. Ifclose()wins whilewatchService.register(...)is still in flight, this path can still emit after shutdown and race the event-loop-owned lifecycle state. Please hop the failure path ontoeventLoopand skipemit(...)onceclosedis set.Based on learnings, xDS paths in `com.linecorp.armeria.xds` rely on the event-loop single-threading contract once they enter stream lifecycle code.💡 Proposed fix
} catch (Exception e) { - lifecycleObserver.fileParseError(e); - emit(null, e); - lifecycleObserver.close(); + eventLoop.execute(() -> { + if (closed) { + lifecycleObserver.close(); + return; + } + lifecycleObserver.fileParseError(e); + emit(null, e); + lifecycleObserver.close(); + }); return; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java` around lines 99 - 109, The catch block inside the blockingTaskExecutor lambda (around watchService.register in PathSotwConfigSourceSubscriptionFactory) performs lifecycleObserver.fileParseError(e), emit(null, e) and lifecycleObserver.close() on the blocking thread; move those error-handling calls onto the eventLoop by scheduling a task with eventLoop.execute(() -> { ... }), and inside that task check the subscription/observer closed state before calling emit(null, e) so you skip emit if already closed; ensure lifecycleObserver.close() is still invoked on the eventLoop after the closed check to preserve the event-loop single-threading contract.xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java (1)
1-5:⚠️ Potential issue | 🟠 Major | ⚡ Quick winUpdate this modified file to the LY header.
This file changed in the PR, but the header still says
LINE Corporation. The repo guide explicitly requires modified source files to keep the LY header.As per coding guidelines, "Every modified source file must keep the required LY copyright header (don’t introduce noise by updating $today.year on every file)."💡 Proposed fix
- * Copyright 2023 LINE Corporation + * Copyright 2023 LY Corporation @@ - * LINE Corporation licenses this file to you under the Apache License, + * LY Corporation licenses this file to you under the Apache License,🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java` around lines 1 - 5, The file's license header still references "LINE Corporation" and must be updated to the required LY header; locate the AdsXdsStream class file (class name AdsXdsStream) and replace the existing top-of-file copyright block that mentions "LINE Corporation" with the standard LY copyright header used across the repo, preserving the same license text and year handling convention (do not change the date to today), then save the file so the header matches other modified sources.
♻️ Duplicate comments (1)
.claude/skills/xds-dev/SKILL.md (1)
14-16:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winAdd a language tag to the fenced code block.
The code fence is missing a language specifier, which reduces syntax highlighting quality and triggers MD040.
📝 Proposed fix
-``` +```bash ./gradlew --parallel -Pretry=true :xds:test :xds-api:test :xds-validator:test :it:xds-client:test :it:xds-controlplane-api:test :it:xds-no-validation:test :it:xds-istio:test</details> <details> <summary>🤖 Prompt for AI Agents</summary>Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.In @.claude/skills/xds-dev/SKILL.md around lines 14 - 16, The fenced code block
containing the gradlew command is missing a language tag; update the opening
fence for the block that contains the line starting with "./gradlew --parallel
-Pretry=true :xds:test ..." to include a language specifier (e.g., "bash") so
the block becomes ```bash, which will enable proper syntax highlighting and
satisfy MD040.</details> </blockquote></details> </blockquote></details> <details> <summary>🤖 Prompt for all review comments with AI agents</summary>Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.Outside diff comments:
In@xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java:
- Around line 1-5: The file's license header still references "LINE Corporation"
and must be updated to the required LY header; locate the AdsXdsStream class
file (class name AdsXdsStream) and replace the existing top-of-file copyright
block that mentions "LINE Corporation" with the standard LY copyright header
used across the repo, preserving the same license text and year handling
convention (do not change the date to today), then save the file so the header
matches other modified sources.In
@xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java:
- Around line 99-109: The catch block inside the blockingTaskExecutor lambda
(around watchService.register in PathSotwConfigSourceSubscriptionFactory)
performs lifecycleObserver.fileParseError(e), emit(null, e) and
lifecycleObserver.close() on the blocking thread; move those error-handling
calls onto the eventLoop by scheduling a task with eventLoop.execute(() -> { ...
}), and inside that task check the subscription/observer closed state before
calling emit(null, e) so you skip emit if already closed; ensure
lifecycleObserver.close() is still invoked on the eventLoop after the closed
check to preserve the event-loop single-threading contract.
Duplicate comments:
In @.claude/skills/xds-dev/SKILL.md:
- Around line 14-16: The fenced code block containing the gradlew command is
missing a language tag; update the opening fence for the block that contains the
line starting with "./gradlew --parallel -Pretry=true :xds:test ..." to include
a language specifier (e.g., "bash") so the block becomes ```bash, which will
enable proper syntax highlighting and satisfy MD040.</details> --- <details> <summary>ℹ️ Review info</summary> <details> <summary>⚙️ Run configuration</summary> **Configuration used**: Path: .coderabbit.yaml **Review profile**: CHILL **Plan**: Pro **Run ID**: `95154f8c-aeb6-4abf-9307-76cc6d591a5f` </details> <details> <summary>📥 Commits</summary> Reviewing files that changed from the base of the PR and between 86b76f7ab4287e005bfae63fe75aaa7fd6b403da and 5686281315ecdc2ebeb78a0c7dc9a616e574b7da. </details> <details> <summary>📒 Files selected for processing (68)</summary> * `.claude/skills/xds-dev/SKILL.md` * `it/xds-client/src/test/java/com/linecorp/armeria/xds/client/endpoint/HttpProtocolOptionsTest.java` * `it/xds-client/src/test/java/com/linecorp/armeria/xds/it/AuthTokenFilterTest.java` * `it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CredentialInjectorFilterTest.java` * `it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.java` * `it/xds-client/src/test/java/com/linecorp/armeria/xds/it/FilterFactoryEventLoopTest.java` * `it/xds-istio/src/test/java/com/linecorp/armeria/it/xds/filter/IstioFilterFactories.java` * `it/xds-istio/src/test/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactory` * `xds-api/src/main/proto/envoy/config/cluster/v3/cluster.proto` * `xds-api/src/main/proto/envoy/config/core/v3/extension.proto` * `xds-api/src/main/proto/envoy/extensions/filters/http/credential_injector/v3/credential_injector.proto` * `xds-api/src/main/proto/envoy/extensions/http/injected_credentials/generic/v3/generic.proto` * `xds-api/src/main/proto/envoy/extensions/transport_sockets/tls/v3/secret.proto` * `xds-api/src/main/proto/envoy/extensions/upstreams/http/v3/http_protocol_options.proto` * `xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java` * `xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java` * `xds/src/main/java/com/linecorp/armeria/xds/ClusterSnapshot.java` * `xds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.java` * `xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java` * `xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java` * `xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java` * `xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.java` * `xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java` * `xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java` * `xds/src/main/java/com/linecorp/armeria/xds/FactoryContext.java` * `xds/src/main/java/com/linecorp/armeria/xds/FilterUtil.java` * `xds/src/main/java/com/linecorp/armeria/xds/GenericSecretSnapshot.java` * `xds/src/main/java/com/linecorp/armeria/xds/GenericSecretStream.java` * `xds/src/main/java/com/linecorp/armeria/xds/GrpcConfigSourceStreamFactory.java` * `xds/src/main/java/com/linecorp/armeria/xds/GrpcServicesPreprocessor.java` * `xds/src/main/java/com/linecorp/armeria/xds/InterestPublisher.java` * `xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java` * `xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java` * `xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java` * `xds/src/main/java/com/linecorp/armeria/xds/ParsedResources.java` * `xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java` * `xds/src/main/java/com/linecorp/armeria/xds/ResourceNode.java` * `xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java` * `xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java` * `xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java` * `xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java` * `xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java` * `xds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.java` * `xds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.java` * `xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java` * `xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java` * `xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java` * `xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapBuilder.java` * `xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java` * `xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java` * `xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java` * `xds/src/main/java/com/linecorp/armeria/xds/XdsStream.java` * `xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java` * `xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java` * `xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java` * `xds/src/main/java/com/linecorp/armeria/xds/configsource/InterestedResources.java` * `xds/src/main/java/com/linecorp/armeria/xds/configsource/SotwConfigSourceSubscriptionFactory.java` * `xds/src/main/java/com/linecorp/armeria/xds/configsource/package-info.java` * `xds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.java` * `xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java` * `xds/src/main/java/com/linecorp/armeria/xds/internal/XdsCommonUtil.java` * `xds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.java` * `xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java` * `xds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactory` * `xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java` * `xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java` * `xds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.java` * `xds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java` </details> <details> <summary>💤 Files with no reviewable changes (4)</summary> * xds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.java * xds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.java * xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.java * xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java </details> <details> <summary>✅ Files skipped from review due to trivial changes (7)</summary> * xds-api/src/main/proto/envoy/config/core/v3/extension.proto * xds-api/src/main/proto/envoy/extensions/transport_sockets/tls/v3/secret.proto * xds-api/src/main/proto/envoy/config/cluster/v3/cluster.proto * xds/src/main/java/com/linecorp/armeria/xds/configsource/package-info.java * xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java * xds-api/src/main/proto/envoy/extensions/upstreams/http/v3/http_protocol_options.proto * xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java </details> <details> <summary>🚧 Files skipped from review as they are similar to previous changes (44)</summary> * xds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactory * xds/src/main/java/com/linecorp/armeria/xds/InterestPublisher.java * xds/src/main/java/com/linecorp/armeria/xds/XdsStream.java * xds/src/main/java/com/linecorp/armeria/xds/GenericSecretStream.java * xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java * xds/src/main/java/com/linecorp/armeria/xds/ResourceNode.java * xds/src/main/java/com/linecorp/armeria/xds/configsource/SotwConfigSourceSubscriptionFactory.java * xds/src/main/java/com/linecorp/armeria/xds/configsource/InterestedResources.java * xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java * xds-api/src/main/proto/envoy/extensions/filters/http/credential_injector/v3/credential_injector.proto * xds/src/main/java/com/linecorp/armeria/xds/FactoryContext.java * xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java * xds/src/main/java/com/linecorp/armeria/xds/GenericSecretSnapshot.java * xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java * xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java * xds-api/src/main/proto/envoy/extensions/http/injected_credentials/generic/v3/generic.proto * xds/src/main/java/com/linecorp/armeria/xds/ParsedResources.java * xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java * xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java * xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java * xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java * xds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.java * xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java * it/xds-istio/src/test/java/com/linecorp/armeria/it/xds/filter/IstioFilterFactories.java * xds/src/main/java/com/linecorp/armeria/xds/ClusterSnapshot.java * xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java * xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java * xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapBuilder.java * xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java * xds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.java * xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java * xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java * xds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.java * xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java * xds/src/main/java/com/linecorp/armeria/xds/internal/XdsCommonUtil.java * it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CredentialInjectorFilterTest.java * xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java * xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java * xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java * it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.java * xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java * xds/src/main/java/com/linecorp/armeria/xds/FilterUtil.java * it/xds-client/src/test/java/com/linecorp/armeria/xds/it/FilterFactoryEventLoopTest.java * xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java </details> </details> <!-- This is an auto-generated comment by CodeRabbit for review status -->
5686281 to
a357f5b
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6781 +/- ##
============================================
+ Coverage 74.46% 75.02% +0.56%
- Complexity 22234 24962 +2728
============================================
Files 1963 2215 +252
Lines 82437 92693 +10256
Branches 10764 12112 +1348
============================================
+ Hits 61385 69547 +8162
- Misses 15918 17356 +1438
- Partials 5134 5790 +656 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
This PR should be reviewed after #6784 This PR is a subset of #6781 Motivation: xDS extension factories (e.g. `HttpFilterFactory`) currently receive only an `XdsResourceValidator`, which is insufficient for filters that depend on external xDS resources like SDS secrets. The `CredentialInjectorFilterFactory` (`envoy.filters.http.credential_injector`) needs to reactively subscribe to generic secrets via SDS and inject credentials into outgoing requests, requiring access to the event loop, metrics, and secret streams at construction time. Modifications: - Added `FactoryContext` interface providing runtime infrastructure to extension factories (`eventLoop()`, `meterRegistry()`, `meterIdPrefix()`, `validator()`, `genericSecretStream()`). - Changed `HttpFilterFactory.create()` to accept `FactoryContext` instead of `XdsResourceValidator`. - Added `HttpFilterFactory.createStream()` that returns a `SnapshotStream<XdsHttpFilter>` for reactive filter lifecycle management. - Added `XdsHttpFilter.NOOP` static constant for no-op filter instances. - Added `GenericSecretSnapshot` and `GenericSecretStream` for resolving generic secrets via SDS. - Made the filter pipeline reactive: `FilterUtil` now returns `SnapshotStream<ClientPreprocessors>` / `SnapshotStream<ClientDecoration>`, and `RouteStream` composes cluster, downstream, and upstream filter streams via `combineLatest`. - Simplified `RouteEntry` to accept pre-built filter configs instead of building them internally. - Updated `ListenerResourceParser` / `ListenerXdsResource` to parse and carry the `Router` HTTP filter for upstream filter extraction. - Made `SubscriptionContext` extend `FactoryContext` for internal use. - Added `CredentialInjectorFilterFactory` implementing `envoy.filters.http.credential_injector`, registered via service loader. - Annotated `TypedExtensionConfig`, `CredentialInjector`, `Generic`, and `Secret.generic_secret` proto fields as supported. - Updated `RouterFilterFactory` and `IstioFilterFactories` to use `FactoryContext`. Result: - xDS extension factories can now build reactive filter pipelines that depend on external resources like SDS secrets. - The `CredentialInjectorFilterFactory` injects credentials from SDS-backed generic secrets into outgoing HTTP requests, supporting the `envoy.filters.http.credential_injector` filter type.
a357f5b to
0342d9e
Compare
There was a problem hiding this comment.
🧹 Nitpick comments (1)
xds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.java (1)
67-75: ⚡ Quick winEnsure parameter validation in the implementation.
The default implementation of this method in
SubscriptionContext(context snippet 1) does not validate thatclusterNameis non-null before passing it toclusterManager().register(...). Since this is a user-facing public API that can be called from custom extension factories, the parameter should be validated at the start of the implementation.As per coding guidelines: "do explicit null-checks on user-facing public parameters (use
Objects.requireNonNull)".Suggested fix for the implementation
In the default implementation (SubscriptionContext.java), add:
default SnapshotStream<ClusterSnapshot> clusterStream(String clusterName) { + requireNonNull(clusterName, "clusterName"); final SnapshotStream<ClusterSnapshot> stream = watcher -> clusterManager().register(clusterName, this, watcher); return stream.checkSubscribeOn(eventLoop()); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.java` around lines 67 - 75, The default implementation of clusterStream in SubscriptionContext must validate the public parameter; add an explicit null-check using Objects.requireNonNull(clusterName, "clusterName") at the start of SubscriptionContext.clusterStream before calling clusterManager().register(...) (and add the java.util.Objects import if missing) so the user-facing FactoryContext.clusterStream contract enforces non-null inputs.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@xds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.java`:
- Around line 67-75: The default implementation of clusterStream in
SubscriptionContext must validate the public parameter; add an explicit
null-check using Objects.requireNonNull(clusterName, "clusterName") at the start
of SubscriptionContext.clusterStream before calling
clusterManager().register(...) (and add the java.util.Objects import if missing)
so the user-facing FactoryContext.clusterStream contract enforces non-null
inputs.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: f7dcee23-0aa7-4612-88fb-20a95a253686
📒 Files selected for processing (49)
.claude/skills/xds-dev/SKILL.mdit/xds-client/src/test/java/com/linecorp/armeria/xds/client/endpoint/HttpProtocolOptionsTest.javait/xds-client/src/test/java/com/linecorp/armeria/xds/it/AuthTokenFilterTest.javait/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.javait/xds-client/src/test/java/com/linecorp/armeria/xds/it/FilterFactoryEventLoopTest.javait/xds-istio/src/test/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactoryxds-api/src/main/proto/envoy/config/cluster/v3/cluster.protoxds-api/src/main/proto/envoy/extensions/upstreams/http/v3/http_protocol_options.protoxds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.javaxds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.javaxds/src/main/java/com/linecorp/armeria/xds/ClusterSnapshot.javaxds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.javaxds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.javaxds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.javaxds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.javaxds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.javaxds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.javaxds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.javaxds/src/main/java/com/linecorp/armeria/xds/GrpcConfigSourceStreamFactory.javaxds/src/main/java/com/linecorp/armeria/xds/GrpcServicesPreprocessor.javaxds/src/main/java/com/linecorp/armeria/xds/InterestPublisher.javaxds/src/main/java/com/linecorp/armeria/xds/ParsedResources.javaxds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.javaxds/src/main/java/com/linecorp/armeria/xds/ResourceNode.javaxds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.javaxds/src/main/java/com/linecorp/armeria/xds/ResourceParser.javaxds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.javaxds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.javaxds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.javaxds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.javaxds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.javaxds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.javaxds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapBuilder.javaxds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.javaxds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.javaxds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.javaxds/src/main/java/com/linecorp/armeria/xds/XdsStream.javaxds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.javaxds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.javaxds/src/main/java/com/linecorp/armeria/xds/configsource/InterestedResources.javaxds/src/main/java/com/linecorp/armeria/xds/configsource/SotwConfigSourceSubscriptionFactory.javaxds/src/main/java/com/linecorp/armeria/xds/configsource/package-info.javaxds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.javaxds/src/main/java/com/linecorp/armeria/xds/internal/XdsCommonUtil.javaxds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactoryxds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.javaxds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.javaxds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.javaxds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java
💤 Files with no reviewable changes (4)
- xds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.java
- xds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java
✅ Files skipped from review due to trivial changes (2)
- xds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java
🚧 Files skipped from review as they are similar to previous changes (35)
- xds-api/src/main/proto/envoy/config/cluster/v3/cluster.proto
- xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/GrpcServicesPreprocessor.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java
- xds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/configsource/SotwConfigSourceSubscriptionFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/client/endpoint/HttpProtocolOptionsTest.java
- xds-api/src/main/proto/envoy/extensions/upstreams/http/v3/http_protocol_options.proto
- xds/src/main/java/com/linecorp/armeria/xds/configsource/package-info.java
- xds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.java
- xds/src/main/java/com/linecorp/armeria/xds/configsource/InterestedResources.java
- xds/src/main/java/com/linecorp/armeria/xds/ParsedResources.java
- xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java
- xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/InterestPublisher.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/FilterFactoryEventLoopTest.java
- xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java
- xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.java
- xds/src/main/java/com/linecorp/armeria/xds/GrpcConfigSourceStreamFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapBuilder.java
- xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java
- xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java
- xds/src/main/java/com/linecorp/armeria/xds/internal/XdsCommonUtil.java
- xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java
- xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/AuthTokenFilterTest.java
- xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java
- xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java
- xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java
- xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java
Motivation:
The xDS filter and config source extension system lacked the ability to reactively subscribe to xDS resources (clusters, secrets) from within extension factories. Filter factories received only a
XdsResourceValidator— insufficient for use cases like credential injection (which needs SDS secrets) or custom config sources (which need to resolve clusters for HTTP-based config fetching). Additionally, the common pattern of selecting an endpoint from aClusterSnapshot, configuring TLS, and setting the endpoint on a request context was duplicated acrossRouterFilter,GrpcServicesPreprocessor, and test code.Modifications:
Extension factory APIs
FactoryContextinterface providingeventLoop(),meterRegistry(),genericSecretStream(), andclusterStream()to extension factories during creation.HttpFilterFactory.create()to acceptFactoryContextinstead ofXdsResourceValidator.HttpFilterFactory.createStream()default method for factories that need reactive xDS resource subscriptions (returnsSnapshotStream<XdsHttpFilter>).XdsBootstrapBuilder.extensionFactory()for registering custom factories that take precedence over ServiceLoader-discovered ones.SnapshotStream,StreamSubscription, and reactive operators (map,switchMapEager,combineLatest,just) to the new publiccom.linecorp.armeria.xds.streampackage.ClusterSnapshot endpoint selection
ClusterSnapshot.httpPreprocessor()returning anHttpPreprocessorthat selects an endpoint, configures TLS/session protocol, and sets the endpoint on the context.XdsCommonUtil.applyClusterToCtx()consolidating the endpoint selection + TLS + endpoint group pattern; makesetTlsParams()private.RouterFilterandGrpcServicesPreprocessorto useapplyClusterToCtx().New integrations
GenericSecretSnapshotandGenericSecretStreamfor resolving SDS generic secrets reactively.CredentialInjectorFilterFactoryimplementing theenvoy.filters.http.credential_injectorfilter with Generic credential provider, usingcreateStream()+genericSecretStream().AuthTokenFilterTestdemonstrating a customHttpFilterFactorywithcreateStream()+clusterStream()+ mTLS viaUpstreamTlsContext+AsyncLoadertoken caching.CustomConfigSourceTestdemonstrating a customSotwConfigSourceSubscriptionFactorythat fetches cluster config via HTTP usingclusterStream()+httpPreprocessor().CredentialInjectorFilterTestandFilterFactoryEventLoopTest.Result:
FactoryContext.SnapshotStreamreactive operators are now public API, enabling extension authors to compose reactive xDS data flows.ClusterSnapshot.httpPreprocessor()provides a convenient way to create aWebClientbacked by an xDS cluster with automatic endpoint selection and TLS configuration.envoy.filters.http.credential_injectorfilter is now supported out of the box.