aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zma...@apache.org
Subject [26/37] aurora git commit: Import of Twitter Commons.
Date Tue, 25 Aug 2015 18:19:40 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java b/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java
new file mode 100644
index 0000000..2887a50
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java
@@ -0,0 +1,337 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.net.pool;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
+
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A generic connection pool that delegates growth policy to a {@link ConnectionFactory} and
+ * connection choice to a supplied strategy.
+ *
+ * <p>TODO(John Sirois): implement a reaper to clean up connections that may become invalid when not in
+ * use.
+ *
+ * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command
+ *
+ * @author John Sirois
+ */
+public final class ConnectionPool<S extends Connection<?, ?>> implements ObjectPool<S> {
+
+  private static final Logger LOG = Logger.getLogger(ConnectionPool.class.getName());
+
+  private final Set<S> leasedConnections =
+      Sets.newSetFromMap(Maps.<S, Boolean>newIdentityHashMap());
+  private final Set<S> availableConnections = Sets.newHashSet();
+  private final Lock poolLock;
+  private final Condition available;
+
+  private final ConnectionFactory<S> connectionFactory;
+  private final Executor executor;
+
+  private volatile boolean closed;
+  private final AtomicLong connectionsCreated;
+  private final AtomicLong connectionsDestroyed;
+  private final AtomicLong connectionsReturned;
+
+  /**
+   * Creates a connection pool with a connection picker that selects the first item in the set of
+   * available connections, exporting statistics to stats provider {@link Stats#STATS_PROVIDER}.
+   *
+   * @param connectionFactory Factory to create and destroy connections.
+   */
+  public ConnectionPool(ConnectionFactory<S> connectionFactory) {
+    this(connectionFactory, Stats.STATS_PROVIDER);
+  }
+
+  /**
+   * Creates a connection pool with a connection picker that selects the first item in the set of
+   * available connections and uses the supplied StatsProvider to register stats with.
+   *
+   * @param connectionFactory Factory to create and destroy connections.
+   * @param statsProvider Stats export provider.
+   */
+  public ConnectionPool(ConnectionFactory<S> connectionFactory, StatsProvider statsProvider) {
+    this(Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder()
+            .setNameFormat("CP-" + connectionFactory + "[%d]")
+            .setDaemon(true)
+            .build()),
+        new ReentrantLock(true), connectionFactory, statsProvider);
+  }
+
+  @VisibleForTesting
+  ConnectionPool(Executor executor, Lock poolLock, ConnectionFactory<S> connectionFactory,
+      StatsProvider statsProvider) {
+    Preconditions.checkNotNull(executor);
+    Preconditions.checkNotNull(poolLock);
+    Preconditions.checkNotNull(connectionFactory);
+    Preconditions.checkNotNull(statsProvider);
+
+    this.executor = executor;
+    this.poolLock = poolLock;
+    available = poolLock.newCondition();
+    this.connectionFactory = connectionFactory;
+
+    String cfName = Stats.normalizeName(connectionFactory.toString());
+    statsProvider.makeGauge("cp_leased_connections_" + cfName,
+        new Supplier<Integer>() {
+      @Override public Integer get() {
+        return leasedConnections.size();
+      }
+    });
+    statsProvider.makeGauge("cp_available_connections_" + cfName,
+        new Supplier<Integer>() {
+          @Override public Integer get() {
+            return availableConnections.size();
+          }
+        });
+    this.connectionsCreated =
+        statsProvider.makeCounter("cp_created_connections_" + cfName);
+    this.connectionsDestroyed =
+        statsProvider.makeCounter("cp_destroyed_connections_" + cfName);
+    this.connectionsReturned =
+        statsProvider.makeCounter("cp_returned_connections_" + cfName);
+  }
+
+  @Override
+  public String toString() {
+    return "CP-" + connectionFactory;
+  }
+
+  @Override
+  public S get() throws ResourceExhaustedException, TimeoutException {
+    checkNotClosed();
+    poolLock.lock();
+    try {
+      return leaseConnection(NO_TIMEOUT);
+    } finally {
+      poolLock.unlock();
+    }
+  }
+
+  @Override
+  public S get(Amount<Long, Time> timeout)
+      throws ResourceExhaustedException, TimeoutException {
+
+    checkNotClosed();
+    Preconditions.checkNotNull(timeout);
+    if (timeout.getValue() == 0) {
+      return get();
+    }
+
+    try {
+      long start = System.nanoTime();
+      long timeBudgetNs = timeout.as(Time.NANOSECONDS);
+      if (poolLock.tryLock(timeBudgetNs, TimeUnit.NANOSECONDS)) {
+        try {
+          timeBudgetNs -= (System.nanoTime() - start);
+          return leaseConnection(Amount.of(timeBudgetNs, Time.NANOSECONDS));
+        } finally {
+          poolLock.unlock();
+        }
+      } else {
+        throw new TimeoutException("Timed out waiting for pool lock");
+      }
+    } catch (InterruptedException e) {
+      throw new TimeoutException("Interrupted waiting for pool lock");
+    }
+  }
+
+  private S leaseConnection(Amount<Long, Time> timeout) throws ResourceExhaustedException,
+      TimeoutException {
+    S connection = getConnection(timeout);
+    if (connection == null) {
+      throw new ResourceExhaustedException("Connection pool resources exhausted");
+    }
+    return leaseConnection(connection);
+  }
+
+  @Override
+  public void release(S connection) {
+    release(connection, false);
+  }
+
+  /**
+   * Equivalent to releasing a Connection with isValid() == false.
+   * @see ObjectPool#remove(Object)
+   */
+  @Override
+  public void remove(S connection) {
+    release(connection, true);
+  }
+
+  // TODO(John Sirois): release could block indefinitely if someone is blocked in get() on a create
+  // connection - reason about this and potentially submit release to our executor
+  private void release(S connection, boolean remove) {
+    poolLock.lock();
+    try {
+      if (!leasedConnections.remove(connection)) {
+        throw new IllegalArgumentException("Connection not controlled by this connection pool: "
+                                           + connection);
+      }
+
+      if (!closed && !remove && connection.isValid()) {
+        addConnection(connection);
+        connectionsReturned.incrementAndGet();
+      } else {
+        connectionFactory.destroy(connection);
+        connectionsDestroyed.incrementAndGet();
+      }
+    } finally {
+      poolLock.unlock();
+    }
+  }
+
+  @Override
+  public void close() {
+    poolLock.lock();
+    try {
+      for (S availableConnection : availableConnections) {
+        connectionFactory.destroy(availableConnection);
+      }
+    } finally {
+      closed = true;
+      poolLock.unlock();
+    }
+  }
+
+  private void checkNotClosed() {
+    Preconditions.checkState(!closed);
+  }
+
+  private S leaseConnection(S connection) {
+    leasedConnections.add(connection);
+    return connection;
+  }
+
+  // TODO(John Sirois): pool growth is serialized by poolLock currently - it seems like this could be
+  // fixed but there may be no need - do gedankanalysis
+  private S getConnection(final Amount<Long, Time> timeout) throws ResourceExhaustedException,
+      TimeoutException {
+    if (availableConnections.isEmpty()) {
+      if (leasedConnections.isEmpty()) {
+        // Completely empty pool
+        try {
+          return createConnection(timeout);
+        } catch (Exception e) {
+          throw new ResourceExhaustedException("failed to create a new connection", e);
+        }
+      } else {
+        // If the pool is allowed to grow - let the connection factory race a release
+        if (connectionFactory.mightCreate()) {
+          executor.execute(new Runnable() {
+            @Override public void run() {
+              try {
+                // The connection timeout is not needed here to honor the callers get requested
+                // timeout, but we don't want to have an infinite timeout which could exhaust a
+                // thread pool over many backgrounded create calls
+                S connection = createConnection(timeout);
+                if (connection != null) {
+                  addConnection(connection);
+                } else {
+                  LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client " +
+                      "due to maximum pool size or timeout");
+                }
+              } catch (Exception e) {
+                LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client", e);
+              }
+            }
+          });
+        }
+
+        try {
+          // We wait for a returned/new connection here in loops to guard against the
+          // "spurious wakeups" that are documented can occur with Condition.await()
+          if (timeout.getValue() == 0) {
+            while(availableConnections.isEmpty()) {
+              available.await();
+            }
+          } else {
+            long timeRemainingNs = timeout.as(Time.NANOSECONDS);
+            while(availableConnections.isEmpty()) {
+              long start = System.nanoTime();
+              if (!available.await(timeRemainingNs, TimeUnit.NANOSECONDS)) {
+                throw new TimeoutException(
+                    "timeout waiting for a connection to be released to the pool");
+              } else {
+                timeRemainingNs -= (System.nanoTime() - start);
+              }
+            }
+            if (availableConnections.isEmpty()) {
+              throw new TimeoutException(
+                  "timeout waiting for a connection to be released to the pool");
+            }
+          }
+        } catch (InterruptedException e) {
+          throw new TimeoutException("Interrupted while waiting for a connection.");
+        }
+      }
+    }
+
+    return getAvailableConnection();
+  }
+
+  private S getAvailableConnection() {
+    S connection = (availableConnections.size() == 1)
+        ? Iterables.getOnlyElement(availableConnections)
+        : availableConnections.iterator().next();
+    if (!availableConnections.remove(connection)) {
+      throw new IllegalArgumentException("Connection picked not in pool: " + connection);
+    }
+    return connection;
+  }
+
+  private S createConnection(Amount<Long, Time> timeout) throws Exception {
+    S connection = connectionFactory.create(timeout);
+    if (connection != null) {
+      connectionsCreated.incrementAndGet();
+    }
+    return connection;
+  }
+
+  private void addConnection(S connection) {
+    poolLock.lock();
+    try {
+      availableConnections.add(connection);
+      available.signal();
+    } finally {
+      poolLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java
new file mode 100644
index 0000000..4c20604
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java
@@ -0,0 +1,79 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.net.pool;
+
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.common.base.Command;
+
+/**
+ * A host set that can be monitored for changes.
+ *
+ * @param <T> The type that is used to identify members of the host set.
+ */
+public interface DynamicHostSet<T> {
+
+  /**
+   * Registers a monitor to receive change notices for this server set as long as this jvm process
+   * is alive.  Blocks until the initial server set can be gathered and delivered to the monitor.
+   * The monitor will be notified if the membership set or parameters of existing members have
+   * changed.
+   *
+   * @param monitor the server set monitor to call back when the host set changes
+   * @throws MonitorException if there is a problem monitoring the host set
+   * @deprecated Deprecated in favor of {@link #watch(HostChangeMonitor)}
+   */
+  @Deprecated
+  public void monitor(final HostChangeMonitor<T> monitor) throws MonitorException;
+
+  /**
+   * Registers a monitor to receive change notices for this server set as long as this jvm process
+   * is alive.  Blocks until the initial server set can be gathered and delivered to the monitor.
+   * The monitor will be notified if the membership set or parameters of existing members have
+   * changed.
+   *
+   * @param monitor the server set monitor to call back when the host set changes
+   * @return A command which, when executed, will stop monitoring the host set.
+   * @throws MonitorException if there is a problem monitoring the host set
+   */
+  public Command watch(final HostChangeMonitor<T> monitor) throws MonitorException;
+
+  /**
+   * An interface to an object that is interested in receiving notification whenever the host set
+   * changes.
+   */
+  public static interface HostChangeMonitor<T> {
+
+    /**
+     * Called when either the available set of services changes (when a service dies or a new
+     * instance comes on-line) or when an existing service advertises a status or health change.
+     *
+     * @param hostSet the current set of available ServiceInstances
+     */
+    void onChange(ImmutableSet<T> hostSet);
+  }
+
+  public static class MonitorException extends Exception {
+    public MonitorException(String msg) {
+      super(msg);
+    }
+
+    public MonitorException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java
new file mode 100644
index 0000000..e9cc0f0
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java
@@ -0,0 +1,52 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.net.pool;
+
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.common.base.Command;
+
+import static com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor;
+import static com.twitter.common.net.pool.DynamicHostSet.MonitorException;
+
+/**
+ * Utility methods for dealing with dynamic sets of hosts.
+ */
+public final class DynamicHostSetUtil {
+
+  /**
+   * Gets a snapshot of a set of dynamic hosts (e.g. a ServerSet) and returns a readable copy of
+   * the underlying actual endpoints.
+   *
+   * @param hostSet The hostSet to snapshot.
+   * @throws MonitorException if there was a problem obtaining the snapshot.
+   */
+  public static <T> ImmutableSet<T> getSnapshot(DynamicHostSet<T> hostSet) throws MonitorException {
+    final ImmutableSet.Builder<T> snapshot = ImmutableSet.builder();
+    Command unwatch = hostSet.watch(new HostChangeMonitor<T>() {
+      @Override public void onChange(ImmutableSet<T> hostSet) {
+        snapshot.addAll(hostSet);
+      }
+    });
+    unwatch.execute();
+    return snapshot.build();
+  }
+
+  private DynamicHostSetUtil() {
+    // utility
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java
new file mode 100644
index 0000000..dc9aa21
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java
@@ -0,0 +1,172 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.net.pool;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.twitter.common.base.Closure;
+import com.twitter.common.net.loadbalancing.LoadBalancer;
+import com.twitter.common.net.pool.DynamicHostSet.MonitorException;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An ObjectPool that maintains a set of connections for a set of service endpoints defined by a
+ * {@link com.twitter.common.zookeeper.ServerSet}.
+ *
+ * @param <H> The type that contains metadata information about hosts, such as liveness and address.
+ * @param <T> The raw connection type that is being pooled.
+ * @param <E> The type that identifies the endpoint of the pool, such as an address.
+ * @author John Sirois
+ */
+public class DynamicPool<H, T, E> implements ObjectPool<Connection<T, E>> {
+
+  private final MetaPool<T, E> pool;
+
+  /**
+   * Creates a new ServerSetConnectionPool and blocks on an initial read and constructions of pools
+   * for the given {@code serverSet}.
+   *
+   * @param hostSet the dynamic set of available servers to pool connections for
+   * @param endpointPoolFactory a factory that can generate a connection pool for an endpoint
+   * @param loadBalancer Load balancer to manage request flow.
+   * @param onBackendsChosen A callback to notify of chosen backends.
+   * @param restoreInterval the interval after connection errors start occurring for a target to
+   *     begin checking to see if it has come back to a healthy state
+   * @param endpointExtractor Function that transforms a service instance into an endpoint instance.
+   * @param livenessChecker Filter that will determine whether a host indicates itself as available.
+   * @throws MonitorException if there is a problem monitoring the host set
+   */
+  public DynamicPool(DynamicHostSet<H> hostSet,
+      Function<E, ObjectPool<Connection<T, E>>> endpointPoolFactory,
+      LoadBalancer<E> loadBalancer,
+      Closure<Collection<E>> onBackendsChosen,
+      Amount<Long, Time> restoreInterval,
+      Function<H, E> endpointExtractor,
+      Predicate<H> livenessChecker)
+      throws DynamicHostSet.MonitorException {
+    Preconditions.checkNotNull(hostSet);
+    Preconditions.checkNotNull(endpointPoolFactory);
+
+    pool = new MetaPool<T, E>(loadBalancer, onBackendsChosen, restoreInterval);
+
+    // TODO(John Sirois): consider an explicit start/stop
+    hostSet.monitor(new PoolMonitor<H, Connection<T, E>>(endpointPoolFactory, endpointExtractor,
+        livenessChecker) {
+      @Override protected void onPoolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools,
+          Map<E, ObjectPool<Connection<T, E>>> livePools) {
+        poolRebuilt(deadPools, livePools);
+      }
+    });
+  }
+
+  @VisibleForTesting
+  void poolRebuilt(Set<ObjectPool<Connection<T, E>>> deadPools,
+      Map<E, ObjectPool<Connection<T, E>>> livePools) {
+
+    pool.setBackends(livePools);
+
+    for (ObjectPool<Connection<T, E>> deadTargetPool : deadPools) {
+      deadTargetPool.close();
+    }
+  }
+
+  @Override
+  public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException {
+    return pool.get();
+  }
+
+  @Override
+  public Connection<T, E> get(Amount<Long, Time> timeout)
+      throws ResourceExhaustedException, TimeoutException {
+    return pool.get(timeout);
+  }
+
+  @Override
+  public void release(Connection<T, E> connection) {
+    pool.release(connection);
+  }
+
+  @Override
+  public void remove(Connection<T, E> connection) {
+    pool.remove(connection);
+  }
+
+  @Override
+  public void close() {
+    pool.close();
+  }
+
+  private abstract class PoolMonitor<H, S extends Connection<?, ?>>
+      implements DynamicHostSet.HostChangeMonitor<H> {
+
+    private final Function<E, ObjectPool<S>> endpointPoolFactory;
+    private final Function<H, E> endpointExtractor;
+    private final Predicate<H> livenessTest;
+
+    public PoolMonitor(Function<E, ObjectPool<S>> endpointPoolFactory,
+        Function<H, E> endpointExtractor,
+        Predicate<H> livenessTest) {
+      this.endpointPoolFactory = endpointPoolFactory;
+      this.endpointExtractor = endpointExtractor;
+      this.livenessTest = livenessTest;
+    }
+
+    private final Map<E, ObjectPool<S>> endpointPools = Maps.newHashMap();
+
+    @Override
+    public synchronized void onChange(ImmutableSet<H> serverSet) {
+      // TODO(John Sirois): change onChange to pass the delta data since its already computed by
+      // ServerSet
+
+      Map<E, H> newEndpoints =
+          Maps.uniqueIndex(Iterables.filter(serverSet, livenessTest), endpointExtractor);
+
+      Set<E> deadEndpoints = ImmutableSet.copyOf(
+          Sets.difference(endpointPools.keySet(), newEndpoints.keySet()));
+      Set<ObjectPool<S>> deadPools = Sets.newHashSet();
+      for (E endpoint : deadEndpoints) {
+        ObjectPool<S> deadPool = endpointPools.remove(endpoint);
+        deadPools.add(deadPool);
+      }
+
+      Set<E> addedEndpoints = ImmutableSet.copyOf(
+          Sets.difference(newEndpoints.keySet(), endpointPools.keySet()));
+      for (E endpoint : addedEndpoints) {
+        ObjectPool<S> endpointPool = endpointPoolFactory.apply(endpoint);
+        endpointPools.put(endpoint, endpointPool);
+      }
+
+      onPoolRebuilt(deadPools, ImmutableMap.copyOf(endpointPools));
+    }
+
+    protected abstract void onPoolRebuilt(Set<ObjectPool<S>> deadPools,
+        Map<E, ObjectPool<S>> livePools);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java b/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java
new file mode 100644
index 0000000..fb97632
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java
@@ -0,0 +1,343 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.net.pool;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.Command;
+import com.twitter.common.net.loadbalancing.LoadBalancer;
+import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * A connection pool that picks connections from a set of backend pools.  Backend pools are selected
+ * from randomly initially but then as they are used they are ranked according to how many
+ * connections they have available and whether or not the last used connection had an error or not.
+ * In this way, backends that are responsive should get selected in preference to those that are
+ * not.
+ *
+ * <p>Non-responsive backends are monitored after a configurable period in a background thread and
+ * if a connection can be obtained they start to float back up in the rankings.  In this way,
+ * backends that are initially non-responsive but later become responsive should end up getting
+ * selected.
+ *
+ * <p> TODO(John Sirois): take a ShutdownRegistry and register a close command
+ *
+ * @author John Sirois
+ */
+public class MetaPool<T, E> implements ObjectPool<Connection<T, E>> {
+
+  private final Command stopBackendRestorer;
+
+  private Map<E, ObjectPool<Connection<T, E>>> backends = null;
+
+  // Locks to guard mutation of the backends set.
+  private final Lock backendsReadLock;
+  private final Lock backendsWriteLock;
+
+  private final Closure<Collection<E>> onBackendsChosen;
+
+  private final LoadBalancer<E> loadBalancer;
+
+  /**
+   * Creates a connection pool with no backends.  Backends may be added post-creation by calling
+   * {@link #setBackends(java.util.Map)}
+   *
+   * @param loadBalancer the load balancer to distribute requests among backends.
+   * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new
+   *     set of backends to restrict its call distribution to
+   * @param restoreInterval the interval after a backend goes dead to begin checking the backend to
+   *     see if it has come back to a healthy state
+   */
+  public MetaPool(LoadBalancer<E> loadBalancer,
+      Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) {
+    this(ImmutableMap.<E, ObjectPool<Connection<T, E>>>of(), loadBalancer,
+        onBackendsChosen, restoreInterval);
+  }
+
+  /**
+   * Creates a connection pool that balances connections across multiple backend pools.
+   *
+   * @param backends the connection pools for the backends
+   * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new
+   *     set of backends to restrict its call distribution to
+   * @param loadBalancer the load balancer to distribute requests among backends.
+   * @param restoreInterval the interval after a backend goes dead to begin checking the backend to
+   *     see if it has come back to a healthy state
+   */
+  public MetaPool(
+      ImmutableMap<E, ObjectPool<Connection<T, E>>> backends,
+      LoadBalancer<E> loadBalancer,
+      Closure<Collection<E>> onBackendsChosen, Amount<Long, Time> restoreInterval) {
+
+    this.loadBalancer = Preconditions.checkNotNull(loadBalancer);
+    this.onBackendsChosen = Preconditions.checkNotNull(onBackendsChosen);
+
+    ReadWriteLock backendsLock = new ReentrantReadWriteLock(true);
+    backendsReadLock = backendsLock.readLock();
+    backendsWriteLock = backendsLock.writeLock();
+
+    setBackends(backends);
+
+    Preconditions.checkNotNull(restoreInterval);
+    Preconditions.checkArgument(restoreInterval.getValue() > 0);
+    stopBackendRestorer = startDeadBackendRestorer(restoreInterval);
+  }
+
+  /**
+   * Assigns the backend pools that this pool should draw from.
+   *
+   * @param pools New pools to use.
+   */
+  public void setBackends(Map<E, ObjectPool<Connection<T, E>>> pools) {
+    backendsWriteLock.lock();
+    try {
+      backends = Preconditions.checkNotNull(pools);
+      loadBalancer.offerBackends(pools.keySet(), onBackendsChosen);
+    } finally {
+      backendsWriteLock.unlock();
+    }
+  }
+
+  private Command startDeadBackendRestorer(final Amount<Long, Time> restoreInterval) {
+
+    final AtomicBoolean shouldRestore = new AtomicBoolean(true);
+    Runnable restoreDeadBackends = new Runnable() {
+      @Override public void run() {
+        if (shouldRestore.get()) {
+          restoreDeadBackends(restoreInterval);
+        }
+      }
+    };
+    final ScheduledExecutorService scheduledExecutorService =
+        Executors.newScheduledThreadPool(1,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("MTCP-DeadBackendRestorer[%s]")
+                .build());
+    long restoreDelay = restoreInterval.getValue();
+    scheduledExecutorService.scheduleWithFixedDelay(restoreDeadBackends, restoreDelay,
+        restoreDelay, restoreInterval.getUnit().getTimeUnit());
+
+    return new Command() {
+      @Override public void execute() {
+        shouldRestore.set(false);
+        scheduledExecutorService.shutdownNow();
+        LOG.info("Backend restorer shut down");
+      }
+    };
+  }
+
+  private static final Logger LOG = Logger.getLogger(MetaPool.class.getName());
+
+  private void restoreDeadBackends(Amount<Long, Time> restoreInterval) {
+    for (E backend : snapshotBackends()) {
+      ObjectPool<Connection<T, E>> pool;
+      backendsReadLock.lock();
+      try {
+        pool = backends.get(backend);
+      } finally {
+        backendsReadLock.unlock();
+      }
+
+      // We can lose a race if the backends change - and that's fine, we'll restore the new set of
+      // backends in the next scheduled restoration run.
+      if (pool != null) {
+        try {
+          release(get(backend, pool, restoreInterval));
+        } catch (TimeoutException e) {
+          LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e);
+        } catch (ResourceExhaustedException e) {
+          LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e);
+        }
+      }
+    }
+  }
+
+  private Iterable<E> snapshotBackends() {
+    backendsReadLock.lock();
+    try {
+      return ImmutableList.copyOf(backends.keySet());
+    } finally {
+      backendsReadLock.unlock();
+    }
+  }
+
+  @Override
+  public Connection<T, E> get() throws ResourceExhaustedException, TimeoutException {
+    return get(ObjectPool.NO_TIMEOUT);
+  }
+
+  @Override
+  public Connection<T, E> get(Amount<Long, Time> timeout)
+      throws ResourceExhaustedException, TimeoutException {
+
+    E backend;
+    ObjectPool<Connection<T, E>> pool;
+
+    backendsReadLock.lock();
+    try {
+      backend = loadBalancer.nextBackend();
+      Preconditions.checkNotNull(backend, "Load balancer gave a null backend.");
+
+      pool = backends.get(backend);
+      Preconditions.checkNotNull(backend,
+          "Given backend %s not found in tracked backends: %s", backend, backends);
+    } finally {
+      backendsReadLock.unlock();
+    }
+
+    return get(backend, pool, timeout);
+  }
+
+  private static class ManagedConnection<T, E> implements Connection<T, E> {
+    private final Connection<T, E> connection;
+    private final ObjectPool<Connection<T, E>> pool;
+
+    private ManagedConnection(Connection<T, E> connection, ObjectPool<Connection<T, E>> pool) {
+      this.connection = connection;
+      this.pool = pool;
+    }
+
+    @Override
+    public void close() {
+      connection.close();
+    }
+
+    @Override
+    public T get() {
+      return connection.get();
+    }
+
+    @Override
+    public boolean isValid() {
+      return connection.isValid();
+    }
+
+    @Override
+    public E getEndpoint() {
+      return connection.getEndpoint();
+    }
+
+    @Override public String toString() {
+      return "ManagedConnection[" + connection.toString() + "]";
+    }
+
+    void release(boolean remove) {
+      if (remove) {
+        pool.remove(connection);
+      } else {
+        pool.release(connection);
+      }
+    }
+  }
+
+  private Connection<T, E> get(E backend, ObjectPool<Connection<T, E>> pool,
+      Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException {
+
+    long startNanos = System.nanoTime();
+    try {
+      Connection<T, E> connection = (timeout.getValue() == 0) ? pool.get() : pool.get(timeout);
+
+      // BEWARE: We have leased a connection from the underlying pool here and must return it to the
+      // caller so they can later release it.  If we fail to do so, the connection will leak.
+      // Catching intermediate exceptions ourselves and pro-actively returning the connection to the
+      // pool before re-throwing is not a viable option since the return would have to succeed,
+      // forcing us to ignore the timeout passed in.
+
+      // NOTE: LoadBalancer gracefully ignores backends it does not know about so even if we acquire
+      // a (backend, pool) pair atomically that has since been removed, we can safely let the lb
+      // know about backend events and it will just ignore us.
+
+      try {
+        loadBalancer.connected(backend, System.nanoTime() - startNanos);
+      } catch (RuntimeException e) {
+        LOG.log(Level.WARNING, "Encountered an exception updating load balancer stats after "
+                               + "leasing a connection - continuing", e);
+      }
+      return new ManagedConnection<T, E>(connection, pool);
+    } catch (TimeoutException e) {
+      loadBalancer.connectFailed(backend, ConnectionResult.TIMEOUT);
+      throw e;
+    } catch (ResourceExhaustedException e) {
+      loadBalancer.connectFailed(backend, ConnectionResult.FAILED);
+      throw e;
+    }
+  }
+
+  @Override
+  public void release(Connection<T, E> connection) {
+    release(connection, false);
+  }
+
+  /**
+   * Equivalent to releasing a Connection with isValid() == false.
+   * @see ObjectPool#remove(Object)
+   */
+  @Override
+  public void remove(Connection<T, E> connection) {
+    release(connection, true);
+  }
+
+  private void release(Connection<T, E> connection, boolean remove) {
+    backendsWriteLock.lock();
+    try {
+
+      if (!(connection instanceof ManagedConnection)) {
+        throw new IllegalArgumentException("Connection not controlled by this connection pool: "
+                                           + connection);
+      }
+      ((ManagedConnection) connection).release(remove);
+
+      loadBalancer.released(connection.getEndpoint());
+    } finally {
+      backendsWriteLock.unlock();
+    }
+  }
+
+  @Override
+  public void close() {
+    stopBackendRestorer.execute();
+
+    backendsWriteLock.lock();
+    try {
+      for (ObjectPool<Connection<T, E>> backend : backends.values()) {
+        backend.close();
+      }
+    } finally {
+      backendsWriteLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java b/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java
new file mode 100644
index 0000000..63bf788
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java
@@ -0,0 +1,85 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.net.pool;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A generic object pool that provides object of a given type for exclusive use by the caller.
+ * Object pools generally pool expensive resources and so offer a {@link #close} method that should
+ * be used to free these resources when the pool is no longer needed.
+ *
+ * @author John Sirois
+ */
+public interface ObjectPool<T> {
+
+  /**
+   * Gets a resource potentially blocking for as long as it takes to either create a new one or wait
+   * for one to be {@link #release(Object) released}.  Callers must {@link #release(Object) release}
+   * the connection when they are done with it.
+   *
+   * @return a resource for exclusive use by the caller
+   * @throws ResourceExhaustedException if no resource could be obtained because this pool was
+   *     exhausted
+   * @throws TimeoutException if we timed out while trying to fetch a resource
+   */
+  T get() throws ResourceExhaustedException, TimeoutException;
+
+  /**
+   * A convenience constant representing a no timeout.
+   */
+  Amount<Long,Time> NO_TIMEOUT = Amount.of(0L, Time.MILLISECONDS);
+
+  /**
+   * Gets a resource; timing out if there are none available and it takes longer than specified to
+   * create a new one or wait for one to be {@link #release(Object) released}.  Callers must
+   * {@link #release (Object) release} the connection when they are done with it.
+   *
+   * @param timeout the maximum amount of time to wait
+   * @return a resource for exclusive use by the caller
+   * @throws TimeoutException if the specified timeout was reached before a resource became
+   *     available
+   * @throws ResourceExhaustedException if no resource could be obtained because this pool was
+   *     exhausted
+   */
+  T get(Amount<Long, Time> timeout) throws ResourceExhaustedException, TimeoutException;
+
+  /**
+   * Releases a resource obtained from this pool back into the pool of available resources. It is an
+   * error to release a resource not obtained from this pool.
+   *
+   * @param resource Resource to release.
+   */
+  void release(T resource);
+
+  /**
+   * Removes a resource obtained from this pool from its available resources.  It is an error to
+   * remove a resource not obtained from this pool.
+   *
+   * @param resource Resource to remove.
+   */
+  void remove(T resource);
+
+  /**
+   * Disallows further gets from this pool, "closes" all idle objects and any outstanding objects
+   * when they are released.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java b/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java
new file mode 100644
index 0000000..f2c38ec
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java
@@ -0,0 +1,30 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.net.pool;
+
+/**
+ * @author John Sirois
+ */
+public class ResourceExhaustedException extends Exception {
+  public ResourceExhaustedException(String message) {
+    super(message);
+  }
+
+  public ResourceExhaustedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/objectsize/ObjectSizeCalculator.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/objectsize/ObjectSizeCalculator.java b/commons/src/main/java/com/twitter/common/objectsize/ObjectSizeCalculator.java
new file mode 100644
index 0000000..97af48d
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/objectsize/ObjectSizeCalculator.java
@@ -0,0 +1,430 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.objectsize;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Sets;
+
+/**
+ * Contains utility methods for calculating the memory usage of objects. It
+ * only works on the HotSpot JVM, and infers the actual memory layout (32 bit
+ * vs. 64 bit word size, compressed object pointers vs. uncompressed) from
+ * best available indicators. It can reliably detect a 32 bit vs. 64 bit JVM.
+ * It can only make an educated guess at whether compressed OOPs are used,
+ * though; specifically, it knows what the JVM's default choice of OOP
+ * compression would be based on HotSpot version and maximum heap sizes, but if
+ * the choice is explicitly overridden with the <tt>-XX:{+|-}UseCompressedOops</tt> command line
+ * switch, it can not detect
+ * this fact and will report incorrect sizes, as it will presume the default JVM
+ * behavior.
+ *
+ * @author Attila Szegedi
+ */
+public class ObjectSizeCalculator {
+
+  /**
+   * Describes constant memory overheads for various constructs in a JVM implementation.
+   */
+  public interface MemoryLayoutSpecification {
+
+    /**
+     * Returns the fixed overhead of an array of any type or length in this JVM.
+     *
+     * @return the fixed overhead of an array.
+     */
+    int getArrayHeaderSize();
+
+    /**
+     * Returns the fixed overhead of for any {@link Object} subclass in this JVM.
+     *
+     * @return the fixed overhead of any object.
+     */
+    int getObjectHeaderSize();
+
+    /**
+     * Returns the quantum field size for a field owned by an object in this JVM.
+     *
+     * @return the quantum field size for an object.
+     */
+    int getObjectPadding();
+
+    /**
+     * Returns the fixed size of an object reference in this JVM.
+     *
+     * @return the size of all object references.
+     */
+    int getReferenceSize();
+
+    /**
+     * Returns the quantum field size for a field owned by one of an object's ancestor superclasses
+     * in this JVM.
+     *
+     * @return the quantum field size for a superclass field.
+     */
+    int getSuperclassFieldPadding();
+  }
+
+  private static class CurrentLayout {
+    private static final MemoryLayoutSpecification SPEC =
+        getEffectiveMemoryLayoutSpecification();
+  }
+
+  /**
+   * Given an object, returns the total allocated size, in bytes, of the object
+   * and all other objects reachable from it.  Attempts to to detect the current JVM memory layout,
+   * but may fail with {@link UnsupportedOperationException};
+   *
+   * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do
+   *          anything special, it measures the size of all objects
+   *          reachable through it (which will include its class loader, and by
+   *          extension, all other Class objects loaded by
+   *          the same loader, and all the parent class loaders). It doesn't provide the
+   *          size of the static fields in the JVM class that the Class object
+   *          represents.
+   * @return the total allocated size of the object and all other objects it
+   *         retains.
+   * @throws UnsupportedOperationException if the current vm memory layout cannot be detected.
+   */
+  public static long getObjectSize(Object obj) throws UnsupportedOperationException {
+    return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj);
+  }
+
+  // Fixed object header size for arrays.
+  private final int arrayHeaderSize;
+  // Fixed object header size for non-array objects.
+  private final int objectHeaderSize;
+  // Padding for the object size - if the object size is not an exact multiple
+  // of this, it is padded to the next multiple.
+  private final int objectPadding;
+  // Size of reference (pointer) fields.
+  private final int referenceSize;
+  // Padding for the fields of superclass before fields of subclasses are
+  // added.
+  private final int superclassFieldPadding;
+
+  private final LoadingCache<Class<?>, ClassSizeInfo> classSizeInfos =
+      CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, ClassSizeInfo>() {
+        public ClassSizeInfo load(Class<?> clazz) {
+          return new ClassSizeInfo(clazz);
+        }
+      });
+
+
+  private final Set<Object> alreadyVisited = Sets.newIdentityHashSet();
+  private final Deque<Object> pending = new ArrayDeque<Object>(16 * 1024);
+  private long size;
+
+  /**
+   * Creates an object size calculator that can calculate object sizes for a given
+   * {@code memoryLayoutSpecification}.
+   *
+   * @param memoryLayoutSpecification a description of the JVM memory layout.
+   */
+  public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) {
+    Preconditions.checkNotNull(memoryLayoutSpecification);
+    arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize();
+    objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize();
+    objectPadding = memoryLayoutSpecification.getObjectPadding();
+    referenceSize = memoryLayoutSpecification.getReferenceSize();
+    superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding();
+  }
+
+  /**
+   * Given an object, returns the total allocated size, in bytes, of the object
+   * and all other objects reachable from it.
+   *
+   * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do
+   *          anything special, it measures the size of all objects
+   *          reachable through it (which will include its class loader, and by
+   *          extension, all other Class objects loaded by
+   *          the same loader, and all the parent class loaders). It doesn't provide the
+   *          size of the static fields in the JVM class that the Class object
+   *          represents.
+   * @return the total allocated size of the object and all other objects it
+   *         retains.
+   */
+  public synchronized long calculateObjectSize(Object obj) {
+    // Breadth-first traversal instead of naive depth-first with recursive
+    // implementation, so we don't blow the stack traversing long linked lists.
+    try {
+      for (;;) {
+        visit(obj);
+        if (pending.isEmpty()) {
+          return size;
+        }
+        obj = pending.removeFirst();
+      }
+    } finally {
+      alreadyVisited.clear();
+      pending.clear();
+      size = 0;
+    }
+  }
+
+  private void visit(Object obj) {
+    if (alreadyVisited.contains(obj)) {
+      return;
+    }
+    final Class<?> clazz = obj.getClass();
+    if (clazz == ArrayElementsVisitor.class) {
+      ((ArrayElementsVisitor) obj).visit(this);
+    } else {
+      alreadyVisited.add(obj);
+      if (clazz.isArray()) {
+        visitArray(obj);
+      } else {
+        classSizeInfos.getUnchecked(clazz).visit(obj, this);
+      }
+    }
+  }
+
+  private void visitArray(Object array) {
+    final Class<?> componentType = array.getClass().getComponentType();
+    final int length = Array.getLength(array);
+    if (componentType.isPrimitive()) {
+      increaseByArraySize(length, getPrimitiveFieldSize(componentType));
+    } else {
+      increaseByArraySize(length, referenceSize);
+      // If we didn't use an ArrayElementsVisitor, we would be enqueueing every
+      // element of the array here instead. For large arrays, it would
+      // tremendously enlarge the queue. In essence, we're compressing it into
+      // a small command object instead. This is different than immediately
+      // visiting the elements, as their visiting is scheduled for the end of
+      // the current queue.
+      switch (length) {
+        case 0: {
+          break;
+        }
+        case 1: {
+          enqueue(Array.get(array, 0));
+          break;
+        }
+        default: {
+          enqueue(new ArrayElementsVisitor((Object[]) array));
+        }
+      }
+    }
+  }
+
+  private void increaseByArraySize(int length, long elementSize) {
+    increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding));
+  }
+
+  private static class ArrayElementsVisitor {
+    private final Object[] array;
+
+    ArrayElementsVisitor(Object[] array) {
+      this.array = array;
+    }
+
+    public void visit(ObjectSizeCalculator calc) {
+      for (Object elem : array) {
+        if (elem != null) {
+          calc.visit(elem);
+        }
+      }
+    }
+  }
+
+  void enqueue(Object obj) {
+    if (obj != null) {
+      pending.addLast(obj);
+    }
+  }
+
+  void increaseSize(long objectSize) {
+    size += objectSize;
+  }
+
+  @VisibleForTesting
+  static long roundTo(long x, int multiple) {
+    return ((x + multiple - 1) / multiple) * multiple;
+  }
+
+  private class ClassSizeInfo {
+    // Padded fields + header size
+    private final long objectSize;
+    // Only the fields size - used to calculate the subclasses' memory
+    // footprint.
+    private final long fieldsSize;
+    private final Field[] referenceFields;
+
+    public ClassSizeInfo(Class<?> clazz) {
+      long fieldsSize = 0;
+      final List<Field> referenceFields = new LinkedList<Field>();
+      for (Field f : clazz.getDeclaredFields()) {
+        if (Modifier.isStatic(f.getModifiers())) {
+          continue;
+        }
+        final Class<?> type = f.getType();
+        if (type.isPrimitive()) {
+          fieldsSize += getPrimitiveFieldSize(type);
+        } else {
+          f.setAccessible(true);
+          referenceFields.add(f);
+          fieldsSize += referenceSize;
+        }
+      }
+      final Class<?> superClass = clazz.getSuperclass();
+      if (superClass != null) {
+        final ClassSizeInfo superClassInfo = classSizeInfos.getUnchecked(superClass);
+        fieldsSize += roundTo(superClassInfo.fieldsSize, superclassFieldPadding);
+        referenceFields.addAll(Arrays.asList(superClassInfo.referenceFields));
+      }
+      this.fieldsSize = fieldsSize;
+      this.objectSize = roundTo(objectHeaderSize + fieldsSize, objectPadding);
+      this.referenceFields = referenceFields.toArray(
+          new Field[referenceFields.size()]);
+    }
+
+    void visit(Object obj, ObjectSizeCalculator calc) {
+      calc.increaseSize(objectSize);
+      enqueueReferencedObjects(obj, calc);
+    }
+
+    public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) {
+      for (Field f : referenceFields) {
+        try {
+          calc.enqueue(f.get(obj));
+        } catch (IllegalAccessException e) {
+          final AssertionError ae = new AssertionError(
+              "Unexpected denial of access to " + f);
+          ae.initCause(e);
+          throw ae;
+        }
+      }
+    }
+  }
+
+  private static long getPrimitiveFieldSize(Class<?> type) {
+    if (type == boolean.class || type == byte.class) {
+      return 1;
+    }
+    if (type == char.class || type == short.class) {
+      return 2;
+    }
+    if (type == int.class || type == float.class) {
+      return 4;
+    }
+    if (type == long.class || type == double.class) {
+      return 8;
+    }
+    throw new AssertionError("Encountered unexpected primitive type " +
+        type.getName());
+  }
+
+  @VisibleForTesting
+  static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() {
+    final String vmName = System.getProperty("java.vm.name");
+    if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ")
+        || vmName.startsWith("OpenJDK") || vmName.startsWith("TwitterJDK"))) {
+      throw new UnsupportedOperationException(
+          "ObjectSizeCalculator only supported on HotSpot VM");
+    }
+
+    final String dataModel = System.getProperty("sun.arch.data.model");
+    if ("32".equals(dataModel)) {
+      // Running with 32-bit data model
+      return new MemoryLayoutSpecification() {
+        @Override public int getArrayHeaderSize() {
+          return 12;
+        }
+        @Override public int getObjectHeaderSize() {
+          return 8;
+        }
+        @Override public int getObjectPadding() {
+          return 8;
+        }
+        @Override public int getReferenceSize() {
+          return 4;
+        }
+        @Override public int getSuperclassFieldPadding() {
+          return 4;
+        }
+      };
+    } else if (!"64".equals(dataModel)) {
+      throw new UnsupportedOperationException("Unrecognized value '" +
+          dataModel + "' of sun.arch.data.model system property");
+    }
+
+    final String strVmVersion = System.getProperty("java.vm.version");
+    final int vmVersion = Integer.parseInt(strVmVersion.substring(0,
+        strVmVersion.indexOf('.')));
+    if (vmVersion >= 17) {
+      long maxMemory = 0;
+      for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) {
+        maxMemory += mp.getUsage().getMax();
+      }
+      if (maxMemory < 30L * 1024 * 1024 * 1024) {
+        // HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total
+        // for all memory pools (yes, including code cache).
+        return new MemoryLayoutSpecification() {
+          @Override public int getArrayHeaderSize() {
+            return 16;
+          }
+          @Override public int getObjectHeaderSize() {
+            return 12;
+          }
+          @Override public int getObjectPadding() {
+            return 8;
+          }
+          @Override public int getReferenceSize() {
+            return 4;
+          }
+          @Override public int getSuperclassFieldPadding() {
+            return 4;
+          }
+        };
+      }
+    }
+
+    // In other cases, it's a 64-bit uncompressed OOPs object model
+    return new MemoryLayoutSpecification() {
+      @Override public int getArrayHeaderSize() {
+        return 24;
+      }
+      @Override public int getObjectHeaderSize() {
+        return 16;
+      }
+      @Override public int getObjectPadding() {
+        return 8;
+      }
+      @Override public int getReferenceSize() {
+        return 8;
+      }
+      @Override public int getSuperclassFieldPadding() {
+        return 8;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Amount.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/quantity/Amount.java b/commons/src/main/java/com/twitter/common/quantity/Amount.java
new file mode 100644
index 0000000..5b7b904
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/quantity/Amount.java
@@ -0,0 +1,211 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.quantity;
+
+import com.google.common.base.Preconditions;
+
+import com.twitter.common.collections.Pair;
+
+/**
+ * Represents a value in a unit system and facilitates unambiguous communication of amounts.
+ * Instances are created via static factory {@code of(...)} methods.
+ *
+ * @param <T> the type of number the amount value is expressed in
+ * @param <U> the type of unit that this amount quantifies
+ *
+ * @author John Sirois
+ */
+public abstract class Amount<T extends Number & Comparable<T>, U extends Unit<U>>
+    implements Comparable<Amount<T, U>> {
+
+  /**
+   * Thrown when a checked operation on an amount would overflow.
+   */
+
+  public static class TypeOverflowException extends RuntimeException {
+    public TypeOverflowException() {
+      super();
+    }
+  }
+
+  private final Pair<T, U> amount;
+  private final T maxValue;
+
+  private Amount(T value, U unit, T maxValue) {
+    Preconditions.checkNotNull(value);
+    Preconditions.checkNotNull(unit);
+    this.maxValue = maxValue;
+    this.amount = Pair.of(value, unit);
+  }
+
+  public T getValue() {
+    return amount.getFirst();
+  }
+
+  public U getUnit() {
+    return amount.getSecond();
+  }
+
+  public T as(U unit) {
+    return asUnit(unit);
+  }
+
+  /**
+   * Throws TypeOverflowException if an overflow occurs during scaling.
+   */
+  public T asChecked(U unit) {
+    T retVal = asUnit(unit);
+    if (retVal.equals(maxValue)) {
+      throw new TypeOverflowException();
+    }
+    return retVal;
+  }
+
+  private T asUnit(Unit<?> unit) {
+    return sameUnits(unit) ? getValue() : scale(getUnit().multiplier() / unit.multiplier());
+  }
+
+  @Override
+  public int hashCode() {
+    return amount.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof Amount)) {
+      return false;
+    }
+
+    Amount<?, ?> other = (Amount<?, ?>) obj;
+    return amount.equals(other.amount) || isSameAmount(other);
+  }
+
+  private boolean isSameAmount(Amount<?, ?> other) {
+    // Equals allows Object - so we have no compile time check that other has the right value type;
+    // ie: make sure they don't have Integer when we have Long.
+    Number value = other.getValue();
+    if (!getValue().getClass().isInstance(value)) {
+      return false;
+    }
+
+    Unit<?> unit = other.getUnit();
+    if (!getUnit().getClass().isInstance(unit)) {
+      return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    U otherUnit = (U) other.getUnit();
+    return isSameAmount(other, otherUnit);
+  }
+
+  private boolean isSameAmount(Amount<?, ?> other, U otherUnit) {
+    // Compare in the more precise unit (the one with the lower multiplier).
+    if (otherUnit.multiplier() > getUnit().multiplier()) {
+      return getValue().equals(other.asUnit(getUnit()));
+    } else {
+      return as(otherUnit).equals(other.getValue());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return amount.toString();
+  }
+
+  @Override
+  public int compareTo(Amount<T, U> other) {
+    // Compare in the more precise unit (the one with the lower multiplier).
+    if (other.getUnit().multiplier() > getUnit().multiplier()) {
+      return getValue().compareTo(other.as(getUnit()));
+    } else {
+      return as(other.getUnit()).compareTo(other.getValue());
+    }
+  }
+
+  private boolean sameUnits(Unit<? extends Unit<?>> unit) {
+    return getUnit().equals(unit);
+  }
+
+  protected abstract T scale(double multiplier);
+
+  /**
+   * Creates an amount that uses a {@code double} value.
+   *
+   * @param number the number of units the returned amount should quantify
+   * @param unit the unit the returned amount is expressed in terms of
+   * @param <U> the type of unit that the returned amount quantifies
+   * @return an amount quantifying the given {@code number} of {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Double, U> of(double number, U unit) {
+    return new Amount<Double, U>(number, unit, Double.MAX_VALUE) {
+      @Override protected Double scale(double multiplier) {
+        return getValue() * multiplier;
+      }
+    };
+  }
+
+  /**
+   * Creates an amount that uses a {@code float} value.
+   *
+   * @param number the number of units the returned amount should quantify
+   * @param unit the unit the returned amount is expressed in terms of
+   * @param <U> the type of unit that the returned amount quantifies
+   * @return an amount quantifying the given {@code number} of {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Float, U> of(float number, U unit) {
+    return new Amount<Float, U>(number, unit, Float.MAX_VALUE) {
+      @Override protected Float scale(double multiplier) {
+        return (float) (getValue() * multiplier);
+      }
+    };
+  }
+
+  /**
+   * Creates an amount that uses a {@code long} value.
+   *
+   * @param number the number of units the returned amount should quantify
+   * @param unit the unit the returned amount is expressed in terms of
+   * @param <U> the type of unit that the returned amount quantifies
+   * @return an amount quantifying the given {@code number} of {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Long, U> of(long number, U unit) {
+    return new Amount<Long, U>(number, unit, Long.MAX_VALUE) {
+      @Override protected Long scale(double multiplier) {
+        return (long) (getValue() * multiplier);
+      }
+    };
+  }
+
+  /**
+   * Creates an amount that uses an {@code int} value.
+   *
+   * @param number the number of units the returned amount should quantify
+   * @param unit the unit the returned amount is expressed in terms of
+   * @param <U> the type of unit that the returned amount quantifies
+   * @return an amount quantifying the given {@code number} of {@code unit}s
+   */
+  public static <U extends Unit<U>> Amount<Integer, U> of(int number, U unit) {
+    return new Amount<Integer, U>(number, unit, Integer.MAX_VALUE) {
+      @Override protected Integer scale(double multiplier) {
+        return (int) (getValue() * multiplier);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Data.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/quantity/Data.java b/commons/src/main/java/com/twitter/common/quantity/Data.java
new file mode 100644
index 0000000..de0e484
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/quantity/Data.java
@@ -0,0 +1,54 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.quantity;
+
+/**
+ * Provides a unit to allow conversions and unambiguous passing around of data {@link Amount}s.
+ * The kilo/mega/giga/... hierarchy is built on base 2 so that the hierarchy increases by a factor
+ * of 1024 instead of 1000 as typical in metric units.  Additionally, units are divided in 2
+ * hierarchies one based on bits and the other on bytes.  Thus {@link #Kb} represents kilobits; so
+ * 1 Kb = 1024 bits, and {@link #KB} represents kilobytes so 1 KB = 1024 bytes or 8192 bits.
+ *
+ * @author John Sirois
+ */
+public enum Data implements Unit<Data> {
+  BITS(1),
+  Kb(1024, BITS),
+  Mb(1024, Kb),
+  Gb(1024, Mb),
+  BYTES(8, BITS),
+  KB(1024, BYTES),
+  MB(1024, KB),
+  GB(1024, MB),
+  TB(1024, GB),
+  PB(1024, TB);
+
+  private final double multiplier;
+
+  private Data(double multiplier) {
+    this.multiplier = multiplier;
+  }
+
+  private Data(double multiplier, Data base) {
+    this(multiplier * base.multiplier);
+  }
+
+  @Override
+  public double multiplier() {
+    return multiplier;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Time.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/quantity/Time.java b/commons/src/main/java/com/twitter/common/quantity/Time.java
new file mode 100644
index 0000000..215b0a7
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/quantity/Time.java
@@ -0,0 +1,65 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.quantity;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provides a unit to allow conversions and unambiguous passing around of time {@link Amount}s.
+ *
+ * @author John Sirois
+ */
+public enum Time implements Unit<Time> {
+  NANOSECONDS(1, TimeUnit.NANOSECONDS, "ns"),
+  MICROSECONDS(1000, NANOSECONDS, TimeUnit.MICROSECONDS, "us"),
+  MILLISECONDS(1000, MICROSECONDS, TimeUnit.MILLISECONDS, "ms"),
+  SECONDS(1000, MILLISECONDS, TimeUnit.SECONDS, "secs"),
+  MINUTES(60, SECONDS, TimeUnit.MINUTES, "mins"),
+  HOURS(60, MINUTES, TimeUnit.HOURS, "hrs"),
+  DAYS(24, HOURS, TimeUnit.DAYS, "days");
+
+  private final double multiplier;
+  private final TimeUnit timeUnit;
+  private final String display;
+
+  private Time(double multiplier, TimeUnit timeUnit, String display) {
+    this.multiplier = multiplier;
+    this.timeUnit = timeUnit;
+    this.display = display;
+  }
+
+  private Time(double multiplier, Time base, TimeUnit timeUnit, String display) {
+    this(multiplier * base.multiplier, timeUnit, display);
+  }
+
+  @Override
+  public double multiplier() {
+    return multiplier;
+  }
+
+  /**
+   * Returns the equivalent {@code TimeUnit}.
+   */
+  public TimeUnit getTimeUnit() {
+    return timeUnit;
+  }
+
+  @Override
+  public String toString() {
+    return display;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Unit.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/quantity/Unit.java b/commons/src/main/java/com/twitter/common/quantity/Unit.java
new file mode 100644
index 0000000..c64067c
--- /dev/null
+++ b/commons/src/main/java/com/twitter/common/quantity/Unit.java
@@ -0,0 +1,36 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.quantity;
+
+/**
+ * Represents a unit hierarchy for a given unit of measure; eg: time.  Instances represent specific
+ * units from the hierarchy; eg: seconds.
+ *
+ * @param <U> the type of the concrete unit implementation
+ *
+ * @author John Sirois
+ */
+public interface Unit<U extends Unit<U>> {
+
+  /**
+   * Returns the weight of this unit relative to other units in the same hierarchy.  Typically the
+   * smallest unit in the hierarchy returns 1, but this need not be the case.  It is only required
+   * that each unit of the hierarchy return a multiplier relative to a common base unit for the
+   * hierarchy.
+   */
+  double multiplier();
+}


Mime
View raw message