Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D705011D90 for ; Thu, 12 Jun 2014 23:20:06 +0000 (UTC) Received: (qmail 72493 invoked by uid 500); 12 Jun 2014 23:20:06 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 72457 invoked by uid 500); 12 Jun 2014 23:20:06 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 72450 invoked by uid 99); 12 Jun 2014 23:20:06 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Jun 2014 23:20:06 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 753808C3E18; Thu, 12 Jun 2014 23:20:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vines@apache.org To: commits@accumulo.apache.org Date: Thu, 12 Jun 2014 23:20:06 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: Merge remote-tracking branch 'origin/1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT Merge remote-tracking branch 'origin/1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f34230f7 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f34230f7 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f34230f7 Branch: refs/heads/1.6.1-SNAPSHOT Commit: f34230f7bf32b31d3043aaa761514f6743382a4a Parents: 31aea2a c5aac49 Author: John Vines Authored: Thu Jun 12 19:20:04 2014 -0400 Committer: John Vines Committed: Thu Jun 12 19:20:04 2014 -0400 ---------------------------------------------------------------------- .../java/org/apache/accumulo/server/util/TServerUtils.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f34230f7/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java index 6d9e4c7,0000000..36487d6 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java @@@ -1,374 -1,0 +1,369 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.channels.ServerSocketChannel; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.SimpleThreadPool; +import org.apache.accumulo.core.util.SslConnectionParams; +import org.apache.accumulo.core.util.TBufferedSocket; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.metrics.ThriftMetrics; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TProcessorFactory; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import com.google.common.net.HostAndPort; + +public class TServerUtils { + private static final Logger log = Logger.getLogger(TServerUtils.class); + + public static final ThreadLocal clientAddress = new ThreadLocal(); + + public static class ServerAddress { + public final TServer server; + public final HostAndPort address; + + public ServerAddress(TServer server, HostAndPort address) { + this.server = server; + this.address = address; + } + } + + /** + * Start a server, at the given port, or higher, if that port is not available. + * + * @param portHintProperty + * the port to attempt to open, can be zero, meaning "any available port" + * @param processor + * the service to be started + * @param serverName + * the name of the class that is providing the service + * @param threadName + * name this service's thread for better debugging + * @return the server object created, and the port actually used + * @throws UnknownHostException + * when we don't know our own address + */ + public static ServerAddress startServer(AccumuloConfiguration conf, String address, Property portHintProperty, TProcessor processor, String serverName, + String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty) + throws UnknownHostException { + int portHint = conf.getPort(portHintProperty); + int minThreads = 2; + if (minThreadProperty != null) + minThreads = conf.getCount(minThreadProperty); + long timeBetweenThreadChecks = 1000; + if (timeBetweenThreadChecksProperty != null) + timeBetweenThreadChecks = conf.getTimeInMillis(timeBetweenThreadChecksProperty); + long maxMessageSize = 10 * 1000 * 1000; + if (maxMessageSizeProperty != null) + maxMessageSize = conf.getMemoryInBytes(maxMessageSizeProperty); + boolean portSearch = false; + if (portSearchProperty != null) + portSearch = conf.getBoolean(portSearchProperty); + // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once + TServerUtils.TimedProcessor timedProcessor = new TServerUtils.TimedProcessor(processor, serverName, threadName); + Random random = new Random(); + for (int j = 0; j < 100; j++) { + + // Are we going to slide around, looking for an open port? + int portsToSearch = 1; + if (portSearch) + portsToSearch = 1000; + + for (int i = 0; i < portsToSearch; i++) { + int port = portHint + i; + if (portHint != 0 && i > 0) + port = 1024 + random.nextInt(65535 - 1024); + if (port > 65535) + port = 1024 + port % (65535 - 1024); + try { + HostAndPort addr = HostAndPort.fromParts(address, port); + return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize, + SslConnectionParams.forServer(conf), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); + } catch (TTransportException ex) { + log.error("Unable to start TServer", ex); + if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) { + // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less + // TTransportException, and with a TSocket created by TSSLTransportFactory, it + // comes through as caused by a BindException. + log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")"); + UtilWaitThread.sleep(250); + } else { + // thrift is passing up a nested exception that isn't a BindException, + // so no reason to believe retrying on a different port would help. + log.error("Unable to start TServer", ex); + break; + } + } + } + } + throw new UnknownHostException("Unable to find a listen port"); + } + + public static class TimedProcessor implements TProcessor { + + final TProcessor other; + ThriftMetrics metrics = null; + long idleStart = 0; + + TimedProcessor(TProcessor next, String serverName, String threadName) { + this.other = next; + // Register the metrics MBean + try { + metrics = new ThriftMetrics(serverName, threadName); + metrics.register(); + } catch (Exception e) { + log.error("Exception registering MBean with MBean Server", e); + } + idleStart = System.currentTimeMillis(); + } + + @Override + public boolean process(TProtocol in, TProtocol out) throws TException { + long now = 0; + if (metrics.isEnabled()) { + now = System.currentTimeMillis(); + metrics.add(ThriftMetrics.idle, (now - idleStart)); + } + try { - try { - return other.process(in, out); - } catch (NullPointerException ex) { - // THRIFT-1447 - remove with thrift 0.9 - return true; - } ++ return other.process(in, out); + } finally { + if (metrics.isEnabled()) { + idleStart = System.currentTimeMillis(); + metrics.add(ThriftMetrics.execute, idleStart - now); + } + } + } + } + + public static class ClientInfoProcessorFactory extends TProcessorFactory { + + public ClientInfoProcessorFactory(TProcessor processor) { + super(processor); + } + + @Override + public TProcessor getProcessor(TTransport trans) { + if (trans instanceof TBufferedSocket) { + TBufferedSocket tsock = (TBufferedSocket) trans; + clientAddress.set(tsock.getClientString()); + } else if (trans instanceof TSocket) { + TSocket tsock = (TSocket) trans; + clientAddress.set(tsock.getSocket().getInetAddress().getHostAddress() + ":" + tsock.getSocket().getPort()); + } else { + log.warn("Unable to extract clientAddress from transport of type " + trans.getClass()); + } + return super.getProcessor(trans); + } + } + + public static class THsHaServer extends org.apache.thrift.server.THsHaServer { + public THsHaServer(Args args) { + super(args); + } + + @Override + protected Runnable getRunnable(FrameBuffer frameBuffer) { + return new Invocation(frameBuffer); + } + + private class Invocation implements Runnable { + + private final FrameBuffer frameBuffer; + + public Invocation(final FrameBuffer frameBuffer) { + this.frameBuffer = frameBuffer; + } + + @Override + public void run() { + if (frameBuffer.trans_ instanceof TNonblockingSocket) { + TNonblockingSocket tsock = (TNonblockingSocket) frameBuffer.trans_; + Socket sock = tsock.getSocketChannel().socket(); + clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort()); + } + frameBuffer.invoke(); + } + } + } + + public static ServerAddress createHsHaServer(HostAndPort address, TProcessor processor, final String serverName, String threadName, final int numThreads, + long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { + TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort())); + THsHaServer.Args options = new THsHaServer.Args(transport); + options.protocolFactory(ThriftUtil.protocolFactory()); + options.transportFactory(ThriftUtil.transportFactory(maxMessageSize)); + options.maxReadBufferBytes = maxMessageSize; + options.stopTimeoutVal(5); + /* + * Create our own very special thread pool. + */ + final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool"); + // periodically adjust the number of threads we need by checking how busy our threads are + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + if (pool.getCorePoolSize() <= pool.getActiveCount()) { + int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2); + log.info("Increasing server thread pool size on " + serverName + " to " + larger); + pool.setMaximumPoolSize(larger); + pool.setCorePoolSize(larger); + } else { + if (pool.getCorePoolSize() > pool.getActiveCount() + 3) { + int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1); + if (smaller != pool.getCorePoolSize()) { + // there is a race condition here... the active count could be higher by the time + // we decrease the core pool size... so the active count could end up higher than + // the core pool size, in which case everything will be queued... the increase case + // should handle this and prevent deadlock + log.info("Decreasing server thread pool size on " + serverName + " to " + smaller); + pool.setCorePoolSize(smaller); + } + } + } + } + }, timeBetweenThreadChecks, timeBetweenThreadChecks); + options.executorService(pool); + options.processorFactory(new TProcessorFactory(processor)); + if (address.getPort() == 0) { + address = HostAndPort.fromParts(address.getHostText(), transport.getPort()); + } + return new ServerAddress(new THsHaServer(options), address); + } + + public static ServerAddress createThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads) + throws TTransportException { + + // if port is zero, then we must bind to get the port number + ServerSocket sock; + try { + sock = ServerSocketChannel.open().socket(); + sock.setReuseAddress(true); + sock.bind(new InetSocketAddress(address.getHostText(), address.getPort())); + address = HostAndPort.fromParts(address.getHostText(), sock.getLocalPort()); + } catch (IOException ex) { + throw new TTransportException(ex); + } + TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024); + return new ServerAddress(createThreadPoolServer(transport, processor), address); + } + + public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) { + TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); + options.protocolFactory(ThriftUtil.protocolFactory()); + options.transportFactory(ThriftUtil.transportFactory()); + options.processorFactory(new ClientInfoProcessorFactory(processor)); + return new TThreadPoolServer(options); + } + + public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams) + throws TTransportException { + org.apache.thrift.transport.TServerSocket transport; + try { + transport = ThriftUtil.getServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams); + } catch (UnknownHostException e) { + throw new TTransportException(e); + } + if (address.getPort() == 0) { + address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort()); + } + return new ServerAddress(createThreadPoolServer(transport, processor), address); + } + + public static ServerAddress startTServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, + long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException { + return startTServer(address, new TimedProcessor(processor, serverName, threadName), serverName, threadName, numThreads, timeBetweenThreadChecks, + maxMessageSize, sslParams, sslSocketTimeout); + } + + public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads, + long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException { + + ServerAddress serverAddress; + if (sslParams != null) { + serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams); + } else { + serverAddress = createHsHaServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize); + } + final TServer finalServer = serverAddress.server; + Runnable serveTask = new Runnable() { + @Override + public void run() { + try { + finalServer.serve(); + } catch (Error e) { + Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting."); + } + } + }; + serveTask = new LoggingRunnable(TServerUtils.log, serveTask); + Thread thread = new Daemon(serveTask, threadName); + thread.start(); + // check for the special "bind to everything address" + if (serverAddress.address.getHostText().equals("0.0.0.0")) { + // can't get the address from the bind, so we'll do our best to invent our hostname + try { + serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort())); + } catch (UnknownHostException e) { + throw new TTransportException(e); + } + } + return serverAddress; + } + + // Existing connections will keep our thread running: reach in with reflection and insist that they shutdown. + public static void stopTServer(TServer s) { + if (s == null) + return; + s.stop(); + try { + Field f = s.getClass().getDeclaredField("executorService_"); + f.setAccessible(true); + ExecutorService es = (ExecutorService) f.get(s); + es.shutdownNow(); + } catch (Exception e) { + TServerUtils.log.error("Unable to call shutdownNow", e); + } + } +}