aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zma...@apache.org
Subject [23/37] aurora git commit: Import of Twitter Commons.
Date Tue, 25 Aug 2015 18:19:37 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java b/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java
new file mode 100644
index 0000000..8767433
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java
@@ -0,0 +1,124 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing.easymock;
+
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.WildcardType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.reflect.TypeToken;
+import com.google.common.testing.TearDown;
+import com.google.common.testing.junit4.TearDownTestCase;
+
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+
+import static org.easymock.EasyMock.createControl;
+
+/**
+ * A baseclass for tests that use EasyMock.  A new {@link IMocksControl control} is set up before
+ * each test and the mocks created and replayed with it are verified during tear down.
+ *
+ * @author John Sirois
+ */
+public abstract class EasyMockTest extends TearDownTestCase {
+  protected IMocksControl control;
+
+  /**
+   * Creates an EasyMock {@link #control} for tests to use that will be automatically
+   * {@link IMocksControl#verify() verified} on tear down.
+   */
+  @Before
+  public final void setupEasyMock() {
+    control = createControl();
+    addTearDown(new TearDown() {
+      @Override public void tearDown() {
+        control.verify();
+      }
+    });
+  }
+
+  /**
+   * Creates an EasyMock mock with this test's control.  Will be
+   * {@link IMocksControl#verify() verified} in a tear down.
+   */
+  public <T> T createMock(Class<T> type) {
+    Preconditions.checkNotNull(type);
+    return control.createMock(type);
+  }
+
+  /**
+   * A class meant to be sub-classed in order to capture a generic type literal value.  To capture
+   * the type of a {@code List<String>} you would use: {@code new Clazz<List<String>>() {}}
+   */
+  public abstract static class Clazz<T> extends TypeToken {
+    Class<T> rawType() {
+      @SuppressWarnings("unchecked")
+      Class<T> rawType = (Class<T>) findRawType();
+      return rawType;
+    }
+
+    private Class<?> findRawType() {
+      if (getType() instanceof Class<?>) { // Plain old
+        return (Class<?>) getType();
+
+      } else if (getType() instanceof ParameterizedType) { // Nested type parameter
+        ParameterizedType parametrizedType = (ParameterizedType) getType();
+        Type rawType = parametrizedType.getRawType();
+        return (Class<?>) rawType;
+      } else if (getType() instanceof GenericArrayType) {
+        throw new IllegalStateException("cannot mock arrays, rejecting type: " + getType());
+      } else if (getType() instanceof WildcardType) {
+        throw new IllegalStateException(
+            "wildcarded instantiations are not allowed in java, rejecting type: " + getType());
+      } else {
+        throw new IllegalArgumentException("Could not decode raw type for: " + getType());
+      }
+    }
+
+    public T createMock() {
+      return EasyMock.createMock(rawType());
+    }
+
+    public T createMock(IMocksControl control) {
+      return control.createMock(rawType());
+    }
+  }
+
+  /**
+   * Creates an EasyMock mock with this test's control.  Will be
+   * {@link IMocksControl#verify() verified} in a tear down.
+   *
+   * Allows for mocking of parameterized types without all the unchecked conversion warnings in a
+   * safe way.
+   */
+  public <T> T createMock(Clazz<T> type) {
+    Preconditions.checkNotNull(type);
+    return type.createMock(control);
+  }
+
+  /**
+   * A type-inferring convenience method for creating new captures.
+   */
+  public static <T> Capture<T> createCapture() {
+    return new Capture<T>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java b/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java
new file mode 100644
index 0000000..5197e91
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java
@@ -0,0 +1,77 @@
+package com.twitter.common.testing.easymock;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multiset;
+
+import org.easymock.IArgumentMatcher;
+
+import static org.easymock.EasyMock.reportMatcher;
+
+/**
+ * This EasyMock argument matcher tests Iterables for equality irrespective of order.
+ *
+ * @param <T> type argument for the Iterables being matched.
+ */
+public class IterableEquals<T> implements IArgumentMatcher {
+  private final Multiset<T> elements = HashMultiset.create();
+
+  /**
+   * Constructs an IterableEquals object that tests for equality against the specified expected
+   * Iterable.
+   *
+   * @param expected an Iterable containing the elements that are expected, in any order.
+   */
+  public IterableEquals(Iterable<T> expected) {
+    Iterables.addAll(elements, expected);
+  }
+
+  @Override
+  public boolean matches(Object observed) {
+    if (observed instanceof Iterable<?>) {
+      Multiset<Object> observedElements = HashMultiset.create((Iterable<?>) observed);
+      return elements.equals(observedElements);
+    }
+    return false;
+  }
+
+  @Override
+  public void appendTo(StringBuffer buffer) {
+    buffer.append("eqIterable(").append(elements).append(")");
+  }
+
+  /**
+   * When used in EasyMock expectations, this matches an Iterable having the same elements in any
+   * order.
+   *
+   * @return null, to avoid a compile time error.
+   */
+  public static <T> Iterable<T> eqIterable(Iterable<T> in) {
+    reportMatcher(new IterableEquals(in));
+    return null;
+  }
+
+  /**
+   * When used in EasyMock expectations, this matches a List having the same elements in any order.
+   *
+   * @return null, to avoid a compile time error.
+   */
+  public static <T> List<T> eqList(Iterable<T> in) {
+    reportMatcher(new IterableEquals(in));
+    return null;
+  }
+
+  /**
+   * When used in EasyMock expectations, this matches a Collection having the same elements in any
+   * order.
+   *
+   * @return null, to avoid a compile time error.
+   */
+  public static <T> Collection<T> eqCollection(Iterable<T> in) {
+    reportMatcher(new IterableEquals(in));
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java b/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java
new file mode 100644
index 0000000..5658ff6
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java
@@ -0,0 +1,161 @@
+// =================================================================================================
+// Copyright 2015 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing.junit.rules;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+
+import org.junit.rules.MethodRule;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.Statement;
+
+/**
+ * A test method annotation useful for smoking out flaky behavior in tests.
+ *
+ * @see Retry.Rule RetryRule needed to enable this annotation in a test class.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface Retry {
+
+  /**
+   * The number of times to retry the test.
+   *
+   * When a {@link Retry.Rule} is installed and a test method is annotated for {@literal @Retry},
+   * it will be retried 0 to N times.  If times is negative, it is treated as 0 and no retries are
+   * performed.  If times is &gt;= 1 then a successful execution of the annotated test method is
+   * retried until the 1st error, failure or otherwise up to {@code times} times.
+   */
+  int times() default 1;
+
+  /**
+   * Enables {@link Retry @Retry}able tests.
+   */
+  class Rule implements MethodRule {
+    private interface ThrowableFactory {
+      Throwable create(String message, Throwable cause);
+    }
+
+    private static Throwable annotate(
+        int tryNumber,
+        final int maxRetries,
+        Throwable cause,
+        String prefix,
+        ThrowableFactory throwableFactory) {
+
+      Throwable annotated =
+          throwableFactory.create(
+              String.format("%s on try %d of %d: %s", prefix, tryNumber, maxRetries + 1,
+                  Objects.firstNonNull(cause.getMessage(), "")), cause);
+      annotated.setStackTrace(cause.getStackTrace());
+      return annotated;
+    }
+
+    static class RetriedAssertionError extends AssertionError {
+      private final int tryNumber;
+      private final int maxRetries;
+
+      RetriedAssertionError(int tryNumber, int maxRetries, String message, Throwable cause) {
+        // We do a manual initCause here to be compatible with the Java 1.6 AssertionError
+        // constructors.
+        super(message);
+        initCause(cause);
+
+        this.tryNumber = tryNumber;
+        this.maxRetries = maxRetries;
+      }
+
+      @VisibleForTesting
+      int getTryNumber() {
+        return tryNumber;
+      }
+
+      @VisibleForTesting
+      int getMaxRetries() {
+        return maxRetries;
+      }
+    }
+
+    private static Throwable annotate(final int tryNumber, final int maxRetries, AssertionError e) {
+      return annotate(tryNumber, maxRetries, e, "Failure", new ThrowableFactory() {
+        @Override public Throwable create(String message, Throwable cause) {
+          return new RetriedAssertionError(tryNumber, maxRetries, message, cause);
+        }
+      });
+    }
+
+    static class RetriedException extends Exception {
+      private final int tryNumber;
+      private final int maxRetries;
+
+      RetriedException(int tryNumber, int maxRetries, String message, Throwable cause) {
+        super(message, cause);
+        this.tryNumber = tryNumber;
+        this.maxRetries = maxRetries;
+      }
+
+      @VisibleForTesting
+      int getTryNumber() {
+        return tryNumber;
+      }
+
+      @VisibleForTesting
+      int getMaxRetries() {
+        return maxRetries;
+      }
+    }
+
+    private static Throwable annotate(final int tryNumber, final int maxRetries, Exception e) {
+      return annotate(tryNumber, maxRetries, e, "Error", new ThrowableFactory() {
+        @Override public Throwable create(String message, Throwable cause) {
+          return new RetriedException(tryNumber, maxRetries, message, cause);
+        }
+      });
+    }
+
+    @Override
+    public Statement apply(final Statement statement, FrameworkMethod method, Object receiver) {
+      Retry retry = method.getAnnotation(Retry.class);
+      if (retry == null || retry.times() <= 0) {
+        return statement;
+      } else {
+        final int times = retry.times();
+        return new Statement() {
+          @Override public void evaluate() throws Throwable {
+            for (int i = 0; i <= times; i++) {
+              try {
+                statement.evaluate();
+              } catch (AssertionError e) {
+                throw annotate(i + 1, times, e);
+              // We purposefully catch any non-assertion exceptions in order to tag the try count
+              // for erroring (as opposed to failing) tests.
+              // SUPPRESS CHECKSTYLE RegexpSinglelineJava
+              } catch (Exception e) {
+                throw annotate(i + 1, times, e);
+              }
+            }
+          }
+        };
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java b/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java
new file mode 100644
index 0000000..a56bb2b
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java
@@ -0,0 +1,34 @@
+// =================================================================================================
+// Copyright 2012 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.testing.mockito;
+
+import org.junit.Before;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * A base class for tests that use Mockito. Before each test, it initializes all the mocks
+ * declared in the class.
+ */
+public abstract class MockitoTest  {
+  /**
+   * Initializes all fields annotated with {@link org.mockito.Mock}.
+   */
+  @Before
+  public final void initMockito() {
+    MockitoAnnotations.initMocks(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/Config.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/Config.java b/commons/src/main/java/com/twitter/common/thrift/Config.java
new file mode 100644
index 0000000..977489f
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/Config.java
@@ -0,0 +1,305 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
+
+/**
+ * Represents the configuration for a thrift call.  Use {@link #builder()} to create a new one or
+ * or {@link #builder(Config)} to create a new config based on another config.
+ *
+ * <p>If a deadline is specified, it acts as a global timeout for each thrift call made.
+ * Obtaining connections, performing the remote call and executing retries are all expected to
+ * complete within this deadline.  When the specified deadline is not met, an
+ * {@link TTimeoutException} will be thrown.
+ *
+ * <p>If max retries is specified as zero (never retry), then the list of retryable exceptions are
+ * ignored.  It is only when max retries is greater than zero that list of retryable exceptions is
+ * used to determine if a particular failed call should be retried.
+ *
+ * @author John Sirois
+ */
+public class Config {
+
+  /**
+   * Created a builder for a new {@link Config}.  Default values are as follows:
+   * <ul>
+   * <li>{@link #getRequestTimeout()} 0
+   * <li>{@link #getMaxRetries()} 0
+   * <li>{@link #getRetryableExceptions()} []
+   * <li>{@link #isDebug()} ()} false
+   * </ul>
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   *
+   * @param config the builder configuration to use
+   */
+  public static Builder builder(Config config) {
+    Preconditions.checkNotNull(config);
+    return new Builder(config);
+  }
+
+  private static final Amount<Long,Time> DEADLINE_BLOCKING = Amount.of(0L, Time.MILLISECONDS);
+
+  @VisibleForTesting
+  static final Amount<Long,Time> DEFAULT_CONNECT_TIMEOUT = Amount.of(5L, Time.SECONDS);
+
+  private Amount<Long, Time> requestTimeout = DEADLINE_BLOCKING;
+  private Amount<Long, Time> connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+  private int maxRetries;
+  private ImmutableSet<Class<? extends Exception>> retryableExceptions = ImmutableSet.of();
+  private boolean debug = false;
+  private boolean enableStats = true;
+  private StatsProvider statsProvider = Stats.STATS_PROVIDER;
+
+  private Config() {
+    // defaults
+  }
+
+  private Config(Config copyFrom) {
+    requestTimeout = copyFrom.requestTimeout;
+    maxRetries = copyFrom.maxRetries;
+    retryableExceptions = copyFrom.retryableExceptions;
+    debug = copyFrom.debug;
+    statsProvider = copyFrom.statsProvider;
+  }
+
+  /**
+   * Returns the maximum time to wait for any thrift call to complete.  A deadline of 0 means to
+   * wait forever
+   */
+  public Amount<Long, Time> getRequestTimeout() {
+    return requestTimeout;
+  }
+
+  /**
+   * Returns the maximum time to wait for a connection to be established.  A deadline of 0 means to
+   * wait forever
+   */
+  public Amount<Long, Time> getConnectTimeout() {
+    return connectTimeout;
+  }
+
+  /**
+   * Returns the maximum number of retries to perform for each thrift call.  A value of 0 means to
+   * never retry and in this case {@link #getRetryableExceptions()} will be an empty set.
+   */
+  public int getMaxRetries() {
+    return maxRetries;
+  }
+
+  /**
+   * Returns the set of exceptions to retry calls for.  The returned set will only be empty if
+   * {@link #getMaxRetries()} is 0.
+   */
+  public ImmutableSet<Class<? extends Exception>> getRetryableExceptions() {
+    return retryableExceptions;
+  }
+
+  /**
+   * Returns {@code true} if the client should log extra debugging information.  Currently this
+   * includes method call arguments when RPCs fail with exceptions.
+   */
+  public boolean isDebug() {
+    return debug;
+  }
+
+  /**
+   * Returns {@code true} if the client should track request statistics.
+   */
+  public boolean enableStats() {
+    return enableStats;
+  }
+
+  /**
+   * Returns the stats provider to use to record Thrift client stats.
+   */
+  public StatsProvider getStatsProvider() {
+    return statsProvider;
+  }
+
+  // This was made public because it seems to be causing problems for scala users when it is not
+  // public.
+  public static abstract class AbstractBuilder<T extends AbstractBuilder> {
+    private final Config config;
+
+    AbstractBuilder() {
+      this.config = new Config();
+    }
+
+    AbstractBuilder(Config template) {
+      Preconditions.checkNotNull(template);
+      this.config = new Config(template);
+    }
+
+    protected abstract T getThis();
+
+    // TODO(John Sirois): extra validation or design ... can currently do strange things like:
+    // builder.blocking().withDeadline(1, TimeUnit.MILLISECONDS)
+    // builder.noRetries().retryOn(TException.class)
+
+    /**
+     * Specifies that all calls be blocking calls with no inherent deadline.  It may be the
+     * case that underlying transports will eventually deadline, but {@link Thrift} will not
+     * enforce a deadline.
+     */
+    public final T blocking() {
+      config.requestTimeout = DEADLINE_BLOCKING;
+      return getThis();
+    }
+
+    /**
+     * Specifies that all calls be subject to a global timeout.  This deadline includes all call
+     * activities, including obtaining a free connection and any automatic retries.
+     */
+    public final T withRequestTimeout(Amount<Long, Time> timeout) {
+      Preconditions.checkNotNull(timeout);
+      Preconditions.checkArgument(timeout.getValue() >= 0,
+          "A negative deadline is invalid: %s", timeout);
+      config.requestTimeout = timeout;
+      return getThis();
+    }
+
+    /**
+     * Assigns the timeout for all connections established with the blocking client.
+     * On an asynchronous client this timeout is only used for the connection pool lock
+     * acquisition on initial calls (not retries, @see withRetries).  The actual network
+     * connection timeout for the asynchronous client is governed by socketTimeout.
+     *
+     * @param timeout Connection timeout.
+     * @return A reference to the builder.
+     */
+    public final T withConnectTimeout(Amount<Long, Time> timeout) {
+      Preconditions.checkNotNull(timeout);
+      Preconditions.checkArgument(timeout.getValue() >= 0,
+          "A negative deadline is invalid: %s", timeout);
+      config.connectTimeout = timeout;
+      return getThis();
+    }
+
+    /**
+     * Specifies that no calls be automatically retried.
+     */
+    public final T noRetries() {
+      config.maxRetries = 0;
+      config.retryableExceptions = ImmutableSet.of();
+      return getThis();
+    }
+
+    /**
+     * Specifies that failing calls meeting {@link #retryOn retry} criteria be retried up to a
+     * maximum of {@code retries} times before failing.  On an asynchronous client, these retries
+     * will be forced to be non-blocking, failing fast if they cannot immediately acquire the
+     * connection pool locks, so they only provide a best-effort retry strategy there.
+     */
+    public final T withRetries(int retries) {
+      Preconditions.checkArgument(retries >= 0, "A negative retry count is invalid: %d", retries);
+      config.maxRetries = retries;
+      return getThis();
+    }
+
+    /**
+     * Specifies the set of exception classes that are to be considered retryable (if retries are
+     * enabled).  Any exceptions thrown by the underlying thrift call will be considered retryable
+     * if they are an instance of any one of the specified exception classes.  The set of exception
+     * classes must contain at least exception class.  To specify no retries either use
+     * {@link #noRetries()} or pass zero to {@link #withRetries(int)}.
+     */
+    public final T retryOn(Iterable<? extends Class<? extends Exception>> retryableExceptions) {
+      Preconditions.checkNotNull(retryableExceptions);
+      ImmutableSet<Class<? extends Exception>> classes =
+          ImmutableSet.copyOf(Iterables.filter(retryableExceptions, Predicates.notNull()));
+      Preconditions.checkArgument(!classes.isEmpty(),
+          "Must provide at least one retryable exception class");
+      config.retryableExceptions = classes;
+      return getThis();
+    }
+
+    /**
+     * Specifies the set of exception classes that are to be considered retryable (if retries are
+     * enabled).  Any exceptions thrown by the underlying thrift call will be considered retryable
+     * if they are an instance of any one of the specified exception classes.  The set of exception
+     * classes must contain at least exception class.  To specify no retries either use
+     * {@link #noRetries()} or pass zero to {@link #withRetries(int)}.
+     */
+    public final T retryOn(Class<? extends Exception> exception) {
+      Preconditions.checkNotNull(exception);
+      config.retryableExceptions =
+          ImmutableSet.<Class<? extends Exception>>builder().add(exception).build();
+      return getThis();
+    }
+
+    /**
+     * When {@code debug == true}, specifies that extra debugging information should be logged.
+     */
+    public final T withDebug(boolean debug) {
+      config.debug = debug;
+      return getThis();
+    }
+
+    /**
+     * Disables stats collection on the client (enabled by default).
+     */
+    public T disableStats() {
+      config.enableStats = false;
+      return getThis();
+    }
+
+    /**
+     * Registers a custom stats provider to use to track various client stats.
+     */
+    public T withStatsProvider(StatsProvider statsProvider) {
+      config.statsProvider = Preconditions.checkNotNull(statsProvider);
+      return getThis();
+    }
+
+    protected final Config getConfig() {
+      return config;
+    }
+  }
+
+  public static final class Builder extends AbstractBuilder<Builder> {
+    private Builder() {
+      super();
+    }
+
+    private Builder(Config template) {
+      super(template);
+    }
+
+    @Override
+    protected Builder getThis() {
+      return this;
+    }
+
+    public Config create() {
+      return getConfig();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java b/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java
new file mode 100644
index 0000000..fb9194d
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java
@@ -0,0 +1,42 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import org.apache.thrift.TException;
+
+/**
+ * @author Adam Samet
+ *
+ * This is exception is thrown when there are no available instances of a thrift backend
+ * service to serve requests.
+ */
+public class TResourceExhaustedException extends TException {
+
+  private static final long serialVersionUID = 1L;
+
+  public TResourceExhaustedException(String message) {
+    super(message);
+  }
+
+  public TResourceExhaustedException(Throwable cause) {
+    super(cause);
+  }
+
+  public TResourceExhaustedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java b/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java
new file mode 100644
index 0000000..50020bd
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java
@@ -0,0 +1,41 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import org.apache.thrift.TException;
+
+/**
+ * @author Adam Samet
+ *
+ * This is exception is thrown when accessing a thrift service resource times out.
+ */
+public class TTimeoutException extends TException {
+
+  private static final long serialVersionUID = 1L;
+
+  public TTimeoutException(String message) {
+    super(message);
+  }
+
+  public TTimeoutException(Throwable cause) {
+    super(cause);
+  }
+
+  public TTimeoutException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java b/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java
new file mode 100644
index 0000000..329c03f
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java
@@ -0,0 +1,73 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import com.google.common.base.Preconditions;
+import com.twitter.common.net.pool.Connection;
+import com.twitter.common.net.pool.ConnectionPool;
+import org.apache.thrift.transport.TTransport;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A {@link ConnectionPool} compatible thrift connection that can work with any valid thrift
+ * transport.
+ *
+ * @author John Sirois
+ */
+public class TTransportConnection implements Connection<TTransport, InetSocketAddress> {
+
+  private final TTransport transport;
+  private final InetSocketAddress endpoint;
+
+  public TTransportConnection(TTransport transport, InetSocketAddress endpoint) {
+    this.transport = Preconditions.checkNotNull(transport);
+    this.endpoint = Preconditions.checkNotNull(endpoint);
+  }
+
+  /**
+   * Returns {@code true} if the underlying transport is still open.  To invalidate a transport it
+   * should be closed.
+   *
+   * <p>TODO(John Sirois): it seems like an improper soc to have validity testing here and not also an
+   * invalidation method - correct or accept
+   */
+  @Override
+  public boolean isValid() {
+    return transport.isOpen();
+  }
+
+  @Override
+  public TTransport get() {
+    return transport;
+  }
+
+  @Override
+  public void close() {
+    transport.close();
+  }
+
+  @Override
+  public InetSocketAddress getEndpoint() {
+    return endpoint;
+  }
+
+  @Override
+  public String toString() {
+    return endpoint.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/Thrift.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/Thrift.java b/commons/src/main/java/com/twitter/common/thrift/Thrift.java
new file mode 100644
index 0000000..aecf251
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/Thrift.java
@@ -0,0 +1,393 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.net.loadbalancing.RequestTracker;
+import com.twitter.common.net.pool.Connection;
+import com.twitter.common.net.pool.ObjectPool;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.thrift.callers.Caller;
+import com.twitter.common.thrift.callers.DeadlineCaller;
+import com.twitter.common.thrift.callers.DebugCaller;
+import com.twitter.common.thrift.callers.RetryingCaller;
+import com.twitter.common.thrift.callers.StatTrackingCaller;
+import com.twitter.common.thrift.callers.ThriftCaller;
+
+/**
+ * A generic thrift client that handles reconnection in the case of protocol errors, automatic
+ * retries, call deadlines and call statistics tracking.  This class aims for behavior compatible
+ * with the <a href="http://github.com/fauna/thrift_client">generic ruby thrift client</a>.
+ *
+ * <p>In order to enforce call deadlines for synchronous clients, this class uses an
+ * {@link java.util.concurrent.ExecutorService}.  If a custom executor is supplied, it should throw
+ * a subclass of {@link RejectedExecutionException} to signal thread resource exhaustion, in which
+ * case the client will fail fast and propagate the event as a {@link TResourceExhaustedException}.
+ *
+ * TODO(William Farner): Before open sourcing, look into changing the current model of wrapped proxies
+ *    to use a single proxy and wrapped functions for decorators.
+ *
+ * @author John Sirois
+ */
+public class Thrift<T> {
+
+  /**
+   * The default thrift call configuration used if none is specified.
+   *
+   * Specifies the following settings:
+   * <ul>
+   * <li>global call timeout: 1 second
+   * <li>call retries: 0
+   * <li>retryable exceptions: TTransportException (network exceptions including socket timeouts)
+   * <li>wait for connections: true
+   * <li>debug: false
+   * </ul>
+   */
+  public static final Config DEFAULT_CONFIG = Config.builder()
+      .withRequestTimeout(Amount.of(1L, Time.SECONDS))
+      .noRetries()
+      .retryOn(TTransportException.class) // if maxRetries is set non-zero
+      .create();
+
+  /**
+   * The default thrift call configuration used for an async client if none is specified.
+   *
+   * Specifies the following settings:
+   * <ul>
+   * <li>global call timeout: none
+   * <li>call retries: 0
+   * <li>retryable exceptions: IOException, TTransportException
+   *    (network exceptions but not timeouts)
+   * <li>wait for connections: true
+   * <li>debug: false
+   * </ul>
+   */
+  @SuppressWarnings("unchecked")
+  public static final Config DEFAULT_ASYNC_CONFIG = Config.builder(DEFAULT_CONFIG)
+      .withRequestTimeout(Amount.of(0L, Time.SECONDS))
+      .noRetries()
+      .retryOn(ImmutableSet.<Class<? extends Exception>>builder()
+          .add(IOException.class)
+          .add(TTransportException.class).build()) // if maxRetries is set non-zero
+      .create();
+
+  private final Config defaultConfig;
+  private final ExecutorService executorService;
+  private final ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool;
+  private final RequestTracker<InetSocketAddress> requestTracker;
+  private final String serviceName;
+  private final Class<T> serviceInterface;
+  private final Function<TTransport, T> clientFactory;
+  private final boolean async;
+  private final boolean withSsl;
+
+  /**
+   * Constructs an instance with the {@link #DEFAULT_CONFIG}, cached thread pool
+   * {@link ExecutorService}, and synchronous calls.
+   *
+   * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function,
+   *     boolean, boolean)
+   */
+  public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+      RequestTracker<InetSocketAddress> requestTracker,
+      String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory) {
+
+    this(DEFAULT_CONFIG, connectionPool, requestTracker, serviceName, serviceInterface,
+        clientFactory, false, false);
+  }
+
+  /**
+   * Constructs an instance with the {@link #DEFAULT_CONFIG} and cached thread pool
+   * {@link ExecutorService}.
+   *
+   * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function,
+   *    boolean, boolean)
+   */
+  public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+      RequestTracker<InetSocketAddress> requestTracker,
+      String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory,
+      boolean async) {
+
+    this(getConfig(async), connectionPool, requestTracker, serviceName,
+        serviceInterface, clientFactory, async, false);
+  }
+
+  /**
+   * Constructs an instance with the {@link #DEFAULT_CONFIG} and cached thread pool
+   * {@link ExecutorService}.
+   *
+   * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function,
+   *    boolean, boolean)
+   */
+  public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+      RequestTracker<InetSocketAddress> requestTracker,
+      String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory,
+      boolean async, boolean ssl) {
+
+    this(getConfig(async), connectionPool, requestTracker, serviceName,
+        serviceInterface, clientFactory, async, ssl);
+  }
+
+  /**
+   * Constructs an instance with a cached thread pool {@link ExecutorService}.
+   *
+   * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function,
+   *    boolean, boolean)
+   */
+  public Thrift(Config config, ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+      RequestTracker<InetSocketAddress> requestTracker,
+      String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory,
+      boolean async, boolean ssl) {
+
+    this(config,
+        Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("Thrift["+ serviceName +"][%d]")
+                .build()),
+        connectionPool, requestTracker, serviceName, serviceInterface, clientFactory, async, ssl);
+  }
+
+  /**
+   * Constructs an instance with the {@link #DEFAULT_CONFIG}.
+   *
+   * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function,
+   *    boolean, boolean)
+   */
+  public Thrift(ExecutorService executorService,
+      ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+      RequestTracker<InetSocketAddress> requestTracker,
+      String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory,
+      boolean async, boolean ssl) {
+
+    this(getConfig(async), executorService, connectionPool, requestTracker, serviceName,
+        serviceInterface, clientFactory, async, ssl);
+  }
+
+  private static Config getConfig(boolean async) {
+    return async ? DEFAULT_ASYNC_CONFIG : DEFAULT_CONFIG;
+  }
+
+  /**
+   * Constructs a new Thrift factory for creating clients that make calls to a particular thrift
+   * service.
+   *
+   * <p>Note that the combination of {@code config} and {@code connectionPool} need to be chosen
+   * with care depending on usage of the generated thrift clients.  In particular, if configured
+   * to not wait for connections, the {@code connectionPool} ought to be warmed up with a set of
+   * connections or else be actively building connections in the background.
+   *
+   * <p>TODO(John Sirois): consider adding an method to ObjectPool that would allow Thrift to handle
+   * this case by pro-actively warming the pool.
+   *
+   * @param config the default configuration to use for all thrift calls; also the configuration all
+   *     {@link ClientBuilder}s start with
+   * @param executorService for invoking calls with a specified deadline
+   * @param connectionPool the source for thrift connections
+   * @param serviceName a /vars friendly name identifying the service clients will connect to
+   * @param serviceInterface the thrift compiler generate interface class for the remote service
+   *     (Iface)
+   * @param clientFactory a function that can generate a concrete thrift client for the given
+   *     {@code serviceInterface}
+   * @param async enable asynchronous API
+   * @param ssl enable TLS handshaking for Thrift calls
+   */
+  public Thrift(Config config, ExecutorService executorService,
+      ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool,
+      RequestTracker<InetSocketAddress> requestTracker, String serviceName,
+      Class<T> serviceInterface, Function<TTransport, T> clientFactory, boolean async, boolean ssl) {
+
+    defaultConfig = Preconditions.checkNotNull(config);
+    this.executorService = Preconditions.checkNotNull(executorService);
+    this.connectionPool = Preconditions.checkNotNull(connectionPool);
+    this.requestTracker = Preconditions.checkNotNull(requestTracker);
+    this.serviceName = MorePreconditions.checkNotBlank(serviceName);
+    this.serviceInterface = checkServiceInterface(serviceInterface);
+    this.clientFactory = Preconditions.checkNotNull(clientFactory);
+    this.async = async;
+    this.withSsl = ssl;
+  }
+
+  static <I> Class<I> checkServiceInterface(Class<I> serviceInterface) {
+    Preconditions.checkNotNull(serviceInterface);
+    Preconditions.checkArgument(serviceInterface.isInterface(),
+        "%s must be a thrift service interface", serviceInterface);
+    return serviceInterface;
+  }
+
+  /**
+   * Closes any open connections and prepares this thrift client for graceful shutdown.  Any thrift
+   * client proxies returned from {@link #create()} will become invalid.
+   */
+  public void close() {
+    connectionPool.close();
+    executorService.shutdown();
+  }
+
+  /**
+   * A builder class that allows modifications of call behavior to be made for a given Thrift
+   * client.  Note that in the case of conflicting configuration calls, the last call wins.  So,
+   * for example, the following sequence would result in all calls being subject to a 5 second
+   * global deadline:
+   * <code>
+   *   builder.blocking().withDeadline(5, TimeUnit.SECONDS).create()
+   * </code>
+   *
+   * @see Config
+   */
+  public final class ClientBuilder extends Config.AbstractBuilder<ClientBuilder> {
+    private ClientBuilder(Config template) {
+      super(template);
+    }
+
+    @Override
+    protected ClientBuilder getThis() {
+      return this;
+    }
+
+    /**
+     * Creates a new client using the built up configuration changes.
+     */
+    public T create() {
+      return createClient(getConfig());
+    }
+  }
+
+  /**
+   * Creates a new thrift client builder that inherits this Thrift instance's default configuration.
+   * This is useful for customizing a client for a particular thrift call that makes sense to treat
+   * differently from the rest of the calls to a given service.
+   */
+  public ClientBuilder builder() {
+    return builder(defaultConfig);
+  }
+
+  /**
+   * Creates a new thrift client builder that inherits the given configuration.
+   * This is useful for customizing a client for a particular thrift call that makes sense to treat
+   * differently from the rest of the calls to a given service.
+   */
+  public ClientBuilder builder(Config config) {
+    Preconditions.checkNotNull(config);
+    return new ClientBuilder(config);
+  }
+
+  /**
+   * Creates a new client using the default configuration specified for this Thrift instance.
+   */
+  public T create() {
+    return createClient(defaultConfig);
+  }
+
+  private T createClient(Config config) {
+    StatsProvider statsProvider = config.getStatsProvider();
+
+    // lease/call/[invalidate]/release
+    boolean debug = config.isDebug();
+
+    Caller decorated = new ThriftCaller<T>(connectionPool, requestTracker, clientFactory,
+        config.getConnectTimeout(), debug);
+
+    // [retry]
+    if (config.getMaxRetries() > 0) {
+      decorated = new RetryingCaller(decorated, async, statsProvider, serviceName,
+          config.getMaxRetries(), config.getRetryableExceptions(), debug);
+    }
+
+    // [deadline]
+    if (config.getRequestTimeout().getValue() > 0) {
+      Preconditions.checkArgument(!async,
+          "Request deadlines may not be used with an asynchronous client.");
+
+      decorated = new DeadlineCaller(decorated, async, executorService, config.getRequestTimeout());
+    }
+
+    // [debug]
+    if (debug) {
+      decorated = new DebugCaller(decorated, async);
+    }
+
+    // stats
+    if (config.enableStats()) {
+      decorated = new StatTrackingCaller(decorated, async, statsProvider, serviceName);
+    }
+
+    final Caller caller = decorated;
+
+    final InvocationHandler invocationHandler = new InvocationHandler() {
+      @Override
+      public Object invoke(Object o, Method method, Object[] args) throws Throwable {
+        AsyncMethodCallback callback = null;
+        if (args != null && async) {
+          List<Object> argsList = Lists.newArrayList(args);
+          callback = extractCallback(argsList);
+          args = argsList.toArray();
+        }
+
+        return caller.call(method, args, callback, null);
+      }
+    };
+
+    @SuppressWarnings("unchecked")
+    T instance = (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),
+        new Class<?>[] {serviceInterface}, invocationHandler);
+    return instance;
+  }
+
+  /**
+   * Verifies that the final argument in a list of objects is a fully-formed
+   * {@link AsyncMethodCallback} and extracts it, removing it from the argument list.
+   *
+   * @param args Argument list to remove the callback from.
+   * @return The callback extracted from {@code args}.
+   */
+  private static AsyncMethodCallback extractCallback(List<Object> args) {
+    // TODO(William Farner): Check all interface methods when building the Thrift client
+    //    and verify that last arguments are all callbacks...this saves us from checking
+    //    each time.
+
+    // Check that the last argument is a callback.
+    Preconditions.checkArgument(args.size() > 0);
+    Object lastArg = args.get(args.size() - 1);
+    Preconditions.checkArgument(lastArg instanceof AsyncMethodCallback,
+        "Last argument of an async thrift call is expected to be of type AsyncMethodCallback.");
+
+    return (AsyncMethodCallback) args.remove(args.size() - 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java b/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java
new file mode 100644
index 0000000..a1b79b0
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java
@@ -0,0 +1,369 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Closures;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.net.pool.Connection;
+import com.twitter.common.net.pool.ConnectionFactory;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+/**
+ * A connection factory for thrift transport connections to a given host.  This connection factory
+ * is lazy and will only create a configured maximum number of active connections - where a
+ * {@link ConnectionFactory#create(com.twitter.common.quantity.Amount) created} connection that has
+ * not been {@link #destroy destroyed} is considered active.
+ *
+ * @author John Sirois
+ */
+public class ThriftConnectionFactory
+    implements ConnectionFactory<Connection<TTransport, InetSocketAddress>> {
+
+  public enum TransportType {
+    BLOCKING, FRAMED, NONBLOCKING;
+
+    /**
+     * Async clients implicitly use a framed transport, requiring the server they connect to to do
+     * the same. This prevents specifying a nonblocking client without a framed transport, since
+     * that is not compatible with thrift and would simply cause the client to blow up when making a
+     * request. Instead, you must explicitly say useFramedTransport(true) for any buildAsync().
+     */
+    public static TransportType get(boolean framedTransport, boolean nonblocking) {
+      if (nonblocking) {
+        Preconditions.checkArgument(framedTransport,
+            "nonblocking client requires a server running framed transport");
+        return NONBLOCKING;
+      }
+
+      return framedTransport ? FRAMED : BLOCKING;
+    }
+  }
+
+  private static InetSocketAddress asEndpoint(String host, int port) {
+    MorePreconditions.checkNotBlank(host);
+    Preconditions.checkArgument(port > 0);
+    return InetSocketAddress.createUnresolved(host, port);
+  }
+
+  private InetSocketAddress endpoint;
+  private final int maxConnections;
+  private final TransportType transportType;
+  private final Amount<Long, Time> socketTimeout;
+  private final Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback;
+  private boolean sslTransport = false;
+
+  private final Set<Connection<TTransport, InetSocketAddress>> activeConnections =
+      Sets.newSetFromMap(
+          Maps.<Connection<TTransport, InetSocketAddress>, Boolean>newIdentityHashMap());
+  private volatile int lastActiveConnectionsSize = 0;
+
+  private final Lock activeConnectionsWriteLock = new ReentrantLock(true);
+
+  /**
+   * Creates a thrift connection factory with a plain socket (non-framed transport).
+   * This is the same as calling {@link #ThriftConnectionFactory(String, int, int, boolean)} with
+   * {@code framedTransport} set to {@code false}.
+   *
+   * @param host Host to connect to.
+   * @param port Port to connect on.
+   * @param maxConnections Maximum number of connections for this host:port.
+   */
+  public ThriftConnectionFactory(String host, int port, int maxConnections) {
+    this(host, port, maxConnections, TransportType.BLOCKING);
+  }
+
+  /**
+   * Creates a thrift connection factory.
+   * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
+   * otherwise a raw {@link TSocket} will be used.
+   *
+   * @param host Host to connect to.
+   * @param port Port to connect on.
+   * @param maxConnections Maximum number of connections for this host:port.
+   * @param framedTransport Whether to use framed or blocking transport.
+   */
+  public ThriftConnectionFactory(String host, int port, int maxConnections,
+      boolean framedTransport) {
+
+    this(asEndpoint(host, port), maxConnections, TransportType.get(framedTransport, false));
+  }
+
+  /**
+   * Creates a thrift connection factory.
+   * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
+   * otherwise a raw {@link TSocket} will be used.
+   *
+   * @param endpoint Endpoint to connect to.
+   * @param maxConnections Maximum number of connections for this host:port.
+   * @param framedTransport Whether to use framed or blocking transport.
+   */
+  public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections,
+      boolean framedTransport) {
+
+    this(endpoint, maxConnections, TransportType.get(framedTransport, false));
+  }
+
+  /**
+   * Creates a thrift connection factory.
+   * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
+   * otherwise a raw {@link TSocket} will be used.
+   * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used,
+   * otherwise a raw {@link TSocket} will be used.
+   * Timeouts are ignored when nonblocking transport is used.
+   *
+   * @param host Host to connect to.
+   * @param port Port to connect on.
+   * @param maxConnections Maximum number of connections for this host:port.
+   * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
+   *    (implicitly framed) transport.
+   */
+  public ThriftConnectionFactory(String host, int port, int maxConnections,
+      TransportType transportType) {
+    this(host, port, maxConnections, transportType, null);
+  }
+
+  /**
+   * Creates a thrift connection factory.
+   * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
+   * otherwise a raw {@link TSocket} will be used.
+   * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used,
+   * otherwise a raw {@link TSocket} will be used.
+   * Timeouts are ignored when nonblocking transport is used.
+   *
+   * @param host Host to connect to.
+   * @param port Port to connect on.
+   * @param maxConnections Maximum number of connections for this host:port.
+   * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
+   *          (implicitly framed) transport.
+   * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout o
+   *          the blocking client.
+   */
+  public ThriftConnectionFactory(String host, int port, int maxConnections,
+      TransportType transportType, Amount<Long, Time> socketTimeout) {
+    this(asEndpoint(host, port), maxConnections, transportType, socketTimeout);
+  }
+
+  public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections,
+      TransportType transportType) {
+    this(endpoint, maxConnections, transportType, null);
+  }
+
+  /**
+   * Creates a thrift connection factory.
+   * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used,
+   * otherwise a raw {@link TSocket} will be used.
+   * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used,
+   * otherwise a raw {@link TSocket} will be used.
+   * Timeouts are ignored when nonblocking transport is used.
+   *
+   * @param endpoint Endpoint to connect to.
+   * @param maxConnections Maximum number of connections for this host:port.
+   * @param transportType Whether to use normal blocking, framed blocking, or non-blocking
+   *          (implicitly framed) transport.
+   * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout o
+   *          the blocking client.
+   */
+  public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections,
+      TransportType transportType, Amount<Long, Time> socketTimeout) {
+	  this(endpoint, maxConnections, transportType, socketTimeout,
+        Closures.<Connection<TTransport, InetSocketAddress>>noop(), false);
+  }
+
+  public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections,
+      TransportType transportType, Amount<Long, Time> socketTimeout,
+	  Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback,
+	  boolean sslTransport) {
+    Preconditions.checkArgument(maxConnections > 0, "maxConnections must be at least 1");
+    if (socketTimeout != null) {
+      Preconditions.checkArgument(socketTimeout.as(Time.MILLISECONDS) >= 0);
+    }
+
+    this.endpoint = Preconditions.checkNotNull(endpoint);
+    this.maxConnections = maxConnections;
+    this.transportType = transportType;
+    this.socketTimeout = socketTimeout;
+    this.postCreateCallback = Preconditions.checkNotNull(postCreateCallback);
+    this.sslTransport = sslTransport;
+  }
+
+  @Override
+  public boolean mightCreate() {
+    return lastActiveConnectionsSize < maxConnections;
+  }
+
+  /**
+   * FIXME:  shouldn't this throw TimeoutException instead of returning null
+   *         in the timeout cases as per the ConnectionFactory.create javadoc?
+   */
+  @Override
+  public Connection<TTransport, InetSocketAddress> create(Amount<Long, Time> timeout)
+      throws TTransportException, IOException {
+
+    Preconditions.checkNotNull(timeout);
+    if (timeout.getValue() == 0) {
+      return create();
+    }
+
+    try {
+      long timeRemainingNs = timeout.as(Time.NANOSECONDS);
+      long start = System.nanoTime();
+      if(activeConnectionsWriteLock.tryLock(timeRemainingNs, TimeUnit.NANOSECONDS)) {
+        try {
+          if (!willCreateSafe()) {
+            return null;
+          }
+
+          timeRemainingNs -= (System.nanoTime() - start);
+
+          return createConnection((int) TimeUnit.NANOSECONDS.toMillis(timeRemainingNs));
+        } finally {
+          activeConnectionsWriteLock.unlock();
+        }
+      } else {
+        return null;
+      }
+    } catch (InterruptedException e) {
+      return null;
+    }
+  }
+
+  private Connection<TTransport, InetSocketAddress> create()
+      throws TTransportException, IOException {
+    activeConnectionsWriteLock.lock();
+    try {
+      if (!willCreateSafe()) {
+        return null;
+      }
+
+      return createConnection(0);
+    } finally {
+      activeConnectionsWriteLock.unlock();
+    }
+  }
+
+  private Connection<TTransport, InetSocketAddress> createConnection(int timeoutMillis)
+      throws TTransportException, IOException {
+    TTransport transport = createTransport(timeoutMillis);
+    if (transport == null) {
+      return null;
+    }
+
+    Connection<TTransport, InetSocketAddress> connection =
+        new TTransportConnection(transport, endpoint);
+    postCreateCallback.execute(connection);
+    activeConnections.add(connection);
+    lastActiveConnectionsSize = activeConnections.size();
+    return connection;
+  }
+
+  private boolean willCreateSafe() {
+    return activeConnections.size() < maxConnections;
+  }
+
+  @VisibleForTesting
+  TTransport createTransport(int timeoutMillis) throws TTransportException, IOException {
+    TSocket socket = null;
+    if (transportType != TransportType.NONBLOCKING) {
+      // can't do a nonblocking create on a blocking transport
+      if (timeoutMillis <= 0) {
+        return null;
+      }
+
+      if (sslTransport) {
+        SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
+        SSLSocket ssl_socket = (SSLSocket) factory.createSocket(endpoint.getHostName(), endpoint.getPort());
+        ssl_socket.setSoTimeout(timeoutMillis);
+        return new TSocket(ssl_socket);
+      } else {
+        socket = new TSocket(endpoint.getHostName(), endpoint.getPort(), timeoutMillis);
+      }
+    }
+
+    try {
+      switch (transportType) {
+        case BLOCKING:
+          socket.open();
+          setSocketTimeout(socket);
+          return socket;
+        case FRAMED:
+          TFramedTransport transport = new TFramedTransport(socket);
+          transport.open();
+          setSocketTimeout(socket);
+          return transport;
+        case NONBLOCKING:
+          try {
+            return new TNonblockingSocket(endpoint.getHostName(), endpoint.getPort());
+          } catch (IOException e) {
+            throw new IOException("Failed to create non-blocking transport to " + endpoint, e);
+          }
+      }
+    } catch (TTransportException e) {
+      throw new TTransportException("Failed to create transport to " + endpoint, e);
+    }
+
+    throw new IllegalArgumentException("unknown transport type " + transportType);
+  }
+
+  private void setSocketTimeout(TSocket socket) {
+    if (socketTimeout != null) {
+      socket.setTimeout(socketTimeout.as(Time.MILLISECONDS).intValue());
+    }
+  }
+
+  @Override
+  public void destroy(Connection<TTransport, InetSocketAddress> connection) {
+    activeConnectionsWriteLock.lock();
+    try {
+      boolean wasActiveConnection = activeConnections.remove(connection);
+      Preconditions.checkArgument(wasActiveConnection,
+          "connection %s not created by this factory", connection);
+      lastActiveConnectionsSize = activeConnections.size();
+    } finally {
+      activeConnectionsWriteLock.unlock();
+    }
+
+    // We close the connection outside the critical section which means we may have more connections
+    // "active" (open) than maxConnections for a very short time
+    connection.close();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s[%s]", getClass().getSimpleName(), endpoint);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/ThriftException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/thrift/ThriftException.java b/commons/src/main/java/com/twitter/common/thrift/ThriftException.java
new file mode 100644
index 0000000..b8e5949
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/thrift/ThriftException.java
@@ -0,0 +1,29 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.thrift;
+
+/**
+ * Exception class to wrap exceptions caught during thrift calls.
+ */
+public class ThriftException extends Exception {
+  public ThriftException(String message) {
+    super(message);
+  }
+  public ThriftException(String message, Throwable t) {
+    super(message, t);
+  }
+}


Mime
View raw message