Add support for credential_injector#6785
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
💤 Files with no reviewable changes (1)
🚧 Files skipped from review as they are similar to previous changes (2)
📝 WalkthroughWalkthroughAdds Envoy credential injector support with reactive SDS/file/bootstrap secret resolution, FactoryContext API, SnapshotStream event-loop wrappers and RescheduleSubscription, reactive filter composition via FilterUtil, explicit Router extraction and route composition updates, plus integration tests covering injection scenarios. ChangesCredential Injector Filter Implementation
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@xds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.java`:
- Around line 57-76: The new public methods in CredentialInjectorFilterFactory
(name(), typeUrls(), create(HttpFilter, Any, FactoryContext),
createStream(HttpFilter, Any, FactoryContext)) lack method-level Javadoc; add
concise Javadoc to each public/protected method in the class (or use
{`@inheritDoc`} where it simply implements/overrides behavior) so the public API
is documented per guidelines—ensure the Javadoc on create(...) explains it
throws UnsupportedOperationException and that createStream(...) returns a
SnapshotStream for reactive secret subscription, and include brief descriptions
for name() and typeUrls().
- Around line 69-76: The public methods create(...) and createStream(...) in
CredentialInjectorFilterFactory must validate their parameters; add explicit
null checks using Objects.requireNonNull for each user-facing parameter
(httpFilter, config, and context) at the start of both methods (e.g.,
Objects.requireNonNull(httpFilter, "httpFilter"); Objects.requireNonNull(config,
"config"); Objects.requireNonNull(context, "context")). Ensure these checks are
placed before any other logic or thrown exceptions so callers get a clear NPE
with the parameter name.
In `@xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java`:
- Around line 72-171: Add explicit null checks using Objects.requireNonNull for
all public API parameters: in SnapshotStream.map validate "mapper"; in
switchMapEager validate "mapper"; in combineNLatest validate "streams"; in both
combineLatest overloads validate "a", "b", and "combiner" (and "c" for the 3-arg
overload); and in error validate "error". Use Objects.requireNonNull(param,
"paramName") at the start of each method before constructing the corresponding
stream implementation (MapStream, SwitchMapEagerStream, CombineNLatestStream,
CombineLatest2Stream, CombineLatest3Stream, StaticSnapshotStream) so invalid
null inputs fail fast with clear messages.
In `@xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java`:
- Around line 33-37: SubscriptionContext#genericSecretStream currently forwards
a potentially null sdsSecretConfig into new SecretStream(...) without
validation; add a null check at the start of the default method
(genericSecretStream) that throws a clear NPE (or IllegalArgumentException) if
sdsSecretConfig is null, then proceed to construct new
SecretStream(sdsSecretConfig, null, this). Keep the rest of the chain
(.switchMapEager(... new GenericSecretStream(this, resource))
.checkSubscribeOn(eventLoop())) unchanged so behavior and call sites remain the
same.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 67323ba0-838a-4222-8d0c-b2503814f361
📒 Files selected for processing (54)
it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CredentialInjectorFilterTest.javait/xds-istio/src/test/java/com/linecorp/armeria/it/xds/filter/IstioFilterFactories.javaxds-api/src/main/proto/envoy/config/core/v3/extension.protoxds-api/src/main/proto/envoy/extensions/filters/http/credential_injector/v3/credential_injector.protoxds-api/src/main/proto/envoy/extensions/http/injected_credentials/generic/v3/generic.protoxds-api/src/main/proto/envoy/extensions/transport_sockets/tls/v3/secret.protoxds/src/main/java/com/linecorp/armeria/xds/CertificateValidationContextStream.javaxds/src/main/java/com/linecorp/armeria/xds/ClusterRoot.javaxds/src/main/java/com/linecorp/armeria/xds/ClusterStream.javaxds/src/main/java/com/linecorp/armeria/xds/DataSourceStream.javaxds/src/main/java/com/linecorp/armeria/xds/EndpointStream.javaxds/src/main/java/com/linecorp/armeria/xds/FilterUtil.javaxds/src/main/java/com/linecorp/armeria/xds/GenericSecretSnapshot.javaxds/src/main/java/com/linecorp/armeria/xds/GenericSecretStream.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerManager.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerRoot.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerStream.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.javaxds/src/main/java/com/linecorp/armeria/xds/RawBufferTransportSocketFactory.javaxds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.javaxds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.javaxds/src/main/java/com/linecorp/armeria/xds/RouteEntry.javaxds/src/main/java/com/linecorp/armeria/xds/RouteStream.javaxds/src/main/java/com/linecorp/armeria/xds/SecretStream.javaxds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.javaxds/src/main/java/com/linecorp/armeria/xds/SnapshotWatcher.javaxds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.javaxds/src/main/java/com/linecorp/armeria/xds/TlsCertificateStream.javaxds/src/main/java/com/linecorp/armeria/xds/TransportSocketFactory.javaxds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.javaxds/src/main/java/com/linecorp/armeria/xds/UpstreamTlsTransportSocketFactory.javaxds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.javaxds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.javaxds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.javaxds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.javaxds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.javaxds/src/main/java/com/linecorp/armeria/xds/filter/XdsHttpFilter.javaxds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest2Stream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest3Stream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/CombineNLatestStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/MapStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/RefCountedStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.javaxds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/StaticSnapshotStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/Subscription.javaxds/src/main/java/com/linecorp/armeria/xds/stream/SwitchMapEagerStream.javaxds/src/main/java/com/linecorp/armeria/xds/stream/TriFunction.javaxds/src/main/java/com/linecorp/armeria/xds/stream/package-info.javaxds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.filter.HttpFilterFactoryxds/src/test/java/com/linecorp/armeria/xds/stream/CombineNLatestStreamTest.javaxds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.javaxds/src/test/java/com/linecorp/armeria/xds/stream/StreamSwitchMapEagerTest.java
💤 Files with no reviewable changes (1)
- xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java
| public XdsHttpFilter create(HttpFilter httpFilter, Any config, FactoryContext context) { | ||
| throw new UnsupportedOperationException( | ||
| "credential_injector requires reactive secret subscription; use createStream()"); | ||
| } | ||
|
|
||
| @Override | ||
| public SnapshotStream<XdsHttpFilter> createStream(HttpFilter httpFilter, Any config, | ||
| FactoryContext context) { |
There was a problem hiding this comment.
Validate public API inputs explicitly with Objects.requireNonNull.
Line 69 and Line 75 expose public entry points but do not perform required null validation on parameters.
Suggested fix
import java.util.List;
+import java.util.Objects;
@@
`@Nullable`
public XdsHttpFilter create(HttpFilter httpFilter, Any config, FactoryContext context) {
+ Objects.requireNonNull(httpFilter, "httpFilter");
+ Objects.requireNonNull(config, "config");
+ Objects.requireNonNull(context, "context");
throw new UnsupportedOperationException(
"credential_injector requires reactive secret subscription; use createStream()");
}
@@
public SnapshotStream<XdsHttpFilter> createStream(HttpFilter httpFilter, Any config,
FactoryContext context) {
+ Objects.requireNonNull(httpFilter, "httpFilter");
+ Objects.requireNonNull(config, "config");
+ Objects.requireNonNull(context, "context");
final CredentialInjector injectorConfig = context.validator().unpack(config,
CredentialInjector.class);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public XdsHttpFilter create(HttpFilter httpFilter, Any config, FactoryContext context) { | |
| throw new UnsupportedOperationException( | |
| "credential_injector requires reactive secret subscription; use createStream()"); | |
| } | |
| @Override | |
| public SnapshotStream<XdsHttpFilter> createStream(HttpFilter httpFilter, Any config, | |
| FactoryContext context) { | |
| `@Nullable` | |
| public XdsHttpFilter create(HttpFilter httpFilter, Any config, FactoryContext context) { | |
| Objects.requireNonNull(httpFilter, "httpFilter"); | |
| Objects.requireNonNull(config, "config"); | |
| Objects.requireNonNull(context, "context"); | |
| throw new UnsupportedOperationException( | |
| "credential_injector requires reactive secret subscription; use createStream()"); | |
| } | |
| `@Override` | |
| public SnapshotStream<XdsHttpFilter> createStream(HttpFilter httpFilter, Any config, | |
| FactoryContext context) { | |
| Objects.requireNonNull(httpFilter, "httpFilter"); | |
| Objects.requireNonNull(config, "config"); | |
| Objects.requireNonNull(context, "context"); | |
| final CredentialInjector injectorConfig = context.validator().unpack(config, | |
| CredentialInjector.class); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@xds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.java`
around lines 69 - 76, The public methods create(...) and createStream(...) in
CredentialInjectorFilterFactory must validate their parameters; add explicit
null checks using Objects.requireNonNull for each user-facing parameter
(httpFilter, config, and context) at the start of both methods (e.g.,
Objects.requireNonNull(httpFilter, "httpFilter"); Objects.requireNonNull(config,
"config"); Objects.requireNonNull(context, "context")). Ensure these checks are
placed before any other logic or thrown exceptions so callers get a clear NPE
with the parameter name.
There was a problem hiding this comment.
This is called by internal APIs
There was a problem hiding this comment.
Understood — since create() and createStream() are invoked only by internal framework code (FilterUtil.resolveInstance) that controls the inputs, the null validation guard is unnecessary here. Withdrawing the suggestion.
✏️ Learnings added
Learnt from: jrhee17
Repo: line/armeria PR: 6785
File: xds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.java:69-76
Timestamp: 2026-06-01T07:24:42.277Z
Learning: In the Armeria XDS module, `HttpFilterFactory.create()` and `HttpFilterFactory.createStream()` (and their overrides, e.g. in `CredentialInjectorFilterFactory`) are called exclusively by internal framework code (`FilterUtil.resolveInstance`). They are NOT directly user-facing entry points, so `Objects.requireNonNull` parameter validation is not required on their parameters (`httpFilter`, `config`, `context`).
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: jrhee17
Repo: line/armeria PR: 6628
File: xds/src/main/java/com/linecorp/armeria/xds/TransportSocketSnapshot.java:48-48
Timestamp: 2026-02-19T06:03:54.715Z
Learning: In the Armeria XDS module (package com.linecorp.armeria.xds), code paths that run on the event loop (e.g., TransportSocketSnapshot and related XDS paths) are single-threaded. Do not use volatile or AtomicBoolean to guard static fields that are accessed only within these execution paths. If a field must be thread-safe, restrict access to code outside the event loop or document the event-loop assumption; otherwise, keep such fields as plain statics initialized once and mutably used only within the event loop context.
6ad3afd to
4b1330b
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6785 +/- ##
============================================
+ Coverage 74.46% 75.00% +0.54%
- Complexity 22234 24915 +2681
============================================
Files 1963 2215 +252
Lines 82437 92677 +10240
Branches 10764 12105 +1341
============================================
+ Hits 61385 69512 +8127
- Misses 15918 17372 +1454
- Partials 5134 5793 +659 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java (1)
194-224: ⚡ Quick winValidate
watcherin the wrapper subscriptions.Both lambdas implement a public
subscribe(...)path, but neither checkswatcherfor null before delegating. AddrequireNonNull(watcher, "watcher")at the top of each lambda so failures stay consistent with the rest of this API.Proposed fix
default SnapshotStream<T> checkSubscribeOn(EventExecutor eventLoop) { requireNonNull(eventLoop, "eventLoop"); final SnapshotStream<T> self = this; return watcher -> { + requireNonNull(watcher, "watcher"); checkState(eventLoop.inEventLoop(), "subscribe must be called from the event loop: %s", eventLoop); final Subscription sub = self.subscribe(watcher); return () -> { @@ default SnapshotStream<T> rescheduleEventsOn(EventExecutor eventLoop) { requireNonNull(eventLoop, "eventLoop"); final SnapshotStream<T> self = this; return watcher -> { + requireNonNull(watcher, "watcher"); final RescheduleSubscription<T> sub = new RescheduleSubscription<>(watcher, eventLoop); sub.setUpstream(self.subscribe(sub)); return sub; }; }As per coding guidelines, "Null/validation: do explicit parameter null-checks for user-facing public methods using
Objects.requireNonNull(obj, "name")."🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java` around lines 194 - 224, The two wrapper lambdas returned by SnapshotStream.checkSubscribeOn(EventExecutor) and SnapshotStream.rescheduleEventsOn(EventExecutor) do not validate the watcher parameter; add requireNonNull(watcher, "watcher") as the first statement inside each lambda (the subscribe(...) implementations) so the public subscribe path fails fast and consistently with the API; update the lambdas in checkSubscribeOn and rescheduleEventsOn to call Objects.requireNonNull(watcher, "watcher") (or requireNonNull if statically imported) before any other logic such as checkState, self.subscribe, or constructing RescheduleSubscription.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@xds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.java`:
- Around line 42-55: The RescheduleSubscription.close() must preserve event-loop
affinity and visibility: schedule the close logic onto eventLoop (like
eventLoop.execute(() -> { closed = true; if (upstream != null) upstream.close();
})) instead of calling upstream.close() on the caller thread so upstream.close()
runs on the same event loop as onUpdate handlers, and also mark the closed field
as volatile to ensure visibility across threads; update the close() method and
the closed field accordingly (references: class RescheduleSubscription, methods
onUpdate and close, fields eventLoop, upstream, closed).
---
Nitpick comments:
In `@xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java`:
- Around line 194-224: The two wrapper lambdas returned by
SnapshotStream.checkSubscribeOn(EventExecutor) and
SnapshotStream.rescheduleEventsOn(EventExecutor) do not validate the watcher
parameter; add requireNonNull(watcher, "watcher") as the first statement inside
each lambda (the subscribe(...) implementations) so the public subscribe path
fails fast and consistently with the API; update the lambdas in checkSubscribeOn
and rescheduleEventsOn to call Objects.requireNonNull(watcher, "watcher") (or
requireNonNull if statically imported) before any other logic such as
checkState, self.subscribe, or constructing RescheduleSubscription.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 8aad84cd-aabd-4b72-b9a4-5f663c42cedd
📒 Files selected for processing (23)
it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CredentialInjectorFilterTest.javait/xds-istio/src/test/java/com/linecorp/armeria/it/xds/filter/IstioFilterFactories.javaxds-api/src/main/proto/envoy/config/core/v3/extension.protoxds-api/src/main/proto/envoy/extensions/filters/http/credential_injector/v3/credential_injector.protoxds-api/src/main/proto/envoy/extensions/http/injected_credentials/generic/v3/generic.protoxds-api/src/main/proto/envoy/extensions/transport_sockets/tls/v3/secret.protoxds/src/main/java/com/linecorp/armeria/xds/FilterUtil.javaxds/src/main/java/com/linecorp/armeria/xds/GenericSecretSnapshot.javaxds/src/main/java/com/linecorp/armeria/xds/GenericSecretStream.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerManager.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.javaxds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.javaxds/src/main/java/com/linecorp/armeria/xds/RouteEntry.javaxds/src/main/java/com/linecorp/armeria/xds/RouteStream.javaxds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.javaxds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.javaxds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.javaxds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.javaxds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.javaxds/src/main/java/com/linecorp/armeria/xds/filter/XdsHttpFilter.javaxds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.javaxds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.javaxds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.filter.HttpFilterFactory
✅ Files skipped from review due to trivial changes (2)
- xds-api/src/main/proto/envoy/extensions/filters/http/credential_injector/v3/credential_injector.proto
- xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java
🚧 Files skipped from review as they are similar to previous changes (19)
- xds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.filter.HttpFilterFactory
- it/xds-istio/src/test/java/com/linecorp/armeria/it/xds/filter/IstioFilterFactories.java
- xds-api/src/main/proto/envoy/config/core/v3/extension.proto
- xds-api/src/main/proto/envoy/extensions/http/injected_credentials/generic/v3/generic.proto
- xds/src/main/java/com/linecorp/armeria/xds/filter/XdsHttpFilter.java
- xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/GenericSecretSnapshot.java
- xds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.java
- xds-api/src/main/proto/envoy/extensions/transport_sockets/tls/v3/secret.proto
- xds/src/main/java/com/linecorp/armeria/xds/ListenerResourceParser.java
- xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java
- xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java
- xds/src/main/java/com/linecorp/armeria/xds/filter/CredentialInjectorFilterFactory.java
- xds/src/main/java/com/linecorp/armeria/xds/GenericSecretStream.java
- it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CredentialInjectorFilterTest.java
- xds/src/main/java/com/linecorp/armeria/xds/ListenerXdsResource.java
- xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java
- xds/src/main/java/com/linecorp/armeria/xds/FilterUtil.java
| public void onUpdate(@Nullable T value, @Nullable Throwable error) { | ||
| eventLoop.execute(() -> { | ||
| if (!closed) { | ||
| downstream.onUpdate(value, error); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| closed = true; | ||
| if (upstream != null) { | ||
| upstream.close(); | ||
| } |
There was a problem hiding this comment.
Preserve event-loop affinity when closing the rescheduled subscription.
onUpdate() always hops onto eventLoop, but close() forwards upstream.close() on the caller thread. That breaks composition with checkSubscribeOn(eventLoop) and also shares closed across threads without a visibility guarantee. Close should either hop to eventLoop or explicitly enforce same-thread close semantics.
Proposed fix
final class RescheduleSubscription<T> implements SnapshotWatcher<T>, Subscription {
@@
- private boolean closed;
+ private volatile boolean closed;
@@
`@Override`
public void close() {
+ if (closed) {
+ return;
+ }
closed = true;
- if (upstream != null) {
- upstream.close();
+ final Subscription upstream = this.upstream;
+ if (upstream != null) {
+ if (eventLoop.inEventLoop()) {
+ upstream.close();
+ } else {
+ eventLoop.execute(upstream::close);
+ }
}
}
}Based on learnings, "If a field must be thread-safe, restrict access to code outside the event loop or document the event-loop assumption."
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public void onUpdate(@Nullable T value, @Nullable Throwable error) { | |
| eventLoop.execute(() -> { | |
| if (!closed) { | |
| downstream.onUpdate(value, error); | |
| } | |
| }); | |
| } | |
| @Override | |
| public void close() { | |
| closed = true; | |
| if (upstream != null) { | |
| upstream.close(); | |
| } | |
| public void onUpdate(`@Nullable` T value, `@Nullable` Throwable error) { | |
| eventLoop.execute(() -> { | |
| if (!closed) { | |
| downstream.onUpdate(value, error); | |
| } | |
| }); | |
| } | |
| `@Override` | |
| public void close() { | |
| if (closed) { | |
| return; | |
| } | |
| closed = true; | |
| final Subscription upstream = this.upstream; | |
| if (upstream != null) { | |
| if (eventLoop.inEventLoop()) { | |
| upstream.close(); | |
| } else { | |
| eventLoop.execute(upstream::close); | |
| } | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@xds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.java`
around lines 42 - 55, The RescheduleSubscription.close() must preserve
event-loop affinity and visibility: schedule the close logic onto eventLoop
(like eventLoop.execute(() -> { closed = true; if (upstream != null)
upstream.close(); })) instead of calling upstream.close() on the caller thread
so upstream.close() runs on the same event loop as onUpdate handlers, and also
mark the closed field as volatile to ensure visibility across threads; update
the close() method and the closed field accordingly (references: class
RescheduleSubscription, methods onUpdate and close, fields eventLoop, upstream,
closed).
There was a problem hiding this comment.
Because close is called by downstream, which is from the event loop
There was a problem hiding this comment.
@jrhee17, thanks for the clarification! That makes sense — since both the close() call and the onUpdate lambda execute on the same event loop thread, closed is always accessed from a single thread, so no volatile or cross-thread forwarding is needed. My concern doesn't apply here.
✏️ Learnings added
Learnt from: jrhee17
Repo: line/armeria PR: 6785
File: xds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.java:42-55
Timestamp: 2026-06-01T08:09:06.657Z
Learning: In `xds/src/main/java/com/linecorp/armeria/xds/stream/RescheduleSubscription.java`, `close()` is always called by downstream from the event loop thread. Because both `close()` and the lambda inside `onUpdate()` run on the same event loop, the `closed` boolean field does not need to be `volatile`, and `upstream.close()` does not need to be rescheduled onto the event loop. The event-loop single-threading contract is upheld by the caller.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: jrhee17
Repo: line/armeria PR: 6628
File: xds/src/main/java/com/linecorp/armeria/xds/TransportSocketSnapshot.java:48-48
Timestamp: 2026-02-19T06:03:54.715Z
Learning: In the Armeria XDS module (package com.linecorp.armeria.xds), code paths that run on the event loop (e.g., TransportSocketSnapshot and related XDS paths) are single-threaded. Do not use volatile or AtomicBoolean to guard static fields that are accessed only within these execution paths. If a field must be thread-safe, restrict access to code outside the event loop or document the event-loop assumption; otherwise, keep such fields as plain statics initialized once and mutably used only within the event loop context.
There was a problem hiding this comment.
How about adding
assert eventLoop.inEventLoop();
There was a problem hiding this comment.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
| /** | ||
| * A no-op filter that passes through all operations unchanged. | ||
| */ | ||
| XdsHttpFilter NOOP = new XdsHttpFilter() {}; |
There was a problem hiding this comment.
nit) Should we use a static method instead? Conventionally, we don't set a field in the public interface.
| private static final String GENERIC_TYPE_URL = | ||
| "type.googleapis.com/envoy.extensions.http.injected_credentials.generic.v3.Generic"; | ||
| private static final List<String> TYPE_URLS = ImmutableList.of(TYPE_URL); | ||
| private static final String DEFAULT_HEADER = "authorization"; |
There was a problem hiding this comment.
HttpHeadersNames.AUTHORIZATION?
| * | ||
| * @param eventLoop the event loop that subscribe and close must be called from | ||
| */ | ||
| @UnstableApi |
There was a problem hiding this comment.
Don't need to add
| @UnstableApi |
| * | ||
| * @param eventLoop the event loop to deliver emissions on | ||
| */ | ||
| @UnstableApi |
There was a problem hiding this comment.
| @UnstableApi |
| public void onUpdate(@Nullable T value, @Nullable Throwable error) { | ||
| eventLoop.execute(() -> { | ||
| if (!closed) { | ||
| downstream.onUpdate(value, error); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| closed = true; | ||
| if (upstream != null) { | ||
| upstream.close(); | ||
| } |
There was a problem hiding this comment.
How about adding
assert eventLoop.inEventLoop();
This PR should be reviewed after #6784
This PR is a subset of #6781
Motivation:
xDS extension factories (e.g.
HttpFilterFactory) currently receive only anXdsResourceValidator, which is insufficient for filters that depend on external xDS resources like SDS secrets. TheCredentialInjectorFilterFactory(envoy.filters.http.credential_injector) needs to reactively subscribe to generic secrets via SDS and inject credentials into outgoing requests, requiring access to the event loop, metrics, and secret streams at construction time.Modifications:
FactoryContextinterface providing runtime infrastructure to extension factories (eventLoop(),meterRegistry(),meterIdPrefix(),validator(),genericSecretStream()).HttpFilterFactory.create()to acceptFactoryContextinstead ofXdsResourceValidator.HttpFilterFactory.createStream()that returns aSnapshotStream<XdsHttpFilter>for reactive filter lifecycle management.XdsHttpFilter.NOOPstatic constant for no-op filter instances.GenericSecretSnapshotandGenericSecretStreamfor resolving generic secrets via SDS.FilterUtilnow returnsSnapshotStream<ClientPreprocessors>/SnapshotStream<ClientDecoration>, andRouteStreamcomposes cluster, downstream, and upstream filter streams viacombineLatest.RouteEntryto accept pre-built filter configs instead of building them internally.ListenerResourceParser/ListenerXdsResourceto parse and carry theRouterHTTP filter for upstream filter extraction.SubscriptionContextextendFactoryContextfor internal use.CredentialInjectorFilterFactoryimplementingenvoy.filters.http.credential_injector, registered via service loader.TypedExtensionConfig,CredentialInjector,Generic, andSecret.generic_secretproto fields as supported.RouterFilterFactoryandIstioFilterFactoriesto useFactoryContext.Result:
CredentialInjectorFilterFactoryinjects credentials from SDS-backed generic secrets into outgoing HTTP requests, supporting theenvoy.filters.http.credential_injectorfilter type.