Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2026 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.xds;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.SafeCloseable;

import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ScheduledFuture;

/**
* A composite watcher that fans out resource updates to multiple {@link SnapshotWatcher}s.
* Also manages the absent-on-timeout timer for SotW protocols.
*/
class CompositeSnapshotWatcher<T extends XdsResource> implements SnapshotWatcher<T>, SafeCloseable {

private static final Logger logger = LoggerFactory.getLogger(CompositeSnapshotWatcher.class);

private final XdsType type;
private final String resource;
@Nullable
private ScheduledFuture<?> initialAbsentFuture;
private final Set<SnapshotWatcher<? super T>> watchers = new HashSet<>();

CompositeSnapshotWatcher(XdsType type, String resource, EventExecutor eventLoop, long timeoutMillis,
boolean enableAbsentOnTimeout) {
this.type = type;
this.resource = resource;

if (enableAbsentOnTimeout) {
initialAbsentFuture = eventLoop.schedule(() -> {
initialAbsentFuture = null;
onUpdate(null, new MissingXdsResourceException(type, resource));
}, timeoutMillis, TimeUnit.MILLISECONDS);
}
}

private void maybeCancelAbsentTimer() {
if (initialAbsentFuture != null && initialAbsentFuture.isCancellable()) {
initialAbsentFuture.cancel(false);
initialAbsentFuture = null;
}
}

@Override
public void close() {
maybeCancelAbsentTimer();
}

void addWatcher(SnapshotWatcher<? super T> watcher) {
watchers.add(watcher);
}

void removeWatcher(SnapshotWatcher<? super T> watcher) {
watchers.remove(watcher);
}

@Override
public void onUpdate(@Nullable T value, @Nullable Throwable error) {
maybeCancelAbsentTimer();
for (SnapshotWatcher<? super T> watcher : watchers) {
try {
watcher.onUpdate(value, error);
} catch (Exception e) {
logger.warn("Unexpected exception while invoking onUpdate() with ({}, {}).",
type, resource, e);
}
}
}

boolean isEmpty() {
return watchers.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ final class ConfigSourceHandler implements SafeCloseable {
this.stream = stream;
}

void addSubscriber(XdsType type, String resourceName, ResourceWatcher<?> watcher) {
void addSubscriber(XdsType type, String resourceName, SnapshotWatcher<? extends XdsResource> watcher) {
if (stateCoordinator.register(type, resourceName, watcher)) {
stream.updateInterests(type, stateCoordinator.interestedResources(type));
}
}

boolean removeSubscriber(XdsType type, String resourceName, ResourceWatcher<?> watcher) {
boolean removeSubscriber(XdsType type, String resourceName,
SnapshotWatcher<? extends XdsResource> watcher) {
if (stateCoordinator.unregister(type, resourceName, watcher)) {
stream.updateInterests(type, stateCoordinator.interestedResources(type));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*
* @param <T> the type of the current {@link XdsResource}
*/
interface ResourceNode<T extends XdsResource> extends ResourceWatcher<T> {
interface ResourceNode<T extends XdsResource> extends SnapshotWatcher<T> {

@Nullable
ConfigSource configSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.armeria.xds;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.stream.RefCountedStream;
import com.linecorp.armeria.xds.stream.Subscription;

Expand Down Expand Up @@ -55,21 +56,15 @@ public String name() {
}

@Override
public void onChanged(T update) {
resourceNodeMeterBinder.onChanged(update);
emit(update, null);
}

@Override
public void onError(XdsType type, String resourceName, Throwable t) {
resourceNodeMeterBinder.onError();
emit(null, XdsResourceException.maybeWrap(type, resourceName, t));
}

@Override
public void onResourceDoesNotExist(XdsType type, String resourceName) {
resourceNodeMeterBinder.onResourceDoesNotExist();
emit(null, new MissingXdsResourceException(type, resourceName));
public void onUpdate(@Nullable T value, @Nullable Throwable error) {
if (value != null) {
resourceNodeMeterBinder.onChanged(value);
} else if (error instanceof MissingXdsResourceException) {
resourceNodeMeterBinder.onResourceDoesNotExist();
} else if (error != null) {
resourceNodeMeterBinder.onError();
}
emit(value, error);
}

@Override
Expand Down
32 changes: 0 additions & 32 deletions xds/src/main/java/com/linecorp/armeria/xds/ResourceWatcher.java

This file was deleted.

35 changes: 16 additions & 19 deletions xds/src/main/java/com/linecorp/armeria/xds/StateCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.SafeCloseable;

import io.envoyproxy.envoy.config.core.v3.ConfigSource;
Expand Down Expand Up @@ -70,13 +69,15 @@ private static long initialFetchTimeoutMillis(ConfigSource configSource) {
return epochMilli;
}

<T extends XdsResource> boolean register(XdsType type, String resourceName, ResourceWatcher<T> watcher) {
<T extends XdsResource> boolean register(XdsType type, String resourceName,
SnapshotWatcher<T> watcher) {
final boolean updated = subscriberStorage.register(type, resourceName, watcher);
replayToWatcher(type, resourceName, watcher);
return updated;
}

<T extends XdsResource> boolean unregister(XdsType type, String resourceName, ResourceWatcher<T> watcher) {
<T extends XdsResource> boolean unregister(XdsType type, String resourceName,
SnapshotWatcher<T> watcher) {
return subscriberStorage.unregister(type, resourceName, watcher);
}

Expand All @@ -101,40 +102,36 @@ void onResourceUpdated(XdsType type, String resourceName, XdsResource resource)
if (revised == null) {
return;
}
final XdsStreamSubscriber<XdsResource> subscriber = subscriber(type, resourceName);
final CompositeSnapshotWatcher<XdsResource> subscriber =
subscriberStorage.subscriber(type, resourceName);
if (subscriber != null) {
subscriber.onData(revised);
subscriber.onUpdate(revised, null);
}
}

void onResourceMissing(XdsType type, String resourceName) {
if (!stateStore.remove(type, resourceName)) {
return;
}
final XdsStreamSubscriber<?> subscriber = subscriber(type, resourceName);
final CompositeSnapshotWatcher<?> subscriber = subscriberStorage.subscriber(type, resourceName);
if (subscriber != null) {
subscriber.onAbsent();
subscriber.onUpdate(null, new MissingXdsResourceException(type, resourceName));
}
}

void onResourceError(XdsType type, String resourceName, Throwable cause) {
final XdsStreamSubscriber<?> subscriber = subscriber(type, resourceName);
final CompositeSnapshotWatcher<?> subscriber = subscriberStorage.subscriber(type, resourceName);
if (subscriber != null) {
subscriber.onError(resourceName, cause);
subscriber.onUpdate(null, XdsResourceException.maybeWrap(type, resourceName, cause));
}
}

@Nullable
private <T extends XdsResource> XdsStreamSubscriber<T> subscriber(XdsType type, String resourceName) {
return subscriberStorage.subscriber(type, resourceName);
}

private <T extends XdsResource> void replayToWatcher(XdsType type, String resourceName,
ResourceWatcher<T> watcher) {
final XdsResource resource = stateStore.resource(type, resourceName);
if (resource != null) {
//noinspection unchecked
watcher.onChanged((T) resource);
SnapshotWatcher<T> watcher) {
@SuppressWarnings("unchecked")
final T cached = (T) stateStore.resource(type, resourceName);
if (cached != null) {
watcher.onUpdate(cached, null);
}
}

Expand Down
26 changes: 13 additions & 13 deletions xds/src/main/java/com/linecorp/armeria/xds/SubscriberStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ final class SubscriberStorage implements SafeCloseable {
private final EventExecutor eventLoop;
private final long timeoutMillis;
private final boolean delta;
private final Map<XdsType, Map<String, XdsStreamSubscriber<?>>> subscriberMap =
private final Map<XdsType, Map<String, CompositeSnapshotWatcher<?>>> subscriberMap =
new EnumMap<>(XdsType.class);

SubscriberStorage(EventExecutor eventLoop, long timeoutMillis, boolean delta) {
Expand All @@ -45,37 +45,37 @@ final class SubscriberStorage implements SafeCloseable {
/**
* Returns {@code true} if a new subscriber is added.
*/
<T extends XdsResource> boolean register(XdsType type, String resourceName, ResourceWatcher<T> watcher) {
<T extends XdsResource> boolean register(XdsType type, String resourceName, SnapshotWatcher<T> watcher) {
//noinspection unchecked
XdsStreamSubscriber<T> subscriber = (XdsStreamSubscriber<T>) subscriberMap.computeIfAbsent(
CompositeSnapshotWatcher<T> subscriber = (CompositeSnapshotWatcher<T>) subscriberMap.computeIfAbsent(
type, key -> new HashMap<>()).get(resourceName);
boolean updated = false;
if (subscriber == null) {
final boolean enableAbsentOnTimeout = !delta && timeoutMillis > 0;
subscriber = new XdsStreamSubscriber<>(type, resourceName, eventLoop, timeoutMillis,
enableAbsentOnTimeout);
subscriber = new CompositeSnapshotWatcher<>(type, resourceName, eventLoop, timeoutMillis,
enableAbsentOnTimeout);
subscriberMap.get(type).put(resourceName, subscriber);
updated = true;
}
subscriber.registerWatcher(watcher);
subscriber.addWatcher(watcher);
return updated;
}

/**
* Returns {@code true} if a subscriber is removed.
*/
<T extends XdsResource> boolean unregister(XdsType type, String resourceName, ResourceWatcher<T> watcher) {
<T extends XdsResource> boolean unregister(XdsType type, String resourceName, SnapshotWatcher<T> watcher) {
if (!subscriberMap.containsKey(type)) {
return false;
}
final Map<String, XdsStreamSubscriber<?>> resourceToSubscriber = subscriberMap.get(type);
final Map<String, CompositeSnapshotWatcher<?>> resourceToSubscriber = subscriberMap.get(type);
if (!resourceToSubscriber.containsKey(resourceName)) {
return false;
}
//noinspection unchecked
final XdsStreamSubscriber<T> subscriber =
(XdsStreamSubscriber<T>) resourceToSubscriber.get(resourceName);
subscriber.unregisterWatcher(watcher);
final CompositeSnapshotWatcher<T> subscriber =
(CompositeSnapshotWatcher<T>) resourceToSubscriber.get(resourceName);
subscriber.removeWatcher(watcher);
if (subscriber.isEmpty()) {
resourceToSubscriber.remove(resourceName);
subscriber.close();
Expand All @@ -88,7 +88,7 @@ <T extends XdsResource> boolean unregister(XdsType type, String resourceName, Re
}

@Nullable
<T extends XdsResource> XdsStreamSubscriber<T> subscriber(XdsType type, String resourceName) {
<T extends XdsResource> CompositeSnapshotWatcher<T> subscriber(XdsType type, String resourceName) {
return unsafeCast(subscriberMap.getOrDefault(type, ImmutableMap.of()).get(resourceName));
}

Expand All @@ -109,7 +109,7 @@ boolean hasNoSubscribers() {
@Override
public void close() {
subscriberMap.values().forEach(subscribers -> {
subscribers.values().forEach(XdsStreamSubscriber::close);
subscribers.values().forEach(CompositeSnapshotWatcher::close);
});
subscriberMap.clear();
}
Expand Down
Loading
Loading