Skip to content

Commit

Permalink
Allow customization of the Bootstrap in NettyClientCustomizer (micron…
Browse files Browse the repository at this point in the history
…aut-projects#11110)

* Allow customization of the Bootstrap in NettyClientCustomizer
This change allows access to the Bootstrap before the client connects. This can change e.g. the remoteAddress.

I use this for unix domain socket support for micronaut-oracle-cloud. Though I'm not yet 100% sure if it's necessary, there.

* since

* fix test

* fix test

* checkstyle
  • Loading branch information
yawkat authored Aug 29, 2024
1 parent ad4cc2c commit d10e6f7
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package io.micronaut.http.client.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.http.netty.AbstractCompositeCustomizer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;

import java.util.Collections;
Expand All @@ -42,6 +44,11 @@ protected NettyClientCustomizer specializeForChannel(NettyClientCustomizer membe
return member.specializeForChannel(channel, role);
}

@Override
public @NonNull NettyClientCustomizer specializeForBootstrap(@NonNull Bootstrap bootstrap) {
return specialize(ch -> ch.specializeForBootstrap(bootstrap));
}

@Override
protected NettyClientCustomizer makeNewComposite(List<NettyClientCustomizer> members) {
return new CompositeNettyClientCustomizer(members);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@
@Internal
public class ConnectionManager {

final NettyClientCustomizer clientCustomizer;

private final HttpVersionSelection httpVersion;
private final Logger log;
private final Map<DefaultHttpClient.RequestKey, Pool> pools = new ConcurrentHashMap<>();
Expand All @@ -161,7 +163,6 @@ public class ConnectionManager {
private volatile SslContext sslContext;
private volatile /* QuicSslContext */ Object http3SslContext;
private volatile SslContext websocketSslContext;
private final NettyClientCustomizer clientCustomizer;
private final String informationalServiceId;

/**
Expand Down Expand Up @@ -438,16 +439,18 @@ public final boolean isRunning() {
* @param channelInitializer The initializer to use
* @return Future that terminates when the TCP connection is established.
*/
ChannelFuture doConnect(DefaultHttpClient.RequestKey requestKey, ChannelInitializer<?> channelInitializer) {
ChannelFuture doConnect(DefaultHttpClient.RequestKey requestKey, CustomizerAwareInitializer channelInitializer) {
String host = requestKey.getHost();
int port = requestKey.getPort();
Bootstrap localBootstrap = bootstrap.clone();
Proxy proxy = configuration.resolveProxy(requestKey.isSecure(), host, port);
if (proxy.type() != Proxy.Type.DIRECT) {
localBootstrap.resolver(NoopAddressResolverGroup.INSTANCE);
}
localBootstrap.handler(channelInitializer);
return localBootstrap.connect(host, port);
localBootstrap.handler(channelInitializer)
.remoteAddress(host, port);
channelInitializer.bootstrappedCustomizer = clientCustomizer.specializeForBootstrap(localBootstrap);
return localBootstrap.connect();
}

/**
Expand Down Expand Up @@ -518,7 +521,7 @@ private SslContext buildWebsocketSslContext(DefaultHttpClient.RequestKey request
final Mono<?> connectForWebsocket(DefaultHttpClient.RequestKey requestKey, ChannelHandler handler) {
Sinks.Empty<Object> initial = new CancellableMonoSink<>(null);

ChannelFuture connectFuture = doConnect(requestKey, new ChannelInitializer<Channel>() {
ChannelFuture connectFuture = doConnect(requestKey, new CustomizerAwareInitializer() {
@Override
protected void initChannel(@NonNull Channel ch) {
addLogHandler(ch);
Expand Down Expand Up @@ -546,7 +549,7 @@ protected void initChannel(@NonNull Channel ch) {
ch.pipeline().addLast(WebSocketClientCompressionHandler.INSTANCE);
}
ch.pipeline().addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_WEBSOCKET_CLIENT, handler);
clientCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION).onInitialPipelineBuilt();
bootstrappedCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION).onInitialPipelineBuilt();
if (initial.tryEmitEmpty().isSuccess()) {
return;
}
Expand Down Expand Up @@ -741,11 +744,15 @@ private <E extends HttpClientException> E decorate(E exc) {
return HttpClientExceptionUtils.populateServiceId(exc, informationalServiceId, configuration);
}

abstract static class CustomizerAwareInitializer extends ChannelInitializer<Channel> {
NettyClientCustomizer bootstrappedCustomizer;
}

/**
* Initializer for TLS channels. After ALPN we will proceed either with
* {@link #initHttp1(Channel)} or {@link #initHttp2(Pool, Channel, NettyClientCustomizer)}.
*/
private final class AdaptiveAlpnChannelInitializer extends ChannelInitializer<Channel> {
private final class AdaptiveAlpnChannelInitializer extends CustomizerAwareInitializer {
private final Pool pool;

private final SslContext sslContext;
Expand All @@ -767,7 +774,7 @@ private final class AdaptiveAlpnChannelInitializer extends ChannelInitializer<Ch
*/
@Override
protected void initChannel(@NonNull Channel ch) {
NettyClientCustomizer channelCustomizer = clientCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION);
NettyClientCustomizer channelCustomizer = bootstrappedCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION);

configureProxy(ch.pipeline(), true, host, port);

Expand Down Expand Up @@ -822,7 +829,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
* Initializer for H2C connections. Will proceed with
* {@link #initHttp2(Pool, Channel, NettyClientCustomizer)} when the upgrade is done.
*/
private final class Http2UpgradeInitializer extends ChannelInitializer<Channel> {
private final class Http2UpgradeInitializer extends CustomizerAwareInitializer {
private final Pool pool;

Http2UpgradeInitializer(Pool pool) {
Expand All @@ -831,7 +838,7 @@ private final class Http2UpgradeInitializer extends ChannelInitializer<Channel>

@Override
protected void initChannel(@NonNull Channel ch) throws Exception {
NettyClientCustomizer connectionCustomizer = clientCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION);
NettyClientCustomizer connectionCustomizer = bootstrappedCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION);

Http2FrameCodec frameCodec = makeFrameCodec();

Expand Down Expand Up @@ -877,6 +884,8 @@ private final class Http3ChannelInitializer extends ChannelOutboundHandlerAdapte
private final String host;
private final int port;

private NettyClientCustomizer bootstrappedCustomizer;

Http3ChannelInitializer(Pool pool, String host, int port) {
this.pool = pool;
this.host = host;
Expand Down Expand Up @@ -905,7 +914,7 @@ public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelP
}

private void initChannel(Channel ch) {
NettyClientCustomizer channelCustomizer = clientCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION);
NettyClientCustomizer channelCustomizer = bootstrappedCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION);

ch.pipeline()
.addLast(Http3.newQuicClientCodecBuilder()
Expand Down Expand Up @@ -1099,12 +1108,15 @@ void openNewConnection(@Nullable BlockHint blockHint) throws Exception {
}

private ChannelFuture openConnectionFuture() {
ChannelInitializer<?> initializer;
CustomizerAwareInitializer initializer;
if (requestKey.isSecure()) {
if (httpVersion.isHttp3()) {
return udpBootstrap.clone()
.handler(new Http3ChannelInitializer(this, requestKey.getHost(), requestKey.getPort()))
.bind(0);
Http3ChannelInitializer channelInitializer = new Http3ChannelInitializer(this, requestKey.getHost(), requestKey.getPort());
Bootstrap localBootstrap = udpBootstrap.clone()
.handler(channelInitializer)
.localAddress(0);
channelInitializer.bootstrappedCustomizer = clientCustomizer.specializeForBootstrap(localBootstrap);
return localBootstrap.bind();
}

initializer = new AdaptiveAlpnChannelInitializer(
Expand All @@ -1115,7 +1127,7 @@ private ChannelFuture openConnectionFuture() {
);
} else {
initializer = switch (httpVersion.getPlaintextMode()) {
case HTTP_1 -> new ChannelInitializer<>() {
case HTTP_1 -> new CustomizerAwareInitializer() {
@Override
protected void initChannel(@NonNull Channel ch) throws Exception {
configureProxy(ch.pipeline(), false, requestKey.getHost(), requestKey.getPort());
Expand All @@ -1125,7 +1137,7 @@ protected void initChannel(@NonNull Channel ch) throws Exception {
public void channelActive(@NonNull ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.pipeline().remove(this);
NettyClientCustomizer channelCustomizer = clientCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION);
NettyClientCustomizer channelCustomizer = bootstrappedCustomizer.specializeForChannel(ch, NettyClientCustomizer.ChannelRole.CONNECTION);
new Http1ConnectionHolder(ch, channelCustomizer).init(true);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package io.micronaut.http.client.netty;

import io.micronaut.core.annotation.Experimental;
import io.micronaut.core.annotation.NonNull;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;

/**
Expand Down Expand Up @@ -47,6 +49,17 @@ default NettyClientCustomizer specializeForChannel(@NonNull Channel channel, @No
return this;
}

/**
* @param bootstrap The bootstrap that will be used to connect
* @return The new customizer, or {@code this} if no specialization needs to take place.
* @since 4.7.0
*/
@Experimental
@NonNull
default NettyClientCustomizer specializeForBootstrap(@NonNull Bootstrap bootstrap) {
return this;
}

/**
* Called when the <i>initial</i> connection pipeline has been built, before any incoming data
* has been processed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ class ConnectionManagerSpec extends Specification {
int i = 0

@Override
protected ChannelFuture doConnect(DefaultHttpClient.RequestKey requestKey, ChannelInitializer<? extends Channel> channelInitializer) {
protected ChannelFuture doConnect(DefaultHttpClient.RequestKey requestKey, ConnectionManager.CustomizerAwareInitializer channelInitializer) {
try {
channelInitializer.bootstrappedCustomizer = clientCustomizer
def connection = connections[i++]
connection.clientChannel = new EmbeddedChannel(new DummyChannelId('client' + i), connection.clientInitializer, channelInitializer) {
def loop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/**
* Base class for the composite customizers for the client and server. The APIs are structured
Expand Down Expand Up @@ -75,12 +76,23 @@ public synchronized void add(C customizer) {

@NonNull
public final C specializeForChannel(@NonNull Channel channel, @NonNull R role) {
return specialize(c -> specializeForChannel(c, channel, role));
}

/**
* Specialize all members with the given action.
*
* @param specializeAction The specialization action. Input is the old member customizer, output
* is the new member customizer.
* @return The specialized composite customizer
*/
protected final C specialize(UnaryOperator<C> specializeAction) {
List<C> specialized = null;
for (int i = 0; i < this.members.size(); i++) {
C old = this.members.get(i);
C nev;
try {
nev = specializeForChannel(old, channel, role);
nev = specializeAction.apply(old);
Objects.requireNonNull(nev, "specializeForChannel must not return null");
} catch (Exception e) {
LOG.error("Failed to specialize customizer", e);
Expand Down

0 comments on commit d10e6f7

Please sign in to comment.