diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java index 4a1861893..40dca76e4 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java @@ -26,12 +26,32 @@ */ package org.apache.hc.core5.benchmark; - +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.ModalCloseable; @@ -40,7 +60,6 @@ import org.apache.hc.core5.pool.ManagedConnPool; import org.apache.hc.core5.pool.PoolEntry; import org.apache.hc.core5.pool.PoolReusePolicy; -import org.apache.hc.core5.pool.PoolStats; import org.apache.hc.core5.pool.RouteSegmentedConnPool; import org.apache.hc.core5.pool.StrictConnPool; import org.apache.hc.core5.util.TimeValue; @@ -51,7 +70,6 @@ import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; @@ -62,143 +80,245 @@ import org.openjdk.jmh.annotations.Warmup; /** - * Compare StrictConnPool, LaxConnPool, and RouteSegmentedConnPool (“OFFLOCK”) - * under different contention patterns and slow-disposal rates. + * JMH harness that drives StrictConnPool, LaxConnPool, and RouteSegmentedConnPool + * against a local HTTP/1.1 mini-cluster using real sockets and keep-alive. */ -@BenchmarkMode({Mode.Throughput, Mode.SampleTime}) -@Warmup(iterations = 3, time = 2, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@BenchmarkMode({Mode.Throughput}) +@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS) @Fork(1) -@OutputTimeUnit(TimeUnit.MICROSECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) public class RoutePoolsJmh { - /** - * Minimal connection that can simulate slow close. - */ - public static final class FakeConn implements ModalCloseable { + // --------------------------------------------------------------- + // Utilities + // --------------------------------------------------------------- + static ThreadFactory daemonFactory(final String prefix) { + final AtomicInteger n = new AtomicInteger(1); + return r -> { + final Thread t = new Thread(r, prefix + "-" + n.getAndIncrement()); + t.setDaemon(true); + return t; + }; + } + + // --------------------------------------------------------------- + // Real HTTP/1.1 persistent connection used by the pool + // --------------------------------------------------------------- + public static final class RealConn implements ModalCloseable { + private final String host; + private final int port; private final int closeDelayMs; + private final Socket socket; + private final BufferedInputStream in; + private final BufferedOutputStream out; - public FakeConn(final int closeDelayMs) { + public RealConn( + final String host, + final int port, + final int closeDelayMs, + final int soTimeoutMs, + final int connectTimeoutMs) throws IOException { + this.host = host; + this.port = port; this.closeDelayMs = closeDelayMs; + final Socket s = new Socket(); + s.setTcpNoDelay(true); + s.setSoTimeout(Math.max(1000, soTimeoutMs)); // read timeout + s.setKeepAlive(true); + s.connect(new InetSocketAddress(host, port), Math.max(1, connectTimeoutMs)); + this.socket = s; + this.in = new BufferedInputStream(s.getInputStream(), 32 * 1024); + this.out = new BufferedOutputStream(s.getOutputStream(), 32 * 1024); + } + + public void getOnce(final boolean keepAlive) throws IOException { + final String req = "GET / HTTP/1.1\r\n" + + "Host: " + host + ":" + port + "\r\n" + + (keepAlive ? "Connection: keep-alive\r\n" : "Connection: close\r\n") + + "\r\n"; + out.write(req.getBytes(StandardCharsets.ISO_8859_1)); + out.flush(); + + final String status = readLine(); + if (status == null) { + throw new IOException("No status line"); + } + final String[] parts = status.split(" ", 3); + if (parts.length < 2 || !parts[0].startsWith("HTTP/1.")) { + throw new IOException("Bad status: " + status); + } + final int code; + try { + code = Integer.parseInt(parts[1]); + } catch (final NumberFormatException nfe) { + throw new IOException("Bad status code in: " + status); + } + if (code != 200) { + throw new IOException("Unexpected status: " + status); + } + + int contentLength = -1; + for (; ; ) { + final String line = readLine(); + if (line == null) { + throw new IOException("EOF in headers"); + } + if (line.isEmpty()) { + break; + } + final int colon = line.indexOf(':'); + if (colon > 0) { + final String name = line.substring(0, colon).trim(); + if ("Content-Length".equalsIgnoreCase(name)) { + try { + contentLength = Integer.parseInt(line.substring(colon + 1).trim()); + } catch (final NumberFormatException ignore) { + // ignore + } + } + } + } + if (contentLength < 0) { + throw new IOException("Missing Content-Length"); + } + + int remaining = contentLength; + final byte[] buf = new byte[8192]; + while (remaining > 0) { + final int r = in.read(buf, 0, Math.min(buf.length, remaining)); + if (r == -1) { + throw new IOException("unexpected EOF in body"); + } + remaining -= r; + } + } + + private String readLine() throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(128); + for (; ; ) { + final int b = in.read(); + if (b == -1) { + if (baos.size() == 0) { + return null; + } + break; + } + if (b == '\n') { + break; + } + baos.write(b); + } + final byte[] raw = baos.toByteArray(); + final int len = raw.length; + final int eff = (len > 0 && raw[len - 1] == '\r') ? len - 1 : len; + return new String(raw, 0, eff, StandardCharsets.ISO_8859_1); } @Override public void close(final CloseMode closeMode) { - if (closeDelayMs <= 0) { - return; + if (closeDelayMs > 0) { + try { + Thread.sleep(closeDelayMs); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + } } try { - Thread.sleep(closeDelayMs); - } catch (final InterruptedException ignore) { - Thread.currentThread().interrupt(); + socket.close(); + } catch (final IOException ignore) { + // ignore } } @Override public void close() throws IOException { - + if (closeDelayMs > 0) { + try { + Thread.sleep(closeDelayMs); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + socket.close(); } } - /** - * All benchmark parameters & shared state live here (required by JMH). - */ + // --------------------------------------------------------------- + // Benchmark state & setup + // --------------------------------------------------------------- @State(Scope.Benchmark) public static class BenchState { - - /** - * Which pool to benchmark. - * STRICT -> StrictConnPool - * LAX -> LaxConnPool - * OFFLOCK -> RouteSegmentedConnPool - */ - @Param({"STRICT", "LAX", "OFFLOCK"}) + @Param({"OFFLOCK", "STRICT", "LAX"}) public String policy; - - /** - * Number of distinct routes to spread load across. - * 1 = hot single route; 10 = multi-route scenario. - */ - @Param({"1", "10"}) + @Param({"1", "4", "10", "25", "50"}) public int routes; - - /** - * Percent (0..100) of releases that will be non-reusable, - * triggering a discard (and thus a potentially slow close). - */ - @Param({"0", "5", "20"}) - public int slowClosePct; - - /** - * Sleep (ms) when a connection is discarded (slow close path). - */ - @Param({"0", "200"}) - public int closeSleepMs; - - /** - * Max total, default per-route — tuned to create contention. - */ - @Param({"32"}) + @Param({"128"}) + public int payloadBytes; + @Param({"100"}) public int maxTotal; - @Param({"8"}) + @Param({"5"}) public int defMaxPerRoute; - - /** - * Keep-alive on reusable releases. - */ + @Param({"true"}) + public boolean keepAlive; @Param({"5000"}) public int keepAliveMs; + @Param({"0", "20"}) + public int slowClosePct; + @Param({"0", "200"}) + public int closeSleepMs; + @Param({"10000"}) + public int soTimeoutMs; + @Param({"30000"}) + public int requestTimeoutMs; + @Param({"1000"}) + public int connectTimeoutMs; - ManagedConnPool pool; + ManagedConnPool pool; + DisposalCallback disposal; + MiniCluster cluster; String[] routeKeys; - DisposalCallback disposal; + ScheduledExecutorService maint; @Setup(Level.Trial) - public void setUp() { - // routes list - routeKeys = new String[routes]; - for (int i = 0; i < routes; i++) { - routeKeys[i] = "route-" + i; - } - + public void setUp() throws Exception { + cluster = new MiniCluster(routes, payloadBytes); + routeKeys = cluster.routeKeys(); disposal = (c, m) -> { if (c != null) { c.close(m); } }; - final TimeValue ttl = TimeValue.NEG_ONE_MILLISECOND; - switch (policy.toUpperCase(Locale.ROOT)) { - case "STRICT": - pool = new StrictConnPool<>( - defMaxPerRoute, - maxTotal, - ttl, - PoolReusePolicy.LIFO, - disposal, - null); + case "STRICT": { + pool = new StrictConnPool<>(defMaxPerRoute, maxTotal, ttl, PoolReusePolicy.LIFO, disposal, null); break; - case "LAX": - pool = new LaxConnPool<>( - defMaxPerRoute, - ttl, - PoolReusePolicy.LIFO, - disposal, - null); - pool.setMaxTotal(maxTotal); + } + case "LAX": { + final LaxConnPool lax = new LaxConnPool<>(defMaxPerRoute, ttl, PoolReusePolicy.LIFO, disposal, null); + lax.setMaxTotal(maxTotal); + pool = lax; break; - case "OFFLOCK": - pool = new RouteSegmentedConnPool<>( - defMaxPerRoute, - maxTotal, - ttl, - PoolReusePolicy.LIFO, - disposal); + } + case "OFFLOCK": { + pool = new RouteSegmentedConnPool<>(defMaxPerRoute, maxTotal, ttl, PoolReusePolicy.LIFO, disposal); break; - default: + } + default: { throw new IllegalArgumentException("Unknown policy: " + policy); + } } + // Light periodic maintenance, close idle/expired like real clients do + maint = java.util.concurrent.Executors.newSingleThreadScheduledExecutor(daemonFactory("pool-maint")); + maint.scheduleAtFixedRate(() -> { + try { + pool.closeIdle(TimeValue.ofSeconds(5)); + pool.closeExpired(); + } catch (final Exception ignore) { + // ignore in benchmark + } + }, 5, 5, TimeUnit.SECONDS); } @TearDown(Level.Trial) @@ -206,6 +326,17 @@ public void tearDown() { if (pool != null) { pool.close(CloseMode.IMMEDIATE); } + if (cluster != null) { + cluster.close(); + } + if (maint != null) { + maint.shutdownNow(); + try { + maint.awaitTermination(5, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } } String pickRoute() { @@ -214,47 +345,195 @@ String pickRoute() { } boolean shouldDiscard() { - if (slowClosePct <= 0) return false; - return ThreadLocalRandom.current().nextInt(100) < slowClosePct; + return slowClosePct > 0 && ThreadLocalRandom.current().nextInt(100) < slowClosePct; } } - /** - * Lease+release on a randomly chosen route. - * Mix of reusable and non-reusable releases (to trigger discard/close). - */ + // --------------------------------------------------------------- + // Benchmark body + // --------------------------------------------------------------- @Benchmark @Threads(50) - public void leaseReleaseMixed(final BenchState s) throws Exception { + public void lease_io_release(final BenchState s) { + final String key = s.pickRoute(); + final Future> f = s.pool.lease(key, null, Timeout.DISABLED, null); + final PoolEntry e; try { - final Future> f = s.pool.lease(s.pickRoute(), null, Timeout.ofMilliseconds(500), null); - final PoolEntry e = f.get(500, TimeUnit.MILLISECONDS); - if (!e.hasConnection()) e.assignConnection(new FakeConn(s.closeSleepMs)); - final boolean reusable = !s.shouldDiscard(); - if (reusable) { - e.updateExpiry(TimeValue.ofMilliseconds(s.keepAliveMs)); - s.pool.release(e, true); - } else { + e = f.get(s.requestTimeoutMs, TimeUnit.MILLISECONDS); + } catch (final TimeoutException te) { + // IMPORTANT: drop waiter on pools that queue + f.cancel(true); + return; + } catch (final ExecutionException ee) { + if (ee.getCause() instanceof TimeoutException) { + f.cancel(true); + } + return; + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + if (e == null) { + return; // defensive + } + + RealConn c = e.getConnection(); + if (c == null) { + // parse host:port defensively + final int colon = key.indexOf(':'); + if (colon <= 0 || colon >= key.length() - 1) { + s.pool.release(e, false); + return; + } + final String host = key.substring(0, colon); + final int port; + try { + port = Integer.parseInt(key.substring(colon + 1)); + } catch (final NumberFormatException nfe) { + s.pool.release(e, false); + return; + } + RealConn fresh = null; + try { + fresh = new RealConn(host, port, s.closeSleepMs, s.soTimeoutMs, s.connectTimeoutMs); + // Double-check before assigning to avoid races + final RealConn existing = e.getConnection(); + if (existing == null) { + try { + e.assignConnection(fresh); + c = fresh; + fresh = null; // ownership transferred + } catch (final IllegalStateException already) { + // someone else assigned concurrently + c = e.getConnection(); + if (c == null) { + s.pool.release(e, false); + try { + fresh.close(CloseMode.IMMEDIATE); + } catch (final Exception ignore) { + } + return; + } + } + } else { + c = existing; + } + } catch (final IOException ioe) { s.pool.release(e, false); + if (fresh != null) { + try { + fresh.close(CloseMode.IMMEDIATE); + } catch (final Exception ignore) { + } + } + return; + } finally { + if (fresh != null) { // we created but didn't assign -> close to avoid leak + try { + fresh.close(CloseMode.IMMEDIATE); + } catch (final Exception ignore) { + } + } + } + } + + if (c == null) { + s.pool.release(e, false); + return; + } + + try { + c.getOnce(s.keepAlive); + } catch (final IOException ioe) { + s.pool.release(e, false); + return; + } + + final boolean reusable = s.keepAlive && !s.shouldDiscard(); + if (reusable) { + e.updateExpiry(TimeValue.ofMilliseconds(s.keepAliveMs)); + s.pool.release(e, true); + } else { + s.pool.release(e, false); + } + } + + // --------------------------------------------------------------- + // Local HTTP mini-cluster + // --------------------------------------------------------------- + static final class MiniCluster { + private final List servers = new ArrayList<>(); + private final String[] keys; + private final byte[] body; + private final ExecutorService exec; + + MiniCluster(final int n, final int payloadBytes) throws IOException { + this.keys = new String[n]; + this.body = new byte[payloadBytes]; + // Bounded, CPU-sized pool to keep the com.sun server in check + final int cores = Math.max(2, Runtime.getRuntime().availableProcessors()); + final int coreThreads = Math.min(64, Math.max(cores, n * 2)); + final int maxThreads = Math.min(128, Math.max(coreThreads, n * 4)); + this.exec = new java.util.concurrent.ThreadPoolExecutor( + coreThreads, maxThreads, + 60L, TimeUnit.SECONDS, + new java.util.concurrent.LinkedBlockingQueue<>(2048), + daemonFactory("mini-http"), + new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); + for (int i = 0; i < n; i++) { + final InetSocketAddress bind = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); + final HttpServer s = HttpServer.create(bind, 4096); + s.createContext("/", new FixedHandler(body)); + s.setExecutor(exec); + s.start(); + servers.add(s); + keys[i] = "127.0.0.1:" + s.getAddress().getPort(); } - } catch (final IllegalStateException ignored) { + } + String[] routeKeys() { + return keys; + } + + void close() { + for (final HttpServer s : servers) { + try { + s.stop(0); + } catch (final Exception ignore) { + } + } + exec.shutdownNow(); + try { + exec.awaitTermination(5, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + } } } + static final class FixedHandler implements HttpHandler { + private final byte[] body; - /** - * Optional stats probe to ensure the benchmark does "something". - * Not a measured benchmark; use only for sanity runs. - */ - @Benchmark - @Threads(1) - @OperationsPerInvocation(1) - @BenchmarkMode(Mode.SingleShotTime) - public void statsProbe(final BenchState s, final org.openjdk.jmh.infra.Blackhole bh) { - final PoolStats stats = s.pool.getTotalStats(); - bh.consume(stats.getAvailable()); - bh.consume(stats.getLeased()); - bh.consume(stats.getPending()); + FixedHandler(final byte[] body) { + this.body = body; + } + + @Override + public void handle(final HttpExchange ex) throws IOException { + try (InputStream in = ex.getRequestBody()) { + final byte[] buf = new byte[1024]; + while (in.read(buf) != -1) { + // drain + } + } + ex.getResponseHeaders().set("Content-Type", "text/plain; charset=US-ASCII"); + ex.sendResponseHeaders(200, body.length); + try (OutputStream os = ex.getResponseBody()) { + if (body.length > 0) { + os.write(body); + } + os.flush(); + } + } } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java index 0fc2bd0a4..d49744a2f 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java @@ -27,6 +27,8 @@ package org.apache.hc.core5.pool; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.HashSet; import java.util.Iterator; import java.util.Map; @@ -38,8 +40,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,15 +62,15 @@ import org.apache.hc.core5.util.Timeout; /** - * Lock-free, route-segmented connection pool. + * Lock-free, route-segmented connection pool with tiny, conditional round-robin assistance. * - *

This implementation keeps per-route state in independent segments and avoids - * holding a global lock while disposing of connections. Under slow closes - * (for example TLS shutdown or OS-level socket stalls), threads leasing - * connections on other routes are not blocked by disposal work.

+ *

Per-route state is kept in independent segments. Disposal of connections is offloaded + * to a bounded executor so slow closes do not block threads leasing on other routes. + * A minimal round-robin drainer is engaged only when there are many pending routes and + * there is global headroom; it never scans all routes.

* * @param route key type - * @param connection type (must be {@link org.apache.hc.core5.io.ModalCloseable}) + * @param connection type (must be {@link ModalCloseable}) * @see ManagedConnPool * @see PoolReusePolicy * @see DisposalCallback @@ -74,6 +80,10 @@ @Experimental public final class RouteSegmentedConnPool implements ManagedConnPool { + // Tiny RR assist: only engage when there are many distinct routes waiting and there is headroom. + private static final int RR_MIN_PENDING_ROUTES = 12; + private static final int RR_BUDGET = 64; + private final PoolReusePolicy reusePolicy; private final TimeValue timeToLive; private final DisposalCallback disposal; @@ -89,6 +99,17 @@ public final class RouteSegmentedConnPool implement private final ScheduledExecutorService timeouts; + /** + * Dedicated executor for asynchronous, best-effort disposal. + * Bounded queue; on saturation we fall back to IMMEDIATE close on the caller thread. + */ + private final ThreadPoolExecutor disposer; + + // Minimal fair round-robin over routes with waiters (no global scans). + private final ConcurrentLinkedQueue pendingQueue = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean draining = new AtomicBoolean(false); + private final AtomicInteger pendingRouteCount = new AtomicInteger(0); + public RouteSegmentedConnPool( final int defaultMaxPerRoute, final int maxTotal, @@ -108,12 +129,29 @@ public RouteSegmentedConnPool( return t; }; this.timeouts = Executors.newSingleThreadScheduledExecutor(tf); + + // Asynchronous disposer for slow GRACEFUL closes. + final int cores = Math.max(2, Runtime.getRuntime().availableProcessors()); + final int nThreads = Math.min(8, Math.max(2, cores)); // allow up to 8 on bigger boxes + final int qsize = 1024; + final ThreadFactory df = r -> { + final Thread t = new Thread(r, "seg-pool-disposer"); + t.setDaemon(true); + return t; + }; + this.disposer = new ThreadPoolExecutor( + nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(qsize), + df, + new ThreadPoolExecutor.AbortPolicy()); // but we preflight capacity to avoid exception storms } final class Segment { final ConcurrentLinkedDeque> available = new ConcurrentLinkedDeque<>(); - final ConcurrentLinkedQueue waiters = new ConcurrentLinkedQueue<>(); + final ConcurrentLinkedDeque waiters = new ConcurrentLinkedDeque<>(); final AtomicInteger allocated = new AtomicInteger(0); + final AtomicBoolean enqueued = new AtomicBoolean(false); int limitPerRoute(final R route) { final Integer v = maxPerRoute.get(route); @@ -122,13 +160,18 @@ int limitPerRoute(final R route) { } final class Waiter extends CompletableFuture> { + final R route; final Timeout requestTimeout; final Object state; volatile boolean cancelled; + volatile ScheduledFuture timeoutTask; - Waiter(final Timeout t, final Object s) { + Waiter(final R route, final Timeout t, final Object s) { + this.route = route; this.requestTimeout = t != null ? t : Timeout.DISABLED; this.state = s; + this.cancelled = false; + this.timeoutTask = null; } } @@ -142,6 +185,7 @@ public Future> lease( ensureOpen(); final Segment seg = segments.computeIfAbsent(route, r -> new Segment()); + // 1) Try available PoolEntry hit; for (; ; ) { hit = pollAvailable(seg, state); @@ -162,40 +206,46 @@ public Future> lease( return CompletableFuture.completedFuture(hit); } - for (; ; ) { - final int tot = totalAllocated.get(); - if (tot >= maxTotal.get()) { - break; - } - if (totalAllocated.compareAndSet(tot, tot + 1)) { - for (; ; ) { - final int per = seg.allocated.get(); - if (per >= seg.limitPerRoute(route)) { - totalAllocated.decrementAndGet(); - break; - } - if (seg.allocated.compareAndSet(per, per + 1)) { - final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal); - if (callback != null) { - callback.completed(entry); - } - return CompletableFuture.completedFuture(entry); - } - } - break; + // 2) Try to allocate new within caps + if (tryAllocateOne(route, seg)) { + final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal); + if (callback != null) { + callback.completed(entry); } + return CompletableFuture.completedFuture(entry); } - final Waiter w = new Waiter(requestTimeout, state); - seg.waiters.add(w); + // 3) Enqueue waiter with timeout + final Waiter w = new Waiter(route, requestTimeout, state); + seg.waiters.addLast(w); + enqueueIfNeeded(route, seg); + // Late hit after enqueuing final PoolEntry late = pollAvailable(seg, state); - if (late != null && seg.waiters.remove(w)) { - if (callback != null) { - callback.completed(late); + if (late != null) { + if (seg.waiters.remove(w)) { + cancelTimeout(w); + if (callback != null) { + callback.completed(late); + } + w.complete(late); + dequeueIfDrained(seg); + return w; + } else { + boolean handedOff = false; + for (Waiter other; (other = seg.waiters.pollFirst()) != null; ) { + if (!other.cancelled && compatible(other.state, late.getState())) { + cancelTimeout(other); + handedOff = other.complete(late); + if (handedOff) { + break; + } + } + } + if (!handedOff) { + offerAvailable(seg, late); + } } - w.complete(late); - return w; } scheduleTimeout(w, seg); @@ -209,6 +259,8 @@ public Future> lease( } }); } + + triggerDrainIfMany(); return w; } @@ -220,7 +272,8 @@ public void release(final PoolEntry entry, final boolean reusable) { final R route = entry.getRoute(); final Segment seg = segments.get(route); if (seg == null) { - entry.discardConnection(CloseMode.GRACEFUL); + // Segment got removed; dispose off-thread and bail. + discardEntry(entry, CloseMode.GRACEFUL); return; } @@ -228,21 +281,11 @@ public void release(final PoolEntry entry, final boolean reusable) { final boolean stillValid = reusable && !isPastTtl(entry) && !entry.getExpiryDeadline().isBefore(now); if (stillValid) { - for (; ; ) { - final Waiter w = seg.waiters.poll(); - if (w == null) { - break; - } - if (w.cancelled) { - continue; - } - if (compatible(w.state, entry.getState())) { - if (w.complete(entry)) { - return; - } - } + if (!handOffToCompatibleWaiter(entry, seg)) { + offerAvailable(seg, entry); + enqueueIfNeeded(route, seg); + triggerDrainIfMany(); } - offerAvailable(seg, entry); } else { discardAndDecr(entry, CloseMode.GRACEFUL); } @@ -266,15 +309,19 @@ public void close(final CloseMode closeMode) { for (final Map.Entry e : segments.entrySet()) { final Segment seg = e.getValue(); - // cancel waiters for (final Waiter w : seg.waiters) { w.cancelled = true; + cancelTimeout(w); w.completeExceptionally(new TimeoutException("Pool closed")); } seg.waiters.clear(); + if (seg.enqueued.getAndSet(false)) { + pendingRouteCount.decrementAndGet(); + } + // discard available for (final PoolEntry p : seg.available) { - p.discardConnection(orImmediate(closeMode)); + discardEntry(p, closeMode); } seg.available.clear(); @@ -284,6 +331,11 @@ public void close(final CloseMode closeMode) { } } segments.clear(); + pendingQueue.clear(); + pendingRouteCount.set(0); + + // Let in-flight graceful closes progress; no blocking here. + disposer.shutdown(); } @Override @@ -418,42 +470,40 @@ private boolean isPastTtl(final PoolEntry p) { if (timeToLive == null || timeToLive.getDuration() < 0) { return false; } - return (System.currentTimeMillis() - p.getCreated()) >= timeToLive.toMilliseconds(); + return System.currentTimeMillis() - p.getCreated() >= timeToLive.toMilliseconds(); } - private void scheduleTimeout( - final Waiter w, - final Segment seg) { - + private void scheduleTimeout(final Waiter w, final Segment seg) { if (!TimeValue.isPositive(w.requestTimeout)) { return; } - timeouts.schedule(() -> { + w.timeoutTask = timeouts.schedule(() -> { if (w.isDone()) { return; } w.cancelled = true; - final TimeoutException tex = new TimeoutException("Lease timed out"); - w.completeExceptionally(tex); + seg.waiters.remove(w); + w.completeExceptionally(new TimeoutException("Lease timed out")); + dequeueIfDrained(seg); + maybeCleanupSegment(w.route, seg); final PoolEntry p = pollAvailable(seg, w.state); if (p != null) { - boolean handedOff = false; - for (Waiter other; (other = seg.waiters.poll()) != null; ) { - if (!other.cancelled && compatible(other.state, p.getState())) { - handedOff = other.complete(p); - if (handedOff) { - break; - } - } - } - if (!handedOff) { + // Try to hand off that available entry to some other compatible waiter. + if (!handOffToCompatibleWaiter(p, seg)) { offerAvailable(seg, p); } } }, w.requestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); } + private void cancelTimeout(final Waiter w) { + final ScheduledFuture t = w.timeoutTask; + if (t != null) { + t.cancel(false); + } + } + private void offerAvailable(final Segment seg, final PoolEntry p) { if (reusePolicy == PoolReusePolicy.LIFO) { seg.available.addFirst(p); @@ -463,6 +513,9 @@ private void offerAvailable(final Segment seg, final PoolEntry p) { } private PoolEntry pollAvailable(final Segment seg, final Object neededState) { + if (neededState == null) { + return seg.available.pollFirst(); + } for (final Iterator> it = seg.available.iterator(); it.hasNext(); ) { final PoolEntry p = it.next(); if (compatible(neededState, p.getState())) { @@ -477,13 +530,42 @@ private boolean compatible(final Object needed, final Object have) { return needed == null || Objects.equals(needed, have); } + private boolean handOffToCompatibleWaiter(final PoolEntry entry, final Segment seg) { + final Deque skipped = new ArrayDeque<>(); + boolean handedOff = false; + for (; ; ) { + final Waiter w = seg.waiters.pollFirst(); + if (w == null) { + break; + } + if (w.cancelled || w.isDone()) { + continue; + } + if (compatible(w.state, entry.getState())) { + cancelTimeout(w); + handedOff = w.complete(entry); + if (handedOff) { + dequeueIfDrained(seg); + break; + } + } else { + skipped.addLast(w); + } + } + // Restore non-compatible waiters to the head to preserve ordering. + while (!skipped.isEmpty()) { + seg.waiters.addFirst(skipped.pollLast()); + } + return handedOff; + } + private void discardAndDecr(final PoolEntry p, final CloseMode mode) { - p.discardConnection(orImmediate(mode)); totalAllocated.decrementAndGet(); final Segment seg = segments.get(p.getRoute()); if (seg != null) { seg.allocated.decrementAndGet(); } + discardEntry(p, mode); } private CloseMode orImmediate(final CloseMode m) { @@ -493,6 +575,143 @@ private CloseMode orImmediate(final CloseMode m) { private void maybeCleanupSegment(final R route, final Segment seg) { if (seg.allocated.get() == 0 && seg.available.isEmpty() && seg.waiters.isEmpty()) { segments.remove(route, seg); + if (seg.enqueued.getAndSet(false)) { + pendingRouteCount.decrementAndGet(); + } + } + } + + private boolean tryAllocateOne(final R route, final Segment seg) { + for (; ; ) { + final int tot = totalAllocated.get(); + if (tot >= maxTotal.get()) { + return false; + } + if (!totalAllocated.compareAndSet(tot, tot + 1)) { + continue; + } + for (; ; ) { + final int per = seg.allocated.get(); + if (per >= seg.limitPerRoute(route)) { + totalAllocated.decrementAndGet(); + return false; + } + if (seg.allocated.compareAndSet(per, per + 1)) { + return true; + } + } + } + } + + private void enqueueIfNeeded(final R route, final Segment seg) { + if (seg.enqueued.compareAndSet(false, true)) { + pendingQueue.offer(route); + pendingRouteCount.incrementAndGet(); + } + } + + private void dequeueIfDrained(final Segment seg) { + if (seg.waiters.isEmpty() && seg.enqueued.getAndSet(false)) { + pendingRouteCount.decrementAndGet(); + } + } + + private void triggerDrainIfMany() { + // Engage RR only if there is global headroom and many distinct routes pending + if (pendingRouteCount.get() < RR_MIN_PENDING_ROUTES) { + return; + } + if (totalAllocated.get() >= maxTotal.get()) { + return; + } + if (!draining.compareAndSet(false, true)) { + return; + } + disposer.execute(() -> { + try { + serveRoundRobin(RR_BUDGET); + } finally { + draining.set(false); + if (pendingRouteCount.get() >= RR_MIN_PENDING_ROUTES + && totalAllocated.get() < maxTotal.get() + && !pendingQueue.isEmpty()) { + triggerDrainIfMany(); + } + } + }); + } + + private void serveRoundRobin(final int budget) { + int created = 0; + for (; created < budget; ) { + final R route = pendingQueue.poll(); + if (route == null) { + break; + } + final Segment seg = segments.get(route); + if (seg == null) { + continue; + } + if (seg.waiters.isEmpty()) { + if (seg.enqueued.getAndSet(false)) { + pendingRouteCount.decrementAndGet(); + } + continue; + } + + if (!tryAllocateOne(route, seg)) { + // No headroom or hit per-route cap. Re-queue for later. + pendingQueue.offer(route); + continue; + } + + final Waiter w = seg.waiters.pollFirst(); + if (w == null || w.cancelled) { + seg.allocated.decrementAndGet(); + totalAllocated.decrementAndGet(); + } else { + final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal); + cancelTimeout(w); + w.complete(entry); + created++; + } + + if (!seg.waiters.isEmpty()) { + pendingQueue.offer(route); + } else { + if (seg.enqueued.getAndSet(false)) { + pendingRouteCount.decrementAndGet(); + } + } + } + } + + /** + * Dispose a pool entry's connection asynchronously if possible; under pressure fall back to IMMEDIATE on caller. + */ + private void discardEntry(final PoolEntry p, final CloseMode preferred) { + final CloseMode mode = orImmediate(preferred); + // Pre-flight capacity to avoid exception storms under saturation + if (disposer.isShutdown()) { + p.discardConnection(CloseMode.IMMEDIATE); + return; + } + final LinkedBlockingQueue q = (LinkedBlockingQueue) disposer.getQueue(); + if (q.remainingCapacity() == 0) { + p.discardConnection(CloseMode.IMMEDIATE); + return; + } + try { + disposer.execute(() -> { + try { + p.discardConnection(mode); + } catch (final RuntimeException ignore) { + // best-effort + } + }); + } catch (final RejectedExecutionException saturated) { + // Saturated or shutting down: never block caller + p.discardConnection(CloseMode.IMMEDIATE); } } } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java index 7d6b81b14..ce4b9192b 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java @@ -39,10 +39,12 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.ModalCloseable; @@ -208,7 +210,6 @@ void poolCloseCancelsWaitersAndDrainsAvailable() throws Exception { assertEquals("Pool closed", ex.getCause().getMessage()); } - @Test void reusePolicyLifoVsFifoIsObservable() throws Exception { final RouteSegmentedConnPool lifo = @@ -250,9 +251,14 @@ void reusePolicyLifoVsFifoIsObservable() throws Exception { @Test void disposalIsCalledOnDiscard() throws Exception { final List closed = new ArrayList<>(); + final CountDownLatch disposed = new CountDownLatch(1); final DisposalCallback disposal = (c, m) -> { - c.close(m); - closed.add(c); + try { + c.close(m); + } finally { + closed.add(c); + disposed.countDown(); + } }; final RouteSegmentedConnPool pool = newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal); @@ -261,6 +267,9 @@ void disposalIsCalledOnDiscard() throws Exception { final FakeConnection conn = new FakeConnection(); e.assignConnection(conn); pool.release(e, false); + + // Wait for async disposer to run + assertTrue(disposed.await(2, TimeUnit.SECONDS), "Disposal did not complete in time"); assertEquals(1, closed.size()); assertEquals(1, closed.get(0).closeCount()); pool.close(CloseMode.IMMEDIATE); @@ -268,23 +277,37 @@ void disposalIsCalledOnDiscard() throws Exception { @Test void slowDisposalDoesNotBlockOtherRoutes() throws Exception { - final DisposalCallback disposal = FakeConnection::close; + final CountDownLatch disposed = new CountDownLatch(1); + final AtomicLong closedAt = new AtomicLong(0L); + final DisposalCallback disposal = (c, m) -> { + try { + c.close(m); // FakeConnection sleeps closeDelayMs internally + } finally { + closedAt.set(System.nanoTime()); + disposed.countDown(); + } + }; final RouteSegmentedConnPool pool = newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal); final PoolEntry e1 = pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); - e1.assignConnection(new FakeConnection(600)); + e1.assignConnection(new FakeConnection(600)); // close sleeps ~600ms + final long startDiscard = System.nanoTime(); - pool.release(e1, false); + pool.release(e1, false); // triggers async disposal + // Lease on another route must not be blocked by slow disposal final long t0 = System.nanoTime(); final PoolEntry e2 = pool.lease("r2", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); final long tLeaseMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0); assertTrue(tLeaseMs < 200, "Other route lease blocked by disposal: " + tLeaseMs + "ms"); pool.release(e2, false); - final long discardMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startDiscard); - assertTrue(discardMs >= 600, "Discard should reflect slow close path"); + + // Wait for disposer to finish, then assert the slow path really took ~600ms + assertTrue(disposed.await(2, TimeUnit.SECONDS), "Disposal did not complete in time"); + final long discardMs = TimeUnit.NANOSECONDS.toMillis(closedAt.get() - startDiscard); + assertTrue(discardMs >= 600, "Discard should reflect slow close path (took " + discardMs + "ms)"); pool.close(CloseMode.IMMEDIATE); } @@ -296,40 +319,43 @@ void getRoutesCoversAllocatedAvailableAndWaiters() throws Exception { assertTrue(pool.getRoutes().isEmpty(), "Initially there should be no routes"); + // Allocate on rA final PoolEntry a = pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); assertEquals(new HashSet(Collections.singletonList("rA")), pool.getRoutes(), "rA must be listed because it is leased (allocated > 0)"); + // Make rA available a.assignConnection(new FakeConnection()); a.updateExpiry(TimeValue.ofSeconds(30)); pool.release(a, true); assertEquals(new HashSet<>(Collections.singletonList("rA")), pool.getRoutes(), "rA must be listed because it has AVAILABLE entries"); + // Enqueue waiter on rB (will time out) final Future> waiterB = - pool.lease("rB", null, Timeout.ofMilliseconds(300), null); // enqueues immediately + pool.lease("rB", null, Timeout.ofMilliseconds(300), null); final Set routesNow = pool.getRoutes(); assertTrue(routesNow.contains("rA") && routesNow.contains("rB"), "Both rA (available) and rB (waiter) must be listed"); - final PoolEntry a2 = - pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); - pool.release(a2, false); // discard - final Set afterDropA = pool.getRoutes(); - assertFalse(afterDropA.contains("rA"), "rA segment should be cleaned up"); - assertTrue(afterDropA.contains("rB"), "rB (waiter) should remain listed"); - + // Let rB time out (do NOT free capacity before the timeout fires) final ExecutionException ex = assertThrows( ExecutionException.class, () -> waiterB.get(600, TimeUnit.MILLISECONDS)); assertInstanceOf(TimeoutException.class, ex.getCause()); assertEquals("Lease timed out", ex.getCause().getMessage()); - // Final cleanup: after close everything is cleared + // Now drain rA by leasing and discarding to trigger segment cleanup + final PoolEntry a2 = + pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + pool.release(a2, false); // discard + final Set afterDropA = pool.getRoutes(); + assertFalse(afterDropA.contains("rA"), "rA segment should be cleaned up"); + assertFalse(afterDropA.contains("rB"), "rB waiter timed out; should not remain listed"); + + // Final cleanup pool.close(CloseMode.IMMEDIATE); assertTrue(pool.getRoutes().isEmpty(), "All routes must be gone after close()"); } - - }