Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A7C0C18260 for ; Tue, 25 Aug 2015 18:19:17 +0000 (UTC) Received: (qmail 74248 invoked by uid 500); 25 Aug 2015 18:19:17 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 74170 invoked by uid 500); 25 Aug 2015 18:19:17 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 72954 invoked by uid 99); 25 Aug 2015 18:19:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Aug 2015 18:19:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5DF67E7D9C; Tue, 25 Aug 2015 18:19:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zmanji@apache.org To: commits@aurora.apache.org Date: Tue, 25 Aug 2015 18:19:40 -0000 Message-Id: <032bc49882b4492c992c9ce7ea9b577b@git.apache.org> In-Reply-To: <2d86d301903d4a1c81757199842a5e58@git.apache.org> References: <2d86d301903d4a1c81757199842a5e58@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [26/37] aurora git commit: Import of Twitter Commons. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java b/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java new file mode 100644 index 0000000..2887a50 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/ConnectionPool.java @@ -0,0 +1,337 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.twitter.common.base.Supplier; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.Stats; +import com.twitter.common.stats.StatsProvider; + +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A generic connection pool that delegates growth policy to a {@link ConnectionFactory} and + * connection choice to a supplied strategy. + * + *

TODO(John Sirois): implement a reaper to clean up connections that may become invalid when not in + * use. + * + *

TODO(John Sirois): take a ShutdownRegistry and register a close command + * + * @author John Sirois + */ +public final class ConnectionPool> implements ObjectPool { + + private static final Logger LOG = Logger.getLogger(ConnectionPool.class.getName()); + + private final Set leasedConnections = + Sets.newSetFromMap(Maps.newIdentityHashMap()); + private final Set availableConnections = Sets.newHashSet(); + private final Lock poolLock; + private final Condition available; + + private final ConnectionFactory connectionFactory; + private final Executor executor; + + private volatile boolean closed; + private final AtomicLong connectionsCreated; + private final AtomicLong connectionsDestroyed; + private final AtomicLong connectionsReturned; + + /** + * Creates a connection pool with a connection picker that selects the first item in the set of + * available connections, exporting statistics to stats provider {@link Stats#STATS_PROVIDER}. + * + * @param connectionFactory Factory to create and destroy connections. + */ + public ConnectionPool(ConnectionFactory connectionFactory) { + this(connectionFactory, Stats.STATS_PROVIDER); + } + + /** + * Creates a connection pool with a connection picker that selects the first item in the set of + * available connections and uses the supplied StatsProvider to register stats with. + * + * @param connectionFactory Factory to create and destroy connections. + * @param statsProvider Stats export provider. 
+ */ + public ConnectionPool(ConnectionFactory connectionFactory, StatsProvider statsProvider) { + this(Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("CP-" + connectionFactory + "[%d]") + .setDaemon(true) + .build()), + new ReentrantLock(true), connectionFactory, statsProvider); + } + + @VisibleForTesting + ConnectionPool(Executor executor, Lock poolLock, ConnectionFactory connectionFactory, + StatsProvider statsProvider) { + Preconditions.checkNotNull(executor); + Preconditions.checkNotNull(poolLock); + Preconditions.checkNotNull(connectionFactory); + Preconditions.checkNotNull(statsProvider); + + this.executor = executor; + this.poolLock = poolLock; + available = poolLock.newCondition(); + this.connectionFactory = connectionFactory; + + String cfName = Stats.normalizeName(connectionFactory.toString()); + statsProvider.makeGauge("cp_leased_connections_" + cfName, + new Supplier() { + @Override public Integer get() { + return leasedConnections.size(); + } + }); + statsProvider.makeGauge("cp_available_connections_" + cfName, + new Supplier() { + @Override public Integer get() { + return availableConnections.size(); + } + }); + this.connectionsCreated = + statsProvider.makeCounter("cp_created_connections_" + cfName); + this.connectionsDestroyed = + statsProvider.makeCounter("cp_destroyed_connections_" + cfName); + this.connectionsReturned = + statsProvider.makeCounter("cp_returned_connections_" + cfName); + } + + @Override + public String toString() { + return "CP-" + connectionFactory; + } + + @Override + public S get() throws ResourceExhaustedException, TimeoutException { + checkNotClosed(); + poolLock.lock(); + try { + return leaseConnection(NO_TIMEOUT); + } finally { + poolLock.unlock(); + } + } + + @Override + public S get(Amount timeout) + throws ResourceExhaustedException, TimeoutException { + + checkNotClosed(); + Preconditions.checkNotNull(timeout); + if (timeout.getValue() == 0) { + return get(); + } + + try { + long start = 
System.nanoTime(); + long timeBudgetNs = timeout.as(Time.NANOSECONDS); + if (poolLock.tryLock(timeBudgetNs, TimeUnit.NANOSECONDS)) { + try { + timeBudgetNs -= (System.nanoTime() - start); + return leaseConnection(Amount.of(timeBudgetNs, Time.NANOSECONDS)); + } finally { + poolLock.unlock(); + } + } else { + throw new TimeoutException("Timed out waiting for pool lock"); + } + } catch (InterruptedException e) { + throw new TimeoutException("Interrupted waiting for pool lock"); + } + } + + private S leaseConnection(Amount timeout) throws ResourceExhaustedException, + TimeoutException { + S connection = getConnection(timeout); + if (connection == null) { + throw new ResourceExhaustedException("Connection pool resources exhausted"); + } + return leaseConnection(connection); + } + + @Override + public void release(S connection) { + release(connection, false); + } + + /** + * Equivalent to releasing a Connection with isValid() == false. + * @see ObjectPool#remove(Object) + */ + @Override + public void remove(S connection) { + release(connection, true); + } + + // TODO(John Sirois): release could block indefinitely if someone is blocked in get() on a create + // connection - reason about this and potentially submit release to our executor + private void release(S connection, boolean remove) { + poolLock.lock(); + try { + if (!leasedConnections.remove(connection)) { + throw new IllegalArgumentException("Connection not controlled by this connection pool: " + + connection); + } + + if (!closed && !remove && connection.isValid()) { + addConnection(connection); + connectionsReturned.incrementAndGet(); + } else { + connectionFactory.destroy(connection); + connectionsDestroyed.incrementAndGet(); + } + } finally { + poolLock.unlock(); + } + } + + @Override + public void close() { + poolLock.lock(); + try { + for (S availableConnection : availableConnections) { + connectionFactory.destroy(availableConnection); + } + } finally { + closed = true; + poolLock.unlock(); + } + } + + 
private void checkNotClosed() { + Preconditions.checkState(!closed); + } + + private S leaseConnection(S connection) { + leasedConnections.add(connection); + return connection; + } + + // TODO(John Sirois): pool growth is serialized by poolLock currently - it seems like this could be + // fixed but there may be no need - do gedankanalysis + private S getConnection(final Amount timeout) throws ResourceExhaustedException, + TimeoutException { + if (availableConnections.isEmpty()) { + if (leasedConnections.isEmpty()) { + // Completely empty pool + try { + return createConnection(timeout); + } catch (Exception e) { + throw new ResourceExhaustedException("failed to create a new connection", e); + } + } else { + // If the pool is allowed to grow - let the connection factory race a release + if (connectionFactory.mightCreate()) { + executor.execute(new Runnable() { + @Override public void run() { + try { + // The connection timeout is not needed here to honor the callers get requested + // timeout, but we don't want to have an infinite timeout which could exhaust a + // thread pool over many backgrounded create calls + S connection = createConnection(timeout); + if (connection != null) { + addConnection(connection); + } else { + LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client " + + "due to maximum pool size or timeout"); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to create a new connection for a waiting client", e); + } + } + }); + } + + try { + // We wait for a returned/new connection here in loops to guard against the + // "spurious wakeups" that are documented can occur with Condition.await() + if (timeout.getValue() == 0) { + while(availableConnections.isEmpty()) { + available.await(); + } + } else { + long timeRemainingNs = timeout.as(Time.NANOSECONDS); + while(availableConnections.isEmpty()) { + long start = System.nanoTime(); + if (!available.await(timeRemainingNs, TimeUnit.NANOSECONDS)) { + throw new 
TimeoutException( + "timeout waiting for a connection to be released to the pool"); + } else { + timeRemainingNs -= (System.nanoTime() - start); + } + } + if (availableConnections.isEmpty()) { + throw new TimeoutException( + "timeout waiting for a connection to be released to the pool"); + } + } + } catch (InterruptedException e) { + throw new TimeoutException("Interrupted while waiting for a connection."); + } + } + } + + return getAvailableConnection(); + } + + private S getAvailableConnection() { + S connection = (availableConnections.size() == 1) + ? Iterables.getOnlyElement(availableConnections) + : availableConnections.iterator().next(); + if (!availableConnections.remove(connection)) { + throw new IllegalArgumentException("Connection picked not in pool: " + connection); + } + return connection; + } + + private S createConnection(Amount timeout) throws Exception { + S connection = connectionFactory.create(timeout); + if (connection != null) { + connectionsCreated.incrementAndGet(); + } + return connection; + } + + private void addConnection(S connection) { + poolLock.lock(); + try { + availableConnections.add(connection); + available.signal(); + } finally { + poolLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java new file mode 100644 index 0000000..4c20604 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSet.java @@ -0,0 +1,79 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.google.common.collect.ImmutableSet; + +import com.twitter.common.base.Command; + +/** + * A host set that can be monitored for changes. + * + * @param The type that is used to identify members of the host set. + */ +public interface DynamicHostSet { + + /** + * Registers a monitor to receive change notices for this server set as long as this jvm process + * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. + * The monitor will be notified if the membership set or parameters of existing members have + * changed. + * + * @param monitor the server set monitor to call back when the host set changes + * @throws MonitorException if there is a problem monitoring the host set + * @deprecated Deprecated in favor of {@link #watch(HostChangeMonitor)} + */ + @Deprecated + public void monitor(final HostChangeMonitor monitor) throws MonitorException; + + /** + * Registers a monitor to receive change notices for this server set as long as this jvm process + * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. 
+ * The monitor will be notified if the membership set or parameters of existing members have + * changed. + * + * @param monitor the server set monitor to call back when the host set changes + * @return A command which, when executed, will stop monitoring the host set. + * @throws MonitorException if there is a problem monitoring the host set + */ + public Command watch(final HostChangeMonitor monitor) throws MonitorException; + + /** + * An interface to an object that is interested in receiving notification whenever the host set + * changes. + */ + public static interface HostChangeMonitor { + + /** + * Called when either the available set of services changes (when a service dies or a new + * instance comes on-line) or when an existing service advertises a status or health change. + * + * @param hostSet the current set of available ServiceInstances + */ + void onChange(ImmutableSet hostSet); + } + + public static class MonitorException extends Exception { + public MonitorException(String msg) { + super(msg); + } + + public MonitorException(String msg, Throwable cause) { + super(msg, cause); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java new file mode 100644 index 0000000..e9cc0f0 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/DynamicHostSetUtil.java @@ -0,0 +1,52 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.google.common.collect.ImmutableSet; + +import com.twitter.common.base.Command; + +import static com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor; +import static com.twitter.common.net.pool.DynamicHostSet.MonitorException; + +/** + * Utility methods for dealing with dynamic sets of hosts. + */ +public final class DynamicHostSetUtil { + + /** + * Gets a snapshot of a set of dynamic hosts (e.g. a ServerSet) and returns a readable copy of + * the underlying actual endpoints. + * + * @param hostSet The hostSet to snapshot. + * @throws MonitorException if there was a problem obtaining the snapshot. 
+ */ + public static ImmutableSet getSnapshot(DynamicHostSet hostSet) throws MonitorException { + final ImmutableSet.Builder snapshot = ImmutableSet.builder(); + Command unwatch = hostSet.watch(new HostChangeMonitor() { + @Override public void onChange(ImmutableSet hostSet) { + snapshot.addAll(hostSet); + } + }); + unwatch.execute(); + return snapshot.build(); + } + + private DynamicHostSetUtil() { + // utility + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java b/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java new file mode 100644 index 0000000..dc9aa21 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/DynamicPool.java @@ -0,0 +1,172 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.twitter.common.base.Closure; +import com.twitter.common.net.loadbalancing.LoadBalancer; +import com.twitter.common.net.pool.DynamicHostSet.MonitorException; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +/** + * An ObjectPool that maintains a set of connections for a set of service endpoints defined by a + * {@link com.twitter.common.zookeeper.ServerSet}. + * + * @param The type that contains metadata information about hosts, such as liveness and address. + * @param The raw connection type that is being pooled. + * @param The type that identifies the endpoint of the pool, such as an address. + * @author John Sirois + */ +public class DynamicPool implements ObjectPool> { + + private final MetaPool pool; + + /** + * Creates a new ServerSetConnectionPool and blocks on an initial read and constructions of pools + * for the given {@code serverSet}. + * + * @param hostSet the dynamic set of available servers to pool connections for + * @param endpointPoolFactory a factory that can generate a connection pool for an endpoint + * @param loadBalancer Load balancer to manage request flow. + * @param onBackendsChosen A callback to notify of chosen backends. 
+ * @param restoreInterval the interval after connection errors start occurring for a target to + * begin checking to see if it has come back to a healthy state + * @param endpointExtractor Function that transforms a service instance into an endpoint instance. + * @param livenessChecker Filter that will determine whether a host indicates itself as available. + * @throws MonitorException if there is a problem monitoring the host set + */ + public DynamicPool(DynamicHostSet hostSet, + Function>> endpointPoolFactory, + LoadBalancer loadBalancer, + Closure> onBackendsChosen, + Amount restoreInterval, + Function endpointExtractor, + Predicate livenessChecker) + throws DynamicHostSet.MonitorException { + Preconditions.checkNotNull(hostSet); + Preconditions.checkNotNull(endpointPoolFactory); + + pool = new MetaPool(loadBalancer, onBackendsChosen, restoreInterval); + + // TODO(John Sirois): consider an explicit start/stop + hostSet.monitor(new PoolMonitor>(endpointPoolFactory, endpointExtractor, + livenessChecker) { + @Override protected void onPoolRebuilt(Set>> deadPools, + Map>> livePools) { + poolRebuilt(deadPools, livePools); + } + }); + } + + @VisibleForTesting + void poolRebuilt(Set>> deadPools, + Map>> livePools) { + + pool.setBackends(livePools); + + for (ObjectPool> deadTargetPool : deadPools) { + deadTargetPool.close(); + } + } + + @Override + public Connection get() throws ResourceExhaustedException, TimeoutException { + return pool.get(); + } + + @Override + public Connection get(Amount timeout) + throws ResourceExhaustedException, TimeoutException { + return pool.get(timeout); + } + + @Override + public void release(Connection connection) { + pool.release(connection); + } + + @Override + public void remove(Connection connection) { + pool.remove(connection); + } + + @Override + public void close() { + pool.close(); + } + + private abstract class PoolMonitor> + implements DynamicHostSet.HostChangeMonitor { + + private final Function> endpointPoolFactory; + 
private final Function endpointExtractor; + private final Predicate livenessTest; + + public PoolMonitor(Function> endpointPoolFactory, + Function endpointExtractor, + Predicate livenessTest) { + this.endpointPoolFactory = endpointPoolFactory; + this.endpointExtractor = endpointExtractor; + this.livenessTest = livenessTest; + } + + private final Map> endpointPools = Maps.newHashMap(); + + @Override + public synchronized void onChange(ImmutableSet serverSet) { + // TODO(John Sirois): change onChange to pass the delta data since its already computed by + // ServerSet + + Map newEndpoints = + Maps.uniqueIndex(Iterables.filter(serverSet, livenessTest), endpointExtractor); + + Set deadEndpoints = ImmutableSet.copyOf( + Sets.difference(endpointPools.keySet(), newEndpoints.keySet())); + Set> deadPools = Sets.newHashSet(); + for (E endpoint : deadEndpoints) { + ObjectPool deadPool = endpointPools.remove(endpoint); + deadPools.add(deadPool); + } + + Set addedEndpoints = ImmutableSet.copyOf( + Sets.difference(newEndpoints.keySet(), endpointPools.keySet())); + for (E endpoint : addedEndpoints) { + ObjectPool endpointPool = endpointPoolFactory.apply(endpoint); + endpointPools.put(endpoint, endpointPool); + } + + onPoolRebuilt(deadPools, ImmutableMap.copyOf(endpointPools)); + } + + protected abstract void onPoolRebuilt(Set> deadPools, + Map> livePools); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java b/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java new file mode 100644 index 0000000..fb97632 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/MetaPool.java @@ -0,0 +1,343 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. 
+// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import com.twitter.common.base.Closure; +import com.twitter.common.base.Command; +import com.twitter.common.net.loadbalancing.LoadBalancer; +import com.twitter.common.net.loadbalancing.LoadBalancingStrategy.ConnectionResult; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +/** + * A connection pool that picks connections from a set of backend pools. 
Backend pools are initially + * selected at random, but as they are used they are ranked according to how many + * connections they have available and whether the last used connection had an error. + * In this way, backends that are responsive should get selected in preference to those that are + * not. + * + *

Non-responsive backends are monitored after a configurable period in a background thread and + * if a connection can be obtained they start to float back up in the rankings. In this way, + * backends that are initially non-responsive but later become responsive should end up getting + * selected. + * + *

TODO(John Sirois): take a ShutdownRegistry and register a close command + * + * @author John Sirois + */ +public class MetaPool implements ObjectPool> { + + private final Command stopBackendRestorer; + + private Map>> backends = null; + + // Locks to guard mutation of the backends set. + private final Lock backendsReadLock; + private final Lock backendsWriteLock; + + private final Closure> onBackendsChosen; + + private final LoadBalancer loadBalancer; + + /** + * Creates a connection pool with no backends. Backends may be added post-creation by calling + * {@link #setBackends(java.util.Map)} + * + * @param loadBalancer the load balancer to distribute requests among backends. + * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new + * set of backends to restrict its call distribution to + * @param restoreInterval the interval after a backend goes dead to begin checking the backend to + * see if it has come back to a healthy state + */ + public MetaPool(LoadBalancer loadBalancer, + Closure> onBackendsChosen, Amount restoreInterval) { + this(ImmutableMap.>>of(), loadBalancer, + onBackendsChosen, restoreInterval); + } + + /** + * Creates a connection pool that balances connections across multiple backend pools. + * + * @param backends the connection pools for the backends + * @param onBackendsChosen a callback to notify whenever the {@code loadBalancer} chooses a new + * set of backends to restrict its call distribution to + * @param loadBalancer the load balancer to distribute requests among backends. 
+ * @param restoreInterval the interval after a backend goes dead to begin checking the backend to + * see if it has come back to a healthy state + */ + public MetaPool( + ImmutableMap>> backends, + LoadBalancer loadBalancer, + Closure> onBackendsChosen, Amount restoreInterval) { + + this.loadBalancer = Preconditions.checkNotNull(loadBalancer); + this.onBackendsChosen = Preconditions.checkNotNull(onBackendsChosen); + + ReadWriteLock backendsLock = new ReentrantReadWriteLock(true); + backendsReadLock = backendsLock.readLock(); + backendsWriteLock = backendsLock.writeLock(); + + setBackends(backends); + + Preconditions.checkNotNull(restoreInterval); + Preconditions.checkArgument(restoreInterval.getValue() > 0); + stopBackendRestorer = startDeadBackendRestorer(restoreInterval); + } + + /** + * Assigns the backend pools that this pool should draw from. + * + * @param pools New pools to use. + */ + public void setBackends(Map>> pools) { + backendsWriteLock.lock(); + try { + backends = Preconditions.checkNotNull(pools); + loadBalancer.offerBackends(pools.keySet(), onBackendsChosen); + } finally { + backendsWriteLock.unlock(); + } + } + + private Command startDeadBackendRestorer(final Amount restoreInterval) { + + final AtomicBoolean shouldRestore = new AtomicBoolean(true); + Runnable restoreDeadBackends = new Runnable() { + @Override public void run() { + if (shouldRestore.get()) { + restoreDeadBackends(restoreInterval); + } + } + }; + final ScheduledExecutorService scheduledExecutorService = + Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("MTCP-DeadBackendRestorer[%s]") + .build()); + long restoreDelay = restoreInterval.getValue(); + scheduledExecutorService.scheduleWithFixedDelay(restoreDeadBackends, restoreDelay, + restoreDelay, restoreInterval.getUnit().getTimeUnit()); + + return new Command() { + @Override public void execute() { + shouldRestore.set(false); + scheduledExecutorService.shutdownNow(); + 
LOG.info("Backend restorer shut down"); + } + }; + } + + private static final Logger LOG = Logger.getLogger(MetaPool.class.getName()); + + private void restoreDeadBackends(Amount restoreInterval) { + for (E backend : snapshotBackends()) { + ObjectPool> pool; + backendsReadLock.lock(); + try { + pool = backends.get(backend); + } finally { + backendsReadLock.unlock(); + } + + // We can lose a race if the backends change - and that's fine, we'll restore the new set of + // backends in the next scheduled restoration run. + if (pool != null) { + try { + release(get(backend, pool, restoreInterval)); + } catch (TimeoutException e) { + LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e); + } catch (ResourceExhaustedException e) { + LOG.warning("Backend restorer failed to revive backend: " + backend + " -> " + e); + } + } + } + } + + private Iterable snapshotBackends() { + backendsReadLock.lock(); + try { + return ImmutableList.copyOf(backends.keySet()); + } finally { + backendsReadLock.unlock(); + } + } + + @Override + public Connection get() throws ResourceExhaustedException, TimeoutException { + return get(ObjectPool.NO_TIMEOUT); + } + + @Override + public Connection get(Amount timeout) + throws ResourceExhaustedException, TimeoutException { + + E backend; + ObjectPool> pool; + + backendsReadLock.lock(); + try { + backend = loadBalancer.nextBackend(); + Preconditions.checkNotNull(backend, "Load balancer gave a null backend."); + + pool = backends.get(backend); + Preconditions.checkNotNull(backend, + "Given backend %s not found in tracked backends: %s", backend, backends); + } finally { + backendsReadLock.unlock(); + } + + return get(backend, pool, timeout); + } + + private static class ManagedConnection implements Connection { + private final Connection connection; + private final ObjectPool> pool; + + private ManagedConnection(Connection connection, ObjectPool> pool) { + this.connection = connection; + this.pool = pool; + } + + @Override + 
public void close() { + connection.close(); + } + + @Override + public T get() { + return connection.get(); + } + + @Override + public boolean isValid() { + return connection.isValid(); + } + + @Override + public E getEndpoint() { + return connection.getEndpoint(); + } + + @Override public String toString() { + return "ManagedConnection[" + connection.toString() + "]"; + } + + void release(boolean remove) { + if (remove) { + pool.remove(connection); + } else { + pool.release(connection); + } + } + } + + private Connection get(E backend, ObjectPool> pool, + Amount timeout) throws ResourceExhaustedException, TimeoutException { + + long startNanos = System.nanoTime(); + try { + Connection connection = (timeout.getValue() == 0) ? pool.get() : pool.get(timeout); + + // BEWARE: We have leased a connection from the underlying pool here and must return it to the + // caller so they can later release it. If we fail to do so, the connection will leak. + // Catching intermediate exceptions ourselves and pro-actively returning the connection to the + // pool before re-throwing is not a viable option since the return would have to succeed, + // forcing us to ignore the timeout passed in. + + // NOTE: LoadBalancer gracefully ignores backends it does not know about so even if we acquire + // a (backend, pool) pair atomically that has since been removed, we can safely let the lb + // know about backend events and it will just ignore us. 
+ + try { + loadBalancer.connected(backend, System.nanoTime() - startNanos); + } catch (RuntimeException e) { + LOG.log(Level.WARNING, "Encountered an exception updating load balancer stats after " + + "leasing a connection - continuing", e); + } + return new ManagedConnection(connection, pool); + } catch (TimeoutException e) { + loadBalancer.connectFailed(backend, ConnectionResult.TIMEOUT); + throw e; + } catch (ResourceExhaustedException e) { + loadBalancer.connectFailed(backend, ConnectionResult.FAILED); + throw e; + } + } + + @Override + public void release(Connection connection) { + release(connection, false); + } + + /** + * Equivalent to releasing a Connection with isValid() == false. + * @see ObjectPool#remove(Object) + */ + @Override + public void remove(Connection connection) { + release(connection, true); + } + + private void release(Connection connection, boolean remove) { + backendsWriteLock.lock(); + try { + + if (!(connection instanceof ManagedConnection)) { + throw new IllegalArgumentException("Connection not controlled by this connection pool: " + + connection); + } + ((ManagedConnection) connection).release(remove); + + loadBalancer.released(connection.getEndpoint()); + } finally { + backendsWriteLock.unlock(); + } + } + + @Override + public void close() { + stopBackendRestorer.execute(); + + backendsWriteLock.lock(); + try { + for (ObjectPool> backend : backends.values()) { + backend.close(); + } + } finally { + backendsWriteLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java b/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java new file mode 100644 index 0000000..63bf788 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/ObjectPool.java @@ -0,0 +1,85 @@ +// 
================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.net.pool; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +import java.util.concurrent.TimeoutException; + +/** + * A generic object pool that provides object of a given type for exclusive use by the caller. + * Object pools generally pool expensive resources and so offer a {@link #close} method that should + * be used to free these resources when the pool is no longer needed. + * + * @author John Sirois + */ +public interface ObjectPool { + + /** + * Gets a resource potentially blocking for as long as it takes to either create a new one or wait + * for one to be {@link #release(Object) released}. Callers must {@link #release(Object) release} + * the connection when they are done with it. 
+ * + * @return a resource for exclusive use by the caller + * @throws ResourceExhaustedException if no resource could be obtained because this pool was + * exhausted + * @throws TimeoutException if we timed out while trying to fetch a resource + */ + T get() throws ResourceExhaustedException, TimeoutException; + + /** + * A convenience constant representing a no timeout. + */ + Amount NO_TIMEOUT = Amount.of(0L, Time.MILLISECONDS); + + /** + * Gets a resource; timing out if there are none available and it takes longer than specified to + * create a new one or wait for one to be {@link #release(Object) released}. Callers must + * {@link #release (Object) release} the connection when they are done with it. + * + * @param timeout the maximum amount of time to wait + * @return a resource for exclusive use by the caller + * @throws TimeoutException if the specified timeout was reached before a resource became + * available + * @throws ResourceExhaustedException if no resource could be obtained because this pool was + * exhausted + */ + T get(Amount timeout) throws ResourceExhaustedException, TimeoutException; + + /** + * Releases a resource obtained from this pool back into the pool of available resources. It is an + * error to release a resource not obtained from this pool. + * + * @param resource Resource to release. + */ + void release(T resource); + + /** + * Removes a resource obtained from this pool from its available resources. It is an error to + * remove a resource not obtained from this pool. + * + * @param resource Resource to remove. + */ + void remove(T resource); + + /** + * Disallows further gets from this pool, "closes" all idle objects and any outstanding objects + * when they are released. 
+ */ + void close(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java b/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java new file mode 100644 index 0000000..f2c38ec --- /dev/null +++ b/commons/src/main/java/com/twitter/common/net/pool/ResourceExhaustedException.java @@ -0,0 +1,30 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.net.pool; + +/** + * @author John Sirois + */ +public class ResourceExhaustedException extends Exception { + public ResourceExhaustedException(String message) { + super(message); + } + + public ResourceExhaustedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/objectsize/ObjectSizeCalculator.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/objectsize/ObjectSizeCalculator.java b/commons/src/main/java/com/twitter/common/objectsize/ObjectSizeCalculator.java new file mode 100644 index 0000000..97af48d --- /dev/null +++ b/commons/src/main/java/com/twitter/common/objectsize/ObjectSizeCalculator.java @@ -0,0 +1,430 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.objectsize; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryPoolMXBean; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Sets; + +/** + * Contains utility methods for calculating the memory usage of objects. It + * only works on the HotSpot JVM, and infers the actual memory layout (32 bit + * vs. 64 bit word size, compressed object pointers vs. uncompressed) from + * best available indicators. It can reliably detect a 32 bit vs. 64 bit JVM. + * It can only make an educated guess at whether compressed OOPs are used, + * though; specifically, it knows what the JVM's default choice of OOP + * compression would be based on HotSpot version and maximum heap sizes, but if + * the choice is explicitly overridden with the -XX:{+|-}UseCompressedOops command line + * switch, it can not detect + * this fact and will report incorrect sizes, as it will presume the default JVM + * behavior. + * + * @author Attila Szegedi + */ +public class ObjectSizeCalculator { + + /** + * Describes constant memory overheads for various constructs in a JVM implementation. + */ + public interface MemoryLayoutSpecification { + + /** + * Returns the fixed overhead of an array of any type or length in this JVM. + * + * @return the fixed overhead of an array. 
+ */ + int getArrayHeaderSize(); + + /** + * Returns the fixed overhead of for any {@link Object} subclass in this JVM. + * + * @return the fixed overhead of any object. + */ + int getObjectHeaderSize(); + + /** + * Returns the quantum field size for a field owned by an object in this JVM. + * + * @return the quantum field size for an object. + */ + int getObjectPadding(); + + /** + * Returns the fixed size of an object reference in this JVM. + * + * @return the size of all object references. + */ + int getReferenceSize(); + + /** + * Returns the quantum field size for a field owned by one of an object's ancestor superclasses + * in this JVM. + * + * @return the quantum field size for a superclass field. + */ + int getSuperclassFieldPadding(); + } + + private static class CurrentLayout { + private static final MemoryLayoutSpecification SPEC = + getEffectiveMemoryLayoutSpecification(); + } + + /** + * Given an object, returns the total allocated size, in bytes, of the object + * and all other objects reachable from it. Attempts to to detect the current JVM memory layout, + * but may fail with {@link UnsupportedOperationException}; + * + * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do + * anything special, it measures the size of all objects + * reachable through it (which will include its class loader, and by + * extension, all other Class objects loaded by + * the same loader, and all the parent class loaders). It doesn't provide the + * size of the static fields in the JVM class that the Class object + * represents. + * @return the total allocated size of the object and all other objects it + * retains. + * @throws UnsupportedOperationException if the current vm memory layout cannot be detected. + */ + public static long getObjectSize(Object obj) throws UnsupportedOperationException { + return obj == null ? 
0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj); + } + + // Fixed object header size for arrays. + private final int arrayHeaderSize; + // Fixed object header size for non-array objects. + private final int objectHeaderSize; + // Padding for the object size - if the object size is not an exact multiple + // of this, it is padded to the next multiple. + private final int objectPadding; + // Size of reference (pointer) fields. + private final int referenceSize; + // Padding for the fields of superclass before fields of subclasses are + // added. + private final int superclassFieldPadding; + + private final LoadingCache, ClassSizeInfo> classSizeInfos = + CacheBuilder.newBuilder().build(new CacheLoader, ClassSizeInfo>() { + public ClassSizeInfo load(Class clazz) { + return new ClassSizeInfo(clazz); + } + }); + + + private final Set alreadyVisited = Sets.newIdentityHashSet(); + private final Deque pending = new ArrayDeque(16 * 1024); + private long size; + + /** + * Creates an object size calculator that can calculate object sizes for a given + * {@code memoryLayoutSpecification}. + * + * @param memoryLayoutSpecification a description of the JVM memory layout. + */ + public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) { + Preconditions.checkNotNull(memoryLayoutSpecification); + arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize(); + objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize(); + objectPadding = memoryLayoutSpecification.getObjectPadding(); + referenceSize = memoryLayoutSpecification.getReferenceSize(); + superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding(); + } + + /** + * Given an object, returns the total allocated size, in bytes, of the object + * and all other objects reachable from it. + * + * @param obj the object; can be null. 
Passing in a {@link java.lang.Class} object doesn't do + * anything special, it measures the size of all objects + * reachable through it (which will include its class loader, and by + * extension, all other Class objects loaded by + * the same loader, and all the parent class loaders). It doesn't provide the + * size of the static fields in the JVM class that the Class object + * represents. + * @return the total allocated size of the object and all other objects it + * retains. + */ + public synchronized long calculateObjectSize(Object obj) { + // Breadth-first traversal instead of naive depth-first with recursive + // implementation, so we don't blow the stack traversing long linked lists. + try { + for (;;) { + visit(obj); + if (pending.isEmpty()) { + return size; + } + obj = pending.removeFirst(); + } + } finally { + alreadyVisited.clear(); + pending.clear(); + size = 0; + } + } + + private void visit(Object obj) { + if (alreadyVisited.contains(obj)) { + return; + } + final Class clazz = obj.getClass(); + if (clazz == ArrayElementsVisitor.class) { + ((ArrayElementsVisitor) obj).visit(this); + } else { + alreadyVisited.add(obj); + if (clazz.isArray()) { + visitArray(obj); + } else { + classSizeInfos.getUnchecked(clazz).visit(obj, this); + } + } + } + + private void visitArray(Object array) { + final Class componentType = array.getClass().getComponentType(); + final int length = Array.getLength(array); + if (componentType.isPrimitive()) { + increaseByArraySize(length, getPrimitiveFieldSize(componentType)); + } else { + increaseByArraySize(length, referenceSize); + // If we didn't use an ArrayElementsVisitor, we would be enqueueing every + // element of the array here instead. For large arrays, it would + // tremendously enlarge the queue. In essence, we're compressing it into + // a small command object instead. This is different than immediately + // visiting the elements, as their visiting is scheduled for the end of + // the current queue. 
+ switch (length) { + case 0: { + break; + } + case 1: { + enqueue(Array.get(array, 0)); + break; + } + default: { + enqueue(new ArrayElementsVisitor((Object[]) array)); + } + } + } + } + + private void increaseByArraySize(int length, long elementSize) { + increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding)); + } + + private static class ArrayElementsVisitor { + private final Object[] array; + + ArrayElementsVisitor(Object[] array) { + this.array = array; + } + + public void visit(ObjectSizeCalculator calc) { + for (Object elem : array) { + if (elem != null) { + calc.visit(elem); + } + } + } + } + + void enqueue(Object obj) { + if (obj != null) { + pending.addLast(obj); + } + } + + void increaseSize(long objectSize) { + size += objectSize; + } + + @VisibleForTesting + static long roundTo(long x, int multiple) { + return ((x + multiple - 1) / multiple) * multiple; + } + + private class ClassSizeInfo { + // Padded fields + header size + private final long objectSize; + // Only the fields size - used to calculate the subclasses' memory + // footprint. 
+ private final long fieldsSize; + private final Field[] referenceFields; + + public ClassSizeInfo(Class clazz) { + long fieldsSize = 0; + final List referenceFields = new LinkedList(); + for (Field f : clazz.getDeclaredFields()) { + if (Modifier.isStatic(f.getModifiers())) { + continue; + } + final Class type = f.getType(); + if (type.isPrimitive()) { + fieldsSize += getPrimitiveFieldSize(type); + } else { + f.setAccessible(true); + referenceFields.add(f); + fieldsSize += referenceSize; + } + } + final Class superClass = clazz.getSuperclass(); + if (superClass != null) { + final ClassSizeInfo superClassInfo = classSizeInfos.getUnchecked(superClass); + fieldsSize += roundTo(superClassInfo.fieldsSize, superclassFieldPadding); + referenceFields.addAll(Arrays.asList(superClassInfo.referenceFields)); + } + this.fieldsSize = fieldsSize; + this.objectSize = roundTo(objectHeaderSize + fieldsSize, objectPadding); + this.referenceFields = referenceFields.toArray( + new Field[referenceFields.size()]); + } + + void visit(Object obj, ObjectSizeCalculator calc) { + calc.increaseSize(objectSize); + enqueueReferencedObjects(obj, calc); + } + + public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) { + for (Field f : referenceFields) { + try { + calc.enqueue(f.get(obj)); + } catch (IllegalAccessException e) { + final AssertionError ae = new AssertionError( + "Unexpected denial of access to " + f); + ae.initCause(e); + throw ae; + } + } + } + } + + private static long getPrimitiveFieldSize(Class type) { + if (type == boolean.class || type == byte.class) { + return 1; + } + if (type == char.class || type == short.class) { + return 2; + } + if (type == int.class || type == float.class) { + return 4; + } + if (type == long.class || type == double.class) { + return 8; + } + throw new AssertionError("Encountered unexpected primitive type " + + type.getName()); + } + + @VisibleForTesting + static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() { + 
final String vmName = System.getProperty("java.vm.name"); + if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ") + || vmName.startsWith("OpenJDK") || vmName.startsWith("TwitterJDK"))) { + throw new UnsupportedOperationException( + "ObjectSizeCalculator only supported on HotSpot VM"); + } + + final String dataModel = System.getProperty("sun.arch.data.model"); + if ("32".equals(dataModel)) { + // Running with 32-bit data model + return new MemoryLayoutSpecification() { + @Override public int getArrayHeaderSize() { + return 12; + } + @Override public int getObjectHeaderSize() { + return 8; + } + @Override public int getObjectPadding() { + return 8; + } + @Override public int getReferenceSize() { + return 4; + } + @Override public int getSuperclassFieldPadding() { + return 4; + } + }; + } else if (!"64".equals(dataModel)) { + throw new UnsupportedOperationException("Unrecognized value '" + + dataModel + "' of sun.arch.data.model system property"); + } + + final String strVmVersion = System.getProperty("java.vm.version"); + final int vmVersion = Integer.parseInt(strVmVersion.substring(0, + strVmVersion.indexOf('.'))); + if (vmVersion >= 17) { + long maxMemory = 0; + for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) { + maxMemory += mp.getUsage().getMax(); + } + if (maxMemory < 30L * 1024 * 1024 * 1024) { + // HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total + // for all memory pools (yes, including code cache). 
+ return new MemoryLayoutSpecification() { + @Override public int getArrayHeaderSize() { + return 16; + } + @Override public int getObjectHeaderSize() { + return 12; + } + @Override public int getObjectPadding() { + return 8; + } + @Override public int getReferenceSize() { + return 4; + } + @Override public int getSuperclassFieldPadding() { + return 4; + } + }; + } + } + + // In other cases, it's a 64-bit uncompressed OOPs object model + return new MemoryLayoutSpecification() { + @Override public int getArrayHeaderSize() { + return 24; + } + @Override public int getObjectHeaderSize() { + return 16; + } + @Override public int getObjectPadding() { + return 8; + } + @Override public int getReferenceSize() { + return 8; + } + @Override public int getSuperclassFieldPadding() { + return 8; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Amount.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/quantity/Amount.java b/commons/src/main/java/com/twitter/common/quantity/Amount.java new file mode 100644 index 0000000..5b7b904 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/quantity/Amount.java @@ -0,0 +1,211 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.quantity; + +import com.google.common.base.Preconditions; + +import com.twitter.common.collections.Pair; + +/** + * Represents a value in a unit system and facilitates unambiguous communication of amounts. + * Instances are created via static factory {@code of(...)} methods. + * + * @param the type of number the amount value is expressed in + * @param the type of unit that this amount quantifies + * + * @author John Sirois + */ +public abstract class Amount, U extends Unit> + implements Comparable> { + + /** + * Thrown when a checked operation on an amount would overflow. + */ + + public static class TypeOverflowException extends RuntimeException { + public TypeOverflowException() { + super(); + } + } + + private final Pair amount; + private final T maxValue; + + private Amount(T value, U unit, T maxValue) { + Preconditions.checkNotNull(value); + Preconditions.checkNotNull(unit); + this.maxValue = maxValue; + this.amount = Pair.of(value, unit); + } + + public T getValue() { + return amount.getFirst(); + } + + public U getUnit() { + return amount.getSecond(); + } + + public T as(U unit) { + return asUnit(unit); + } + + /** + * Throws TypeOverflowException if an overflow occurs during scaling. 
+ */ + public T asChecked(U unit) { + T retVal = asUnit(unit); + if (retVal.equals(maxValue)) { + throw new TypeOverflowException(); + } + return retVal; + } + + private T asUnit(Unit unit) { + return sameUnits(unit) ? getValue() : scale(getUnit().multiplier() / unit.multiplier()); + } + + @Override + public int hashCode() { + return amount.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof Amount)) { + return false; + } + + Amount other = (Amount) obj; + return amount.equals(other.amount) || isSameAmount(other); + } + + private boolean isSameAmount(Amount other) { + // Equals allows Object - so we have no compile time check that other has the right value type; + // ie: make sure they don't have Integer when we have Long. + Number value = other.getValue(); + if (!getValue().getClass().isInstance(value)) { + return false; + } + + Unit unit = other.getUnit(); + if (!getUnit().getClass().isInstance(unit)) { + return false; + } + + @SuppressWarnings("unchecked") + U otherUnit = (U) other.getUnit(); + return isSameAmount(other, otherUnit); + } + + private boolean isSameAmount(Amount other, U otherUnit) { + // Compare in the more precise unit (the one with the lower multiplier). + if (otherUnit.multiplier() > getUnit().multiplier()) { + return getValue().equals(other.asUnit(getUnit())); + } else { + return as(otherUnit).equals(other.getValue()); + } + } + + @Override + public String toString() { + return amount.toString(); + } + + @Override + public int compareTo(Amount other) { + // Compare in the more precise unit (the one with the lower multiplier). 
+ if (other.getUnit().multiplier() > getUnit().multiplier()) { + return getValue().compareTo(other.as(getUnit())); + } else { + return as(other.getUnit()).compareTo(other.getValue()); + } + } + + private boolean sameUnits(Unit> unit) { + return getUnit().equals(unit); + } + + protected abstract T scale(double multiplier); + + /** + * Creates an amount that uses a {@code double} value. + * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static > Amount of(double number, U unit) { + return new Amount(number, unit, Double.MAX_VALUE) { + @Override protected Double scale(double multiplier) { + return getValue() * multiplier; + } + }; + } + + /** + * Creates an amount that uses a {@code float} value. + * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static > Amount of(float number, U unit) { + return new Amount(number, unit, Float.MAX_VALUE) { + @Override protected Float scale(double multiplier) { + return (float) (getValue() * multiplier); + } + }; + } + + /** + * Creates an amount that uses a {@code long} value. 
+ * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static > Amount of(long number, U unit) { + return new Amount(number, unit, Long.MAX_VALUE) { + @Override protected Long scale(double multiplier) { + return (long) (getValue() * multiplier); + } + }; + } + + /** + * Creates an amount that uses an {@code int} value. + * + * @param number the number of units the returned amount should quantify + * @param unit the unit the returned amount is expressed in terms of + * @param the type of unit that the returned amount quantifies + * @return an amount quantifying the given {@code number} of {@code unit}s + */ + public static > Amount of(int number, U unit) { + return new Amount(number, unit, Integer.MAX_VALUE) { + @Override protected Integer scale(double multiplier) { + return (int) (getValue() * multiplier); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Data.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/quantity/Data.java b/commons/src/main/java/com/twitter/common/quantity/Data.java new file mode 100644 index 0000000..de0e484 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/quantity/Data.java @@ -0,0 +1,54 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.quantity; + +/** + * Provides a unit to allow conversions and unambiguous passing around of data {@link Amount}s. + * The kilo/mega/giga/... hierarchy is built on base 2 so that the hierarchy increases by a factor + * of 1024 instead of 1000 as typical in metric units. Additionally, units are divided in 2 + * hierarchies one based on bits and the other on bytes. Thus {@link #Kb} represents kilobits; so + * 1 Kb = 1024 bits, and {@link #KB} represents kilobytes so 1 KB = 1024 bytes or 8192 bits. 
+ * + * @author John Sirois + */ +public enum Data implements Unit { + BITS(1), + Kb(1024, BITS), + Mb(1024, Kb), + Gb(1024, Mb), + BYTES(8, BITS), + KB(1024, BYTES), + MB(1024, KB), + GB(1024, MB), + TB(1024, GB), + PB(1024, TB); + + private final double multiplier; + + private Data(double multiplier) { + this.multiplier = multiplier; + } + + private Data(double multiplier, Data base) { + this(multiplier * base.multiplier); + } + + @Override + public double multiplier() { + return multiplier; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/quantity/Time.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/quantity/Time.java b/commons/src/main/java/com/twitter/common/quantity/Time.java new file mode 100644 index 0000000..215b0a7 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/quantity/Time.java @@ -0,0 +1,65 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.quantity; + +import java.util.concurrent.TimeUnit; + +/** + * Provides a unit to allow conversions and unambiguous passing around of time {@link Amount}s. + * + * @author John Sirois + */ +public enum Time implements Unit