diff --git a/api/src/main/java/io/grpc/ChannelConfigurator.java b/api/src/main/java/io/grpc/ChannelConfigurator.java new file mode 100644 index 00000000000..8c679a10ff7 --- /dev/null +++ b/api/src/main/java/io/grpc/ChannelConfigurator.java @@ -0,0 +1,63 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + + + +/** + * A configurator for child channels created by gRPC's internal infrastructure. + * + *

This interface allows users to inject configuration (such as credentials, interceptors, + * or flow control settings) into channels created automatically by gRPC for control plane + * operations. Common use cases include: + *

+ * + *

Usage Example: + *

{@code
+ * // 1. Define the configurator
+ * ChannelConfigurator configurator = builder -> {
+ *   builder.maxInboundMessageSize(4 * 1024 * 1024);
+ * };
+ *
+ * // 2. Apply to parent channel - automatically used for ALL child channels
+ * ManagedChannel channel = ManagedChannelBuilder
+ *     .forTarget("xds:///my-service")
+ *     .childChannelConfigurator(configurator)
+ *     .build();
+ * }
+ * + *

Implementations must be thread-safe as the configure methods may be invoked concurrently + * by multiple internal components. + * + * @since 1.83.0 + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/12574") +public interface ChannelConfigurator { + + /** + * Configures a builder for a new child channel. + * + *

This method is invoked synchronously during the creation of the child channel, + * before {@link ManagedChannelBuilder#build()} is called. + * + * @param builder the mutable channel builder for the new child channel + */ + void configureChannelBuilder(ManagedChannelBuilder builder); +} diff --git a/api/src/main/java/io/grpc/ForwardingChannelBuilder.java b/api/src/main/java/io/grpc/ForwardingChannelBuilder.java index 1202582421a..d340ff8ef88 100644 --- a/api/src/main/java/io/grpc/ForwardingChannelBuilder.java +++ b/api/src/main/java/io/grpc/ForwardingChannelBuilder.java @@ -242,6 +242,13 @@ public T disableServiceConfigLookUp() { return thisT(); } + + @Override + public T childChannelConfigurator(ChannelConfigurator channelConfigurator) { + delegate().childChannelConfigurator(channelConfigurator); + return thisT(); + } + /** * Returns the correctly typed version of the builder. */ diff --git a/api/src/main/java/io/grpc/ForwardingChannelBuilder2.java b/api/src/main/java/io/grpc/ForwardingChannelBuilder2.java index 78fe730d91a..4e67748da51 100644 --- a/api/src/main/java/io/grpc/ForwardingChannelBuilder2.java +++ b/api/src/main/java/io/grpc/ForwardingChannelBuilder2.java @@ -269,6 +269,13 @@ public T setNameResolverArg(NameResolver.Args.Key key, X value) { return thisT(); } + + @Override + public T childChannelConfigurator(ChannelConfigurator channelConfigurator) { + delegate().childChannelConfigurator(channelConfigurator); + return thisT(); + } + /** * Returns the {@link ManagedChannel} built by the delegate by default. Overriding method can * return different value. diff --git a/api/src/main/java/io/grpc/ManagedChannelBuilder.java b/api/src/main/java/io/grpc/ManagedChannelBuilder.java index 3f370ab3003..7b943b6894a 100644 --- a/api/src/main/java/io/grpc/ManagedChannelBuilder.java +++ b/api/src/main/java/io/grpc/ManagedChannelBuilder.java @@ -661,6 +661,23 @@ public T setNameResolverArg(NameResolver.Args.Key key, X value) { throw new UnsupportedOperationException(); } + + /** + * Sets a configurator that will be applied to all internal child channels created by this + * channel. + * + *

This allows injecting configuration (like credentials, interceptors, or flow control) + * into auxiliary channels created by gRPC infrastructure, such as xDS control plane connections. + * + * @param channelConfigurator the configurator to apply. + * @return this + * @since 1.83.0 + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/12574") + public T childChannelConfigurator(ChannelConfigurator channelConfigurator) { + throw new UnsupportedOperationException("Not implemented"); + } + /** * Builds a channel using the given parameters. * diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java index 80bc338d86b..06b8446eccd 100644 --- a/api/src/main/java/io/grpc/NameResolver.java +++ b/api/src/main/java/io/grpc/NameResolver.java @@ -358,6 +358,7 @@ public static final class Args { private final MetricRecorder metricRecorder; @Nullable private final NameResolverRegistry nameResolverRegistry; @Nullable private final IdentityHashMap, Object> customArgs; + private final ChannelConfigurator channelConfigurator; private Args(Builder builder) { this.defaultPort = checkNotNull(builder.defaultPort, "defaultPort not set"); @@ -373,6 +374,7 @@ private Args(Builder builder) { : new MetricRecorder() {}; this.nameResolverRegistry = builder.nameResolverRegistry; this.customArgs = cloneCustomArgs(builder.customArgs); + this.channelConfigurator = builder.channelConfigurator; } /** @@ -471,6 +473,16 @@ public ChannelLogger getChannelLogger() { return channelLogger; } + /** + * Returns the configurator for child channels. + * + * @since 1.83.0 + */ + @Internal + public ChannelConfigurator getChildChannelConfigurator() { + return channelConfigurator; + } + /** * Returns the Executor on which this resolver should execute long-running or I/O bound work. * Null if no Executor was set. @@ -579,6 +591,7 @@ public static final class Builder { private MetricRecorder metricRecorder; private NameResolverRegistry nameResolverRegistry; private IdentityHashMap, Object> customArgs; + private ChannelConfigurator channelConfigurator = builder -> { }; Builder() { } @@ -694,6 +707,16 @@ public Builder setNameResolverRegistry(NameResolverRegistry registry) { return this; } + /** + * See {@link Args#getChildChannelConfigurator()}. This is an optional field. + * + * @since 1.83.0 + */ + public Builder setChildChannelConfigurator(ChannelConfigurator channelConfigurator) { + this.channelConfigurator = checkNotNull(channelConfigurator, "channelConfigurator"); + return this; + } + /** * Builds an {@link Args}. * diff --git a/api/src/test/java/io/grpc/NameResolverTest.java b/api/src/test/java/io/grpc/NameResolverTest.java index 82abe5c7505..347271a69f6 100644 --- a/api/src/test/java/io/grpc/NameResolverTest.java +++ b/api/src/test/java/io/grpc/NameResolverTest.java @@ -105,6 +105,7 @@ public void args() { } private NameResolver.Args createArgs() { + ChannelConfigurator channelConfigurator = builder -> { }; return NameResolver.Args.newBuilder() .setDefaultPort(defaultPort) .setProxyDetector(proxyDetector) @@ -116,9 +117,46 @@ private NameResolver.Args createArgs() { .setOverrideAuthority(overrideAuthority) .setMetricRecorder(metricRecorder) .setArg(FOO_ARG_KEY, customArgValue) + .setChildChannelConfigurator(channelConfigurator) .build(); } + @Test + public void args_childChannelConfigurator() { + final ManagedChannelBuilder[] capturedBuilder = new ManagedChannelBuilder[1]; + ChannelConfigurator channelConfigurator = new ChannelConfigurator() { + @Override + public void configureChannelBuilder(ManagedChannelBuilder builder) { + capturedBuilder[0] = builder; + } + }; + + SynchronizationContext realSyncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + + NameResolver.Args args = NameResolver.Args.newBuilder() + .setDefaultPort(8080) + .setProxyDetector(mock(ProxyDetector.class)) + .setSynchronizationContext(realSyncContext) + .setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class)) + .setChannelLogger(mock(ChannelLogger.class)) + .setChildChannelConfigurator(channelConfigurator) + .build(); + + ChannelConfigurator configurator = args.getChildChannelConfigurator(); + assertThat(configurator).isSameInstanceAs(channelConfigurator); + + // Validate configurator accepts builders + ManagedChannelBuilder mockBuilder = mock(ManagedChannelBuilder.class); + configurator.configureChannelBuilder(mockBuilder); + assertThat(capturedBuilder[0]).isSameInstanceAs(mockBuilder); + } + @Test @SuppressWarnings("deprecation") public void startOnOldListener_wrapperListener2UsedToStart() { diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index e423220e3ad..71afcb7e0c1 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -37,6 +37,7 @@ import io.grpc.CallCredentials; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ChannelConfigurator; import io.grpc.ChannelCredentials; import io.grpc.ChannelLogger; import io.grpc.ChannelLogger.ChannelLogLevel; @@ -155,6 +156,14 @@ public Result selectConfig(PickSubchannelArgs args) { private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER = new LoadBalancer.PickDetailsConsumer() {}; + /** + * Retrieves the user-provided configuration function for internal child channels. + * + *

This is intended for use by gRPC internal components + * that are responsible for creating auxiliary {@code ManagedChannel} instances. + */ + private final ChannelConfigurator channelConfigurator; + private final InternalLogId logId; private final String target; @Nullable @@ -545,6 +554,8 @@ ClientStream newSubstream( Supplier stopwatchSupplier, List interceptors, final TimeProvider timeProvider) { + this.channelConfigurator = checkNotNull(builder.channelConfigurator, + "channelConfigurator"); this.target = checkNotNull(builder.target, "target"); this.logId = InternalLogId.allocate("Channel", target); this.timeProvider = checkNotNull(timeProvider, "timeProvider"); @@ -589,7 +600,8 @@ ClientStream newSubstream( .setOffloadExecutor(this.offloadExecutorHolder) .setOverrideAuthority(this.authorityOverride) .setMetricRecorder(this.metricRecorder) - .setNameResolverRegistry(builder.nameResolverRegistry); + .setNameResolverRegistry(builder.nameResolverRegistry) + .setChildChannelConfigurator(this.channelConfigurator); builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder); this.nameResolverArgs = nameResolverArgsBuilder.build(); this.nameResolver = getNameResolver( @@ -1486,6 +1498,10 @@ protected ManagedChannelBuilder delegate() { ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder(); + // Note that we follow the global configurator pattern and try to fuse the configurations as + // soon as the builder gets created + channelConfigurator.configureChannelBuilder(builder); + return builder // TODO(zdapeng): executors should not outlive the parent channel. .executor(executor) diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java index 128c929ec0e..f0ec3c2ec09 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java @@ -29,6 +29,7 @@ import io.grpc.CallCredentials; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ChannelConfigurator; import io.grpc.ChannelCredentials; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; @@ -149,6 +150,8 @@ public static ManagedChannelBuilder forTarget(String target) { } + ChannelConfigurator channelConfigurator = builder -> { }; + ObjectPool executorPool = DEFAULT_EXECUTOR_POOL; ObjectPool offloadExecutorPool = DEFAULT_EXECUTOR_POOL; @@ -717,6 +720,14 @@ protected ManagedChannelImplBuilder addMetricSink(MetricSink metricSink) { return this; } + @Override + public ManagedChannelImplBuilder childChannelConfigurator( + ChannelConfigurator channelConfigurator) { + this.channelConfigurator = checkNotNull(channelConfigurator, + "childChannelConfigurator"); + return this; + } + @Override public ManagedChannel build() { ClientTransportFactory clientTransportFactory = diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplBuilderTest.java index b0939239477..b475e4c2cac 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplBuilderTest.java @@ -34,6 +34,7 @@ import com.google.common.util.concurrent.MoreExecutors; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ChannelConfigurator; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.CompressorRegistry; @@ -42,6 +43,7 @@ import io.grpc.InternalConfigurator; import io.grpc.InternalConfiguratorRegistry; import io.grpc.InternalFeatureFlags; +import io.grpc.InternalManagedChannelBuilder; import io.grpc.InternalManagedChannelBuilder.InternalInterceptorFactory; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -49,7 +51,9 @@ import io.grpc.MetricSink; import io.grpc.NameResolver; import io.grpc.NameResolverRegistry; +import io.grpc.NoopMetricSink; import io.grpc.StaticTestingClassLoader; +import io.grpc.Uri; import io.grpc.internal.ManagedChannelImplBuilder.ChannelBuilderDefaultPortProvider; import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder; import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider; @@ -780,6 +784,113 @@ public void setNameResolverExtArgs() { assertThat(builder.nameResolverCustomArgs.get(testKey)).isEqualTo(42); } + @Test + public void childChannelConfigurator_setsField() { + ChannelConfigurator configurator = builder -> { }; + assertSame(builder, builder.childChannelConfigurator(configurator)); + assertSame(configurator, builder.channelConfigurator); + } + + @Test + public void childChannelConfigurator_propagatesMetricsAndInterceptors_xdsTarget() { + // Setup Mocks + when(mockClientTransportFactory.getScheduledExecutorService()) + .thenReturn(clock.getScheduledExecutorService()); + when(mockClientTransportFactoryBuilder.buildClientTransportFactory()) + .thenReturn(mockClientTransportFactory); + when(mockClientTransportFactory.getSupportedSocketAddressTypes()) + .thenReturn(Collections.singleton(InetSocketAddress.class)); + + MetricSink mockMetricSink = new NoopMetricSink(); + ClientInterceptor mockInterceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return next.newCall(method, callOptions); + } + }; + + // Define the Configurator + ChannelConfigurator configurator = builder -> { + InternalManagedChannelBuilder.addMetricSink(builder, mockMetricSink); + + InternalManagedChannelBuilder.interceptWithTarget(builder, target -> mockInterceptor); + }; + + // Use NameResolver.Factory to capture Args + final NameResolver.Args[] capturedArgs = new NameResolver.Args[1]; + final boolean[] newNameResolverCalled = new boolean[1]; + + NameResolver realNameResolver = new NameResolver() { + @Override + public String getServiceAuthority() { + return "foo.authority"; + } + + @Override + public void start(Listener2 listener) {} + + @Override + public void shutdown() {} + }; + + NameResolver.Factory realNameResolverFactory = new NameResolver.Factory() { + @Override + public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { + newNameResolverCalled[0] = true; + capturedArgs[0] = args; + return realNameResolver; + } + + @Override + public NameResolver newNameResolver(Uri targetUri, NameResolver.Args args) { + newNameResolverCalled[0] = true; + capturedArgs[0] = args; + return realNameResolver; + } + + @Override + public String getDefaultScheme() { + return "xds"; + } + }; + + // Use the configurator and the custom factory + NameResolverRegistry registry = new NameResolverRegistry(); + registry.register(new NameResolverFactoryToProviderFacade(realNameResolverFactory)); + + ManagedChannelBuilder parentBuilder = new ManagedChannelImplBuilder( + "xds:///my-service-target", + mockClientTransportFactoryBuilder, + new FixedPortProvider(DUMMY_PORT)) + .childChannelConfigurator(configurator) + .nameResolverRegistry(registry); + + ManagedChannel channel = parentBuilder.build(); + grpcCleanupRule.register(channel); + + // Verify that newNameResolver was called + assertThat(newNameResolverCalled[0]).isTrue(); + + // Extract the childChannelConfigurator from Args + NameResolver.Args args = capturedArgs[0]; + ChannelConfigurator channelConfiguratorInArgs = args.getChildChannelConfigurator(); + assertNotNull("Child channel configurator should be present in NameResolver.Args", + channelConfiguratorInArgs); + + // Verify the configurator is the one we passed + assertThat(channelConfiguratorInArgs).isSameInstanceAs(configurator); + + // Verify the configurator logically applies (by running it on a real builder) + ManagedChannelImplBuilder childBuilder = new ManagedChannelImplBuilder( + "xds:///child-service-target", + mockClientTransportFactoryBuilder, + new FixedPortProvider(DUMMY_PORT)); + + configurator.configureChannelBuilder(childBuilder); + assertThat(childBuilder.metricSinks).contains(mockMetricSink); + } + @Test public void metricSinks() { MetricSink mocksink = mock(MetricSink.class); diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java index f0bd6f93098..77eadf9ebbb 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java @@ -25,7 +25,9 @@ import com.google.common.collect.ImmutableList; import io.grpc.ClientInterceptor; +import io.grpc.ForwardingChannelBuilder2; import io.grpc.ManagedChannelBuilder; +import io.grpc.MetricSink; import io.grpc.ServerBuilder; import io.grpc.internal.GrpcUtil; import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; @@ -168,6 +170,36 @@ public void disableAllMetrics() { assertThat(module.getEnableMetrics()).isEmpty(); } - // TODO(dnvindhya): Add tests for configurator + @Test + public void configureChannelBuilder_registersMetricSink() { + GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder().build(); + TestChannelBuilder testBuilder = new TestChannelBuilder(); + grpcOpenTelemetry.configureChannelBuilder(testBuilder); + assertThat(testBuilder.metricSink).isSameInstanceAs(grpcOpenTelemetry.getSink()); + assertThat(testBuilder.interceptorFactory).isNotNull(); + } + private static class TestChannelBuilder extends ForwardingChannelBuilder2 { + Object interceptorFactory; + MetricSink metricSink; + + @Override + protected ManagedChannelBuilder delegate() { + return null; + } + + @Override + protected TestChannelBuilder interceptWithTarget(InterceptorFactory factory) { + this.interceptorFactory = factory; + return this; + } + + @Override + public TestChannelBuilder addMetricSink(MetricSink metricSink) { + this.metricSink = metricSink; + return this; + } + } + + // TODO(dnvindhya): Add tests for configurator } diff --git a/xds/build.gradle b/xds/build.gradle index 8036f8691ec..c6325f7fc2d 100644 --- a/xds/build.gradle +++ b/xds/build.gradle @@ -66,7 +66,10 @@ dependencies { testImplementation project(':grpc-api') testImplementation project(':grpc-rls') + testImplementation project(':grpc-opentelemetry') testImplementation project(':grpc-inprocess') + testImplementation libraries.opentelemetry.api + testImplementation libraries.opentelemetry.sdk.testing testImplementation libraries.cel.compiler testImplementation testFixtures(project(':grpc-core')), testFixtures(project(':grpc-api')), diff --git a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java index 5100537aea2..f7a93681877 100644 --- a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java +++ b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java @@ -21,11 +21,13 @@ import com.google.common.annotations.VisibleForTesting; import io.grpc.CallCredentials; import io.grpc.CallOptions; +import io.grpc.ChannelConfigurator; import io.grpc.ChannelCredentials; import io.grpc.ClientCall; import io.grpc.Context; import io.grpc.Grpc; import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; @@ -52,6 +54,8 @@ final class GrpcXdsTransportFactory implements XdsTransportFactory { private final CallCredentials callCredentials; + private final ChannelConfigurator channelConfigurator; + // The map of xDS server info to its corresponding gRPC xDS transport. // This enables reusing and sharing the same underlying gRPC channel. // @@ -61,8 +65,10 @@ final class GrpcXdsTransportFactory implements XdsTransportFactory { private static final Map xdsServerInfoToTransportMap = new ConcurrentHashMap<>(); - GrpcXdsTransportFactory(CallCredentials callCredentials) { + GrpcXdsTransportFactory(CallCredentials callCredentials, + ChannelConfigurator channelConfigurator) { this.callCredentials = callCredentials; + this.channelConfigurator = channelConfigurator; } @Override @@ -71,7 +77,7 @@ public XdsTransport create(Bootstrapper.ServerInfo serverInfo) { serverInfo, (info, transport) -> { if (transport == null) { - transport = new GrpcXdsTransport(serverInfo, callCredentials); + transport = new GrpcXdsTransport(serverInfo, callCredentials, channelConfigurator); } ++transport.refCount; return transport; @@ -93,7 +99,7 @@ static class GrpcXdsTransport implements XdsTransport { private int refCount = 0; public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) { - this(serverInfo, null); + this(serverInfo, null, null); } @VisibleForTesting @@ -102,11 +108,20 @@ public GrpcXdsTransport(ManagedChannel channel) { } public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, CallCredentials callCredentials) { + this(serverInfo, callCredentials, null); + } + + public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, + CallCredentials callCredentials, + ChannelConfigurator channelConfigurator) { String target = serverInfo.target(); ChannelCredentials channelCredentials = (ChannelCredentials) serverInfo.implSpecificConfig(); - this.channel = Grpc.newChannelBuilder(target, channelCredentials) - .keepAliveTime(5, TimeUnit.MINUTES) - .build(); + ManagedChannelBuilder channelBuilder = Grpc.newChannelBuilder(target, channelCredentials) + .keepAliveTime(5, TimeUnit.MINUTES); + if (channelConfigurator != null) { + channelConfigurator.configureChannelBuilder(channelBuilder); + } + this.channel = channelBuilder.build(); this.callCredentials = callCredentials; this.serverInfo = serverInfo; } diff --git a/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java index cc5ff128274..06ce7eb6f53 100644 --- a/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java @@ -86,7 +86,8 @@ public static XdsClientResult getOrCreate( String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder, CallCredentials transportCallCredentials) { return new XdsClientResult(SharedXdsClientPoolProvider.getDefaultProvider() - .getOrCreate(target, bootstrapInfo, metricRecorder, transportCallCredentials)); + .getOrCreate(target, bootstrapInfo, metricRecorder, transportCallCredentials, + null)); } /** diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index 45c379244af..24500d6a7f6 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.concurrent.GuardedBy; import io.grpc.CallCredentials; +import io.grpc.ChannelConfigurator; import io.grpc.MetricRecorder; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; @@ -57,6 +58,10 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { @Nullable private final Bootstrapper bootstrapper; private final Object lock = new Object(); + /* + The first one wins. + Anything with the same target string uses the client created for the first one. + */ private final Map> targetToXdsClientMap = new ConcurrentHashMap<>(); SharedXdsClientPoolProvider() { @@ -88,20 +93,28 @@ public ObjectPool getOrCreate( } else { bootstrapInfo = GrpcBootstrapperImpl.defaultBootstrap(); } - return getOrCreate(target, bootstrapInfo, metricRecorder, transportCallCredentials); + return getOrCreate(target, bootstrapInfo, metricRecorder, transportCallCredentials, null); } @Override public ObjectPool getOrCreate( String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder) { - return getOrCreate(target, bootstrapInfo, metricRecorder, null); + return getOrCreate(target, bootstrapInfo, metricRecorder, null, null); + } + + @Override + public ObjectPool getOrCreate( + String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder, + ChannelConfigurator channelConfigurator) { + return getOrCreate(target, bootstrapInfo, metricRecorder, null, channelConfigurator); } public ObjectPool getOrCreate( String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder, - CallCredentials transportCallCredentials) { + CallCredentials transportCallCredentials, + ChannelConfigurator channelConfigurator) { ObjectPool ref = targetToXdsClientMap.get(target); if (ref == null) { synchronized (lock) { @@ -109,7 +122,8 @@ public ObjectPool getOrCreate( if (ref == null) { ref = new RefCountedXdsClientObjectPool( - bootstrapInfo, target, metricRecorder, transportCallCredentials); + bootstrapInfo, target, metricRecorder, transportCallCredentials, + channelConfigurator); targetToXdsClientMap.put(target, ref); } } @@ -134,6 +148,7 @@ class RefCountedXdsClientObjectPool implements ObjectPool { private final String target; // The target associated with the xDS client. private final MetricRecorder metricRecorder; private final CallCredentials transportCallCredentials; + private final ChannelConfigurator channelConfigurator; private final Object lock = new Object(); @GuardedBy("lock") private ScheduledExecutorService scheduler; @@ -147,7 +162,7 @@ class RefCountedXdsClientObjectPool implements ObjectPool { @VisibleForTesting RefCountedXdsClientObjectPool( BootstrapInfo bootstrapInfo, String target, MetricRecorder metricRecorder) { - this(bootstrapInfo, target, metricRecorder, null); + this(bootstrapInfo, target, metricRecorder, null, null); } @VisibleForTesting @@ -155,11 +170,13 @@ class RefCountedXdsClientObjectPool implements ObjectPool { BootstrapInfo bootstrapInfo, String target, MetricRecorder metricRecorder, - CallCredentials transportCallCredentials) { + CallCredentials transportCallCredentials, + ChannelConfigurator channelConfigurator) { this.bootstrapInfo = checkNotNull(bootstrapInfo, "bootstrapInfo"); this.target = target; this.metricRecorder = checkNotNull(metricRecorder, "metricRecorder"); this.transportCallCredentials = transportCallCredentials; + this.channelConfigurator = channelConfigurator; } @Override @@ -172,7 +189,7 @@ public XdsClient getObject() { scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); metricReporter = new XdsClientMetricReporterImpl(metricRecorder, target); GrpcXdsTransportFactory xdsTransportFactory = - new GrpcXdsTransportFactory(transportCallCredentials); + new GrpcXdsTransportFactory(transportCallCredentials, channelConfigurator); xdsClient = new XdsClientImpl( xdsTransportFactory, diff --git a/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java b/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java index 6df8d566a7a..cdd198474bb 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java @@ -16,6 +16,7 @@ package io.grpc.xds; +import io.grpc.ChannelConfigurator; import io.grpc.MetricRecorder; import io.grpc.internal.ObjectPool; import io.grpc.xds.client.Bootstrapper.BootstrapInfo; @@ -30,5 +31,9 @@ interface XdsClientPoolFactory { ObjectPool getOrCreate( String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder); + ObjectPool getOrCreate( + String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder, + ChannelConfigurator channelConfigurator); + List getTargets(); } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 69b0b824433..f4accf3869d 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -31,6 +31,7 @@ import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ChannelConfigurator; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; @@ -186,7 +187,8 @@ final class XdsNameResolver extends NameResolver { } else { checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); this.xdsClientPool = new BootstrappingXdsClientPool( - xdsClientPoolFactory, target, bootstrapOverride, metricRecorder); + xdsClientPoolFactory, target, bootstrapOverride, metricRecorder, + nameResolverArgs.getChildChannelConfigurator()); } this.random = checkNotNull(random, "random"); this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry"); @@ -1060,16 +1062,19 @@ private static final class BootstrappingXdsClientPool implements XdsClientPool { private final @Nullable Map bootstrapOverride; private final MetricRecorder metricRecorder; private ObjectPool xdsClientPool; + private final ChannelConfigurator channelConfigurator; BootstrappingXdsClientPool( XdsClientPoolFactory xdsClientPoolFactory, String target, @Nullable Map bootstrapOverride, - MetricRecorder metricRecorder) { + MetricRecorder metricRecorder, + ChannelConfigurator channelConfigurator) { this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); this.target = checkNotNull(target, "target"); this.bootstrapOverride = bootstrapOverride; - this.metricRecorder = checkNotNull(metricRecorder, "metricRecorder"); + this.metricRecorder = metricRecorder; + this.channelConfigurator = checkNotNull(channelConfigurator, "channelConfigurator"); } @Override @@ -1082,7 +1087,8 @@ public XdsClient getObject() throws XdsInitializationException { bootstrapInfo = new GrpcBootstrapperImpl().bootstrap(bootstrapOverride); } this.xdsClientPool = - xdsClientPoolFactory.getOrCreate(target, bootstrapInfo, metricRecorder); + xdsClientPoolFactory.getOrCreate( + target, bootstrapInfo, metricRecorder, channelConfigurator); } return xdsClientPool.getObject(); } diff --git a/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java b/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java index 4a4fb71aa84..1c0eb3cd024 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.DoNotCall; import io.grpc.Attributes; +import io.grpc.ChannelConfigurator; import io.grpc.ExperimentalApi; import io.grpc.ForwardingServerBuilder; import io.grpc.Internal; @@ -58,6 +59,8 @@ public final class XdsServerBuilder extends ForwardingServerBuilder bootstrapOverride; private long drainGraceTime = 10; private TimeUnit drainGraceTimeUnit = TimeUnit.MINUTES; + private ChannelConfigurator channelConfigurator = builder -> { }; + private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) { this.delegate = nettyDelegate; @@ -100,6 +103,20 @@ public XdsServerBuilder drainGraceTime(long drainGraceTime, TimeUnit drainGraceT return this; } + /** + * Sets the configurator that will be stored in the server built by this builder. + * + *

This configurator will subsequently be used to configure any child channels + * created by that server. + * + * @param channelConfigurator the configurator to store in the channel. + * @return this + */ + public XdsServerBuilder childChannelConfigurator(ChannelConfigurator channelConfigurator) { + this.channelConfigurator = checkNotNull(channelConfigurator, "channelConfigurator"); + return this; + } + @DoNotCall("Unsupported. Use forPort(int, ServerCredentials) instead") public static ServerBuilder forPort(int port) { throw new UnsupportedOperationException( @@ -128,7 +145,8 @@ public Server build() { } InternalNettyServerBuilder.eagAttributes(delegate, builder.build()); return new XdsServerWrapper("0.0.0.0:" + port, delegate, xdsServingStatusListener, - filterChainSelectorManager, xdsClientPoolFactory, bootstrapOverride, filterRegistry); + filterChainSelectorManager, xdsClientPoolFactory, bootstrapOverride, filterRegistry, + this.channelConfigurator); } @VisibleForTesting diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index 5529f96c7a2..daf226236f5 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol; import io.grpc.Attributes; +import io.grpc.ChannelConfigurator; import io.grpc.InternalServerInterceptors; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -128,6 +129,8 @@ public void uncaughtException(Thread t, Throwable e) { // NamedFilterConfig.filterStateKey -> filter_instance. private final HashMap activeFiltersDefaultChain = new HashMap<>(); + private final ChannelConfigurator channelConfigurator; + XdsServerWrapper( String listenerAddress, ServerBuilder delegateBuilder, @@ -135,7 +138,8 @@ public void uncaughtException(Thread t, Throwable e) { FilterChainSelectorManager filterChainSelectorManager, XdsClientPoolFactory xdsClientPoolFactory, @Nullable Map bootstrapOverride, - FilterRegistry filterRegistry) { + FilterRegistry filterRegistry, + ChannelConfigurator channelConfigurator) { this( listenerAddress, delegateBuilder, @@ -144,10 +148,30 @@ public void uncaughtException(Thread t, Throwable e) { xdsClientPoolFactory, bootstrapOverride, filterRegistry, - SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE)); + SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE), + channelConfigurator); sharedTimeService = true; } + XdsServerWrapper( + String listenerAddress, + ServerBuilder delegateBuilder, + XdsServingStatusListener listener, + FilterChainSelectorManager filterChainSelectorManager, + XdsClientPoolFactory xdsClientPoolFactory, + @Nullable Map bootstrapOverride, + FilterRegistry filterRegistry) { + this( + listenerAddress, + delegateBuilder, + listener, + filterChainSelectorManager, + xdsClientPoolFactory, + bootstrapOverride, + filterRegistry, + builder -> { }); + } + @VisibleForTesting XdsServerWrapper( String listenerAddress, @@ -158,6 +182,29 @@ public void uncaughtException(Thread t, Throwable e) { @Nullable Map bootstrapOverride, FilterRegistry filterRegistry, ScheduledExecutorService timeService) { + this( + listenerAddress, + delegateBuilder, + listener, + filterChainSelectorManager, + xdsClientPoolFactory, + bootstrapOverride, + filterRegistry, + timeService, + builder -> { }); + } + + @VisibleForTesting + XdsServerWrapper( + String listenerAddress, + ServerBuilder delegateBuilder, + XdsServingStatusListener listener, + FilterChainSelectorManager filterChainSelectorManager, + XdsClientPoolFactory xdsClientPoolFactory, + @Nullable Map bootstrapOverride, + FilterRegistry filterRegistry, + ScheduledExecutorService timeService, + ChannelConfigurator channelConfigurator) { this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress"); this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder"); this.delegateBuilder.intercept(new ConfigApplyingInterceptor()); @@ -169,6 +216,7 @@ public void uncaughtException(Thread t, Throwable e) { this.timeService = checkNotNull(timeService, "timeService"); this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry"); this.delegate = delegateBuilder.build(); + this.channelConfigurator = checkNotNull(channelConfigurator, "channelConfigurator"); } @Override @@ -202,7 +250,8 @@ private void internalStart() { bootstrapInfo = new GrpcBootstrapperImpl().bootstrap(bootstrapOverride); } xdsClientPool = xdsClientPoolFactory.getOrCreate( - "#server", bootstrapInfo, new MetricRecorder() {}); + "#server", bootstrapInfo, new MetricRecorder() {}, + channelConfigurator); } catch (Exception e) { StatusException statusException = Status.UNAVAILABLE.withDescription( "Failed to initialize xDS").withCause(e).asException(); diff --git a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java index e8bd7461736..98853804421 100644 --- a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java +++ b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java @@ -37,6 +37,7 @@ import io.envoyproxy.envoy.service.status.v3.ClientStatusRequest; import io.envoyproxy.envoy.service.status.v3.ClientStatusResponse; import io.envoyproxy.envoy.type.matcher.v3.NodeMatcher; +import io.grpc.ChannelConfigurator; import io.grpc.Deadline; import io.grpc.InsecureChannelCredentials; import io.grpc.MetricRecorder; @@ -517,5 +518,12 @@ public ObjectPool getOrCreate( String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder) { throw new UnsupportedOperationException("Should not be called"); } + + @Override + public ObjectPool getOrCreate( + String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder, + ChannelConfigurator channelConfigurator) { + throw new UnsupportedOperationException("Should not be called"); + } } } diff --git a/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java b/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java index a273c6f3ebf..7ef50fb4a5b 100644 --- a/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java +++ b/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java @@ -47,22 +47,33 @@ import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ChannelConfigurator; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientStreamTracer; import io.grpc.FlagResetRule; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.InsecureServerCredentials; import io.grpc.InternalFeatureFlags; +import io.grpc.InternalManagedChannelBuilder; import io.grpc.LoadBalancerRegistry; +import io.grpc.LongCounterMetricInstrument; import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.NoopMetricSink; +import io.grpc.Server; import io.grpc.testing.protobuf.SimpleRequest; import io.grpc.testing.protobuf.SimpleResponse; import io.grpc.testing.protobuf.SimpleServiceGrpc; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -360,4 +371,82 @@ public void pingPong_logicalDns_authorityOverride() { System.clearProperty("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE"); } } + + @Test + public void childChannelConfigurator_passesMetricSinkToChannel_E2E() throws Exception { + CountingMetricSink sink = new CountingMetricSink(); + ChannelConfigurator configurator = new ChannelConfigurator() { + @Override + public void configureChannelBuilder(ManagedChannelBuilder builder) { + InternalManagedChannelBuilder.addMetricSink(builder, sink); + } + }; + + ManagedChannel channel = Grpc.newChannelBuilder("test-xds:///test-server", + InsecureChannelCredentials.create()) + .childChannelConfigurator(configurator) + .build(); + + try { + SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub( + channel); + blockingStub.unaryRpc(SimpleRequest.getDefaultInstance()); + + // The xDS client inside the channel configurator will have created an ADS stream. + // The metric sink should have received attempt or connection metrics. + sink.awaitCall(); + } finally { + channel.shutdownNow(); + } + } + + @Test + public void childChannelConfigurator_passesMetricSinkToServer_E2E() throws Exception { + CountingMetricSink sink = new CountingMetricSink(); + ChannelConfigurator configurator = builder -> { + // Child channels (xDS client connections) created by this server get the sink. + InternalManagedChannelBuilder.addMetricSink(builder, sink); + }; + + // We start an XdsServer manually. + // XdsServer needs RDS, LDS, etc. from control plane. + XdsServerBuilder serverBuilder = XdsServerBuilder.forPort( + 0, InsecureServerCredentials.create()) + .addService(new SimpleServiceGrpc.SimpleServiceImplBase() {}) + .overrideBootstrapForTest(controlPlane.defaultBootstrapOverride()) + .childChannelConfigurator(configurator); + + Server childServer = serverBuilder.build().start(); + + try { + // The server xDS client will connect to control plane to get LDS. + sink.awaitCall(); + } finally { + childServer.shutdownNow(); + } + } + + private static final class CountingMetricSink extends NoopMetricSink { + private final AtomicInteger count = + new AtomicInteger(); + + @Override + public void addLongCounter( + LongCounterMetricInstrument metricInstrument, + long value, + List requiredLabelValues, + List optionalLabelValues) { + count.incrementAndGet(); + } + + public void awaitCall() throws InterruptedException { + long start = System.currentTimeMillis(); + while (count.get() == 0) { + if (System.currentTimeMillis() - start > 5000) { + throw new AssertionError("Timed out waiting for metric sink call"); + } + Thread.sleep(50); + } + } + } } diff --git a/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsOtelIntegrationTest.java b/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsOtelIntegrationTest.java new file mode 100644 index 00000000000..011e55fe5cf --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsOtelIntegrationTest.java @@ -0,0 +1,119 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; + +import io.grpc.ChannelConfigurator; +import io.grpc.FlagResetRule; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.InternalFeatureFlags; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.opentelemetry.GrpcOpenTelemetry; +import io.grpc.testing.protobuf.SimpleRequest; +import io.grpc.testing.protobuf.SimpleServiceGrpc; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Arrays; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * xDS + OpenTelemetry E2E integration test using a fake control plane. + */ +@RunWith(Parameterized.class) +public class FakeControlPlaneXdsOtelIntegrationTest { + + @Rule(order = 0) + public ControlPlaneRule controlPlane = new ControlPlaneRule(); + @Rule(order = 1) + public DataPlaneRule dataPlane = new DataPlaneRule(controlPlane); + @Rule(order = 2) + public final FlagResetRule flagResetRule = new FlagResetRule(); + + @Parameters(name = "enableRfc3986UrisParam={0}") + public static Iterable data() { + return Arrays.asList(new Object[][] {{true}, {false}}); + } + + @Parameter public boolean enableRfc3986UrisParam; + + @Before + public void setupRfc3986UrisFeatureFlag() throws Exception { + flagResetRule.setFlagForTest( + InternalFeatureFlags::setRfc3986UrisEnabled, enableRfc3986UrisParam); + } + + @Test + public void childChannelConfigurator_passesOtelSdkToChannel_E2E() throws Exception { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .build(); + GrpcOpenTelemetry grpcOtel = GrpcOpenTelemetry.newBuilder() + .sdk(openTelemetry) + .build(); + + ChannelConfigurator configurator = new ChannelConfigurator() { + @Override + public void configureChannelBuilder(ManagedChannelBuilder builder) { + grpcOtel.configureChannelBuilder(builder); + } + }; + + ManagedChannel channel = Grpc.newChannelBuilder("test-xds:///test-server", + InsecureChannelCredentials.create()) + .childChannelConfigurator(configurator) + .build(); + + try { + SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub( + channel); + blockingStub.unaryRpc(SimpleRequest.getDefaultInstance()); + + boolean hasMetrics = false; + for (int i = 0; i < 20; i++) { + for (MetricData metric : metricReader.collectAllMetrics()) { + if (metric.getName().startsWith("grpc.client.")) { + hasMetrics = true; + break; + } + } + if (hasMetrics) { + break; + } + Thread.sleep(100); + } + assertThat(hasMetrics).isTrue(); + } finally { + channel.shutdownNow(); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 4918c2af7a4..60bb9ab8da2 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -5113,7 +5113,7 @@ public void serverFailureMetricReport_forRetryAndBackoff() { private XdsClientImpl createXdsClient(String serverUri) { BootstrapInfo bootstrapInfo = buildBootStrap(serverUri); return new XdsClientImpl( - new GrpcXdsTransportFactory(null), + new GrpcXdsTransportFactory(null, null), bootstrapInfo, fakeClock.getScheduledExecutorService(), backoffPolicyProvider, diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java b/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java index 9c606a962f6..48595504d55 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsTransportFactoryTest.java @@ -17,20 +17,30 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import com.google.common.util.concurrent.SettableFuture; import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.grpc.BindableService; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ChannelConfigurator; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannelBuilder; import io.grpc.MethodDescriptor; +import io.grpc.NoopClientCall; import io.grpc.Server; import io.grpc.Status; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.TestMethodDescriptors; import io.grpc.xds.client.Bootstrapper; import io.grpc.xds.client.XdsTransportFactory; import java.util.concurrent.BlockingQueue; @@ -96,7 +106,7 @@ public void onCompleted() { @Test public void callApis() throws Exception { XdsTransportFactory.XdsTransport xdsTransport = - new GrpcXdsTransportFactory(null) + new GrpcXdsTransportFactory(null, null) .create( Bootstrapper.ServerInfo.create( "localhost:" + server.getPort(), InsecureChannelCredentials.create())); @@ -127,7 +137,7 @@ public void refCountedXdsTransport_sameXdsServerAddress_returnsExistingTransport Bootstrapper.ServerInfo xdsServerInfo = Bootstrapper.ServerInfo.create( "localhost:" + server.getPort(), InsecureChannelCredentials.create()); - GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null); + GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null, null); // Calling create() for the first time creates a new GrpcXdsTransport instance. // The ref count was previously 0 and now is 1. XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo); @@ -159,7 +169,7 @@ public void refCountedXdsTransport_differentXdsServerAddress_returnsDifferentTra Bootstrapper.ServerInfo xdsServerInfo2 = Bootstrapper.ServerInfo.create( "localhost:" + server2.getPort(), InsecureChannelCredentials.create()); - GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null); + GrpcXdsTransportFactory xdsTransportFactory = new GrpcXdsTransportFactory(null, null); // Calling create() to the first xDS server creates a new GrpcXdsTransport instance. // The ref count was previously 0 and now is 1. XdsTransportFactory.XdsTransport transport1 = xdsTransportFactory.create(xdsServerInfo1); @@ -196,5 +206,139 @@ public void onStatusReceived(Status status) { endFuture.set(status); } } + + @Test + public void verifyConfigApplied_interceptor() { + final boolean[] interceptorCalled = new boolean[1]; + final ClientInterceptor interceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, + CallOptions callOptions, + Channel next) { + interceptorCalled[0] = true; + return new NoopClientCall<>(); + } + }; + + // Create Configurer that adds the interceptor + ChannelConfigurator configurer = new ChannelConfigurator() { + @Override + public void configureChannelBuilder(ManagedChannelBuilder builder) { + builder.intercept(interceptor); + } + }; + + // Create Factory + GrpcXdsTransportFactory factory = new GrpcXdsTransportFactory( + null, + configurer); + + // Create Transport + XdsTransportFactory.XdsTransport transport = factory.create( + Bootstrapper.ServerInfo.create("localhost:8080", InsecureChannelCredentials.create())); + + // Create a Call to trigger interceptors + MethodDescriptor method = MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(TestMethodDescriptors.voidMarshaller()) + .setResponseMarshaller(TestMethodDescriptors.voidMarshaller()) + .build(); + + transport.createStreamingCall(method.getFullMethodName(), method.getRequestMarshaller(), + method.getResponseMarshaller()); + + // Verify interceptor was invoked + assertThat(interceptorCalled[0]).isTrue(); + + transport.shutdown(); + } + + @Test + public void useChannelConfigurator() { + final boolean[] called = new boolean[1]; + ChannelConfigurator configurer = new ChannelConfigurator() { + @Override + public void configureChannelBuilder(ManagedChannelBuilder builder) { + called[0] = true; + } + }; + + // Create Factory + GrpcXdsTransportFactory factory = new GrpcXdsTransportFactory( + null, // CallCredentials + configurer); + + // Create Transport (triggers channel creation) + XdsTransportFactory.XdsTransport transport = factory.create( + Bootstrapper.ServerInfo.create("localhost:8080", InsecureChannelCredentials.create())); + + // Verify Configurer was accessed and applied + assertThat(called[0]).isTrue(); + + transport.shutdown(); + } + + @Test + public void useChannelConfigurator_throwsException_propagates() { + final RuntimeException testException = new RuntimeException("test exception"); + ChannelConfigurator configurer = new ChannelConfigurator() { + @Override + public void configureChannelBuilder(ManagedChannelBuilder builder) { + throw testException; + } + }; + + GrpcXdsTransportFactory factory = new GrpcXdsTransportFactory(null, configurer); + + try { + factory.create( + Bootstrapper.ServerInfo.create("localhost:8080", InsecureChannelCredentials.create())); + org.junit.Assert.fail("Expected RuntimeException"); + } catch (RuntimeException e) { + assertThat(e).isSameInstanceAs(testException); + } + } + + @Test + public void verifyConfigApplied_maxInboundMessageSize() { + // Create a mock Builder + ManagedChannelBuilder mockBuilder = mock(ManagedChannelBuilder.class); + + // Create Configurer that modifies message size + ChannelConfigurator configurer = new ChannelConfigurator() { + @Override + public void configureChannelBuilder(ManagedChannelBuilder builder) { + builder.maxInboundMessageSize(1024); + } + }; + + // Apply configurer to builder + configurer.configureChannelBuilder(mockBuilder); + + // Verify builder was modified + verify(mockBuilder).maxInboundMessageSize(1024); + } + + @Test + public void verifyConfigApplied_interceptors() { + ClientInterceptor interceptor1 = mock(ClientInterceptor.class); + ClientInterceptor interceptor2 = mock(ClientInterceptor.class); + + ChannelConfigurator configurer = new ChannelConfigurator() { + @Override + public void configureChannelBuilder(ManagedChannelBuilder builder) { + builder.intercept(interceptor1); + builder.intercept(interceptor2); + } + }; + + ManagedChannelBuilder mockBuilder = mock(ManagedChannelBuilder.class); + configurer.configureChannelBuilder(mockBuilder); + + verify(mockBuilder).intercept(interceptor1); + verify(mockBuilder).intercept(interceptor2); + } } diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java index 80eb5cc1f47..4d1be47ef19 100644 --- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java @@ -185,7 +185,7 @@ public void cancelled(Context context) { lrsClient = new LoadReportClient( loadStatsManager, - new GrpcXdsTransportFactory(null).createForTest(channel), + new GrpcXdsTransportFactory(null, null).createForTest(channel), NODE, syncContext, fakeClock.getScheduledExecutorService(), diff --git a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java index 29b149f166f..15887ff3d26 100644 --- a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java @@ -28,9 +28,12 @@ import com.google.auth.oauth2.OAuth2Credentials; import com.google.common.util.concurrent.SettableFuture; import io.grpc.CallCredentials; +import io.grpc.ChannelConfigurator; +import io.grpc.ClientInterceptor; import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MetricRecorder; import io.grpc.Server; @@ -207,7 +210,8 @@ public void xdsClient_usesCallCredentials() throws Exception { // Create xDS client that uses the CallCredentials on the transport ObjectPool xdsClientPool = - provider.getOrCreate("target", bootstrapInfo, metricRecorder, sampleCreds); + provider.getOrCreate("target", bootstrapInfo, metricRecorder, sampleCreds, + null); XdsClient xdsClient = xdsClientPool.getObject(); xdsClient.watchXdsResource( XdsListenerResource.getInstance(), "someLDSresource", ldsResourceWatcher); @@ -220,4 +224,65 @@ public void xdsClient_usesCallCredentials() throws Exception { xdsClientPool.returnObject(xdsClient); xdsServer.shutdownNow(); } + + @Test + public void xdsClient_usesChannelConfigurator() throws Exception { + // Set up fake xDS server + XdsTestControlPlaneService fakeXdsService = new XdsTestControlPlaneService(); + CallCredsServerInterceptor callInterceptor = new CallCredsServerInterceptor(); + Server xdsServer = + Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create()) + .addService(fakeXdsService) + .intercept(callInterceptor) + .build() + .start(); + String xdsServerUri = "localhost:" + xdsServer.getPort(); + + // Set up bootstrap & xDS client pool provider + ServerInfo server = ServerInfo.create(xdsServerUri, InsecureChannelCredentials.create()); + BootstrapInfo bootstrapInfo = + BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); + SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(); + + // Create a client interceptor that actually just injects a test token + ClientInterceptor testInterceptor = new ClientInterceptor() { + @Override + public io.grpc.ClientCall interceptCall( + io.grpc.MethodDescriptor method, + io.grpc.CallOptions callOptions, + io.grpc.Channel next) { + return new io.grpc.ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + headers.put(AUTHORIZATION_METADATA_KEY, "Bearer test-configurator-token"); + super.start(responseListener, headers); + } + }; + } + }; + + ChannelConfigurator configurator = new ChannelConfigurator() { + @Override + public void configureChannelBuilder(ManagedChannelBuilder builder) { + builder.intercept(testInterceptor); + } + }; + + // Create xDS client that uses the ChannelConfigurator on the transport + ObjectPool xdsClientPool = + provider.getOrCreate("target", bootstrapInfo, metricRecorder, null, configurator); + XdsClient xdsClient = xdsClientPool.getObject(); + xdsClient.watchXdsResource( + XdsListenerResource.getInstance(), "someLDSresource", ldsResourceWatcher); + + // Wait for xDS server to get the request and verify that it received the token from + // configurator + assertThat(callInterceptor.getTokenWithTimeout(5, TimeUnit.SECONDS)) + .isEqualTo("Bearer test-configurator-token"); + + // Clean up + xdsClientPool.returnObject(xdsClient); + xdsServer.shutdownNow(); + } } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java index 27ee8d22825..4d5e7d09ad4 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java @@ -484,7 +484,7 @@ public void fallbackFromBadUrlToGoodOne() { XdsClientImpl client = CommonBootstrapperTestUtils.createXdsClient( Arrays.asList(garbageUri, validUri), - new GrpcXdsTransportFactory(null), + new GrpcXdsTransportFactory(null, null), fakeClock, new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, @@ -509,7 +509,7 @@ public void testGoodUrlFollowedByBadUrl() { XdsClientImpl client = CommonBootstrapperTestUtils.createXdsClient( Arrays.asList(validUri, garbageUri), - new GrpcXdsTransportFactory(null), + new GrpcXdsTransportFactory(null, null), fakeClock, new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, @@ -536,7 +536,7 @@ public void testTwoBadUrl() { XdsClientImpl client = CommonBootstrapperTestUtils.createXdsClient( Arrays.asList(garbageUri1, garbageUri2), - new GrpcXdsTransportFactory(null), + new GrpcXdsTransportFactory(null, null), fakeClock, new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index df3a0af5111..83a8ddfd7c8 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -47,6 +47,7 @@ import com.google.re2j.Pattern; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ChannelConfigurator; import io.grpc.ChannelLogger; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; @@ -67,6 +68,7 @@ import io.grpc.NameResolver.ServiceConfigParser; import io.grpc.NoopClientCall; import io.grpc.NoopClientCall.NoopClientCallListener; +import io.grpc.ProxyDetector; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusOr; @@ -2495,6 +2497,7 @@ private PickSubchannelArgs newPickSubchannelArgs( private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory { Set targets = new HashSet<>(); XdsClient xdsClient = new FakeXdsClient(); + ChannelConfigurator savedChannelConfigurator; @Override @Nullable @@ -2519,6 +2522,25 @@ public XdsClient returnObject(Object object) { }; } + @Override + public ObjectPool getOrCreate( + String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder, + ChannelConfigurator channelConfigurator) { + targets.add(target); + this.savedChannelConfigurator = channelConfigurator; + return new ObjectPool() { + @Override + public XdsClient getObject() { + return xdsClient; + } + + @Override + public XdsClient returnObject(Object object) { + return null; + } + }; + } + @Override public List getTargets() { if (targets.isEmpty()) { @@ -2957,4 +2979,41 @@ void deliverErrorStatus() { listener.onClose(Status.UNAVAILABLE, new Metadata()); } } + + @Test + public void start_passesChannelConfiguratorToClientPoolFactory() { + ChannelConfigurator channelConfigurator = builder -> { }; + + // Build NameResolver.Args containing the channel configurator + NameResolver.Args args = NameResolver.Args.newBuilder() + .setDefaultPort(8080) + .setProxyDetector(mock(ProxyDetector.class)) + .setSynchronizationContext(syncContext) + .setServiceConfigParser(serviceConfigParser) + .setChannelLogger(mock(ChannelLogger.class)) + .setChildChannelConfigurator(channelConfigurator) + .build(); + + XdsNameResolver resolver = new XdsNameResolver( + targetUri, + null, // targetAuthority (nullable) + AUTHORITY, // name + null, // overrideAuthority (nullable) + serviceConfigParser, + syncContext, + scheduler, + xdsClientPoolFactory, + mockRandom, + FilterRegistry.getDefaultRegistry(), + rawBootstrap, + metricRecorder, + args); + + // Start the resolver + resolver.start(mockListener); + + assertThat(xdsClientPoolFactory.savedChannelConfigurator).isSameInstanceAs(channelConfigurator); + + resolver.shutdown(); + } } diff --git a/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java b/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java index ac990226259..e4846ae54ec 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java @@ -20,6 +20,7 @@ import static io.grpc.xds.XdsServerTestHelper.buildTestListener; import static org.junit.Assert.fail; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -30,15 +31,18 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.SettableFuture; import io.grpc.BindableService; +import io.grpc.ChannelConfigurator; import io.grpc.InsecureServerCredentials; import io.grpc.ServerServiceDefinition; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.StatusOr; +import io.grpc.internal.ObjectPool; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.XdsServerTestHelper.FakeXdsClient; import io.grpc.xds.XdsServerTestHelper.FakeXdsClientPoolFactory; +import io.grpc.xds.client.XdsClient; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; import java.io.IOException; import java.net.InetSocketAddress; @@ -321,8 +325,28 @@ public void testOverrideBootstrap() throws Exception { buildBuilder(null); builder.overrideBootstrapForTest(b); xdsServer = cleanupRule.register((XdsServerWrapper) builder.build()); - Future unused = startServerAsync(); + Future unused = startServerAsync(); assertThat(xdsClientPoolFactory.savedBootstrapInfo.node().getId()) .isEqualTo(XdsServerTestHelper.BOOTSTRAP_INFO.node().getId()); } + + @Test + public void start_passesChannelConfiguratorToClientPoolFactory() throws Exception { + ChannelConfigurator configurer = builder -> { }; + XdsClientPoolFactory mockPoolFactory = mock(XdsClientPoolFactory.class); + @SuppressWarnings("unchecked") + ObjectPool mockPool = mock(ObjectPool.class); + when(mockPool.getObject()).thenReturn(xdsClient); + when(mockPoolFactory.getOrCreate(any(), any(), any(), any())).thenReturn(mockPool); + + buildBuilder(null); + builder.childChannelConfigurator(configurer); + builder.xdsClientPoolFactory(mockPoolFactory); + xdsServer = cleanupRule.register((XdsServerWrapper) builder.build()); + + Future unused = startServerAsync(); + + verify(mockPoolFactory).getOrCreate( + any(), any(), any(), eq(configurer)); + } } diff --git a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java index 386793299d8..aa546a564f9 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.SettableFuture; import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol; +import io.grpc.ChannelConfigurator; import io.grpc.InsecureChannelCredentials; import io.grpc.MetricRecorder; import io.grpc.Status; @@ -182,6 +183,13 @@ public XdsClient returnObject(Object object) { }; } + @Override + public ObjectPool getOrCreate( + String target, BootstrapInfo bootstrapInfo, MetricRecorder metricRecorder, + ChannelConfigurator channelConfigurator) { + return getOrCreate(target, bootstrapInfo, metricRecorder); + } + @Override public List getTargets() { return Collections.singletonList("fake-target");