diff --git a/xds/src/main/java/com/linecorp/armeria/xds/CertificateValidationContextStream.java b/xds/src/main/java/com/linecorp/armeria/xds/CertificateValidationContextStream.java index b432809ea31..11de0bd95fd 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/CertificateValidationContextStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/CertificateValidationContextStream.java @@ -24,6 +24,9 @@ import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.internal.common.util.CertificateUtil; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ClusterRoot.java b/xds/src/main/java/com/linecorp/armeria/xds/ClusterRoot.java index 2702531f463..e2b674d0f52 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ClusterRoot.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ClusterRoot.java @@ -18,7 +18,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; -import com.linecorp.armeria.xds.SnapshotStream.Subscription; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.cluster.v3.Cluster; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java b/xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java index 65a5f63b21f..9c127e8bf80 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ClusterStream.java @@ -27,6 +27,9 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.xds.client.endpoint.XdsLoadBalancer; import com.linecorp.armeria.xds.client.endpoint.XdsLoadBalancerFactory; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/DataSourceStream.java b/xds/src/main/java/com/linecorp/armeria/xds/DataSourceStream.java index 3ad0b7e2f07..49872bd6231 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/DataSourceStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/DataSourceStream.java @@ -30,6 +30,9 @@ import com.linecorp.armeria.common.Cancellable; import com.linecorp.armeria.common.CommonPools; import com.linecorp.armeria.common.file.PathWatcher; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.core.v3.DataSource; import io.envoyproxy.envoy.config.core.v3.DataSource.SpecifierCase; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/EndpointStream.java b/xds/src/main/java/com/linecorp/armeria/xds/EndpointStream.java index a72b7aa5e8d..9a2d414b15d 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/EndpointStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/EndpointStream.java @@ -17,6 +17,8 @@ package com.linecorp.armeria.xds; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.core.v3.ConfigSource; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java b/xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java index 8faa1fc5358..5d7026061d4 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ListenerManager.java @@ -26,7 +26,7 @@ import java.util.Map; import com.linecorp.armeria.common.util.SafeCloseable; -import com.linecorp.armeria.xds.SnapshotStream.Subscription; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap.StaticResources; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ListenerRoot.java b/xds/src/main/java/com/linecorp/armeria/xds/ListenerRoot.java index 3ab627ad7e0..6a96f7c2be3 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ListenerRoot.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ListenerRoot.java @@ -18,7 +18,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; -import com.linecorp.armeria.xds.SnapshotStream.Subscription; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.listener.v3.Listener; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java b/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java index 0aeb3a41a1e..31340024416 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java @@ -19,6 +19,9 @@ import static com.linecorp.armeria.xds.XdsType.LISTENER; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RawBufferTransportSocketFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/RawBufferTransportSocketFactory.java index 652a138617b..935014a825a 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RawBufferTransportSocketFactory.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/RawBufferTransportSocketFactory.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.stream.SnapshotStream; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.core.v3.TransportSocket; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java index 7f946814c88..065f152e0ce 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeAdapter.java @@ -16,6 +16,9 @@ package com.linecorp.armeria.xds; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.Subscription; + import io.envoyproxy.envoy.config.core.v3.ConfigSource; final class ResourceNodeAdapter extends RefCountedStream implements ResourceNode { @@ -59,13 +62,13 @@ public void onChanged(T update) { @Override public void onError(XdsType type, String resourceName, Throwable t) { - resourceNodeMeterBinder.onError(type, resourceName, t); + resourceNodeMeterBinder.onError(); emit(null, XdsResourceException.maybeWrap(type, resourceName, t)); } @Override public void onResourceDoesNotExist(XdsType type, String resourceName) { - resourceNodeMeterBinder.onResourceDoesNotExist(type, resourceName); + resourceNodeMeterBinder.onResourceDoesNotExist(); emit(null, new MissingXdsResourceException(type, resourceName)); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.java index 9481bacc01c..c7db85e3a47 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ResourceNodeMeterBinderFactory.java @@ -56,7 +56,7 @@ ResourceNodeMeterBinder acquire(XdsType type, String resourceName) { * This is not done at the user-exposed {@link SnapshotWatcher} level so that users can * observe the internal state/lifecycle of {@link ResourceNode}s via metrics. */ - final class ResourceNodeMeterBinder implements ResourceWatcher { + final class ResourceNodeMeterBinder { private final Key key; private boolean closed; @@ -98,18 +98,15 @@ void close() { } } - @Override - public void onError(XdsType type, String resourceName, Throwable t) { + void onError() { errorCounter.increment(); } - @Override - public void onResourceDoesNotExist(XdsType type, String resourceName) { + void onResourceDoesNotExist() { missingCounter.increment(); } - @Override - public void onChanged(XdsResource update) { + void onChanged(XdsResource update) { updatedRevision.set(update.revision()); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java b/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java index aab85fb375c..51cb85b9ecb 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java @@ -21,6 +21,9 @@ import com.google.common.collect.ImmutableList; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.route.v3.Route; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SecretStream.java b/xds/src/main/java/com/linecorp/armeria/xds/SecretStream.java index 26a25619aba..2ea0cdc50bd 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SecretStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SecretStream.java @@ -17,6 +17,9 @@ package com.linecorp.armeria.xds; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java b/xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java deleted file mode 100644 index b3eacc6ca1f..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2026 LY Corporation - * - * LY Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -import java.util.List; -import java.util.Optional; -import java.util.function.BiFunction; -import java.util.function.Function; - -import com.google.common.collect.ImmutableList; -import com.google.errorprone.annotations.CheckReturnValue; - -import com.linecorp.armeria.xds.CombineLatest3Stream.TriFunction; - -@FunctionalInterface -interface SnapshotStream { - - @CheckReturnValue - Subscription subscribe(SnapshotWatcher watcher); - - @FunctionalInterface - interface Subscription { - - static Subscription noop() { - return () -> {}; - } - - void close(); - } - - default SnapshotStream map(Function mapper) { - return new MapStream<>(this, mapper); - } - - default > SnapshotStream switchMapEager( - Function mapper) { - return new SwitchMapEagerStream<>(this, mapper); - } - - static , I> SnapshotStream> combineNLatest(List stream) { - return new CombineNLatestStream<>(ImmutableList.copyOf(stream)); - } - - static SnapshotStream combineLatest( - SnapshotStream a, - SnapshotStream b, - BiFunction combiner) { - return new CombineLatest2Stream<>(a, b, combiner); - } - - static SnapshotStream combineLatest( - SnapshotStream a, - SnapshotStream b, - SnapshotStream c, - TriFunction combiner) { - return new CombineLatest3Stream<>(a, b, c, combiner); - } - - static SnapshotStream just(T value) { - return new StaticSnapshotStream<>(value, null); - } - - @SuppressWarnings("unchecked") - static SnapshotStream> empty() { - return (SnapshotStream>) StaticSnapshotStream.EMPTY; - } - - static SnapshotStream error(Throwable error) { - return new StaticSnapshotStream<>(null, error); - } -} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SnapshotWatcher.java b/xds/src/main/java/com/linecorp/armeria/xds/SnapshotWatcher.java index 85d1880e9d8..9a341f7f228 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SnapshotWatcher.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/SnapshotWatcher.java @@ -18,9 +18,12 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.stream.SnapshotStream; /** - * A watcher implementation which waits for updates on an xDS snapshot. + * A callback interface for receiving snapshot updates from a {@link SnapshotStream}. + * + * @param the type of snapshot values received by this watcher */ @UnstableApi @FunctionalInterface @@ -28,7 +31,10 @@ public interface SnapshotWatcher { /** * Invoked when a snapshot is updated or an error occurs. - * Either snapshot or error will be non-null. + * Exactly one of {@code snapshot} or {@code error} will be non-null. + * + * @param snapshot the updated snapshot value, or {@code null} if an error occurred + * @param error the error, or {@code null} if a snapshot was delivered */ - void onUpdate(@Nullable T snapshot, @Nullable Throwable t); + void onUpdate(@Nullable T snapshot, @Nullable Throwable error); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/TlsCertificateStream.java b/xds/src/main/java/com/linecorp/armeria/xds/TlsCertificateStream.java index f87de487aa8..59fa97459bd 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/TlsCertificateStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/TlsCertificateStream.java @@ -22,6 +22,9 @@ import com.google.protobuf.ByteString; import com.linecorp.armeria.common.TlsKeyPair; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.core.v3.WatchedDirectory; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.TlsCertificate; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketFactory.java index 09ede15b0cf..7b5619a57b6 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketFactory.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketFactory.java @@ -17,6 +17,7 @@ package com.linecorp.armeria.xds; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.stream.SnapshotStream; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.core.v3.TransportSocket; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java b/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java index fe6634047e5..941c9ba8e17 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/TransportSocketStream.java @@ -17,6 +17,9 @@ package com.linecorp.armeria.xds; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.stream.RefCountedStream; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.core.v3.TransportSocket; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/UpstreamTlsTransportSocketFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/UpstreamTlsTransportSocketFactory.java index 51cc986175d..2f6ce0f5fed 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/UpstreamTlsTransportSocketFactory.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/UpstreamTlsTransportSocketFactory.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.stream.SnapshotStream; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.core.v3.TransportSocket; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java b/xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java index c2bfc7ff6c5..0dd68aeef80 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/XdsClusterManager.java @@ -27,7 +27,8 @@ import com.linecorp.armeria.common.metric.MeterIdPrefix; import com.linecorp.armeria.common.util.SafeCloseable; -import com.linecorp.armeria.xds.SnapshotStream.Subscription; +import com.linecorp.armeria.xds.stream.SnapshotStream; +import com.linecorp.armeria.xds.stream.Subscription; import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; import io.envoyproxy.envoy.config.cluster.v3.Cluster; @@ -73,7 +74,9 @@ void register(Cluster cluster, SubscriptionContext context, List> watchers) { checkArgument(!nodes.containsKey(cluster.getName()), "Cluster with name '%s' already registered", cluster.getName()); - final ClusterStream node = new ClusterStream(new ClusterXdsResource(cluster), context, + final ClusterXdsResource resource = + ClusterResourceParser.INSTANCE.parse(cluster, context.extensionRegistry(), ""); + final ClusterStream node = new ClusterStream(resource, context, loadBalancerFactoryPool); nodes.put(cluster.getName(), node); for (SnapshotWatcher watcher : watchers) { @@ -86,7 +89,8 @@ void register(Cluster cluster, SubscriptionContext context, } } - Subscription register(String name, SubscriptionContext context, SnapshotWatcher watcher) { + Subscription register(String name, SubscriptionContext context, + SnapshotWatcher watcher) { if (closed) { return Subscription.noop(); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/CombineLatest2Stream.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest2Stream.java similarity index 96% rename from xds/src/main/java/com/linecorp/armeria/xds/CombineLatest2Stream.java rename to xds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest2Stream.java index c5bd75e82ea..98b3a195c61 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/CombineLatest2Stream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest2Stream.java @@ -14,11 +14,12 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.stream; import java.util.function.BiFunction; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.SnapshotWatcher; final class CombineLatest2Stream extends RefCountedStream { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/CombineLatest3Stream.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest3Stream.java similarity index 95% rename from xds/src/main/java/com/linecorp/armeria/xds/CombineLatest3Stream.java rename to xds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest3Stream.java index 7b2fa1e0556..ab1019418b4 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/CombineLatest3Stream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/CombineLatest3Stream.java @@ -14,17 +14,13 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.stream; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.SnapshotWatcher; final class CombineLatest3Stream extends RefCountedStream { - @FunctionalInterface - interface TriFunction { - O apply(A a, B b, C c); - } - private final SnapshotStream streamA; private final SnapshotStream streamB; private final SnapshotStream streamC; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/CombineNLatestStream.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/CombineNLatestStream.java similarity index 96% rename from xds/src/main/java/com/linecorp/armeria/xds/CombineNLatestStream.java rename to xds/src/main/java/com/linecorp/armeria/xds/stream/CombineNLatestStream.java index 9b4eb4cb925..ba473c4b34d 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/CombineNLatestStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/CombineNLatestStream.java @@ -14,13 +14,14 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.stream; import java.util.List; import com.google.common.collect.ImmutableList; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.SnapshotWatcher; final class CombineNLatestStream extends RefCountedStream> { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/MapStream.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/MapStream.java similarity index 93% rename from xds/src/main/java/com/linecorp/armeria/xds/MapStream.java rename to xds/src/main/java/com/linecorp/armeria/xds/stream/MapStream.java index 4ede7afbf36..25122bddf3a 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/MapStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/MapStream.java @@ -14,10 +14,12 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.stream; import java.util.function.Function; +import com.linecorp.armeria.xds.SnapshotWatcher; + final class MapStream extends RefCountedStream { private final SnapshotStream upstream; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RefCountedStream.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/RefCountedStream.java similarity index 64% rename from xds/src/main/java/com/linecorp/armeria/xds/RefCountedStream.java rename to xds/src/main/java/com/linecorp/armeria/xds/stream/RefCountedStream.java index 3831ebedda7..d8b105019d0 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RefCountedStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/RefCountedStream.java @@ -14,14 +14,27 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.stream; import java.util.LinkedHashSet; import java.util.Set; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.SnapshotWatcher; -abstract class RefCountedStream implements SnapshotStream { +/** + * A {@link SnapshotStream} that manages subscriber reference counting. + * The upstream subscription is started when the first watcher subscribes + * and stopped when the last watcher unsubscribes. + * + *

Subclasses implement {@link #onStart} to set up the upstream data source + * and optionally override {@link #onStop} for cleanup. + * + * @param the type of snapshot values delivered by this stream + */ +@UnstableApi +public abstract class RefCountedStream implements SnapshotStream { private final Set> watchers = new LinkedHashSet<>(); @@ -63,12 +76,25 @@ public final Subscription subscribe(SnapshotWatcher watcher) { }; } + /** + * Called when the first watcher subscribes. Implementations should set up the upstream + * data source and deliver values to the given watcher. + * + * @param watcher the watcher to deliver upstream values to + * @return a subscription to close when the upstream should be stopped + */ protected abstract Subscription onStart(SnapshotWatcher watcher); + /** + * Called when the last watcher unsubscribes. Override to perform cleanup. + */ protected void onStop() { } - protected void emit(@Nullable T value, @Nullable Throwable error) { + /** + * Emits a value or error to all current watchers. + */ + public final void emit(@Nullable T value, @Nullable Throwable error) { if (value != null) { latestValue = value; } @@ -81,7 +107,10 @@ protected void emit(@Nullable T value, @Nullable Throwable error) { } } - boolean hasWatchers() { + /** + * Returns whether this stream currently has any active watchers. + */ + public boolean hasWatchers() { return !watchers.isEmpty(); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java new file mode 100644 index 00000000000..7340a9888e7 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/SnapshotStream.java @@ -0,0 +1,182 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.stream; + +import static java.util.Objects.requireNonNull; + +import java.util.List; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.Function; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.CheckReturnValue; + +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.SnapshotWatcher; + +/** + * A reactive stream that delivers snapshot values to {@link SnapshotWatcher} subscribers. + * Subscribers receive the latest value immediately upon subscription (if available) + * and subsequent updates as they occur. + * + *

This is a {@link FunctionalInterface} — custom streams can be created with a lambda + * that receives a watcher and returns a {@link Subscription}: + *

{@code
+ * SnapshotStream stream = watcher -> {
+ *     watcher.onUpdate("hello", null);
+ *     return Subscription.noop();
+ * };
+ * }
+ * + * @param the type of snapshot values delivered by this stream + */ +@UnstableApi +@FunctionalInterface +public interface SnapshotStream { + + /** + * Subscribes the given watcher to this stream. The watcher will receive the current + * value (if any) and all subsequent updates. + * + * @param watcher the watcher to receive snapshot updates + * @return a {@link Subscription} that can be closed to unsubscribe + */ + @CheckReturnValue + Subscription subscribe(SnapshotWatcher watcher); + + /** + * Returns a new stream that applies the given mapping function to each value + * emitted by this stream. + * + * @param mapper the function to apply to each emitted value + * @param the type of the mapped values + */ + default SnapshotStream map(Function mapper) { + requireNonNull(mapper, "mapper"); + return new MapStream<>(this, mapper); + } + + /** + * Returns a new stream that, for each value emitted by this stream, subscribes to + * the inner stream produced by the mapper and emits its values. When this stream + * emits a new value, the previous inner subscription is closed eagerly. + * + * @param mapper the function that produces an inner stream for each value + * @param the element type of the inner streams + * @param the type of the inner stream + */ + default > SnapshotStream switchMapEager( + Function mapper) { + requireNonNull(mapper, "mapper"); + return new SwitchMapEagerStream<>(this, mapper); + } + + /** + * Returns a stream that combines the latest values from all given streams into a list. + * The combined stream emits a new list whenever any source stream emits a new value, + * but only after all source streams have emitted at least one value. + * + * @param streams the source streams to combine + * @param the type of the source streams + * @param the element type of each source stream + */ + static , I> SnapshotStream> combineNLatest(List streams) { + requireNonNull(streams, "streams"); + return new CombineNLatestStream<>(ImmutableList.copyOf(streams)); + } + + /** + * Returns a stream that combines the latest values from two streams using the given + * combiner function. The combined stream emits a new value whenever either source + * stream emits, but only after both have emitted at least one value. + * + * @param a the first source stream + * @param b the second source stream + * @param combiner the function to combine the latest values + * @param
the type of the first stream's values + * @param the type of the second stream's values + * @param the type of the combined values + */ + static SnapshotStream combineLatest( + SnapshotStream a, + SnapshotStream b, + BiFunction combiner) { + requireNonNull(a, "a"); + requireNonNull(b, "b"); + requireNonNull(combiner, "combiner"); + return new CombineLatest2Stream<>(a, b, combiner); + } + + /** + * Returns a stream that combines the latest values from three streams using the given + * combiner function. The combined stream emits a new value whenever any source stream + * emits, but only after all three have emitted at least one value. + * + * @param a the first source stream + * @param b the second source stream + * @param c the third source stream + * @param combiner the function to combine the latest values + * @param the type of the first stream's values + * @param the type of the second stream's values + * @param the type of the third stream's values + * @param the type of the combined values + */ + static SnapshotStream combineLatest( + SnapshotStream a, + SnapshotStream b, + SnapshotStream c, + TriFunction combiner) { + requireNonNull(a, "a"); + requireNonNull(b, "b"); + requireNonNull(c, "c"); + requireNonNull(combiner, "combiner"); + return new CombineLatest3Stream<>(a, b, c, combiner); + } + + /** + * Returns a stream that immediately emits the given value to every subscriber. + * + * @param value the value to emit + * @param the type of the value + */ + static SnapshotStream just(T value) { + requireNonNull(value, "value"); + return new StaticSnapshotStream<>(value, null); + } + + /** + * Returns a stream that immediately emits an empty {@link Optional} to every subscriber. + * + * @param the element type of the optional + */ + @SuppressWarnings("unchecked") + static SnapshotStream> empty() { + return (SnapshotStream>) StaticSnapshotStream.EMPTY; + } + + /** + * Returns a stream that immediately emits the given error to every subscriber. + * + * @param error the error to emit + * @param the nominal value type + */ + static SnapshotStream error(Throwable error) { + requireNonNull(error, "error"); + return new StaticSnapshotStream<>(null, error); + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/StaticSnapshotStream.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/StaticSnapshotStream.java similarity index 94% rename from xds/src/main/java/com/linecorp/armeria/xds/StaticSnapshotStream.java rename to xds/src/main/java/com/linecorp/armeria/xds/stream/StaticSnapshotStream.java index 0e22398f6b4..68c63b1727d 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/StaticSnapshotStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/StaticSnapshotStream.java @@ -14,11 +14,12 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.stream; import java.util.Optional; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.SnapshotWatcher; final class StaticSnapshotStream implements SnapshotStream { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/stream/Subscription.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/Subscription.java new file mode 100644 index 00000000000..fe211f72ac8 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/Subscription.java @@ -0,0 +1,41 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.stream; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +/** + * A handle returned by {@link SnapshotStream#subscribe} that can be closed to + * cancel the subscription. Closing a subscription stops the subscriber from + * receiving further updates. + */ +@UnstableApi +@FunctionalInterface +public interface Subscription { + + /** + * Returns a no-op subscription that does nothing when closed. + */ + static Subscription noop() { + return () -> {}; + } + + /** + * Closes this subscription, stopping delivery of further updates to the subscriber. + */ + void close(); +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/SwitchMapEagerStream.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/SwitchMapEagerStream.java similarity index 96% rename from xds/src/main/java/com/linecorp/armeria/xds/SwitchMapEagerStream.java rename to xds/src/main/java/com/linecorp/armeria/xds/stream/SwitchMapEagerStream.java index b8de6c34611..96f973e2db1 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/SwitchMapEagerStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/SwitchMapEagerStream.java @@ -14,11 +14,12 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.stream; import java.util.function.Function; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.SnapshotWatcher; final class SwitchMapEagerStream extends RefCountedStream { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/stream/TriFunction.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/TriFunction.java new file mode 100644 index 00000000000..6c65f520ed2 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/TriFunction.java @@ -0,0 +1,36 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.stream; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +/** + * A function that accepts three arguments and produces a result. + * + * @param the type of the first argument + * @param the type of the second argument + * @param the type of the third argument + * @param the type of the result + */ +@UnstableApi +@FunctionalInterface +public interface TriFunction { + /** + * Applies this function to the given arguments. + */ + O apply(A a, B b, C c); +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/stream/package-info.java b/xds/src/main/java/com/linecorp/armeria/xds/stream/package-info.java new file mode 100644 index 00000000000..2d6ce72e38c --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/stream/package-info.java @@ -0,0 +1,25 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Reactive stream infrastructure for xDS snapshot delivery. + */ +@NonNullByDefault +@UnstableApi +package com.linecorp.armeria.xds.stream; + +import com.linecorp.armeria.common.annotation.NonNullByDefault; +import com.linecorp.armeria.common.annotation.UnstableApi; diff --git a/xds/src/test/java/com/linecorp/armeria/xds/CombineNLatestStreamTest.java b/xds/src/test/java/com/linecorp/armeria/xds/stream/CombineNLatestStreamTest.java similarity index 90% rename from xds/src/test/java/com/linecorp/armeria/xds/CombineNLatestStreamTest.java rename to xds/src/test/java/com/linecorp/armeria/xds/stream/CombineNLatestStreamTest.java index 7741334c25d..802b61d3279 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/CombineNLatestStreamTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/stream/CombineNLatestStreamTest.java @@ -14,7 +14,7 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.stream; import static org.assertj.core.api.Assertions.assertThat; @@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList; -import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.SnapshotWatcher; @SuppressWarnings("CheckReturnValue") class CombineNLatestStreamTest { @@ -70,10 +70,5 @@ static final class TestStream extends RefCountedStream { protected Subscription onStart(SnapshotWatcher watcher) { return Subscription.noop(); } - - @Override - public void emit(@Nullable T value, @Nullable Throwable error) { - super.emit(value, error); - } } } diff --git a/xds/src/test/java/com/linecorp/armeria/xds/RefCountedStreamTest.java b/xds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java similarity index 97% rename from xds/src/test/java/com/linecorp/armeria/xds/RefCountedStreamTest.java rename to xds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java index a5de7541a49..8ab8891b080 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/RefCountedStreamTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/stream/RefCountedStreamTest.java @@ -14,7 +14,7 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -28,8 +28,7 @@ import org.junit.jupiter.api.Test; -import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.xds.SnapshotStream.Subscription; +import com.linecorp.armeria.xds.SnapshotWatcher; class RefCountedStreamTest { @@ -323,10 +322,5 @@ protected Subscription onStart(SnapshotWatcher watcher) { protected void onStop() { onStopCallback.run(); } - - @Override - protected void emit(@Nullable String value, @Nullable Throwable error) { - super.emit(value, error); - } } } diff --git a/xds/src/test/java/com/linecorp/armeria/xds/StreamSwitchMapEagerTest.java b/xds/src/test/java/com/linecorp/armeria/xds/stream/StreamSwitchMapEagerTest.java similarity index 97% rename from xds/src/test/java/com/linecorp/armeria/xds/StreamSwitchMapEagerTest.java rename to xds/src/test/java/com/linecorp/armeria/xds/stream/StreamSwitchMapEagerTest.java index fc094b9be62..2dd4a3ec2e2 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/StreamSwitchMapEagerTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/stream/StreamSwitchMapEagerTest.java @@ -14,7 +14,7 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.stream; import static org.assertj.core.api.Assertions.assertThat; @@ -29,8 +29,7 @@ import org.junit.jupiter.api.Test; -import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.xds.SnapshotStream.Subscription; +import com.linecorp.armeria.xds.SnapshotWatcher; @SuppressWarnings("CheckReturnValue") class StreamSwitchMapEagerTest { @@ -383,10 +382,5 @@ static class TestStream extends RefCountedStream { protected Subscription onStart(SnapshotWatcher watcher) { return onStartFunction.apply(watcher); } - - @Override - protected void emit(@Nullable T value, @Nullable Throwable error) { - super.emit(value, error); - } } }