Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DF04218A21 for ; Wed, 26 Aug 2015 20:59:52 +0000 (UTC) Received: (qmail 99355 invoked by uid 500); 26 Aug 2015 20:59:52 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 99272 invoked by uid 500); 26 Aug 2015 20:59:52 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 98075 invoked by uid 99); 26 Aug 2015 20:59:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Aug 2015 20:59:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 87970E7D9C; Wed, 26 Aug 2015 20:59:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zmanji@apache.org To: commits@aurora.apache.org Date: Wed, 26 Aug 2015 21:00:08 -0000 Message-Id: <4ed7f508df95482aac00e0c38794ede0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/51] [partial] aurora git commit: Move packages from com.twitter.common to org.apache.aurora.common http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java new file mode 100644 index 0000000..84a0f4e --- /dev/null +++ 
b/commons/src/main/java/org/apache/aurora/common/thrift/callers/RetryingCaller.java @@ -0,0 +1,224 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.thrift.callers; + +import java.lang.reflect.Method; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.aurora.common.thrift.TResourceExhaustedException; +import org.apache.thrift.async.AsyncMethodCallback; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.StatsProvider; + +/** +* A caller that will retry calls to the wrapped caller. 
+* +* @author William Farner +*/ +public class RetryingCaller extends CallerDecorator { + private static final Logger LOG = Logger.getLogger(RetryingCaller.class.getName()); + + @VisibleForTesting + public static final Amount NONBLOCKING_TIMEOUT = Amount.of(-1L, Time.MILLISECONDS); + + private final StatsProvider statsProvider; + private final String serviceName; + private final int retries; + private final ImmutableSet> retryableExceptions; + private final boolean debug; + + /** + * Creates a new retrying caller. The retrying caller will attempt to call invoked methods on the + * underlying caller at most {@code retries} times. A retry will be performed only when one of + * the {@code retryableExceptions} is caught. + * + * @param decoratedCall The caller to decorate with retries. + * @param async Whether the caller is asynchronous. + * @param statsProvider The stat provider to export retry statistics through. + * @param serviceName The service name that calls are being invoked on. + * @param retries The maximum number of retries to perform. + * @param retryableExceptions The exceptions that can be retried. + * @param debug Whether to include debugging information when retries are being performed. + */ + public RetryingCaller(Caller decoratedCall, boolean async, StatsProvider statsProvider, + String serviceName, int retries, ImmutableSet> retryableExceptions, + boolean debug) { + super(decoratedCall, async); + this.statsProvider = statsProvider; + this.serviceName = serviceName; + this.retries = retries; + this.retryableExceptions = retryableExceptions; + this.debug = debug; + } + + private final LoadingCache stats = + CacheBuilder.newBuilder().build(new CacheLoader() { + @Override public AtomicLong load(Method method) { + // Thrift does not support overloads - so just the name disambiguates all calls. 
+ return statsProvider.makeCounter(serviceName + "_" + method.getName() + "_retries"); + } + }); + + @Override public Object call(final Method method, final Object[] args, + @Nullable final AsyncMethodCallback callback, + @Nullable final Amount connectTimeoutOverride) throws Throwable { + final AtomicLong retryCounter = stats.get(method); + final AtomicInteger attempts = new AtomicInteger(); + final List exceptions = Lists.newArrayList(); + + final ResultCapture capture = new ResultCapture() { + @Override public void success() { + // No-op. + } + + @Override public boolean fail(Throwable t) { + if (!isRetryable(t)) { + if (debug) { + LOG.warning(String.format( + "Call failed with un-retryable exception of [%s]: %s, previous exceptions: %s", + t.getClass().getName(), t.getMessage(), combineStackTraces(exceptions))); + } + + return true; + } else if (attempts.get() >= retries) { + exceptions.add(t); + + if (debug) { + LOG.warning(String.format("Retried %d times, last error: %s, exceptions: %s", + attempts.get(), t, combineStackTraces(exceptions))); + } + + return true; + } else { + exceptions.add(t); + + if (isAsync() && attempts.incrementAndGet() <= retries) { + try { + retryCounter.incrementAndGet(); + // override connect timeout in ThriftCaller to prevent blocking for a connection + // for async retries (since this is within the callback in the selector thread) + invoke(method, args, callback, this, NONBLOCKING_TIMEOUT); + } catch (Throwable throwable) { + return fail(throwable); + } + } + + return false; + } + } + }; + + boolean continueLoop; + do { + try { + // If this is an async call, the looping will be handled within the capture. 
+ return invoke(method, args, callback, capture, connectTimeoutOverride); + } catch (Throwable t) { + if (!isRetryable(t)) { + Throwable propagated = t; + + if (!exceptions.isEmpty() && (t instanceof TResourceExhaustedException)) { + // If we've been trucking along through retries that have had remote call failures + // and we suddenly can't immediately get a connection on the next retry, throw the + // previous remote call failure - the idea here is that the remote call failure is + // more interesting than a transient inability to get an immediate connection. + propagated = exceptions.remove(exceptions.size() - 1); + } + + if (isAsync()) { + callback.onError((Exception) propagated); + } else { + throw propagated; + } + } + } + + continueLoop = !isAsync() && attempts.incrementAndGet() <= retries; + if (continueLoop) retryCounter.incrementAndGet(); + } while (continueLoop); + + Throwable lastRetriedException = Iterables.getLast(exceptions); + if (debug) { + if (!exceptions.isEmpty()) { + LOG.warning( + String.format("Retried %d times, last error: %s, previous exceptions: %s", + attempts.get(), lastRetriedException, combineStackTraces(exceptions))); + } else { + LOG.warning( + String.format("Retried 1 time, last error: %s", lastRetriedException)); + } + } + + if (!isAsync()) throw lastRetriedException; + return null; + } + + private boolean isRetryable(Throwable throwable) { + return isRetryable.getUnchecked(throwable.getClass()); + } + + private final LoadingCache, Boolean> isRetryable = + CacheBuilder.newBuilder().build(new CacheLoader, Boolean>() { + @Override public Boolean load(Class exceptionClass) { + return isRetryable(exceptionClass); + } + }); + + private boolean isRetryable(final Class exceptionClass) { + if (retryableExceptions.contains(exceptionClass)) { + return true; + } + return Iterables.any(retryableExceptions, new Predicate>() { + @Override public boolean apply(Class retryableExceptionClass) { + return 
retryableExceptionClass.isAssignableFrom(exceptionClass); + } + }); + } + + private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n'); + + private static String combineStackTraces(List exceptions) { + if (exceptions.isEmpty()) { + return "none"; + } else { + return STACK_TRACE_JOINER.join(Iterables.transform(exceptions, + new Function() { + private int index = 1; + @Override public String apply(Throwable exception) { + return String.format("[%d] %s", + index++, Throwables.getStackTraceAsString(exception)); + } + })); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/callers/StatTrackingCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/StatTrackingCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/StatTrackingCaller.java new file mode 100644 index 0000000..514e665 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/callers/StatTrackingCaller.java @@ -0,0 +1,103 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.thrift.callers; + +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.Nullable; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + +import org.apache.aurora.common.thrift.TResourceExhaustedException; +import org.apache.aurora.common.thrift.TTimeoutException; +import org.apache.thrift.async.AsyncMethodCallback; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.stats.StatsProvider.RequestTimer; + +/** + * A caller that exports statistics about calls made to the wrapped caller. + * + * @author William Farner + */ +public class StatTrackingCaller extends CallerDecorator { + + private final StatsProvider statsProvider; + private final String serviceName; + + private final LoadingCache stats = + CacheBuilder.newBuilder().build(new CacheLoader() { + @Override public RequestTimer load(Method method) { + // Thrift does not support overloads - so just the name disambiguates all calls. + return statsProvider.makeRequestTimer(serviceName + "_" + method.getName()); + } + }); + + /** + * Creates a new stat tracking caller, which will export stats to the given {@link StatsProvider}. + * + * @param decoratedCaller The caller to decorate with stat tracking. + * @param async Whether the caller is asynchronous. + * @param statsProvider The stat provider to export statistics to. + * @param serviceName The name of the service that methods are being called on. 
+ */ + public StatTrackingCaller(Caller decoratedCaller, boolean async, StatsProvider statsProvider, + String serviceName) { + super(decoratedCaller, async); + + this.statsProvider = statsProvider; + this.serviceName = serviceName; + } + + @Override + public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback, + @Nullable Amount connectTimeoutOverride) throws Throwable { + final RequestTimer requestStats = stats.get(method); + final long startTime = System.nanoTime(); + + ResultCapture capture = new ResultCapture() { + @Override public void success() { + requestStats.requestComplete(TimeUnit.NANOSECONDS.toMicros( + System.nanoTime() - startTime)); + } + + @Override public boolean fail(Throwable t) { + // TODO(John Sirois): the ruby client reconnects for timeouts too - this provides a natural + // backoff mechanism - consider how to plumb something similar. + if (t instanceof TTimeoutException || t instanceof TimeoutException) { + requestStats.incTimeouts(); + return true; + } + + // TODO(John Sirois): consider ditching reconnects since it's nearly redundant with errors as + // it stands. + if (!(t instanceof TResourceExhaustedException)) { + requestStats.incReconnects(); + } + // TODO(John Sirois): provide more detailed stats: track counts for distinct exception types, + // track retries-per-method, etc... 
+ requestStats.incErrors(); + return true; + } + }; + + return invoke(method, args, callback, capture, connectTimeoutOverride); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/callers/ThriftCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/ThriftCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/ThriftCaller.java new file mode 100644 index 0000000..4e62940 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/callers/ThriftCaller.java @@ -0,0 +1,157 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.thrift.callers; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.aurora.common.net.pool.Connection; +import org.apache.aurora.common.net.pool.ObjectPool; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.net.pool.ResourceExhaustedException; +import org.apache.aurora.common.thrift.TResourceExhaustedException; +import org.apache.aurora.common.thrift.TTimeoutException; +import org.apache.aurora.common.net.loadbalancing.RequestTracker; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.transport.TTransport; + +import javax.annotation.Nullable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +/** + * A caller that issues calls to a target that is assumed to be a client to a thrift service. + * + * @author William Farner + */ +public class ThriftCaller implements Caller { + private static final Logger LOG = Logger.getLogger(ThriftCaller.class.getName()); + + private final ObjectPool> connectionPool; + private final RequestTracker requestTracker; + private final Function clientFactory; + private final Amount timeout; + private final boolean debug; + + /** + * Creates a new thrift caller. + * + * @param connectionPool The connection pool to use. + * @param requestTracker The request tracker to notify of request results. + * @param clientFactory Factory to use for building client object instances. + * @param timeout The timeout to use when requesting objects from the connection pool. + * @param debug Whether to use the caller in debug mode. 
+ */ + public ThriftCaller(ObjectPool> connectionPool, + RequestTracker requestTracker, Function clientFactory, + Amount timeout, boolean debug) { + + this.connectionPool = connectionPool; + this.requestTracker = requestTracker; + this.clientFactory = clientFactory; + this.timeout = timeout; + this.debug = debug; + } + + @Override + public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback, + @Nullable Amount connectTimeoutOverride) throws Throwable { + + final Connection connection = getConnection(connectTimeoutOverride); + final long startNanos = System.nanoTime(); + + ResultCapture capture = new ResultCapture() { + @Override public void success() { + try { + requestTracker.requestResult(connection.getEndpoint(), + RequestTracker.RequestResult.SUCCESS, System.nanoTime() - startNanos); + } finally { + connectionPool.release(connection); + } + } + + @Override public boolean fail(Throwable t) { + if (debug) { + LOG.warning(String.format("Call to endpoint: %s failed: %s", connection, t)); + } + + try { + requestTracker.requestResult(connection.getEndpoint(), + RequestTracker.RequestResult.FAILED, System.nanoTime() - startNanos); + } finally { + connectionPool.remove(connection); + } + return true; + } + }; + + return invokeMethod(clientFactory.apply(connection.get()), method, args, callback, capture); + } + + private static Object invokeMethod(Object target, Method method, Object[] args, + AsyncMethodCallback callback, final ResultCapture capture) throws Throwable { + + // Swap the wrapped callback out for ours. + if (callback != null) { + callback = new WrappedMethodCallback(callback, capture); + + List argsList = Lists.newArrayList(args); + argsList.add(callback); + args = argsList.toArray(); + } + + try { + Object result = method.invoke(target, args); + if (callback == null) capture.success(); + + return result; + } catch (InvocationTargetException t) { + // We allow this one to go to both sync and async captures. 
+ if (callback != null) { + callback.onError((Exception) t.getCause()); + return null; + } else { + capture.fail(t.getCause()); + throw t.getCause(); + } + } + } + + private Connection getConnection( + Amount connectTimeoutOverride) + throws TResourceExhaustedException, TTimeoutException { + try { + Connection connection; + if (connectTimeoutOverride != null) { + connection = connectionPool.get(connectTimeoutOverride); + } else { + connection = (timeout.getValue() > 0) + ? connectionPool.get(timeout) : connectionPool.get(); + } + + if (connection == null) { + throw new TResourceExhaustedException("no connection was available"); + } + return connection; + } catch (ResourceExhaustedException e) { + throw new TResourceExhaustedException(e); + } catch (TimeoutException e) { + throw new TTimeoutException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java b/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java new file mode 100644 index 0000000..a14f53a4 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java @@ -0,0 +1,80 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.thrift.monitoring; + +import com.google.common.base.Preconditions; +import org.apache.aurora.common.net.monitoring.ConnectionMonitor; +import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TTransportException; + +import java.net.InetSocketAddress; + +/** + * Extension of TNonblockingServerSocket that allows for tracking of connected clients. + * + * @author William Farner + */ +public class TMonitoredNonblockingServerSocket extends TNonblockingServerSocket { + private final ConnectionMonitor monitor; + + public TMonitoredNonblockingServerSocket(int port, ConnectionMonitor monitor) + throws TTransportException { + super(port); + this.monitor = Preconditions.checkNotNull(monitor); + } + + public TMonitoredNonblockingServerSocket(int port, int clientTimeout, ConnectionMonitor monitor) + throws TTransportException { + super(port, clientTimeout); + this.monitor = Preconditions.checkNotNull(monitor); + } + + public TMonitoredNonblockingServerSocket(InetSocketAddress bindAddr, ConnectionMonitor monitor) + throws TTransportException { + super(bindAddr); + this.monitor = Preconditions.checkNotNull(monitor); + } + + public TMonitoredNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout, + ConnectionMonitor monitor) throws TTransportException { + super(bindAddr, clientTimeout); + this.monitor = Preconditions.checkNotNull(monitor); + } + + @Override + protected TNonblockingSocket acceptImpl() throws TTransportException { + /* TODO(William Farner): Finish implementing...may require an object proxy. 
+ final TNonblockingSocket socket = super.acceptImpl(); + + TNonblockingSocket wrappedSocket = new TNonblockingSocket(socket.get) { + @Override public void close() { + super.close(); + monitor.disconnected(this); + } + }; + + monitor.connected(wrappedSocket, socket.getSocket().getInetAddress()); + + return wrappedSocket; + + */ + return super.acceptImpl(); + } + + @Override + public void close() { + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredProcessor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredProcessor.java b/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredProcessor.java new file mode 100644 index 0000000..a0d7d5f --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredProcessor.java @@ -0,0 +1,62 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.thrift.monitoring; + +import com.google.common.base.Preconditions; +import org.apache.aurora.common.net.loadbalancing.RequestTracker; +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; + +import java.net.InetSocketAddress; + +import static org.apache.aurora.common.net.loadbalancing.RequestTracker.RequestResult.*; + +/** + * A TProcessor that joins a wrapped TProcessor with a monitor. + * + * @author William Farner + */ +public class TMonitoredProcessor implements TProcessor { + private final TProcessor wrapped; + private final TMonitoredServerSocket monitoredServerSocket; + private final RequestTracker monitor; + + public TMonitoredProcessor(TProcessor wrapped, TMonitoredServerSocket monitoredServerSocket, + RequestTracker monitor) { + this.wrapped = Preconditions.checkNotNull(wrapped); + this.monitoredServerSocket = Preconditions.checkNotNull(monitoredServerSocket); + this.monitor = Preconditions.checkNotNull(monitor); + } + + @Override + public boolean process(TProtocol in, TProtocol out) throws TException { + long startNanos = System.nanoTime(); + boolean exceptionThrown = false; + try { + return wrapped.process(in, out); + } catch (TException e) { + exceptionThrown = true; + throw e; + } finally { + InetSocketAddress address = monitoredServerSocket.getAddress((TSocket) in.getTransport()); + Preconditions.checkState(address != null, + "Address unknown for transport " + in.getTransport()); + + monitor.requestResult(address, exceptionThrown ? 
FAILED : SUCCESS, + System.nanoTime() - startNanos); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredServerSocket.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredServerSocket.java b/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredServerSocket.java new file mode 100644 index 0000000..f4405c4 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredServerSocket.java @@ -0,0 +1,111 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.thrift.monitoring; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.aurora.common.net.monitoring.ConnectionMonitor; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; + +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.Collections; +import java.util.Map; + +/** + * Extension of TServerSocket that allows for tracking of connected clients. 
+ * + * @author William Farner + */ +public class TMonitoredServerSocket extends TServerSocket { + private ConnectionMonitor monitor; + + public TMonitoredServerSocket(ServerSocket serverSocket, + ConnectionMonitor monitor) { + super(serverSocket); + this.monitor = Preconditions.checkNotNull(monitor); + } + + public TMonitoredServerSocket(ServerSocket serverSocket, int clientTimeout, + ConnectionMonitor monitor) { + super(serverSocket, clientTimeout); + this.monitor = Preconditions.checkNotNull(monitor); + } + + public TMonitoredServerSocket(int port, ConnectionMonitor monitor) + throws TTransportException { + super(port); + this.monitor = Preconditions.checkNotNull(monitor); + } + + public TMonitoredServerSocket(int port, int clientTimeout, + ConnectionMonitor monitor) throws TTransportException { + super(port, clientTimeout); + this.monitor = Preconditions.checkNotNull(monitor); + } + + public TMonitoredServerSocket(InetSocketAddress bindAddr, + ConnectionMonitor monitor) throws TTransportException { + super(bindAddr); + this.monitor = Preconditions.checkNotNull(monitor); + } + + public TMonitoredServerSocket(InetSocketAddress bindAddr, int clientTimeout, + ConnectionMonitor monitor) throws TTransportException { + super(bindAddr, clientTimeout); + this.monitor = Preconditions.checkNotNull(monitor); + } + + private final Map addressMap = + Collections.synchronizedMap(Maps.newHashMap()); + + public InetSocketAddress getAddress(TSocket socket) { + return addressMap.get(socket); + } + + @Override + protected TSocket acceptImpl() throws TTransportException { + final TSocket socket = super.acceptImpl(); + final InetSocketAddress remoteAddress = + (InetSocketAddress) socket.getSocket().getRemoteSocketAddress(); + + TSocket monitoredSocket = new TSocket(socket.getSocket()) { + boolean closed = false; + + @Override public void close() { + try { + super.close(); + } finally { + if (!closed) { + monitor.released(remoteAddress); + addressMap.remove(this); + } + closed = 
true; + } + } + }; + + addressMap.put(monitoredSocket, remoteAddress); + + monitor.connected(remoteAddress); + return monitoredSocket; + } + + @Override + public void close() { + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/testing/MockTSocket.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/testing/MockTSocket.java b/commons/src/main/java/org/apache/aurora/common/thrift/testing/MockTSocket.java new file mode 100644 index 0000000..2cec711 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/testing/MockTSocket.java @@ -0,0 +1,45 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.thrift.testing; + +import org.apache.thrift.transport.TSocket; + +/** + * @author William Farner + */ +public class MockTSocket extends TSocket { + public static final String HOST = "dummyHost"; + public static final int PORT = 1000; + + private boolean connected = false; + + public MockTSocket() { + super(HOST, PORT); + } + + @Override + public void open() { + connected = true; + // TODO(William Farner): Allow for failure injection here by throwing TTransportException. 
+ } + + @Override + public boolean isOpen() { + return connected; + } + + public void close() { + connected = false; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/thrift/testing/TestThriftTypes.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/testing/TestThriftTypes.java b/commons/src/main/java/org/apache/aurora/common/thrift/testing/TestThriftTypes.java new file mode 100644 index 0000000..377228f --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/thrift/testing/TestThriftTypes.java @@ -0,0 +1,171 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.thrift.testing; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.thrift.TBase; +import org.apache.thrift.TBaseHelper; +import org.apache.thrift.TException; +import org.apache.thrift.TFieldIdEnum; +import org.apache.thrift.protocol.TField; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TStruct; +import org.apache.thrift.protocol.TType; + +import java.util.Map; +import java.util.Map.Entry; + +/** + * Hand-coded thrift types for use in tests. 
+ * + * @author John Sirois + */ +public class TestThriftTypes { + public static class Field implements TFieldIdEnum { + private static final Map FIELDS_BY_ID = Maps.newHashMap(); + public static Field forId(int id) { + Field field = FIELDS_BY_ID.get((short) id); + Preconditions.checkArgument(field != null, "No Field with id: %s", id); + return field; + } + + public static final Field NAME = new Field((short) 0, "name"); + public static final Field VALUE = new Field((short) 1, "value"); + + private final short fieldId; + private final String fieldName; + + private Field(short fieldId, String fieldName) { + this.fieldId = fieldId; + this.fieldName = fieldName; + FIELDS_BY_ID.put(fieldId, this); + } + + @Override + public short getThriftFieldId() { + return fieldId; + } + + @Override + public String getFieldName() { + return fieldName; + } + } + + public static class Struct implements TBase { + private final Map fields = Maps.newHashMap(); + + public Struct() {} + + public Struct(String name, String value) { + fields.put(Field.NAME, name); + fields.put(Field.VALUE, value); + } + + public String getName() { + Object name = getFieldValue(Field.NAME); + return name == null ? null : (String) name; + } + + public String getValue() { + Object value = getFieldValue(Field.VALUE); + return value == null ? 
null : (String) value; + } + + @Override + public void read(TProtocol tProtocol) throws TException { + tProtocol.readStructBegin(); + TField field; + while((field = tProtocol.readFieldBegin()).type != TType.STOP) { + fields.put(fieldForId(field.id), tProtocol.readString()); + tProtocol.readFieldEnd(); + } + tProtocol.readStructEnd(); + } + + @Override + public void write(TProtocol tProtocol) throws TException { + tProtocol.writeStructBegin(new TStruct("Field")); + for (Entry entry : fields.entrySet()) { + Field field = entry.getKey(); + tProtocol.writeFieldBegin( + new TField(field.getFieldName(), TType.STRING, field.getThriftFieldId())); + tProtocol.writeString(entry.getValue().toString()); + tProtocol.writeFieldEnd(); + } + tProtocol.writeFieldStop(); + tProtocol.writeStructEnd(); + } + + @Override + public boolean isSet(Field field) { + return fields.containsKey(field); + } + + @Override + public Object getFieldValue(Field field) { + return fields.get(field); + } + + @Override + public void setFieldValue(Field field, Object o) { + fields.put(field, o); + } + + @Override + public TBase deepCopy() { + Struct struct = new Struct(); + struct.fields.putAll(fields); + return struct; + } + + @Override + public int compareTo(Struct other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison; + + lastComparison = Integer.valueOf(fields.size()).compareTo(other.fields.size()); + if (lastComparison != 0) { + return lastComparison; + } + + for (Map.Entry entry : fields.entrySet()) { + Field field = entry.getKey(); + lastComparison = Boolean.TRUE.compareTo(other.isSet(field)); + if (lastComparison != 0) { + return lastComparison; + } + lastComparison = TBaseHelper.compareTo(entry.getValue(), other.getFieldValue(field)); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + @Override + public void clear() { + fields.clear(); + } + + @Override + public Field 
fieldForId(int fieldId) { + return Field.forId(fieldId); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/BackoffDecider.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/BackoffDecider.java b/commons/src/main/java/org/apache/aurora/common/util/BackoffDecider.java new file mode 100644 index 0000000..e11a52c --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/BackoffDecider.java @@ -0,0 +1,663 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.Stats; +import org.apache.aurora.common.stats.StatsProvider; + +import javax.annotation.Nullable; +import java.util.Deque; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +/** + * Handles logic for deciding whether to back off from calls to a backend. + * + * This works by offering a guard method {@link #shouldBackOff()}, which instructs the caller + * whether they should avoid making the call. 
The backoff logic will maintain statistics about + * the failure rate, and push into a backoff state (silent period) when the failure rate exceeds + * the configured threshold. At the end of the quiet period, a recovery state will be entered, + * during which the decider will allow traffic to ramp back up to full capacity. + * + * The expected use case looks something like this: + * + *
+ * void sendRequestGuarded() {
+ *   if (!decider.shouldBackOff()) {
+ *     boolean success = sendRequestUnguarded();
+ *     if (success) {
+ *       decider.addSuccess();
+ *     } else {
+ *       decider.addFailure();
+ *     }
+ *   }
+ * }
+ * 
+ * + * @author William Farner + */ +public class BackoffDecider { + private static final Logger LOG = Logger.getLogger(BackoffDecider.class.getName()); + + // The group that this decider is a part of. + private final Iterable deciderGroup; + + private final TimedStateMachine stateMachine; + + private final String name; + + private final double toleratedFailureRate; + + @VisibleForTesting final RequestWindow requests; + + // Used to calculate backoff durations when in backoff state. + private final BackoffStrategy strategy; + + private final Amount recoveryPeriod; + private long previousBackoffPeriodNs = 0; + + // Used for random selection during recovery period. + private final Random random; + + private final Clock clock; + private final AtomicLong backoffs; + private final RecoveryType recoveryType; + + /** + * Different types of recovery mechanisms to use after exiting the backoff state. + */ + public static enum RecoveryType { + // Randomly allows traffic to flow through, with a linearly-ascending probability. + RANDOM_LINEAR, + // Allows full traffic capacity to flow during the recovery period. 
+ FULL_CAPACITY + } + + private BackoffDecider(String name, int seedSize, double toleratedFailureRate, + @Nullable Iterable deciderGroup, BackoffStrategy strategy, + @Nullable Amount recoveryPeriod, + long requestWindowNs, int numBuckets, RecoveryType recoveryType, StatsProvider statsProvider, + Random random, Clock clock) { + MorePreconditions.checkNotBlank(name); + Preconditions.checkArgument(seedSize > 0); + Preconditions.checkArgument(toleratedFailureRate >= 0 && toleratedFailureRate < 1.0); + Preconditions.checkNotNull(strategy); + Preconditions.checkArgument(recoveryPeriod == null || recoveryPeriod.getValue() > 0); + Preconditions.checkArgument(requestWindowNs > 0); + Preconditions.checkArgument(numBuckets > 0); + Preconditions.checkNotNull(recoveryType); + Preconditions.checkNotNull(statsProvider); + Preconditions.checkNotNull(random); + Preconditions.checkNotNull(clock); + + this.name = name; + this.toleratedFailureRate = toleratedFailureRate; + this.deciderGroup = deciderGroup; + this.strategy = strategy; + this.recoveryPeriod = recoveryPeriod; + this.recoveryType = recoveryType; + + this.random = random; + this.clock = clock; + + this.backoffs = statsProvider.makeCounter(name + "_backoffs"); + this.requests = new RequestWindow(requestWindowNs, numBuckets, seedSize); + + this.stateMachine = new TimedStateMachine(name); + } + + /** + * Checks whether the caller should back off and if not then returns immediately; otherwise the + * method blocks until it is safe for the caller to proceed without backing off further based on + * all data available at the time of this call. + * + * @return the amount of time in nanoseconds spent awaiting backoff + * @throws InterruptedException if the calling thread was interrupted while backing off + */ + public long awaitBackoff() throws InterruptedException { + if (shouldBackOff()) { + long backoffTimeMs = stateMachine.getStateRemainingMs(); + + if (backoffTimeMs > 0) { + // Wait without holding any external locks. 
+ Object waitCondition = new Object(); + synchronized (waitCondition) { + waitCondition.wait(backoffTimeMs); + } + return backoffTimeMs; + } + } + return 0; + } + + /** + * Checks whether this decider instructs the caller that it should back off from the associated + * backend. This is determined based on the response history for the backend as well as the + * backoff state of the decider group (if configured). + * + * @return {@code true} if the decider is in backoff mode, otherwise {@code false}. + */ + @SuppressWarnings("fallthrough") + public synchronized boolean shouldBackOff() { + + boolean preventRequest; + switch (stateMachine.getState()) { + case NORMAL: + preventRequest = false; + break; + + case BACKOFF: + if (deciderGroup != null && allOthersBackingOff()) { + LOG.info("Backends in group with " + name + " down, forcing back up."); + stateMachine.transitionUnbounded(State.FORCED_NORMAL); + return false; + } else if (stateMachine.isStateExpired()) { + long recoveryPeriodNs = recoveryPeriod == null ? stateMachine.getStateDurationNs() + : recoveryPeriod.as(Time.NANOSECONDS); + + // The silent period has expired, move to recovery state (and drop to its case block). + stateMachine.transition(State.RECOVERY, recoveryPeriodNs); + LOG.info(String.format("%s recovering for %s ms", name, + Amount.of(recoveryPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS))); + } else { + preventRequest = true; + break; + } + + case RECOVERY: + if (deciderGroup != null && allOthersBackingOff()) { + return false; + } else if (stateMachine.isStateExpired()) { + // We have reached the end of the recovery period, return to normal. + stateMachine.transitionUnbounded(State.NORMAL); + previousBackoffPeriodNs = 0; + preventRequest = false; + } else { + switch (recoveryType) { + case RANDOM_LINEAR: + // In the recovery period, allow request rate to return linearly to the full load. 
+ preventRequest = random.nextDouble() > stateMachine.getStateFractionComplete(); + break; + case FULL_CAPACITY: + preventRequest = false; + break; + default: + throw new IllegalStateException("Unhandled recovery type " + recoveryType); + } + } + + break; + + case FORCED_NORMAL: + if (!allOthersBackingOff()) { + // We were in forced normal state, but at least one other backend is up, try recovering. + stateMachine.transition(State.RECOVERY, stateMachine.getStateDurationNs()); + preventRequest = false; + } else { + preventRequest = true; + } + + break; + + default: + LOG.severe("Unrecognized state: " + stateMachine.getState()); + preventRequest = false; + } + + if (preventRequest) { + backoffs.incrementAndGet(); + } + return preventRequest; + } + + private boolean allOthersBackingOff() { + // Search for another decider that is not backing off. + for (BackoffDecider decider : deciderGroup) { + State deciderState = decider.stateMachine.getState(); + boolean inBackoffState = deciderState == State.BACKOFF || deciderState == State.FORCED_NORMAL; + if ((decider != this) && !inBackoffState) { + return false; + } + } + + return true; + } + + /** + * Records a failed request to the backend. + */ + public void addFailure() { + addResult(false); + } + + /** + * Records a successful request to the backend. + */ + public void addSuccess() { + addResult(true); + } + + /** + * Transitions the state to BACKOFF and logs a message appropriately if it is doing so because of high fail rate + * or by force. + * + * @param failRate rate of request failures on this host. + * @param force if {@code true}, forces the transition to BACKOFF. Typically used in cases when the host + * was not found to be alive by LiveHostChecker. 
+ */ + public synchronized void transitionToBackOff(double failRate, boolean force) { + long prevBackoffMs = Amount.of(previousBackoffPeriodNs, Time.NANOSECONDS) + .as(Time.MILLISECONDS); + + long backoffPeriodNs = Amount.of(strategy.calculateBackoffMs(prevBackoffMs), Time.MILLISECONDS) + .as(Time.NANOSECONDS); + if (!force) { + LOG.info(String.format("%s failure rate at %g, backing off for %s ms", name,failRate, + Amount.of(backoffPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS))); + } else { + LOG.info(String.format("%s forced to back off for %s ms", name, + Amount.of(backoffPeriodNs, Time.NANOSECONDS).as(Time.MILLISECONDS))); + } + stateMachine.transition(State.BACKOFF, backoffPeriodNs); + previousBackoffPeriodNs = backoffPeriodNs; + } + + @SuppressWarnings("fallthrough") + private synchronized void addResult(boolean success) { + // Disallow statistics updating if we are in backoff state. + if (stateMachine.getState() == State.BACKOFF) { + return; + } + + requests.addResult(success); + double failRate = requests.getFailureRate(); + boolean highFailRate = requests.isSeeded() && (failRate > toleratedFailureRate); + + switch (stateMachine.getState()) { + case NORMAL: + if (!highFailRate) { + // No-op. + break; + } else { + // Artificially move into recovery state (by falling through) with a zero-duration + // time window, to trigger the initial backoff period. + stateMachine.setStateDurationNs(0); + } + + case RECOVERY: + if (highFailRate) { + // We were trying to recover, and the failure rate is still too high. Go back to + // backoff state for a longer duration. + requests.reset(); + + // transition the state machine to BACKOFF state, due to high fail rate. + transitionToBackOff(failRate, false); + } else { + // Do nothing. We only exit the recovery state by expiration. 
+ } + break; + + case FORCED_NORMAL: + if (!highFailRate) { + stateMachine.transition(State.RECOVERY, stateMachine.getStateDurationNs()); + } + break; + + case BACKOFF: + throw new IllegalStateException("Backoff state may only be exited by expiration."); + } + } + + /** + * Creates a builder object. + * + * @param name Name for the backoff decider to build. + * @return A builder. + */ + public static Builder builder(String name) { + return new Builder(name); + } + + /** + * Builder class to configure a BackoffDecider. + * + * The builder allows for customization of many different parameters to the BackoffDecider, while + * defining defaults wherever possible. The following defaults are used: + * + *
    + *
  • seed size - The number of requests to accumulate before a backoff will be considered. + * 100 + * + *
  • tolerated failure rate - Maximum failure rate before backing off. + * 0.5 + * + *
  • decider group - Group this decider is a part of, to prevent complete backend failure. + * null (disabled) + * + *
  • strategy - Used to calculate subsequent backoff durations. + * TruncatedBinaryBackoff, initial 100 ms, max 10s + * + *
  • recovery period - Fixed recovery period while ramping traffic back to full capacity. + * null (use last backoff period) + * + *
  • request window - Duration of the sliding window of requests to track statistics for. + * 10 seconds + * + *
  • num buckets - The number of time slices within the request window, for stat expiration. + * The sliding request window advances in intervals of request window / num buckets. + * 100 + * + *
  • recovery type - Defines behavior during the recovery period, and how traffic is permitted. + * random linear + * + *
  • stats provider - The stats provider to export statistics to. + * Stats.STATS_PROVIDER + *
+ * + */ + public static class Builder { + private String name; + private int seedSize = 100; + private double toleratedFailureRate = 0.5; + private Set deciderGroup = null; + private BackoffStrategy strategy = new TruncatedBinaryBackoff( + Amount.of(100L, Time.MILLISECONDS), Amount.of(10L, Time.SECONDS)); + private Amount recoveryPeriod = null; + private long requestWindowNs = Amount.of(10L, Time.SECONDS).as(Time.NANOSECONDS); + private int numBuckets = 100; + private RecoveryType recoveryType = RecoveryType.RANDOM_LINEAR; + private StatsProvider statsProvider = Stats.STATS_PROVIDER; + private Random random = Random.Util.newDefaultRandom(); + private Clock clock = Clock.SYSTEM_CLOCK; + + Builder(String name) { + this.name = name; + } + + /** + * Sets the number of requests that must be accumulated before the error rate will be + * calculated. This improves the genesis problem where the first few requests are errors, + * causing flapping in and out of backoff state. + * + * @param seedSize Request seed size. + * @return A reference to the builder. + */ + public Builder withSeedSize(int seedSize) { + this.seedSize = seedSize; + return this; + } + + /** + * Sets the tolerated failure rate for the decider. If the rate is exceeded for the time + * window, the decider begins backing off. + * + * @param toleratedRate The tolerated failure rate (between 0.0 and 1.0, exclusive). + * @return A reference to the builder. + */ + public Builder withTolerateFailureRate(double toleratedRate) { + this.toleratedFailureRate = toleratedRate; + return this; + } + + /** + * Makes the decider a part of a group. When a decider is a part of a group, it will monitor + * the other deciders to ensure that all deciders do not back off at once. + * + * @param deciderGroup Group to make this decider a part of. More deciders may be added to the + * group after this call is made. + * @return A reference to the builder. 
+ */ + public Builder groupWith(Set deciderGroup) { + this.deciderGroup = deciderGroup; + return this; + } + + /** + * Overrides the default backoff strategy. + * + * @param strategy Backoff strategy to use. + * @return A reference to the builder. + */ + public Builder withStrategy(BackoffStrategy strategy) { + this.strategy = strategy; + return this; + } + + /** + * Overrides the default recovery period behavior. By default, the recovery period is equal + * to the previous backoff period (which is equivalent to setting the recovery period to null + * here). A non-null value here will assign a fixed recovery period. + * + * @param recoveryPeriod Fixed recovery period. + * @return A reference to the builder. + */ + public Builder withRecoveryPeriod(@Nullable Amount recoveryPeriod) { + this.recoveryPeriod = recoveryPeriod; + return this; + } + + /** + * Sets the time window over which to analyze failures. Beyond the time window, request history + * is discarded (and ignored). + * + * @param requestWindow The analysis time window. + * @return A reference to the builder. + */ + public Builder withRequestWindow(Amount requestWindow) { + this.requestWindowNs = requestWindow.as(Time.NANOSECONDS); + return this; + } + + /** + * Sets the number of time slices that the decider will use to partition aggregate statistics. + * + * @param numBuckets Bucket count. + * @return A reference to the builder. + */ + public Builder withBucketCount(int numBuckets) { + this.numBuckets = numBuckets; + return this; + } + + /** + * Sets the recovery mechanism to use when in the recovery period. + * + * @param recoveryType The recovery mechanism to use. + * @return A reference to the builder. + */ + public Builder withRecoveryType(RecoveryType recoveryType) { + this.recoveryType = recoveryType; + return this; + } + + /** + * Sets the stats provider that statistics should be exported to. + * + * @param statsProvider Stats provider to use. + * @return A reference to the builder. 
+ */ + public Builder withStatsProvider(StatsProvider statsProvider) { + this.statsProvider = statsProvider; + return this; + } + + @VisibleForTesting public Builder withRandom(Random random) { + this.random = random; + return this; + } + + @VisibleForTesting public Builder withClock(Clock clock) { + this.clock = clock; + return this; + } + + /** + * Gets a reference to the built decider object. + * @return A decider object. + */ + public BackoffDecider build() { + BackoffDecider decider = new BackoffDecider(name, seedSize, toleratedFailureRate, + deciderGroup, strategy, recoveryPeriod, requestWindowNs, numBuckets, recoveryType, + statsProvider, random, clock); + if (deciderGroup != null) deciderGroup.add(decider); + return decider; + } + } + + private class TimeSlice { + int requestCount = 0; + int failureCount = 0; + final long bucketStartNs; + + public TimeSlice() { + bucketStartNs = clock.nowNanos(); + } + } + + class RequestWindow { + // These store the sum of the respective fields contained within buckets. Doing so removes the + // need to accumulate the counts within the buckets every time the backoff state is + // recalculated. + @VisibleForTesting long totalRequests = 0; + @VisibleForTesting long totalFailures = 0; + + private final long durationNs; + private final long bucketLengthNs; + private final int seedSize; + + // Stores aggregate request/failure counts for time slices. 
+ private final Deque buckets = Lists.newLinkedList(); + + RequestWindow(long durationNs, int bucketCount, int seedSize) { + this.durationNs = durationNs; + this.bucketLengthNs = durationNs / bucketCount; + buckets.addFirst(new TimeSlice()); + this.seedSize = seedSize; + } + + void reset() { + totalRequests = 0; + totalFailures = 0; + buckets.clear(); + buckets.addFirst(new TimeSlice()); + } + + void addResult(boolean success) { + maybeShuffleBuckets(); + buckets.peekFirst().requestCount++; + totalRequests++; + + if (!success) { + buckets.peekFirst().failureCount++; + totalFailures++; + } + } + + void maybeShuffleBuckets() { + // Check if the first bucket is still relevant. + if (clock.nowNanos() - buckets.peekFirst().bucketStartNs >= bucketLengthNs) { + + // Remove old buckets. + while (!buckets.isEmpty() + && buckets.peekLast().bucketStartNs < clock.nowNanos() - durationNs) { + TimeSlice removed = buckets.removeLast(); + totalRequests -= removed.requestCount; + totalFailures -= removed.failureCount; + } + + buckets.addFirst(new TimeSlice()); + } + } + + boolean isSeeded() { + return totalRequests >= seedSize; + } + + double getFailureRate() { + return totalRequests == 0 ? 0 : ((double) totalFailures) / totalRequests; + } + } + + private static enum State { + NORMAL, // All requests are being permitted. + BACKOFF, // Quiet period while waiting for backend to recover/improve. + RECOVERY, // Ramping period where an ascending fraction of requests is being permitted. + FORCED_NORMAL // All other backends in the group are backing off, so this one is forced normal. 
+ } + private class TimedStateMachine { + final StateMachine stateMachine; + + private long stateEndNs; + private long stateDurationNs; + + TimedStateMachine(String name) { + stateMachine = StateMachine.builder(name + "_backoff_state_machine") + .addState(State.NORMAL, State.BACKOFF, State.FORCED_NORMAL) + .addState(State.BACKOFF, State.RECOVERY, State.FORCED_NORMAL) + .addState(State.RECOVERY, State.NORMAL, State.BACKOFF, State.FORCED_NORMAL) + .addState(State.FORCED_NORMAL, State.RECOVERY) + .initialState(State.NORMAL) + .build(); + } + + State getState() { + return stateMachine.getState(); + } + + void transitionUnbounded(State state) { + stateMachine.transition(state); + } + + void transition(State state, long durationNs) { + transitionUnbounded(state); + this.stateEndNs = clock.nowNanos() + durationNs; + this.stateDurationNs = durationNs; + } + + long getStateDurationNs() { + return stateDurationNs; + } + + long getStateDurationMs() { + return Amount.of(stateDurationNs, Time.NANOSECONDS).as(Time.MILLISECONDS); + } + + void setStateDurationNs(long stateDurationNs) { + this.stateDurationNs = stateDurationNs; + } + + long getStateRemainingNs() { + return stateEndNs - clock.nowNanos(); + } + + long getStateRemainingMs() { + return Amount.of(getStateRemainingNs(), Time.NANOSECONDS).as(Time.MILLISECONDS); + } + + double getStateFractionComplete() { + return 1.0 - ((double) getStateRemainingNs()) / stateDurationNs; + } + + boolean isStateExpired() { + return clock.nowNanos() > stateEndNs; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java new file mode 100644 index 0000000..8f31ea7 --- /dev/null +++ 
b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java @@ -0,0 +1,152 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.aurora.common.base.ExceptionalSupplier; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +import java.util.logging.Logger; + +/** + * A utility for dealing with backoffs of retryable actions. + * + *

TODO(John Sirois): investigate synergies with BackoffDecider. + * + * @author John Sirois + */ +public class BackoffHelper { + private static final Logger LOG = Logger.getLogger(BackoffHelper.class.getName()); + + private static final Amount DEFAULT_INITIAL_BACKOFF = Amount.of(1L, Time.SECONDS); + private static final Amount DEFAULT_MAX_BACKOFF = Amount.of(1L, Time.MINUTES); + + private final Clock clock; + private final BackoffStrategy backoffStrategy; + + /** + * Creates a new BackoffHelper that uses truncated binary backoff starting at a 1 second backoff + * and maxing out at a 1 minute backoff. + */ + public BackoffHelper() { + this(DEFAULT_INITIAL_BACKOFF, DEFAULT_MAX_BACKOFF); + } + + /** + * Creates a new BackoffHelper that uses truncated binary backoff starting at the given + * {@code initialBackoff} and maxing out at the given {@code maxBackoff}. + * + * @param initialBackoff the initial amount of time to back off + * @param maxBackoff the maximum amount of time to back off + */ + public BackoffHelper(Amount initialBackoff, Amount maxBackoff) { + this(new TruncatedBinaryBackoff(initialBackoff, maxBackoff)); + } + + /** + * Creates a new BackoffHelper that uses truncated binary backoff starting at the given + * {@code initialBackoff} and maxing out at the given {@code maxBackoff}. This will either: + *

    + *
  • {@code stopAtMax == true} : throw {@code BackoffStoppedException} when maxBackoff is + * reached
  • + *
  • {@code stopAtMax == false} : continue backing off with maxBackoff
  • + *
+ * + * @param initialBackoff the initial amount of time to back off + * @param maxBackoff the maximum amount of time to back off + * @param stopAtMax if true, this will throw {@code BackoffStoppedException} when the max backoff is + * reached + */ + public BackoffHelper(Amount initialBackoff, Amount maxBackoff, + boolean stopAtMax) { + this(new TruncatedBinaryBackoff(initialBackoff, maxBackoff, stopAtMax)); + } + + /** + * Creates a BackoffHelper that uses the given {@code backoffStrategy} to calculate backoffs + * between retries. + * + * @param backoffStrategy the backoff strategy to use + */ + public BackoffHelper(BackoffStrategy backoffStrategy) { + this(Clock.SYSTEM_CLOCK, backoffStrategy); + } + + @VisibleForTesting BackoffHelper(Clock clock, BackoffStrategy backoffStrategy) { + this.clock = Preconditions.checkNotNull(clock); + this.backoffStrategy = Preconditions.checkNotNull(backoffStrategy); + } + + /** + * Executes the given task using the configured backoff strategy until the task succeeds as + * indicated by returning {@code true}. + * + * @param task the retryable task to execute until success + * @throws InterruptedException if interrupted while waiting for the task to execute successfully + * @throws BackoffStoppedException if the backoff stopped unsuccessfully + * @throws E if the task throws + */ + public void doUntilSuccess(final ExceptionalSupplier task) + throws InterruptedException, BackoffStoppedException, E { + doUntilResult(new ExceptionalSupplier() { + @Override public Boolean get() throws E { + Boolean result = task.get(); + return Boolean.TRUE.equals(result) ? result : null; + } + }); + } + + /** + * Executes the given task using the configured backoff strategy until the task succeeds as + * indicated by returning a non-null value. 
+ * + * @param task the retryable task to execute until success + * @return the result of the successfully executed task + * @throws InterruptedException if interrupted while waiting for the task to execute successfully + * @throws BackoffStoppedException if the backoff stopped unsuccessfully + * @throws E if the task throws + */ + public T doUntilResult(ExceptionalSupplier task) + throws InterruptedException, BackoffStoppedException, E { + T result = task.get(); // give an immediate try + return (result != null) ? result : retryWork(task); + } + + private T retryWork(ExceptionalSupplier work) + throws E, InterruptedException, BackoffStoppedException { + long currentBackoffMs = 0; + while (backoffStrategy.shouldContinue(currentBackoffMs)) { + currentBackoffMs = backoffStrategy.calculateBackoffMs(currentBackoffMs); + LOG.fine("Operation failed, backing off for " + currentBackoffMs + "ms"); + clock.waitFor(currentBackoffMs); + + T result = work.get(); + if (result != null) { + return result; + } + } + throw new BackoffStoppedException(String.format("Backoff stopped without succeeding.")); + } + + /** + * Occurs after the backoff strategy should stop. + */ + public static class BackoffStoppedException extends RuntimeException { + public BackoffStoppedException(String msg) { + super(msg); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/BackoffStrategy.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/BackoffStrategy.java b/commons/src/main/java/org/apache/aurora/common/util/BackoffStrategy.java new file mode 100644 index 0000000..d954762 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/BackoffStrategy.java @@ -0,0 +1,37 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +/** + * Encapsulates a strategy for backing off from an operation that repeatedly fails. + */ +public interface BackoffStrategy { + + /** + * Calculates the amount of time to backoff from an operation. + * + * @param lastBackoffMs the last used backoff in milliseconds where 0 signifies no backoff has + * been performed yet + * @return the amount of time in milliseconds to back off before retrying the operation + */ + long calculateBackoffMs(long lastBackoffMs); + + /** + * Returns whether to continue backing off. + * + * @param lastBackoffMs the last used backoff in milliseconds + * @return whether to continue backing off + */ + boolean shouldContinue(long lastBackoffMs); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/BuildInfo.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/BuildInfo.java b/commons/src/main/java/org/apache/aurora/common/util/BuildInfo.java new file mode 100644 index 0000000..c290d32 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/BuildInfo.java @@ -0,0 +1,108 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import java.io.InputStream; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.aurora.common.base.MorePreconditions; + +/** + * Handles loading of a build properties file, and provides keys to look up known values in the + * properties. When constructed from a resource path, the file is read lazily on the first call + * to {@link #getProperties()}. + */ +public class BuildInfo { + + private static final Logger LOG = Logger.getLogger(BuildInfo.class.getName()); + + private static final String DEFAULT_BUILD_PROPERTIES_PATH = "build.properties"; + + private final String resourcePath; + + private Properties properties = null; + + /** + * Creates a build info container that will use the default properties file path. + */ + public BuildInfo() { + this(DEFAULT_BUILD_PROPERTIES_PATH); + } + + /** + * Creates a build info container, reading from the given path. + * + * @param resourcePath The resource path to read build properties from; it is passed through + * {@code MorePreconditions.checkNotBlank}. 
+ */ + public BuildInfo(String resourcePath) { + this.resourcePath = MorePreconditions.checkNotBlank(resourcePath); + } + + /** Creates a build info container that serves the given properties instead of reading a resource. */ @VisibleForTesting + public BuildInfo(Properties properties) { + this.resourcePath = null; + this.properties = properties; + } + + /** Loads {@code resourcePath} via the system class loader into a fresh {@link Properties}; failures are logged and leave it empty or partially loaded. NOTE(review): the input stream is never closed here. */ private void fetchProperties() { + properties = new Properties(); + LOG.info("Fetching build properties from " + resourcePath); + InputStream in = ClassLoader.getSystemResourceAsStream(resourcePath); + if (in == null) { + LOG.warning("Failed to fetch build properties from " + resourcePath); + return; + } + + try { + properties.load(in); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to load properties file " + resourcePath, e); + } + } + + /** + * Fetches the properties stored in the resource location, loading them on first use. + * + * @return The loaded properties, or an empty (or partially loaded) properties object if there + * was a problem loading the specified properties resource. + */ + public Properties getProperties() { + if (properties == null) fetchProperties(); + return properties; + } + + /** + * Values of keys that are expected to exist in the loaded properties file. 
+ */ + public enum Key { + PATH("build.path"), + USER("build.user.name"), + MACHINE("build.machine"), + DATE("build.date"), + TIME("build.time"), + TIMESTAMP("build.timestamp"), + GIT_TAG("build.git.tag"), + GIT_REVISION("build.git.revision"), + GIT_REVISION_NUMBER("build.git.revision.number"), + GIT_BRANCHNAME("build.git.branchname"); + + /** The property name this key stands for, e.g. {@code build.path}. */ public final String value; + private Key(String value) { + this.value = value; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/Clock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/Clock.java b/commons/src/main/java/org/apache/aurora/common/util/Clock.java new file mode 100644 index 0000000..87afb4e --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/Clock.java @@ -0,0 +1,70 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import java.io.Serializable; + +/** + * An abstraction of the system clock. + * + * @author John Sirois + */ +public interface Clock { + + /** + * A clock that returns the actual time reported by the system. + * This clock is guaranteed to be serializable. 
+ */ + Clock SYSTEM_CLOCK = new SerializableClock() { + @Override public long nowMillis() { + return System.currentTimeMillis(); + } + @Override public long nowNanos() { + return System.nanoTime(); + } + @Override public void waitFor(long millis) throws InterruptedException { + Thread.sleep(millis); + } + }; + + /** + * Returns the current time in milliseconds since the epoch. + * + * @return The current time in milliseconds since the epoch. + * @see System#currentTimeMillis() + */ + long nowMillis(); + + /** + * Returns the current time in nanoseconds. Should be used only for relative timing. + * See {@code System.nanoTime()} for tips on using the value returned here. + * + * @return A measure of the current time in nanoseconds. + * @see System#nanoTime() + */ + long nowNanos(); + + /** + * Waits for the given amount of time to pass on this clock before returning. + * + * @param millis the amount of time to wait in milliseconds + * @throws InterruptedException if this wait was interrupted + */ + void waitFor(long millis) throws InterruptedException; +} + +/** + * A typedef to support anonymous {@link Clock} implementations that are also {@link Serializable}. + */ +interface SerializableClock extends Clock, Serializable { } http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/CommandExecutor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/CommandExecutor.java b/commons/src/main/java/org/apache/aurora/common/util/CommandExecutor.java new file mode 100644 index 0000000..e591721 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/CommandExecutor.java @@ -0,0 +1,42 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +/** + * Asynchronous executor of enqueued tasks in a rate limited manner. + * + * @author Srinivasan Rajagopal + */ +public interface CommandExecutor { + + /** + * Enqueues a task to be executed with the given retry semantics. + * + * @param name Human-readable name for this task. + * @param task The task to execute. + * @param exceptionClass Concrete exception type (presumably the type {@code task} may throw; + * the type arguments are not visible in this listing - TODO confirm). + * @param maxTries Maximum number of tries in case of failure. + * @param retryDelay Interval between retries in case of failure. + */ + void execute( + String name, + ExceptionalCommand task, + Class exceptionClass, + int maxTries, + Amount retryDelay); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/DateUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/DateUtils.java b/commons/src/main/java/org/apache/aurora/common/util/DateUtils.java new file mode 100644 index 0000000..eb20925 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/DateUtils.java @@ -0,0 +1,57 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * Static helpers for java {@link Date}s and Unix timestamps (whole seconds since the epoch).
 *
 * @author John Sirois
 */
public final class DateUtils {

  private DateUtils() {
    // utility
  }

  /**
   * Returns a new {@link Date} for the current instant.
   *
   * @return the current date and time
   */
  public static Date now() {
    return new Date();
  }

  /**
   * Returns the current time as a Unix timestamp.
   *
   * @return whole seconds elapsed since the epoch, taken from {@link System#currentTimeMillis()}
   */
  public static long nowUnixTime() {
    final long nowMillis = System.currentTimeMillis();
    return toUnixTime(nowMillis);
  }

  /**
   * Converts a date to a Unix timestamp.
   *
   * @param date the date to convert
   * @return the date expressed in whole seconds since the epoch
   */
  public static long toUnixTime(Date date) {
    final long millis = date.getTime();
    return toUnixTime(millis);
  }

  /**
   * Converts epoch milliseconds to a Unix timestamp, dropping the sub-second part.
   *
   * @param millisSinceEpoch milliseconds since the epoch
   * @return the same instant in whole seconds since the epoch
   */
  public static long toUnixTime(long millisSinceEpoch) {
    return TimeUnit.SECONDS.convert(millisSinceEpoch, TimeUnit.MILLISECONDS);
  }

  /**
   * Returns the date lying {@code amount} units of {@code calendarField} before now.
   *
   * @param calendarField a {@link Calendar} field constant such as {@link Calendar#HOUR}
   * @param amount how many units of the field to go back
   * @return the shifted date
   */
  public static Date ago(int calendarField, int amount) {
    return ago(now(), calendarField, amount);
  }

  /**
   * Returns the date lying {@code amount} units of {@code calendarField} before a reference date.
   * The reference date itself is not modified.
   *
   * @param referenceDate the date to count back from
   * @param calendarField a {@link Calendar} field constant such as {@link Calendar#HOUR}
   * @param amount how many units of the field to go back
   * @return the shifted date
   */
  public static Date ago(Date referenceDate, int calendarField, int amount) {
    final Calendar shifted = Calendar.getInstance();
    shifted.setTime(referenceDate);
    shifted.add(calendarField, -amount);
    return shifted.getTime();
  }
}
"License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import java.io.File; + +/** + * Utilities for working with Files + * + * @author Florian Leibert + */ +public final class FileUtils { + + private FileUtils() { + } + + /** + * recursively deletes the path and all it's content and returns true if it succeeds + * Note that the content could be partially deleted and the method return false + * + * @param path the path to delete + * @return true if the path was deleted + */ + public static boolean forceDeletePath(File path) { + if (path == null) { + return false; + } + if (path.exists() && path.isDirectory()) { + File[] files = path.listFiles(); + for (File file : files) { + if (file.isDirectory()) { + forceDeletePath(file); + } else { + file.delete(); + } + } + } + return path.delete(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/LowResClock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/LowResClock.java b/commons/src/main/java/org/apache/aurora/common/util/LowResClock.java new file mode 100644 index 0000000..69fb9ed --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/LowResClock.java @@ -0,0 +1,108 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import java.io.Closeable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +/** + * Low resolution implementation of a {@link Clock}, + * optimized for fast reads at the expense of precision. + * It works by caching the result of the underlying clock (the system clock by default) for a + * {@code resolution} amount of time. 
+ */ +public class LowResClock implements Clock, Closeable { + /** Single-threaded scheduler, backed by a daemon thread named "LowResClock", used by the public constructor. */ private static final ScheduledExecutorService GLOBAL_SCHEDULER = + Executors.newScheduledThreadPool(1, new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "LowResClock"); + t.setDaemon(true); + return t; + } + }); + + private volatile long time; + private final ScheduledFuture updaterHandler; + private final Clock underlying; + + /** Creates a clock that caches {@code clock.nowMillis()} and refreshes the cached value on {@code executor} every {@code resolution}, which must be greater than zero milliseconds. */ @VisibleForTesting + LowResClock(Amount resolution, ScheduledExecutorService executor, Clock clock) { + long sleepTimeMs = resolution.as(Time.MILLISECONDS); + Preconditions.checkArgument(sleepTimeMs > 0); + underlying = clock; + Runnable ticker = new Runnable() { + @Override public void run() { + time = underlying.nowMillis(); + } + }; + + // Ensure the constructing thread sees a LowResClock with a valid (low-res) time by executing a + // blocking call now. + ticker.run(); + + updaterHandler = + executor.scheduleAtFixedRate(ticker, sleepTimeMs, sleepTimeMs, TimeUnit.MILLISECONDS); + } + + + /** + * Construct a LowResClock which wraps the system clock. + * This constructor will also schedule a periodic task responsible for + * updating the time every {@code resolution}. + * + * @param resolution how often the cached time is refreshed + */ + public LowResClock(Amount resolution) { + this(resolution, GLOBAL_SCHEDULER, Clock.SYSTEM_CLOCK); + } + + /** + * Terminate the underlying updater task. + * Any subsequent usage of the clock will throw an {@link IllegalStateException}. 
+ */ + public void close() { + updaterHandler.cancel(true); + } + + @Override + public long nowMillis() { + checkNotClosed(); + return time; + } + + /** Returns {@link #nowMillis()} multiplied by 1,000,000, so the value only advances when the cached millisecond time is refreshed. */ @Override + public long nowNanos() { + return nowMillis() * 1000 * 1000; + } + + @Override + public void waitFor(long millis) throws InterruptedException { + checkNotClosed(); + underlying.waitFor(millis); + } + + /** Throws {@link IllegalStateException} once the updater task has been cancelled by {@link #close()}. */ private void checkNotClosed() { + if (updaterHandler.isCancelled()) { + throw new IllegalStateException("LowResClock invoked after being closed!"); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/ParsingUtil.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/ParsingUtil.java b/commons/src/main/java/org/apache/aurora/common/util/ParsingUtil.java new file mode 100644 index 0000000..0747e7a --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/util/ParsingUtil.java @@ -0,0 +1,53 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.util; + +import com.google.common.base.Preconditions; + +import org.apache.aurora.common.collections.Pair; + +/** + * Common methods for parsing configs. + * + * @author John Sirois + */ +public class ParsingUtil { + /** + * Parses a string as a range between one integer and another. 
The integers must be separated by + * a hyphen character (space padding is acceptable). Additionally, the first integer + * (left-hand side) must be less than or equal to the second (right-hand side). + * + * @param rangeString The string to parse as an integer range. + * @return A pair of the parsed integers, or {@code null} if {@code rangeString} is {@code null}. + * @throws IllegalArgumentException if either side of the range is not a parseable integer. + * @throws IllegalStateException if splitting on the hyphen does not yield exactly two parts or + * the left-hand side is greater than the right-hand side (raised by + * {@code Preconditions.checkState}, per Guava's documentation). + */ + public static Pair parseRange(String rangeString) { + if (rangeString == null) return null; + + String[] startEnd = rangeString.split("-"); + Preconditions.checkState( + startEnd.length == 2, "Shard range format: start-end (e.g. 1-4)"); + int start; + int end; + try { + start = Integer.parseInt(startEnd[0].trim()); + end = Integer.parseInt(startEnd[1].trim()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse shard range.", e); + } + + Preconditions.checkState( + start <= end, "The left-hand side of a shard range must be <= the right-hand side."); + return Pair.of(start, end); + } +}