Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
private volatile List<InetSocketAddress> roundRobinAddresses;
private volatile Uri roundRobinBaseUri;
private volatile ScramContext scramContext;
// Memoized base (host/scheme/port) partition key; see basePartitionKey(). proxyServer is final and

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment lists "pool poll/offer" as sites that use this memoized key, but pollPooledChannel never calls basePartitionKey(). It computes its own key from the request parameter, and that is intentional. On the filter replay path (replayRequestsendNextRequest), a filter can replace the request without setTargetRequest() ever being called, so the future's memoized key may still correspond to the original target.

If someone later "fixes" pollPooledChannel to use basePartitionKey() based on this comment, a replayed request could poll a pooled connection using the wrong host's key. Could you reword the comment to mention only the actual consumers (semaphore acquisition, pool offer, and HTTP/2 registration)? It would also be worth adding a short note in pollPooledChannel explaining that it intentionally derives the key from the current request.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java:451

After this change, poll(Uri, String, ProxyServer, ChannelPoolPartitioning) has no remaining internal callers. Since ChannelManager is a public class, we shouldn't remove it, but it would be good to mark it @Deprecated and add Javadoc pointing callers to poll(Object) instead. That helps prevent it from being reintroduced internally and sets it up for removal in the next major release.

The same could be considered for pollHttp2(Uri, ...) at line 446 once its remaining internal caller (NettyRequestSender around line 1162, in the waitForHttp2Connection path) has been migrated to pollHttp2Connection(Object).

// targetRequest is its only other input, so this is invalidated only by setTargetRequest.
private volatile Object basePartitionKeyCache;

public NettyResponseFuture(Request originalRequest,
AsyncHandler<V> asyncHandler,
Expand Down Expand Up @@ -370,6 +373,9 @@ public Request getTargetRequest() {

public void setTargetRequest(Request targetRequest) {
this.targetRequest = targetRequest;
// Invalidate the memoized base partition key: a redirect/retry may target a different
// host/scheme/port, which changes the key.
basePartitionKeyCache = null;
}

public Request getCurrentRequest() {
Expand Down Expand Up @@ -552,8 +558,16 @@ public Object getPartitionKey() {
* initially selected.
*/
public Object basePartitionKey() {
return connectionPoolPartitioning.getPartitionKey(targetRequest.getUri(), targetRequest.getVirtualHost(),
proxyServer);
// Memoized: the same key is needed at several sites per request attempt (semaphore acquire, pool
// poll/offer, HTTP/2 registration). It depends only on targetRequest (host/scheme/port + virtualHost)
// and the final proxyServer, so it is recomputed only when setTargetRequest changes the target.
Object key = basePartitionKeyCache;
if (key == null) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a bug today, since all reads and writes to this cache are serialized by the one-at-a-time request lifecycle. However, the lazy check-then-store pattern only remains safe as long as that invariant holds.

Computing the key eagerly in the constructor and in setTargetRequest(), and making basePartitionKey() a simple read, would provide the same allocation savings without introducing a reader-side write. It also removes any concurrency considerations if the threading model changes in the future.

key = connectionPoolPartitioning.getPartitionKey(targetRequest.getUri(), targetRequest.getVirtualHost(),
proxyServer);
basePartitionKeyCache = key;
}
return key;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1205,15 +1205,18 @@ private Channel pollPooledChannel(NettyResponseFuture<?> future, Request request
// connection would send the WS handshake as a plain HTTP/2 request and the WebSocket handler would
// receive raw frames ("Invalid message ... AdaptiveByteBuf"). Fall through to an HTTP/1.1 connection.
// See Issue #2160.
// Compute the base partition key once and reuse it for both the HTTP/2 registry poll and the
// HTTP/1.1 pool poll, instead of recomputing (and re-allocating) it inside each channelManager call.
Object partitionKey = request.getChannelPoolPartitioning().getPartitionKey(uri, virtualHost, proxy);
if (!uri.isWebSocket()) {
Channel h2Channel = channelManager.pollHttp2(uri, virtualHost, proxy, request.getChannelPoolPartitioning());
Channel h2Channel = channelManager.pollHttp2Connection(partitionKey);
if (h2Channel != null) {
LOGGER.debug("Using HTTP/2 multiplexed Channel '{}' for '{}' to '{}'", h2Channel, request.getMethod(), uri);
return h2Channel;
}
}

final Channel channel = channelManager.poll(uri, virtualHost, proxy, request.getChannelPoolPartitioning());
final Channel channel = channelManager.poll(partitionKey);

if (channel != null) {
LOGGER.debug("Using pooled Channel '{}' for '{}' to '{}'", channel, request.getMethod(), uri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@

import io.github.artsok.RepeatedIfExceptionsTest;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.Request;
import org.asynchttpclient.channel.ChannelPoolPartitioning;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

import static org.asynchttpclient.Dsl.get;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -90,4 +96,29 @@ public void testGetThrowsExceptionOnAbort() throws Exception {
assertThrows(ExecutionException.class, () -> nettyResponseFuture.get(),
"An ExecutionException must have occurred by now as 'abort' was called before 'get'");
}

@Test
public void basePartitionKeyIsMemoizedAndInvalidatedOnTargetChange() {
AsyncHandler<?> asyncHandler = mock(AsyncHandler.class);
Request reqA = get("http://hosta.example/").build();
ChannelPoolPartitioning partitioning = reqA.getChannelPoolPartitioning();
NettyResponseFuture<?> future = new NettyResponseFuture<>(reqA, asyncHandler, null, 3, partitioning, null, null);

Object k1 = future.basePartitionKey();
Object k2 = future.basePartitionKey();
// Memoized: repeat calls return the SAME instance (previously each call allocated a fresh key).
assertSame(k1, k2, "base partition key must be memoized (same instance on repeat calls)");
// ...and it equals a fresh computation for the same target, so behavior is unchanged.
assertEquals(partitioning.getPartitionKey(reqA.getUri(), reqA.getVirtualHost(), null), k1,
"memoized key must equal a fresh computation for the current target");

// Changing the target host must invalidate the memo and yield the new host's key — otherwise a
// redirect could reuse a pooled connection to the wrong host.
Request reqB = get("http://hostb.example/").build();
future.setTargetRequest(reqB);
Object k3 = future.basePartitionKey();
assertNotEquals(k1, k3, "changing the target host must invalidate the memo and yield a different key");
assertEquals(partitioning.getPartitionKey(reqB.getUri(), reqB.getVirtualHost(), null), k3,
"after setTargetRequest the key must match a fresh computation for the new target");
}
}
Loading