Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.common.util.CertificateUtil;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.xds.SnapshotStream.Subscription;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.cluster.v3.Cluster;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.client.endpoint.XdsLoadBalancer;
import com.linecorp.armeria.xds.client.endpoint.XdsLoadBalancerFactory;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import com.linecorp.armeria.common.Cancellable;
import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.file.PathWatcher;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.core.v3.DataSource;
import io.envoyproxy.envoy.config.core.v3.DataSource.SpecifierCase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.linecorp.armeria.xds;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.core.v3.ConfigSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Map;

import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.xds.SnapshotStream.Subscription;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap;
import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap.StaticResources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.xds.SnapshotStream.Subscription;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.listener.v3.Listener;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import static com.linecorp.armeria.xds.XdsType.LISTENER;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.core.v3.ConfigSource;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.stream.SnapshotStream;

import io.envoyproxy.envoy.config.core.v3.ConfigSource;
import io.envoyproxy.envoy.config.core.v3.TransportSocket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.linecorp.armeria.xds;

import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.core.v3.ConfigSource;

final class ResourceNodeAdapter<T extends XdsResource> extends RefCountedStream<T> implements ResourceNode<T> {
Expand Down Expand Up @@ -59,13 +62,13 @@ public void onChanged(T update) {

@Override
public void onError(XdsType type, String resourceName, Throwable t) {
resourceNodeMeterBinder.onError(type, resourceName, t);
resourceNodeMeterBinder.onError();
emit(null, XdsResourceException.maybeWrap(type, resourceName, t));
}

@Override
public void onResourceDoesNotExist(XdsType type, String resourceName) {
resourceNodeMeterBinder.onResourceDoesNotExist(type, resourceName);
resourceNodeMeterBinder.onResourceDoesNotExist();
emit(null, new MissingXdsResourceException(type, resourceName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ ResourceNodeMeterBinder acquire(XdsType type, String resourceName) {
* This is not done at the user-exposed {@link SnapshotWatcher} level so that users can
* observe the internal state/lifecycle of {@link ResourceNode}s via metrics.
*/
final class ResourceNodeMeterBinder implements ResourceWatcher<XdsResource> {
final class ResourceNodeMeterBinder {

private final Key key;
private boolean closed;
Expand Down Expand Up @@ -98,18 +98,15 @@ void close() {
}
}

@Override
public void onError(XdsType type, String resourceName, Throwable t) {
void onError() {
errorCounter.increment();
}

@Override
public void onResourceDoesNotExist(XdsType type, String resourceName) {
void onResourceDoesNotExist() {
missingCounter.increment();
}

@Override
public void onChanged(XdsResource update) {
void onChanged(XdsResource update) {
updatedRevision.set(update.revision());
}
}
Expand Down
3 changes: 3 additions & 0 deletions xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.core.v3.ConfigSource;
import io.envoyproxy.envoy.config.route.v3.Route;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package com.linecorp.armeria.xds;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.core.v3.ConfigSource;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig;
Expand Down
85 changes: 0 additions & 85 deletions xds/src/main/java/com/linecorp/armeria/xds/SnapshotStream.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.xds.stream.SnapshotStream;

/**
* A watcher implementation which waits for updates on an xDS snapshot.
* A callback interface for receiving snapshot updates from a {@link SnapshotStream}.
*
* @param <T> the type of snapshot values received by this watcher
*/
@UnstableApi
@FunctionalInterface
public interface SnapshotWatcher<T> {

/**
* Invoked when a snapshot is updated or an error occurs.
* Either snapshot or error will be non-null.
* Exactly one of {@code snapshot} or {@code error} will be non-null.
*
* @param snapshot the updated snapshot value, or {@code null} if an error occurred
* @param error the error, or {@code null} if a snapshot was delivered
*/
void onUpdate(@Nullable T snapshot, @Nullable Throwable t);
void onUpdate(@Nullable T snapshot, @Nullable Throwable error);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.google.protobuf.ByteString;

import com.linecorp.armeria.common.TlsKeyPair;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.core.v3.WatchedDirectory;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.TlsCertificate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.linecorp.armeria.xds;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.stream.SnapshotStream;

import io.envoyproxy.envoy.config.core.v3.ConfigSource;
import io.envoyproxy.envoy.config.core.v3.TransportSocket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package com.linecorp.armeria.xds;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.core.v3.ConfigSource;
import io.envoyproxy.envoy.config.core.v3.TransportSocket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.stream.SnapshotStream;

import io.envoyproxy.envoy.config.core.v3.ConfigSource;
import io.envoyproxy.envoy.config.core.v3.TransportSocket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@

import com.linecorp.armeria.common.metric.MeterIdPrefix;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.xds.SnapshotStream.Subscription;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
Expand Down Expand Up @@ -73,7 +74,9 @@ void register(Cluster cluster, SubscriptionContext context,
List<SnapshotWatcher<? super ClusterSnapshot>> watchers) {
checkArgument(!nodes.containsKey(cluster.getName()),
"Cluster with name '%s' already registered", cluster.getName());
final ClusterStream node = new ClusterStream(new ClusterXdsResource(cluster), context,
final ClusterXdsResource resource =
ClusterResourceParser.INSTANCE.parse(cluster, context.extensionRegistry(), "");
final ClusterStream node = new ClusterStream(resource, context,
loadBalancerFactoryPool);
nodes.put(cluster.getName(), node);
for (SnapshotWatcher<? super ClusterSnapshot> watcher : watchers) {
Expand All @@ -86,7 +89,8 @@ void register(Cluster cluster, SubscriptionContext context,
}
}

Subscription register(String name, SubscriptionContext context, SnapshotWatcher<ClusterSnapshot> watcher) {
Subscription register(String name, SubscriptionContext context,
SnapshotWatcher<? super ClusterSnapshot> watcher) {
if (closed) {
return Subscription.noop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* under the License.
*/

package com.linecorp.armeria.xds;
package com.linecorp.armeria.xds.stream;

import java.util.function.BiFunction;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.SnapshotWatcher;

final class CombineLatest2Stream<A, B, O> extends RefCountedStream<O> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,13 @@
* under the License.
*/

package com.linecorp.armeria.xds;
package com.linecorp.armeria.xds.stream;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.SnapshotWatcher;

final class CombineLatest3Stream<A, B, C, O> extends RefCountedStream<O> {

@FunctionalInterface
interface TriFunction<A, B, C, O> {
O apply(A a, B b, C c);
}

private final SnapshotStream<A> streamA;
private final SnapshotStream<B> streamB;
private final SnapshotStream<C> streamC;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
* under the License.
*/

package com.linecorp.armeria.xds;
package com.linecorp.armeria.xds.stream;

import java.util.List;

import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.SnapshotWatcher;

final class CombineNLatestStream<T> extends RefCountedStream<List<T>> {

Expand Down
Loading
Loading