Skip to content

Commit

Permalink
ConnectionObserver: provide ConnectionInfo on transport handshake (#2726
Browse files Browse the repository at this point in the history
)

Motivation:

If a connection fails with an exception before one of the "established" methods is invoked, users don't have access to meaningful information, like `SslConfig`, SocketOptions, channelId (for correlation with wire logs), local address, etc.

Modifications:

- Add `ConnectionObserver.onTransportHandshakeComplete(ConnectionInfo)` callback that gives users a ST view of the netty's `Channel`;
- Deprecate pre-existing `ConnectionObserver.onTransportHandshakeComplete()`;

Results:

1. Users can collect more information about a connection if it's failed before "established".
2. Users can get Channel's ID for reporting `ProxyConnectObserver` events and security handshake failures.
  • Loading branch information
idelpivnitskiy authored Oct 6, 2023
1 parent c92941c commit 4fb2ac1
Show file tree
Hide file tree
Showing 35 changed files with 371 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ Single<FilterableStreamingHttpConnection> newFilterableConnection(
private Single<FilterableStreamingHttpConnection> createConnection(
final Channel channel, final ConnectionObserver connectionObserver,
final ReadOnlyTcpClientConfig tcpConfig) {
return new AlpnChannelSingle(channel, new TcpClientChannelInitializer(tcpConfig, connectionObserver),
return new AlpnChannelSingle(channel,
new TcpClientChannelInitializer(tcpConfig, connectionObserver, executionContext, false),
ctx -> { /* SslHandler will automatically start handshake on channelActive */ }).flatMap(protocol -> {
switch (protocol) {
case HTTP_1_1:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2019-2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,7 +91,8 @@ private static Single<NettyConnectionContext> alpnInitChannel(final SocketAddres
final StreamingHttpService service,
final boolean drainRequestPayloadBody,
final ConnectionObserver observer) {
return new AlpnChannelSingle(channel, new TcpServerChannelInitializer(config.tcpConfig(), observer),
return new AlpnChannelSingle(channel,
new TcpServerChannelInitializer(config.tcpConfig(), observer, httpExecutionContext),
// Force a read to get the SSL handshake started. We initialize pipeline before
// SslHandshakeCompletionEvent will complete, therefore, no data will be propagated before we finish
// initialization.
Expand All @@ -117,7 +118,7 @@ private static Single<NettyConnectionContext> sniInitChannel(final SocketAddress
final boolean drainRequestPayloadBody,
final ConnectionObserver observer) {
return new SniCompleteChannelSingle(channel,
new TcpServerChannelInitializer(config.tcpConfig(), observer)).flatMap(sniEvt -> {
new TcpServerChannelInitializer(config.tcpConfig(), observer, httpExecutionContext)).flatMap(sniEvt -> {
Throwable failureCause = sniEvt.cause();
if (failureCause != null) {
return failed(failureCause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Single<FilterableStreamingHttpConnection> newFilterableConnection(
(channel, connectionObserver) -> H2ClientParentConnectionContext.initChannel(channel,
executionContext, config.h2Config(), reqRespFactoryFunc.apply(HTTP_2_0),
tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(), tcpConfig.sslConfig(),
new TcpClientChannelInitializer(tcpConfig, connectionObserver).andThen(
new TcpClientChannelInitializer(tcpConfig, connectionObserver, executionContext, false).andThen(
new H2ClientParentChannelInitializer(config.h2Config())), connectionObserver,
config.allowDropTrailersReadFromTransport()), observer);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2019-2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -100,7 +100,7 @@ static Single<HttpServerContext> bind(final HttpExecutionContext executionContex
final ReadOnlyTcpServerConfig tcpServerConfig = config.tcpConfig();
return TcpServerBinder.bind(listenAddress, tcpServerConfig, executionContext, connectionAcceptor,
(channel, connectionObserver) -> initChannel(listenAddress, channel, executionContext, config,
new TcpServerChannelInitializer(tcpServerConfig, connectionObserver), service,
new TcpServerChannelInitializer(tcpServerConfig, connectionObserver, executionContext), service,
drainRequestPayloadBody, connectionObserver),
serverConnection -> { /* nothing to do as h2 uses auto read on the parent channel */ },
earlyConnectionAcceptor, lateConnectionAcceptor)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -129,7 +129,7 @@ static Single<HttpServerContext> bind(final HttpExecutionContext executionContex
final ReadOnlyTcpServerConfig tcpServerConfig = config.tcpConfig();
return TcpServerBinder.bind(address, tcpServerConfig, executionContext, connectionAcceptor,
(channel, connectionObserver) -> initChannel(channel, executionContext, config,
new TcpServerChannelInitializer(tcpServerConfig, connectionObserver), service,
new TcpServerChannelInitializer(tcpServerConfig, connectionObserver, executionContext), service,
drainRequestPayloadBody, connectionObserver),
serverConnection -> serverConnection.process(true),
earlyConnectionAcceptor, lateConnectionAcceptor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private Single<? extends FilterableStreamingHttpConnection> createConnection(
// Disable half-closure to simplify ProxyConnectHandler implementation
channelConfig.setOption(ALLOW_HALF_CLOSURE, FALSE);
return new ProxyConnectChannelSingle(channel,
new TcpClientChannelInitializer(config.tcpConfig(), observer, config.hasProxy())
new TcpClientChannelInitializer(config.tcpConfig(), observer, executionContext, true)
.andThen(new HttpClientChannelInitializer(
getByteBufAllocator(executionContext.bufferAllocator()), h1Config, closeHandler)),
observer, h1Config.headersFactory(), connectAddress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ static <ResolvedAddress> Single<? extends NettyConnection<Object, Object>> build
// We disable auto read so we can handle stuff in the ConnectionFilter before we accept any content.
return TcpConnector.connect(null, resolvedAddress, tcpConfig, false, executionContext,
(channel, connectionObserver) -> createConnection(channel, executionContext, h1Config, tcpConfig,
new TcpClientChannelInitializer(tcpConfig, connectionObserver, false),
new TcpClientChannelInitializer(tcpConfig, connectionObserver, executionContext, false),
connectionObserver),
observer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private void setUp(final Param param) {
(channel, observer) -> {
channel.config().setAutoRead(true);
return initChannel(channel, httpExecutionContext, config,
new TcpServerChannelInitializer(tcpReadOnly, observer)
new TcpServerChannelInitializer(tcpReadOnly, observer, httpExecutionContext)
.andThen(channel1 -> channel1.pipeline().addLast(interceptor)), service,
true, observer);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ void protocolPayloadEndOutboundShouldNotTriggerOnFailedFlush() throws Exception
SEC.executor(), SEC.ioExecutor(),
forPipelinedRequestResponse(false, channel.config()), defaultFlushStrategy(), 0L,
null,
new TcpServerChannelInitializer(sConfig, observer).andThen(
new TcpServerChannelInitializer(sConfig, observer, SEC).andThen(
channel2 -> {
serverChannelRef.compareAndSet(null, channel2);
serverChannelLatch.countDown();
Expand All @@ -461,7 +461,7 @@ void protocolPayloadEndOutboundShouldNotTriggerOnFailedFlush() throws Exception
closeHandler, defaultFlushStrategy(), 0L,
cConfig.tcpConfig().sslConfig(),
new TcpClientChannelInitializer(cConfig.tcpConfig(),
connectionObserver)
connectionObserver, CEC, false)
.andThen(new HttpClientChannelInitializer(
getByteBufAllocator(CEC.bufferAllocator()),
cConfig.h1Config(), closeHandler))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void onFlush() {
}

@Override
public void onTransportHandshakeComplete() {
public void onTransportHandshakeComplete(final ConnectionInfo info) {
// AsyncContext is unknown at this point because this event is triggered by network
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ void connectionEstablished(HttpProtocol httpProtocol) throws Exception {

verify(clientTransportObserver).onNewConnection(any(), any());
verify(serverTransportObserver, await()).onNewConnection(any(), any());
verify(clientConnectionObserver).onTransportHandshakeComplete();
verify(serverConnectionObserver, await()).onTransportHandshakeComplete();
verify(clientConnectionObserver).onTransportHandshakeComplete(any());
verify(serverConnectionObserver, await()).onTransportHandshakeComplete(any());
if (protocol == HTTP_1) {
verify(clientConnectionObserver).connectionEstablished(any(ConnectionInfo.class));
verify(serverConnectionObserver, await()).connectionEstablished(any(ConnectionInfo.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,14 @@ private void assertTargetAddress() {

private void verifyProxyConnectFailed(Throwable cause) {
order.verify(transportObserver).onNewConnection(any(), any());
order.verify(connectionObserver).onTransportHandshakeComplete();
order.verify(connectionObserver).onTransportHandshakeComplete(any());
order.verify(connectionObserver).onProxyConnect(any());
order.verify(proxyConnectObserver).proxyConnectFailed(cause);
}

private void verifyProxyConnectComplete() {
order.verify(transportObserver).onNewConnection(any(), any());
order.verify(connectionObserver).onTransportHandshakeComplete();
order.verify(connectionObserver).onTransportHandshakeComplete(any());
order.verify(connectionObserver).onProxyConnect(any());
order.verify(proxyConnectObserver).proxyConnectComplete(any());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private static void verifyObservers(InOrder order, TransportObserver transportOb
ConnectionObserver connectionObserver, SecurityHandshakeObserver securityHandshakeObserver,
HttpProtocol expectedProtocol, boolean failHandshake) {
order.verify(transportObserver).onNewConnection(any(), any());
order.verify(connectionObserver).onTransportHandshakeComplete();
order.verify(connectionObserver).onTransportHandshakeComplete(any());
order.verify(connectionObserver).onSecurityHandshake();
if (failHandshake) {
ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class ServerRespondsOnClosingTest {
return fromSource(responseProcessor);
};
serverConnection = initChannel(channel, httpExecutionContext, config, new TcpServerChannelInitializer(
config.tcpConfig(), connectionObserver),
config.tcpConfig(), connectionObserver, httpExecutionContext),
toStreamingHttpService(offloadNone(), service), true,
connectionObserver).toFuture().get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ static class SslBytesReadTransportObserver implements TransportObserver {
@Override
public ConnectionObserver onNewConnection(@Nullable final Object localAddress, final Object remoteAddress) {
return new ConnectionObserver() {

@Override
public void onDataRead(final int size) {
if (inHandshake) {
Expand Down Expand Up @@ -174,7 +175,7 @@ public void onFlush() {
}

@Override
public void onTransportHandshakeComplete() {
public void onTransportHandshakeComplete(final ConnectionInfo info) {
}

@Override
Expand Down
10 changes: 10 additions & 0 deletions servicetalk-tcp-netty-internal/gradle/spotbugs/main-exclusions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,14 @@
<Method name="lambda$handleSubscribe$1"/>
<Bug pattern="THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION"/>
</Match>

<!-- FIXME: 0.43 - Remove temporary suppression after we can remove deprecated constructors -->
<Match>
<Class name="io.servicetalk.tcp.netty.internal.TcpClientChannelInitializer"/>
<Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
</Match>
<Match>
<Class name="io.servicetalk.tcp.netty.internal.TcpServerChannelInitializer"/>
<Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.logging.api.UserDataLoggerConfig;
import io.servicetalk.transport.api.ServiceTalkSocketOptions;
import io.servicetalk.transport.api.SslConfig;
import io.servicetalk.transport.netty.internal.FlushStrategy;

import io.netty.channel.ChannelOption;
Expand All @@ -34,7 +35,7 @@
*
* @param <SecurityConfig> type of security configuration
*/
abstract class AbstractReadOnlyTcpConfig<SecurityConfig> {
abstract class AbstractReadOnlyTcpConfig<SecurityConfig extends SslConfig> {
@SuppressWarnings("rawtypes")
private final Map<ChannelOption, Object> options;
private final long idleTimeoutMs;
Expand Down Expand Up @@ -106,4 +107,12 @@ public final UserDataLoggerConfig wireLoggerConfig() {
*/
@Nullable
public abstract SslContext sslContext();

/**
* Get the {@link SslConfig}.
*
* @return the {@link SslConfig}, or {@code null} if SSL/TLS is not configured.
*/
@Nullable
public abstract SecurityConfig sslConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
*
* @param <SslConfigType> type of {@link SslConfig}.
*/
abstract class AbstractTcpConfig<SslConfigType> {
abstract class AbstractTcpConfig<SslConfigType extends SslConfig> {

@Nullable
@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,9 @@
package io.servicetalk.tcp.netty.internal;

import io.servicetalk.transport.api.ClientSslConfig;
import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.ConnectionObserverInitializer;
import io.servicetalk.transport.netty.internal.DeferSslHandler;
Expand All @@ -29,6 +31,7 @@
import io.netty.handler.ssl.SslHandler;

import static io.servicetalk.tcp.netty.internal.TcpServerChannelInitializer.initWireLogger;
import static io.servicetalk.transport.netty.internal.ExecutionContextUtils.channelExecutionContext;

/**
* {@link ChannelInitializer} for TCP client.
Expand All @@ -42,7 +45,10 @@ public class TcpClientChannelInitializer implements ChannelInitializer { // F
*
* @param config to use for initialization.
* @param observer {@link ConnectionObserver} to report network events.
* @deprecated Use
* {@link #TcpClientChannelInitializer(ReadOnlyTcpClientConfig, ConnectionObserver, ExecutionContext, boolean)}
*/
@Deprecated // FIXME: 0.43 - remove deprecated ctor
public TcpClientChannelInitializer(final ReadOnlyTcpClientConfig config,
final ConnectionObserver observer) {
this(config, observer, false);
Expand All @@ -54,25 +60,49 @@ public TcpClientChannelInitializer(final ReadOnlyTcpClientConfig config,
* @param config to use for initialization.
* @param observer {@link ConnectionObserver} to report network events.
* @param deferSslHandler {@code true} to wrap the {@link SslHandler} in a {@link DeferSslHandler}.
* @deprecated Use
* {@link #TcpClientChannelInitializer(ReadOnlyTcpClientConfig, ConnectionObserver, ExecutionContext, boolean)}
*/
@Deprecated // FIXME: 0.43 - remove deprecated ctor
@SuppressWarnings("DataFlowIssue")
public TcpClientChannelInitializer(final ReadOnlyTcpClientConfig config,
final ConnectionObserver observer,
final boolean deferSslHandler) {
this(config, observer, null, deferSslHandler);
}

/**
* Creates a {@link ChannelInitializer} for the {@code config}.
*
* @param config to use for initialization.
* @param observer {@link ConnectionObserver} to report network events.
* @param executionContext {@link ExecutionContext} to use for {@link ConnectionInfo} reporting.
* @param deferSslHandler {@code true} to wrap the {@link SslHandler} in a {@link DeferSslHandler}.
*/
@SuppressWarnings("ConstantValue")
public TcpClientChannelInitializer(final ReadOnlyTcpClientConfig config,
final ConnectionObserver observer,
final ExecutionContext<?> executionContext,
final boolean deferSslHandler) {
ChannelInitializer delegate = ChannelInitializer.defaultInitializer();

final SslContext sslContext = config.sslContext();
final ClientSslConfig sslConfig = config.sslConfig();
if (observer != NoopConnectionObserver.INSTANCE) {
delegate = delegate.andThen(new ConnectionObserverInitializer(observer,
sslContext != null && !deferSslHandler, true));
channel -> new TcpConnectionInfo(channel,
// ExecutionContext can be null if users used deprecated ctor
executionContext == null ? null : channelExecutionContext(channel, executionContext),
sslConfig, config.idleTimeoutMs()),
sslConfig != null && !deferSslHandler, true));
}

if (config.idleTimeoutMs() > 0L) {
delegate = delegate.andThen(new IdleTimeoutInitializer(config.idleTimeoutMs()));
}

if (sslContext != null) {
ClientSslConfig sslConfig = config.sslConfig();
assert sslConfig != null;
if (sslConfig != null) {
final SslContext sslContext = config.sslContext();
assert sslContext != null;
delegate = delegate.andThen(new SslClientChannelInitializer(sslContext, sslConfig, deferSslHandler));
}

Expand Down
Loading

0 comments on commit 4fb2ac1

Please sign in to comment.