Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions xds/src/main/java/com/linecorp/armeria/xds/AdsXdsStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@

import com.linecorp.armeria.client.retry.Backoff;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.configsource.InterestedResources;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.grpc.Status;
import io.netty.util.concurrent.EventExecutor;

final class AdsXdsStream implements XdsStream {
final class AdsXdsStream extends RefCountedStream<ParsedResources> implements XdsStream {

interface ActualStream {
void closeStream();
Expand All @@ -46,6 +50,7 @@ interface ActualStreamFactory {
private final StateCoordinator stateCoordinator;
private final ConfigSourceLifecycleObserver lifecycleObserver;
private final Set<XdsType> targetTypes;
private final SnapshotStream<InterestedResources> interestStream;

StateCoordinator stateCoordinator() {
return stateCoordinator;
Expand All @@ -58,13 +63,14 @@ StateCoordinator stateCoordinator() {

AdsXdsStream(ActualStreamFactory factory, Backoff backoff, EventExecutor eventLoop,
StateCoordinator stateCoordinator, ConfigSourceLifecycleObserver lifecycleObserver,
Set<XdsType> targetTypes) {
Set<XdsType> targetTypes, SnapshotStream<InterestedResources> interestStream) {
this.factory = requireNonNull(factory, "factory");
this.backoff = requireNonNull(backoff, "backoff");
this.eventLoop = requireNonNull(eventLoop, "eventLoop");
this.stateCoordinator = requireNonNull(stateCoordinator, "stateCoordinator");
this.lifecycleObserver = requireNonNull(lifecycleObserver, "lifecycleObserver");
this.targetTypes = requireNonNull(targetTypes, "targetTypes");
this.interestStream = interestStream;
}

void stop() {
Expand All @@ -86,14 +92,18 @@ void stop(Throwable throwable) {
}

@Override
public void close() {
stop();
lifecycleObserver.close();
}

@Override
public void resourcesUpdated(XdsType type) {
ensureStream().resourcesUpdated(type);
protected Subscription onStart(SnapshotWatcher<ParsedResources> watcher) {
final Subscription subscription = interestStream.subscribe((snapshot, error) -> {
assert snapshot != null;
if (targetTypes.contains(snapshot.type())) {
ensureStream().resourcesUpdated(snapshot.type());
}
});
return () -> {
subscription.close();
stop();
lifecycleObserver.close();
};
}

void retryOrClose(boolean closedByError) {
Expand Down Expand Up @@ -127,7 +137,7 @@ private void reset() {
}
for (XdsType targetType : targetTypes) {
if (!stateCoordinator.interestedResources(targetType).isEmpty()) {
resourcesUpdated(targetType);
ensureStream().resourcesUpdated(targetType);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2026 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.xds;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.SafeCloseable;

import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ScheduledFuture;

/**
* A composite watcher that fans out resource updates to multiple {@link SnapshotWatcher}s.
* Also manages the absent-on-timeout timer for SotW protocols.
*/
class CompositeSnapshotWatcher<T extends XdsResource> implements SnapshotWatcher<T>, SafeCloseable {

private static final Logger logger = LoggerFactory.getLogger(CompositeSnapshotWatcher.class);

private final XdsType type;
private final String resource;
@Nullable
private ScheduledFuture<?> initialAbsentFuture;
private final Set<SnapshotWatcher<? super T>> watchers = new CopyOnWriteArraySet<>();

CompositeSnapshotWatcher(XdsType type, String resource, EventExecutor eventLoop, long timeoutMillis,
boolean enableAbsentOnTimeout) {
this.type = type;
this.resource = resource;

if (enableAbsentOnTimeout) {
initialAbsentFuture = eventLoop.schedule(() -> {
initialAbsentFuture = null;
onUpdate(null, new MissingXdsResourceException(type, resource));
}, timeoutMillis, TimeUnit.MILLISECONDS);
}
}

private void maybeCancelAbsentTimer() {
if (initialAbsentFuture != null && initialAbsentFuture.isCancellable()) {
initialAbsentFuture.cancel(false);
initialAbsentFuture = null;
}
}

@Override
public void close() {
maybeCancelAbsentTimer();
}

void addWatcher(SnapshotWatcher<? super T> watcher) {
watchers.add(watcher);
}

void removeWatcher(SnapshotWatcher<? super T> watcher) {
watchers.remove(watcher);
}

@Override
public void onUpdate(@Nullable T value, @Nullable Throwable error) {
maybeCancelAbsentTimer();
for (SnapshotWatcher<? super T> watcher : watchers) {
try {
watcher.onUpdate(value, error);
} catch (Exception e) {
logger.warn("Unexpected exception while invoking onUpdate() with ({}, {}).",
type, resource, e);
}
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

boolean isEmpty() {
return watchers.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

package com.linecorp.armeria.xds;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import com.google.common.collect.ImmutableMap;

final class CompositeXdsStream implements XdsStream {
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.Subscription;

final class CompositeXdsStream extends RefCountedStream<ParsedResources> implements XdsStream {

private final Map<XdsType, XdsStream> streamMap;

Expand All @@ -34,14 +39,13 @@ final class CompositeXdsStream implements XdsStream {
}

@Override
public void close() {
streamMap.values().forEach(XdsStream::close);
}

@Override
public void resourcesUpdated(XdsType type) {
final XdsStream stream = streamMap.get(type);
assert stream != null;
stream.resourcesUpdated(type);
protected Subscription onStart(SnapshotWatcher<ParsedResources> watcher) {
final List<Subscription> subscriptions = new ArrayList<>();
for (XdsStream stream : streamMap.values()) {
subscriptions.add(stream.subscribe(watcher));
}
return () -> {
subscriptions.forEach(Subscription::close);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,123 @@
package com.linecorp.armeria.xds;

import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.xds.configsource.InterestedResources;
import com.linecorp.armeria.xds.stream.SnapshotStream;
import com.linecorp.armeria.xds.stream.Subscription;

import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;

final class ConfigSourceHandler implements SafeCloseable {

private final StateCoordinator stateCoordinator;
private final ConfigSourceSubscription stream;
private final InterestPublisher interestPublisher;
private final Subscription subscription;

static ConfigSourceHandler of(StateCoordinator stateCoordinator, InterestPublisher interestPublisher,
SnapshotStream<DiscoveryResponse> stream,
SnapshotWatcher<Object> defaultWatcher) {
return new ConfigSourceHandler(
stateCoordinator, interestPublisher,
stream.map(res -> parseResponse(res, stateCoordinator.extensionRegistry())),
defaultWatcher);
}

ConfigSourceHandler(StateCoordinator stateCoordinator, ConfigSourceSubscription stream) {
ConfigSourceHandler(StateCoordinator stateCoordinator, InterestPublisher interestPublisher,
SnapshotStream<ParsedResources> stream, SnapshotWatcher<Object> defaultWatcher) {
this.stateCoordinator = stateCoordinator;
this.stream = stream;
this.interestPublisher = interestPublisher;
subscription = stream.subscribe((parsed, error) -> {
if (error != null) {
defaultWatcher.onUpdate(null, error);
return;
}
if (parsed instanceof ParsedResources.SotwParsedResources) {
apply((ParsedResources.SotwParsedResources) parsed);
} else {
assert parsed instanceof ParsedResources.DeltaParsedResources;
apply((ParsedResources.DeltaParsedResources) parsed);
}
});
}

private void apply(ParsedResources.SotwParsedResources sotw) {
if (!sotw.invalidResources().isEmpty()) {
sotw.invalidResources().forEach(
(name, cause) -> stateCoordinator.onResourceError(sotw.type(), name, cause));
return;
}
final XdsType type = sotw.type();
applyUpdatesAndErrors(sotw);
if (sotw.isFullStateOfTheWorld()) {
for (String name : stateCoordinator.activeResources(type)) {
if (!sotw.parsedResources().containsKey(name)) {
stateCoordinator.onResourceMissing(type, name);
}
}
}
}

private void apply(ParsedResources.DeltaParsedResources delta) {
if (!delta.invalidResources().isEmpty()) {
delta.invalidResources().forEach(
(name, cause) -> stateCoordinator.onResourceError(delta.type(), name, cause));
return;
}
applyUpdatesAndErrors(delta);
delta.removed().forEach(name -> stateCoordinator.onResourceMissing(delta.type(), name));
}

void addSubscriber(XdsType type, String resourceName, ResourceWatcher<?> watcher) {
private void applyUpdatesAndErrors(ParsedResources parsed) {
final XdsType type = parsed.type();
parsed.parsedResources().forEach((name, resource) -> {
if (resource instanceof XdsResource) {
stateCoordinator.onResourceUpdated(type, name, (XdsResource) resource);
}
});
parsed.invalidResources().forEach((name, cause) ->
stateCoordinator.onResourceError(type, name, cause));
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

void addSubscriber(XdsType type, String resourceName, SnapshotWatcher<? extends XdsResource> watcher) {
if (stateCoordinator.register(type, resourceName, watcher)) {
stream.updateInterests(type, stateCoordinator.interestedResources(type));
interestPublisher.publish(
new InterestedResources(type, stateCoordinator.interestedResources(type)));
}
}

boolean removeSubscriber(XdsType type, String resourceName, ResourceWatcher<?> watcher) {
boolean removeSubscriber(XdsType type, String resourceName,
SnapshotWatcher<? extends XdsResource> watcher) {
if (stateCoordinator.unregister(type, resourceName, watcher)) {
stream.updateInterests(type, stateCoordinator.interestedResources(type));
interestPublisher.publish(
new InterestedResources(type, stateCoordinator.interestedResources(type)));
}
return stateCoordinator.hasNoSubscribers();
}

@Override
public void close() {
try {
stream.close();
subscription.close();
} finally {
stateCoordinator.close();
}
}

private static ParsedResources parseResponse(DiscoveryResponse response, XdsExtensionRegistry registry) {
final String typeUrl = response.getTypeUrl();
final ResourceParser<?, ?> parser = XdsResourceParserUtil.fromTypeUrl(typeUrl);
if (parser == null) {
throw new IllegalArgumentException("Unknown type URL in discovery response: " + typeUrl);
}

final ParsedResources.SotwParsedResources holder =
parser.parseResources(response.getResourcesList(), registry, response.getVersionInfo());

if (!holder.errors().isEmpty()) {
throw new IllegalArgumentException(
"Failed to parse resource(s) from discovery response: " + holder);
}

return holder;
}
}

This file was deleted.

Loading
Loading