diff --git a/.claude/skills/xds-dev/SKILL.md b/.claude/skills/xds-dev/SKILL.md new file mode 100644 index 00000000000..1e530800f9c --- /dev/null +++ b/.claude/skills/xds-dev/SKILL.md @@ -0,0 +1,16 @@ +--- +name: xds-dev +description: Build and test all xDS-related modules. Useful during xDS development to verify changes compile and pass tests across the full xDS module set. +--- + +# xDS Development + +Build and test commands for all xDS-related modules. + +## Commands + +### Test all xDS modules + +``` +./gradlew --parallel -Pretry=true :xds:test :xds-api:test :xds-validator:test :it:xds-client:test :it:xds-controlplane-api:test :it:xds-no-validation:test :it:xds-istio:test +``` diff --git a/it/xds-client/src/test/java/com/linecorp/armeria/xds/client/endpoint/HttpProtocolOptionsTest.java b/it/xds-client/src/test/java/com/linecorp/armeria/xds/client/endpoint/HttpProtocolOptionsTest.java new file mode 100644 index 00000000000..b2f98fa8603 --- /dev/null +++ b/it/xds-client/src/test/java/com/linecorp/armeria/xds/client/endpoint/HttpProtocolOptionsTest.java @@ -0,0 +1,129 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import com.linecorp.armeria.client.BlockingWebClient; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import com.linecorp.armeria.xds.XdsBootstrap; +import com.linecorp.armeria.xds.it.XdsResourceReader; + +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; + +class HttpProtocolOptionsTest { + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.http(0); + sb.service("/", (ctx, req) -> HttpResponse.of("OK")); + } + }; + + // language=YAML + private static final String BOOTSTRAP_TEMPLATE = + """ + static_resources: + listeners: + - name: my-listener + api_listener: + api_listener: + "@type": type.googleapis.com/envoy.extensions.filters.network.\ + http_connection_manager.v3.HttpConnectionManager + stat_prefix: http + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: [ "*" ] + routes: + - match: + prefix: / + route: + cluster: my-cluster + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.\ + http.router.v3.Router + clusters: + - name: my-cluster + type: STATIC + load_assignment: + cluster_name: my-cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: %d + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.\ + http.v3.HttpProtocolOptions + %s + """; + + static Stream sessionProtocolSelection() { + return Stream.of( + Arguments.of("explicit_http_config:\n http2_protocol_options: {}", + SessionProtocol.H2C), + Arguments.of("explicit_http_config:\n http_protocol_options: {}", + SessionProtocol.H1C), + Arguments.of("auto_config: {}", SessionProtocol.HTTP) + ); + } + + @ParameterizedTest + @MethodSource + void sessionProtocolSelection(String protocolConfig, + SessionProtocol expectedProtocol) { + final String yaml = BOOTSTRAP_TEMPLATE.formatted( + server.httpPort(), protocolConfig.indent(20).strip()); + final Bootstrap bootstrap = XdsResourceReader.fromYaml(yaml, Bootstrap.class); + final AtomicReference protocolRef = new AtomicReference<>(); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap); + XdsHttpPreprocessor preprocessor = + XdsHttpPreprocessor.ofListener("my-listener", xdsBootstrap)) { + final BlockingWebClient client = + WebClient.builder(preprocessor) + .decorator((delegate, ctx, req) -> { + protocolRef.set(ctx.sessionProtocol()); + return delegate.execute(ctx, req); + }) + .build() + .blocking(); + client.get("/"); + assertThat(protocolRef.get()).isEqualTo(expectedProtocol); + } + } +} diff --git a/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/AuthTokenFilterTest.java b/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/AuthTokenFilterTest.java new file mode 100644 index 00000000000..c96fef51b72 --- /dev/null +++ b/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/AuthTokenFilterTest.java @@ -0,0 +1,364 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.it; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Any; + +import com.linecorp.armeria.client.BlockingWebClient; +import com.linecorp.armeria.client.HttpPreprocessor; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.TlsProvider; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.util.AsyncLoader; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.ServerTlsConfig; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; +import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import com.linecorp.armeria.xds.ClusterSnapshot; +import com.linecorp.armeria.xds.filter.FactoryContext; +import com.linecorp.armeria.xds.XdsBootstrap; +import com.linecorp.armeria.xds.client.endpoint.XdsHttpPreprocessor; +import com.linecorp.armeria.xds.filter.HttpFilterFactory; +import com.linecorp.armeria.xds.filter.XdsHttpFilter; +import com.linecorp.armeria.xds.stream.SnapshotStream; + +import io.envoyproxy.controlplane.cache.v3.SimpleCache; +import io.envoyproxy.controlplane.cache.v3.Snapshot; +import io.envoyproxy.controlplane.server.V3DiscoveryServer; +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; +import io.netty.handler.ssl.ClientAuth; + +class AuthTokenFilterTest { + + private static final String GROUP = "key"; + private static final SimpleCache cache = new SimpleCache<>(node -> GROUP); + private static final String ECHO_CLUSTER = "echo-cluster"; + private static final String TOKEN_SERVER_CLUSTER = "token-server"; + private static final String LISTENER_NAME = "listener1"; + private static final String ROUTE_NAME = "route1"; + private static final String BOOTSTRAP_CLUSTER_NAME = "bootstrap-cluster"; + private static final AtomicInteger tokenRequestCount = new AtomicInteger(); + + @Order(1) + @RegisterExtension + static final SelfSignedCertificateExtension serverCert = new SelfSignedCertificateExtension(); + + @Order(2) + @RegisterExtension + static final SelfSignedCertificateExtension clientCert = new SelfSignedCertificateExtension(); + + @RegisterExtension + static final ServerExtension controlPlane = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + final V3DiscoveryServer v3DiscoveryServer = new V3DiscoveryServer(cache); + sb.service(GrpcService.builder() + .addService(v3DiscoveryServer.getAggregatedDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getListenerDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getClusterDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getRouteDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getEndpointDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getSecretDiscoveryServiceImpl()) + .build()); + sb.http(0); + } + }; + + @RegisterExtension + static final ServerExtension tokenServer = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.tlsProvider(TlsProvider.of(serverCert.tlsKeyPair()), + ServerTlsConfig.builder() + .clientAuth(ClientAuth.REQUIRE) + .tlsCustomizer(b -> b.trustManager(clientCert.certificate())) + .build()); + sb.service("/token", (ctx, req) -> { + tokenRequestCount.incrementAndGet(); + return HttpResponse.of("test-token-12345"); + }); + } + }; + + @RegisterExtension + static final ServerExtension echoServer = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.service("/echo", (ctx, req) -> { + final String token = req.headers().get("x-auth-token"); + return HttpResponse.of(token != null ? token : "no-token"); + }); + sb.http(0); + } + }; + + @RegisterExtension + static final EventLoopExtension eventLoop = new EventLoopExtension(); + + @BeforeEach + void beforeEach() { + tokenRequestCount.set(0); + + final Cluster echoCluster = XdsResourceReader.fromYaml(""" + name: %s + type: EDS + connect_timeout: 1s + eds_cluster_config: + eds_config: + ads: {} + """.formatted(ECHO_CLUSTER), Cluster.class); + + final Cluster tokenCluster = XdsResourceReader.fromYaml(""" + name: %s + type: EDS + connect_timeout: 1s + eds_cluster_config: + eds_config: + ads: {} + transport_socket: + name: envoy.transport_sockets.tls + typed_config: + "@type": type.googleapis.com/envoy.extensions.transport_sockets\ + .tls.v3.UpstreamTlsContext + common_tls_context: + tls_certificates: + - private_key: + filename: '%s' + certificate_chain: + filename: '%s' + validation_context: + trusted_ca: + filename: '%s' + """.formatted(TOKEN_SERVER_CLUSTER, + clientCert.privateKeyFile().getAbsolutePath(), + clientCert.certificateFile().getAbsolutePath(), + serverCert.certificateFile().getAbsolutePath()), Cluster.class); + + final ClusterLoadAssignment echoAssignment = XdsResourceReader.fromYaml(""" + cluster_name: %s + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: %s + port_value: %s + """.formatted(ECHO_CLUSTER, + echoServer.httpSocketAddress().getHostString(), + echoServer.httpPort()), ClusterLoadAssignment.class); + + final ClusterLoadAssignment tokenAssignment = XdsResourceReader.fromYaml(""" + cluster_name: %s + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: %s + port_value: %s + """.formatted(TOKEN_SERVER_CLUSTER, + tokenServer.httpsSocketAddress().getHostString(), + tokenServer.httpsPort()), ClusterLoadAssignment.class); + + final Listener listener = XdsResourceReader.fromYaml(""" + name: %s + api_listener: + api_listener: + "@type": type.googleapis.com/envoy.extensions.filters.network\ + .http_connection_manager.v3.HttpConnectionManager + stat_prefix: http + rds: + route_config_name: %s + config_source: + ads: {} + http_filters: + - name: test.auth_token_filter + typed_config: + "@type": type.googleapis.com/google.protobuf.Empty + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + """.formatted(LISTENER_NAME, ROUTE_NAME), Listener.class); + + final RouteConfiguration route = XdsResourceReader.fromYaml(""" + name: %s + virtual_hosts: + - name: local_service + domains: [ "*" ] + routes: + - match: + prefix: / + route: + cluster: %s + """.formatted(ROUTE_NAME, ECHO_CLUSTER), RouteConfiguration.class); + + cache.setSnapshot(GROUP, Snapshot.create( + ImmutableList.of(echoCluster, tokenCluster), + ImmutableList.of(echoAssignment, tokenAssignment), + ImmutableList.of(listener), + ImmutableList.of(route), + ImmutableList.of(), + "1")); + } + + @Test + void tokenFetchedAndCachedAcrossRequests() { + final Bootstrap bootstrap = XdsResourceReader.fromYaml(bootstrapYaml()); + final AuthTokenFilterFactory filterFactory = new AuthTokenFilterFactory(); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.builder(bootstrap) + .eventExecutor(eventLoop.get()) + .extensionFactory(filterFactory) + .build(); + XdsHttpPreprocessor preprocessor = + XdsHttpPreprocessor.ofListener(LISTENER_NAME, xdsBootstrap)) { + final BlockingWebClient client = WebClient.of(preprocessor).blocking(); + + for (int i = 0; i < 3; i++) { + final AggregatedHttpResponse response = client.get("/echo"); + assertThat(response.status()).isEqualTo(HttpStatus.OK); + assertThat(response.contentUtf8()).isEqualTo("test-token-12345"); + } + + assertThat(tokenRequestCount.get()).isEqualTo(1); + } + } + + private String bootstrapYaml() { + return """ + dynamic_resources: + ads_config: + api_type: GRPC + grpc_services: + - envoy_grpc: + cluster_name: %s + cds_config: + ads: {} + lds_config: + ads: {} + static_resources: + clusters: + - name: %s + type: STATIC + load_assignment: + cluster_name: %s + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: %s + port_value: %s + """.formatted(BOOTSTRAP_CLUSTER_NAME, + BOOTSTRAP_CLUSTER_NAME, BOOTSTRAP_CLUSTER_NAME, + controlPlane.httpSocketAddress().getHostString(), + controlPlane.httpPort()); + } + + private static final class AuthTokenFilterFactory implements HttpFilterFactory { + + private static final String NAME = "test.auth_token_filter"; + private static final String TYPE_URL = "type.googleapis.com/google.protobuf.Empty"; + private static final List TYPE_URLS = ImmutableList.of(TYPE_URL); + + @Override + public String name() { + return NAME; + } + + @Override + public List typeUrls() { + return TYPE_URLS; + } + + @Nullable + @Override + public XdsHttpFilter create(HttpFilter httpFilter, Any config, FactoryContext context) { + throw new UnsupportedOperationException("use createStream()"); + } + + @Override + public SnapshotStream createStream(HttpFilter httpFilter, Any config, + FactoryContext context) { + final SnapshotStream clusterStream = + context.clusterStream(TOKEN_SERVER_CLUSTER); + + return clusterStream.switchMapEager( + AuthTokenFilterFactory::buildTokenFilter); + } + + private static SnapshotStream buildTokenFilter( + ClusterSnapshot clusterSnapshot) { + if (clusterSnapshot.loadBalancer() == null) { + return SnapshotStream.just(new XdsHttpFilter() {}); + } + final WebClient tokenClient = WebClient.of(clusterSnapshot.httpPreprocessor()); + // we may consider caching the token across cluster updates in the future + final AsyncLoader tokenLoader = AsyncLoader + .builder(cached -> tokenClient.get("/token") + .aggregate() + .thenApply(AggregatedHttpResponse::contentUtf8)) + .expireAfterLoad(Duration.ofMinutes(5)) + .build(); + + return SnapshotStream.just(new CachedTokenXdsHttpFilter(tokenLoader)); + } + } + + private static final class CachedTokenXdsHttpFilter implements XdsHttpFilter { + + private final AsyncLoader tokenLoader; + + CachedTokenXdsHttpFilter(AsyncLoader tokenLoader) { + this.tokenLoader = tokenLoader; + } + + @Override + public HttpPreprocessor httpPreprocessor() { + return (delegate, ctx, req) -> HttpResponse.of( + tokenLoader.load().thenApply(token -> { + ctx.setAdditionalRequestHeader("x-auth-token", token); + try { + return delegate.execute(ctx, req); + } catch (Exception e) { + return HttpResponse.ofFailure(e); + } + })); + } + } +} diff --git a/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.java b/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.java new file mode 100644 index 00000000000..82a8d66d7c1 --- /dev/null +++ b/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/CustomConfigSourceTest.java @@ -0,0 +1,201 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.it; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import com.linecorp.armeria.xds.ClusterSnapshot; +import com.linecorp.armeria.xds.filter.FactoryContext; +import com.linecorp.armeria.xds.SnapshotWatcher; +import com.linecorp.armeria.xds.XdsBootstrap; +import com.linecorp.armeria.xds.configsource.InterestedResources; +import com.linecorp.armeria.xds.configsource.SotwConfigSourceSubscriptionFactory; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; + +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; + +class CustomConfigSourceTest { + + @RegisterExtension + static final EventLoopExtension eventLoop = new EventLoopExtension(); + + //language=JSON + private static final String DISCOVERY_RESPONSE_JSON = """ + { + "typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster", + "resources": [ + { + "@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster", + "name": "my-dynamic-cluster", + "type": "STATIC", + "loadAssignment": { + "clusterName": "my-dynamic-cluster", + "endpoints": [ + { + "lbEndpoints": [ + { + "endpoint": { + "address": { + "socketAddress": { + "address": "127.0.0.1", + "portValue": 9999 + } + } + } + } + ] + } + ] + } + } + ], + "versionInfo": "1" + } + """; + + @RegisterExtension + static final ServerExtension configServer = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + sb.service("/xds/cluster/my-dynamic-cluster", (ctx, req) -> + HttpResponse.of(HttpStatus.OK, MediaType.JSON, DISCOVERY_RESPONSE_JSON)); + } + }; + + @Test + void customConfigSourceFetchesClusterViaHttp() { + //language=YAML + final String bootstrapYaml = """ + static_resources: + clusters: + - name: config-server + type: STATIC + load_assignment: + cluster_name: config-server + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: %d + dynamic_resources: + cds_config: + custom_config_source: + "@type": "type.googleapis.com/google.protobuf.Empty" + """.formatted(configServer.httpPort()); + + final Bootstrap bootstrap = XdsResourceReader.fromYaml(bootstrapYaml); + final AtomicReference snapshotRef = new AtomicReference<>(); + final AtomicReference errorRef = new AtomicReference<>(); + final SnapshotWatcher watcher = (snapshot, t) -> { + if (t != null) { + errorRef.set(t); + return; + } + if (snapshot instanceof ClusterSnapshot) { + snapshotRef.set((ClusterSnapshot) snapshot); + } + }; + + try (XdsBootstrap xdsBootstrap = XdsBootstrap.builder(bootstrap) + .eventExecutor(eventLoop.get()) + .extensionFactory(new HttpConfigSourceFactory()) + .defaultSnapshotWatcher(watcher) + .build()) { + xdsBootstrap.clusterRoot("my-dynamic-cluster"); + await().untilAsserted(() -> { + assertThat(errorRef.get()).isNull(); + assertThat(snapshotRef.get()).isNotNull(); + assertThat(snapshotRef.get().xdsResource().resource().getName()) + .isEqualTo("my-dynamic-cluster"); + }); + } + } + + static final class HttpConfigSourceFactory implements SotwConfigSourceSubscriptionFactory { + + @Override + public String name() { + return "http-config-source"; + } + + @Override + public List typeUrls() { + return ImmutableList.of("type.googleapis.com/google.protobuf.Empty"); + } + + @Override + public SnapshotStream create( + ConfigSource configSource, + FactoryContext factoryContext, + SnapshotStream interestedResources) { + final SnapshotStream clusterStream = + factoryContext.clusterStream("config-server"); + return SnapshotStream.combineLatest(interestedResources, clusterStream, Map::entry) + .switchMapEager(entry -> fetchViaHttp(entry.getKey(), + entry.getValue())); + } + + private SnapshotStream fetchViaHttp(InterestedResources interest, + ClusterSnapshot clusterSnapshot) { + return watcher -> { + final WebClient client = WebClient.of(clusterSnapshot.httpPreprocessor()); + final String type = interest.type().name().toLowerCase(Locale.ENGLISH); + for (String name : interest.resourceNames()) { + client.get("/xds/%s/%s".formatted(type, name)).aggregate().handle((response, cause) -> { + if (cause != null) { + watcher.onUpdate(null, cause); + return null; + } + try { + final DiscoveryResponse discoveryResponse = + XdsResourceReader.fromJson( + response.contentUtf8(), DiscoveryResponse.class); + watcher.onUpdate(discoveryResponse, null); + } catch (Exception e) { + watcher.onUpdate(null, e); + } + return null; + }); + } + return Subscription.noop(); + }; + } + } +} diff --git a/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/FilterFactoryEventLoopTest.java b/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/FilterFactoryEventLoopTest.java new file mode 100644 index 00000000000..c2e06f3bedd --- /dev/null +++ b/it/xds-client/src/test/java/com/linecorp/armeria/xds/it/FilterFactoryEventLoopTest.java @@ -0,0 +1,233 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.it; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Any; + +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; +import com.linecorp.armeria.xds.filter.FactoryContext; +import com.linecorp.armeria.xds.ListenerRoot; +import com.linecorp.armeria.xds.SnapshotWatcher; +import com.linecorp.armeria.xds.XdsBootstrap; +import com.linecorp.armeria.xds.filter.HttpFilterFactory; +import com.linecorp.armeria.xds.filter.XdsHttpFilter; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; + +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; + +class FilterFactoryEventLoopTest { + + private static final String LISTENER_NAME = "listener1"; + private static final String CUSTOM_FILTER_NAME = "test.custom_filter"; + private static final String CUSTOM_FILTER_TYPE_URL = + "type.googleapis.com/google.protobuf.Empty"; + + @RegisterExtension + static final EventLoopExtension xdsEventLoop = new EventLoopExtension(); + + @RegisterExtension + static final EventLoopExtension otherEventLoop = new EventLoopExtension(); + + //language=YAML + private static final String BOOTSTRAP_YAML = """ + static_resources: + clusters: + - name: my-cluster + type: STATIC + load_assignment: + cluster_name: my-cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 8080 + listeners: + - name: listener1 + api_listener: + api_listener: + "@type": type.googleapis.com/envoy.extensions.filters.network\ + .http_connection_manager.v3.HttpConnectionManager + stat_prefix: http + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: [ "*" ] + routes: + - match: + prefix: / + route: + cluster: my-cluster + http_filters: + - name: test.custom_filter + typed_config: + "@type": type.googleapis.com/google.protobuf.Empty + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + """; + + /** + * Verifies that subscribing to {@code genericSecretStream} from a non-event-loop thread + * triggers an {@link IllegalStateException} from {@code checkSubscribeOn}. + */ + @Test + void strictCheckPropagatesError() { + final CompletableFuture subscribeOffEventLoop = new CompletableFuture<>(); + final HttpFilterFactory offThreadFactory = new HttpFilterFactory() { + @Override + public String name() { + return CUSTOM_FILTER_NAME; + } + + @Override + public List typeUrls() { + return ImmutableList.of(CUSTOM_FILTER_TYPE_URL); + } + + @Nullable + @Override + public XdsHttpFilter create(HttpFilter httpFilter, Any config, FactoryContext context) { + throw new UnsupportedOperationException(); + } + + @Override + public SnapshotStream createStream( + HttpFilter httpFilter, Any config, FactoryContext context) { + return watcher -> { + otherEventLoop.get().execute(() -> { + try { + // Subscribe to genericSecretStream from a different event loop. + // checkSubscribeOn should throw IllegalStateException. + final Subscription unused = context.genericSecretStream( + SdsSecretConfig.newBuilder() + .setName("nonexistent") + .build() + ).subscribe((value, error) -> {}); + subscribeOffEventLoop.complete(null); + } catch (Throwable t) { + subscribeOffEventLoop.completeExceptionally(t); + } + }); + return Subscription.noop(); + }; + } + }; + + final Bootstrap bootstrap = XdsResourceReader.fromYaml(BOOTSTRAP_YAML); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.builder(bootstrap) + .eventExecutor(xdsEventLoop.get()) + .extensionFactory(offThreadFactory) + .build()) { + final ListenerRoot root = xdsBootstrap.listenerRoot(LISTENER_NAME); + try { + await().untilAsserted(() -> assertThat(subscribeOffEventLoop).isCompletedExceptionally()); + assertThat(subscribeOffEventLoop) + .failsWithin(0, TimeUnit.SECONDS) + .withThrowableThat() + .withCauseInstanceOf(IllegalStateException.class) + .withMessageContaining("subscribe must be called from the event loop"); + } finally { + root.close(); + } + } + } + + /** + * Verifies that a custom filter factory emitting from a non-event-loop thread + * still delivers events successfully, thanks to {@code rescheduleEventsOn}. + */ + @Test + void reschedulingDeliversEventsFromOffThread() { + final HttpFilterFactory offThreadEmitterFactory = new HttpFilterFactory() { + @Override + public String name() { + return CUSTOM_FILTER_NAME; + } + + @Override + public List typeUrls() { + return ImmutableList.of(CUSTOM_FILTER_TYPE_URL); + } + + @Nullable + @Override + public XdsHttpFilter create(HttpFilter httpFilter, Any config, FactoryContext context) { + throw new UnsupportedOperationException(); + } + + @Override + public SnapshotStream createStream( + HttpFilter httpFilter, Any config, FactoryContext context) { + return watcher -> { + otherEventLoop.get().execute(() -> { + // Emit a no-op filter from a different event loop. + // Without rescheduleEventsOn this would be a threading violation. + watcher.onUpdate(new XdsHttpFilter() {}, null); + }); + return Subscription.noop(); + }; + } + }; + + final List snapshots = new CopyOnWriteArrayList<>(); + final List errors = new CopyOnWriteArrayList<>(); + final SnapshotWatcher defaultWatcher = (snapshot, error) -> { + if (snapshot != null) { + snapshots.add(snapshot); + } + if (error != null) { + errors.add(error); + } + }; + + final Bootstrap bootstrap = XdsResourceReader.fromYaml(BOOTSTRAP_YAML); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.builder(bootstrap) + .eventExecutor(xdsEventLoop.get()) + .extensionFactory(offThreadEmitterFactory) + .defaultSnapshotWatcher(defaultWatcher) + .build()) { + final ListenerRoot root = xdsBootstrap.listenerRoot(LISTENER_NAME); + try { + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(snapshots).isNotEmpty(); + }); + assertThat(errors).isEmpty(); + } finally { + root.close(); + } + } + } +} diff --git a/it/xds-istio/src/test/resources/META-INF/services/com.linecorp.armeria.xds.filter.HttpFilterFactory b/it/xds-istio/src/test/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactory similarity index 100% rename from it/xds-istio/src/test/resources/META-INF/services/com.linecorp.armeria.xds.filter.HttpFilterFactory rename to it/xds-istio/src/test/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactory diff --git a/xds-api/src/main/proto/envoy/config/cluster/v3/cluster.proto b/xds-api/src/main/proto/envoy/config/cluster/v3/cluster.proto index b736903638e..0dbf2c16732 100644 --- a/xds-api/src/main/proto/envoy/config/cluster/v3/cluster.proto +++ b/xds-api/src/main/proto/envoy/config/cluster/v3/cluster.proto @@ -1025,6 +1025,7 @@ message Cluster { // specific options. // [#next-major-version: make this a list of typed extensions.] // [#extension-category: envoy.upstream_options] + option (armeria.xds.supported.field) = 36; map typed_extension_protocol_options = 36; // If the DNS refresh rate is specified and the cluster type is either diff --git a/xds-api/src/main/proto/envoy/extensions/upstreams/http/v3/http_protocol_options.proto b/xds-api/src/main/proto/envoy/extensions/upstreams/http/v3/http_protocol_options.proto index 03f0158ed17..2e5f6d67793 100644 --- a/xds-api/src/main/proto/envoy/extensions/upstreams/http/v3/http_protocol_options.proto +++ b/xds-api/src/main/proto/envoy/extensions/upstreams/http/v3/http_protocol_options.proto @@ -11,6 +11,8 @@ import "envoy/extensions/filters/network/http_connection_manager/v3/http_connect import "udpa/annotations/status.proto"; import "validate/validate.proto"; +import "armeria/xds/supported.proto"; + option java_package = "io.envoyproxy.envoy.extensions.upstreams.http.v3"; option java_outer_classname = "HttpProtocolOptionsProto"; option java_multiple_files = true; @@ -69,8 +71,10 @@ message HttpProtocolOptions { oneof protocol_config { option (validate.required) = true; + option (armeria.xds.supported.oneof_field) = 1; config.core.v3.Http1ProtocolOptions http_protocol_options = 1; + option (armeria.xds.supported.oneof_field) = 2; config.core.v3.Http2ProtocolOptions http2_protocol_options = 2; // .. warning:: @@ -149,6 +153,7 @@ message HttpProtocolOptions { option (validate.required) = true; // To explicitly configure either HTTP/1 or HTTP/2 (but not both!) use ``explicit_http_config``. + option (armeria.xds.supported.oneof_field) = 3; ExplicitHttpConfig explicit_http_config = 3; // This allows switching on protocol based on what protocol the downstream @@ -156,6 +161,7 @@ message HttpProtocolOptions { UseDownstreamHttpConfig use_downstream_protocol_config = 4; // This allows switching on protocol based on ALPN + option (armeria.xds.supported.oneof_field) = 5; AutoHttpConfig auto_config = 5; } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java b/xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java index 4a327027e59..a32f9102db5 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java @@ -23,11 +23,15 @@ import com.linecorp.armeria.client.retry.Backoff; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.configsource.InterestedResources; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.grpc.Status; import io.netty.util.concurrent.EventExecutor; -final class AdsXdsStream implements XdsStream { +final class AdsXdsStream extends RefCountedStream implements XdsStream { interface ActualStream { void closeStream(); @@ -46,6 +50,7 @@ interface ActualStreamFactory { private final StateCoordinator stateCoordinator; private final ConfigSourceLifecycleObserver lifecycleObserver; private final Set targetTypes; + private final SnapshotStream interestStream; StateCoordinator stateCoordinator() { return stateCoordinator; @@ -58,13 +63,14 @@ StateCoordinator stateCoordinator() { AdsXdsStream(ActualStreamFactory factory, Backoff backoff, EventExecutor eventLoop, StateCoordinator stateCoordinator, ConfigSourceLifecycleObserver lifecycleObserver, - Set targetTypes) { + Set targetTypes, SnapshotStream interestStream) { this.factory = requireNonNull(factory, "factory"); this.backoff = requireNonNull(backoff, "backoff"); this.eventLoop = requireNonNull(eventLoop, "eventLoop"); this.stateCoordinator = requireNonNull(stateCoordinator, "stateCoordinator"); this.lifecycleObserver = requireNonNull(lifecycleObserver, "lifecycleObserver"); this.targetTypes = requireNonNull(targetTypes, "targetTypes"); + this.interestStream = interestStream; } void stop() { @@ -86,14 +92,18 @@ void stop(Throwable throwable) { } @Override - public void close() { - stop(); - lifecycleObserver.close(); - } - - @Override - public void resourcesUpdated(XdsType type) { - ensureStream().resourcesUpdated(type); + protected Subscription onStart(SnapshotWatcher watcher) { + final Subscription subscription = interestStream.subscribe((snapshot, error) -> { + assert snapshot != null; + if (targetTypes.contains(snapshot.type())) { + ensureStream().resourcesUpdated(snapshot.type()); + } + }); + return () -> { + subscription.close(); + stop(); + lifecycleObserver.close(); + }; } void retryOrClose(boolean closedByError) { @@ -127,7 +137,7 @@ private void reset() { } for (XdsType targetType : targetTypes) { if (!stateCoordinator.interestedResources(targetType).isEmpty()) { - resourcesUpdated(targetType); + ensureStream().resourcesUpdated(targetType); } } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java b/xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java index 8d98bd9530d..64572dd8244 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ClusterResourceParser.java @@ -16,17 +16,35 @@ package com.linecorp.armeria.xds; +import com.google.protobuf.Any; + +import com.linecorp.armeria.common.annotation.Nullable; + import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.extensions.upstreams.http.v3.HttpProtocolOptions; final class ClusterResourceParser extends ResourceParser { + private static final String HTTP_PROTOCOL_OPTIONS_KEY = + "envoy.extensions.upstreams.http.v3.HttpProtocolOptions"; + static final ClusterResourceParser INSTANCE = new ClusterResourceParser(); private ClusterResourceParser() {} @Override ClusterXdsResource parse(Cluster cluster, XdsExtensionRegistry registry, String version) { - return new ClusterXdsResource(cluster, version); + return new ClusterXdsResource(cluster, version, parseHttpProtocolOptions(cluster, registry)); + } + + @Nullable + private static HttpProtocolOptions parseHttpProtocolOptions(Cluster cluster, + XdsExtensionRegistry registry) { + final Any any = cluster.getTypedExtensionProtocolOptionsMap().get(HTTP_PROTOCOL_OPTIONS_KEY); + if (any == null) { + return null; + } + return registry.unpack(any, HttpProtocolOptions.class); } @Override diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ClusterSnapshot.java b/xds/src/main/java/com/linecorp/armeria/xds/ClusterSnapshot.java index df47074830c..97e15e66447 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ClusterSnapshot.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ClusterSnapshot.java @@ -24,9 +24,12 @@ import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.HttpPreprocessor; +import com.linecorp.armeria.client.UnprocessedRequestException; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.xds.client.endpoint.XdsLoadBalancer; +import com.linecorp.armeria.xds.internal.XdsCommonUtil; import io.envoyproxy.envoy.config.cluster.v3.Cluster; @@ -112,6 +115,22 @@ public TransportSocketSnapshot transportSocket() { return transportSocket; } + /** + * Returns an {@link HttpPreprocessor} that selects an endpoint from this cluster's + * load balancer, configures TLS parameters and session protocol based on the endpoint's + * transport socket, and sets the endpoint on the request context. + * + * @throws UnprocessedRequestException if no load balancer is available or no endpoint + * could be selected + */ + @UnstableApi + public HttpPreprocessor httpPreprocessor() { + return (delegate, ctx, req) -> { + XdsCommonUtil.applyClusterToCtx(this, ctx); + return delegate.execute(ctx, req); + }; + } + @Override public int hashCode() { return Objects.hashCode(clusterXdsResource, endpointSnapshot, loadBalancer, diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.java b/xds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.java index c3a69292089..fcd9df31ffe 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ClusterXdsResource.java @@ -16,9 +16,11 @@ package com.linecorp.armeria.xds; +import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.extensions.upstreams.http.v3.HttpProtocolOptions; /** * A resource object for a {@link Cluster}. @@ -27,18 +29,19 @@ public final class ClusterXdsResource extends AbstractXdsResource { private final Cluster cluster; + @Nullable + private final HttpProtocolOptions httpProtocolOptions; - ClusterXdsResource(Cluster cluster) { - this(cluster, ""); + ClusterXdsResource(Cluster cluster, String version, + @Nullable HttpProtocolOptions httpProtocolOptions) { + this(cluster, version, 0, httpProtocolOptions); } - ClusterXdsResource(Cluster cluster, String version) { - this(cluster, version, 0); - } - - private ClusterXdsResource(Cluster cluster, String version, long revision) { + private ClusterXdsResource(Cluster cluster, String version, long revision, + @Nullable HttpProtocolOptions httpProtocolOptions) { super(version, revision); this.cluster = cluster; + this.httpProtocolOptions = httpProtocolOptions; } @Override @@ -56,11 +59,20 @@ public String name() { return cluster.getName(); } + /** + * Returns the parsed {@link HttpProtocolOptions} from the cluster's + * {@code typed_extension_protocol_options}, or {@code null} if not present. + */ + @Nullable + public HttpProtocolOptions httpProtocolOptions() { + return httpProtocolOptions; + } + @Override ClusterXdsResource withRevision(long revision) { if (revision == revision()) { return this; } - return new ClusterXdsResource(cluster, version(), revision); + return new ClusterXdsResource(cluster, version(), revision, httpProtocolOptions); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java b/xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java new file mode 100644 index 00000000000..15939deccce --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/CompositeSnapshotWatcher.java @@ -0,0 +1,95 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.util.SafeCloseable; + +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.ScheduledFuture; + +/** + * A composite watcher that fans out resource updates to multiple {@link SnapshotWatcher}s. + * Also manages the absent-on-timeout timer for SotW protocols. + */ +class CompositeSnapshotWatcher implements SnapshotWatcher, SafeCloseable { + + private static final Logger logger = LoggerFactory.getLogger(CompositeSnapshotWatcher.class); + + private final XdsType type; + private final String resource; + @Nullable + private ScheduledFuture initialAbsentFuture; + private final Set> watchers = new HashSet<>(); + + CompositeSnapshotWatcher(XdsType type, String resource, EventExecutor eventLoop, long timeoutMillis, + boolean enableAbsentOnTimeout) { + this.type = type; + this.resource = resource; + + if (enableAbsentOnTimeout) { + initialAbsentFuture = eventLoop.schedule(() -> { + initialAbsentFuture = null; + onUpdate(null, new MissingXdsResourceException(type, resource)); + }, timeoutMillis, TimeUnit.MILLISECONDS); + } + } + + private void maybeCancelAbsentTimer() { + if (initialAbsentFuture != null && initialAbsentFuture.isCancellable()) { + initialAbsentFuture.cancel(false); + initialAbsentFuture = null; + } + } + + @Override + public void close() { + maybeCancelAbsentTimer(); + } + + void addWatcher(SnapshotWatcher watcher) { + watchers.add(watcher); + } + + void removeWatcher(SnapshotWatcher watcher) { + watchers.remove(watcher); + } + + @Override + public void onUpdate(@Nullable T value, @Nullable Throwable error) { + maybeCancelAbsentTimer(); + for (SnapshotWatcher watcher : watchers) { + try { + watcher.onUpdate(value, error); + } catch (Exception e) { + logger.warn("Unexpected exception while invoking onUpdate() with ({}, {}).", + type, resource, e); + } + } + } + + boolean isEmpty() { + return watchers.isEmpty(); + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java b/xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java index 339451be760..fdeed821fdd 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/CompositeXdsStream.java @@ -16,12 +16,17 @@ package com.linecorp.armeria.xds; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.function.Function; import com.google.common.collect.ImmutableMap; -final class CompositeXdsStream implements XdsStream { +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.Subscription; + +final class CompositeXdsStream extends RefCountedStream implements XdsStream { private final Map streamMap; @@ -34,14 +39,13 @@ final class CompositeXdsStream implements XdsStream { } @Override - public void close() { - streamMap.values().forEach(XdsStream::close); - } - - @Override - public void resourcesUpdated(XdsType type) { - final XdsStream stream = streamMap.get(type); - assert stream != null; - stream.resourcesUpdated(type); + protected Subscription onStart(SnapshotWatcher watcher) { + final List subscriptions = new ArrayList<>(); + for (XdsStream stream : streamMap.values()) { + subscriptions.add(stream.subscribe(watcher)); + } + return () -> { + subscriptions.forEach(Subscription::close); + }; } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java b/xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java index d11164849c1..318af980b85 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceHandler.java @@ -17,26 +17,85 @@ package com.linecorp.armeria.xds; import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.xds.configsource.InterestedResources; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; + +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; final class ConfigSourceHandler implements SafeCloseable { private final StateCoordinator stateCoordinator; - private final ConfigSourceSubscription stream; + private final InterestPublisher interestPublisher; + private final Subscription subscription; + + static ConfigSourceHandler of(StateCoordinator stateCoordinator, InterestPublisher interestPublisher, + SnapshotStream stream, + SnapshotWatcher defaultWatcher) { + return new ConfigSourceHandler( + stateCoordinator, interestPublisher, + stream.map(res -> parseResponse(res, stateCoordinator.extensionRegistry())), + defaultWatcher); + } - ConfigSourceHandler(StateCoordinator stateCoordinator, ConfigSourceSubscription stream) { + ConfigSourceHandler(StateCoordinator stateCoordinator, InterestPublisher interestPublisher, + SnapshotStream stream, SnapshotWatcher defaultWatcher) { this.stateCoordinator = stateCoordinator; - this.stream = stream; + this.interestPublisher = interestPublisher; + subscription = stream.subscribe((parsed, error) -> { + if (error != null) { + defaultWatcher.onUpdate(null, error); + return; + } + if (parsed instanceof ParsedResources.SotwParsedResources) { + apply((ParsedResources.SotwParsedResources) parsed); + } else { + assert parsed instanceof ParsedResources.DeltaParsedResources; + apply((ParsedResources.DeltaParsedResources) parsed); + } + }); } - void addSubscriber(XdsType type, String resourceName, ResourceWatcher watcher) { + private void apply(ParsedResources.SotwParsedResources sotw) { + final XdsType type = sotw.type(); + applyUpdatesAndErrors(sotw); + if (sotw.isFullStateOfTheWorld()) { + for (String name : stateCoordinator.activeResources(type)) { + if (!sotw.parsedResources().containsKey(name)) { + stateCoordinator.onResourceMissing(type, name); + } + } + } + } + + private void apply(ParsedResources.DeltaParsedResources delta) { + applyUpdatesAndErrors(delta); + delta.removed().forEach(name -> stateCoordinator.onResourceMissing(delta.type(), name)); + } + + private void applyUpdatesAndErrors(ParsedResources parsed) { + final XdsType type = parsed.type(); + parsed.parsedResources().forEach((name, resource) -> { + if (resource instanceof XdsResource) { + stateCoordinator.onResourceUpdated(type, name, (XdsResource) resource); + } + }); + parsed.invalidResources().forEach((name, cause) -> + stateCoordinator.onResourceError(type, name, cause)); + } + + void addSubscriber(XdsType type, String resourceName, SnapshotWatcher watcher) { if (stateCoordinator.register(type, resourceName, watcher)) { - stream.updateInterests(type, stateCoordinator.interestedResources(type)); + interestPublisher.publish( + new InterestedResources(type, stateCoordinator.interestedResources(type))); } } - boolean removeSubscriber(XdsType type, String resourceName, ResourceWatcher watcher) { + boolean removeSubscriber(XdsType type, String resourceName, + SnapshotWatcher watcher) { if (stateCoordinator.unregister(type, resourceName, watcher)) { - stream.updateInterests(type, stateCoordinator.interestedResources(type)); + interestPublisher.publish( + new InterestedResources(type, stateCoordinator.interestedResources(type))); } return stateCoordinator.hasNoSubscribers(); } @@ -44,9 +103,27 @@ boolean removeSubscriber(XdsType type, String resourceName, ResourceWatcher w @Override public void close() { try { - stream.close(); + subscription.close(); } finally { stateCoordinator.close(); } } + + private static ParsedResources parseResponse(DiscoveryResponse response, XdsExtensionRegistry registry) { + final String typeUrl = response.getTypeUrl(); + final ResourceParser parser = XdsResourceParserUtil.fromTypeUrl(typeUrl); + if (parser == null) { + throw new IllegalArgumentException("Unknown type URL in discovery response: " + typeUrl); + } + + final ParsedResources.SotwParsedResources holder = + parser.parseResources(response.getResourcesList(), registry, response.getVersionInfo()); + + if (!holder.errors().isEmpty()) { + throw new IllegalArgumentException( + "Failed to parse resource(s) from discovery response: " + holder); + } + + return holder; + } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.java b/xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.java deleted file mode 100644 index e6bfaf908d1..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/ConfigSourceSubscription.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2026 LY Corporation - * - * LY Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -import java.util.Set; - -import com.linecorp.armeria.common.annotation.UnstableApi; -import com.linecorp.armeria.common.util.SafeCloseable; - -/** - * A subscription that delivers xDS resources from a custom config source. - * Created by {@link SotwConfigSourceSubscriptionFactory#create} and managed by Armeria. - * - *

Implementations watch an external source (file, KV store, etc.) and call - * {@link SotwSubscriptionCallbacks#onDiscoveryResponse} when new data arrives. - * Armeria calls {@link #updateInterests} when watched resource names change - * and {@link #close()} on shutdown. - * - * @see SotwConfigSourceSubscriptionFactory - */ -@UnstableApi -public interface ConfigSourceSubscription extends SafeCloseable { - - /** - * Updates the resource names that this subscription is interested in. - */ - void updateInterests(XdsType type, Set resourceNames); -} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java b/xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java index 44001111d40..695a87d75f2 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ControlPlaneClientManager.java @@ -23,30 +23,37 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.xds.configsource.SotwConfigSourceSubscriptionFactory; +import com.linecorp.armeria.xds.stream.SnapshotStream; import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.core.v3.Node; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.netty.util.concurrent.EventExecutor; final class ControlPlaneClientManager implements SafeCloseable { + private final Node bootstrapNode; private final EventExecutor eventLoop; private final BootstrapClusters bootstrapClusters; private final ConfigSourceMapper configSourceMapper; private final XdsExtensionRegistry extensionRegistry; + private final SnapshotWatcher defaultWatcher; private final Map clientMap = new HashMap<>(); private boolean closed; ControlPlaneClientManager(Bootstrap bootstrap, EventExecutor eventLoop, BootstrapClusters bootstrapClusters, ConfigSourceMapper configSourceMapper, - XdsExtensionRegistry extensionRegistry) { + XdsExtensionRegistry extensionRegistry, + SnapshotWatcher defaultWatcher) { bootstrapNode = bootstrap.getNode(); this.eventLoop = eventLoop; this.bootstrapClusters = bootstrapClusters; this.configSourceMapper = configSourceMapper; this.extensionRegistry = extensionRegistry; + this.defaultWatcher = defaultWatcher; } void subscribe(ResourceNode node) { @@ -59,7 +66,8 @@ void subscribe(ResourceNode node) { final ConfigSource configSource = node.configSource(); checkArgument(configSource != null, "Cannot subscribe to a node without a configSource"); - final ConfigSourceHandler client = clientMap.computeIfAbsent(configSource, this::createClient); + final ConfigSourceHandler client = + clientMap.computeIfAbsent(configSource, cs -> createClient(cs, node.factoryContext())); client.addSubscriber(type, name, node); } @@ -77,7 +85,8 @@ void unsubscribe(ResourceNode node) { } } - private ConfigSourceHandler createClient(ConfigSource configSource) { + private ConfigSourceHandler createClient(ConfigSource configSource, FactoryContext factoryContext) { + final InterestPublisher interestPublisher = new InterestPublisher(); switch (configSource.getConfigSourceSpecifierCase()) { case PATH_CONFIG_SOURCE: case CUSTOM_CONFIG_SOURCE: @@ -86,9 +95,11 @@ private ConfigSourceHandler createClient(ConfigSource configSource) { "No SotwConfigSourceSubscriptionFactory found for: %s", configSource); final StateCoordinator stateCoordinator = new StateCoordinator(eventLoop, configSource, false, extensionRegistry); - final ConfigSourceSubscription stream = - streamFactory.create(configSource, stateCoordinator, eventLoop); - return new ConfigSourceHandler(stateCoordinator, stream); + final SnapshotStream stream = + streamFactory.create(configSource, factoryContext, + interestPublisher.checkSubscribeOn(eventLoop)) + .rescheduleEventsOn(eventLoop); + return ConfigSourceHandler.of(stateCoordinator, interestPublisher, stream, defaultWatcher); case ADS: case API_CONFIG_SOURCE: final GrpcConfigSourceStreamFactory grpcFactory = @@ -97,7 +108,7 @@ private ConfigSourceHandler createClient(ConfigSource configSource) { checkArgument(grpcFactory != null, "No GrpcConfigSourceStreamFactory registered"); return grpcFactory.create( configSource, eventLoop, bootstrapNode, bootstrapClusters, - configSourceMapper, extensionRegistry); + configSourceMapper, extensionRegistry, interestPublisher, defaultWatcher); default: throw new IllegalArgumentException("Unsupported config source: " + configSource); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java b/xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java index a589eb7ccf6..5ec1093cfab 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/DeltaActualStream.java @@ -21,7 +21,6 @@ import java.util.ArrayDeque; import java.util.EnumSet; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -37,7 +36,6 @@ import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse; -import io.envoyproxy.envoy.service.discovery.v3.Resource; import io.grpc.stub.StreamObserver; import io.netty.util.concurrent.EventExecutor; @@ -251,33 +249,21 @@ public void onCompleted() { } private void handleResponse(ResourceParser resourceParser, DeltaDiscoveryResponse response) { - final StateCoordinator stateCoordinator = stateCoordinator(); final XdsType type = resourceParser.type(); - final List deltaResources = response.getResourcesList(); - final ParsedResourcesHolder holder = - resourceParser.parseDeltaResources(deltaResources, - stateCoordinator.extensionRegistry()); + final ParsedResources holder = + resourceParser.parseDeltaResources(response.getResourcesList(), + stateCoordinator().extensionRegistry(), + response.getRemovedResourcesList()); if (!holder.errors().isEmpty()) { - holder.invalidResources().forEach((name, error) -> - stateCoordinator.onResourceError(type, name, error)); + owner.emit(holder, null); lifecycleObserver.resourceRejected(type, response, holder.invalidResources()); nackResponse(type, response.getNonce(), String.join("\n", holder.errors())); return; } lifecycleObserver.resourceUpdated(type, response, holder.parsedResources()); - - holder.parsedResources().forEach((name, resource) -> { - if (resource instanceof XdsResource) { - stateCoordinator.onResourceUpdated(type, name, (XdsResource) resource); - } - }); - - for (String removedName : response.getRemovedResourcesList()) { - stateCoordinator.onResourceMissing(type, removedName); - } - - // ack after processing so that the diff between interested - state is computed correctly + owner.emit(holder, null); + // ack after emit so that storage in ConfigSourceHandler completes first ackResponse(type, response.getNonce()); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/GrpcConfigSourceStreamFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/GrpcConfigSourceStreamFactory.java index c84fc14ff79..a97378425f6 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/GrpcConfigSourceStreamFactory.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/GrpcConfigSourceStreamFactory.java @@ -21,13 +21,13 @@ import java.util.EnumSet; import java.util.List; import java.util.Locale; -import java.util.Set; import java.util.function.Function; import com.linecorp.armeria.client.grpc.GrpcClientBuilder; import com.linecorp.armeria.client.grpc.GrpcClients; import com.linecorp.armeria.client.retry.Backoff; import com.linecorp.armeria.common.metric.MeterIdPrefix; +import com.linecorp.armeria.xds.stream.SnapshotStream; import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; import io.envoyproxy.envoy.config.core.v3.ApiConfigSource.ApiType; @@ -57,130 +57,98 @@ public String name() { ConfigSourceHandler create(ConfigSource configSource, EventExecutor eventLoop, - Node bootstrapNode, + Node node, BootstrapClusters bootstrapClusters, ConfigSourceMapper configSourceMapper, - XdsExtensionRegistry extensionRegistry) { - final GrpcConfigSourceSubscription stream = new GrpcConfigSourceSubscription( - configSource, eventLoop, bootstrapNode, bootstrapClusters, - configSourceMapper, meterRegistry, meterIdPrefix, extensionRegistry); - return new ConfigSourceHandler(stream.stateCoordinator(), stream); - } + XdsExtensionRegistry extensionRegistry, + InterestPublisher interestedStream, + SnapshotWatcher defaultWatcher) { + final ApiConfigSource apiConfigSource; + if (configSource.hasAds()) { + apiConfigSource = configSourceMapper.bootstrapAdsConfig(); + } else if (configSource.hasApiConfigSource()) { + apiConfigSource = configSource.getApiConfigSource(); + } else { + throw new IllegalArgumentException("Unsupported config source: " + configSource); + } - /** - * A {@link ConfigSourceSubscription} backed by a gRPC {@link XdsStream}. - * gRPC streams fetch interests from {@link StateCoordinator} directly, - * so the resource names parameter in {@link ConfigSourceSubscription#updateInterests} is ignored. - */ - static final class GrpcConfigSourceSubscription implements ConfigSourceSubscription { - - private final StateCoordinator stateCoordinator; - private final XdsStream xdsStream; - - GrpcConfigSourceSubscription(ConfigSource configSource, - EventExecutor eventLoop, - Node node, - BootstrapClusters bootstrapClusters, - ConfigSourceMapper configSourceMapper, - MeterRegistry meterRegistry, - MeterIdPrefix meterIdPrefix, - XdsExtensionRegistry registry) { - final ApiConfigSource apiConfigSource; - if (configSource.hasAds()) { - apiConfigSource = configSourceMapper.bootstrapAdsConfig(); - } else if (configSource.hasApiConfigSource()) { - apiConfigSource = configSource.getApiConfigSource(); + final List grpcServices = apiConfigSource.getGrpcServicesList(); + checkArgument(!grpcServices.isEmpty(), + "At least one GrpcService should be specified for '%s'", configSource); + final GrpcService firstGrpcService = grpcServices.get(0); + checkArgument(firstGrpcService.hasEnvoyGrpc(), + "Only envoyGrpc is supported for '%s'", configSource); + final EnvoyGrpc envoyGrpc = firstGrpcService.getEnvoyGrpc(); + + final GrpcClientBuilder builder = + GrpcClients.builder(new GrpcServicesPreprocessor(grpcServices, bootstrapClusters)); + builder.responseTimeoutMillis(Long.MAX_VALUE); + builder.maxResponseLength(0); + + final ApiType apiType = apiConfigSource.getApiType(); + checkArgument(apiType == ApiType.GRPC || apiType == ApiType.DELTA_GRPC || + apiType == ApiType.AGGREGATED_GRPC || + apiType == ApiType.AGGREGATED_DELTA_GRPC, + "Unsupported api_type: %s", apiType); + final Function metersFunction = + xdsType -> new DefaultConfigSourceLifecycleObserver( + meterRegistry, meterIdPrefix, + configSource.getConfigSourceSpecifierCase(), + envoyGrpc.getClusterName(), xdsType, apiType); + + final boolean isDelta = + apiType == ApiType.AGGREGATED_DELTA_GRPC || apiType == ApiType.DELTA_GRPC; + final boolean isAds = configSource.hasAds() || + apiType == ApiType.AGGREGATED_GRPC || + apiType == ApiType.AGGREGATED_DELTA_GRPC; + + final StateCoordinator stateCoordinator = + new StateCoordinator(eventLoop, configSource, isDelta, extensionRegistry); + final Backoff backoff = Backoff.ofDefault(); + + final SnapshotStream xdsStream; + if (isAds) { + final ConfigSourceLifecycleObserver lifecycleObserver = metersFunction.apply("ads"); + if (isDelta) { + final DeltaDiscoveryStub stub = DeltaDiscoveryStub.ads(builder); + xdsStream = new AdsXdsStream( + owner -> new DeltaActualStream(stub, owner, eventLoop, + lifecycleObserver, node), + backoff, eventLoop, stateCoordinator, lifecycleObserver, + XdsType.discoverableTypes(), interestedStream); } else { - throw new IllegalArgumentException("Unsupported config source: " + configSource); + final SotwDiscoveryStub stub = SotwDiscoveryStub.ads(builder); + xdsStream = new AdsXdsStream( + owner -> new SotwActualStream(stub, owner, eventLoop, + lifecycleObserver, node), + backoff, eventLoop, stateCoordinator, lifecycleObserver, + XdsType.discoverableTypes(), interestedStream); } - - final List grpcServices = apiConfigSource.getGrpcServicesList(); - checkArgument(!grpcServices.isEmpty(), - "At least one GrpcService should be specified for '%s'", configSource); - final GrpcService firstGrpcService = grpcServices.get(0); - checkArgument(firstGrpcService.hasEnvoyGrpc(), - "Only envoyGrpc is supported for '%s'", configSource); - final EnvoyGrpc envoyGrpc = firstGrpcService.getEnvoyGrpc(); - - final GrpcClientBuilder builder = - GrpcClients.builder(new GrpcServicesPreprocessor(grpcServices, bootstrapClusters)); - builder.responseTimeoutMillis(Long.MAX_VALUE); - builder.maxResponseLength(0); - - final ApiType apiType = apiConfigSource.getApiType(); - checkArgument(apiType == ApiType.GRPC || apiType == ApiType.DELTA_GRPC || - apiType == ApiType.AGGREGATED_GRPC || - apiType == ApiType.AGGREGATED_DELTA_GRPC, - "Unsupported api_type: %s", apiType); - final Function metersFunction = - xdsType -> new DefaultConfigSourceLifecycleObserver( - meterRegistry, meterIdPrefix, - configSource.getConfigSourceSpecifierCase(), - envoyGrpc.getClusterName(), xdsType, apiType); - - final boolean isDelta = - apiType == ApiType.AGGREGATED_DELTA_GRPC || apiType == ApiType.DELTA_GRPC; - final boolean isAds = configSource.hasAds() || - apiType == ApiType.AGGREGATED_GRPC || - apiType == ApiType.AGGREGATED_DELTA_GRPC; - - stateCoordinator = new StateCoordinator(eventLoop, configSource, isDelta, registry); - final Backoff backoff = Backoff.ofDefault(); - - if (isAds) { - final ConfigSourceLifecycleObserver lifecycleObserver = metersFunction.apply("ads"); - if (isDelta) { - final DeltaDiscoveryStub stub = DeltaDiscoveryStub.ads(builder); - xdsStream = new AdsXdsStream( + } else { + if (isDelta) { + xdsStream = new CompositeXdsStream(type -> { + final DeltaDiscoveryStub stub = DeltaDiscoveryStub.basic(type, builder); + final ConfigSourceLifecycleObserver lifecycleObserver = + metersFunction.apply(type.name().toLowerCase(Locale.ROOT)); + return new AdsXdsStream( owner -> new DeltaActualStream(stub, owner, eventLoop, lifecycleObserver, node), backoff, eventLoop, stateCoordinator, lifecycleObserver, - XdsType.discoverableTypes()); - } else { - final SotwDiscoveryStub stub = SotwDiscoveryStub.ads(builder); - xdsStream = new AdsXdsStream( + EnumSet.of(type), interestedStream); + }); + } else { + xdsStream = new CompositeXdsStream(type -> { + final SotwDiscoveryStub stub = SotwDiscoveryStub.basic(type, builder); + final ConfigSourceLifecycleObserver lifecycleObserver = + metersFunction.apply(type.name().toLowerCase(Locale.ROOT)); + return new AdsXdsStream( owner -> new SotwActualStream(stub, owner, eventLoop, lifecycleObserver, node), backoff, eventLoop, stateCoordinator, lifecycleObserver, - XdsType.discoverableTypes()); - } - } else { - if (isDelta) { - xdsStream = new CompositeXdsStream(type -> { - final DeltaDiscoveryStub stub = DeltaDiscoveryStub.basic(type, builder); - final ConfigSourceLifecycleObserver lifecycleObserver = - metersFunction.apply(type.name().toLowerCase(Locale.ROOT)); - return new AdsXdsStream( - owner -> new DeltaActualStream(stub, owner, eventLoop, - lifecycleObserver, node), - backoff, eventLoop, stateCoordinator, lifecycleObserver, EnumSet.of(type)); - }); - } else { - xdsStream = new CompositeXdsStream(type -> { - final SotwDiscoveryStub stub = SotwDiscoveryStub.basic(type, builder); - final ConfigSourceLifecycleObserver lifecycleObserver = - metersFunction.apply(type.name().toLowerCase(Locale.ROOT)); - return new AdsXdsStream( - owner -> new SotwActualStream(stub, owner, eventLoop, - lifecycleObserver, node), - backoff, eventLoop, stateCoordinator, lifecycleObserver, EnumSet.of(type)); - }); - } + EnumSet.of(type), interestedStream); + }); } } - - StateCoordinator stateCoordinator() { - return stateCoordinator; - } - - @Override - public void updateInterests(XdsType type, Set resourceNames) { - xdsStream.resourcesUpdated(type); - } - - @Override - public void close() { - xdsStream.close(); - } + return new ConfigSourceHandler(stateCoordinator, interestedStream, xdsStream, defaultWatcher); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/GrpcServicesPreprocessor.java b/xds/src/main/java/com/linecorp/armeria/xds/GrpcServicesPreprocessor.java index 42c9d1d5726..de883062948 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/GrpcServicesPreprocessor.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/GrpcServicesPreprocessor.java @@ -22,14 +22,12 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.HttpPreprocessor; import com.linecorp.armeria.client.PreClient; import com.linecorp.armeria.client.PreClientRequestContext; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.util.Exceptions; -import com.linecorp.armeria.xds.client.endpoint.XdsLoadBalancer; import com.linecorp.armeria.xds.internal.XdsCommonUtil; import io.envoyproxy.envoy.config.core.v3.GrpcService; @@ -67,12 +65,7 @@ public HttpResponse execute(PreClient delegate, PreCl bootstrapClusters.snapshotFuture(clusterName); checkArgument(snapshotFuture != null, "No cluster found for name: %s", clusterName); return HttpResponse.of(snapshotFuture.thenApply(snapshot -> { - final XdsLoadBalancer loadBalancer = snapshot.loadBalancer(); - checkArgument(loadBalancer != null, "No endpoints found for name: %s", clusterName); - final Endpoint endpoint = loadBalancer.selectNow(ctx); - checkArgument(endpoint != null, "Endpoint not selected found for name: %s", clusterName); - XdsCommonUtil.setTlsParams(ctx, endpoint); - ctx.setEndpointGroup(endpoint); + XdsCommonUtil.applyClusterToCtx(snapshot, ctx); try { return delegate.execute(ctx, req); } catch (Exception e) { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/InterestPublisher.java b/xds/src/main/java/com/linecorp/armeria/xds/InterestPublisher.java new file mode 100644 index 00000000000..8e7243d0ebc --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/InterestPublisher.java @@ -0,0 +1,33 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds; + +import com.linecorp.armeria.xds.configsource.InterestedResources; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.Subscription; + +final class InterestPublisher extends RefCountedStream { + + void publish(InterestedResources interestedResources) { + emit(interestedResources, null); + } + + @Override + protected Subscription onStart(SnapshotWatcher watcher) { + return () -> {}; + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ParsedResourcesHolder.java b/xds/src/main/java/com/linecorp/armeria/xds/ParsedResources.java similarity index 52% rename from xds/src/main/java/com/linecorp/armeria/xds/ParsedResourcesHolder.java rename to xds/src/main/java/com/linecorp/armeria/xds/ParsedResources.java index d22fd127595..f4bbdc97c26 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ParsedResourcesHolder.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ParsedResources.java @@ -18,16 +18,19 @@ import java.util.List; import java.util.Map; +import java.util.Set; import com.google.common.collect.ImmutableList; -final class ParsedResourcesHolder { +abstract class ParsedResources { + private final XdsType type; private final Map parsedResources; private final Map invalidResources; private final List errors; - ParsedResourcesHolder(Map parsedResources, - Map invalidResources) { + ParsedResources(XdsType type, Map parsedResources, + Map invalidResources) { + this.type = type; this.parsedResources = parsedResources; this.invalidResources = invalidResources; errors = invalidResources.values().stream() @@ -35,6 +38,10 @@ final class ParsedResourcesHolder { .collect(ImmutableList.toImmutableList()); } + XdsType type() { + return type; + } + Map parsedResources() { return parsedResources; } @@ -46,4 +53,34 @@ Map invalidResources() { List errors() { return errors; } + + static final class DeltaParsedResources extends ParsedResources { + + private final Set removed; + + DeltaParsedResources(XdsType type, Map parsedResources, + Map invalidResources, Set removed) { + super(type, parsedResources, invalidResources); + this.removed = removed; + } + + Set removed() { + return removed; + } + } + + static final class SotwParsedResources extends ParsedResources { + + private final boolean fullStateOfTheWorld; + + SotwParsedResources(XdsType type, Map parsedResources, + Map invalidResources, boolean fullStateOfTheWorld) { + super(type, parsedResources, invalidResources); + this.fullStateOfTheWorld = fullStateOfTheWorld; + } + + boolean isFullStateOfTheWorld() { + return fullStateOfTheWorld; + } + } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java index 186486c8536..3b0dc174938 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/PathSotwConfigSourceSubscriptionFactory.java @@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Set; import com.linecorp.armeria.common.Cancellable; import com.linecorp.armeria.common.CommonPools; @@ -29,6 +28,11 @@ import com.linecorp.armeria.common.file.DirectoryWatchService; import com.linecorp.armeria.common.file.PathWatcher; import com.linecorp.armeria.common.metric.MeterIdPrefix; +import com.linecorp.armeria.xds.configsource.InterestedResources; +import com.linecorp.armeria.xds.configsource.SotwConfigSourceSubscriptionFactory; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.core.v3.PathConfigSource; @@ -58,30 +62,26 @@ public String name() { } @Override - public ConfigSourceSubscription create(ConfigSource configSource, - SotwSubscriptionCallbacks callbacks, - EventExecutor eventLoop) { - return new PathConfigSourceSubscription( - configSource.getPathConfigSource(), watchService, - callbacks, eventLoop, meterRegistry, meterIdPrefix); + public SnapshotStream create(ConfigSource configSource, + FactoryContext factoryContext, + SnapshotStream interestedResources) { + return new PathConfigSourceSubscription(configSource.getPathConfigSource(), watchService, + factoryContext.eventLoop(), meterRegistry, meterIdPrefix); } - static final class PathConfigSourceSubscription implements ConfigSourceSubscription { + static final class PathConfigSourceSubscription extends RefCountedStream { private final Path filePath; private final Path watchDir; private final DirectoryWatchService watchService; - private final SotwSubscriptionCallbacks callbacks; private final EventExecutor eventLoop; private final PathConfigSourceLifecycleObserver lifecycleObserver; - @Nullable private Cancellable watchCancellable; private boolean closed; PathConfigSourceSubscription(PathConfigSource pathConfigSource, DirectoryWatchService watchService, - SotwSubscriptionCallbacks callbacks, EventExecutor eventLoop, MeterRegistry meterRegistry, MeterIdPrefix meterIdPrefix) { filePath = Paths.get(pathConfigSource.getPath()).toAbsolutePath(); @@ -91,11 +91,8 @@ static final class PathConfigSourceSubscription implements ConfigSourceSubscript watchDir = requireNonNull(filePath.getParent(), "filePath.getParent()"); } this.watchService = watchService; - this.callbacks = callbacks; this.eventLoop = eventLoop; lifecycleObserver = new PathConfigSourceLifecycleObserver(filePath, meterRegistry, meterIdPrefix); - - startWatching(); } private void startWatching() { @@ -104,10 +101,11 @@ private void startWatching() { try { cancellable = watchService.register( watchDir, PathWatcher.ofFile(filePath, bytes -> { - eventLoop.execute(() -> parseAndPush(bytes)); + eventLoop.execute(() -> parseAndEmit(bytes)); })); } catch (Exception e) { lifecycleObserver.fileParseError(e); + emit(null, e); lifecycleObserver.close(); return; } @@ -121,7 +119,7 @@ private void startWatching() { }); } - private void parseAndPush(byte[] bytes) { + private void parseAndEmit(byte[] bytes) { if (closed) { return; } @@ -131,23 +129,22 @@ private void parseAndPush(byte[] bytes) { response = XdsResourceReader.from(content, DiscoveryResponse.class); } catch (Exception e) { lifecycleObserver.fileParseError(e); + emit(null, e); return; } lifecycleObserver.fileLoaded(); - callbacks.onDiscoveryResponse(response); + emit(response, null); } @Override - public void updateInterests(XdsType type, Set resourceNames) { - // Path config source pushes all resources on every file change regardless of interests. - } - - @Override - public void close() { - closed = true; - if (watchCancellable != null) { - close0(watchCancellable); - } + protected Subscription onStart(SnapshotWatcher watcher) { + startWatching(); + return () -> { + closed = true; + if (watchCancellable != null) { + close0(watchCancellable); + } + }; } private void close0(Cancellable watchCancellable) { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNode.java b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNode.java index 041bb5f9c45..79b699e0be5 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNode.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNode.java @@ -25,7 +25,7 @@ * * @param the type of the current {@link XdsResource} */ -interface ResourceNode extends ResourceWatcher { +interface ResourceNode extends SnapshotWatcher { @Nullable ConfigSource configSource(); @@ -33,4 +33,6 @@ interface ResourceNode extends ResourceWatcher { XdsType type(); String name(); + + FactoryContext factoryContext(); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java index 065f152e0ce..2f95a9337bc 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java @@ -16,6 +16,7 @@ package com.linecorp.armeria.xds; +import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.xds.stream.RefCountedStream; import com.linecorp.armeria.xds.stream.Subscription; @@ -55,21 +56,21 @@ public String name() { } @Override - public void onChanged(T update) { - resourceNodeMeterBinder.onChanged(update); - emit(update, null); + public FactoryContext factoryContext() { + return context; } @Override - public void onError(XdsType type, String resourceName, Throwable t) { - resourceNodeMeterBinder.onError(); - emit(null, XdsResourceException.maybeWrap(type, resourceName, t)); - } - - @Override - public void onResourceDoesNotExist(XdsType type, String resourceName) { - resourceNodeMeterBinder.onResourceDoesNotExist(); - emit(null, new MissingXdsResourceException(type, resourceName)); + public void onUpdate(@Nullable T value, @Nullable Throwable error) { + if (value != null) { + resourceNodeMeterBinder.onChanged(value); + } else if (error instanceof MissingXdsResourceException) { + resourceNodeMeterBinder.onResourceDoesNotExist( + ); + } else if (error != null) { + resourceNodeMeterBinder.onError(); + } + emit(value, error); } @Override diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java b/xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java index 47942a5fe15..6b767e20a58 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ResourceParser.java @@ -20,6 +20,7 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.protobuf.Any; import com.google.protobuf.Message; @@ -33,8 +34,9 @@ abstract class ResourceParser { abstract O parse(I message, XdsExtensionRegistry extensionRegistry, String version); - ParsedResourcesHolder parseResources(List resources, XdsExtensionRegistry extensionRegistry, - String version) { + ParsedResources.SotwParsedResources parseResources(List resources, + XdsExtensionRegistry extensionRegistry, + String version) { final ImmutableMap.Builder parsedResources = ImmutableMap.builder(); final ImmutableMap.Builder invalidResources = ImmutableMap.builder(); @@ -63,12 +65,14 @@ ParsedResourcesHolder parseResources(List resources, XdsExtensionRegistry e parsedResources.put(name, resourceUpdate); } - return new ParsedResourcesHolder(parsedResources.buildKeepingLast(), - invalidResources.buildKeepingLast()); + return new ParsedResources.SotwParsedResources(type(), parsedResources.buildKeepingLast(), + invalidResources.buildKeepingLast(), + isFullStateOfTheWorld()); } - ParsedResourcesHolder parseDeltaResources(List resources, - XdsExtensionRegistry extensionRegistry) { + ParsedResources.DeltaParsedResources parseDeltaResources(List resources, + XdsExtensionRegistry extensionRegistry, + List removedResources) { final ImmutableMap.Builder parsedResources = ImmutableMap.builder(); final ImmutableMap.Builder invalidResources = ImmutableMap.builder(); @@ -96,8 +100,9 @@ ParsedResourcesHolder parseDeltaResources(List resources, parsedResources.put(name, resourceUpdate); } - return new ParsedResourcesHolder(parsedResources.buildKeepingLast(), - invalidResources.buildKeepingLast()); + return new ParsedResources.DeltaParsedResources(type(), parsedResources.buildKeepingLast(), + invalidResources.buildKeepingLast(), + ImmutableSet.copyOf(removedResources)); } // Do not confuse with the SotW approach: it is the mechanism in which the client must specify all diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ResourceWatcher.java b/xds/src/main/java/com/linecorp/armeria/xds/ResourceWatcher.java deleted file mode 100644 index f2b9474e2f3..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/ResourceWatcher.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2023 LINE Corporation - * - * LINE Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -/** - * A resource watcher. - * - * @param the type of the {@link XdsResource} that is notified - */ -@FunctionalInterface -interface ResourceWatcher { - - default void onError(XdsType type, String resourceName, Throwable t) {} - - default void onResourceDoesNotExist(XdsType type, String resourceName) {} - - void onChanged(T update); -} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java b/xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java index a1e90a65890..f616eeb1765 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SotwActualStream.java @@ -172,24 +172,22 @@ public void onCompleted() { } private void handleResponse(ResourceParser resourceParser, DiscoveryResponse response) { - final StateCoordinator stateCoordinator = stateCoordinator(); - final ParsedResourcesHolder holder = + final ParsedResources.SotwParsedResources holder = resourceParser.parseResources(response.getResourcesList(), - stateCoordinator.extensionRegistry(), response.getVersionInfo()); + stateCoordinator().extensionRegistry(), + response.getVersionInfo()); final XdsType type = resourceParser.type(); if (!holder.errors().isEmpty()) { - holder.invalidResources().forEach((name, error) -> stateCoordinator.onResourceError( - type, name, error)); + owner.emit(holder, null); lifecycleObserver.resourceRejected(type, response, holder.invalidResources()); nackResponse(type, response.getNonce(), String.join("\n", holder.errors())); return; } - stateCoordinator.onSotwConfigUpdate(type, holder.parsedResources()); + owner.emit(holder, null); lifecycleObserver.resourceUpdated(type, response, holder.parsedResources()); - // send the ack ackResponse(type, response.getVersionInfo(), response.getNonce()); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.java deleted file mode 100644 index 8928ef01daa..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/SotwConfigSourceSubscriptionFactory.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2026 LY Corporation - * - * LY Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -import com.linecorp.armeria.common.annotation.UnstableApi; - -import io.envoyproxy.envoy.config.core.v3.ConfigSource; -import io.netty.util.concurrent.EventExecutor; - -/** - * A factory that creates a {@link ConfigSourceSubscription} for a non-gRPC config source. - * Implementations are resolved by name or by the {@code custom_config_source} type URL - * via the extension registry. - * - *

How the pieces fit together

- *
    - *
  1. Armeria calls {@link #create} with a {@link ConfigSource}, a - * {@link SotwSubscriptionCallbacks}, and an {@link EventExecutor}.
  2. - *
  3. The returned {@link ConfigSourceSubscription} watches the external source - * (file, KV store, etc.) and calls - * {@link SotwSubscriptionCallbacks#onDiscoveryResponse} whenever new data arrives.
  4. - *
  5. Armeria calls {@link ConfigSourceSubscription#updateInterests} when the set of - * watched resource names changes, and {@link ConfigSourceSubscription#close()} on shutdown.
  6. - *
- * - *

Example

- *
{@code
- * public class MyConfigSourceFactory implements SotwConfigSourceSubscriptionFactory {
- *
- *     @Override
- *     public String name() {
- *         return "my-config-source";
- *     }
- *
- *     @Override
- *     public ConfigSourceSubscription create(ConfigSource configSource,
- *                                            SotwSubscriptionCallbacks callbacks,
- *                                            EventExecutor eventLoop) {
- *         return new ConfigSourceSubscription() {
- *             // Start watching the external source and deliver updates:
- *             //   callbacks.onDiscoveryResponse(response);
- *
- *             @Override
- *             public void updateInterests(XdsType type, Set resourceNames) {
- *                 // Optionally adjust what this subscription fetches.
- *             }
- *
- *             @Override
- *             public void close() {
- *                 // Clean up resources (close connections, cancel watchers, etc.).
- *             }
- *         };
- *     }
- * }
- * }
- * - *

Custom implementations can be registered via {@link java.util.ServiceLoader} SPI. - */ -@UnstableApi -public interface SotwConfigSourceSubscriptionFactory extends XdsExtensionFactory { - - /** - * Creates a {@link ConfigSourceSubscription} for the given config source. - * - * @param configSource the full {@link ConfigSource} from xDS bootstrap or resource - * @param callbacks the callbacks to invoke when resources are received - * @param eventLoop the event loop for scheduling and synchronization - * @return a new {@link ConfigSourceSubscription} - */ - ConfigSourceSubscription create(ConfigSource configSource, - SotwSubscriptionCallbacks callbacks, - EventExecutor eventLoop); -} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.java b/xds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.java deleted file mode 100644 index 291262a8828..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/SotwSubscriptionCallbacks.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2026 LY Corporation - * - * LY Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -import com.linecorp.armeria.common.annotation.UnstableApi; - -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; - -/** - * Callback interface provided to a {@link ConfigSourceSubscription} so it can feed - * raw {@link DiscoveryResponse}s back into the xDS resource system. - * - *

Implementations are created internally by Armeria and passed to - * {@link SotwConfigSourceSubscriptionFactory#create}. Custom config source subscriptions - * should call {@link #onDiscoveryResponse} whenever new data is received from the - * external source. - * - * @see SotwConfigSourceSubscriptionFactory - */ -@UnstableApi -public interface SotwSubscriptionCallbacks { - - /** - * Called with a raw {@link DiscoveryResponse} from a SotW config source. - * The implementation is responsible for resolving the resource type, parsing resources, - * and applying the results. - * - *

Threading: This method must be called from the - * {@link io.netty.util.concurrent.EventExecutor} that was supplied to - * {@link SotwConfigSourceSubscriptionFactory#create}. Calling it from any other - * thread will result in an {@link IllegalArgumentException}. - * - * @param response the discovery response - */ - void onDiscoveryResponse(DiscoveryResponse response); -} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java b/xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java index 9e83c656eef..ac1acc4d7a3 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java @@ -17,38 +17,25 @@ package com.linecorp.armeria.xds; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import java.util.Map; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.protobuf.Duration; import com.google.protobuf.util.Durations; -import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.SafeCloseable; import io.envoyproxy.envoy.config.core.v3.ConfigSource; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.netty.util.concurrent.EventExecutor; -final class StateCoordinator implements SotwSubscriptionCallbacks, SafeCloseable { - - private static final Logger logger = LoggerFactory.getLogger(StateCoordinator.class); +final class StateCoordinator implements SafeCloseable { private final SubscriberStorage subscriberStorage; private final ResourceStateStore stateStore; private final XdsExtensionRegistry extensionRegistry; - private final EventExecutor eventLoop; StateCoordinator(EventExecutor eventLoop, ConfigSource configSource, boolean delta, XdsExtensionRegistry extensionRegistry) { - this.eventLoop = eventLoop; final long timeoutMillis = initialFetchTimeoutMillis(configSource); subscriberStorage = new SubscriberStorage(eventLoop, timeoutMillis, delta); stateStore = new ResourceStateStore(); @@ -70,16 +57,27 @@ private static long initialFetchTimeoutMillis(ConfigSource configSource) { return epochMilli; } - boolean register(XdsType type, String resourceName, ResourceWatcher watcher) { + boolean register(XdsType type, String resourceName, + SnapshotWatcher watcher) { final boolean updated = subscriberStorage.register(type, resourceName, watcher); replayToWatcher(type, resourceName, watcher); return updated; } - boolean unregister(XdsType type, String resourceName, ResourceWatcher watcher) { + boolean unregister(XdsType type, String resourceName, + SnapshotWatcher watcher) { return subscriberStorage.unregister(type, resourceName, watcher); } + private void replayToWatcher(XdsType type, String resourceName, + SnapshotWatcher watcher) { + @SuppressWarnings("unchecked") + final T cached = (T) stateStore.resource(type, resourceName); + if (cached != null) { + watcher.onUpdate(cached, null); + } + } + ImmutableSet interestedResources(XdsType type) { return subscriberStorage.resources(type); } @@ -101,9 +99,10 @@ void onResourceUpdated(XdsType type, String resourceName, XdsResource resource) if (revised == null) { return; } - final XdsStreamSubscriber subscriber = subscriber(type, resourceName); + final CompositeSnapshotWatcher subscriber = + subscriberStorage.subscriber(type, resourceName); if (subscriber != null) { - subscriber.onData(revised); + subscriber.onUpdate(revised, null); } } @@ -111,79 +110,16 @@ void onResourceMissing(XdsType type, String resourceName) { if (!stateStore.remove(type, resourceName)) { return; } - final XdsStreamSubscriber subscriber = subscriber(type, resourceName); + final CompositeSnapshotWatcher subscriber = subscriberStorage.subscriber(type, resourceName); if (subscriber != null) { - subscriber.onAbsent(); + subscriber.onUpdate(null, new MissingXdsResourceException(type, resourceName)); } } void onResourceError(XdsType type, String resourceName, Throwable cause) { - final XdsStreamSubscriber subscriber = subscriber(type, resourceName); + final CompositeSnapshotWatcher subscriber = subscriberStorage.subscriber(type, resourceName); if (subscriber != null) { - subscriber.onError(resourceName, cause); - } - } - - @Nullable - private XdsStreamSubscriber subscriber(XdsType type, String resourceName) { - return subscriberStorage.subscriber(type, resourceName); - } - - private void replayToWatcher(XdsType type, String resourceName, - ResourceWatcher watcher) { - final XdsResource resource = stateStore.resource(type, resourceName); - if (resource != null) { - //noinspection unchecked - watcher.onChanged((T) resource); - } - } - - @Override - public void onDiscoveryResponse(DiscoveryResponse response) { - checkState(eventLoop.inEventLoop(), "eventLoop must be inEventLoop"); - final String typeUrl = response.getTypeUrl(); - final ResourceParser parser = XdsResourceParserUtil.fromTypeUrl(typeUrl); - if (parser == null) { - logger.warn("Unknown type URL in discovery response: {}", typeUrl); - return; - } - - final XdsType type = parser.type(); - final ParsedResourcesHolder holder = - parser.parseResources(response.getResourcesList(), - extensionRegistry, response.getVersionInfo()); - if (!holder.errors().isEmpty()) { - // Report errors for invalid resources - holder.invalidResources().forEach((name, error) -> onResourceError(type, name, error)); - logger.warn("Failed to parse {} resource(s) from discovery response (type: {})", - holder.errors().size(), typeUrl); - return; - } - onSotwConfigUpdate(type, holder.parsedResources()); - } - - void onSotwConfigUpdate(XdsType type, Map parsedResources) { - // Apply successfully parsed resources - parsedResources.forEach((name, resource) -> { - if (resource instanceof XdsResource) { - onResourceUpdated(type, name, (XdsResource) resource); - } - }); - - // SotW absent detection for full-state types (LDS/CDS) - final ResourceParser resourceParser = XdsResourceParserUtil.fromType(type); - assert resourceParser != null; - final boolean fullStateOfTheWorld = resourceParser.isFullStateOfTheWorld(); - if (fullStateOfTheWorld) { - final Set active = activeResources(resourceParser.type()); - for (String name : active) { - if (parsedResources.containsKey(name)) { - continue; - } - onResourceMissing(resourceParser.type(), name); - } - } else { - // A limitation of sotw - we can't know if resources should be removed. + subscriber.onUpdate(null, XdsResourceException.maybeWrap(type, resourceName, cause)); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java b/xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java index e5806a9a471..ae139bc9a26 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java @@ -33,7 +33,7 @@ final class SubscriberStorage implements SafeCloseable { private final EventExecutor eventLoop; private final long timeoutMillis; private final boolean delta; - private final Map>> subscriberMap = + private final Map>> subscriberMap = new EnumMap<>(XdsType.class); SubscriberStorage(EventExecutor eventLoop, long timeoutMillis, boolean delta) { @@ -45,37 +45,39 @@ final class SubscriberStorage implements SafeCloseable { /** * Returns {@code true} if a new subscriber is added. */ - boolean register(XdsType type, String resourceName, ResourceWatcher watcher) { + boolean register(XdsType type, String resourceName, + SnapshotWatcher watcher) { //noinspection unchecked - XdsStreamSubscriber subscriber = (XdsStreamSubscriber) subscriberMap.computeIfAbsent( + CompositeSnapshotWatcher subscriber = (CompositeSnapshotWatcher) subscriberMap.computeIfAbsent( type, key -> new HashMap<>()).get(resourceName); boolean updated = false; if (subscriber == null) { final boolean enableAbsentOnTimeout = !delta && timeoutMillis > 0; - subscriber = new XdsStreamSubscriber<>(type, resourceName, eventLoop, timeoutMillis, - enableAbsentOnTimeout); + subscriber = new CompositeSnapshotWatcher<>(type, resourceName, eventLoop, timeoutMillis, + enableAbsentOnTimeout); subscriberMap.get(type).put(resourceName, subscriber); updated = true; } - subscriber.registerWatcher(watcher); + subscriber.addWatcher(watcher); return updated; } /** * Returns {@code true} if a subscriber is removed. */ - boolean unregister(XdsType type, String resourceName, ResourceWatcher watcher) { + boolean unregister(XdsType type, String resourceName, + SnapshotWatcher watcher) { if (!subscriberMap.containsKey(type)) { return false; } - final Map> resourceToSubscriber = subscriberMap.get(type); + final Map> resourceToSubscriber = subscriberMap.get(type); if (!resourceToSubscriber.containsKey(resourceName)) { return false; } //noinspection unchecked - final XdsStreamSubscriber subscriber = - (XdsStreamSubscriber) resourceToSubscriber.get(resourceName); - subscriber.unregisterWatcher(watcher); + final CompositeSnapshotWatcher subscriber = + (CompositeSnapshotWatcher) resourceToSubscriber.get(resourceName); + subscriber.removeWatcher(watcher); if (subscriber.isEmpty()) { resourceToSubscriber.remove(resourceName); subscriber.close(); @@ -88,7 +90,7 @@ boolean unregister(XdsType type, String resourceName, Re } @Nullable - XdsStreamSubscriber subscriber(XdsType type, String resourceName) { + CompositeSnapshotWatcher subscriber(XdsType type, String resourceName) { return unsafeCast(subscriberMap.getOrDefault(type, ImmutableMap.of()).get(resourceName)); } @@ -109,7 +111,7 @@ boolean hasNoSubscribers() { @Override public void close() { subscriberMap.values().forEach(subscribers -> { - subscribers.values().forEach(XdsStreamSubscriber::close); + subscribers.values().forEach(CompositeSnapshotWatcher::close); }); subscriberMap.clear(); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java b/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java index 1fca1c86c59..2d88df25f57 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SubscriptionContext.java @@ -40,6 +40,13 @@ default SnapshotStream genericSecretStream( .checkSubscribeOn(eventLoop()); } + @Override + default SnapshotStream clusterStream(String clusterName) { + final SnapshotStream stream = + watcher -> clusterManager().register(clusterName, this, watcher); + return stream.checkSubscribeOn(eventLoop()); + } + void subscribe(ResourceNode node); void unsubscribe(ResourceNode node); diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapBuilder.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapBuilder.java index 6028329d704..969c6ab806f 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapBuilder.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapBuilder.java @@ -17,17 +17,26 @@ package com.linecorp.armeria.xds; import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableSet; + import com.linecorp.armeria.common.Flags; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.common.metric.MeterIdPrefix; import com.linecorp.armeria.common.util.EventLoopGroups; import com.linecorp.armeria.internal.common.util.ReentrantShortLock; +import com.linecorp.armeria.xds.configsource.SotwConfigSourceSubscriptionFactory; +import com.linecorp.armeria.xds.filter.HttpFilterFactory; import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; import io.micrometer.core.instrument.MeterRegistry; @@ -40,6 +49,8 @@ @UnstableApi public final class XdsBootstrapBuilder { + static final Set> ALLOWED_EXTENSION_TYPES = + ImmutableSet.of(HttpFilterFactory.class, SotwConfigSourceSubscriptionFactory.class); static final MeterIdPrefix DEFAULT_METER_ID_PREFIX = new MeterIdPrefix("armeria.xds"); private static final Logger logger = LoggerFactory.getLogger(XdsBootstrapBuilder.class); @@ -81,6 +92,7 @@ public void onUpdate(@Nullable Object snapshot, @Nullable Throwable t) { private EventExecutor eventExecutor; private final Bootstrap bootstrap; private SnapshotWatcher snapshotWatcher = DEFAULT_SNAPSHOT_WATCHER; + private final List extensionFactories = new ArrayList<>(); XdsBootstrapBuilder(Bootstrap bootstrap) { this.bootstrap = requireNonNull(bootstrap, "bootstrap"); @@ -119,12 +131,29 @@ public XdsBootstrapBuilder defaultSnapshotWatcher(SnapshotWatcher snapsh return this; } + /** + * Adds an {@link XdsExtensionFactory} that takes precedence over those + * discovered via {@link java.util.ServiceLoader}. + */ + public XdsBootstrapBuilder extensionFactory(XdsExtensionFactory factory) { + requireNonNull(factory, "factory"); + validateExtensionFactory(factory); + extensionFactories.add(factory); + return this; + } + + static void validateExtensionFactory(XdsExtensionFactory factory) { + checkArgument(ALLOWED_EXTENSION_TYPES.stream().anyMatch(t -> t.isInstance(factory)), + "Unsupported extension factory type: %s. Allowed types: %s", + factory.getClass().getName(), ALLOWED_EXTENSION_TYPES); + } + /** * Builds the {@link XdsBootstrap}. */ public XdsBootstrap build() { final EventExecutor eventExecutor = firstNonNull(this.eventExecutor, defaultGroup().next()); return new XdsBootstrapImpl(bootstrap, eventExecutor, meterIdPrefix, meterRegistry, - snapshotWatcher); + snapshotWatcher, extensionFactories); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java index f14352faf8a..f741f416d1d 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrapImpl.java @@ -18,6 +18,7 @@ import static java.util.Objects.requireNonNull; +import java.util.List; import java.util.Map; import com.google.common.annotations.VisibleForTesting; @@ -43,7 +44,8 @@ final class XdsBootstrapImpl implements XdsBootstrap { XdsBootstrapImpl(Bootstrap bootstrap, EventExecutor eventLoop, MeterIdPrefix meterIdPrefix, MeterRegistry meterRegistry, - SnapshotWatcher defaultWatcher) { + SnapshotWatcher defaultWatcher, + List extensionFactories) { this.bootstrap = bootstrap; this.defaultWatcher = defaultWatcher; this.eventLoop = requireNonNull(eventLoop, "eventLoop"); @@ -51,7 +53,8 @@ final class XdsBootstrapImpl implements XdsBootstrap { final XdsResourceValidator resourceValidator = new XdsResourceValidator(); final XdsExtensionRegistry extensionRegistry = XdsExtensionRegistry.of(resourceValidator, watchService, - meterRegistry, meterIdPrefix); + meterRegistry, meterIdPrefix, + extensionFactories); extensionRegistry.assertValid(bootstrap); clusterManager = new XdsClusterManager(eventLoop, bootstrap, meterIdPrefix, meterRegistry); final BootstrapClusters bootstrapClusters = @@ -60,7 +63,7 @@ final class XdsBootstrapImpl implements XdsBootstrap { final ConfigSourceMapper configSourceMapper = new ConfigSourceMapper(bootstrap); controlPlaneClientManager = new ControlPlaneClientManager( - bootstrap, eventLoop, bootstrapClusters, configSourceMapper, extensionRegistry); + bootstrap, eventLoop, bootstrapClusters, configSourceMapper, extensionRegistry, defaultWatcher); subscriptionContext = new DefaultSubscriptionContext( eventLoop, clusterManager, configSourceMapper, controlPlaneClientManager, meterRegistry, meterIdPrefix, watchService, bootstrapSecrets, extensionRegistry); diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java index 0dd68aeef80..f1360884bc7 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java @@ -90,7 +90,7 @@ void register(Cluster cluster, SubscriptionContext context, } Subscription register(String name, SubscriptionContext context, - SnapshotWatcher watcher) { + SnapshotWatcher watcher) { if (closed) { return Subscription.noop(); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java index 759d107b11e..e5111e118b8 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsExtensionRegistry.java @@ -16,6 +16,8 @@ package com.linecorp.armeria.xds; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.ServiceLoader; @@ -26,7 +28,6 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.file.DirectoryWatchService; import com.linecorp.armeria.common.metric.MeterIdPrefix; -import com.linecorp.armeria.xds.filter.HttpFilterFactory; import io.micrometer.core.instrument.MeterRegistry; @@ -54,7 +55,8 @@ private XdsExtensionRegistry(Map byTypeUrl, static XdsExtensionRegistry of(XdsResourceValidator validator, DirectoryWatchService watchService, MeterRegistry meterRegistry, - MeterIdPrefix meterIdPrefix) { + MeterIdPrefix meterIdPrefix, + List extensionFactories) { final ImmutableMap.Builder byName = ImmutableMap.builder(); final ImmutableMap.Builder byTypeUrl = ImmutableMap.builder(); @@ -62,13 +64,9 @@ static XdsExtensionRegistry of(XdsResourceValidator validator, byName, byTypeUrl); register(new GrpcConfigSourceStreamFactory(meterRegistry, meterIdPrefix), byName, byTypeUrl); - // Load SPI-discovered HttpFilterFactory instances as base factories - ServiceLoader.load(HttpFilterFactory.class).forEach(factory -> { - register(factory, byName, byTypeUrl); - }); - - // Load SPI-discovered SotwConfigSourceSubscriptionFactory instances - ServiceLoader.load(SotwConfigSourceSubscriptionFactory.class).forEach(factory -> { + // Load SPI-discovered extension factories + ServiceLoader.load(XdsExtensionFactory.class).forEach(factory -> { + XdsBootstrapBuilder.validateExtensionFactory(factory); register(factory, byName, byTypeUrl); }); @@ -79,7 +77,19 @@ static XdsExtensionRegistry of(XdsResourceValidator validator, register(UpstreamTlsTransportSocketFactory.INSTANCE, byName, byTypeUrl); register(RawBufferTransportSocketFactory.INSTANCE, byName, byTypeUrl); - return new XdsExtensionRegistry(byTypeUrl.build(), byName.build(), validator); + final Map nameMap = new HashMap<>(byName.build()); + final Map typeUrlMap = new HashMap<>(byTypeUrl.build()); + + // User-provided factories override all of the above + for (XdsExtensionFactory factory : extensionFactories) { + nameMap.put(factory.name(), factory); + for (String typeUrl : factory.typeUrls()) { + typeUrlMap.put(typeUrl, factory); + } + } + + return new XdsExtensionRegistry(ImmutableMap.copyOf(typeUrlMap), + ImmutableMap.copyOf(nameMap), validator); } private static void register(XdsExtensionFactory factory, diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsStream.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsStream.java index bc7fac40701..629857fc9cf 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsStream.java @@ -16,9 +16,7 @@ package com.linecorp.armeria.xds; -import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.xds.stream.SnapshotStream; -interface XdsStream extends SafeCloseable { - - void resourcesUpdated(XdsType type); +interface XdsStream extends SnapshotStream { } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java deleted file mode 100644 index 487965a3349..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsStreamSubscriber.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright 2023 LINE Corporation - * - * LINE Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.common.util.SafeCloseable; - -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.ScheduledFuture; - -class XdsStreamSubscriber implements SafeCloseable { - - private static final Logger logger = LoggerFactory.getLogger(XdsStreamSubscriber.class); - - private final XdsType type; - private final String resource; - private final long timeoutMillis; - private final EventExecutor eventLoop; - private final boolean enableAbsentOnTimeout; - @Nullable - private ScheduledFuture initialAbsentFuture; - private final Set> resourceWatchers = new HashSet<>(); - - XdsStreamSubscriber(XdsType type, String resource, EventExecutor eventLoop, long timeoutMillis, - boolean enableAbsentOnTimeout) { - this.type = type; - this.resource = resource; - this.eventLoop = eventLoop; - this.timeoutMillis = timeoutMillis; - this.enableAbsentOnTimeout = enableAbsentOnTimeout; - - restartTimer(); - } - - private void restartTimer() { - if (!enableAbsentOnTimeout) { - return; - } - - initialAbsentFuture = eventLoop.schedule(() -> { - initialAbsentFuture = null; - onAbsent(); - }, timeoutMillis, TimeUnit.MILLISECONDS); - } - - private void maybeCancelAbsentTimer() { - if (initialAbsentFuture != null && initialAbsentFuture.isCancellable()) { - initialAbsentFuture.cancel(false); - initialAbsentFuture = null; - } - } - - @Override - public void close() { - maybeCancelAbsentTimer(); - } - - void onData(T data) { - maybeCancelAbsentTimer(); - for (ResourceWatcher watcher: resourceWatchers) { - try { - watcher.onChanged(data); - } catch (Exception e) { - logger.warn("Unexpected exception while invoking {}.onChanged() with ({}, {}) for ({}).", - getClass().getSimpleName(), type, resource, data, e); - } - } - } - - void onError(String resourceName, Throwable t) { - maybeCancelAbsentTimer(); - for (ResourceWatcher watcher: resourceWatchers) { - try { - watcher.onError(type, resourceName, t); - } catch (Exception e) { - logger.warn("Unexpected exception while invoking {}.onError() with ({}, {}) for ({}).", - getClass().getSimpleName(), type, resource, t, e); - } - } - } - - void onAbsent() { - maybeCancelAbsentTimer(); - for (ResourceWatcher watcher: resourceWatchers) { - try { - watcher.onResourceDoesNotExist(type, resource); - } catch (Exception e) { - logger.warn("Unexpected exception while invoking" + - " {}.onResourceDoesNotExist() with ({}, {}).", - getClass().getSimpleName(), type, resource, e); - } - } - } - - boolean isEmpty() { - return resourceWatchers.isEmpty(); - } - - void registerWatcher(ResourceWatcher watcher) { - resourceWatchers.add(watcher); - } - - void unregisterWatcher(ResourceWatcher watcher) { - resourceWatchers.remove(watcher); - } -} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java index e305471458f..9450b9f15c4 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilter.java @@ -18,15 +18,12 @@ import static com.linecorp.armeria.xds.client.endpoint.XdsAttributeKeys.SELECTED_ROUTE; -import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.PreClient; import com.linecorp.armeria.client.PreClientRequestContext; import com.linecorp.armeria.client.Preprocessor; import com.linecorp.armeria.client.UnprocessedRequestException; import com.linecorp.armeria.common.Request; import com.linecorp.armeria.common.Response; -import com.linecorp.armeria.common.TimeoutException; -import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.xds.ClusterSnapshot; import com.linecorp.armeria.xds.RouteEntry; import com.linecorp.armeria.xds.internal.XdsCommonUtil; @@ -60,30 +57,7 @@ public O execute(PreClient delegate, PreClientRequestContext ctx, I req) t ctx.setResponseTimeoutMillis(responseTimeoutMillis); } - final XdsLoadBalancer loadBalancer = clusterSnapshot.loadBalancer(); - if (loadBalancer == null) { - final UnprocessedRequestException e = UnprocessedRequestException.of( - new IllegalArgumentException("The target cluster '" + clusterSnapshot + - "' does not specify ClusterLoadAssignments.")); - ctx.cancel(e); - throw e; - } - - final Endpoint endpoint = loadBalancer.selectNow(ctx); - return execute0(delegate, ctx, req, endpoint); - } - - private O execute0(PreClient delegate, PreClientRequestContext ctx, I req, - @Nullable Endpoint endpoint) throws Exception { - if (endpoint == null) { - final Throwable cancellationCause = ctx.cancellationCause(); - if (cancellationCause != null) { - throw UnprocessedRequestException.of(cancellationCause); - } - throw UnprocessedRequestException.of(new TimeoutException("Failed to select an endpoint.")); - } - XdsCommonUtil.setTlsParams(ctx, endpoint); - ctx.setEndpointGroup(endpoint); + XdsCommonUtil.applyClusterToCtx(clusterSnapshot, ctx); return delegate.execute(ctx, req); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/configsource/InterestedResources.java b/xds/src/main/java/com/linecorp/armeria/xds/configsource/InterestedResources.java new file mode 100644 index 00000000000..c418e36aea9 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/configsource/InterestedResources.java @@ -0,0 +1,62 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.configsource; + +import static java.util.Objects.requireNonNull; + +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.XdsType; + +/** + * Represents the set of xDS resource names that are currently subscribed for a given + * {@link XdsType}. Emitted by the interest stream whenever subscriptions change. + */ +@UnstableApi +public final class InterestedResources { + + private final XdsType type; + private final Set resourceNames; + + /** + * Creates a new {@link InterestedResources} instance. + * + * @param type the xDS resource type + * @param resourceNames the set of resource names currently subscribed + */ + public InterestedResources(XdsType type, Set resourceNames) { + this.type = requireNonNull(type, "type"); + this.resourceNames = ImmutableSet.copyOf(requireNonNull(resourceNames, "resourceNames")); + } + + /** + * Returns the {@link XdsType} of the interested resources. + */ + public XdsType type() { + return type; + } + + /** + * Returns the set of resource names currently subscribed. + */ + public Set resourceNames() { + return resourceNames; + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/configsource/SotwConfigSourceSubscriptionFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/configsource/SotwConfigSourceSubscriptionFactory.java new file mode 100644 index 00000000000..ef8989a0b01 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/configsource/SotwConfigSourceSubscriptionFactory.java @@ -0,0 +1,88 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.configsource; + +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.filter.FactoryContext; +import com.linecorp.armeria.xds.XdsExtensionFactory; +import com.linecorp.armeria.xds.stream.SnapshotStream; + +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; + +/** + * A factory that creates a {@link SnapshotStream} of {@link DiscoveryResponse}s + * for a non-gRPC config source. Implementations are resolved by name or by the + * {@code custom_config_source} type URL via the extension registry. + * + *

How the pieces fit together

+ *
    + *
  1. Armeria calls {@link #create} with a {@link ConfigSource}, a {@link FactoryContext}, + * and a {@link SnapshotStream} of {@link InterestedResources}.
  2. + *
  3. The returned {@link SnapshotStream} watches the external source + * (file, KV store, etc.) and emits {@link DiscoveryResponse}s to subscribers.
  4. + *
  5. Armeria subscribes to the stream and handles resource parsing, storage, and + * subscriber notification internally.
  6. + *
+ * + *

Example

+ *
{@code
+ * public class MyConfigSourceFactory implements SotwConfigSourceSubscriptionFactory {
+ *
+ *     @Override
+ *     public String name() {
+ *         return "my-config-source";
+ *     }
+ *
+ *     @Override
+ *     public SnapshotStream create(
+ *             ConfigSource configSource,
+ *             FactoryContext factoryContext,
+ *             SnapshotStream interestedResources) {
+ *         return new RefCountedStream() {
+ *             @Override
+ *             protected Subscription onStart(SnapshotWatcher watcher) {
+ *                 // Start watching the external source and call emit(response, null)
+ *                 // whenever new data arrives.
+ *                 ...
+ *                 return () -> {
+ *                     // Clean up resources (close connections, cancel watchers, etc.).
+ *                 };
+ *             }
+ *         };
+ *     }
+ * }
+ * }
+ * + *

Custom implementations can be registered via {@link java.util.ServiceLoader} SPI. + */ +@UnstableApi +public interface SotwConfigSourceSubscriptionFactory extends XdsExtensionFactory { + + /** + * Creates a {@link SnapshotStream} of {@link DiscoveryResponse}s for the given config source. + * + * @param configSource the full {@link ConfigSource} from xDS bootstrap or resource + * @param factoryContext provides runtime infrastructure such as the event loop and metrics + * @param interestedResources a stream that emits the currently subscribed resource names + * whenever subscriptions change + * @return a new {@link SnapshotStream} + */ + SnapshotStream create(ConfigSource configSource, + FactoryContext factoryContext, + SnapshotStream interestedResources); +} diff --git a/xds/src/test/java/com/linecorp/armeria/xds/DummyResourceWatcher.java b/xds/src/main/java/com/linecorp/armeria/xds/configsource/package-info.java similarity index 66% rename from xds/src/test/java/com/linecorp/armeria/xds/DummyResourceWatcher.java rename to xds/src/main/java/com/linecorp/armeria/xds/configsource/package-info.java index 5677ee9f244..c9bc1c91d7f 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/DummyResourceWatcher.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/configsource/package-info.java @@ -1,5 +1,5 @@ /* - * Copyright 2025 LY Corporation + * Copyright 2026 LY Corporation * * LY Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -14,11 +14,12 @@ * under the License. */ -package com.linecorp.armeria.xds; - -final class DummyResourceWatcher implements ResourceWatcher { +/** + * Config source abstractions for xDS resource discovery. + */ +@NonNullByDefault +@UnstableApi +package com.linecorp.armeria.xds.configsource; - @Override - public void onChanged(XdsResource update) { - } -} +import com.linecorp.armeria.common.annotation.NonNullByDefault; +import com.linecorp.armeria.common.annotation.UnstableApi; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.java b/xds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.java index 5e60c00bbb9..656ddc85595 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/filter/FactoryContext.java @@ -18,6 +18,7 @@ import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.common.metric.MeterIdPrefix; +import com.linecorp.armeria.xds.ClusterSnapshot; import com.linecorp.armeria.xds.GenericSecretSnapshot; import com.linecorp.armeria.xds.XdsResourceValidator; import com.linecorp.armeria.xds.stream.SnapshotStream; @@ -62,4 +63,14 @@ public interface FactoryContext { * @param sdsSecretConfig the SDS secret configuration describing which secret to fetch */ SnapshotStream genericSecretStream(SdsSecretConfig sdsSecretConfig); + + /** + * Creates a reactive {@link SnapshotStream} of {@link ClusterSnapshot} for the given cluster name. + * The stream resolves the cluster via CDS (or static bootstrap clusters), including its + * endpoints, transport sockets, and load balancer, and emits snapshots whenever any of + * these change. + * + * @param clusterName the name of the cluster to watch + */ + SnapshotStream clusterStream(String clusterName); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/internal/XdsCommonUtil.java b/xds/src/main/java/com/linecorp/armeria/xds/internal/XdsCommonUtil.java index d3676551ba1..acf04d53f16 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/internal/XdsCommonUtil.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/internal/XdsCommonUtil.java @@ -27,12 +27,18 @@ import com.linecorp.armeria.client.ClientTlsSpec; import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.PreClientRequestContext; +import com.linecorp.armeria.client.UnprocessedRequestException; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.TimeoutException; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.ClusterSnapshot; import com.linecorp.armeria.xds.TransportSocketSnapshot; +import com.linecorp.armeria.xds.client.endpoint.XdsLoadBalancer; +import io.envoyproxy.envoy.extensions.upstreams.http.v3.HttpProtocolOptions; +import io.envoyproxy.envoy.extensions.upstreams.http.v3.HttpProtocolOptions.ExplicitHttpConfig; import io.netty.util.AttributeKey; public final class XdsCommonUtil { @@ -100,22 +106,56 @@ public static boolean isGrpcRequest(@Nullable HttpRequest req) { return "grpc".equals(subtype) || subtype.startsWith("grpc+"); } - public static void setTlsParams(PreClientRequestContext ctx, Endpoint endpoint) { + public static Endpoint applyClusterToCtx(ClusterSnapshot clusterSnapshot, PreClientRequestContext ctx) { + final XdsLoadBalancer loadBalancer = clusterSnapshot.loadBalancer(); + if (loadBalancer == null) { + throw UnprocessedRequestException.of( + new IllegalStateException( + "The cluster '" + clusterSnapshot.xdsResource().resource().getName() + + "' does not have a load balancer.")); + } + final Endpoint endpoint = loadBalancer.selectNow(ctx); + if (endpoint == null) { + throw UnprocessedRequestException.of( + new TimeoutException("Failed to select an endpoint.")); + } + setTlsParams(ctx, endpoint, clusterSnapshot.xdsResource().httpProtocolOptions()); + ctx.setEndpointGroup(endpoint); + return endpoint; + } + + private static void setTlsParams(PreClientRequestContext ctx, Endpoint endpoint, + @Nullable HttpProtocolOptions httpProtocolOptions) { final TransportSocketSnapshot transportSocket = endpoint.attr(TRANSPORT_SOCKET_SNAPSHOT_KEY); checkArgument(transportSocket != null, "TransportSocket not set for selected endpoint: %s", endpoint); - ClientTlsSpec clientTlsSpec = transportSocket.clientTlsSpec(); + final ClientTlsSpec clientTlsSpec = transportSocket.clientTlsSpec(); if (clientTlsSpec == null) { - ctx.setSessionProtocol(SessionProtocol.HTTP); + ctx.setSessionProtocol(sessionProtocol(false, httpProtocolOptions)); return; } final Set alpnOverride = ctx.attr(ALPN_OVERRIDE_KEY); if (alpnOverride != null && !alpnOverride.isEmpty()) { - clientTlsSpec = clientTlsSpec.toBuilder().alpnProtocols(alpnOverride).build(); + ctx.setClientTlsSpec(clientTlsSpec.toBuilder().alpnProtocols(alpnOverride).build()); + } else { + ctx.setClientTlsSpec(clientTlsSpec); + } + ctx.setSessionProtocol(sessionProtocol(true, httpProtocolOptions)); + } + + private static SessionProtocol sessionProtocol(boolean tls, + @Nullable HttpProtocolOptions httpProtocolOptions) { + if (httpProtocolOptions != null && httpProtocolOptions.hasExplicitHttpConfig()) { + final ExplicitHttpConfig explicitConfig = httpProtocolOptions.getExplicitHttpConfig(); + if (explicitConfig.hasHttp2ProtocolOptions()) { + return tls ? SessionProtocol.H2 : SessionProtocol.H2C; + } + if (explicitConfig.hasHttpProtocolOptions()) { + return tls ? SessionProtocol.H1 : SessionProtocol.H1C; + } } - ctx.setSessionProtocol(SessionProtocol.HTTPS); - ctx.setClientTlsSpec(clientTlsSpec); + return tls ? SessionProtocol.HTTPS : SessionProtocol.HTTP; } private XdsCommonUtil() {} diff --git a/xds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.filter.HttpFilterFactory b/xds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactory similarity index 100% rename from xds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.filter.HttpFilterFactory rename to xds/src/main/resources/META-INF/services/com.linecorp.armeria.xds.XdsExtensionFactory diff --git a/xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java b/xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java index 944f4f85fcc..bda05e323e3 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/StateCoordinatorTest.java @@ -18,11 +18,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import com.linecorp.armeria.common.annotation.Nullable; +import com.google.common.collect.ImmutableList; + import com.linecorp.armeria.common.file.DirectoryWatchService; import com.linecorp.armeria.common.metric.MeterIdPrefix; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; @@ -49,19 +52,51 @@ static void tearDown() { } @Test - void lateSubscriberReceivesCachedResource() { + void dataDeliveredToSubscribedWatcher() { final XdsExtensionRegistry extensionRegistry = extensionRegistry(); final StateCoordinator coordinator = new StateCoordinator( eventLoop.get(), ConfigSource.getDefaultInstance(), false, extensionRegistry); final ClusterXdsResource resource = - new ClusterXdsResource(createCluster(CLUSTER_NAME), "1").withRevision(1); + new ClusterXdsResource(createCluster(CLUSTER_NAME), "1", null).withRevision(1); + + final AtomicReference changed = new AtomicReference<>(); + final SnapshotWatcher watcher = (value, error) -> { + if (value != null) { + changed.set(value); + } + }; + coordinator.register(XdsType.CLUSTER, CLUSTER_NAME, watcher); + + // onResourceUpdated stores and notifies the subscribed watcher coordinator.onResourceUpdated(XdsType.CLUSTER, CLUSTER_NAME, resource); - final CapturingWatcher watcher = new CapturingWatcher(); - coordinator.register(XdsType.CLUSTER, CLUSTER_NAME, watcher); + assertThat(changed.get()).isNotNull(); + assertThat(changed.get().resource()).isEqualTo(resource.resource()); + } + + @Test + void registerDeliversCachedResource() { + final XdsExtensionRegistry extensionRegistry = extensionRegistry(); + final StateCoordinator coordinator = new StateCoordinator( + eventLoop.get(), ConfigSource.getDefaultInstance(), false, extensionRegistry); + final ClusterXdsResource resource = + new ClusterXdsResource(createCluster(CLUSTER_NAME), "1", null).withRevision(1); + + // First register + store a resource + final SnapshotWatcher noopWatcher = (value, error) -> {}; + coordinator.register(XdsType.CLUSTER, CLUSTER_NAME, noopWatcher); + coordinator.onResourceUpdated(XdsType.CLUSTER, CLUSTER_NAME, resource); - assertThat(watcher.changed).isSameAs(resource); - assertThat(watcher.missingType).isNull(); + // A late watcher registered to the same resource gets the cached value + final AtomicReference replayed = new AtomicReference<>(); + final SnapshotWatcher lateWatcher = (value, error) -> { + if (value != null) { + replayed.set(value); + } + }; + coordinator.register(XdsType.CLUSTER, CLUSTER_NAME, lateWatcher); + assertThat(replayed.get()).isNotNull(); + assertThat(replayed.get().resource()).isEqualTo(resource.resource()); } @Test @@ -69,20 +104,27 @@ void missingResourceNotCachedAfterRemoval() { final XdsExtensionRegistry extensionRegistry = extensionRegistry(); final StateCoordinator coordinator = new StateCoordinator( eventLoop.get(), ConfigSource.getDefaultInstance(), false, extensionRegistry); - final CapturingWatcher watcher1 = new CapturingWatcher(); - coordinator.register(XdsType.CLUSTER, CLUSTER_NAME, watcher1); + final SnapshotWatcher noopWatcher = (value, error) -> {}; + coordinator.register(XdsType.CLUSTER, CLUSTER_NAME, noopWatcher); coordinator.onResourceMissing(XdsType.CLUSTER, CLUSTER_NAME); - coordinator.unregister(XdsType.CLUSTER, CLUSTER_NAME, watcher1); - - // After missing + unregister, the state is removed from stateStore. - // A new watcher should not get a replay — it waits for the server. - final CapturingWatcher watcher2 = new CapturingWatcher(); - coordinator.register(XdsType.CLUSTER, CLUSTER_NAME, watcher2); - - assertThat(watcher2.changed).isNull(); - assertThat(watcher2.missingType).isNull(); - assertThat(watcher2.missingName).isNull(); + coordinator.unregister(XdsType.CLUSTER, CLUSTER_NAME, noopWatcher); + + // After missing + unregister, a new register should not deliver anything + final AtomicReference changed = new AtomicReference<>(); + final AtomicReference error = new AtomicReference<>(); + final SnapshotWatcher newWatcher = (value, err) -> { + if (value != null) { + changed.set(value); + } + if (err != null) { + error.set(err); + } + }; + coordinator.register(XdsType.CLUSTER, CLUSTER_NAME, newWatcher); + + assertThat(changed.get()).isNull(); + assertThat(error.get()).isNull(); } @Test @@ -93,20 +135,32 @@ void stateRetainedAfterUnsubscribe() { final RouteXdsResource resource = new RouteXdsResource(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build(), "1") .withRevision(1); - coordinator.onResourceUpdated(XdsType.ROUTE, ROUTE_NAME, resource); - final CapturingWatcher watcher1 = new CapturingWatcher(); + final AtomicReference changed1 = new AtomicReference<>(); + final SnapshotWatcher watcher1 = (value, err) -> { + if (value != null) { + changed1.set(value); + } + }; coordinator.register(XdsType.ROUTE, ROUTE_NAME, watcher1); - assertThat(watcher1.changed).isSameAs(resource); - // Unregister does not touch stateStore, so the cached resource remains. - coordinator.unregister(XdsType.ROUTE, ROUTE_NAME, watcher1); - - final CapturingWatcher watcher2 = new CapturingWatcher(); + coordinator.onResourceUpdated(XdsType.ROUTE, ROUTE_NAME, resource); + assertThat(changed1.get()).isNotNull(); + + // Unregister removes the watcher and the subscriber slot. + assertThat(coordinator.unregister(XdsType.ROUTE, ROUTE_NAME, watcher1)).isTrue(); + + // stateStore retains the resource even after subscriber is removed. + // Re-register with a new watcher delivers the cached value. + final AtomicReference changed2 = new AtomicReference<>(); + final SnapshotWatcher watcher2 = (value, err) -> { + if (value != null) { + changed2.set(value); + } + }; coordinator.register(XdsType.ROUTE, ROUTE_NAME, watcher2); - - assertThat(watcher2.changed).isSameAs(resource); - assertThat(watcher2.missingType).isNull(); + assertThat(changed2.get()).isNotNull(); + assertThat(changed2.get().resource()).isEqualTo(resource.resource()); } private static XdsExtensionRegistry extensionRegistry() { @@ -114,30 +168,11 @@ private static XdsExtensionRegistry extensionRegistry() { return XdsExtensionRegistry.of(new XdsResourceValidator(), watchService, meterRegistry, - new MeterIdPrefix("test")); + new MeterIdPrefix("test"), + ImmutableList.of()); } private static Cluster createCluster(String name) { return Cluster.newBuilder().setName(name).build(); } - - private static final class CapturingWatcher implements ResourceWatcher { - @Nullable - private XdsResource changed; - @Nullable - private XdsType missingType; - @Nullable - private String missingName; - - @Override - public void onChanged(XdsResource update) { - changed = update; - } - - @Override - public void onResourceDoesNotExist(XdsType type, String resourceName) { - missingType = type; - missingName = resourceName; - } - } } diff --git a/xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java b/xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java index 6fbe68aa93c..6b5df99d913 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/SubscriberStorageTest.java @@ -20,6 +20,7 @@ import static org.awaitility.Awaitility.await; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -35,9 +36,9 @@ class SubscriberStorageTest { static EventLoopExtension eventLoop = new EventLoopExtension(); @Test - void registerAndUnregister() throws Exception { - final DummyResourceWatcher watcher = new DummyResourceWatcher(); + void registerAndUnregister() { final SubscriberStorage storage = new SubscriberStorage(eventLoop.get(), 15_000, false); + final SnapshotWatcher watcher = (value, error) -> {}; storage.register(XdsType.CLUSTER, CLUSTER_NAME, watcher); assertThat(storage.resources(XdsType.CLUSTER)).hasSize(1); storage.unregister(XdsType.CLUSTER, CLUSTER_NAME, watcher); @@ -46,43 +47,41 @@ void registerAndUnregister() throws Exception { } @Test - void identityBasedUnregister() { - final DummyResourceWatcher watcher1 = new DummyResourceWatcher(); + void duplicateRegisterReturnsFalse() { final SubscriberStorage storage = new SubscriberStorage(eventLoop.get(), 15_000, false); - storage.register(XdsType.CLUSTER, CLUSTER_NAME, watcher1); + final SnapshotWatcher watcher1 = (value, error) -> {}; + final SnapshotWatcher watcher2 = (value, error) -> {}; + assertThat(storage.register(XdsType.CLUSTER, CLUSTER_NAME, watcher1)).isTrue(); assertThat(storage.resources(XdsType.CLUSTER)).hasSize(1); - storage.register(XdsType.CLUSTER, CLUSTER_NAME, watcher1); + assertThat(storage.register(XdsType.CLUSTER, CLUSTER_NAME, watcher2)).isFalse(); assertThat(storage.resources(XdsType.CLUSTER)).hasSize(1); storage.unregister(XdsType.CLUSTER, CLUSTER_NAME, watcher1); + // watcher2 still present, so slot not removed yet + assertThat(storage.hasNoSubscribers()).isFalse(); + storage.unregister(XdsType.CLUSTER, CLUSTER_NAME, watcher2); assertThat(storage.resources(XdsType.CLUSTER)).isEmpty(); assertThat(storage.hasNoSubscribers()).isTrue(); } @Test void nonClusterListenerTimeout() { - final CapturingWatcher watcher = new CapturingWatcher(); final SubscriberStorage storage = new SubscriberStorage(eventLoop.get(), 50, false); + + final AtomicReference missingType = new AtomicReference<>(); + final AtomicReference missingName = new AtomicReference<>(); + final SnapshotWatcher watcher = (value, error) -> { + if (error instanceof MissingXdsResourceException) { + missingType.set(((MissingXdsResourceException) error).type()); + missingName.set(((MissingXdsResourceException) error).name()); + } + }; storage.register(XdsType.ROUTE, ROUTE_NAME, watcher); await().atMost(1, TimeUnit.SECONDS) .untilAsserted(() -> { - assertThat(watcher.missingType).isEqualTo(XdsType.ROUTE); - assertThat(watcher.missingName).isEqualTo(ROUTE_NAME); + assertThat(missingType.get()).isEqualTo(XdsType.ROUTE); + assertThat(missingName.get()).isEqualTo(ROUTE_NAME); }); } - - private static final class CapturingWatcher implements ResourceWatcher { - private volatile XdsType missingType; - private volatile String missingName; - - @Override - public void onChanged(XdsResource update) {} - - @Override - public void onResourceDoesNotExist(XdsType type, String resourceName) { - missingType = type; - missingName = resourceName; - } - } } diff --git a/xds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.java b/xds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.java index 29dea6d2d81..ad7d8bb62be 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/XdsExtensionRegistryTest.java @@ -22,6 +22,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; +import com.google.common.collect.ImmutableList; import com.google.protobuf.Any; import com.google.protobuf.Duration; @@ -39,7 +40,8 @@ private static XdsExtensionRegistry createRegistry() { return XdsExtensionRegistry.of(new XdsResourceValidator(), watchService, meterRegistry, - new MeterIdPrefix("test")); + new MeterIdPrefix("test"), + ImmutableList.of()); } private static final DirectoryWatchService watchService = new DirectoryWatchService(); diff --git a/xds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java b/xds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java index 8ab8891b080..13eed978652 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java @@ -131,7 +131,7 @@ void emitWithErrorBroadcastsToAllWatchers() { } @Test - void emitNotifiesWatchersInSubscriptionOrder() { + void emitNotifiesWatchersInStreamSubscriptionOrder() { final TestRefCountedStream stream = new TestRefCountedStream(Subscription::noop); final List order = new ArrayList<>(); @@ -200,7 +200,7 @@ void lastUnsubscribeTriggersOnStop() { } @Test - void lastUnsubscribeClosesUpstreamSubscription() { + void lastUnsubscribeClosesUpstreamStreamSubscription() { final AtomicBoolean upstreamClosed = new AtomicBoolean(false); final TestRefCountedStream stream = new TestRefCountedStream(() -> () -> upstreamClosed.set(true));