Skip to content

Commit

Permalink
loadbalancer-experimental: make ConnectionPoolPolicy consistent (#3134)
Browse files Browse the repository at this point in the history
Motivation:

We typically have a 'ies' class for the static helper methods, but
right now we stuff all that into ConnectionPoolPolicy. We also tried
to hide all the details by making the ConnectionPoolPolicy a data
carrier instead of having any behavior, but that makes some extra
noise around conversions.

Modifications:

- Make ConnectionPoolPolicy follow the pattern in LoadBalancingPolicy,
  in that it's a factory but the constructor and methods are package
  private to control proliferation.
- Remove the internal time ConnetionPoolFactory which is now subsumed
  by ConnectionPoolPolicy.
- Make ConnectionPoolPolicies to follow the pattern established
  patterns.

Result:

More consistent code.
  • Loading branch information
bryce-anderson authored Dec 11, 2024
1 parent 93f20f8 commit 869c12a
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 192 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;

public final class ConnectionPoolPolicies {
private static final int DEFAULT_MAX_EFFORT = 5;
private static final int DEFAULT_LINEAR_SEARCH_SPACE = 16;

private ConnectionPoolPolicies() {
// no instances
}

/**
* A connection selection policy that prioritizes a configurable "core" pool.
* <p>
* This {@link ConnectionPoolPolicy} attempts to emulate the pooling behavior often seen in thread pools.
* Specifically it allows for the configuration of a "core pool" size which are intended to be long-lived.
* Iteration starts in the core pool at a random position and then iterates through the entire core pool before
* moving to an overflow pool. Because iteration of this core pool starts at a random position the core connections
* will get an even traffic load and, because they are equally selectable, will tend not to be removed due to
* idleness.
* <p>
* If the core pool cannot satisfy the load traffic can spill over to extra connections which are selected in-order.
* This has the property of minimizing traffic to the latest elements added outside the core pool size, thus let
* them idle out of the pool once they're no longer necessary.
* @param corePoolSize the size of the core pool.
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
* @return the configured {@link ConnectionPoolPolicy}.
*/
public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C>
corePool(final int corePoolSize, final boolean forceCorePool) {
return CorePoolConnectionSelector.factory(corePoolSize, forceCorePool);
}

/**
* A connection selection policy that prioritizes connection reuse.
* <p>
* This {@link ConnectionPoolPolicy} attempts to minimize the number of connections by attempting to direct
* traffic to connections in the order they were created in linear order up until a configured quantity. After
* this linear pool is exhausted the remaining connections will be selected from at random. Prioritizing traffic
* to the existing connections will let tailing connections be removed due to idleness.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
* @return the configured {@link ConnectionPoolPolicy}.
*/
public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> linearSearch() {
return linearSearch(DEFAULT_LINEAR_SEARCH_SPACE);
}

/**
* A connection selection policy that prioritizes connection reuse.
* <p>
* This {@link ConnectionPoolPolicy} attempts to minimize the number of connections by attempting to direct
* traffic to connections in the order they were created in linear order up until a configured quantity. After
* this linear pool is exhausted the remaining connections will be selected from at random. Prioritizing traffic
* to the existing connections will let tailing connections be removed due to idleness.
* @param linearSearchSpace the space to search linearly before resorting to random selection for remaining
* connections.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
* @return the configured {@link ConnectionPoolPolicy}.
*/
public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> linearSearch(int linearSearchSpace) {
return LinearSearchConnectionSelector.factory(linearSearchSpace);
}

/**
* A {@link ConnectionPoolPolicy} that attempts to discern between the health of individual connections.
* If individual connections have health data the P2C policy can be used to bias traffic toward the best
* connections. This has the following algorithm:
* - Randomly select two connections from the 'core pool' (pick-two).
* - Try to select the 'best' of the two connections.
* - If we fail to select the best connection, try the other connection.
* - If both connections fail, repeat the pick-two operation for up to maxEffort attempts, begin linear iteration
* through the remaining connections searching for an acceptable connection.
* @param corePoolSize the size of the core pool.
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
* @return the configured {@link ConnectionPoolPolicy}.
*/
public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C>
p2c(int corePoolSize, boolean forceCorePool) {
return p2c(DEFAULT_MAX_EFFORT, corePoolSize, forceCorePool);
}

/**
* A {@link ConnectionPoolPolicy} that attempts to discern between the health of individual connections.
* If individual connections have health data the P2C policy can be used to bias traffic toward the best
* connections. This has the following algorithm:
* - Randomly select two connections from the 'core pool' (pick-two).
* - Try to select the 'best' of the two connections.
* - If we fail to select the best connection, try the other connection.
* - If both connections fail, repeat the pick-two operation for up to maxEffort attempts, begin linear iteration
* through the remaining connections searching for an acceptable connection.
* @param maxEffort the maximum number of attempts to pick a healthy connection from the core pool.
* @param corePoolSize the size of the core pool.
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
* @return the configured {@link ConnectionPoolPolicy}.
*/
public static <C extends LoadBalancedConnection>
ConnectionPoolPolicy<C> p2c(int maxEffort, int corePoolSize, boolean forceCorePool) {
return P2CConnectionSelector.factory(maxEffort, corePoolSize, forceCorePool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,136 +15,22 @@
*/
package io.servicetalk.loadbalancer;

import static io.servicetalk.utils.internal.NumberUtils.ensurePositive;
import io.servicetalk.client.api.LoadBalancedConnection;

/**
* Configuration of the policy for selecting connections from a pool to the same endpoint.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
*/
public abstract class ConnectionPoolPolicy {
public abstract class ConnectionPoolPolicy<C extends LoadBalancedConnection> {

static final int DEFAULT_MAX_EFFORT = 5;
static final int DEFAULT_LINEAR_SEARCH_SPACE = 16;

private ConnectionPoolPolicy() {
// only instances are in this class.
}

/**
* A connection selection policy that prioritizes a configurable "core" pool.
* <p>
* This {@link ConnectionPoolPolicy} attempts to emulate the pooling behavior often seen in thread pools.
* Specifically it allows for the configuration of a "core pool" size which are intended to be long-lived.
* Iteration starts in the core pool at a random position and then iterates through the entire core pool before
* moving to an overflow pool. Because iteration of this core pool starts at a random position the core connections
* will get an even traffic load and, because they are equally selectable, will tend not to be removed due to
* idleness.
* <p>
* If the core pool cannot satisfy the load traffic can spill over to extra connections which are selected in-order.
* This has the property of minimizing traffic to the latest elements added outside the core pool size, thus let
* them idle out of the pool once they're no longer necessary.
*
* @param corePoolSize the size of the core pool.
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @return the configured {@link ConnectionPoolPolicy}.
*/
public static ConnectionPoolPolicy corePool(final int corePoolSize, final boolean forceCorePool) {
return new CorePoolPolicy(corePoolSize, forceCorePool);
}

/**
* A connection selection policy that prioritizes connection reuse.
* <p>
* This {@link ConnectionPoolPolicy} attempts to minimize the number of connections by attempting to direct
* traffic to connections in the order they were created in linear order up until a configured quantity. After
* this linear pool is exhausted the remaining connections will be selected from at random. Prioritizing traffic
* to the existing connections will let tailing connections be removed due to idleness.
* @return the configured {@link ConnectionPoolPolicy}.
*/
public static ConnectionPoolPolicy linearSearch() {
return linearSearch(DEFAULT_LINEAR_SEARCH_SPACE);
ConnectionPoolPolicy() {
// package private constructor to control proliferation
}

/**
* A connection selection policy that prioritizes connection reuse.
* <p>
* This {@link ConnectionPoolPolicy} attempts to minimize the number of connections by attempting to direct
* traffic to connections in the order they were created in linear order up until a configured quantity. After
* this linear pool is exhausted the remaining connections will be selected from at random. Prioritizing traffic
* to the existing connections will let tailing connections be removed due to idleness.
* @param linearSearchSpace the space to search linearly before resorting to random selection for remaining
* connections.
* @return the configured {@link ConnectionPoolPolicy}.
* Provide an instance of the {@link ConnectionSelector} to use with a {@link Host}.
* @param lbDescription description of the resource, used for logging purposes.
* @return an instance of the {@link ConnectionSelector} to use with a {@link Host}.
*/
public static ConnectionPoolPolicy linearSearch(int linearSearchSpace) {
return new LinearSearchPolicy(linearSearchSpace);
}

/**
* A {@link ConnectionPoolPolicy} that attempts to discern between the health of individual connections.
* If individual connections have health data the P2C policy can be used to bias traffic toward the best
* connections. This has the following algorithm:
* - Randomly select two connections from the 'core pool' (pick-two).
* - Try to select the 'best' of the two connections.
* - If we fail to select the best connection, try the other connection.
* - If both connections fail, repeat the pick-two operation for up to maxEffort attempts, begin linear iteration
* through the remaining connections searching for an acceptable connection.
* @param corePoolSize the size of the core pool.
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @return the configured {@link ConnectionPoolPolicy}.
*/
public static ConnectionPoolPolicy p2c(int corePoolSize, boolean forceCorePool) {
return p2c(DEFAULT_MAX_EFFORT, corePoolSize, forceCorePool);
}

/**
* A {@link ConnectionPoolPolicy} that attempts to discern between the health of individual connections.
* If individual connections have health data the P2C policy can be used to bias traffic toward the best
* connections. This has the following algorithm:
* - Randomly select two connections from the 'core pool' (pick-two).
* - Try to select the 'best' of the two connections.
* - If we fail to select the best connection, try the other connection.
* - If both connections fail, repeat the pick-two operation for up to maxEffort attempts, begin linear iteration
* through the remaining connections searching for an acceptable connection.
* @param maxEffort the maximum number of attempts to pick a healthy connection from the core pool.
* @param corePoolSize the size of the core pool.
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @return the configured {@link ConnectionPoolPolicy}.
*/
public static ConnectionPoolPolicy p2c(int maxEffort, int corePoolSize, boolean forceCorePool) {
return new P2CPolicy(maxEffort, corePoolSize, forceCorePool);
}

// instance types
static final class CorePoolPolicy extends ConnectionPoolPolicy {
final int corePoolSize;
final boolean forceCorePool;

CorePoolPolicy(final int corePoolSize, final boolean forceCorePool) {
this.corePoolSize = ensurePositive(corePoolSize, "corePoolSize");
this.forceCorePool = forceCorePool;
}
}

static final class P2CPolicy extends ConnectionPoolPolicy {
final int maxEffort;
final int corePoolSize;
final boolean forceCorePool;

P2CPolicy(final int maxEffort, final int corePoolSize, final boolean forceCorePool) {
this.maxEffort = ensurePositive(maxEffort, "maxEffort");
this.corePoolSize = ensurePositive(corePoolSize, "corePoolSize");
this.forceCorePool = forceCorePool;
}
}

static final class LinearSearchPolicy extends ConnectionPoolPolicy {
final int linearSearchSpace;

LinearSearchPolicy(int linearSearchSpace) {
this.linearSearchSpace = ensurePositive(linearSearchSpace, "linearSearchSpace");
}
}
abstract ConnectionSelector<C> buildConnectionSelector(String lbDescription);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,4 @@ interface ConnectionSelector<C extends LoadBalancedConnection> {
*/
@Nullable
C select(List<C> connections, Predicate<C> selector);

/**
* The factory of {@link ConnectionSelector} instances.
* @param <C> the least specific connection type necessary for properly implementing the selector.
* @see ConnectionSelector for available strategies.
*/

interface ConnectionSelectorFactory<C extends LoadBalancedConnection> {

/**
* Provide an instance of the {@link ConnectionSelector} to use with a {@link Host}.
* @param lbDescription description of the resource, used for logging purposes.
* @return an instance of the {@link ConnectionSelector} to use with a {@link Host}.
*/
ConnectionSelector<C> buildConnectionSelector(String lbDescription);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ public C select(List<C> connections, Predicate<C> selector) {
return null;
}

static <C extends LoadBalancedConnection> ConnectionSelectorFactory<C> factory(
static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> factory(
int corePoolSize, boolean forceCorePool) {
return new CorePoolConnectionSelectorFactory<>(corePoolSize, forceCorePool);
}

private static final class CorePoolConnectionSelectorFactory<C extends LoadBalancedConnection>
implements ConnectionSelectorFactory<C> {
extends ConnectionPoolPolicy<C> {

private final int corePoolSize;
private final boolean forceCorePool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
* @param priorityStrategyFactory a builder of the {@link HostPriorityStrategy} to use with the load balancer.
* @param loadBalancingPolicy a factory of the initial host selector to use with this load balancer.
* @param subsetter a subset builder.
* @param connectionSelectorFactory factory of the connection pool strategy to use with this load balancer.
* @param connectionPoolPolicy factory of the connection pool strategy to use with this load balancer.
* @param connectionFactory a function which creates new connections.
* @param loadBalancerObserverFactory factory used to build a {@link LoadBalancerObserver} to use with this
* load balancer.
Expand All @@ -144,7 +144,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
final Function<String, HostPriorityStrategy> priorityStrategyFactory,
final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy,
final Subsetter subsetter,
final ConnectionSelector.ConnectionSelectorFactory<C> connectionSelectorFactory,
final ConnectionPoolPolicy<C> connectionPoolPolicy,
final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
final LoadBalancerObserverFactory loadBalancerObserverFactory,
@Nullable final HealthCheckConfig healthCheckConfig,
Expand All @@ -155,8 +155,8 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
.buildSelector(Collections.emptyList(), lbDescription);
this.priorityStrategy = requireNonNull(
priorityStrategyFactory, "priorityStrategyFactory").apply(lbDescription);
this.connectionSelector = requireNonNull(connectionSelectorFactory,
"connectionSelectorFactory").buildConnectionSelector(lbDescription);
this.connectionSelector = requireNonNull(connectionPoolPolicy,
"connectionPoolPolicy").buildConnectionSelector(lbDescription);
this.eventPublisher = requireNonNull(eventPublisher);
this.eventStream = fromSource(eventStreamProcessor)
.replay(1); // Allow for multiple subscribers and provide new subscribers with last signal.
Expand Down
Loading

0 comments on commit 869c12a

Please sign in to comment.