Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3E6151869F for ; Fri, 28 Aug 2015 18:33:20 +0000 (UTC) Received: (qmail 1396 invoked by uid 500); 28 Aug 2015 18:33:20 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 1313 invoked by uid 500); 28 Aug 2015 18:33:20 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 733 invoked by uid 99); 28 Aug 2015 18:33:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Aug 2015 18:33:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8586FE7EB3; Fri, 28 Aug 2015 18:33:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.apache.org Date: Fri, 28 Aug 2015 18:33:33 -0000 Message-Id: <994deb0d827c45ffb07d83d37f36c0c7@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/21] aurora git commit: Remove unused classes from commons fork. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java b/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java deleted file mode 100644 index 5e5df6d..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift; - -import com.google.common.base.Preconditions; -import org.apache.aurora.common.net.pool.Connection; -import org.apache.aurora.common.net.pool.ConnectionPool; -import org.apache.thrift.transport.TTransport; - -import java.net.InetSocketAddress; - -/** - * A {@link ConnectionPool} compatible thrift connection that can work with any valid thrift - * transport. - * - * @author John Sirois - */ -public class TTransportConnection implements Connection { - - private final TTransport transport; - private final InetSocketAddress endpoint; - - public TTransportConnection(TTransport transport, InetSocketAddress endpoint) { - this.transport = Preconditions.checkNotNull(transport); - this.endpoint = Preconditions.checkNotNull(endpoint); - } - - /** - * Returns {@code true} if the underlying transport is still open. 
To invalidate a transport it - * should be closed. - * - *

TODO(John Sirois): it seems like an improper separation of concerns to have validity testing here and not also an - * invalidation method - correct or accept - */ - @Override - public boolean isValid() { - return transport.isOpen(); - } - - @Override - public TTransport get() { - return transport; - } - - @Override - public void close() { - transport.close(); - } - - @Override - public InetSocketAddress getEndpoint() { - return endpoint; - } - - @Override - public String toString() { - return endpoint.toString(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/Thrift.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/Thrift.java b/commons/src/main/java/org/apache/aurora/common/thrift/Thrift.java deleted file mode 100644 index b36b46e..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/Thrift.java +++ /dev/null @@ -1,390 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.common.thrift; - -import java.io.IOException; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.aurora.common.thrift.callers.DebugCaller; -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; - -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.aurora.common.net.loadbalancing.RequestTracker; -import org.apache.aurora.common.net.pool.Connection; -import org.apache.aurora.common.net.pool.ObjectPool; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.stats.StatsProvider; -import org.apache.aurora.common.thrift.callers.Caller; -import org.apache.aurora.common.thrift.callers.DeadlineCaller; -import org.apache.aurora.common.thrift.callers.RetryingCaller; -import org.apache.aurora.common.thrift.callers.StatTrackingCaller; -import org.apache.aurora.common.thrift.callers.ThriftCaller; - -/** - * A generic thrift client that handles reconnection in the case of protocol errors, automatic - * retries, call deadlines and call statistics tracking. This class aims for behavior compatible - * with the generic ruby thrift client. - * - *

In order to enforce call deadlines for synchronous clients, this class uses an - * {@link java.util.concurrent.ExecutorService}. If a custom executor is supplied, it should throw - * a subclass of {@link RejectedExecutionException} to signal thread resource exhaustion, in which - * case the client will fail fast and propagate the event as a {@link TResourceExhaustedException}. - * - * TODO(William Farner): Before open sourcing, look into changing the current model of wrapped proxies - * to use a single proxy and wrapped functions for decorators. - * - * @author John Sirois - */ -public class Thrift { - - /** - * The default thrift call configuration used if none is specified. - * - * Specifies the following settings: - *

    - *
  • global call timeout: 1 second - *
  • call retries: 0 - *
  • retryable exceptions: TTransportException (network exceptions including socket timeouts) - *
  • wait for connections: true - *
  • debug: false - *
- */ - public static final Config DEFAULT_CONFIG = Config.builder() - .withRequestTimeout(Amount.of(1L, Time.SECONDS)) - .noRetries() - .retryOn(TTransportException.class) // if maxRetries is set non-zero - .create(); - - /** - * The default thrift call configuration used for an async client if none is specified. - * - * Specifies the following settings: - *
    - *
  • global call timeout: none - *
  • call retries: 0 - *
  • retryable exceptions: IOException, TTransportException - * (network exceptions but not timeouts) - *
  • wait for connections: true - *
  • debug: false - *
- */ - @SuppressWarnings("unchecked") - public static final Config DEFAULT_ASYNC_CONFIG = Config.builder(DEFAULT_CONFIG) - .withRequestTimeout(Amount.of(0L, Time.SECONDS)) - .noRetries() - .retryOn(ImmutableSet.>builder() - .add(IOException.class) - .add(TTransportException.class).build()) // if maxRetries is set non-zero - .create(); - - private final Config defaultConfig; - private final ExecutorService executorService; - private final ObjectPool> connectionPool; - private final RequestTracker requestTracker; - private final String serviceName; - private final Class serviceInterface; - private final Function clientFactory; - private final boolean async; - private final boolean withSsl; - - /** - * Constructs an instance with the {@link #DEFAULT_CONFIG}, cached thread pool - * {@link ExecutorService}, and synchronous calls. - * - * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, - * boolean, boolean) - */ - public Thrift(ObjectPool> connectionPool, - RequestTracker requestTracker, - String serviceName, Class serviceInterface, Function clientFactory) { - - this(DEFAULT_CONFIG, connectionPool, requestTracker, serviceName, serviceInterface, - clientFactory, false, false); - } - - /** - * Constructs an instance with the {@link #DEFAULT_CONFIG} and cached thread pool - * {@link ExecutorService}. - * - * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, - * boolean, boolean) - */ - public Thrift(ObjectPool> connectionPool, - RequestTracker requestTracker, - String serviceName, Class serviceInterface, Function clientFactory, - boolean async) { - - this(getConfig(async), connectionPool, requestTracker, serviceName, - serviceInterface, clientFactory, async, false); - } - - /** - * Constructs an instance with the {@link #DEFAULT_CONFIG} and cached thread pool - * {@link ExecutorService}. 
- * - * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, - * boolean, boolean) - */ - public Thrift(ObjectPool> connectionPool, - RequestTracker requestTracker, - String serviceName, Class serviceInterface, Function clientFactory, - boolean async, boolean ssl) { - - this(getConfig(async), connectionPool, requestTracker, serviceName, - serviceInterface, clientFactory, async, ssl); - } - - /** - * Constructs an instance with a cached thread pool {@link ExecutorService}. - * - * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, - * boolean, boolean) - */ - public Thrift(Config config, ObjectPool> connectionPool, - RequestTracker requestTracker, - String serviceName, Class serviceInterface, Function clientFactory, - boolean async, boolean ssl) { - - this(config, - Executors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Thrift["+ serviceName +"][%d]") - .build()), - connectionPool, requestTracker, serviceName, serviceInterface, clientFactory, async, ssl); - } - - /** - * Constructs an instance with the {@link #DEFAULT_CONFIG}. - * - * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, - * boolean, boolean) - */ - public Thrift(ExecutorService executorService, - ObjectPool> connectionPool, - RequestTracker requestTracker, - String serviceName, Class serviceInterface, Function clientFactory, - boolean async, boolean ssl) { - - this(getConfig(async), executorService, connectionPool, requestTracker, serviceName, - serviceInterface, clientFactory, async, ssl); - } - - private static Config getConfig(boolean async) { - return async ? DEFAULT_ASYNC_CONFIG : DEFAULT_CONFIG; - } - - /** - * Constructs a new Thrift factory for creating clients that make calls to a particular thrift - * service. - * - *

Note that the combination of {@code config} and {@code connectionPool} needs to be chosen - * with care depending on usage of the generated thrift clients. In particular, if configured - * to not wait for connections, the {@code connectionPool} ought to be warmed up with a set of - * connections or else be actively building connections in the background. - * - *

TODO(John Sirois): consider adding a method to ObjectPool that would allow Thrift to handle - * this case by pro-actively warming the pool. - * - * @param config the default configuration to use for all thrift calls; also the configuration all - * {@link ClientBuilder}s start with - * @param executorService for invoking calls with a specified deadline - * @param connectionPool the source for thrift connections - * @param serviceName a /vars friendly name identifying the service clients will connect to - * @param serviceInterface the thrift compiler generated interface class for the remote service - * (Iface) - * @param clientFactory a function that can generate a concrete thrift client for the given - * {@code serviceInterface} - * @param async enable asynchronous API - * @param ssl enable TLS handshaking for Thrift calls - */ - public Thrift(Config config, ExecutorService executorService, - ObjectPool> connectionPool, - RequestTracker requestTracker, String serviceName, - Class serviceInterface, Function clientFactory, boolean async, boolean ssl) { - - defaultConfig = Preconditions.checkNotNull(config); - this.executorService = Preconditions.checkNotNull(executorService); - this.connectionPool = Preconditions.checkNotNull(connectionPool); - this.requestTracker = Preconditions.checkNotNull(requestTracker); - this.serviceName = MorePreconditions.checkNotBlank(serviceName); - this.serviceInterface = checkServiceInterface(serviceInterface); - this.clientFactory = Preconditions.checkNotNull(clientFactory); - this.async = async; - this.withSsl = ssl; - } - - static Class checkServiceInterface(Class serviceInterface) { - Preconditions.checkNotNull(serviceInterface); - Preconditions.checkArgument(serviceInterface.isInterface(), - "%s must be a thrift service interface", serviceInterface); - return serviceInterface; - } - - /** - * Closes any open connections and prepares this thrift client for graceful shutdown. 
Any thrift - * client proxies returned from {@link #create()} will become invalid. - */ - public void close() { - connectionPool.close(); - executorService.shutdown(); - } - - /** - * A builder class that allows modifications of call behavior to be made for a given Thrift - * client. Note that in the case of conflicting configuration calls, the last call wins. So, - * for example, the following sequence would result in all calls being subject to a 5 second - * global deadline: - * - * builder.blocking().withDeadline(5, TimeUnit.SECONDS).create() - * - * - * @see Config - */ - public final class ClientBuilder extends Config.AbstractBuilder { - private ClientBuilder(Config template) { - super(template); - } - - @Override - protected ClientBuilder getThis() { - return this; - } - - /** - * Creates a new client using the built up configuration changes. - */ - public T create() { - return createClient(getConfig()); - } - } - - /** - * Creates a new thrift client builder that inherits this Thrift instance's default configuration. - * This is useful for customizing a client for a particular thrift call that makes sense to treat - * differently from the rest of the calls to a given service. - */ - public ClientBuilder builder() { - return builder(defaultConfig); - } - - /** - * Creates a new thrift client builder that inherits the given configuration. - * This is useful for customizing a client for a particular thrift call that makes sense to treat - * differently from the rest of the calls to a given service. - */ - public ClientBuilder builder(Config config) { - Preconditions.checkNotNull(config); - return new ClientBuilder(config); - } - - /** - * Creates a new client using the default configuration specified for this Thrift instance. 
- */ - public T create() { - return createClient(defaultConfig); - } - - private T createClient(Config config) { - StatsProvider statsProvider = config.getStatsProvider(); - - // lease/call/[invalidate]/release - boolean debug = config.isDebug(); - - Caller decorated = new ThriftCaller(connectionPool, requestTracker, clientFactory, - config.getConnectTimeout(), debug); - - // [retry] - if (config.getMaxRetries() > 0) { - decorated = new RetryingCaller(decorated, async, statsProvider, serviceName, - config.getMaxRetries(), config.getRetryableExceptions(), debug); - } - - // [deadline] - if (config.getRequestTimeout().getValue() > 0) { - Preconditions.checkArgument(!async, - "Request deadlines may not be used with an asynchronous client."); - - decorated = new DeadlineCaller(decorated, async, executorService, config.getRequestTimeout()); - } - - // [debug] - if (debug) { - decorated = new DebugCaller(decorated, async); - } - - // stats - if (config.enableStats()) { - decorated = new StatTrackingCaller(decorated, async, statsProvider, serviceName); - } - - final Caller caller = decorated; - - final InvocationHandler invocationHandler = new InvocationHandler() { - @Override - public Object invoke(Object o, Method method, Object[] args) throws Throwable { - AsyncMethodCallback callback = null; - if (args != null && async) { - List argsList = Lists.newArrayList(args); - callback = extractCallback(argsList); - args = argsList.toArray(); - } - - return caller.call(method, args, callback, null); - } - }; - - @SuppressWarnings("unchecked") - T instance = (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), - new Class[] {serviceInterface}, invocationHandler); - return instance; - } - - /** - * Verifies that the final argument in a list of objects is a fully-formed - * {@link AsyncMethodCallback} and extracts it, removing it from the argument list. - * - * @param args Argument list to remove the callback from. - * @return The callback extracted from {@code args}. 
- */ - private static AsyncMethodCallback extractCallback(List args) { - // TODO(William Farner): Check all interface methods when building the Thrift client - // and verify that last arguments are all callbacks...this saves us from checking - // each time. - - // Check that the last argument is a callback. - Preconditions.checkArgument(args.size() > 0); - Object lastArg = args.get(args.size() - 1); - Preconditions.checkArgument(lastArg instanceof AsyncMethodCallback, - "Last argument of an async thrift call is expected to be of type AsyncMethodCallback."); - - return (AsyncMethodCallback) args.remove(args.size() - 1); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/ThriftConnectionFactory.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftConnectionFactory.java b/commons/src/main/java/org/apache/aurora/common/thrift/ThriftConnectionFactory.java deleted file mode 100644 index 8c302d3..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftConnectionFactory.java +++ /dev/null @@ -1,366 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.common.thrift; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.base.Closures; -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.aurora.common.net.pool.Connection; -import org.apache.aurora.common.net.pool.ConnectionFactory; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TNonblockingSocket; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; - -/** - * A connection factory for thrift transport connections to a given host. This connection factory - * is lazy and will only create a configured maximum number of active connections - where a - * {@link ConnectionFactory#create(Amount) created} connection that has - * not been {@link #destroy destroyed} is considered active. - * - * @author John Sirois - */ -public class ThriftConnectionFactory - implements ConnectionFactory> { - - public enum TransportType { - BLOCKING, FRAMED, NONBLOCKING; - - /** - * Async clients implicitly use a framed transport, requiring the server they connect to to do - * the same. This prevents specifying a nonblocking client without a framed transport, since - * that is not compatible with thrift and would simply cause the client to blow up when making a - * request. 
Instead, you must explicitly say useFramedTransport(true) for any buildAsync(). - */ - public static TransportType get(boolean framedTransport, boolean nonblocking) { - if (nonblocking) { - Preconditions.checkArgument(framedTransport, - "nonblocking client requires a server running framed transport"); - return NONBLOCKING; - } - - return framedTransport ? FRAMED : BLOCKING; - } - } - - private static InetSocketAddress asEndpoint(String host, int port) { - MorePreconditions.checkNotBlank(host); - Preconditions.checkArgument(port > 0); - return InetSocketAddress.createUnresolved(host, port); - } - - private InetSocketAddress endpoint; - private final int maxConnections; - private final TransportType transportType; - private final Amount socketTimeout; - private final Closure> postCreateCallback; - private boolean sslTransport = false; - - private final Set> activeConnections = - Sets.newSetFromMap( - Maps., Boolean>newIdentityHashMap()); - private volatile int lastActiveConnectionsSize = 0; - - private final Lock activeConnectionsWriteLock = new ReentrantLock(true); - - /** - * Creates a thrift connection factory with a plain socket (non-framed transport). - * This is the same as calling {@link #ThriftConnectionFactory(String, int, int, boolean)} with - * {@code framedTransport} set to {@code false}. - * - * @param host Host to connect to. - * @param port Port to connect on. - * @param maxConnections Maximum number of connections for this host:port. - */ - public ThriftConnectionFactory(String host, int port, int maxConnections) { - this(host, port, maxConnections, TransportType.BLOCKING); - } - - /** - * Creates a thrift connection factory. - * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, - * otherwise a raw {@link TSocket} will be used. - * - * @param host Host to connect to. - * @param port Port to connect on. - * @param maxConnections Maximum number of connections for this host:port. 
- * @param framedTransport Whether to use framed or blocking transport. - */ - public ThriftConnectionFactory(String host, int port, int maxConnections, - boolean framedTransport) { - - this(asEndpoint(host, port), maxConnections, TransportType.get(framedTransport, false)); - } - - /** - * Creates a thrift connection factory. - * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, - * otherwise a raw {@link TSocket} will be used. - * - * @param endpoint Endpoint to connect to. - * @param maxConnections Maximum number of connections for this host:port. - * @param framedTransport Whether to use framed or blocking transport. - */ - public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, - boolean framedTransport) { - - this(endpoint, maxConnections, TransportType.get(framedTransport, false)); - } - - /** - * Creates a thrift connection factory. - * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, - * otherwise a raw {@link TSocket} will be used. - * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used, - * otherwise a raw {@link TSocket} will be used. - * Timeouts are ignored when nonblocking transport is used. - * - * @param host Host to connect to. - * @param port Port to connect on. - * @param maxConnections Maximum number of connections for this host:port. - * @param transportType Whether to use normal blocking, framed blocking, or non-blocking - * (implicitly framed) transport. - */ - public ThriftConnectionFactory(String host, int port, int maxConnections, - TransportType transportType) { - this(host, port, maxConnections, transportType, null); - } - - /** - * Creates a thrift connection factory. - * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, - * otherwise a raw {@link TSocket} will be used. 
- * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used, - * otherwise a raw {@link TSocket} will be used. - * Timeouts are ignored when nonblocking transport is used. - * - * @param host Host to connect to. - * @param port Port to connect on. - * @param maxConnections Maximum number of connections for this host:port. - * @param transportType Whether to use normal blocking, framed blocking, or non-blocking - * (implicitly framed) transport. - * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout o - * the blocking client. - */ - public ThriftConnectionFactory(String host, int port, int maxConnections, - TransportType transportType, Amount socketTimeout) { - this(asEndpoint(host, port), maxConnections, transportType, socketTimeout); - } - - public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, - TransportType transportType) { - this(endpoint, maxConnections, transportType, null); - } - - /** - * Creates a thrift connection factory. - * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, - * otherwise a raw {@link TSocket} will be used. - * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used, - * otherwise a raw {@link TSocket} will be used. - * Timeouts are ignored when nonblocking transport is used. - * - * @param endpoint Endpoint to connect to. - * @param maxConnections Maximum number of connections for this host:port. - * @param transportType Whether to use normal blocking, framed blocking, or non-blocking - * (implicitly framed) transport. - * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout o - * the blocking client. 
- */ - public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, - TransportType transportType, Amount socketTimeout) { - this(endpoint, maxConnections, transportType, socketTimeout, - Closures.>noop(), false); - } - - public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, - TransportType transportType, Amount socketTimeout, - Closure> postCreateCallback, - boolean sslTransport) { - Preconditions.checkArgument(maxConnections > 0, "maxConnections must be at least 1"); - if (socketTimeout != null) { - Preconditions.checkArgument(socketTimeout.as(Time.MILLISECONDS) >= 0); - } - - this.endpoint = Preconditions.checkNotNull(endpoint); - this.maxConnections = maxConnections; - this.transportType = transportType; - this.socketTimeout = socketTimeout; - this.postCreateCallback = Preconditions.checkNotNull(postCreateCallback); - this.sslTransport = sslTransport; - } - - @Override - public boolean mightCreate() { - return lastActiveConnectionsSize < maxConnections; - } - - /** - * FIXME: shouldn't this throw TimeoutException instead of returning null - * in the timeout cases as per the ConnectionFactory.create javadoc? 
- */ - @Override - public Connection create(Amount timeout) - throws TTransportException, IOException { - - Preconditions.checkNotNull(timeout); - if (timeout.getValue() == 0) { - return create(); - } - - try { - long timeRemainingNs = timeout.as(Time.NANOSECONDS); - long start = System.nanoTime(); - if(activeConnectionsWriteLock.tryLock(timeRemainingNs, TimeUnit.NANOSECONDS)) { - try { - if (!willCreateSafe()) { - return null; - } - - timeRemainingNs -= (System.nanoTime() - start); - - return createConnection((int) TimeUnit.NANOSECONDS.toMillis(timeRemainingNs)); - } finally { - activeConnectionsWriteLock.unlock(); - } - } else { - return null; - } - } catch (InterruptedException e) { - return null; - } - } - - private Connection create() - throws TTransportException, IOException { - activeConnectionsWriteLock.lock(); - try { - if (!willCreateSafe()) { - return null; - } - - return createConnection(0); - } finally { - activeConnectionsWriteLock.unlock(); - } - } - - private Connection createConnection(int timeoutMillis) - throws TTransportException, IOException { - TTransport transport = createTransport(timeoutMillis); - if (transport == null) { - return null; - } - - Connection connection = - new TTransportConnection(transport, endpoint); - postCreateCallback.execute(connection); - activeConnections.add(connection); - lastActiveConnectionsSize = activeConnections.size(); - return connection; - } - - private boolean willCreateSafe() { - return activeConnections.size() < maxConnections; - } - - @VisibleForTesting - TTransport createTransport(int timeoutMillis) throws TTransportException, IOException { - TSocket socket = null; - if (transportType != TransportType.NONBLOCKING) { - // can't do a nonblocking create on a blocking transport - if (timeoutMillis <= 0) { - return null; - } - - if (sslTransport) { - SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault(); - SSLSocket ssl_socket = (SSLSocket) factory.createSocket(endpoint.getHostName(), 
endpoint.getPort()); - ssl_socket.setSoTimeout(timeoutMillis); - return new TSocket(ssl_socket); - } else { - socket = new TSocket(endpoint.getHostName(), endpoint.getPort(), timeoutMillis); - } - } - - try { - switch (transportType) { - case BLOCKING: - socket.open(); - setSocketTimeout(socket); - return socket; - case FRAMED: - TFramedTransport transport = new TFramedTransport(socket); - transport.open(); - setSocketTimeout(socket); - return transport; - case NONBLOCKING: - try { - return new TNonblockingSocket(endpoint.getHostName(), endpoint.getPort()); - } catch (IOException e) { - throw new IOException("Failed to create non-blocking transport to " + endpoint, e); - } - } - } catch (TTransportException e) { - throw new TTransportException("Failed to create transport to " + endpoint, e); - } - - throw new IllegalArgumentException("unknown transport type " + transportType); - } - - private void setSocketTimeout(TSocket socket) { - if (socketTimeout != null) { - socket.setTimeout(socketTimeout.as(Time.MILLISECONDS).intValue()); - } - } - - @Override - public void destroy(Connection connection) { - activeConnectionsWriteLock.lock(); - try { - boolean wasActiveConnection = activeConnections.remove(connection); - Preconditions.checkArgument(wasActiveConnection, - "connection %s not created by this factory", connection); - lastActiveConnectionsSize = activeConnections.size(); - } finally { - activeConnectionsWriteLock.unlock(); - } - - // We close the connection outside the critical section which means we may have more connections - // "active" (open) than maxConnections for a very short time - connection.close(); - } - - @Override - public String toString() { - return String.format("%s[%s]", getClass().getSimpleName(), endpoint); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/ThriftException.java ---------------------------------------------------------------------- diff --git 
a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftException.java b/commons/src/main/java/org/apache/aurora/common/thrift/ThriftException.java deleted file mode 100644 index 27e9f5e..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftException.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift; - -/** - * Exception class to wrap exceptions caught during thrift calls. - */ -public class ThriftException extends Exception { - public ThriftException(String message) { - super(message); - } - public ThriftException(String message, Throwable t) { - super(message, t); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/ThriftFactory.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftFactory.java b/commons/src/main/java/org/apache/aurora/common/thrift/ThriftFactory.java deleted file mode 100644 index 75c58e2..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftFactory.java +++ /dev/null @@ -1,653 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.thrift.async.TAsyncClient; -import org.apache.thrift.async.TAsyncClientManager; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TNonblockingTransport; -import org.apache.thrift.transport.TTransport; - -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.base.Closures; -import org.apache.aurora.common.base.MorePreconditions; 
-import org.apache.aurora.common.net.loadbalancing.LeastConnectedStrategy; -import org.apache.aurora.common.net.loadbalancing.LoadBalancer; -import org.apache.aurora.common.net.loadbalancing.LoadBalancerImpl; -import org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy; -import org.apache.aurora.common.net.loadbalancing.MarkDeadStrategyWithHostCheck; -import org.apache.aurora.common.net.loadbalancing.TrafficMonitorAdapter; -import org.apache.aurora.common.net.monitoring.TrafficMonitor; -import org.apache.aurora.common.net.pool.Connection; -import org.apache.aurora.common.net.pool.ConnectionPool; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.net.pool.DynamicPool; -import org.apache.aurora.common.net.pool.MetaPool; -import org.apache.aurora.common.net.pool.ObjectPool; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.stats.Stats; -import org.apache.aurora.common.stats.StatsProvider; -import org.apache.aurora.common.thrift.ThriftConnectionFactory.TransportType; -import org.apache.aurora.common.util.BackoffDecider; -import org.apache.aurora.common.util.BackoffStrategy; -import org.apache.aurora.common.util.TruncatedBinaryBackoff; -import org.apache.aurora.common.util.concurrent.ForwardingExecutorService; - -/** - * A utility that provides convenience methods to build common {@link Thrift}s. - * - * The thrift factory allows you to specify parameters that define how the client connects to - * and communicates with servers, such as the transport type, connection settings, and load - * balancing. Request-level settings like sync/async and retries should be set on the - * {@link Thrift} instance that this factory will create. - * - * The factory will attempt to provide reasonable defaults to allow the caller to minimize the - * amount of necessary configuration. Currently, the default behavior includes: - * - *
    - *
  • A test lease/release for each host will be performed every second - * {@link #withDeadConnectionRestoreInterval(Amount)} - *
  • At most 50 connections will be established to each host - * {@link #withMaxConnectionsPerEndpoint(int)} - *
  • Unframed transport {@link #useFramedTransport(boolean)} - *
  • A load balancing strategy that will mark hosts dead and prefer least-connected hosts. - * Hosts are marked dead if the most recent connection attempt was a failure or else based on - * the windowed error rate of attempted RPCs. If the error rate for a connected host exceeds - * 20% over the last second, the host will be disabled for 2 seconds ascending up to 10 seconds - * if the elevated error rate persists. - * {@link #withLoadBalancingStrategy(LoadBalancingStrategy)} - *
  • Statistics are reported through {@link Stats} - * {@link #withStatsProvider(StatsProvider)} - *
  • A service name matching the thrift interface name {@link #withServiceName(String)} - *
- * - * @author John Sirois - */ -public class ThriftFactory { - private static final Amount DEFAULT_DEAD_TARGET_RESTORE_INTERVAL = - Amount.of(1L, Time.SECONDS); - - private static final int DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT = 50; - - private Class serviceInterface; - private Function clientFactory; - private int maxConnectionsPerEndpoint; - private Amount connectionRestoreInterval; - private boolean framedTransport; - private LoadBalancingStrategy loadBalancingStrategy = null; - private final TrafficMonitor monitor; - private Amount socketTimeout = null; - private Closure> postCreateCallback = Closures.noop(); - private StatsProvider statsProvider = Stats.STATS_PROVIDER; - private Optional endpointName = Optional.absent(); - private String serviceName; - private boolean sslTransport; - - public static ThriftFactory create(Class serviceInterface) { - return new ThriftFactory(serviceInterface); - } - - /** - * Creates a default factory that will use unframed blocking transport. - * - * @param serviceInterface The interface of the thrift service to make a client for. - */ - private ThriftFactory(Class serviceInterface) { - this.serviceInterface = Thrift.checkServiceInterface(serviceInterface); - this.maxConnectionsPerEndpoint = DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT; - this.connectionRestoreInterval = DEFAULT_DEAD_TARGET_RESTORE_INTERVAL; - this.framedTransport = false; - this.monitor = new TrafficMonitor(serviceInterface.getName()); - this.serviceName = serviceInterface.getEnclosingClass().getSimpleName(); - this.sslTransport = false; - } - - private void checkBaseState() { - Preconditions.checkArgument(maxConnectionsPerEndpoint > 0, - "Must allow at least 1 connection per endpoint; %s specified", maxConnectionsPerEndpoint); - } - - public TrafficMonitor getMonitor() { - return monitor; - } - - /** - * Creates the thrift client, and initializes connection pools. - * - * @param backends Backends to connect to. - * @return A new thrift client. 
- */ - public Thrift build(Set backends) { - checkBaseState(); - MorePreconditions.checkNotBlank(backends); - - ManagedThreadPool managedThreadPool = createManagedThreadpool(backends.size()); - LoadBalancer loadBalancer = createLoadBalancer(); - Function clientFactory = getClientFactory(); - - ObjectPool> connectionPool = - createConnectionPool(backends, loadBalancer, managedThreadPool, false); - - return new Thrift(managedThreadPool, connectionPool, loadBalancer, serviceName, - serviceInterface, clientFactory, false, sslTransport); - } - - /** - * Creates a synchronous thrift client that will communicate with a dynamic host set. - * - * @param hostSet The host set to use as a backend. - * @return A thrift client. - * @throws ThriftFactoryException If an error occurred while creating the client. - */ - public Thrift build(DynamicHostSet hostSet) throws ThriftFactoryException { - checkBaseState(); - Preconditions.checkNotNull(hostSet); - - ManagedThreadPool managedThreadPool = createManagedThreadpool(1); - LoadBalancer loadBalancer = createLoadBalancer(); - Function clientFactory = getClientFactory(); - - ObjectPool> connectionPool = - createConnectionPool(hostSet, loadBalancer, managedThreadPool, false, endpointName); - - return new Thrift(managedThreadPool, connectionPool, loadBalancer, serviceName, - serviceInterface, clientFactory, false, sslTransport); - } - - private ManagedThreadPool createManagedThreadpool(int initialEndpointCount) { - return new ManagedThreadPool(serviceName, initialEndpointCount, maxConnectionsPerEndpoint); - } - - /** - * A finite thread pool that monitors backend choice events to dynamically resize. This - * {@link java.util.concurrent.ExecutorService} implementation immediately rejects requests when - * there are no more available worked threads (requests are not queued). 
- */ - private static class ManagedThreadPool extends ForwardingExecutorService - implements Closure> { - - private static final Logger LOG = Logger.getLogger(ManagedThreadPool.class.getName()); - - private static ThreadPoolExecutor createThreadPool(String serviceName, int initialSize) { - ThreadFactory threadFactory = - new ThreadFactoryBuilder() - .setNameFormat("Thrift[" +serviceName + "][%d]") - .setDaemon(true) - .build(); - return new ThreadPoolExecutor(initialSize, initialSize, 0, TimeUnit.MILLISECONDS, - new SynchronousQueue(), threadFactory); - } - - private final int maxConnectionsPerEndpoint; - - public ManagedThreadPool(String serviceName, int initialEndpointCount, - int maxConnectionsPerEndpoint) { - - super(createThreadPool(serviceName, initialEndpointCount * maxConnectionsPerEndpoint)); - this.maxConnectionsPerEndpoint = maxConnectionsPerEndpoint; - setRejectedExecutionHandler(initialEndpointCount); - } - - private void setRejectedExecutionHandler(int endpointCount) { - final String message = - String.format("All %d x %d connections in use", endpointCount, maxConnectionsPerEndpoint); - delegate.setRejectedExecutionHandler(new RejectedExecutionHandler() { - @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { - throw new RejectedExecutionException(message); - } - }); - } - - @Override - public void execute(Collection chosenBackends) { - int previousPoolSize = delegate.getMaximumPoolSize(); - /* - * In the case of no available backends, we need to make sure we pass in a positive pool - * size to our delegate. In particular, java.util.concurrent.ThreadPoolExecutor does not - * accept zero as a valid core or max pool size. 
- */ - int backendCount = Math.max(chosenBackends.size(), 1); - int newPoolSize = backendCount * maxConnectionsPerEndpoint; - - if (previousPoolSize != newPoolSize) { - LOG.info(String.format("Re-sizing deadline thread pool from: %d to: %d", - previousPoolSize, newPoolSize)); - if (previousPoolSize < newPoolSize) { // Don't cross the beams! - delegate.setMaximumPoolSize(newPoolSize); - delegate.setCorePoolSize(newPoolSize); - } else { - delegate.setCorePoolSize(newPoolSize); - delegate.setMaximumPoolSize(newPoolSize); - } - setRejectedExecutionHandler(backendCount); - } - } - } - - /** - * Creates an asynchronous thrift client that will communicate with a fixed set of backends. - * - * @param backends Backends to connect to. - * @return A thrift client. - * @throws ThriftFactoryException If an error occurred while creating the client. - */ - public Thrift buildAsync(Set backends) throws ThriftFactoryException { - checkBaseState(); - MorePreconditions.checkNotBlank(backends); - - LoadBalancer loadBalancer = createLoadBalancer(); - Closure> noop = Closures.noop(); - Function asyncClientFactory = getAsyncClientFactory(); - - ObjectPool> connectionPool = - createConnectionPool(backends, loadBalancer, noop, true); - - return new Thrift(connectionPool, loadBalancer, - serviceName, serviceInterface, asyncClientFactory, true); - } - - /** - * Creates an asynchronous thrift client that will communicate with a dynamic host set. - * - * @param hostSet The host set to use as a backend. - * @return A thrift client. - * @throws ThriftFactoryException If an error occurred while creating the client. 
- */ - public Thrift buildAsync(DynamicHostSet hostSet) - throws ThriftFactoryException { - checkBaseState(); - Preconditions.checkNotNull(hostSet); - - LoadBalancer loadBalancer = createLoadBalancer(); - Closure> noop = Closures.noop(); - Function asyncClientFactory = getAsyncClientFactory(); - - ObjectPool> connectionPool = - createConnectionPool(hostSet, loadBalancer, noop, true, endpointName); - - return new Thrift(connectionPool, loadBalancer, - serviceName, serviceInterface, asyncClientFactory, true); - } - - /** - * Prepare the client factory, which will create client class instances from transports. - * - * @return The client factory to use. - */ - private Function getClientFactory() { - return clientFactory == null ? createClientFactory(serviceInterface) : clientFactory; - } - - /** - * Prepare the async client factory, which will create client class instances from transports. - * - * @return The client factory to use. - * @throws ThriftFactoryException If there was a problem creating the factory. - */ - private Function getAsyncClientFactory() throws ThriftFactoryException { - try { - return clientFactory == null ? 
createAsyncClientFactory(serviceInterface) : clientFactory; - } catch (IOException e) { - throw new ThriftFactoryException("Failed to create async client factory.", e); - } - } - - private ObjectPool> createConnectionPool( - Set backends, LoadBalancer loadBalancer, - Closure> onBackendsChosen, boolean nonblocking) { - - ImmutableMap.Builder>> - backendBuilder = ImmutableMap.builder(); - for (InetSocketAddress backend : backends) { - backendBuilder.put(backend, createConnectionPool(backend, nonblocking)); - } - - return new MetaPool(backendBuilder.build(), - loadBalancer, onBackendsChosen, connectionRestoreInterval); - } - - private ObjectPool> createConnectionPool( - DynamicHostSet hostSet, LoadBalancer loadBalancer, - Closure> onBackendsChosen, - final boolean nonblocking, Optional serviceEndpointName) - throws ThriftFactoryException { - - Function>> - endpointPoolFactory = - new Function>>() { - @Override public ObjectPool> apply( - InetSocketAddress endpoint) { - return createConnectionPool(endpoint, nonblocking); - } - }; - - try { - return new DynamicPool(hostSet, - endpointPoolFactory, loadBalancer, onBackendsChosen, connectionRestoreInterval, - Util.getAddress(serviceEndpointName), Util.IS_ALIVE); - } catch (DynamicHostSet.MonitorException e) { - throw new ThriftFactoryException("Failed to monitor host set.", e); - } - } - - private ObjectPool> createConnectionPool( - InetSocketAddress backend, boolean nonblocking) { - - ThriftConnectionFactory connectionFactory = new ThriftConnectionFactory( - backend, maxConnectionsPerEndpoint, TransportType.get(framedTransport, nonblocking), - socketTimeout, postCreateCallback, sslTransport); - - return new ConnectionPool>(connectionFactory, - statsProvider); - } - - @VisibleForTesting - public ThriftFactory withClientFactory(Function clientFactory) { - this.clientFactory = Preconditions.checkNotNull(clientFactory); - - return this; - } - - public ThriftFactory withSslEnabled() { - this.sslTransport = true; - return this; 
- } - - /** - * Specifies the maximum number of connections that should be made to any single endpoint. - * - * @param maxConnectionsPerEndpoint Maximum number of connections per endpoint. - * @return A reference to the factory. - */ - public ThriftFactory withMaxConnectionsPerEndpoint(int maxConnectionsPerEndpoint) { - Preconditions.checkArgument(maxConnectionsPerEndpoint > 0); - this.maxConnectionsPerEndpoint = maxConnectionsPerEndpoint; - - return this; - } - - /** - * Specifies the interval at which dead endpoint connections should be checked and revived. - * - * @param connectionRestoreInterval the time interval to check. - * @return A reference to the factory. - */ - public ThriftFactory withDeadConnectionRestoreInterval( - Amount connectionRestoreInterval) { - Preconditions.checkNotNull(connectionRestoreInterval); - Preconditions.checkArgument(connectionRestoreInterval.getValue() >= 0, - "A negative interval is invalid: %s", connectionRestoreInterval); - this.connectionRestoreInterval = connectionRestoreInterval; - - return this; - } - - /** - * Instructs the factory whether framed transport should be used. - * - * @param framedTransport Whether to use framed transport. - * @return A reference to the factory. - */ - public ThriftFactory useFramedTransport(boolean framedTransport) { - this.framedTransport = framedTransport; - - return this; - } - - /** - * Specifies the load balancer to use when interacting with multiple backends. - * - * @param strategy Load balancing strategy. - * @return A reference to the factory. 
- */ - public ThriftFactory withLoadBalancingStrategy( - LoadBalancingStrategy strategy) { - this.loadBalancingStrategy = Preconditions.checkNotNull(strategy); - - return this; - } - - private LoadBalancer createLoadBalancer() { - if (loadBalancingStrategy == null) { - loadBalancingStrategy = createDefaultLoadBalancingStrategy(); - } - - return LoadBalancerImpl.create(TrafficMonitorAdapter.create(loadBalancingStrategy, monitor)); - } - - private LoadBalancingStrategy createDefaultLoadBalancingStrategy() { - Function backoffFactory = - new Function() { - @Override public BackoffDecider apply(InetSocketAddress socket) { - BackoffStrategy backoffStrategy = new TruncatedBinaryBackoff( - Amount.of(2L, Time.SECONDS), Amount.of(10L, Time.SECONDS)); - - return BackoffDecider.builder(socket.toString()) - .withTolerateFailureRate(0.2) - .withRequestWindow(Amount.of(1L, Time.SECONDS)) - .withSeedSize(5) - .withStrategy(backoffStrategy) - .withRecoveryType(BackoffDecider.RecoveryType.FULL_CAPACITY) - .withStatsProvider(statsProvider) - .build(); - } - }; - - return new MarkDeadStrategyWithHostCheck( - new LeastConnectedStrategy(), backoffFactory); - } - - /** - * Specifies the net read/write timeout to set via SO_TIMEOUT on the thrift blocking client - * or AsyncClient.setTimeout on the thrift async client. Defaults to the connectTimeout on - * the blocking client if not set. - * - * @param socketTimeout timeout on thrift i/o operations - * @return A reference to the factory. - */ - public ThriftFactory withSocketTimeout(Amount socketTimeout) { - this.socketTimeout = Preconditions.checkNotNull(socketTimeout); - Preconditions.checkArgument(socketTimeout.as(Time.MILLISECONDS) >= 0); - - return this; - } - - /** - * Specifies the callback to notify when a connection has been created. The callback may - * be used to make thrift calls to the connection, but must not invalidate it. - * Defaults to a no-op closure. 
- * - * @param postCreateCallback function to setup new connections - * @return A reference to the factory. - */ - public ThriftFactory withPostCreateCallback( - Closure> postCreateCallback) { - this.postCreateCallback = Preconditions.checkNotNull(postCreateCallback); - - return this; - } - - /** - * Registers a custom stats provider to use to track various client stats. - * - * @param statsProvider the {@code StatsProvider} to use - * @return A reference to the factory. - */ - public ThriftFactory withStatsProvider(StatsProvider statsProvider) { - this.statsProvider = Preconditions.checkNotNull(statsProvider); - - return this; - } - - /** - * Name to be passed to Thrift constructor, used in stats. - * - * @param serviceName string to use - * @return A reference to the factory. - */ - public ThriftFactory withServiceName(String serviceName) { - this.serviceName = MorePreconditions.checkNotBlank(serviceName); - - return this; - } - - /** - * Set the end-point to use from {@link ServiceInstance#getAdditionalEndpoints()}. - * If not set, the default behavior is to use {@link ServiceInstance#getServiceEndpoint()}. 
- * - * @param endpointName the (optional) name of the end-point, if unset - the - * default/primary end-point is selected - * @return a reference to the factory for chaining - */ - public ThriftFactory withEndpointName(String endpointName) { - this.endpointName = Optional.of(endpointName); - return this; - } - - private static Function createClientFactory(Class serviceInterface) { - final Constructor implementationConstructor = - findImplementationConstructor(serviceInterface); - - return new Function() { - @Override public T apply(TTransport transport) { - try { - return implementationConstructor.newInstance(new TBinaryProtocol(transport)); - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private Function createAsyncClientFactory( - final Class serviceInterface) throws IOException { - - final TAsyncClientManager clientManager = new TAsyncClientManager(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override public void run() { - clientManager.stop(); - } - }); - - final Constructor implementationConstructor = - findAsyncImplementationConstructor(serviceInterface); - - return new Function() { - @Override public T apply(TTransport transport) { - Preconditions.checkNotNull(transport); - Preconditions.checkArgument(transport instanceof TNonblockingTransport, - "Invalid transport provided to client factory: " + transport.getClass()); - - try { - T client = implementationConstructor.newInstance(new TBinaryProtocol.Factory(), - clientManager, transport); - - if (socketTimeout != null) { - ((TAsyncClient) client).setTimeout(socketTimeout.as(Time.MILLISECONDS)); - } - - return client; - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - 
throw new RuntimeException(e); - } - } - }; - } - - private static Constructor findImplementationConstructor( - final Class serviceInterface) { - Class implementationClass = findImplementationClass(serviceInterface); - try { - return implementationClass.getConstructor(TProtocol.class); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("Failed to find a single argument TProtocol constructor " - + "in service client class: " + implementationClass); - } - } - - private static Constructor findAsyncImplementationConstructor( - final Class serviceInterface) { - Class implementationClass = findImplementationClass(serviceInterface); - try { - return implementationClass.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, - TNonblockingTransport.class); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("Failed to find expected constructor " - + "in service client class: " + implementationClass); - } - } - - @SuppressWarnings("unchecked") - private static Class findImplementationClass(final Class serviceInterface) { - try { - return (Class) - Iterables.find(ImmutableList.copyOf(serviceInterface.getEnclosingClass().getClasses()), - new Predicate>() { - @Override public boolean apply(Class inner) { - return !serviceInterface.equals(inner) - && serviceInterface.isAssignableFrom(inner); - } - }); - } catch (NoSuchElementException e) { - throw new IllegalArgumentException("Could not find a sibling enclosed implementation of " - + "service interface: " + serviceInterface); - } - } - - public static class ThriftFactoryException extends Exception { - public ThriftFactoryException(String msg) { - super(msg); - } - - public ThriftFactoryException(String msg, Throwable t) { - super(msg, t); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/Caller.java ---------------------------------------------------------------------- diff --git 
a/commons/src/main/java/org/apache/aurora/common/thrift/callers/Caller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/Caller.java deleted file mode 100644 index 0200c49..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/Caller.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift.callers; - -import com.google.common.base.Preconditions; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.thrift.async.AsyncMethodCallback; - -import javax.annotation.Nullable; -import java.lang.reflect.Method; - -/** -* A caller that invokes a method on an object. -* -* @author William Farner -*/ -public interface Caller { - - /** - * Invokes a method on an object, using the given arguments. The method call may be - * asynchronous, in which case {@code callback} will be non-null. - * - * @param method The method being invoked. - * @param args The arguments to call {@code method} with. - * @param callback The callback to use if the method is asynchronous. - * @param connectTimeoutOverride Optional override for the default connection timeout. - * @return The return value from invoking the method. - * @throws Throwable Exception, as prescribed by the method's contract. 
- */ - public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback, - @Nullable Amount connectTimeoutOverride) throws Throwable; - - /** - * Captures the result of a request, whether synchronous or asynchronous. It should be expected - * that for every request made, exactly one of these methods will be called. - */ - static interface ResultCapture { - /** - * Called when the request completed successfully. - */ - void success(); - - /** - * Called when the request failed. - * - * @param t Throwable that was caught. Must never be null. - * @return {@code true} if a wrapped callback should be notified of the failure, - * {@code false} otherwise. - */ - boolean fail(Throwable t); - } - - /** - * A callback that adapts a {@link ResultCapture} with an {@link AsyncMethodCallback} while - * maintaining the AsyncMethodCallback interface. The wrapped callback will handle invocation - * of the underlying callback based on the return values from the ResultCapture. - */ - static class WrappedMethodCallback implements AsyncMethodCallback { - private final AsyncMethodCallback wrapped; - private final ResultCapture capture; - - private boolean callbackTriggered = false; - - public WrappedMethodCallback(AsyncMethodCallback wrapped, ResultCapture capture) { - this.wrapped = wrapped; - this.capture = capture; - } - - private void callbackTriggered() { - Preconditions.checkState(!callbackTriggered, "Each callback may only be triggered once."); - callbackTriggered = true; - } - - @Override @SuppressWarnings("unchecked") public void onComplete(Object o) { - capture.success(); - wrapped.onComplete(o); - callbackTriggered(); - } - - @Override public void onError(Exception t) { - if (capture.fail(t)) { - wrapped.onError(t); - callbackTriggered(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/CallerDecorator.java 
---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/CallerDecorator.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/CallerDecorator.java deleted file mode 100644 index bd0a952..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/CallerDecorator.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift.callers; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.thrift.async.AsyncMethodCallback; - -import javax.annotation.Nullable; -import java.lang.reflect.Method; - -/** -* A caller that decorates another caller. -* -* @author William Farner -*/ -abstract class CallerDecorator implements Caller { - private final Caller decoratedCaller; - private final boolean async; - - CallerDecorator(Caller decoratedCaller, boolean async) { - this.decoratedCaller = decoratedCaller; - this.async = async; - } - - /** - * Convenience method for invoking the method and shunting the capture into the callback if - * the call is asynchronous. - * - * @param method The method being invoked. - * @param args The arguments to call {@code method} with. - * @param callback The callback to use if the method is asynchronous. - * @param capture The result capture to notify of the call result. 
- * @param connectTimeoutOverride Optional override for the default connection timeout. - * @return The return value from invoking the method. - * @throws Throwable Exception, as prescribed by the method's contract. - */ - protected final Object invoke(Method method, Object[] args, - @Nullable AsyncMethodCallback callback, @Nullable final ResultCapture capture, - @Nullable Amount connectTimeoutOverride) throws Throwable { - - // Swap the wrapped callback out for ours. - if (callback != null) { - callback = new WrappedMethodCallback(callback, capture); - } - - try { - Object result = decoratedCaller.call(method, args, callback, connectTimeoutOverride); - if (callback == null && capture != null) capture.success(); - - return result; - } catch (Exception t) { - // We allow this one to go to both sync and async captures. - if (callback != null) { - callback.onError(t); - return null; - } else { - if (capture != null) capture.fail(t); - throw t; - } - } - } - - boolean isAsync() { - return async; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/DeadlineCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/DeadlineCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/DeadlineCaller.java deleted file mode 100644 index 75ed1ec..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/DeadlineCaller.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift.callers; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeoutException; - -import javax.annotation.Nullable; - -import com.google.common.base.Throwables; - -import org.apache.aurora.common.thrift.TResourceExhaustedException; -import org.apache.thrift.async.AsyncMethodCallback; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.thrift.TTimeoutException; - -/** - * A caller that imposes a time deadline on the underlying caller. If the underlying calls fail - * to meet the deadline {@link TTimeoutException} is thrown. If the executor service rejects - * execution of a task, {@link TResourceExhaustedException} is thrown. - * - * @author William Farner - */ -public class DeadlineCaller extends CallerDecorator { - private final ExecutorService executorService; - private final Amount timeout; - - /** - * Creates a new deadline caller. - * - * @param decoratedCaller The caller to decorate with a deadline. - * @param async Whether the caller is asynchronous. - * @param executorService The executor service to use for performing calls. - * @param timeout The timeout by which the underlying call should complete in. 
- */ - public DeadlineCaller(Caller decoratedCaller, boolean async, ExecutorService executorService, - Amount timeout) { - super(decoratedCaller, async); - - this.executorService = executorService; - this.timeout = timeout; - } - - @Override - public Object call(final Method method, final Object[] args, - @Nullable final AsyncMethodCallback callback, - @Nullable final Amount connectTimeoutOverride) throws Throwable { - try { - Future result = executorService.submit(new Callable() { - @Override public Object call() throws Exception { - try { - return invoke(method, args, callback, null, connectTimeoutOverride); - } catch (Throwable t) { - Throwables.propagateIfInstanceOf(t, Exception.class); - throw new RuntimeException(t); - } - } - }); - - try { - return result.get(timeout.getValue(), timeout.getUnit().getTimeUnit()); - } catch (TimeoutException e) { - result.cancel(true); - throw new TTimeoutException(e); - } catch (ExecutionException e) { - throw e.getCause(); - } - } catch (RejectedExecutionException e) { - throw new TResourceExhaustedException(e); - } catch (InvocationTargetException e) { - throw e.getCause(); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/DebugCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/DebugCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/DebugCaller.java deleted file mode 100644 index aff4006..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/DebugCaller.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift.callers; - -import com.google.common.base.Joiner; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.thrift.async.AsyncMethodCallback; - -import javax.annotation.Nullable; - -import java.lang.reflect.Method; -import java.util.logging.Logger; - -/** - * A caller that reports debugging information about calls. - * - * @author William Farner - */ -public class DebugCaller extends CallerDecorator { - private static final Logger LOG = Logger.getLogger(DebugCaller.class.getName()); - private static final Joiner ARG_JOINER = Joiner.on(", "); - - /** - * Creates a new debug caller. - * - * @param decoratedCaller The caller to decorate with debug information. - * @param async Whether the caller is asynchronous. - */ - public DebugCaller(Caller decoratedCaller, boolean async) { - super(decoratedCaller, async); - } - - @Override - public Object call(final Method method, final Object[] args, - @Nullable AsyncMethodCallback callback, @Nullable Amount connectTimeoutOverride) - throws Throwable { - ResultCapture capture = new ResultCapture() { - @Override public void success() { - // No-op. 
- } - - @Override public boolean fail(Throwable t) { - StringBuilder message = new StringBuilder("Thrift call failed: "); - message.append(method.getName()).append("("); - ARG_JOINER.appendTo(message, args); - message.append(")"); - LOG.warning(message.toString()); - - return true; - } - }; - - try { - return invoke(method, args, callback, capture, connectTimeoutOverride); - } catch (Throwable t) { - capture.fail(t); - throw t; - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java deleted file mode 100644 index 84a0f4e..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.common.thrift.callers; - -import java.lang.reflect.Method; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; - -import javax.annotation.Nullable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Predicate; -import com.google.common.base.Throwables; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -import org.apache.aurora.common.thrift.TResourceExhaustedException; -import org.apache.thrift.async.AsyncMethodCallback; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.stats.StatsProvider; - -/** -* A caller that will retry calls to the wrapped caller. -* -* @author William Farner -*/ -public class RetryingCaller extends CallerDecorator { - private static final Logger LOG = Logger.getLogger(RetryingCaller.class.getName()); - - @VisibleForTesting - public static final Amount NONBLOCKING_TIMEOUT = Amount.of(-1L, Time.MILLISECONDS); - - private final StatsProvider statsProvider; - private final String serviceName; - private final int retries; - private final ImmutableSet> retryableExceptions; - private final boolean debug; - - /** - * Creates a new retrying caller. The retrying caller will attempt to call invoked methods on the - * underlying caller at most {@code retries} times. A retry will be performed only when one of - * the {@code retryableExceptions} is caught. - * - * @param decoratedCall The caller to decorate with retries. - * @param async Whether the caller is asynchronous. 
- * @param statsProvider The stat provider to export retry statistics through. - * @param serviceName The service name that calls are being invoked on. - * @param retries The maximum number of retries to perform. - * @param retryableExceptions The exceptions that can be retried. - * @param debug Whether to include debugging information when retries are being performed. - */ - public RetryingCaller(Caller decoratedCall, boolean async, StatsProvider statsProvider, - String serviceName, int retries, ImmutableSet> retryableExceptions, - boolean debug) { - super(decoratedCall, async); - this.statsProvider = statsProvider; - this.serviceName = serviceName; - this.retries = retries; - this.retryableExceptions = retryableExceptions; - this.debug = debug; - } - - private final LoadingCache stats = - CacheBuilder.newBuilder().build(new CacheLoader() { - @Override public AtomicLong load(Method method) { - // Thrift does not support overloads - so just the name disambiguates all calls. - return statsProvider.makeCounter(serviceName + "_" + method.getName() + "_retries"); - } - }); - - @Override public Object call(final Method method, final Object[] args, - @Nullable final AsyncMethodCallback callback, - @Nullable final Amount connectTimeoutOverride) throws Throwable { - final AtomicLong retryCounter = stats.get(method); - final AtomicInteger attempts = new AtomicInteger(); - final List exceptions = Lists.newArrayList(); - - final ResultCapture capture = new ResultCapture() { - @Override public void success() { - // No-op. 
- } - - @Override public boolean fail(Throwable t) { - if (!isRetryable(t)) { - if (debug) { - LOG.warning(String.format( - "Call failed with un-retryable exception of [%s]: %s, previous exceptions: %s", - t.getClass().getName(), t.getMessage(), combineStackTraces(exceptions))); - } - - return true; - } else if (attempts.get() >= retries) { - exceptions.add(t); - - if (debug) { - LOG.warning(String.format("Retried %d times, last error: %s, exceptions: %s", - attempts.get(), t, combineStackTraces(exceptions))); - } - - return true; - } else { - exceptions.add(t); - - if (isAsync() && attempts.incrementAndGet() <= retries) { - try { - retryCounter.incrementAndGet(); - // override connect timeout in ThriftCaller to prevent blocking for a connection - // for async retries (since this is within the callback in the selector thread) - invoke(method, args, callback, this, NONBLOCKING_TIMEOUT); - } catch (Throwable throwable) { - return fail(throwable); - } - } - - return false; - } - } - }; - - boolean continueLoop; - do { - try { - // If this is an async call, the looping will be handled within the capture. - return invoke(method, args, callback, capture, connectTimeoutOverride); - } catch (Throwable t) { - if (!isRetryable(t)) { - Throwable propagated = t; - - if (!exceptions.isEmpty() && (t instanceof TResourceExhaustedException)) { - // If we've been trucking along through retries that have had remote call failures - // and we suddenly can't immediately get a connection on the next retry, throw the - // previous remote call failure - the idea here is that the remote call failure is - // more interesting than a transient inability to get an immediate connection. 
- propagated = exceptions.remove(exceptions.size() - 1); - } - - if (isAsync()) { - callback.onError((Exception) propagated); - } else { - throw propagated; - } - } - } - - continueLoop = !isAsync() && attempts.incrementAndGet() <= retries; - if (continueLoop) retryCounter.incrementAndGet(); - } while (continueLoop); - - Throwable lastRetriedException = Iterables.getLast(exceptions); - if (debug) { - if (!exceptions.isEmpty()) { - LOG.warning( - String.format("Retried %d times, last error: %s, previous exceptions: %s", - attempts.get(), lastRetriedException, combineStackTraces(exceptions))); - } else { - LOG.warning( - String.format("Retried 1 time, last error: %s", lastRetriedException)); - } - } - - if (!isAsync()) throw lastRetriedException; - return null; - } - - private boolean isRetryable(Throwable throwable) { - return isRetryable.getUnchecked(throwable.getClass()); - } - - private final LoadingCache, Boolean> isRetryable = - CacheBuilder.newBuilder().build(new CacheLoader, Boolean>() { - @Override public Boolean load(Class exceptionClass) { - return isRetryable(exceptionClass); - } - }); - - private boolean isRetryable(final Class exceptionClass) { - if (retryableExceptions.contains(exceptionClass)) { - return true; - } - return Iterables.any(retryableExceptions, new Predicate>() { - @Override public boolean apply(Class retryableExceptionClass) { - return retryableExceptionClass.isAssignableFrom(exceptionClass); - } - }); - } - - private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n'); - - private static String combineStackTraces(List exceptions) { - if (exceptions.isEmpty()) { - return "none"; - } else { - return STACK_TRACE_JOINER.join(Iterables.transform(exceptions, - new Function() { - private int index = 1; - @Override public String apply(Throwable exception) { - return String.format("[%d] %s", - index++, Throwables.getStackTraceAsString(exception)); - } - })); - } - } -}