Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 125 additions & 2 deletions src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,28 @@ impl Client {
node_url.unwrap_or(format!("{LOCAL}:{port}"))
};
log::info!("connecting to {base_url}");
let client = reqwest::Client::builder()
let mut builder = reqwest::Client::builder()
.timeout(Duration::from_secs(args.request_timeout_seconds))
.connect_timeout(Duration::from_secs(args.request_timeout_seconds)) // Connection establishment timeout
.connect_timeout(Duration::from_secs(args.request_timeout_seconds)); // Connection establishment timeout
if args.node_disable_conn_pool {
if use_esplora {
// The flag is node-only; applying it to Esplora would force a fresh
// TCP + TLS handshake per request — a real regression — so ignore it
// here, but make the ignored setting loud rather than silent.
log::warn!(
"**********************************************************************\n\
* --node-disable-conn-pool (NODE_DISABLE_CONN_POOL) is set together \n\
* with --use-esplora. This flag only affects the local node \n\
* connection and is being IGNORED for the Esplora backend; keep-alive \n\
* pooling stays ENABLED. \n\
**********************************************************************"
);
} else {
// No keep-alive reuse: each request opens a fresh connection.
builder = builder.pool_max_idle_per_host(0);
}
}
let client = builder
.build()
.with_context(|| "Failed to create HTTP client with timeout")?;

Expand Down Expand Up @@ -688,6 +707,110 @@ mod test {
assert_eq!(result[&6], 2.0);
}

/// Spawn a minimal HTTP/1.1 server on loopback that counts the TCP
/// connections it accepts and answers every request with the same canned
/// blockhash, keeping the connection alive for any follow-up requests.
/// Returns the bound address and the accept counter.
async fn spawn_counting_node() -> (
std::net::SocketAddr,
std::sync::Arc<std::sync::atomic::AtomicUsize>,
) {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let connections = Arc::new(AtomicUsize::new(0));
let counter = connections.clone();

tokio::spawn(async move {
// A valid 64-hex blockhash so `block_hash` parses the body successfully.
const HASH_HEX: &str =
"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f";
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{HASH_HEX}",
HASH_HEX.len()
);
loop {
let (mut socket, _) = match listener.accept().await {
Ok(pair) => pair,
Err(_) => break,
};
counter.fetch_add(1, Ordering::SeqCst);
let response = response.clone();
tokio::spawn(async move {
let mut buf = [0u8; 1024];
// Serve each request on this connection until the client closes it,
// so a reused keep-alive connection is counted only once.
'conn: loop {
let mut req = Vec::new();
loop {
match socket.read(&mut buf).await {
Ok(0) | Err(_) => break 'conn, // client closed the connection
Ok(n) => {
req.extend_from_slice(&buf[..n]);
if req.windows(4).any(|w| w == b"\r\n\r\n") {
break; // full request headers received
}
}
}
}
if socket.write_all(response.as_bytes()).await.is_err() {
break;
}
}
});
}
});

(addr, connections)
}

fn node_client(addr: std::net::SocketAddr, disable_conn_pool: bool) -> Client {
let mut args = Arguments::default();
args.network = Network::Bitcoin;
args.use_esplora = false;
args.node_url = Some(format!("http://{addr}"));
args.rpc_user_password = Some("user:pass".to_string()); // satisfies is_valid()
args.request_timeout_seconds = 30; // Default::default() leaves this 0, which is_valid() rejects
args.node_disable_conn_pool = disable_conn_pool;
Client::new(&args).unwrap()
}

/// Default (pooled) behavior: sequential requests reuse a single keep-alive connection.
#[tokio::test]
async fn test_conn_pool_enabled_reuses_connection() {
use std::sync::atomic::Ordering;

let (addr, connections) = spawn_counting_node().await;
let client = node_client(addr, false);

for _ in 0..3 {
client.block_hash(0).await.unwrap().unwrap();
}

assert_eq!(connections.load(Ordering::SeqCst), 1);
}

/// With the pool disabled, every request establishes its own connection, so a
/// locality-aware L4 proxy re-evaluates the upstream on each request instead of
/// staying pinned to whichever backend the first connection landed on.
#[tokio::test]
async fn test_conn_pool_disabled_opens_fresh_connection_each_request() {
use std::sync::atomic::Ordering;

let (addr, connections) = spawn_counting_node().await;
let client = node_client(addr, true);

for _ in 0..3 {
client.block_hash(0).await.unwrap().unwrap();
}

assert_eq!(connections.load(Ordering::SeqCst), 3);
}

#[tokio::test]
#[ignore = "connects to prod server"]
async fn test_client_esplora() {
Expand Down
6 changes: 6 additions & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ pub struct Arguments {
#[arg(env, long, default_value = "30")]
pub request_timeout_seconds: u64,

/// Disable HTTP keep-alive connection pooling to the node, forcing a fresh connection per request.
/// Node-only: ignored (with a warning) when --use-esplora is set.
#[arg(env, long)]
pub node_disable_conn_pool: bool,

/// Timeout in seconds for reading incoming HTTP request headers (protects against slowloris attacks)
#[arg(env, long, default_value = "10")]
pub header_read_timeout_seconds: u64,
Expand Down Expand Up @@ -187,6 +192,7 @@ impl std::fmt::Debug for Arguments {
.field("enable_db_statistics", &self.enable_db_statistics)
.field("cache_control_seconds", &self.cache_control_seconds)
.field("request_timeout_seconds", &self.request_timeout_seconds)
.field("node_disable_conn_pool", &self.node_disable_conn_pool)
.field(
"header_read_timeout_seconds",
&self.header_read_timeout_seconds,
Expand Down
Loading