From f5e79b7ac647905675b24d2dc11fbd92e6b863d8 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 30 May 2022 12:22:34 +0300 Subject: [PATCH 01/10] Http2Pool handles the lifecycle of the cache with connections (#2257) Cache Http2FrameCodec/Http2MultiplexHandler/H2CUpgradeHandler context. Obtain the negotiated application level protocol once. Related to #2151 and #2262 --- .../resources/PooledConnectionProvider.java | 10 +- .../reactor/netty/http/HttpResources.java | 10 + .../http/client/Http2ConnectionProvider.java | 34 +-- .../reactor/netty/http/client/Http2Pool.java | 242 ++++++++++++------ ...Http2ConnectionProviderMeterRegistrar.java | 17 +- .../netty/http/client/Http2PoolTest.java | 138 ++++++---- .../DefaultPooledConnectionProviderTest.java | 120 +++++++-- ...dConnectionProviderDefaultMetricsTest.java | 8 +- 8 files changed, 408 insertions(+), 171 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index a19816e512..25947313e2 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -89,6 +89,7 @@ public abstract class PooledConnectionProvider implements final Duration poolInactivity; final Duration disposeTimeout; final Map maxConnections = new HashMap<>(); + Mono onDispose; protected PooledConnectionProvider(Builder builder) { this(builder, null); @@ -106,6 +107,7 @@ protected PooledConnectionProvider(Builder builder) { poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout)); maxConnections.put(entry.getKey(), entry.getValue().maxConnections); } + this.onDispose = Mono.empty(); scheduleInactivePoolsDisposal(); } @@ -190,10 +192,10 @@ public final Mono disposeLater() { }) .collect(Collectors.toList()); if (pools.isEmpty()) { - return Mono.empty(); + return onDispose; } channelPools.clear(); - return Mono.when(pools); + return onDispose.and(Mono.when(pools)); }); } @@ -243,6 +245,10 @@ public String name() { return name; } + public void onDispose(Mono disposeMono) { + onDispose = onDispose.and(disposeMono); + } + protected abstract CoreSubscriber> createDisposableAcquire( TransportConfig config, ConnectionObserver connectionObserver, diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java index 801dcac824..a94268145b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java @@ -15,6 +15,7 @@ */ package reactor.netty.http; +import java.net.SocketAddress; import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -147,6 +148,15 @@ public static HttpResources set(LoopResources loops) { http2ConnectionProvider = new AtomicReference<>(); } + @Override + public void disposeWhen(SocketAddress remoteAddress) { + ConnectionProvider provider = http2ConnectionProvider.get(); + if (provider != null) { + provider.disposeWhen(remoteAddress); + } + super.disposeWhen(remoteAddress); + } + @Override public AddressResolverGroup getOrCreateDefaultResolver() { return super.getOrCreateDefaultResolver(); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index cf76c73902..f8cb3eb3b4 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -16,13 +16,12 @@ package reactor.netty.http.client; import io.netty.channel.Channel; -import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.ssl.ApplicationProtocolNames; -import io.netty.handler.ssl.SslHandler; import io.netty.resolver.AddressResolverGroup; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; @@ -37,7 +36,6 @@ import reactor.core.publisher.Operators; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; -import reactor.netty.NettyPipeline; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.channel.ChannelOperations; import reactor.netty.resources.ConnectionProvider; @@ -76,6 +74,9 @@ final class Http2ConnectionProvider extends PooledConnectionProvider Http2ConnectionProvider(ConnectionProvider parent) { super(initConfiguration(parent)); this.parent = parent; + if (parent instanceof PooledConnectionProvider) { + ((PooledConnectionProvider) parent).onDispose(disposeLater()); + } } static Builder initConfiguration(ConnectionProvider parent) { @@ -332,11 +333,12 @@ public void onUncaughtException(Connection connection, Throwable error) { @Override public void operationComplete(Future future) { Channel channel = pooledRef.poolable().channel(); - Http2FrameCodec frameCodec = channel.pipeline().get(Http2FrameCodec.class); + ChannelHandlerContext frameCodec = ((Http2Pool.Http2PooledRef) pooledRef).slot.http2FrameCodecCtx(); if (future.isSuccess()) { Http2StreamChannel ch = future.getNow(); - if (!channel.isActive() || frameCodec == null || !frameCodec.connection().local().canOpenStream()) { + if (!channel.isActive() || frameCodec == null || + !((Http2FrameCodec) frameCodec.handler()).connection().local().canOpenStream()) { invalidate(this); if (!retried) { if (log.isDebugEnabled()) { @@ -358,8 +360,8 @@ public void operationComplete(Future future) { sink.success(ops); } - Http2Connection.Endpoint localEndpoint = frameCodec.connection().local(); if (log.isDebugEnabled()) { + Http2Connection.Endpoint localEndpoint = ((Http2FrameCodec) frameCodec.handler()).connection().local(); logStreamsState(ch, localEndpoint, "Stream opened"); } } @@ -372,8 +374,8 @@ public void operationComplete(Future future) { boolean isH2cUpgrade() { Channel channel = pooledRef.poolable().channel(); - if (channel.pipeline().get(NettyPipeline.H2CUpgradeHandler) != null && - channel.pipeline().get(NettyPipeline.H2MultiplexHandler) == null) { + if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() != null && + ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) { ChannelOperations ops = ChannelOperations.get(channel); if (ops != null) { sink.success(ops); @@ -385,11 +387,9 @@ boolean isH2cUpgrade() { boolean notHttp2() { Channel channel = pooledRef.poolable().channel(); - ChannelPipeline pipeline = channel.pipeline(); - SslHandler handler = pipeline.get(SslHandler.class); - if (handler != null) { - String protocol = handler.applicationProtocol() != null ? handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1; - if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { + String applicationProtocol = ((Http2Pool.Http2PooledRef) pooledRef).slot.applicationProtocol; + if (applicationProtocol != null) { + if (ApplicationProtocolNames.HTTP_1_1.equals(applicationProtocol)) { // No information for the negotiated application-level protocol, // or it is HTTP/1.1, continue as an HTTP/1.1 request // and remove the connection from this pool. @@ -400,15 +400,15 @@ boolean notHttp2() { return true; } } - else if (!ApplicationProtocolNames.HTTP_2.equals(handler.applicationProtocol())) { + else if (!ApplicationProtocolNames.HTTP_2.equals(applicationProtocol)) { channel.attr(OWNER).set(null); invalidate(this); - sink.error(new IOException("Unknown protocol [" + protocol + "].")); + sink.error(new IOException("Unknown protocol [" + applicationProtocol + "].")); return true; } } - else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) == null && - pipeline.get(NettyPipeline.H2MultiplexHandler) == null) { + else if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() == null && + ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) { // It is not H2. There are no handlers for H2C upgrade/H2C prior-knowledge, // continue as an HTTP/1.1 request and remove the connection from this pool. ChannelOperations ops = ChannelOperations.get(channel); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index a86dd87f3b..8162e5df18 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -24,9 +24,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Function; +import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslHandler; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; @@ -36,8 +41,8 @@ import reactor.core.publisher.Operators; import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; -import reactor.netty.channel.ChannelOperations; +import reactor.netty.FutureMono; +import reactor.netty.NettyPipeline; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; @@ -59,7 +64,6 @@ *
    *
  • The connection is closed.
  • *
  • The connection has reached its life time and there are no active streams.
  • - *
  • The connection has no active streams.
  • *
  • When the client is in one of the two modes: 1) H2 and HTTP/1.1 or 2) H2C and HTTP/1.1, * and the negotiated protocol is HTTP/1.1.
  • *
@@ -75,9 +79,9 @@ *

* This pool always invalidate the {@link PooledRef}, there is no release functionality. *

    - *
  • {@link PoolMetrics#acquiredSize()} and {@link PoolMetrics#allocatedSize()} always return the number of - * the active streams from all connections currently in the pool.
  • - *
  • {@link PoolMetrics#idleSize()} always returns {@code 0}.
  • + *
  • {@link PoolMetrics#acquiredSize()}, {@link PoolMetrics#allocatedSize()} and {@link PoolMetrics#idleSize()} + * always return the number of the cached connections.
  • + *
  • {@link Http2Pool#activeStreams()} always return the active streams from all connections currently in the pool.
  • *
*

* Configurations that are not applicable @@ -114,6 +118,10 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. static final AtomicReferenceFieldUpdater CONNECTIONS = AtomicReferenceFieldUpdater.newUpdater(Http2Pool.class, ConcurrentLinkedQueue.class, "connections"); + volatile int idleSize; + private static final AtomicIntegerFieldUpdater IDLE_SIZE = + AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "idleSize"); + /** * Pending borrowers queue. Never invoke directly the poll/add/remove methods and instead of that, * use addPending/pollPending/removePending methods which take care of maintaining the pending queue size. @@ -171,12 +179,12 @@ public Mono> acquire(Duration timeout) { @Override public int acquiredSize() { - return acquired; + return allocatedSize() - idleSize(); } @Override public int allocatedSize() { - return acquired; + return poolConfig.allocationStrategy().permitGranted(); } @Override @@ -197,10 +205,19 @@ public Mono disposeLater() { p.fail(new PoolShutdownException()); } - // the last stream on that connection will release the connection to the parent pool - // the structure should not contain connections with 0 streams as the last stream on that connection - // always removes the connection from this pool - CONNECTIONS.getAndSet(this, null); + @SuppressWarnings("unchecked") + ConcurrentLinkedQueue slots = CONNECTIONS.getAndSet(this, null); + if (slots != null) { + Mono closeMonos = Mono.empty(); + while (!slots.isEmpty()) { + Slot slot = pollSlot(slots); + if (slot != null) { + slot.invalidate(); + closeMonos = closeMonos.and(DEFAULT_DESTROY_HANDLER.apply(slot.connection)); + } + } + return closeMonos; + } } return Mono.empty(); }); @@ -218,7 +235,7 @@ public int getMaxPendingAcquireSize() { @Override public int idleSize() { - return 0; + return idleSize; } @Override @@ -253,6 +270,10 @@ public Mono warmup() { return Mono.just(0); } + int activeStreams() { + return acquired; + } + void cancelAcquire(Borrower borrower) { if (!isDisposed()) { ConcurrentLinkedDeque q = pending; @@ -260,15 +281,32 @@ void cancelAcquire(Borrower borrower) { } } + @SuppressWarnings("FutureReturnValueIgnored") Mono destroyPoolable(Http2PooledRef ref) { + assert ref.slot.connection.channel().eventLoop().inEventLoop(); Mono mono = Mono.empty(); try { + // By default, check the connection for removal on acquire and invalidate (only if there are no active streams) if (ref.slot.decrementConcurrencyAndGet() == 0) { - ref.slot.invalidate(); - Connection connection = ref.poolable(); - Http2FrameCodec frameCodec = connection.channel().pipeline().get(Http2FrameCodec.class); - if (frameCodec != null) { - releaseConnection(connection); + // not HTTP/2 request + if (ref.slot.http2FrameCodecCtx() == null) { + ref.slot.invalidate(); + removeSlot(ref.slot); + } + // If there is eviction in background, the background process will remove this connection + else if (poolConfig.evictInBackgroundInterval().isZero()) { + // not active + if (!ref.poolable().channel().isActive()) { + ref.slot.invalidate(); + removeSlot(ref.slot); + } + // max life reached + else if (maxLifeReached(ref.slot)) { + //"FutureReturnValueIgnored" this is deliberate + ref.slot.connection.channel().close(); + ref.slot.invalidate(); + removeSlot(ref.slot); + } } } } @@ -315,27 +353,22 @@ void drainLoop() { if (slot != null) { Borrower borrower = pollPending(borrowers, true); if (borrower == null) { - resources.offer(slot); + offerSlot(resources, slot); continue; } if (isDisposed()) { borrower.fail(new PoolShutdownException()); return; } - if (slot.incrementConcurrencyAndGet() > 1) { - borrower.stopPendingCountdown(); - if (log.isDebugEnabled()) { - log.debug(format(slot.connection.channel(), "Channel activated")); - } - ACQUIRED.incrementAndGet(this); - // we are ready here, the connection can be used for opening another stream - slot.deactivate(); - poolConfig.acquisitionScheduler().schedule(() -> borrower.deliver(new Http2PooledRef(slot))); - } - else { - addPending(borrowers, borrower, true); - continue; + borrower.stopPendingCountdown(); + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Channel activated")); } + ACQUIRED.incrementAndGet(this); + slot.connection.channel().eventLoop().execute(() -> { + borrower.deliver(new Http2PooledRef(slot)); + drain(); + }); } else { int permits = poolConfig.allocationStrategy().getPermits(1); @@ -372,8 +405,6 @@ void drainLoop() { log.debug(format(newInstance.channel(), "Channel activated")); } ACQUIRED.incrementAndGet(this); - newSlot.incrementConcurrencyAndGet(); - newSlot.deactivate(); borrower.deliver(new Http2PooledRef(newSlot)); } else if (sig.isOnError()) { @@ -398,15 +429,16 @@ else if (sig.isOnError()) { } @Nullable + @SuppressWarnings("FutureReturnValueIgnored") Slot findConnection(ConcurrentLinkedQueue resources) { - int resourcesCount = resources.size(); + int resourcesCount = idleSize; while (resourcesCount > 0) { // There are connections in the queue resourcesCount--; // get the connection - Slot slot = resources.poll(); + Slot slot = pollSlot(resources); if (slot == null) { continue; } @@ -418,38 +450,40 @@ Slot findConnection(ConcurrentLinkedQueue resources) { log.debug(format(slot.connection.channel(), "Channel is closed, {} active streams"), slot.concurrency()); } - resources.offer(slot); + offerSlot(resources, slot); } else { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Channel is closed, remove from pool")); } - resources.remove(slot); + slot.invalidate(); } continue; } // check that the connection's max lifetime has not been reached - if (maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime) { + if (maxLifeReached(slot)) { if (slot.concurrency() > 0) { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max life time is reached, {} active streams"), slot.concurrency()); } - resources.offer(slot); + offerSlot(resources, slot); } else { if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool")); } - resources.remove(slot); + //"FutureReturnValueIgnored" this is deliberate + slot.connection.channel().close(); + slot.invalidate(); } continue; } // check that the connection's max active streams has not been reached if (!slot.canOpenStream()) { - resources.offer(slot); + offerSlot(resources, slot); if (log.isDebugEnabled()) { log.debug(format(slot.connection.channel(), "Max active streams is reached")); } @@ -462,6 +496,10 @@ Slot findConnection(ConcurrentLinkedQueue resources) { return null; } + boolean maxLifeReached(Slot slot) { + return maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime; + } + void pendingAcquireLimitReached(Borrower borrower, int maxPending) { if (maxPending == 0) { borrower.fail(new PoolAcquirePendingLimitException(0, @@ -530,33 +568,40 @@ int addPending(ConcurrentLinkedDeque borrowers, Borrower borrower, boo return PENDING_SIZE.incrementAndGet(this); } - static boolean offerSlot(Slot slot) { - @SuppressWarnings("unchecked") - ConcurrentLinkedQueue q = CONNECTIONS.get(slot.pool); - return q != null && q.offer(slot); + void offerSlot(@Nullable ConcurrentLinkedQueue slots, Slot slot) { + if (slots != null && slots.offer(slot)) { + IDLE_SIZE.incrementAndGet(this); + } } - static void releaseConnection(Connection connection) { - ChannelOperations ops = connection.as(ChannelOperations.class); - if (ops != null) { - ops.listener().onStateChange(ops, ConnectionObserver.State.DISCONNECTING); - } - else if (connection instanceof ConnectionObserver) { - ((ConnectionObserver) connection).onStateChange(connection, ConnectionObserver.State.DISCONNECTING); + @Nullable + Slot pollSlot(@Nullable ConcurrentLinkedQueue slots) { + if (slots == null) { + return null; } - else { - connection.dispose(); + Slot slot = slots.poll(); + if (slot != null) { + IDLE_SIZE.decrementAndGet(this); } + return slot; } - static void removeSlot(Slot slot) { + void removeSlot(Slot slot) { @SuppressWarnings("unchecked") ConcurrentLinkedQueue q = CONNECTIONS.get(slot.pool); - if (q != null) { - q.remove(slot); + if (q != null && q.remove(slot)) { + IDLE_SIZE.decrementAndGet(this); } } + static final Function> DEFAULT_DESTROY_HANDLER = + connection -> { + if (!connection.channel().isActive()) { + return Mono.empty(); + } + return FutureMono.from(connection.channel().close()); + }; + static final class Borrower extends AtomicBoolean implements Scannable, Subscription, Runnable { static final Disposable TIMEOUT_DISPOSED = Disposables.disposed(); @@ -589,7 +634,10 @@ Context currentContext() { @Override public void request(long n) { if (Operators.validate(n)) { - if (!acquireTimeout.isZero()) { + // Cannot rely on idleSize because there might be idle connections but not suitable for use + int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); + int pending = pool.pendingSize; + if (!acquireTimeout.isZero() && permits <= pending) { timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS); } pool.doAcquire(this); @@ -627,7 +675,9 @@ public String toString() { } void deliver(Http2PooledRef poolSlot) { - stopPendingCountdown(); + assert poolSlot.slot.connection.channel().eventLoop().inEventLoop(); + poolSlot.slot.incrementConcurrencyAndGet(); + poolSlot.slot.deactivate(); if (get()) { //CANCELLED or timeout reached poolSlot.invalidate().subscribe(aVoid -> {}, e -> Operators.onErrorDropped(e, Context.empty())); @@ -737,7 +787,7 @@ public String toString() { } } - static final class Slot { + static final class Slot extends AtomicBoolean { volatile int concurrency; static final AtomicIntegerFieldUpdater CONCURRENCY = @@ -746,18 +796,30 @@ static final class Slot { final Connection connection; final long creationTimestamp; final Http2Pool pool; + final String applicationProtocol; + + volatile ChannelHandlerContext http2FrameCodecCtx; + volatile ChannelHandlerContext http2MultiplexHandlerCtx; + volatile ChannelHandlerContext h2cUpgradeHandlerCtx; Slot(Http2Pool pool, Connection connection) { this.connection = connection; this.creationTimestamp = pool.clock.millis(); this.pool = pool; + SslHandler handler = connection.channel().pipeline().get(SslHandler.class); + if (handler != null) { + this.applicationProtocol = handler.applicationProtocol() != null ? + handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1; + } + else { + this.applicationProtocol = null; + } } boolean canOpenStream() { - Http2FrameCodec frameCodec = connection.channel().pipeline().get(Http2FrameCodec.class); - Http2MultiplexHandler multiplexHandler = connection.channel().pipeline().get(Http2MultiplexHandler.class); - if (frameCodec != null && multiplexHandler != null) { - int maxActiveStreams = frameCodec.connection().local().maxActiveStreams(); + ChannelHandlerContext frameCodec = http2FrameCodecCtx(); + if (frameCodec != null && http2MultiplexHandlerCtx() != null) { + int maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); int concurrency = this.concurrency; return concurrency < maxActiveStreams; } @@ -772,23 +834,59 @@ void deactivate() { if (log.isDebugEnabled()) { log.debug(format(connection.channel(), "Channel deactivated")); } - offerSlot(this); + @SuppressWarnings("unchecked") + ConcurrentLinkedQueue slots = CONNECTIONS.get(pool); + pool.offerSlot(slots, this); } int decrementConcurrencyAndGet() { return CONCURRENCY.decrementAndGet(this); } - int incrementConcurrencyAndGet() { - return CONCURRENCY.incrementAndGet(this); + @Nullable + ChannelHandlerContext http2FrameCodecCtx() { + ChannelHandlerContext ctx = http2FrameCodecCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(Http2FrameCodec.class); + http2FrameCodecCtx = ctx; + return ctx; + } + + @Nullable + ChannelHandlerContext http2MultiplexHandlerCtx() { + ChannelHandlerContext ctx = http2MultiplexHandlerCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(Http2MultiplexHandler.class); + http2MultiplexHandlerCtx = ctx; + return ctx; + } + + @Nullable + ChannelHandlerContext h2cUpgradeHandlerCtx() { + ChannelHandlerContext ctx = h2cUpgradeHandlerCtx; + if (ctx != null && !ctx.isRemoved()) { + return ctx; + } + ctx = connection.channel().pipeline().context(NettyPipeline.H2CUpgradeHandler); + h2cUpgradeHandlerCtx = ctx; + return ctx; + } + + void incrementConcurrencyAndGet() { + CONCURRENCY.incrementAndGet(this); } void invalidate() { - if (log.isDebugEnabled()) { - log.debug(format(connection.channel(), "Channel removed from pool")); + if (compareAndSet(false, true)) { + if (log.isDebugEnabled()) { + log.debug(format(connection.channel(), "Channel removed from pool")); + } + pool.poolConfig.allocationStrategy().returnPermits(1); } - pool.poolConfig.allocationStrategy().returnPermits(1); - removeSlot(this); } long lifeTime() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java index 08bd8ee6af..3543fe1608 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java @@ -22,16 +22,21 @@ import java.net.SocketAddress; +import static reactor.netty.Metrics.ACTIVE_CONNECTIONS; import static reactor.netty.Metrics.ACTIVE_STREAMS; import static reactor.netty.Metrics.CONNECTION_PROVIDER_PREFIX; import static reactor.netty.Metrics.ID; +import static reactor.netty.Metrics.IDLE_CONNECTIONS; import static reactor.netty.Metrics.NAME; import static reactor.netty.Metrics.PENDING_STREAMS; import static reactor.netty.Metrics.REGISTRY; import static reactor.netty.Metrics.REMOTE_ADDRESS; final class MicrometerHttp2ConnectionProviderMeterRegistrar { + static final String ACTIVE_CONNECTIONS_DESCRIPTION = + "The number of the connections that have been successfully acquired and are in active use"; static final String ACTIVE_STREAMS_DESCRIPTION = "The number of the active HTTP/2 streams"; + static final String IDLE_CONNECTIONS_DESCRIPTION = "The number of the idle connections"; static final String PENDING_STREAMS_DESCRIPTION = "The number of requests that are waiting for opening HTTP/2 stream"; @@ -45,11 +50,21 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In String addressAsString = Metrics.formatSocketAddress(remoteAddress); Tags tags = Tags.of(ID, id, REMOTE_ADDRESS, addressAsString, NAME, poolName); - Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, metrics, InstrumentedPool.PoolMetrics::acquiredSize) + Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, metrics, InstrumentedPool.PoolMetrics::acquiredSize) + .description(ACTIVE_CONNECTIONS_DESCRIPTION) + .tags(tags) + .register(REGISTRY); + + Gauge.builder(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams()) .description(ACTIVE_STREAMS_DESCRIPTION) .tags(tags) .register(REGISTRY); + Gauge.builder(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, metrics, InstrumentedPool.PoolMetrics::idleSize) + .description(IDLE_CONNECTIONS_DESCRIPTION) + .tags(tags) + .register(REGISTRY); + Gauge.builder(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, metrics, InstrumentedPool.PoolMetrics::pendingAcquireSize) .description(PENDING_STREAMS_DESCRIPTION) .tags(tags) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index dc24581f27..ec68ef3669 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.netty.Connection; -import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; import reactor.netty.internal.shaded.reactor.pool.PoolBuilder; import reactor.netty.internal.shaded.reactor.pool.PoolConfig; @@ -38,6 +37,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -53,7 +53,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); try { List> acquired = new ArrayList<>(); @@ -61,21 +61,23 @@ void acquireInvalidate() { http2Pool.acquire().subscribe(acquired::add); http2Pool.acquire().subscribe(acquired::add); + channel.runPendingTasks(); + assertThat(acquired).hasSize(3); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3); + assertThat(http2Pool.activeStreams()).isEqualTo(3); for (PooledRef slot : acquired) { slot.invalidate().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); for (PooledRef slot : acquired) { // second invalidate() should be ignored and ACQUIRED size should remain the same slot.invalidate().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); } finally { channel.finishAndReleaseAll(); @@ -92,7 +94,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); try { List> acquired = new ArrayList<>(); @@ -100,21 +102,23 @@ void acquireRelease() { http2Pool.acquire().subscribe(acquired::add); http2Pool.acquire().subscribe(acquired::add); + channel.runPendingTasks(); + assertThat(acquired).hasSize(3); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3); + assertThat(http2Pool.activeStreams()).isEqualTo(3); for (PooledRef slot : acquired) { slot.release().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); for (PooledRef slot : acquired) { // second release() should be ignored and ACQUIRED size should remain the same slot.release().block(Duration.ofSeconds(1)); } - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); } finally { channel.finishAndReleaseAll(); @@ -141,7 +145,7 @@ void evictClosedConnection() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); @@ -153,18 +157,18 @@ void evictClosedConnection() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired2.poolable(); @@ -174,8 +178,8 @@ void evictClosedConnection() throws Exception { acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); } finally { if (connection != null) { @@ -186,12 +190,22 @@ void evictClosedConnection() throws Exception { } @Test - void evictClosedConnectionMaxConnectionsNotReached() throws Exception { + void evictClosedConnectionMaxConnectionsNotReached_1() throws Exception { + evictClosedConnectionMaxConnectionsNotReached(false); + } + + @Test + void evictClosedConnectionMaxConnectionsNotReached_2() throws Exception { + evictClosedConnectionMaxConnectionsNotReached(true); + } + + private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) throws Exception { PoolBuilder> poolBuilder = PoolBuilder.from(Mono.fromSupplier(() -> { Channel channel = new EmbeddedChannel( new TestChannelId(), - Http2FrameCodecBuilder.forClient().build()); + Http2FrameCodecBuilder.forClient().build(), + new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); return Connection.from(channel); })) .idleResourceReuseLruOrder() @@ -204,7 +218,7 @@ void evictClosedConnectionMaxConnectionsNotReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); @@ -216,25 +230,48 @@ void evictClosedConnectionMaxConnectionsNotReached() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); PooledRef acquired2 = http2Pool.acquire().block(); - assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(2); - assertThat(http2Pool.connections.size()).isEqualTo(2); + + AtomicReference> acquired3 = new AtomicReference<>(); + http2Pool.acquire().subscribe(acquired3::set); connection = acquired2.poolable(); - ChannelId id2 = connection.channel().id(); + ((EmbeddedChannel) connection.channel()).runPendingTasks(); + + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(http2Pool.connections.size()).isEqualTo(2); + + if (closeSecond) { + latch = new CountDownLatch(1); + ((EmbeddedChannel) connection.channel()).finishAndReleaseAll(); + connection.onDispose(latch::countDown); + connection.dispose(); + assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); + } + + ChannelId id2 = connection.channel().id(); assertThat(id1).isNotEqualTo(id2); acquired1.invalidate().block(); acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + acquired3.get().invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + if (closeSecond) { + assertThat(http2Pool.connections.size()).isEqualTo(0); + } + else { + assertThat(http2Pool.connections.size()).isEqualTo(1); + } } finally { if (connection != null) { @@ -263,7 +300,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); @@ -274,7 +311,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); http2Pool.acquire(Duration.ofMillis(10)) @@ -282,12 +319,12 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); } finally { @@ -318,7 +355,7 @@ void maxLifeTime() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection1 = acquired1.poolable(); @@ -326,18 +363,18 @@ void maxLifeTime() throws Exception { Thread.sleep(10); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection2 = acquired2.poolable(); @@ -347,8 +384,8 @@ void maxLifeTime() throws Exception { acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); } finally { if (connection1 != null) { @@ -374,7 +411,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 50)); Connection connection1 = null; Connection connection2 = null; @@ -382,21 +419,21 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); - Thread.sleep(10); + Thread.sleep(50); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(2); + assertThat(http2Pool.activeStreams()).isEqualTo(2); assertThat(http2Pool.connections.size()).isEqualTo(2); connection2 = acquired2.poolable(); @@ -407,8 +444,8 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { acquired1.invalidate().block(); acquired2.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); - assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); } finally { if (connection1 != null) { @@ -441,14 +478,14 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { PooledRef acquired1 = http2Pool.acquire().block(); assertThat(acquired1).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); connection = acquired1.poolable(); Thread.sleep(10); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); http2Pool.acquire(Duration.ofMillis(10)) @@ -456,12 +493,12 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); acquired1.invalidate().block(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); } finally { @@ -488,24 +525,25 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1)); assertThat(acquired).isNotNull(); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1); + assertThat(http2Pool.activeStreams()).isEqualTo(1); acquired.invalidate().block(Duration.ofSeconds(1)); - assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(0); } finally { channel.finishAndReleaseAll(); diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java index 6ee8a43053..32453a4c24 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java @@ -15,6 +15,10 @@ */ package reactor.netty.resources; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -28,7 +32,9 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -62,18 +68,41 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; +import static reactor.netty.Metrics.ACTIVE_CONNECTIONS; +import static reactor.netty.Metrics.CONNECTION_PROVIDER_PREFIX; +import static reactor.netty.Metrics.IDLE_CONNECTIONS; +import static reactor.netty.Metrics.NAME; +import static reactor.netty.Metrics.REMOTE_ADDRESS; +import static reactor.netty.Metrics.TOTAL_CONNECTIONS; +import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED; class DefaultPooledConnectionProviderTest extends BaseHttpTest { static SelfSignedCertificate ssc; + private MeterRegistry registry; + @BeforeAll static void createSelfSignedCertificate() throws CertificateException { ssc = new SelfSignedCertificate(); } + @BeforeEach + void setUp() { + registry = new SimpleMeterRegistry(); + Metrics.addRegistry(registry); + } + + @AfterEach + void tearDown() { + Metrics.removeRegistry(registry); + registry.clear(); + registry.close(); + } + @Test void testIssue903() { Http11SslContextSpec serverCtx = Http11SslContextSpec.forServer(ssc.key(), ssc.cert()); @@ -290,7 +319,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, } @Test - void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { + void testConnectionIdleWhenNoActiveStreams() throws Exception { Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); Http2SslContextSpec clientCtx = Http2SslContextSpec.forClient() @@ -307,19 +336,31 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { int requestsNum = 10; CountDownLatch latch = new CountDownLatch(1); DefaultPooledConnectionProvider provider = - (DefaultPooledConnectionProvider) ConnectionProvider.create("testConnectionReturnedToParentPoolWhenNoActiveStreams", 5); + (DefaultPooledConnectionProvider) ConnectionProvider.create("testConnectionIdleWhenNoActiveStreams", 5); AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); HttpClient client = createClient(provider, disposableServer.port()) - .wiretap(false) + .wiretap(false) .protocol(HttpProtocol.H2) .secure(spec -> spec.sslContext(clientCtx)) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) .observe((conn, state) -> { - if (state == ConnectionObserver.State.CONNECTED) { + if (state == STREAM_CONFIGURED) { counter.incrementAndGet(); - } - if (state == ConnectionObserver.State.RELEASED && counter.decrementAndGet() == 0) { - latch.countDown(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); } }); @@ -328,7 +369,7 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { .flatMap(i -> client.post() .uri("/") - .send(ByteBufMono.fromString(Mono.just("testConnectionReturnedToParentPoolWhenNoActiveStreams"))) + .send(ByteBufMono.fromString(Mono.just("testConnectionIdleWhenNoActiveStreams"))) .responseContent() .aggregate() .asString()) @@ -336,14 +377,16 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception { assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(provider.channelPools).hasSize(1); + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); - @SuppressWarnings({"unchecked", "rawtypes"}) - InstrumentedPool channelPool = - provider.channelPools.values().toArray(new InstrumentedPool[0])[0]; - InstrumentedPool.PoolMetrics metrics = channelPool.metrics(); - assertThat(metrics.acquiredSize()).isEqualTo(0); - assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize()); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testConnectionIdleWhenNoActiveStreams")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testConnectionIdleWhenNoActiveStreams"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "testConnectionIdleWhenNoActiveStreams"); + assertThat(totalConn).isEqualTo(idleConn); } finally { provider.disposeLater() @@ -442,21 +485,33 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie .bindNow(); DefaultPooledConnectionProvider provider = - (DefaultPooledConnectionProvider) ConnectionProvider.create("", 5); + (DefaultPooledConnectionProvider) ConnectionProvider.create("doTestIssue1982", 5); CountDownLatch latch = new CountDownLatch(1); AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); HttpClient mainClient = clientCtx != null ? HttpClient.create(provider).port(disposableServer.port()).secure(sslContextSpec -> sslContextSpec.sslContext(clientCtx)) : HttpClient.create(provider).port(disposableServer.port()); HttpClient client = mainClient.protocol(clientProtocols) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) .observe((conn, state) -> { - if (state == ConnectionObserver.State.CONNECTED) { + if (state == STREAM_CONFIGURED) { counter.incrementAndGet(); - } - if (state == ConnectionObserver.State.RELEASED && counter.decrementAndGet() == 0) { - latch.countDown(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); } }); try { @@ -471,12 +526,16 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - @SuppressWarnings({"unchecked", "rawtypes"}) - InstrumentedPool channelPool = - provider.channelPools.values().toArray(new InstrumentedPool[0])[0]; - InstrumentedPool.PoolMetrics metrics = channelPool.metrics(); - assertThat(metrics.acquiredSize()).isEqualTo(0); - assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize()); + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); + + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.doTestIssue1982")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.doTestIssue1982"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "doTestIssue1982"); + assertThat(totalConn).isEqualTo(idleConn); } finally { provider.disposeLater() @@ -512,4 +571,13 @@ public boolean trySuccess(Void result) { return r; } } + + private double getGaugeValue(String gaugeName, String... tags) { + Gauge gauge = registry.find(gaugeName).tags(tags).gauge(); + double result = -1; + if (gauge != null) { + result = gauge.value(); + } + return result; + } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java index ac752e082f..a466ff3982 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java @@ -207,15 +207,17 @@ private void doTest(HttpServer server, HttpClient client, String poolName, boole assertThat(metrics.get()).isTrue(); if (isSecured) { assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, poolName)).isEqualTo(1); - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(1); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, "http2." + poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, "http2." + poolName)).isEqualTo(1); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, "http2." + poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2." + poolName)).isEqualTo(0); } else { assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, poolName)).isEqualTo(0); - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(0); } - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(0); + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_CONNECTIONS, poolName)).isEqualTo(0); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_CONNECTIONS, poolName)).isEqualTo(expectedMaxConnection); assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_PENDING_CONNECTIONS, poolName)).isEqualTo(expectedMaxPendingAcquire); From d9ae62cbb28b3f7f79ad4dd69626fb7efb795d4f Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 6 Jun 2022 11:18:25 +0300 Subject: [PATCH 02/10] Add maxIdleTime to Http2Pool (#2257) Related to #2151 and #2262 --- .../resources/PooledConnectionProvider.java | 4 + .../http/client/Http2ConnectionProvider.java | 2 +- .../reactor/netty/http/client/Http2Pool.java | 32 ++++- .../netty/http/client/Http2PoolTest.java | 134 ++++++++++++++++-- 4 files changed, 158 insertions(+), 14 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index 25947313e2..0823c7562a 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -455,6 +455,10 @@ PoolBuilder> newPoolInternal( return poolBuilder; } + public long maxIdleTime() { + return this.maxIdleTime; + } + public long maxLifeTime() { return maxLifeTime; } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index f8cb3eb3b4..69f3f5ca7e 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -466,7 +466,7 @@ static final class PooledConnectionAllocator { this.remoteAddress = remoteAddress; this.resolver = resolver; this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime())); + poolConFig -> new Http2Pool(poolConFig, poolFactory.maxIdleTime(), poolFactory.maxLifeTime())); } Publisher connectChannel() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 8162e5df18..5261ba7226 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -64,6 +64,7 @@ *

    *
  • The connection is closed.
  • *
  • The connection has reached its life time and there are no active streams.
  • + *
  • The connection has reached its idle time and there are no active streams.
  • *
  • When the client is in one of the two modes: 1) H2 and HTTP/1.1 or 2) H2C and HTTP/1.1, * and the negotiated protocol is HTTP/1.1.
  • *
@@ -148,18 +149,20 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "wip"); final Clock clock; + final long maxIdleTime; final long maxLifeTime; final PoolConfig poolConfig; long lastInteractionTimestamp; - Http2Pool(PoolConfig poolConfig, long maxLifeTime) { + Http2Pool(PoolConfig poolConfig, long maxIdleTime, long maxLifeTime) { if (poolConfig.allocationStrategy().getPermits(0) != 0) { throw new IllegalArgumentException("No support for configuring minimum number of connections"); } this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); + this.maxIdleTime = maxIdleTime; this.maxLifeTime = maxLifeTime; this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; @@ -461,7 +464,18 @@ Slot findConnection(ConcurrentLinkedQueue resources) { continue; } - // check that the connection's max lifetime has not been reached + // check whether the connection's idle time has been reached + if (maxIdleTime != -1 && slot.idleTime() >= maxIdleTime) { + if (log.isDebugEnabled()) { + log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool")); + } + //"FutureReturnValueIgnored" this is deliberate + slot.connection.channel().close(); + slot.invalidate(); + continue; + } + + // check whether the connection's max lifetime has been reached if (maxLifeReached(slot)) { if (slot.concurrency() > 0) { if (log.isDebugEnabled()) { @@ -798,6 +812,8 @@ static final class Slot extends AtomicBoolean { final Http2Pool pool; final String applicationProtocol; + long idleTimestamp; + volatile ChannelHandlerContext http2FrameCodecCtx; volatile ChannelHandlerContext http2MultiplexHandlerCtx; volatile ChannelHandlerContext h2cUpgradeHandlerCtx; @@ -840,7 +856,17 @@ void deactivate() { } int decrementConcurrencyAndGet() { - return CONCURRENCY.decrementAndGet(this); + int concurrency = CONCURRENCY.decrementAndGet(this); + idleTimestamp = pool.clock.millis(); + return concurrency; + } + + long idleTime() { + if (concurrency() > 0) { + return 0L; + } + long idleTime = idleTimestamp != 0 ? idleTimestamp : creationTimestamp; + return pool.clock.millis() - idleTime; } @Nullable diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index ec68ef3669..bc1ec1a561 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -53,7 +53,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -94,7 +94,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -138,7 +138,7 @@ void evictClosedConnection() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); Connection connection = null; try { @@ -211,7 +211,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); Connection connection = null; try { @@ -293,7 +293,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); Connection connection = null; try { @@ -335,6 +335,120 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { } } + @Test + void maxIdleTime() throws Exception { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1)); + + Connection connection1 = null; + Connection connection2 = null; + try { + PooledRef acquired1 = http2Pool.acquire().block(); + + assertThat(acquired1).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection1 = acquired1.poolable(); + ChannelId id1 = connection1.channel().id(); + + acquired1.invalidate().block(); + + Thread.sleep(15); + + PooledRef acquired2 = http2Pool.acquire().block(); + + assertThat(acquired2).isNotNull(); + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection2 = acquired2.poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isNotEqualTo(id2); + + acquired2.invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + + @Test + void maxIdleTimeActiveStreams() throws Exception { + EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(), + Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.just(Connection.from(channel))) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 1); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1)); + + Connection connection1 = null; + Connection connection2 = null; + try { + List> acquired = new ArrayList<>(); + http2Pool.acquire().subscribe(acquired::add); + http2Pool.acquire().subscribe(acquired::add); + + channel.runPendingTasks(); + + assertThat(acquired).hasSize(2); + assertThat(http2Pool.activeStreams()).isEqualTo(2); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection1 = acquired.get(0).poolable(); + ChannelId id1 = connection1.channel().id(); + + acquired.get(0).invalidate().block(); + + Thread.sleep(15); + + assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.connections.size()).isEqualTo(1); + + connection2 = acquired.get(1).poolable(); + ChannelId id2 = connection2.channel().id(); + + assertThat(id1).isEqualTo(id2); + + acquired.get(1).invalidate().block(); + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.connections.size()).isEqualTo(1); + } + finally { + if (connection1 != null) { + ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll(); + connection1.dispose(); + } + if (connection2 != null) { + ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll(); + connection2.dispose(); + } + } + } + @Test void maxLifeTime() throws Exception { PoolBuilder> poolBuilder = @@ -347,7 +461,7 @@ void maxLifeTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10)); Connection connection1 = null; Connection connection2 = null; @@ -411,7 +525,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 50)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 50)); Connection connection1 = null; Connection connection2 = null; @@ -471,7 +585,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10)); Connection connection = null; try { @@ -514,7 +628,7 @@ void minConnectionsConfigNotSupported() { PoolBuilder> poolBuilder = PoolBuilder.from(Mono.empty()).sizeBetween(1, 2); assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1))); + .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1, -1))); } @Test @@ -525,7 +639,7 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1)); From dae5de852a9966146471ba291de3670292959d10 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 6 Jun 2022 11:22:08 +0300 Subject: [PATCH 03/10] Add evictInBackground to Http2Pool (#2257) Related to #2151 and #2262 --- .../reactor/netty/http/client/Http2Pool.java | 88 +++++++- .../netty/http/client/Http2PoolTest.java | 207 ++++++++++++++++++ 2 files changed, 291 insertions(+), 4 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 5261ba7226..ccf34fb809 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -17,6 +17,7 @@ import java.time.Clock; import java.time.Duration; +import java.util.Iterator; import java.util.Objects; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; @@ -88,9 +89,6 @@ * Configurations that are not applicable * *

+ * If minimum connections is specified, the cached connections with active streams will be kept at that minimum + * (can be the best effort). However, if the cached connections have reached max concurrent streams, + * then new connections will be allocated up to the maximum connections limit. + *

* Configurations that are not applicable *

    *
  • {@link PoolConfig#destroyHandler()} - the destroy handler cannot be used as the destruction is more complex.
  • @@ -97,7 +103,6 @@ *
  • {@link PoolConfig#reuseIdleResourcesInLruOrder()} - FIFO is used when checking the connections.
  • *
  • FIFO is used when obtaining the pending borrowers
  • *
  • Warm up functionality is not supported
  • - *
  • Setting minimum connections configuration is not supported
  • *
*

This class is based on * https://github.com/reactor/reactor-pool/blob/v0.2.7/src/main/java/reactor/pool/SimpleDequePool.java @@ -142,28 +147,35 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. @SuppressWarnings("rawtypes") static final ConcurrentLinkedDeque TERMINATED = new ConcurrentLinkedDeque(); + volatile long totalMaxConcurrentStreams; + static final AtomicLongFieldUpdater TOTAL_MAX_CONCURRENT_STREAMS = + AtomicLongFieldUpdater.newUpdater(Http2Pool.class, "totalMaxConcurrentStreams"); + volatile int wip; static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "wip"); final Clock clock; + final Long maxConcurrentStreams; final long maxIdleTime; final long maxLifeTime; + final int minConnections; final PoolConfig poolConfig; long lastInteractionTimestamp; Disposable evictionTask; - Http2Pool(PoolConfig poolConfig, long maxIdleTime, long maxLifeTime) { - if (poolConfig.allocationStrategy().getPermits(0) != 0) { - throw new IllegalArgumentException("No support for configuring minimum number of connections"); - } + Http2Pool(PoolConfig poolConfig, @Nullable ConnectionProvider.AllocationStrategy allocationStrategy, + long maxIdleTime, long maxLifeTime) { this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); + this.maxConcurrentStreams = allocationStrategy instanceof Http2AllocationStrategy ? + ((Http2AllocationStrategy) allocationStrategy).maxConcurrentStreams() : -1; this.maxIdleTime = maxIdleTime; this.maxLifeTime = maxLifeTime; + this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum(); this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; @@ -356,7 +368,10 @@ void drainLoop() { if (borrowersCount != 0) { // find a connection that can be used for opening a new stream - Slot slot = findConnection(resources); + // when cached connections are below minimum connections, then allocate a new connection + boolean belowMinConnections = minConnections > 0 && + poolConfig.allocationStrategy().permitGranted() < minConnections; + Slot slot = belowMinConnections ? null : findConnection(resources); if (slot != null) { Borrower borrower = pollPending(borrowers, true); if (borrower == null) { @@ -378,52 +393,64 @@ void drainLoop() { }); } else { - int permits = poolConfig.allocationStrategy().getPermits(1); - if (permits <= 0) { - if (maxPending >= 0) { - borrowersCount = pendingSize; - int toCull = borrowersCount - maxPending; - for (int i = 0; i < toCull; i++) { - Borrower extraneous = pollPending(borrowers, true); - if (extraneous != null) { - pendingAcquireLimitReached(extraneous, maxPending); - } - } - } + int resourcesCount = idleSize; + if (minConnections > 0 && + poolConfig.allocationStrategy().permitGranted() >= minConnections && + resourcesCount == 0) { + // connections allocations were triggered } else { - Borrower borrower = pollPending(borrowers, true); - if (borrower == null) { - continue; + int permits = poolConfig.allocationStrategy().getPermits(1); + if (permits <= 0) { + if (maxPending >= 0) { + borrowersCount = pendingSize; + int toCull = borrowersCount - maxPending; + for (int i = 0; i < toCull; i++) { + Borrower extraneous = pollPending(borrowers, true); + if (extraneous != null) { + pendingAcquireLimitReached(extraneous, maxPending); + } + } + } } - if (isDisposed()) { - borrower.fail(new PoolShutdownException()); - return; + else { + if (permits > 1) { + // warmup is not supported + poolConfig.allocationStrategy().returnPermits(permits - 1); + } + Borrower borrower = pollPending(borrowers, true); + if (borrower == null) { + continue; + } + if (isDisposed()) { + borrower.fail(new PoolShutdownException()); + return; + } + borrower.stopPendingCountdown(); + Mono allocator = poolConfig.allocator(); + Mono primary = + allocator.doOnEach(sig -> { + if (sig.isOnNext()) { + Connection newInstance = sig.get(); + assert newInstance != null; + Slot newSlot = new Slot(this, newInstance); + if (log.isDebugEnabled()) { + log.debug(format(newInstance.channel(), "Channel activated")); + } + ACQUIRED.incrementAndGet(this); + borrower.deliver(new Http2PooledRef(newSlot)); + } + else if (sig.isOnError()) { + Throwable error = sig.getThrowable(); + assert error != null; + poolConfig.allocationStrategy().returnPermits(1); + borrower.fail(error); + } + }) + .contextWrite(borrower.currentContext()); + + primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain); } - borrower.stopPendingCountdown(); - Mono allocator = poolConfig.allocator(); - Mono primary = - allocator.doOnEach(sig -> { - if (sig.isOnNext()) { - Connection newInstance = sig.get(); - assert newInstance != null; - Slot newSlot = new Slot(this, newInstance); - if (log.isDebugEnabled()) { - log.debug(format(newInstance.channel(), "Channel activated")); - } - ACQUIRED.incrementAndGet(this); - borrower.deliver(new Http2PooledRef(newSlot)); - } - else if (sig.isOnError()) { - Throwable error = sig.getThrowable(); - assert error != null; - poolConfig.allocationStrategy().returnPermits(1); - borrower.fail(error); - } - }) - .contextWrite(borrower.currentContext()); - - primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain); } } } @@ -728,10 +755,10 @@ Context currentContext() { @Override public void request(long n) { if (Operators.validate(n)) { - // Cannot rely on idleSize because there might be idle connections but not suitable for use + long estimateStreamsCount = pool.totalMaxConcurrentStreams - pool.acquired; int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); int pending = pool.pendingSize; - if (!acquireTimeout.isZero() && permits <= pending) { + if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) { timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS); } pool.doAcquire(this); @@ -893,6 +920,7 @@ static final class Slot extends AtomicBoolean { final String applicationProtocol; long idleTimestamp; + long maxConcurrentStreams; volatile ChannelHandlerContext http2FrameCodecCtx; volatile ChannelHandlerContext http2MultiplexHandlerCtx; @@ -910,12 +938,26 @@ static final class Slot extends AtomicBoolean { else { this.applicationProtocol = null; } + ChannelHandlerContext frameCodec = http2FrameCodecCtx(); + if (frameCodec != null && http2MultiplexHandlerCtx() != null) { + this.maxConcurrentStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); + this.maxConcurrentStreams = pool.maxConcurrentStreams == -1 ? maxConcurrentStreams : + Math.min(pool.maxConcurrentStreams, maxConcurrentStreams); + } + TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, this.maxConcurrentStreams); } boolean canOpenStream() { ChannelHandlerContext frameCodec = http2FrameCodecCtx(); if (frameCodec != null && http2MultiplexHandlerCtx() != null) { - int maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); + long maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams(); + maxActiveStreams = pool.maxConcurrentStreams == -1 ? maxActiveStreams : + Math.min(pool.maxConcurrentStreams, maxActiveStreams); + long diff = maxActiveStreams - maxConcurrentStreams; + if (diff != 0) { + maxConcurrentStreams = maxActiveStreams; + TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, diff); + } int concurrency = this.concurrency; return concurrency < maxActiveStreams; } @@ -992,6 +1034,7 @@ void invalidate() { log.debug(format(connection.channel(), "Channel removed from pool")); } pool.poolConfig.allocationStrategy().returnPermits(1); + TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, -maxConcurrentStreams); } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java new file mode 100644 index 0000000000..d708754cf2 --- /dev/null +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.client; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MAX_CONCURRENT_STREAMS; +import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MAX_CONNECTIONS; +import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MIN_CONNECTIONS; + +class Http2AllocationStrategyTest { + private Http2AllocationStrategy.Builder builder; + + @BeforeEach + void setUp() { + builder = Http2AllocationStrategy.builder(); + } + + @Test + void build() { + builder.maxConcurrentStreams(2).maxConnections(2).minConnections(1); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(2); + assertThat(strategy.permitMaximum()).isEqualTo(2); + assertThat(strategy.permitMinimum()).isEqualTo(1); + } + + @Test + void buildBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxConnections(1).minConnections(2).build()) + .withMessage("minConnections (2) must be less than or equal to maxConnections (1)"); + } + + @Test + void copy() { + builder.maxConcurrentStreams(2).maxConnections(2).minConnections(1); + Http2AllocationStrategy strategy = builder.build(); + Http2AllocationStrategy copy = strategy.copy(); + assertThat(copy.maxConcurrentStreams()).isEqualTo(strategy.maxConcurrentStreams()); + assertThat(copy.permitMaximum()).isEqualTo(strategy.permitMaximum()); + assertThat(copy.permitMinimum()).isEqualTo(strategy.permitMinimum()); + } + + @Test + void maxConcurrentStreams() { + builder.maxConcurrentStreams(2); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(2); + assertThat(strategy.permitMaximum()).isEqualTo(DEFAULT_MAX_CONNECTIONS); + assertThat(strategy.permitMinimum()).isEqualTo(DEFAULT_MIN_CONNECTIONS); + } + + @Test + void maxConcurrentStreamsBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxConcurrentStreams(-2)) + .withMessage("maxConcurrentStreams must be greater than or equal to -1"); + } + + @Test + void permitMaximum() { + builder.maxConnections(2); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(DEFAULT_MAX_CONCURRENT_STREAMS); + assertThat(strategy.permitMaximum()).isEqualTo(2); + assertThat(strategy.permitMinimum()).isEqualTo(DEFAULT_MIN_CONNECTIONS); + } + + @Test + void permitMaximumBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.maxConnections(0)) + .withMessage("maxConnections must be strictly positive"); + } + + @Test + void permitMinimum() { + builder.minConnections(2); + Http2AllocationStrategy strategy = builder.build(); + assertThat(strategy.maxConcurrentStreams()).isEqualTo(DEFAULT_MAX_CONCURRENT_STREAMS); + assertThat(strategy.permitMaximum()).isEqualTo(DEFAULT_MAX_CONNECTIONS); + assertThat(strategy.permitMinimum()).isEqualTo(2); + } + + @Test + void permitMinimumBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.minConnections(-1)) + .withMessage("minConnections must be positive or zero"); + } +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index ec317c8828..6780f795c2 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2MultiplexHandler; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; @@ -40,7 +41,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; class Http2PoolTest { @@ -53,7 +53,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -65,12 +65,14 @@ void acquireInvalidate() { assertThat(acquired).hasSize(3); assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { slot.invalidate().block(Duration.ofSeconds(1)); } assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { // second invalidate() should be ignored and ACQUIRED size should remain the same @@ -78,6 +80,7 @@ void acquireInvalidate() { } assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } finally { channel.finishAndReleaseAll(); @@ -94,7 +97,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); try { List> acquired = new ArrayList<>(); @@ -106,12 +109,14 @@ void acquireRelease() { assertThat(acquired).hasSize(3); assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { slot.release().block(Duration.ofSeconds(1)); } assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); for (PooledRef slot : acquired) { // second release() should be ignored and ACQUIRED size should remain the same @@ -119,6 +124,7 @@ void acquireRelease() { } assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } finally { channel.finishAndReleaseAll(); @@ -138,7 +144,7 @@ void evictClosedConnection() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); Connection connection = null; try { @@ -147,6 +153,7 @@ void evictClosedConnection() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); ChannelId id1 = connection.channel().id(); @@ -159,17 +166,20 @@ void evictClosedConnection() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired2.poolable(); ChannelId id2 = connection.channel().id(); @@ -180,6 +190,7 @@ void evictClosedConnection() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -211,7 +222,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); Connection connection = null; try { @@ -220,6 +231,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); connection = acquired1.poolable(); ChannelId id1 = connection.channel().id(); @@ -232,6 +244,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); @@ -244,6 +257,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) assertThat(http2Pool.activeStreams()).isEqualTo(3); assertThat(http2Pool.connections.size()).isEqualTo(2); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(2L * Integer.MAX_VALUE); if (closeSecond) { latch = new CountDownLatch(1); @@ -262,15 +276,18 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); acquired3.get().invalidate().block(); assertThat(http2Pool.activeStreams()).isEqualTo(0); if (closeSecond) { assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } else { assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } } finally { @@ -293,7 +310,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); Connection connection = null; try { @@ -302,6 +319,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); CountDownLatch latch = new CountDownLatch(1); @@ -313,6 +331,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) @@ -321,11 +340,13 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -357,6 +378,7 @@ void evictInBackgroundClosedConnection() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); ChannelId id1 = connection.channel().id(); @@ -369,6 +391,7 @@ void evictInBackgroundClosedConnection() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); @@ -376,12 +399,14 @@ void evictInBackgroundClosedConnection() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired2.poolable(); ChannelId id2 = connection.channel().id(); @@ -394,6 +419,7 @@ void evictInBackgroundClosedConnection() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -426,6 +452,7 @@ void evictInBackgroundMaxIdleTime() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); @@ -438,12 +465,14 @@ void evictInBackgroundMaxIdleTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -458,6 +487,7 @@ void evictInBackgroundMaxIdleTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -494,6 +524,7 @@ void evictInBackgroundMaxLifeTime() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); @@ -502,6 +533,7 @@ void evictInBackgroundMaxLifeTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); @@ -509,12 +541,14 @@ void evictInBackgroundMaxLifeTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -529,6 +563,7 @@ void evictInBackgroundMaxLifeTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -554,7 +589,7 @@ void maxIdleTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1)); Connection connection1 = null; Connection connection2 = null; @@ -564,6 +599,7 @@ void maxIdleTime() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); @@ -577,6 +613,7 @@ void maxIdleTime() throws Exception { assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -587,6 +624,7 @@ void maxIdleTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -609,7 +647,7 @@ void maxIdleTimeActiveStreams() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1)); Connection connection1 = null; Connection connection2 = null; @@ -623,6 +661,7 @@ void maxIdleTimeActiveStreams() throws Exception { assertThat(acquired).hasSize(2); assertThat(http2Pool.activeStreams()).isEqualTo(2); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); connection1 = acquired.get(0).poolable(); ChannelId id1 = connection1.channel().id(); @@ -633,6 +672,7 @@ void maxIdleTimeActiveStreams() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); connection2 = acquired.get(1).poolable(); ChannelId id2 = connection2.channel().id(); @@ -643,6 +683,7 @@ void maxIdleTimeActiveStreams() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); } finally { if (connection1 != null) { @@ -668,7 +709,7 @@ void maxLifeTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); Connection connection1 = null; Connection connection2 = null; @@ -678,6 +719,7 @@ void maxLifeTime() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); @@ -686,17 +728,20 @@ void maxLifeTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -707,6 +752,7 @@ void maxLifeTime() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -732,7 +778,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 50)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 50)); Connection connection1 = null; Connection connection2 = null; @@ -742,6 +788,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection1 = acquired1.poolable(); ChannelId id1 = connection1.channel().id(); @@ -750,12 +797,14 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); PooledRef acquired2 = http2Pool.acquire().block(); assertThat(acquired2).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(2); assertThat(http2Pool.connections.size()).isEqualTo(2); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection2 = acquired2.poolable(); ChannelId id2 = connection2.channel().id(); @@ -767,6 +816,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection1 != null) { @@ -792,7 +842,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); Connection connection = null; try { @@ -801,6 +851,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { assertThat(acquired1).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); connection = acquired1.poolable(); @@ -808,6 +859,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) @@ -816,11 +868,13 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { assertThat(http2Pool.activeStreams()).isEqualTo(1); assertThat(http2Pool.connections.size()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); acquired1.invalidate().block(); assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { if (connection != null) { @@ -831,11 +885,101 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { } @Test - void minConnectionsConfigNotSupported() { + void minConnections() { + EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(), + Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); PoolBuilder> poolBuilder = - PoolBuilder.from(Mono.empty()).sizeBetween(1, 2); - assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1, -1))); + PoolBuilder.from(Mono.just(Connection.from(channel))) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(1, 3); + Http2AllocationStrategy strategy = Http2AllocationStrategy.builder() + .maxConnections(3) + .minConnections(1) + .build(); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1)); + + List> acquired = new ArrayList<>(); + try { + Flux.range(0, 3) + .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) + .subscribe(); + + channel.runPendingTasks(); + + assertThat(acquired).hasSize(3); + + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(1).poolable()); + assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(2).poolable()); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); + + for (PooledRef slot : acquired) { + slot.release().block(Duration.ofSeconds(1)); + } + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE); + } + finally { + for (PooledRef slot : acquired) { + Connection conn = slot.poolable(); + ((EmbeddedChannel) conn.channel()).finishAndReleaseAll(); + conn.dispose(); + } + } + } + + @Test + void minConnectionsMaxStreamsReached() { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(1, 3); + Http2AllocationStrategy strategy = Http2AllocationStrategy.builder() + .maxConnections(3) + .minConnections(1) + .build(); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1)); + + List> acquired = new ArrayList<>(); + try { + Flux.range(0, 3) + .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) + .blockLast(Duration.ofSeconds(1)); + + assertThat(acquired).hasSize(3); + + for (PooledRef pooledRef : acquired) { + ((EmbeddedChannel) pooledRef.poolable().channel()).runPendingTasks(); + } + + assertThat(http2Pool.activeStreams()).isEqualTo(3); + assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(1).poolable()); + assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(2).poolable()); + assertThat(acquired.get(1).poolable()).isNotSameAs(acquired.get(2).poolable()); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + + for (PooledRef slot : acquired) { + slot.release().block(Duration.ofSeconds(1)); + } + + assertThat(http2Pool.activeStreams()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); + } + finally { + for (PooledRef slot : acquired) { + Connection conn = slot.poolable(); + ((EmbeddedChannel) conn.channel()).finishAndReleaseAll(); + conn.dispose(); + } + } } @Test @@ -846,13 +990,14 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1)); assertThat(acquired).isNotNull(); assertThat(http2Pool.activeStreams()).isEqualTo(1); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); http2Pool.acquire(Duration.ofMillis(10)) .as(StepVerifier::create) @@ -865,6 +1010,7 @@ void nonHttp2ConnectionEmittedOnce() { assertThat(http2Pool.activeStreams()).isEqualTo(0); assertThat(http2Pool.connections.size()).isEqualTo(0); + assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0); } finally { channel.finishAndReleaseAll(); diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java index 32453a4c24..0cad951be8 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java @@ -49,6 +49,7 @@ import reactor.netty.http.Http11SslContextSpec; import reactor.netty.http.Http2SslContextSpec; import reactor.netty.http.HttpProtocol; +import reactor.netty.http.client.Http2AllocationStrategy; import reactor.netty.http.client.HttpClient; import reactor.netty.http.server.HttpServer; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; @@ -543,6 +544,86 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie } } + //https://github.com/reactor/reactor-netty/issues/1808 + @Test + void testMinConnections() throws Exception { + Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); + Http2SslContextSpec clientCtx = + Http2SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + + disposableServer = + createServer() + .wiretap(false) + .protocol(HttpProtocol.H2) + .secure(spec -> spec.sslContext(serverCtx)) + .route(routes -> routes.post("/", (req, res) -> res.send(req.receive().retain()))) + .bindNow(); + + int requestsNum = 100; + CountDownLatch latch = new CountDownLatch(1); + DefaultPooledConnectionProvider provider = + (DefaultPooledConnectionProvider) ConnectionProvider.builder("testMinConnections") + .allocationStrategy(Http2AllocationStrategy.builder().maxConnections(20).minConnections(5).build()) + .build(); + AtomicInteger counter = new AtomicInteger(); + AtomicReference serverAddress = new AtomicReference<>(); + HttpClient client = + createClient(provider, disposableServer.port()) + .wiretap(false) + .protocol(HttpProtocol.H2) + .secure(spec -> spec.sslContext(clientCtx)) + .metrics(true, Function.identity()) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) + .observe((conn, state) -> { + if (state == STREAM_CONFIGURED) { + counter.incrementAndGet(); + conn.onTerminate() + .subscribe(null, + t -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }), + () -> conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + })); + } + }); + + try { + Flux.range(0, requestsNum) + .flatMap(i -> + client.post() + .uri("/") + .send(ByteBufMono.fromString(Mono.just("testMinConnections"))) + .responseContent() + .aggregate() + .asString()) + .blockLast(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + + InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); + String address = sa.getHostString() + ":" + sa.getPort(); + + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testMinConnections")).isEqualTo(0); + double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "http2.testMinConnections"); + double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, + REMOTE_ADDRESS, address, NAME, "testMinConnections"); + assertThat(totalConn).isEqualTo(idleConn); + assertThat(totalConn).isLessThan(10); + } + finally { + provider.disposeLater() + .block(Duration.ofSeconds(5)); + } + } + static final class TestPromise extends DefaultChannelPromise { final ChannelPromise parent; From d5d0e97e5040eec93d35801389b46fdae19eb285 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Thu, 9 Jun 2022 11:12:25 +0200 Subject: [PATCH 07/10] Allow to configure connection pool aquire timers (#2175) Added an API to provide a hook function for changing the default timer implementation for the pending acquire timers. --- .../netty/resources/ConnectionProvider.java | 41 +++++++++++++++++++ .../resources/PooledConnectionProvider.java | 7 ++++ .../resources/ConnectionProviderTest.java | 6 +++ .../reactor/netty/http/client/Http2Pool.java | 5 +-- .../java/reactor/netty/http/Http2Tests.java | 34 ++++++++++++--- .../netty/http/client/Http2PoolTest.java | 24 +++++++++++ 6 files changed, 109 insertions(+), 8 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java index db831832c9..1a641ac0f2 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java @@ -18,6 +18,7 @@ import io.netty.resolver.AddressResolverGroup; import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.ReactorNetty; @@ -32,6 +33,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeoutException; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Supplier; @@ -449,6 +451,7 @@ class ConnectionPoolSpec> implements Suppl boolean metricsEnabled; String leasingStrategy = DEFAULT_POOL_LEASING_STRATEGY; Supplier registrar; + BiFunction pendingAcquireTimer; AllocationStrategy allocationStrategy; /** @@ -473,6 +476,7 @@ private ConnectionPoolSpec() { this.metricsEnabled = copy.metricsEnabled; this.leasingStrategy = copy.leasingStrategy; this.registrar = copy.registrar; + this.pendingAcquireTimer = copy.pendingAcquireTimer; this.allocationStrategy = copy.allocationStrategy; } @@ -648,6 +652,43 @@ public final SPEC evictInBackground(Duration evictionInterval) { return get(); } + /** + * Set the option to use for configuring {@link ConnectionProvider} pending acquire timer. + * The pending acquire timer must be specified as a function which is used to schedule a pending acquire timeout + * when there is no idle connection and no new connection can be created currently. + * The function takes as argument a {@link Duration} which is the one configured by {@link #pendingAcquireTimeout(Duration)}. + *

+ * Use this function if you want to specify your own implementation for scheduling pending acquire timers. + * + *

Default to {@link Schedulers#parallel()}. + * + *

Examples using Netty HashedWheelTimer implementation:

+ *
+		 * {@code
+		 * final static HashedWheelTimer wheel = new HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024);
+		 *
+		 * HttpClient client = HttpClient.create(
+		 *     ConnectionProvider.builder("myprovider")
+		 *         .pendingAcquireTimeout(Duration.ofMillis(10000))
+		 *         .pendingAcquireTimer((r, d) -> {
+		 *             Timeout t = wheel.newTimeout(timeout -> r.run(), d.toMillis(), TimeUnit.MILLISECONDS);
+		 *             return () -> t.cancel();
+		 *         })
+		 *         .build());
+		 * }
+		 * 
+ * + * @param pendingAcquireTimer the function to apply when scheduling pending acquire timers + * @return {@literal this} + * @throws NullPointerException if pendingAcquireTimer is null + * @since 1.0.20 + * @see #pendingAcquireTimeout(Duration) + */ + public final SPEC pendingAcquireTimer(BiFunction pendingAcquireTimer) { + this.pendingAcquireTimer = Objects.requireNonNull(pendingAcquireTimer, "pendingAcquireTimer"); + return get(); + } + /** * Limits in how many connections can be allocated and managed by the pool are driven by the * provided {@link AllocationStrategy}. This is a customization escape hatch that replaces the last diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index e065c4f268..d010a9af64 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.function.BiPredicate; import java.util.function.Function; import java.util.function.Supplier; @@ -369,6 +370,7 @@ protected static final class PoolFactory { final Supplier registrar; final Clock clock; final Duration disposeTimeout; + final BiFunction pendingAcquireTimer; final AllocationStrategy allocationStrategy; PoolFactory(ConnectionPoolSpec conf, Duration disposeTimeout) { @@ -389,6 +391,7 @@ protected static final class PoolFactory { this.registrar = conf.registrar; this.clock = clock; this.disposeTimeout = disposeTimeout; + this.pendingAcquireTimer = conf.pendingAcquireTimer; this.allocationStrategy = conf.allocationStrategy; } @@ -447,6 +450,10 @@ PoolBuilder> newPoolInternal( } } + if (pendingAcquireTimer != null) { + poolBuilder = poolBuilder.pendingAcquireTimer(pendingAcquireTimer); + } + if (clock != null) { poolBuilder = poolBuilder.clock(clock); } diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java index 64802ac61a..ffc289b3a5 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java @@ -16,12 +16,14 @@ package reactor.netty.resources; import org.junit.jupiter.api.Test; +import reactor.core.Disposable; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.function.BiFunction; import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; @@ -31,6 +33,7 @@ class ConnectionProviderTest { static final TestAllocationStrategy TEST_ALLOCATION_STRATEGY = new TestAllocationStrategy(); static final String TEST_STRING = ""; static final Supplier TEST_SUPPLIER = () -> (a, b, c, d) -> {}; + static final BiFunction TEST_BI_FUNCTION = (r, duration) -> () -> {}; @Test void testBuilderCopyConstructor() throws IllegalAccessException { @@ -74,6 +77,9 @@ else if (boolean.class == clazz) { else if (int.class == clazz) { field.setInt(builder, 1); } + else if (BiFunction.class == clazz) { + field.set(builder, TEST_BI_FUNCTION); + } else { throw new IllegalArgumentException("Unknown field type " + clazz); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 11f5b639b4..733a0078c5 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -41,7 +41,6 @@ import reactor.core.Scannable; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; -import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; import reactor.netty.FutureMono; import reactor.netty.NettyPipeline; @@ -759,7 +758,7 @@ public void request(long n) { int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); int pending = pool.pendingSize; if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) { - timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS); + timeoutTask = pool.poolConfig.pendingAcquireTimer().apply(this, acquireTimeout); } pool.doAcquire(this); } @@ -1042,4 +1041,4 @@ long lifeTime() { return pool.clock.millis() - creationTimestamp; } } -} \ No newline at end of file +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java b/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java index 59b77318a4..ebd06044da 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java @@ -21,9 +21,11 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +import reactor.core.scheduler.Schedulers; import reactor.netty.BaseHttpTest; import reactor.netty.ByteBufFlux; import reactor.netty.ByteBufMono; @@ -43,6 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.IntStream; @@ -159,14 +162,35 @@ private void doTestIssue1071(int length, String expectedResponse, int expectedCo @Test void testMaxActiveStreams_1_CustomPool() throws Exception { - ConnectionProvider provider = + doTestMaxActiveStreams_1_CustomPool(null); + } + + @Test + void testMaxActiveStreams_1_CustomPool_Custom_AcquireTimer() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + BiFunction timer = (r, d) -> { + Runnable wrapped = () -> { + r.run(); + latch.countDown(); + }; + return Schedulers.single().schedule(wrapped, d.toNanos(), TimeUnit.NANOSECONDS); + }; + doTestMaxActiveStreams_1_CustomPool(timer); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + void doTestMaxActiveStreams_1_CustomPool(BiFunction pendingAcquireTimer) throws Exception { + ConnectionProvider.Builder builder = ConnectionProvider.builder("testMaxActiveStreams_1_CustomPool") - .maxConnections(1) - .pendingAcquireTimeout(Duration.ofMillis(10)) // the default is 45s - .build(); + .maxConnections(1) + .pendingAcquireTimeout(Duration.ofMillis(10)); // the default is 45s + if (pendingAcquireTimer != null) { + builder = builder.pendingAcquireTimer(pendingAcquireTimer); + } + ConnectionProvider provider = builder.build(); doTestMaxActiveStreams(HttpClient.create(provider), 1, 1, 1); provider.disposeLater() - .block(); + .block(); } @Test diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index 6780f795c2..25f99f4ff4 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -22,8 +22,10 @@ import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2MultiplexHandler; import org.junit.jupiter.api.Test; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; import reactor.netty.internal.shaded.reactor.pool.PoolBuilder; @@ -38,6 +40,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; @@ -832,6 +835,24 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { @Test void maxLifeTimeMaxConnectionsReached() throws Exception { + doMaxLifeTimeMaxConnectionsReached(null); + } + + @Test + void maxLifeTimeMaxConnectionsReachedWithCustomTimer() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + BiFunction timer = (r, d) -> { + Runnable wrapped = () -> { + r.run(); + latch.countDown(); + }; + return Schedulers.single().schedule(wrapped, d.toNanos(), TimeUnit.NANOSECONDS); + }; + doMaxLifeTimeMaxConnectionsReached(timer); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + private void doMaxLifeTimeMaxConnectionsReached(BiFunction pendingAcquireTimer) throws Exception { PoolBuilder> poolBuilder = PoolBuilder.from(Mono.fromSupplier(() -> { Channel channel = new EmbeddedChannel( @@ -842,6 +863,9 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); + if (pendingAcquireTimer != null) { + poolBuilder = poolBuilder.pendingAcquireTimer(pendingAcquireTimer); + } Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); Connection connection = null; From 343eb0147e99a890ef11abfdce425cf5da3b5dda Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 10 Jun 2022 07:34:49 +0300 Subject: [PATCH 08/10] Bump tomcat-embed-core from 9.0.63 to 9.0.64 (#2274) Bumps tomcat-embed-core from 9.0.63 to 9.0.64. --- updated-dependencies: - dependency-name: org.apache.tomcat.embed:tomcat-embed-core dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 5fec14e17f..cc2b929d5a 100644 --- a/build.gradle +++ b/build.gradle @@ -112,7 +112,7 @@ ext { assertJVersion = '3.23.1' awaitilityVersion = '4.2.0' hoverflyJavaVersion = '0.14.2' - tomcatVersion = '9.0.63' + tomcatVersion = '9.0.64' boringSslVersion = '2.0.52.Final' junitVersion = '5.8.2' junitPlatformLauncherVersion = '1.8.2' From 6f3bf9a253e5bca68b67b83c371bd9e235c419c3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 13 Jun 2022 20:36:09 +0300 Subject: [PATCH 09/10] Bump com.diffplug.spotless from 6.7.0 to 6.7.2 (#2294) Bumps com.diffplug.spotless from 6.7.0 to 6.7.2. --- updated-dependencies: - dependency-name: com.diffplug.spotless dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index cc2b929d5a..eed347f572 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,7 @@ buildscript { } plugins { - id "com.diffplug.spotless" version "6.7.0" + id "com.diffplug.spotless" version "6.7.2" id 'org.asciidoctor.jvm.convert' version '3.3.2' apply false id 'org.asciidoctor.jvm.pdf' version '3.3.2' apply false id 'com.google.osdetector' version '1.7.0' From 19e2802c8fbbb013dadeaf14acf69c264c89d5c1 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 13 Jun 2022 22:13:01 +0300 Subject: [PATCH 10/10] Update to netty-tcnative-boringssl-static v2.0.53.Final (#2295) --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index eed347f572..2a0aa1eb97 100644 --- a/build.gradle +++ b/build.gradle @@ -113,7 +113,7 @@ ext { awaitilityVersion = '4.2.0' hoverflyJavaVersion = '0.14.2' tomcatVersion = '9.0.64' - boringSslVersion = '2.0.52.Final' + boringSslVersion = '2.0.53.Final' junitVersion = '5.8.2' junitPlatformLauncherVersion = '1.8.2' mockitoVersion = '4.6.1'