aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [15/21] aurora git commit: Remove unused classes from commons fork.
Date Fri, 28 Aug 2015 18:33:33 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java b/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java
deleted file mode 100644
index 5e5df6d..0000000
--- a/commons/src/main/java/org/apache/aurora/common/thrift/TTransportConnection.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.thrift;
-
-import com.google.common.base.Preconditions;
-import org.apache.aurora.common.net.pool.Connection;
-import org.apache.aurora.common.net.pool.ConnectionPool;
-import org.apache.thrift.transport.TTransport;
-
-import java.net.InetSocketAddress;
-
-/**
- * A {@link ConnectionPool} compatible thrift connection that can work with any valid thrift
- * transport.
- *
- * @author John Sirois
- */
-public class TTransportConnection implements Connection<TTransport, InetSocketAddress> {
-
-  private final TTransport transport;
-  private final InetSocketAddress endpoint;
-
-  public TTransportConnection(TTransport transport, InetSocketAddress endpoint) {
-    this.transport = Preconditions.checkNotNull(transport);
-    this.endpoint = Preconditions.checkNotNull(endpoint);
-  }
-
-  /**
-   * Returns {@code true} if the underlying transport is still open.  To invalidate a transport it
-   * should be closed.
-   *
-   * <p>TODO(John Sirois): it seems like an improper soc to have validity testing here and not also an
-   * invalidation method - correct or accept
-   */
-  @Override
-  public boolean isValid() {
-    return transport.isOpen();
-  }
-
-  @Override
-  public TTransport get() {
-    return transport;
-  }
-
-  @Override
-  public void close() {
-    transport.close();
-  }
-
-  @Override
-  public InetSocketAddress getEndpoint() {
-    return endpoint;
-  }
-
-  @Override
-  public String toString() {
-    return endpoint.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/Thrift.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/Thrift.java b/commons/src/main/java/org/apache/aurora/common/thrift/Thrift.java
deleted file mode 100644
index b36b46e..0000000
--- a/commons/src/main/java/org/apache/aurora/common/thrift/Thrift.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.thrift;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.aurora.common.thrift.callers.DebugCaller;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.net.loadbalancing.RequestTracker;
-import org.apache.aurora.common.net.pool.Connection;
-import org.apache.aurora.common.net.pool.ObjectPool;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.common.thrift.callers.Caller;
-import org.apache.aurora.common.thrift.callers.DeadlineCaller;
-import org.apache.aurora.common.thrift.callers.RetryingCaller;
-import org.apache.aurora.common.thrift.callers.StatTrackingCaller;
-import org.apache.aurora.common.thrift.callers.ThriftCaller;
-
-/**
- * A generic thrift client that handles reconnection in the case of protocol errors, automatic
- * retries, call deadlines and call statistics tracking.  This class aims for behavior compatible
- * with the <a href="http://github.com/fauna/thrift_client">generic ruby thrift client</a>.
- *
- * <p>In order to enforce call deadlines for synchronous clients, this class uses an
- * {@link java.util.concurrent.ExecutorService}.  If a custom executor is supplied, it should throw
- * a subclass of {@link RejectedExecutionException} to signal thread resource exhaustion, in which
- * case the client will fail fast and propagate the event as a {@link TResourceExhaustedException}.
- *
- * TODO(William Farner): Before open sourcing, look into changing the current model of wrapped proxies
- *    to use a single proxy and wrapped functions for decorators.
- *
- * @author John Sirois
- */
-public class Thrift<T> {
-
-  /**
-   * The default thrift call configuration used if none is specified.
-   *
-   * Specifies the following settings:
-   * <ul>
-   * <li>global call timeout: 1 second
-   * <li>call retries: 0
-   * <li>retryable exceptions: TTransportException (network exceptions including socket timeouts)
-   * <li>wait for connections: true
-   * <li>debug: false
-   * </ul>
-   */
-  public static final Config DEFAULT_CONFIG = Config.builder()
-      .withRequestTimeout(Amount.of(1L, Time.SECONDS))
-      .noRetries()
-      .retryOn(TTransportException.class) // if maxRetries is set non-zero
-      .create();
-
-  /**
-   * The default thrift call configuration used for an async client if none is specified.
-   *
-   * Specifies the following settings:
-   * <ul>
-   * <li>global call timeout: none
-   * <li>call retries: 0
-   * <li>retryable exceptions: IOException, TTransportException
-   *    (network exceptions but not timeouts)
-   * <li>wait for connections: true
-   * <li>debug: false
-   * </ul>
-   */
-  @SuppressWarnings("unchecked")
-  public static final Config DEFAULT_ASYNC_CONFIG = Config.builder(DEFAULT_CONFIG)
-      .withRequestTimeout(Amount.of(0L, Time.SECONDS))
-      .noRetries()
-      .retryOn(ImmutableSet.<Class<? extends Exception>>builder()
-          .add(IOException.class)
-          .add(TTransportException.class).build()) // if maxRetries is set non-zero
-      .create();
-
-  private final Config defaultConfig;
-  private final ExecutorService executorService;
-  private final ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool;
-  private final RequestTracker<InetSocketAddress> requestTracker;
-  private final String serviceName;
-  private final Class<T> serviceInterface;
-  private final Function<TTransport, T> clientFactory;
-  private final boolean async;
-  private final boolean withSsl;
-
-  /**
-   * Constructs an instance with the {@link #DEFAULT_CONFIG}, cached thread pool
-   * {@link ExecutorService}, and synchronous calls.
-   *
-   * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function,
-   *     boolean, boolean)
-   */
-  public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
-      RequestTracker<InetSocketAddress> requestTracker,
-      String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory) {
-
-    this(DEFAULT_CONFIG, connectionPool, requestTracker, serviceName, serviceInterface,
-        clientFactory, false, false);
-  }
-
-  /**
-   * Constructs an instance with the {@link #DEFAULT_CONFIG} and cached thread pool
-   * {@link ExecutorService}.
-   *
-   * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function,
-   *    boolean, boolean)
-   */
-  public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
-      RequestTracker<InetSocketAddress> requestTracker,
-      String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory,
-      boolean async) {
-
-    this(getConfig(async), connectionPool, requestTracker, serviceName,
-        serviceInterface, clientFactory, async, false);
-  }
-
-  /**
-   * Constructs an instance with the {@link #DEFAULT_CONFIG} and cached thread pool
-   * {@link ExecutorService}.
-   *
-   * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function,
-   *    boolean, boolean)
-   */
-  public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
-      RequestTracker<InetSocketAddress> requestTracker,
-      String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory,
-      boolean async, boolean ssl) {
-
-    this(getConfig(async), connectionPool, requestTracker, serviceName,
-        serviceInterface, clientFactory, async, ssl);
-  }
-
-  /**
-   * Constructs an instance with a cached thread pool {@link ExecutorService}.
-   *
-   * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function,
-   *    boolean, boolean)
-   */
-  public Thrift(Config config, ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
-      RequestTracker<InetSocketAddress> requestTracker,
-      String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory,
-      boolean async, boolean ssl) {
-
-    this(config,
-        Executors.newCachedThreadPool(
-            new ThreadFactoryBuilder()
-                .setDaemon(true)
-                .setNameFormat("Thrift["+ serviceName +"][%d]")
-                .build()),
-        connectionPool, requestTracker, serviceName, serviceInterface, clientFactory, async, ssl);
-  }
-
-  /**
-   * Constructs an instance with the {@link #DEFAULT_CONFIG}.
-   *
-   * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function,
-   *    boolean, boolean)
-   */
-  public Thrift(ExecutorService executorService,
-      ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
-      RequestTracker<InetSocketAddress> requestTracker,
-      String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory,
-      boolean async, boolean ssl) {
-
-    this(getConfig(async), executorService, connectionPool, requestTracker, serviceName,
-        serviceInterface, clientFactory, async, ssl);
-  }
-
-  private static Config getConfig(boolean async) {
-    return async ? DEFAULT_ASYNC_CONFIG : DEFAULT_CONFIG;
-  }
-
-  /**
-   * Constructs a new Thrift factory for creating clients that make calls to a particular thrift
-   * service.
-   *
-   * <p>Note that the combination of {@code config} and {@code connectionPool} need to be chosen
-   * with care depending on usage of the generated thrift clients.  In particular, if configured
-   * to not wait for connections, the {@code connectionPool} ought to be warmed up with a set of
-   * connections or else be actively building connections in the background.
-   *
-   * <p>TODO(John Sirois): consider adding an method to ObjectPool that would allow Thrift to handle
-   * this case by pro-actively warming the pool.
-   *
-   * @param config the default configuration to use for all thrift calls; also the configuration all
-   *     {@link ClientBuilder}s start with
-   * @param executorService for invoking calls with a specified deadline
-   * @param connectionPool the source for thrift connections
-   * @param serviceName a /vars friendly name identifying the service clients will connect to
-   * @param serviceInterface the thrift compiler generate interface class for the remote service
-   *     (Iface)
-   * @param clientFactory a function that can generate a concrete thrift client for the given
-   *     {@code serviceInterface}
-   * @param async enable asynchronous API
-   * @param ssl enable TLS handshaking for Thrift calls
-   */
-  public Thrift(Config config, ExecutorService executorService,
-      ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
-      RequestTracker<InetSocketAddress> requestTracker, String serviceName,
-      Class<T> serviceInterface, Function<TTransport, T> clientFactory, boolean async, boolean ssl) {
-
-    defaultConfig = Preconditions.checkNotNull(config);
-    this.executorService = Preconditions.checkNotNull(executorService);
-    this.connectionPool = Preconditions.checkNotNull(connectionPool);
-    this.requestTracker = Preconditions.checkNotNull(requestTracker);
-    this.serviceName = MorePreconditions.checkNotBlank(serviceName);
-    this.serviceInterface = checkServiceInterface(serviceInterface);
-    this.clientFactory = Preconditions.checkNotNull(clientFactory);
-    this.async = async;
-    this.withSsl = ssl;
-  }
-
-  static <I> Class<I> checkServiceInterface(Class<I> serviceInterface) {
-    Preconditions.checkNotNull(serviceInterface);
-    Preconditions.checkArgument(serviceInterface.isInterface(),
-        "%s must be a thrift service interface", serviceInterface);
-    return serviceInterface;
-  }
-
-  /**
-   * Closes any open connections and prepares this thrift client for graceful shutdown.  Any thrift
-   * client proxies returned from {@link #create()} will become invalid.
-   */
-  public void close() {
-    connectionPool.close();
-    executorService.shutdown();
-  }
-
-  /**
-   * A builder class that allows modifications of call behavior to be made for a given Thrift
-   * client.  Note that in the case of conflicting configuration calls, the last call wins.  So,
-   * for example, the following sequence would result in all calls being subject to a 5 second
-   * global deadline:
-   * <code>
-   *   builder.blocking().withDeadline(5, TimeUnit.SECONDS).create()
-   * </code>
-   *
-   * @see Config
-   */
-  public final class ClientBuilder extends Config.AbstractBuilder<ClientBuilder> {
-    private ClientBuilder(Config template) {
-      super(template);
-    }
-
-    @Override
-    protected ClientBuilder getThis() {
-      return this;
-    }
-
-    /**
-     * Creates a new client using the built up configuration changes.
-     */
-    public T create() {
-      return createClient(getConfig());
-    }
-  }
-
-  /**
-   * Creates a new thrift client builder that inherits this Thrift instance's default configuration.
-   * This is useful for customizing a client for a particular thrift call that makes sense to treat
-   * differently from the rest of the calls to a given service.
-   */
-  public ClientBuilder builder() {
-    return builder(defaultConfig);
-  }
-
-  /**
-   * Creates a new thrift client builder that inherits the given configuration.
-   * This is useful for customizing a client for a particular thrift call that makes sense to treat
-   * differently from the rest of the calls to a given service.
-   */
-  public ClientBuilder builder(Config config) {
-    Preconditions.checkNotNull(config);
-    return new ClientBuilder(config);
-  }
-
-  /**
-   * Creates a new client using the default configuration specified for this Thrift instance.
-   */
-  public T create() {
-    return createClient(defaultConfig);
-  }
-
-  private T createClient(Config config) {
-    StatsProvider statsProvider = config.getStatsProvider();
-
-    // lease/call/[invalidate]/release
-    boolean debug = config.isDebug();
-
-    Caller decorated = new ThriftCaller<T>(connectionPool, requestTracker, clientFactory,
-        config.getConnectTimeout(), debug);
-
-    // [retry]
-    if (config.getMaxRetries() > 0) {
-      decorated = new RetryingCaller(decorated, async, statsProvider, serviceName,
-          config.getMaxRetries(), config.getRetryableExceptions(), debug);
-    }
-
-    // [deadline]
-    if (config.getRequestTimeout().getValue() > 0) {
-      Preconditions.checkArgument(!async,
-          "Request deadlines may not be used with an asynchronous client.");
-
-      decorated = new DeadlineCaller(decorated, async, executorService, config.getRequestTimeout());
-    }
-
-    // [debug]
-    if (debug) {
-      decorated = new DebugCaller(decorated, async);
-    }
-
-    // stats
-    if (config.enableStats()) {
-      decorated = new StatTrackingCaller(decorated, async, statsProvider, serviceName);
-    }
-
-    final Caller caller = decorated;
-
-    final InvocationHandler invocationHandler = new InvocationHandler() {
-      @Override
-      public Object invoke(Object o, Method method, Object[] args) throws Throwable {
-        AsyncMethodCallback callback = null;
-        if (args != null && async) {
-          List<Object> argsList = Lists.newArrayList(args);
-          callback = extractCallback(argsList);
-          args = argsList.toArray();
-        }
-
-        return caller.call(method, args, callback, null);
-      }
-    };
-
-    @SuppressWarnings("unchecked")
-    T instance = (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
-        new Class<?>[] {serviceInterface}, invocationHandler);
-    return instance;
-  }
-
-  /**
-   * Verifies that the final argument in a list of objects is a fully-formed
-   * {@link AsyncMethodCallback} and extracts it, removing it from the argument list.
-   *
-   * @param args Argument list to remove the callback from.
-   * @return The callback extracted from {@code args}.
-   */
-  private static AsyncMethodCallback extractCallback(List<Object> args) {
-    // TODO(William Farner): Check all interface methods when building the Thrift client
-    //    and verify that last arguments are all callbacks...this saves us from checking
-    //    each time.
-
-    // Check that the last argument is a callback.
-    Preconditions.checkArgument(args.size() > 0);
-    Object lastArg = args.get(args.size() - 1);
-    Preconditions.checkArgument(lastArg instanceof AsyncMethodCallback,
-        "Last argument of an async thrift call is expected to be of type AsyncMethodCallback.");
-
-    return (AsyncMethodCallback) args.remove(args.size() - 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/ThriftConnectionFactory.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftConnectionFactory.java b/commons/src/main/java/org/apache/aurora/common/thrift/ThriftConnectionFactory.java
deleted file mode 100644
index 8c302d3..0000000
--- a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftConnectionFactory.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.thrift;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.aurora.common.base.Closure;
-import org.apache.aurora.common.base.Closures;
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.net.pool.Connection;
-import org.apache.aurora.common.net.pool.ConnectionFactory;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-
-/**
- * A connection factory for thrift transport connections to a given host.  This connection factory
- * is lazy and will only create a configured maximum number of active connections - where a
- * {@link ConnectionFactory#create(Amount) created} connection that has
- * not been {@link #destroy destroyed} is considered active.
- *
- * @author John Sirois
- */
-public class ThriftConnectionFactory
-    implements ConnectionFactory<Connection<TTransport, InetSocketAddress>> {
-
-  public enum TransportType {
-    BLOCKING, FRAMED, NONBLOCKING;
-
-    /**
-     * Async clients implicitly use a framed transport, requiring the server they connect to to do
-     * the same. This prevents specifying a nonblocking client without a framed transport, since
-     * that is not compatible with thrift and would simply cause the client to blow up when making a
-     * request. Instead, you must explicitly say useFramedTransport(true) for any buildAsync().
-     */
-    public static TransportType get(boolean framedTransport, boolean nonblocking) {
-      if (nonblocking) {
-        Preconditions.checkArgument(framedTransport,
-            "nonblocking client requires a server running framed transport");
-        return NONBLOCKING;
-      }
-
-      return framedTransport ? FRAMED : BLOCKING;
-    }
-  }
-
-  private static InetSocketAddress asEndpoint(String host, int port) {
-    MorePreconditions.checkNotBlank(host);
-    Preconditions.checkArgument(port > 0);
-    return InetSocketAddress.createUnresolved(host, port);
-  }
-
-  private InetSocketAddress endpoint;
-  private final int maxConnections;
-  private final TransportType transportType;
-  private final Amount<Long, Time> socketTimeout;
-  private final Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback;
-  private boolean sslTransport = false;
-
-  private final Set<Connection<TTransport, InetSocketAddress>> activeConnections =
-      Sets.newSetFromMap(
-          Maps.<Connection<TTransport, InetSocketAddress>, Boolean>newIdentityHashMap());
-  private volatile int lastActiveConnectionsSize = 0;
-
-  private final Lock activeConnectionsWriteLock = new ReentrantLock(true);
-
-  /**
-   * Creates a thrift connection factory with a plain socket (non-framed transport).
-   * This is the same as calling {@link #ThriftConnectionFactory(String, int, int, boolean)} with
-   * {@code framedTransport} set to {@code false}.
-   *
-   * @param host Host to connect to.
-   * @param port Port to connect on.
-   * @param maxConnections Maximum number of connections for this host:port.
-   */
-  public ThriftConnectionFactory(String host, int port, int maxConnections) {
-    this(host, port, maxConnections, TransportType.BLOCKING);
-  }
-
-  /**
-   * Creates a thrift connection factory.
-   * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
-   * otherwise a raw {@link TSocket} will be used.
-   *
-   * @param host Host to connect to.
-   * @param port Port to connect on.
-   * @param maxConnections Maximum number of connections for this host:port.
-   * @param framedTransport Whether to use framed or blocking transport.
-   */
-  public ThriftConnectionFactory(String host, int port, int maxConnections,
-      boolean framedTransport) {
-
-    this(asEndpoint(host, port), maxConnections, TransportType.get(framedTransport, false));
-  }
-
-  /**
-   * Creates a thrift connection factory.
-   * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
-   * otherwise a raw {@link TSocket} will be used.
-   *
-   * @param endpoint Endpoint to connect to.
-   * @param maxConnections Maximum number of connections for this host:port.
-   * @param framedTransport Whether to use framed or blocking transport.
-   */
-  public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections,
-      boolean framedTransport) {
-
-    this(endpoint, maxConnections, TransportType.get(framedTransport, false));
-  }
-
-  /**
-   * Creates a thrift connection factory.
-   * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
-   * otherwise a raw {@link TSocket} will be used.
-   * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used,
-   * otherwise a raw {@link TSocket} will be used.
-   * Timeouts are ignored when nonblocking transport is used.
-   *
-   * @param host Host to connect to.
-   * @param port Port to connect on.
-   * @param maxConnections Maximum number of connections for this host:port.
-   * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
-   *    (implicitly framed) transport.
-   */
-  public ThriftConnectionFactory(String host, int port, int maxConnections,
-      TransportType transportType) {
-    this(host, port, maxConnections, transportType, null);
-  }
-
-  /**
-   * Creates a thrift connection factory.
-   * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
-   * otherwise a raw {@link TSocket} will be used.
-   * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used,
-   * otherwise a raw {@link TSocket} will be used.
-   * Timeouts are ignored when nonblocking transport is used.
-   *
-   * @param host Host to connect to.
-   * @param port Port to connect on.
-   * @param maxConnections Maximum number of connections for this host:port.
-   * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
-   *          (implicitly framed) transport.
-   * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout o
-   *          the blocking client.
-   */
-  public ThriftConnectionFactory(String host, int port, int maxConnections,
-      TransportType transportType, Amount<Long, Time> socketTimeout) {
-    this(asEndpoint(host, port), maxConnections, transportType, socketTimeout);
-  }
-
-  public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections,
-      TransportType transportType) {
-    this(endpoint, maxConnections, transportType, null);
-  }
-
-  /**
-   * Creates a thrift connection factory.
-   * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
-   * otherwise a raw {@link TSocket} will be used.
-   * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used,
-   * otherwise a raw {@link TSocket} will be used.
-   * Timeouts are ignored when nonblocking transport is used.
-   *
-   * @param endpoint Endpoint to connect to.
-   * @param maxConnections Maximum number of connections for this host:port.
-   * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
-   *          (implicitly framed) transport.
-   * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout o
-   *          the blocking client.
-   */
-  public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections,
-      TransportType transportType, Amount<Long, Time> socketTimeout) {
-	  this(endpoint, maxConnections, transportType, socketTimeout,
-        Closures.<Connection<TTransport, InetSocketAddress>>noop(), false);
-  }
-
-  public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections,
-      TransportType transportType, Amount<Long, Time> socketTimeout,
-	  Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback,
-	  boolean sslTransport) {
-    Preconditions.checkArgument(maxConnections > 0, "maxConnections must be at least 1");
-    if (socketTimeout != null) {
-      Preconditions.checkArgument(socketTimeout.as(Time.MILLISECONDS) >= 0);
-    }
-
-    this.endpoint = Preconditions.checkNotNull(endpoint);
-    this.maxConnections = maxConnections;
-    this.transportType = transportType;
-    this.socketTimeout = socketTimeout;
-    this.postCreateCallback = Preconditions.checkNotNull(postCreateCallback);
-    this.sslTransport = sslTransport;
-  }
-
-  @Override
-  public boolean mightCreate() {
-    return lastActiveConnectionsSize < maxConnections;
-  }
-
-  /**
-   * FIXME:  shouldn't this throw TimeoutException instead of returning null
-   *         in the timeout cases as per the ConnectionFactory.create javadoc?
-   */
-  @Override
-  public Connection<TTransport, InetSocketAddress> create(Amount<Long, Time> timeout)
-      throws TTransportException, IOException {
-
-    Preconditions.checkNotNull(timeout);
-    if (timeout.getValue() == 0) {
-      return create();
-    }
-
-    try {
-      long timeRemainingNs = timeout.as(Time.NANOSECONDS);
-      long start = System.nanoTime();
-      if(activeConnectionsWriteLock.tryLock(timeRemainingNs, TimeUnit.NANOSECONDS)) {
-        try {
-          if (!willCreateSafe()) {
-            return null;
-          }
-
-          timeRemainingNs -= (System.nanoTime() - start);
-
-          return createConnection((int) TimeUnit.NANOSECONDS.toMillis(timeRemainingNs));
-        } finally {
-          activeConnectionsWriteLock.unlock();
-        }
-      } else {
-        return null;
-      }
-    } catch (InterruptedException e) {
-      return null;
-    }
-  }
-
-  private Connection<TTransport, InetSocketAddress> create()
-      throws TTransportException, IOException {
-    activeConnectionsWriteLock.lock();
-    try {
-      if (!willCreateSafe()) {
-        return null;
-      }
-
-      return createConnection(0);
-    } finally {
-      activeConnectionsWriteLock.unlock();
-    }
-  }
-
-  private Connection<TTransport, InetSocketAddress> createConnection(int timeoutMillis)
-      throws TTransportException, IOException {
-    TTransport transport = createTransport(timeoutMillis);
-    if (transport == null) {
-      return null;
-    }
-
-    Connection<TTransport, InetSocketAddress> connection =
-        new TTransportConnection(transport, endpoint);
-    postCreateCallback.execute(connection);
-    activeConnections.add(connection);
-    lastActiveConnectionsSize = activeConnections.size();
-    return connection;
-  }
-
-  private boolean willCreateSafe() {
-    return activeConnections.size() < maxConnections;
-  }
-
-  @VisibleForTesting
-  TTransport createTransport(int timeoutMillis) throws TTransportException, IOException {
-    TSocket socket = null;
-    if (transportType != TransportType.NONBLOCKING) {
-      // can't do a nonblocking create on a blocking transport
-      if (timeoutMillis <= 0) {
-        return null;
-      }
-
-      if (sslTransport) {
-        SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
-        SSLSocket ssl_socket = (SSLSocket) factory.createSocket(endpoint.getHostName(), endpoint.getPort());
-        ssl_socket.setSoTimeout(timeoutMillis);
-        return new TSocket(ssl_socket);
-      } else {
-        socket = new TSocket(endpoint.getHostName(), endpoint.getPort(), timeoutMillis);
-      }
-    }
-
-    try {
-      switch (transportType) {
-        case BLOCKING:
-          socket.open();
-          setSocketTimeout(socket);
-          return socket;
-        case FRAMED:
-          TFramedTransport transport = new TFramedTransport(socket);
-          transport.open();
-          setSocketTimeout(socket);
-          return transport;
-        case NONBLOCKING:
-          try {
-            return new TNonblockingSocket(endpoint.getHostName(), endpoint.getPort());
-          } catch (IOException e) {
-            throw new IOException("Failed to create non-blocking transport to " + endpoint, e);
-          }
-      }
-    } catch (TTransportException e) {
-      throw new TTransportException("Failed to create transport to " + endpoint, e);
-    }
-
-    throw new IllegalArgumentException("unknown transport type " + transportType);
-  }
-
-  private void setSocketTimeout(TSocket socket) {
-    if (socketTimeout != null) {
-      socket.setTimeout(socketTimeout.as(Time.MILLISECONDS).intValue());
-    }
-  }
-
-  @Override
-  public void destroy(Connection<TTransport, InetSocketAddress> connection) {
-    activeConnectionsWriteLock.lock();
-    try {
-      boolean wasActiveConnection = activeConnections.remove(connection);
-      Preconditions.checkArgument(wasActiveConnection,
-          "connection %s not created by this factory", connection);
-      lastActiveConnectionsSize = activeConnections.size();
-    } finally {
-      activeConnectionsWriteLock.unlock();
-    }
-
-    // We close the connection outside the critical section which means we may have more connections
-    // "active" (open) than maxConnections for a very short time
-    connection.close();
-  }
-
-  @Override
-  public String toString() {
-    return String.format("%s[%s]", getClass().getSimpleName(), endpoint);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/ThriftException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftException.java b/commons/src/main/java/org/apache/aurora/common/thrift/ThriftException.java
deleted file mode 100644
index 27e9f5e..0000000
--- a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.thrift;
-
-/**
- * Exception class to wrap exceptions caught during thrift calls.
- */
-public class ThriftException extends Exception {
-  public ThriftException(String message) {
-    super(message);
-  }
-  public ThriftException(String message, Throwable t) {
-    super(message, t);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/ThriftFactory.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftFactory.java b/commons/src/main/java/org/apache/aurora/common/thrift/ThriftFactory.java
deleted file mode 100644
index 75c58e2..0000000
--- a/commons/src/main/java/org/apache/aurora/common/thrift/ThriftFactory.java
+++ /dev/null
@@ -1,653 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.thrift;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.thrift.async.TAsyncClient;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TNonblockingTransport;
-import org.apache.thrift.transport.TTransport;
-
-import org.apache.aurora.common.base.Closure;
-import org.apache.aurora.common.base.Closures;
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.net.loadbalancing.LeastConnectedStrategy;
-import org.apache.aurora.common.net.loadbalancing.LoadBalancer;
-import org.apache.aurora.common.net.loadbalancing.LoadBalancerImpl;
-import org.apache.aurora.common.net.loadbalancing.LoadBalancingStrategy;
-import org.apache.aurora.common.net.loadbalancing.MarkDeadStrategyWithHostCheck;
-import org.apache.aurora.common.net.loadbalancing.TrafficMonitorAdapter;
-import org.apache.aurora.common.net.monitoring.TrafficMonitor;
-import org.apache.aurora.common.net.pool.Connection;
-import org.apache.aurora.common.net.pool.ConnectionPool;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.net.pool.DynamicPool;
-import org.apache.aurora.common.net.pool.MetaPool;
-import org.apache.aurora.common.net.pool.ObjectPool;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.common.thrift.ThriftConnectionFactory.TransportType;
-import org.apache.aurora.common.util.BackoffDecider;
-import org.apache.aurora.common.util.BackoffStrategy;
-import org.apache.aurora.common.util.TruncatedBinaryBackoff;
-import org.apache.aurora.common.util.concurrent.ForwardingExecutorService;
-
-/**
- * A utility that provides convenience methods to build common {@link Thrift}s.
- *
- * The thrift factory allows you to specify parameters that define how the client connects to
- * and communicates with servers, such as the transport type, connection settings, and load
- * balancing.  Request-level settings like sync/async and retries should be set on the
- * {@link Thrift} instance that this factory will create.
- *
- * The factory will attempt to provide reasonable defaults to allow the caller to minimize the
- * amount of necessary configuration.  Currently, the default behavior includes:
- *
- * <ul>
- *   <li> A test lease/release for each host will be performed every second
- *      {@link #withDeadConnectionRestoreInterval(Amount)}
- *   <li> At most 50 connections will be established to each host
- *      {@link #withMaxConnectionsPerEndpoint(int)}
- *   <li> Unframed transport {@link #useFramedTransport(boolean)}
- *   <li> A load balancing strategy that will mark hosts dead and prefer least-connected hosts.
- *      Hosts are marked dead if the most recent connection attempt was a failure or else based on
- *      the windowed error rate of attempted RPCs.  If the error rate for a connected host exceeds
- *      20% over the last second, the host will be disabled for 2 seconds ascending up to 10 seconds
- *      if the elevated error rate persists.
- *      {@link #withLoadBalancingStrategy(LoadBalancingStrategy)}
- *   <li> Statistics are reported through {@link Stats}
- *      {@link #withStatsProvider(StatsProvider)}
- *   <li> A service name matching the thrift interface name {@link #withServiceName(String)}
- * </ul>
- *
- * @author John Sirois
- */
-public class ThriftFactory<T> {
-  private static final Amount<Long,Time> DEFAULT_DEAD_TARGET_RESTORE_INTERVAL =
-      Amount.of(1L, Time.SECONDS);
-
-  private static final int DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT = 50;
-
-  private Class<T> serviceInterface;
-  private Function<TTransport, T> clientFactory;
-  private int maxConnectionsPerEndpoint;
-  private Amount<Long,Time> connectionRestoreInterval;
-  private boolean framedTransport;
-  private LoadBalancingStrategy<InetSocketAddress> loadBalancingStrategy = null;
-  private final TrafficMonitor<InetSocketAddress> monitor;
-  private Amount<Long,Time> socketTimeout = null;
-  private Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback = Closures.noop();
-  private StatsProvider statsProvider = Stats.STATS_PROVIDER;
-  private Optional<String> endpointName = Optional.absent();
-  private String serviceName;
-  private boolean sslTransport;
-
-  public static <T> ThriftFactory<T> create(Class<T> serviceInterface) {
-    return new ThriftFactory<T>(serviceInterface);
-  }
-
-  /**
-   * Creates a default factory that will use unframed blocking transport.
-   *
-   * @param serviceInterface The interface of the thrift service to make a client for.
-   */
-  private ThriftFactory(Class<T> serviceInterface) {
-    this.serviceInterface = Thrift.checkServiceInterface(serviceInterface);
-    this.maxConnectionsPerEndpoint = DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT;
-    this.connectionRestoreInterval = DEFAULT_DEAD_TARGET_RESTORE_INTERVAL;
-    this.framedTransport = false;
-    this.monitor = new TrafficMonitor<InetSocketAddress>(serviceInterface.getName());
-    this.serviceName = serviceInterface.getEnclosingClass().getSimpleName();
-    this.sslTransport = false;
-  }
-
-  private void checkBaseState() {
-    Preconditions.checkArgument(maxConnectionsPerEndpoint > 0,
-        "Must allow at least 1 connection per endpoint; %s specified", maxConnectionsPerEndpoint);
-  }
-
-  public TrafficMonitor<InetSocketAddress> getMonitor() {
-    return monitor;
-  }
-
-  /**
-   * Creates the thrift client, and initializes connection pools.
-   *
-   * @param backends Backends to connect to.
-   * @return A new thrift client.
-   */
-  public Thrift<T> build(Set<InetSocketAddress> backends) {
-    checkBaseState();
-    MorePreconditions.checkNotBlank(backends);
-
-    ManagedThreadPool managedThreadPool = createManagedThreadpool(backends.size());
-    LoadBalancer<InetSocketAddress> loadBalancer = createLoadBalancer();
-    Function<TTransport, T> clientFactory = getClientFactory();
-
-    ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool =
-        createConnectionPool(backends, loadBalancer, managedThreadPool, false);
-
-    return new Thrift<T>(managedThreadPool, connectionPool, loadBalancer, serviceName,
-        serviceInterface, clientFactory, false, sslTransport);
-  }
-
-  /**
-   * Creates a synchronous thrift client that will communicate with a dynamic host set.
-   *
-   * @param hostSet The host set to use as a backend.
-   * @return A thrift client.
-   * @throws ThriftFactoryException If an error occurred while creating the client.
-   */
-  public Thrift<T> build(DynamicHostSet<ServiceInstance> hostSet) throws ThriftFactoryException {
-    checkBaseState();
-    Preconditions.checkNotNull(hostSet);
-
-    ManagedThreadPool managedThreadPool = createManagedThreadpool(1);
-    LoadBalancer<InetSocketAddress> loadBalancer = createLoadBalancer();
-    Function<TTransport, T> clientFactory = getClientFactory();
-
-    ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool =
-        createConnectionPool(hostSet, loadBalancer, managedThreadPool, false, endpointName);
-
-    return new Thrift<T>(managedThreadPool, connectionPool, loadBalancer, serviceName,
-        serviceInterface, clientFactory, false, sslTransport);
-  }
-
-  private ManagedThreadPool createManagedThreadpool(int initialEndpointCount) {
-    return new ManagedThreadPool(serviceName, initialEndpointCount, maxConnectionsPerEndpoint);
-  }
-
-  /**
-   * A finite thread pool that monitors backend choice events to dynamically resize.  This
-   * {@link java.util.concurrent.ExecutorService} implementation immediately rejects requests when
-   * there are no more available worked threads (requests are not queued).
-   */
-  private static class ManagedThreadPool extends ForwardingExecutorService<ThreadPoolExecutor>
-      implements Closure<Collection<InetSocketAddress>> {
-
-    private static final Logger LOG = Logger.getLogger(ManagedThreadPool.class.getName());
-
-    private static ThreadPoolExecutor createThreadPool(String serviceName, int initialSize) {
-      ThreadFactory threadFactory =
-          new ThreadFactoryBuilder()
-              .setNameFormat("Thrift[" +serviceName + "][%d]")
-              .setDaemon(true)
-              .build();
-      return new ThreadPoolExecutor(initialSize, initialSize, 0, TimeUnit.MILLISECONDS,
-          new SynchronousQueue<Runnable>(), threadFactory);
-    }
-
-    private final int maxConnectionsPerEndpoint;
-
-    public ManagedThreadPool(String serviceName, int initialEndpointCount,
-        int maxConnectionsPerEndpoint) {
-
-      super(createThreadPool(serviceName, initialEndpointCount * maxConnectionsPerEndpoint));
-      this.maxConnectionsPerEndpoint = maxConnectionsPerEndpoint;
-      setRejectedExecutionHandler(initialEndpointCount);
-    }
-
-    private void setRejectedExecutionHandler(int endpointCount) {
-      final String message =
-          String.format("All %d x %d connections in use", endpointCount, maxConnectionsPerEndpoint);
-      delegate.setRejectedExecutionHandler(new RejectedExecutionHandler() {
-        @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
-          throw new RejectedExecutionException(message);
-        }
-      });
-    }
-
-    @Override
-    public void execute(Collection<InetSocketAddress> chosenBackends) {
-      int previousPoolSize = delegate.getMaximumPoolSize();
-      /*
-       * In the case of no available backends, we need to make sure we pass in a positive pool
-       * size to our delegate.  In particular, java.util.concurrent.ThreadPoolExecutor does not
-       * accept zero as a valid core or max pool size.
-       */
-      int backendCount = Math.max(chosenBackends.size(), 1);
-      int newPoolSize = backendCount * maxConnectionsPerEndpoint;
-
-      if (previousPoolSize != newPoolSize) {
-        LOG.info(String.format("Re-sizing deadline thread pool from: %d to: %d",
-            previousPoolSize, newPoolSize));
-        if (previousPoolSize < newPoolSize) { // Don't cross the beams!
-          delegate.setMaximumPoolSize(newPoolSize);
-          delegate.setCorePoolSize(newPoolSize);
-        } else {
-          delegate.setCorePoolSize(newPoolSize);
-          delegate.setMaximumPoolSize(newPoolSize);
-        }
-        setRejectedExecutionHandler(backendCount);
-      }
-    }
-  }
-
-  /**
-   * Creates an asynchronous thrift client that will communicate with a fixed set of backends.
-   *
-   * @param backends Backends to connect to.
-   * @return A thrift client.
-   * @throws ThriftFactoryException If an error occurred while creating the client.
-   */
-  public Thrift<T> buildAsync(Set<InetSocketAddress> backends) throws ThriftFactoryException {
-    checkBaseState();
-    MorePreconditions.checkNotBlank(backends);
-
-    LoadBalancer<InetSocketAddress> loadBalancer = createLoadBalancer();
-    Closure<Collection<InetSocketAddress>> noop = Closures.noop();
-    Function<TTransport, T> asyncClientFactory = getAsyncClientFactory();
-
-    ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool =
-        createConnectionPool(backends, loadBalancer, noop, true);
-
-    return new Thrift<T>(connectionPool, loadBalancer,
-        serviceName, serviceInterface, asyncClientFactory, true);
-  }
-
-  /**
-   * Creates an asynchronous thrift client that will communicate with a dynamic host set.
-   *
-   * @param hostSet The host set to use as a backend.
-   * @return A thrift client.
-   * @throws ThriftFactoryException If an error occurred while creating the client.
-   */
-  public Thrift<T> buildAsync(DynamicHostSet<ServiceInstance> hostSet)
-      throws ThriftFactoryException {
-    checkBaseState();
-    Preconditions.checkNotNull(hostSet);
-
-    LoadBalancer<InetSocketAddress> loadBalancer = createLoadBalancer();
-    Closure<Collection<InetSocketAddress>> noop = Closures.noop();
-    Function<TTransport, T> asyncClientFactory = getAsyncClientFactory();
-
-    ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool =
-        createConnectionPool(hostSet, loadBalancer, noop, true, endpointName);
-
-    return new Thrift<T>(connectionPool, loadBalancer,
-        serviceName, serviceInterface, asyncClientFactory, true);
-  }
-
-  /**
-   * Prepare the client factory, which will create client class instances from transports.
-   *
-   * @return The client factory to use.
-   */
-  private Function<TTransport, T> getClientFactory() {
-    return clientFactory == null ? createClientFactory(serviceInterface) : clientFactory;
-  }
-
-  /**
-   * Prepare the async client factory, which will create client class instances from transports.
-   *
-   * @return The client factory to use.
-   * @throws ThriftFactoryException If there was a problem creating the factory.
-   */
-  private Function<TTransport, T> getAsyncClientFactory() throws ThriftFactoryException {
-    try {
-      return clientFactory == null ? createAsyncClientFactory(serviceInterface) : clientFactory;
-    } catch (IOException e) {
-      throw new ThriftFactoryException("Failed to create async client factory.", e);
-    }
-  }
-
-  private ObjectPool<Connection<TTransport, InetSocketAddress>> createConnectionPool(
-      Set<InetSocketAddress> backends, LoadBalancer<InetSocketAddress> loadBalancer,
-      Closure<Collection<InetSocketAddress>> onBackendsChosen, boolean nonblocking) {
-
-    ImmutableMap.Builder<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>
-        backendBuilder = ImmutableMap.builder();
-    for (InetSocketAddress backend : backends) {
-      backendBuilder.put(backend, createConnectionPool(backend, nonblocking));
-    }
-
-    return new MetaPool<TTransport, InetSocketAddress>(backendBuilder.build(),
-        loadBalancer, onBackendsChosen, connectionRestoreInterval);
-  }
-
-  private ObjectPool<Connection<TTransport, InetSocketAddress>> createConnectionPool(
-      DynamicHostSet<ServiceInstance> hostSet, LoadBalancer<InetSocketAddress> loadBalancer,
-      Closure<Collection<InetSocketAddress>> onBackendsChosen,
-      final boolean nonblocking, Optional<String> serviceEndpointName)
-          throws ThriftFactoryException {
-
-    Function<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>
-        endpointPoolFactory =
-      new Function<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>() {
-        @Override public ObjectPool<Connection<TTransport, InetSocketAddress>> apply(
-            InetSocketAddress endpoint) {
-          return createConnectionPool(endpoint, nonblocking);
-        }
-      };
-
-    try {
-      return new DynamicPool<ServiceInstance, TTransport, InetSocketAddress>(hostSet,
-          endpointPoolFactory, loadBalancer, onBackendsChosen, connectionRestoreInterval,
-          Util.getAddress(serviceEndpointName), Util.IS_ALIVE);
-    } catch (DynamicHostSet.MonitorException e) {
-      throw new ThriftFactoryException("Failed to monitor host set.", e);
-    }
-  }
-
-  private ObjectPool<Connection<TTransport, InetSocketAddress>> createConnectionPool(
-      InetSocketAddress backend, boolean nonblocking) {
-
-    ThriftConnectionFactory connectionFactory = new ThriftConnectionFactory(
-        backend, maxConnectionsPerEndpoint, TransportType.get(framedTransport, nonblocking),
-        socketTimeout, postCreateCallback, sslTransport);
-
-    return new ConnectionPool<Connection<TTransport, InetSocketAddress>>(connectionFactory,
-        statsProvider);
-  }
-
-  @VisibleForTesting
-  public ThriftFactory<T> withClientFactory(Function<TTransport, T> clientFactory) {
-    this.clientFactory = Preconditions.checkNotNull(clientFactory);
-
-    return this;
-  }
-
-  public ThriftFactory<T> withSslEnabled() {
-    this.sslTransport = true;
-    return this;
-  }
-
-  /**
-   * Specifies the maximum number of connections that should be made to any single endpoint.
-   *
-   * @param maxConnectionsPerEndpoint Maximum number of connections per endpoint.
-   * @return A reference to the factory.
-   */
-  public ThriftFactory<T> withMaxConnectionsPerEndpoint(int maxConnectionsPerEndpoint) {
-    Preconditions.checkArgument(maxConnectionsPerEndpoint > 0);
-    this.maxConnectionsPerEndpoint = maxConnectionsPerEndpoint;
-
-    return this;
-  }
-
-  /**
-   * Specifies the interval at which dead endpoint connections should be checked and revived.
-   *
-   * @param connectionRestoreInterval the time interval to check.
-   * @return A reference to the factory.
-   */
-  public ThriftFactory<T> withDeadConnectionRestoreInterval(
-      Amount<Long, Time> connectionRestoreInterval) {
-    Preconditions.checkNotNull(connectionRestoreInterval);
-    Preconditions.checkArgument(connectionRestoreInterval.getValue() >= 0,
-        "A negative interval is invalid: %s", connectionRestoreInterval);
-    this.connectionRestoreInterval = connectionRestoreInterval;
-
-    return this;
-  }
-
-  /**
-   * Instructs the factory whether framed transport should be used.
-   *
-   * @param framedTransport Whether to use framed transport.
-   * @return A reference to the factory.
-   */
-  public ThriftFactory<T> useFramedTransport(boolean framedTransport) {
-    this.framedTransport = framedTransport;
-
-    return this;
-  }
-
-  /**
-   * Specifies the load balancer to use when interacting with multiple backends.
-   *
-   * @param strategy Load balancing strategy.
-   * @return A reference to the factory.
-   */
-  public ThriftFactory<T> withLoadBalancingStrategy(
-      LoadBalancingStrategy<InetSocketAddress> strategy) {
-    this.loadBalancingStrategy = Preconditions.checkNotNull(strategy);
-
-    return this;
-  }
-
-  private LoadBalancer<InetSocketAddress> createLoadBalancer() {
-    if (loadBalancingStrategy == null) {
-      loadBalancingStrategy = createDefaultLoadBalancingStrategy();
-    }
-
-    return LoadBalancerImpl.create(TrafficMonitorAdapter.create(loadBalancingStrategy, monitor));
-  }
-
-  private LoadBalancingStrategy<InetSocketAddress> createDefaultLoadBalancingStrategy() {
-    Function<InetSocketAddress, BackoffDecider> backoffFactory =
-        new Function<InetSocketAddress, BackoffDecider>() {
-          @Override public BackoffDecider apply(InetSocketAddress socket) {
-            BackoffStrategy backoffStrategy = new TruncatedBinaryBackoff(
-                Amount.of(2L, Time.SECONDS), Amount.of(10L, Time.SECONDS));
-
-            return BackoffDecider.builder(socket.toString())
-                .withTolerateFailureRate(0.2)
-                .withRequestWindow(Amount.of(1L, Time.SECONDS))
-                .withSeedSize(5)
-                .withStrategy(backoffStrategy)
-                .withRecoveryType(BackoffDecider.RecoveryType.FULL_CAPACITY)
-                .withStatsProvider(statsProvider)
-                .build();
-          }
-    };
-
-    return new MarkDeadStrategyWithHostCheck<InetSocketAddress>(
-        new LeastConnectedStrategy<InetSocketAddress>(), backoffFactory);
-  }
-
-  /**
-   * Specifies the net read/write timeout to set via SO_TIMEOUT on the thrift blocking client
-   * or AsyncClient.setTimeout on the thrift async client.  Defaults to the connectTimeout on
-   * the blocking client if not set.
-   *
-   * @param socketTimeout timeout on thrift i/o operations
-   * @return A reference to the factory.
-   */
-  public ThriftFactory<T> withSocketTimeout(Amount<Long, Time> socketTimeout) {
-    this.socketTimeout = Preconditions.checkNotNull(socketTimeout);
-    Preconditions.checkArgument(socketTimeout.as(Time.MILLISECONDS) >= 0);
-
-    return this;
-  }
-
-  /**
-   * Specifies the callback to notify when a connection has been created.  The callback may
-   * be used to make thrift calls to the connection, but must not invalidate it.
-   * Defaults to a no-op closure.
-   *
-   * @param postCreateCallback function to setup new connections
-   * @return A reference to the factory.
-   */
-  public ThriftFactory<T> withPostCreateCallback(
-      Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback) {
-    this.postCreateCallback = Preconditions.checkNotNull(postCreateCallback);
-
-    return this;
-  }
-
-  /**
-   * Registers a custom stats provider to use to track various client stats.
-   *
-   * @param statsProvider the {@code StatsProvider} to use
-   * @return A reference to the factory.
-   */
-  public ThriftFactory<T> withStatsProvider(StatsProvider statsProvider) {
-    this.statsProvider = Preconditions.checkNotNull(statsProvider);
-
-    return this;
-  }
-
-  /**
-   * Name to be passed to Thrift constructor, used in stats.
-   *
-   * @param serviceName string to use
-   * @return A reference to the factory.
-   */
-  public ThriftFactory<T> withServiceName(String serviceName) {
-    this.serviceName = MorePreconditions.checkNotBlank(serviceName);
-
-    return this;
-  }
-
-  /**
-   * Set the end-point to use from {@link ServiceInstance#getAdditionalEndpoints()}.
-   * If not set, the default behavior is to use {@link ServiceInstance#getServiceEndpoint()}.
-   *
-   * @param endpointName the (optional) name of the end-point, if unset - the
-   *     default/primary end-point is selected
-   * @return a reference to the factory for chaining
-   */
-  public ThriftFactory<T> withEndpointName(String endpointName) {
-    this.endpointName = Optional.of(endpointName);
-    return this;
-  }
-
-  private static <T> Function<TTransport, T> createClientFactory(Class<T> serviceInterface) {
-    final Constructor<? extends T> implementationConstructor =
-        findImplementationConstructor(serviceInterface);
-
-    return new Function<TTransport, T>() {
-      @Override public T apply(TTransport transport) {
-        try {
-          return implementationConstructor.newInstance(new TBinaryProtocol(transport));
-        } catch (InstantiationException e) {
-          throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-          throw new RuntimeException(e);
-        } catch (InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private <T> Function<TTransport, T> createAsyncClientFactory(
-      final Class<T> serviceInterface) throws IOException {
-
-    final TAsyncClientManager clientManager = new TAsyncClientManager();
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override public void run() {
-        clientManager.stop();
-      }
-    });
-
-    final Constructor<? extends T> implementationConstructor =
-        findAsyncImplementationConstructor(serviceInterface);
-
-    return new Function<TTransport, T>() {
-      @Override public T apply(TTransport transport) {
-        Preconditions.checkNotNull(transport);
-        Preconditions.checkArgument(transport instanceof TNonblockingTransport,
-            "Invalid transport provided to client factory: " + transport.getClass());
-
-        try {
-          T client = implementationConstructor.newInstance(new TBinaryProtocol.Factory(),
-              clientManager, transport);
-
-          if (socketTimeout != null) {
-            ((TAsyncClient) client).setTimeout(socketTimeout.as(Time.MILLISECONDS));
-          }
-
-          return client;
-        } catch (InstantiationException e) {
-          throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-          throw new RuntimeException(e);
-        } catch (InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private static <T> Constructor<? extends T> findImplementationConstructor(
-      final Class<T> serviceInterface) {
-    Class<? extends T> implementationClass = findImplementationClass(serviceInterface);
-    try {
-      return implementationClass.getConstructor(TProtocol.class);
-    } catch (NoSuchMethodException e) {
-      throw new IllegalArgumentException("Failed to find a single argument TProtocol constructor "
-                                         + "in service client class: " + implementationClass);
-    }
-  }
-
-  private static <T> Constructor<? extends T> findAsyncImplementationConstructor(
-      final Class<T> serviceInterface) {
-    Class<? extends T> implementationClass = findImplementationClass(serviceInterface);
-    try {
-      return implementationClass.getConstructor(TProtocolFactory.class, TAsyncClientManager.class,
-          TNonblockingTransport.class);
-    } catch (NoSuchMethodException e) {
-      throw new IllegalArgumentException("Failed to find expected constructor "
-                                         + "in service client class: " + implementationClass);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> Class<? extends T> findImplementationClass(final Class<T> serviceInterface) {
-    try {
-      return (Class<? extends T>)
-          Iterables.find(ImmutableList.copyOf(serviceInterface.getEnclosingClass().getClasses()),
-              new Predicate<Class<?>>() {
-                @Override public boolean apply(Class<?> inner) {
-                  return !serviceInterface.equals(inner)
-                         && serviceInterface.isAssignableFrom(inner);
-                }
-              });
-    } catch (NoSuchElementException e) {
-      throw new IllegalArgumentException("Could not find a sibling enclosed implementation of "
-                                         + "service interface: " + serviceInterface);
-    }
-  }
-
-  public static class ThriftFactoryException extends Exception {
-    public ThriftFactoryException(String msg) {
-      super(msg);
-    }
-
-    public ThriftFactoryException(String msg, Throwable t) {
-      super(msg, t);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/Caller.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/Caller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/Caller.java
deleted file mode 100644
index 0200c49..0000000
--- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/Caller.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.thrift.callers;
-
-import com.google.common.base.Preconditions;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.thrift.async.AsyncMethodCallback;
-
-import javax.annotation.Nullable;
-import java.lang.reflect.Method;
-
-/**
-* A caller that invokes a method on an object.
-*
-* @author William Farner
-*/
-public interface Caller {
-
-  /**
-   * Invokes a method on an object, using the given arguments.  The method call may be
-   * asynchronous, in which case {@code callback} will be non-null.
-   *
-   * @param method The method being invoked.
-   * @param args The arguments to call {@code method} with.
-   * @param callback The callback to use if the method is asynchronous.
-   * @param connectTimeoutOverride Optional override for the default connection timeout.
-   * @return The return value from invoking the method.
-   * @throws Throwable Exception, as prescribed by the method's contract.
-   */
-  public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback,
-      @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable;
-
-  /**
-   * Captures the result of a request, whether synchronous or asynchronous.  It should be expected
-   * that for every request made, exactly one of these methods will be called.
-   */
-  static interface ResultCapture {
-    /**
-     * Called when the request completed successfully.
-     */
-    void success();
-
-    /**
-     * Called when the request failed.
-     *
-     * @param t Throwable that was caught.  Must never be null.
-     * @return {@code true} if a wrapped callback should be notified of the failure,
-     *    {@code false} otherwise.
-     */
-    boolean fail(Throwable t);
-  }
-
-  /**
-   * A callback that adapts a {@link ResultCapture} with an {@link AsyncMethodCallback} while
-   * maintaining the AsyncMethodCallback interface.  The wrapped callback will handle invocation
-   * of the underlying callback based on the return values from the ResultCapture.
-   */
-  static class WrappedMethodCallback implements AsyncMethodCallback {
-    private final AsyncMethodCallback wrapped;
-    private final ResultCapture capture;
-
-    private boolean callbackTriggered = false;
-
-    public WrappedMethodCallback(AsyncMethodCallback wrapped, ResultCapture capture) {
-      this.wrapped = wrapped;
-      this.capture = capture;
-    }
-
-    private void callbackTriggered() {
-      Preconditions.checkState(!callbackTriggered, "Each callback may only be triggered once.");
-      callbackTriggered = true;
-    }
-
-    @Override @SuppressWarnings("unchecked") public void onComplete(Object o) {
-      capture.success();
-      wrapped.onComplete(o);
-      callbackTriggered();
-    }
-
-    @Override public void onError(Exception t) {
-      if (capture.fail(t)) {
-        wrapped.onError(t);
-        callbackTriggered();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/CallerDecorator.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/CallerDecorator.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/CallerDecorator.java
deleted file mode 100644
index bd0a952..0000000
--- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/CallerDecorator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.thrift.callers;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.thrift.async.AsyncMethodCallback;
-
-import javax.annotation.Nullable;
-import java.lang.reflect.Method;
-
-/**
-* A caller that decorates another caller.
-*
-* @author William Farner
-*/
-abstract class CallerDecorator implements Caller {
-  private final Caller decoratedCaller;
-  private final boolean async;
-
-  CallerDecorator(Caller decoratedCaller, boolean async) {
-    this.decoratedCaller = decoratedCaller;
-    this.async = async;
-  }
-
-  /**
-   * Convenience method for invoking the method and shunting the capture into the callback if
-   * the call is asynchronous.
-   *
-   * @param method The method being invoked.
-   * @param args The arguments to call {@code method} with.
-   * @param callback The callback to use if the method is asynchronous.
-   * @param capture The result capture to notify of the call result.
-   * @param connectTimeoutOverride Optional override for the default connection timeout.
-   * @return The return value from invoking the method.
-   * @throws Throwable Exception, as prescribed by the method's contract.
-   */
-  protected final Object invoke(Method method, Object[] args,
-      @Nullable AsyncMethodCallback callback, @Nullable final ResultCapture capture,
-      @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable {
-
-    // Swap the wrapped callback out for ours.
-    if (callback != null) {
-      callback = new WrappedMethodCallback(callback, capture);
-    }
-
-    try {
-      Object result = decoratedCaller.call(method, args, callback, connectTimeoutOverride);
-      if (callback == null && capture != null) capture.success();
-
-      return result;
-    } catch (Exception t) {
-      // We allow this one to go to both sync and async captures.
-      if (callback != null) {
-        callback.onError(t);
-        return null;
-      } else {
-        if (capture != null) capture.fail(t);
-        throw t;
-      }
-    }
-  }
-
-  boolean isAsync() {
-    return async;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/DeadlineCaller.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/DeadlineCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/DeadlineCaller.java
deleted file mode 100644
index 75ed1ec..0000000
--- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/DeadlineCaller.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.thrift.callers;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Throwables;
-
-import org.apache.aurora.common.thrift.TResourceExhaustedException;
-import org.apache.thrift.async.AsyncMethodCallback;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.thrift.TTimeoutException;
-
-/**
- * A caller that imposes a time deadline on the underlying caller.  If the underlying calls fail
- * to meet the deadline {@link TTimeoutException} is thrown.  If the executor service rejects
- * execution of a task, {@link TResourceExhaustedException} is thrown.
- *
- * @author William Farner
- */
-public class DeadlineCaller extends CallerDecorator {
-  private final ExecutorService executorService;
-  private final Amount<Long, Time> timeout;
-
-  /**
-   * Creates a new deadline caller.
-   *
-   * @param decoratedCaller The caller to decorate with a deadline.
-   * @param async Whether the caller is asynchronous.
-   * @param executorService The executor service to use for performing calls.
-   * @param timeout The timeout by which the underlying call should complete in.
-   */
-  public DeadlineCaller(Caller decoratedCaller, boolean async, ExecutorService executorService,
-      Amount<Long, Time> timeout) {
-    super(decoratedCaller, async);
-
-    this.executorService = executorService;
-    this.timeout = timeout;
-  }
-
-  @Override
-  public Object call(final Method method, final Object[] args,
-      @Nullable final AsyncMethodCallback callback,
-      @Nullable final Amount<Long, Time> connectTimeoutOverride) throws Throwable {
-    try {
-      Future<Object> result = executorService.submit(new Callable<Object>() {
-        @Override public Object call() throws Exception {
-          try {
-            return invoke(method, args, callback, null, connectTimeoutOverride);
-          } catch (Throwable t) {
-            Throwables.propagateIfInstanceOf(t, Exception.class);
-            throw new RuntimeException(t);
-          }
-        }
-      });
-
-      try {
-        return result.get(timeout.getValue(), timeout.getUnit().getTimeUnit());
-      } catch (TimeoutException e) {
-        result.cancel(true);
-        throw new TTimeoutException(e);
-      } catch (ExecutionException e) {
-        throw e.getCause();
-      }
-    } catch (RejectedExecutionException e) {
-      throw new TResourceExhaustedException(e);
-    } catch (InvocationTargetException e) {
-      throw e.getCause();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/DebugCaller.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/DebugCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/DebugCaller.java
deleted file mode 100644
index aff4006..0000000
--- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/DebugCaller.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.thrift.callers;
-
-import com.google.common.base.Joiner;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.thrift.async.AsyncMethodCallback;
-
-import javax.annotation.Nullable;
-
-import java.lang.reflect.Method;
-import java.util.logging.Logger;
-
-/**
- * A caller that reports debugging information about calls.
- *
- * @author William Farner
- */
-public class DebugCaller extends CallerDecorator {
-  private static final Logger LOG = Logger.getLogger(DebugCaller.class.getName());
-  private static final Joiner ARG_JOINER = Joiner.on(", ");
-
-  /**
-   * Creates a new debug caller.
-   *
-   * @param decoratedCaller The caller to decorate with debug information.
-   * @param async Whether the caller is asynchronous.
-   */
-  public DebugCaller(Caller decoratedCaller, boolean async) {
-    super(decoratedCaller, async);
-  }
-
-  @Override
-  public Object call(final Method method, final Object[] args,
-      @Nullable AsyncMethodCallback callback, @Nullable Amount<Long, Time> connectTimeoutOverride)
-      throws Throwable {
-    ResultCapture capture = new ResultCapture() {
-      @Override public void success() {
-        // No-op.
-      }
-
-      @Override public boolean fail(Throwable t) {
-        StringBuilder message = new StringBuilder("Thrift call failed: ");
-        message.append(method.getName()).append("(");
-        ARG_JOINER.appendTo(message, args);
-        message.append(")");
-        LOG.warning(message.toString());
-
-        return true;
-      }
-    };
-
-    try {
-      return invoke(method, args, callback, capture, connectTimeoutOverride);
-    } catch (Throwable t) {
-      capture.fail(t);
-      throw t;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java
deleted file mode 100644
index 84a0f4e..0000000
--- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.thrift.callers;
-
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.common.thrift.TResourceExhaustedException;
-import org.apache.thrift.async.AsyncMethodCallback;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.StatsProvider;
-
-/**
-* A caller that will retry calls to the wrapped caller.
-*
-* @author William Farner
-*/
-public class RetryingCaller extends CallerDecorator {
-  private static final Logger LOG = Logger.getLogger(RetryingCaller.class.getName());
-
-  @VisibleForTesting
-  public static final Amount<Long, Time> NONBLOCKING_TIMEOUT = Amount.of(-1L, Time.MILLISECONDS);
-
-  private final StatsProvider statsProvider;
-  private final String serviceName;
-  private final int retries;
-  private final ImmutableSet<Class<? extends Exception>> retryableExceptions;
-  private final boolean debug;
-
-  /**
-   * Creates a new retrying caller. The retrying caller will attempt to call invoked methods on the
-   * underlying caller at most {@code retries} times.  A retry will be performed only when one of
-   * the {@code retryableExceptions} is caught.
-   *
-   * @param decoratedCall The caller to decorate with retries.
-   * @param async Whether the caller is asynchronous.
-   * @param statsProvider The stat provider to export retry statistics through.
-   * @param serviceName The service name that calls are being invoked on.
-   * @param retries The maximum number of retries to perform.
-   * @param retryableExceptions The exceptions that can be retried.
-   * @param debug Whether to include debugging information when retries are being performed.
-   */
-  public RetryingCaller(Caller decoratedCall, boolean async, StatsProvider statsProvider,
-      String serviceName, int retries, ImmutableSet<Class<? extends Exception>> retryableExceptions,
-      boolean debug) {
-    super(decoratedCall, async);
-    this.statsProvider = statsProvider;
-    this.serviceName = serviceName;
-    this.retries = retries;
-    this.retryableExceptions = retryableExceptions;
-    this.debug = debug;
-  }
-
-  private final LoadingCache<Method, AtomicLong> stats =
-      CacheBuilder.newBuilder().build(new CacheLoader<Method, AtomicLong>() {
-        @Override public AtomicLong load(Method method) {
-          // Thrift does not support overloads - so just the name disambiguates all calls.
-          return statsProvider.makeCounter(serviceName + "_" + method.getName() + "_retries");
-        }
-      });
-
-  @Override public Object call(final Method method, final Object[] args,
-      @Nullable final AsyncMethodCallback callback,
-      @Nullable final Amount<Long, Time> connectTimeoutOverride) throws Throwable {
-    final AtomicLong retryCounter = stats.get(method);
-    final AtomicInteger attempts = new AtomicInteger();
-    final List<Throwable> exceptions = Lists.newArrayList();
-
-    final ResultCapture capture = new ResultCapture() {
-      @Override public void success() {
-        // No-op.
-      }
-
-      @Override public boolean fail(Throwable t) {
-        if (!isRetryable(t)) {
-          if (debug) {
-            LOG.warning(String.format(
-                "Call failed with un-retryable exception of [%s]: %s, previous exceptions: %s",
-                t.getClass().getName(), t.getMessage(), combineStackTraces(exceptions)));
-          }
-
-          return true;
-        } else if (attempts.get() >= retries) {
-          exceptions.add(t);
-
-          if (debug) {
-            LOG.warning(String.format("Retried %d times, last error: %s, exceptions: %s",
-                attempts.get(), t, combineStackTraces(exceptions)));
-          }
-
-          return true;
-        } else {
-          exceptions.add(t);
-
-          if (isAsync() && attempts.incrementAndGet() <= retries) {
-            try {
-              retryCounter.incrementAndGet();
-              // override connect timeout in ThriftCaller to prevent blocking for a connection
-              // for async retries (since this is within the callback in the selector thread)
-              invoke(method, args, callback, this, NONBLOCKING_TIMEOUT);
-            } catch (Throwable throwable) {
-              return fail(throwable);
-            }
-          }
-
-          return false;
-        }
-      }
-    };
-
-    boolean continueLoop;
-    do {
-      try {
-        // If this is an async call, the looping will be handled within the capture.
-        return invoke(method, args, callback, capture, connectTimeoutOverride);
-      } catch (Throwable t) {
-        if (!isRetryable(t)) {
-          Throwable propagated = t;
-
-          if (!exceptions.isEmpty() && (t instanceof TResourceExhaustedException)) {
-            // If we've been trucking along through retries that have had remote call failures
-            // and we suddenly can't immediately get a connection on the next retry, throw the
-            // previous remote call failure - the idea here is that the remote call failure is
-            // more interesting than a transient inability to get an immediate connection.
-            propagated = exceptions.remove(exceptions.size() - 1);
-          }
-
-          if (isAsync()) {
-            callback.onError((Exception) propagated);
-          } else {
-            throw propagated;
-          }
-        }
-      }
-
-      continueLoop = !isAsync() && attempts.incrementAndGet() <= retries;
-      if (continueLoop) retryCounter.incrementAndGet();
-    } while (continueLoop);
-
-    Throwable lastRetriedException = Iterables.getLast(exceptions);
-    if (debug) {
-      if (!exceptions.isEmpty()) {
-        LOG.warning(
-            String.format("Retried %d times, last error: %s, previous exceptions: %s",
-                attempts.get(), lastRetriedException, combineStackTraces(exceptions)));
-      } else {
-        LOG.warning(
-            String.format("Retried 1 time, last error: %s", lastRetriedException));
-      }
-    }
-
-    if (!isAsync()) throw lastRetriedException;
-    return null;
-  }
-
-  private boolean isRetryable(Throwable throwable) {
-    return isRetryable.getUnchecked(throwable.getClass());
-  }
-
-  private final LoadingCache<Class<? extends Throwable>, Boolean> isRetryable =
-      CacheBuilder.newBuilder().build(new CacheLoader<Class<? extends Throwable>, Boolean>() {
-        @Override public Boolean load(Class<? extends Throwable> exceptionClass) {
-          return isRetryable(exceptionClass);
-        }
-      });
-
-  private boolean isRetryable(final Class<? extends Throwable> exceptionClass) {
-    if (retryableExceptions.contains(exceptionClass)) {
-      return true;
-    }
-    return Iterables.any(retryableExceptions, new Predicate<Class<? extends Exception>>() {
-      @Override public boolean apply(Class<? extends Exception> retryableExceptionClass) {
-        return retryableExceptionClass.isAssignableFrom(exceptionClass);
-      }
-    });
-  }
-
-  private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n');
-
-  private static String combineStackTraces(List<Throwable> exceptions) {
-    if (exceptions.isEmpty()) {
-      return "none";
-    } else {
-      return STACK_TRACE_JOINER.join(Iterables.transform(exceptions,
-          new Function<Throwable, String>() {
-            private int index = 1;
-            @Override public String apply(Throwable exception) {
-              return String.format("[%d] %s",
-                  index++, Throwables.getStackTraceAsString(exception));
-            }
-          }));
-    }
-  }
-}


Mime
View raw message