accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [4/5] accumulo git commit: ACCUMULO-3394 Consolidate thrift related classes in one package.
Date Tue, 09 Dec 2014 03:11:57 GMT
ACCUMULO-3394 Consolidate thrift related classes in one package.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f25d8505
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f25d8505
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f25d8505

Branch: refs/heads/master
Commit: f25d850561c4da093222621ae0ea5f1ab8079dd0
Parents: ab4cc79
Author: Josh Elser <elserj@apache.org>
Authored: Mon Dec 8 17:34:32 2014 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Mon Dec 8 22:04:17 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/accumulo/proxy/Proxy.java   |   2 +-
 .../security/AuditedSecurityOperation.java      |   2 +-
 .../thrift/ClientInfoProcessorFactory.java      |  53 +++
 .../server/thrift/CustomNonBlockingServer.java  | 268 +++++++++++++++
 .../accumulo/server/thrift/RpcWrapper.java      |  62 ++++
 .../accumulo/server/thrift/ServerAddress.java   |  42 +++
 .../server/thrift/TBufferedServerSocket.java    |  71 ++++
 .../server/thrift/TNonblockingServerSocket.java | 157 +++++++++
 .../accumulo/server/thrift/TServerUtils.java    | 272 +++++++++++++++
 .../accumulo/server/thrift/TimedProcessor.java  |  69 ++++
 .../server/util/CustomNonBlockingServer.java    | 268 ---------------
 .../apache/accumulo/server/util/RpcWrapper.java |  62 ----
 .../server/util/TBufferedServerSocket.java      |  71 ----
 .../server/util/TNonblockingServerSocket.java   | 157 ---------
 .../accumulo/server/util/TServerUtils.java      | 342 -------------------
 .../accumulo/server/util/TServerUtilsTest.java  |   3 +
 .../accumulo/gc/SimpleGarbageCollector.java     |   4 +-
 .../java/org/apache/accumulo/master/Master.java |   6 +-
 .../apache/accumulo/tserver/TabletServer.java   |   6 +-
 .../accumulo/tserver/session/Session.java       |   2 +-
 .../accumulo/test/functional/ZombieTServer.java |  32 +-
 .../test/performance/thrift/NullTserver.java    |   2 +-
 22 files changed, 1025 insertions(+), 928 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 972eee7..0a7b301 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -27,7 +27,7 @@ import org.apache.accumulo.core.cli.Help;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.proxy.thrift.AccumuloProxy;
-import org.apache.accumulo.server.util.RpcWrapper;
+import org.apache.accumulo.server.thrift.RpcWrapper;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TCompactProtocol;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
index e473822..e09f7fd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
@@ -47,7 +47,7 @@ import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.security.handler.Authenticator;
 import org.apache.accumulo.server.security.handler.Authorizor;
 import org.apache.accumulo.server.security.handler.PermissionHandler;
-import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.thrift.TServerUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
new file mode 100644
index 0000000..208fde5
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import org.apache.accumulo.core.util.TBufferedSocket;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sets the address of a client in a ThreadLocal to allow for more informative log messages.
+ */
+public class ClientInfoProcessorFactory extends TProcessorFactory {
+  private static final Logger log = LoggerFactory.getLogger(ClientInfoProcessorFactory.class);
+
+  private final ThreadLocal<String> clientAddress;
+
+  public ClientInfoProcessorFactory(ThreadLocal<String> clientAddress, TProcessor processor) {
+    super(processor);
+    this.clientAddress = clientAddress;
+  }
+
+  @Override
+  public TProcessor getProcessor(TTransport trans) {
+    if (trans instanceof TBufferedSocket) {
+      TBufferedSocket tsock = (TBufferedSocket) trans;
+      clientAddress.set(tsock.getClientString());
+    } else if (trans instanceof TSocket) {
+      TSocket tsock = (TSocket) trans;
+      clientAddress.set(tsock.getSocket().getInetAddress().getHostAddress() + ":" + tsock.getSocket().getPort());
+    } else {
+      log.warn("Unable to extract clientAddress from transport of type {}", trans.getClass());
+    }
+    return super.getProcessor(trans);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
new file mode 100644
index 0000000..ceb0a42
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * This class implements a custom non-blocking thrift server, incorporating the {@link THsHaServer} features, and overriding the underlying
+ * {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
+ * {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with
+ * one that reveals the client address from its transport.
+ * 
+ * <p>
+ * The justification for this is explained in https://issues.apache.org/jira/browse/ACCUMULO-1691, and is needed due to the repeated regressions:
+ * <ul>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-958</li>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-1464</li>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-2173</li>
+ * </ul>
+ * 
+ * <p>
+ * This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
+ * instantiating a custom FrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and
+ * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer}. Because of this, any change in the implementation upstream will require a review
+ * of this implementation here, to ensure any new bugfixes/features in the upstream Thrift class are also applied here, at least until
+ * https://issues.apache.org/jira/browse/THRIFT-2173 is implemented. In the meantime, the maven-enforcer-plugin ensures that Thrift remains at version 0.9.1,
+ * which has been reviewed and tested.
+ */
+public class CustomNonBlockingServer extends THsHaServer {
+
+  private static final Logger LOGGER = Logger.getLogger(CustomNonBlockingServer.class);
+  private SelectAcceptThread selectAcceptThread_;
+  private volatile boolean stopped_ = false;
+
+  public CustomNonBlockingServer(Args args) {
+    super(args);
+  }
+
+  @Override
+  protected Runnable getRunnable(final FrameBuffer frameBuffer) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        if (frameBuffer instanceof CustomNonblockingFrameBuffer) {
+          TNonblockingTransport trans = ((CustomNonblockingFrameBuffer) frameBuffer).getTransport();
+          if (trans instanceof TNonblockingSocket) {
+            TNonblockingSocket tsock = (TNonblockingSocket) trans;
+            Socket sock = tsock.getSocketChannel().socket();
+            TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+          }
+        }
+        frameBuffer.invoke();
+      }
+    };
+  }
+
+  @Override
+  protected boolean startThreads() {
+    // start the selector
+    try {
+      selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
+      selectAcceptThread_.start();
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Failed to start selector thread!", e);
+      return false;
+    }
+  }
+
+  @Override
+  public void stop() {
+    stopped_ = true;
+    if (selectAcceptThread_ != null) {
+      selectAcceptThread_.wakeupSelector();
+    }
+  }
+
+  @Override
+  public boolean isStopped() {
+    return selectAcceptThread_.isStopped();
+  }
+
+  @Override
+  protected void joinSelector() {
+    // wait until the selector thread exits
+    try {
+      selectAcceptThread_.join();
+    } catch (InterruptedException e) {
+      // for now, just silently ignore. technically this means we'll have less of
+      // a graceful shutdown as a result.
+    }
+  }
+
+  private interface CustomNonblockingFrameBuffer {
+    TNonblockingTransport getTransport();
+  }
+
+  private class CustomAsyncFrameBuffer extends AsyncFrameBuffer implements CustomNonblockingFrameBuffer {
+    private TNonblockingTransport trans;
+
+    public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+      super(trans, selectionKey, selectThread);
+      this.trans = trans;
+    }
+
+    @Override
+    public TNonblockingTransport getTransport() {
+      return trans;
+    }
+  }
+
+  private class CustomFrameBuffer extends FrameBuffer implements CustomNonblockingFrameBuffer {
+    private TNonblockingTransport trans;
+
+    public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+      super(trans, selectionKey, selectThread);
+      this.trans = trans;
+    }
+
+    @Override
+    public TNonblockingTransport getTransport() {
+      return trans;
+    }
+  }
+
+  // @formatter:off
+  private class SelectAcceptThread extends AbstractSelectThread {
+
+    // The server transport on which new client transports will be accepted
+    private final TNonblockingServerTransport serverTransport;
+
+    /**
+     * Set up the thread that will handle the non-blocking accepts, reads, and
+     * writes.
+     */
+    public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
+    throws IOException {
+      this.serverTransport = serverTransport;
+      serverTransport.registerSelector(selector);
+    }
+
+    public boolean isStopped() {
+      return stopped_;
+    }
+
+    /**
+     * The work loop. Handles both selecting (all IO operations) and managing
+     * the selection preferences of all existing connections.
+     */
+    @Override
+    public void run() {
+      try {
+        if (eventHandler_ != null) {
+          eventHandler_.preServe();
+        }
+
+        while (!stopped_) {
+          select();
+          processInterestChanges();
+        }
+        for (SelectionKey selectionKey : selector.keys()) {
+          cleanupSelectionKey(selectionKey);
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        stopped_ = true;
+      }
+    }
+
+    /**
+     * Select and process IO events appropriately:
+     * If there are connections to be accepted, accept them.
+     * If there are existing connections with data waiting to be read, read it,
+     * buffering until a whole frame has been read.
+     * If there are any pending responses, buffer them until their target client
+     * is available, and then send the data.
+     */
+    private void select() {
+      try {
+        // wait for io events.
+        selector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            cleanupSelectionKey(key);
+            continue;
+          }
+
+          // if the key is marked Accept, then it has to be the server
+          // transport.
+          if (key.isAcceptable()) {
+            handleAccept();
+          } else if (key.isReadable()) {
+            // deal with reads
+            handleRead(key);
+          } else if (key.isWritable()) {
+            // deal with writes
+            handleWrite(key);
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    /**
+     * Accept a new connection.
+     */
+    @SuppressWarnings("unused")
+    private void handleAccept() throws IOException {
+      SelectionKey clientKey = null;
+      TNonblockingTransport client = null;
+      try {
+        // accept the connection
+        client = (TNonblockingTransport)serverTransport.accept();
+        clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
+
+        // add this key to the map
+          FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
+                  new CustomAsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
+                  new CustomFrameBuffer(client, clientKey,SelectAcceptThread.this);
+
+          clientKey.attach(frameBuffer);
+      } catch (TTransportException tte) {
+        // something went wrong accepting.
+        LOGGER.warn("Exception trying to accept!", tte);
+        tte.printStackTrace();
+        if (clientKey != null) cleanupSelectionKey(clientKey);
+        if (client != null) client.close();
+      }
+    }
+  } // SelectAcceptThread
+  // @formatter:on
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
new file mode 100644
index 0000000..db1863b
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
+import org.apache.accumulo.core.trace.wrappers.TraceWrap;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to
+ * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like
+ * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as
+ * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
+ * 
+ * @since 1.6.1
+ */
+public class RpcWrapper {
+
+  public static <T> T service(final T instance) {
+    InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) {
+      @Override
+      public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
+        try {
+          return super.invoke(obj, method, args);
+        } catch (RuntimeException e) {
+          String msg = e.getMessage();
+          LoggerFactory.getLogger(instance.getClass()).error(msg, e);
+          throw new TException(msg);
+        } catch (Error e) {
+          String msg = e.getMessage();
+          LoggerFactory.getLogger(instance.getClass()).error(msg, e);
+          throw new TException(msg);
+        }
+      }
+    };
+
+    @SuppressWarnings("unchecked")
+    T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler);
+    return proxiedInstance;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
new file mode 100644
index 0000000..f52951e
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import org.apache.thrift.server.TServer;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Encapsulate a Thrift server and the address, host and port, to which it is bound.
+ */
+public class ServerAddress {
+  public final TServer server;
+  public final HostAndPort address;
+
+  public ServerAddress(TServer server, HostAndPort address) {
+    this.server = server;
+    this.address = address;
+  }
+
+  public TServer getServer() {
+    return server;
+  }
+
+  public HostAndPort getAddress() {
+    return address;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
new file mode 100644
index 0000000..bf35bdf
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import org.apache.accumulo.core.util.TBufferedSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+// Thrift-959 removed the small buffer from TSocket; this adds it back for servers
+public class TBufferedServerSocket extends TServerTransport {
+  
+  // expose acceptImpl
+  static class TServerSocket extends org.apache.thrift.transport.TServerSocket {
+    public TServerSocket(ServerSocket serverSocket) {
+      super(serverSocket);
+    }
+    
+    public TSocket acceptImplPublic() throws TTransportException {
+      return acceptImpl();
+    }
+  }
+  
+  final TServerSocket impl;
+  final int bufferSize;
+  
+  public TBufferedServerSocket(ServerSocket serverSocket, int bufferSize) {
+    this.impl = new TServerSocket(serverSocket);
+    this.bufferSize = bufferSize;
+  }
+  
+  @Override
+  public void listen() throws TTransportException {
+    impl.listen();
+  }
+  
+  @Override
+  public void close() {
+    impl.close();
+  }
+  
+  // Wrap accepted sockets using buffered IO
+  @Override
+  protected TTransport acceptImpl() throws TTransportException {
+    TSocket sock = impl.acceptImplPublic();
+    try {
+      return new TBufferedSocket(sock, this.bufferSize);
+    } catch (IOException e) {
+      throw new TTransportException(e);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
new file mode 100644
index 0000000..f4b2df4
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.server.thrift;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Wrapper around ServerSocketChannel.
+ * 
+ * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
+ * The only change (apart from the logging statements) is the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket.
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+  private static final Logger log = Logger.getLogger(TNonblockingServerTransport.class.getName());
+
+  /**
+   * This channel is where all the nonblocking magic happens.
+   */
+  private ServerSocketChannel serverSocketChannel = null;
+
+  /**
+   * Underlying ServerSocket object
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+    this(new InetSocketAddress(port), clientTimeout);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    clientTimeout_ = clientTimeout;
+    try {
+      serverSocketChannel = ServerSocketChannel.open();
+      serverSocketChannel.configureBlocking(false);
+
+      // Make server socket
+      serverSocket_ = serverSocketChannel.socket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(bindAddr);
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+    }
+  }
+
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        log.error("SocketException caused by serverSocket in listen()", sx);
+      }
+    }
+  }
+
+  protected TNonblockingSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      SocketChannel socketChannel = serverSocketChannel.accept();
+      if (socketChannel == null) {
+        return null;
+      }
+
+      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+      tsocket.setTimeout(clientTimeout_);
+      return tsocket;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void registerSelector(Selector selector) {
+    try {
+      // Register the server socket channel, indicating an interest in
+      // accepting new connections
+      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    } catch (ClosedChannelException e) {
+      // this shouldn't happen, ideally...
+      // TODO: decide what to do with this.
+    }
+  }
+
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        log.warn("WARNING: Could not close server socket: " + iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+  public int getPort() {
+    return serverSocket_.getLocalPort();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
new file mode 100644
index 0000000..ca182d4
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.SslConnectionParams;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.net.HostAndPort;
+
+public class TServerUtils {
+  private static final Logger log = Logger.getLogger(TServerUtils.class);
+
+  public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
+
+  /**
+   * Start a server, at the given port, or higher, if that port is not available.
+   *
+   * @param portHintProperty
+   *          the port to attempt to open, can be zero, meaning "any available port"
+   * @param processor
+   *          the service to be started
+   * @param serverName
+   *          the name of the class that is providing the service
+   * @param threadName
+   *          name this service's thread for better debugging
+   * @return the server object created, and the port actually used
+   * @throws UnknownHostException
+   *           when we don't know our own address
+   */
+  public static ServerAddress startServer(AccumuloServerContext service, String address, Property portHintProperty, TProcessor processor, String serverName,
+      String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
+      throws UnknownHostException {
+    int portHint = service.getConfiguration().getPort(portHintProperty);
+    int minThreads = 2;
+    if (minThreadProperty != null)
+      minThreads = service.getConfiguration().getCount(minThreadProperty);
+    long timeBetweenThreadChecks = 1000;
+    if (timeBetweenThreadChecksProperty != null)
+      timeBetweenThreadChecks = service.getConfiguration().getTimeInMillis(timeBetweenThreadChecksProperty);
+    long maxMessageSize = 10 * 1000 * 1000;
+    if (maxMessageSizeProperty != null)
+      maxMessageSize = service.getConfiguration().getMemoryInBytes(maxMessageSizeProperty);
+    boolean portSearch = false;
+    if (portSearchProperty != null)
+      portSearch = service.getConfiguration().getBoolean(portSearchProperty);
+    // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
+    TimedProcessor timedProcessor = new TimedProcessor(service.getConfiguration(), processor, serverName, threadName);
+    Random random = new Random();
+    for (int j = 0; j < 100; j++) {
+
+      // Are we going to slide around, looking for an open port?
+      int portsToSearch = 1;
+      if (portSearch)
+        portsToSearch = 1000;
+
+      for (int i = 0; i < portsToSearch; i++) {
+        int port = portHint + i;
+        if (portHint != 0 && i > 0)
+          port = 1024 + random.nextInt(65535 - 1024);
+        if (port > 65535)
+          port = 1024 + port % (65535 - 1024);
+        try {
+          HostAndPort addr = HostAndPort.fromParts(address, port);
+          return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads,
+              service.getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), timeBetweenThreadChecks, maxMessageSize,
+              service.getServerSslParams(), service.getClientTimeoutInMillis());
+        } catch (TTransportException ex) {
+          log.error("Unable to start TServer", ex);
+          if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
+            // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
+            // TTransportException, and with a TSocket created by TSSLTransportFactory, it
+            // comes through as caused by a BindException.
+            log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
+            UtilWaitThread.sleep(250);
+          } else {
+            // thrift is passing up a nested exception that isn't a BindException,
+            // so no reason to believe retrying on a different port would help.
+            log.error("Unable to start TServer", ex);
+            break;
+          }
+        }
+      }
+    }
+    throw new UnknownHostException("Unable to find a listen port");
+  }
+
+  public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName,
+      final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+    TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
+    CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
+    options.protocolFactory(ThriftUtil.protocolFactory());
+    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+    options.maxReadBufferBytes = maxMessageSize;
+    options.stopTimeoutVal(5);
+    /*
+     * Create our own very special thread pool.
+     */
+    final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+    // periodically adjust the number of threads we need by checking how busy our threads are
+    SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
+      @Override
+      public void run() {
+        if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+          int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+          log.info("Increasing server thread pool size on " + serverName + " to " + larger);
+          pool.setMaximumPoolSize(larger);
+          pool.setCorePoolSize(larger);
+        } else {
+          if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+            int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
+            if (smaller != pool.getCorePoolSize()) {
+              // ACCUMULO-2997 there is a race condition here... the active count could be higher by the time
+              // we decrease the core pool size... so the active count could end up higher than
+              // the core pool size, in which case everything will be queued... the increase case
+              // should handle this and prevent deadlock
+              log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
+              pool.setCorePoolSize(smaller);
+            }
+          }
+        }
+      }
+    }, timeBetweenThreadChecks, timeBetweenThreadChecks);
+    options.executorService(pool);
+    options.processorFactory(new TProcessorFactory(processor));
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
+    }
+    return new ServerAddress(new CustomNonBlockingServer(options), address);
+  }
+
+  public static ServerAddress createThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads)
+      throws TTransportException {
+
+    // if port is zero, then we must bind to get the port number
+    ServerSocket sock;
+    try {
+      sock = ServerSocketChannel.open().socket();
+      sock.setReuseAddress(true);
+      sock.bind(new InetSocketAddress(address.getHostText(), address.getPort()));
+      address = HostAndPort.fromParts(address.getHostText(), sock.getLocalPort());
+    } catch (IOException ex) {
+      throw new TTransportException(ex);
+    }
+    TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024);
+    return new ServerAddress(createThreadPoolServer(transport, processor), address);
+  }
+
+  public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
+    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+    options.protocolFactory(ThriftUtil.protocolFactory());
+    options.transportFactory(ThriftUtil.transportFactory());
+    options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
+    return new TThreadPoolServer(options);
+  }
+
+  public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
+      throws TTransportException {
+    org.apache.thrift.transport.TServerSocket transport;
+    try {
+      transport = ThriftUtil.getServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
+    } catch (UnknownHostException e) {
+      throw new TTransportException(e);
+    }
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+    }
+    return new ServerAddress(createThreadPoolServer(transport, processor), address);
+  }
+
+  public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads,
+      long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+    return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
+        timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
+  }
+
+  /**
+   * Start the appropriate Thrift server (SSL or non-blocking server) for the given parameters. Non-null SSL parameters will cause an SSL server to be started.
+   *
+   * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
+   */
+  public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
+    int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+
+    ServerAddress serverAddress;
+    if (sslParams != null) {
+      serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
+    } else {
+      serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+    }
+    final TServer finalServer = serverAddress.server;
+    Runnable serveTask = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          finalServer.serve();
+        } catch (Error e) {
+          Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+        }
+      }
+    };
+    serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
+    Thread thread = new Daemon(serveTask, threadName);
+    thread.start();
+    // check for the special "bind to everything address"
+    if (serverAddress.address.getHostText().equals("0.0.0.0")) {
+      // can't get the address from the bind, so we'll do our best to invent our hostname
+      try {
+        serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
+      } catch (UnknownHostException e) {
+        throw new TTransportException(e);
+      }
+    }
+    return serverAddress;
+  }
+
+  // Existing connections will keep our thread running: reach in with reflection and insist that they shutdown.
+  public static void stopTServer(TServer s) {
+    if (s == null)
+      return;
+    s.stop();
+    try {
+      Field f = s.getClass().getDeclaredField("executorService_");
+      f.setAccessible(true);
+      ExecutorService es = (ExecutorService) f.get(s);
+      es.shutdownNow();
+    } catch (Exception e) {
+      TServerUtils.log.error("Unable to call shutdownNow", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/TimedProcessor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TimedProcessor.java
new file mode 100644
index 0000000..56c235c
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TimedProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.server.metrics.Metrics;
+import org.apache.accumulo.server.metrics.MetricsFactory;
+import org.apache.accumulo.server.metrics.ThriftMetrics;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TProcessor} which tracks the duration of an RPC and adds it to the metrics subsystem.
+ */
+public class TimedProcessor implements TProcessor {
+  private static final Logger log = LoggerFactory.getLogger(TimedProcessor.class);
+
+  private final TProcessor other;
+  private final Metrics metrics;
+  private long idleStart = 0;
+
+  public TimedProcessor(AccumuloConfiguration conf, TProcessor next, String serverName, String threadName) {
+    this.other = next;
+    // Register the metrics MBean
+    MetricsFactory factory = new MetricsFactory(conf);
+    metrics = factory.createThriftMetrics(serverName, threadName);
+    try {
+      metrics.register();
+    } catch (Exception e) {
+      log.error("Exception registering MBean with MBean Server", e);
+    }
+    idleStart = System.currentTimeMillis();
+  }
+
+  @Override
+  public boolean process(TProtocol in, TProtocol out) throws TException {
+    long now = 0;
+    final boolean metricsEnabled = metrics.isEnabled();
+    if (metricsEnabled) {
+      now = System.currentTimeMillis();
+      metrics.add(ThriftMetrics.idle, (now - idleStart));
+    }
+    try {
+      return other.process(in, out);
+    } finally {
+      if (metricsEnabled) {
+        idleStart = System.currentTimeMillis();
+        metrics.add(ThriftMetrics.execute, idleStart - now);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
deleted file mode 100644
index 0f01068..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.nio.channels.SelectionKey;
-import java.util.Iterator;
-
-import org.apache.log4j.Logger;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TNonblockingTransport;
-import org.apache.thrift.transport.TTransportException;
-
-/**
- * This class implements a custom non-blocking thrift server, incorporating the {@link THsHaServer} features, and overriding the underlying
- * {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
- * {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with
- * one that reveals the client address from its transport.
- * 
- * <p>
- * The justification for this is explained in https://issues.apache.org/jira/browse/ACCUMULO-1691, and is needed due to the repeated regressions:
- * <ul>
- * <li>https://issues.apache.org/jira/browse/THRIFT-958</li>
- * <li>https://issues.apache.org/jira/browse/THRIFT-1464</li>
- * <li>https://issues.apache.org/jira/browse/THRIFT-2173</li>
- * </ul>
- * 
- * <p>
- * This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
- * instantiating a custom FrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and
- * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer}. Because of this, any change in the implementation upstream will require a review
- * of this implementation here, to ensure any new bugfixes/features in the upstream Thrift class are also applied here, at least until
- * https://issues.apache.org/jira/browse/THRIFT-2173 is implemented. In the meantime, the maven-enforcer-plugin ensures that Thrift remains at version 0.9.1,
- * which has been reviewed and tested.
- */
-public class CustomNonBlockingServer extends THsHaServer {
-
-  private static final Logger LOGGER = Logger.getLogger(CustomNonBlockingServer.class);
-  private SelectAcceptThread selectAcceptThread_;
-  private volatile boolean stopped_ = false;
-
-  public CustomNonBlockingServer(Args args) {
-    super(args);
-  }
-
-  @Override
-  protected Runnable getRunnable(final FrameBuffer frameBuffer) {
-    return new Runnable() {
-      @Override
-      public void run() {
-        if (frameBuffer instanceof CustomNonblockingFrameBuffer) {
-          TNonblockingTransport trans = ((CustomNonblockingFrameBuffer) frameBuffer).getTransport();
-          if (trans instanceof TNonblockingSocket) {
-            TNonblockingSocket tsock = (TNonblockingSocket) trans;
-            Socket sock = tsock.getSocketChannel().socket();
-            TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
-          }
-        }
-        frameBuffer.invoke();
-      }
-    };
-  }
-
-  @Override
-  protected boolean startThreads() {
-    // start the selector
-    try {
-      selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
-      selectAcceptThread_.start();
-      return true;
-    } catch (IOException e) {
-      LOGGER.error("Failed to start selector thread!", e);
-      return false;
-    }
-  }
-
-  @Override
-  public void stop() {
-    stopped_ = true;
-    if (selectAcceptThread_ != null) {
-      selectAcceptThread_.wakeupSelector();
-    }
-  }
-
-  @Override
-  public boolean isStopped() {
-    return selectAcceptThread_.isStopped();
-  }
-
-  @Override
-  protected void joinSelector() {
-    // wait until the selector thread exits
-    try {
-      selectAcceptThread_.join();
-    } catch (InterruptedException e) {
-      // for now, just silently ignore. technically this means we'll have less of
-      // a graceful shutdown as a result.
-    }
-  }
-
-  private interface CustomNonblockingFrameBuffer {
-    TNonblockingTransport getTransport();
-  }
-
-  private class CustomAsyncFrameBuffer extends AsyncFrameBuffer implements CustomNonblockingFrameBuffer {
-    private TNonblockingTransport trans;
-
-    public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
-      super(trans, selectionKey, selectThread);
-      this.trans = trans;
-    }
-
-    @Override
-    public TNonblockingTransport getTransport() {
-      return trans;
-    }
-  }
-
-  private class CustomFrameBuffer extends FrameBuffer implements CustomNonblockingFrameBuffer {
-    private TNonblockingTransport trans;
-
-    public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
-      super(trans, selectionKey, selectThread);
-      this.trans = trans;
-    }
-
-    @Override
-    public TNonblockingTransport getTransport() {
-      return trans;
-    }
-  }
-
-  // @formatter:off
-  private class SelectAcceptThread extends AbstractSelectThread {
-
-    // The server transport on which new client transports will be accepted
-    private final TNonblockingServerTransport serverTransport;
-
-    /**
-     * Set up the thread that will handle the non-blocking accepts, reads, and
-     * writes.
-     */
-    public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
-    throws IOException {
-      this.serverTransport = serverTransport;
-      serverTransport.registerSelector(selector);
-    }
-
-    public boolean isStopped() {
-      return stopped_;
-    }
-
-    /**
-     * The work loop. Handles both selecting (all IO operations) and managing
-     * the selection preferences of all existing connections.
-     */
-    @Override
-    public void run() {
-      try {
-        if (eventHandler_ != null) {
-          eventHandler_.preServe();
-        }
-
-        while (!stopped_) {
-          select();
-          processInterestChanges();
-        }
-        for (SelectionKey selectionKey : selector.keys()) {
-          cleanupSelectionKey(selectionKey);
-        }
-      } catch (Throwable t) {
-        LOGGER.error("run() exiting due to uncaught error", t);
-      } finally {
-        stopped_ = true;
-      }
-    }
-
-    /**
-     * Select and process IO events appropriately:
-     * If there are connections to be accepted, accept them.
-     * If there are existing connections with data waiting to be read, read it,
-     * buffering until a whole frame has been read.
-     * If there are any pending responses, buffer them until their target client
-     * is available, and then send the data.
-     */
-    private void select() {
-      try {
-        // wait for io events.
-        selector.select();
-
-        // process the io events we received
-        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
-        while (!stopped_ && selectedKeys.hasNext()) {
-          SelectionKey key = selectedKeys.next();
-          selectedKeys.remove();
-
-          // skip if not valid
-          if (!key.isValid()) {
-            cleanupSelectionKey(key);
-            continue;
-          }
-
-          // if the key is marked Accept, then it has to be the server
-          // transport.
-          if (key.isAcceptable()) {
-            handleAccept();
-          } else if (key.isReadable()) {
-            // deal with reads
-            handleRead(key);
-          } else if (key.isWritable()) {
-            // deal with writes
-            handleWrite(key);
-          } else {
-            LOGGER.warn("Unexpected state in select! " + key.interestOps());
-          }
-        }
-      } catch (IOException e) {
-        LOGGER.warn("Got an IOException while selecting!", e);
-      }
-    }
-
-    /**
-     * Accept a new connection.
-     */
-    @SuppressWarnings("unused")
-    private void handleAccept() throws IOException {
-      SelectionKey clientKey = null;
-      TNonblockingTransport client = null;
-      try {
-        // accept the connection
-        client = (TNonblockingTransport)serverTransport.accept();
-        clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
-
-        // add this key to the map
-          FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
-                  new CustomAsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
-                  new CustomFrameBuffer(client, clientKey,SelectAcceptThread.this);
-
-          clientKey.attach(frameBuffer);
-      } catch (TTransportException tte) {
-        // something went wrong accepting.
-        LOGGER.warn("Exception trying to accept!", tte);
-        tte.printStackTrace();
-        if (clientKey != null) cleanupSelectionKey(clientKey);
-        if (client != null) client.close();
-      }
-    }
-  } // SelectAcceptThread
-  // @formatter:on
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java
deleted file mode 100644
index 2464a15..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-
-import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
-import org.apache.accumulo.core.trace.wrappers.TraceWrap;
-import org.apache.thrift.TApplicationException;
-import org.apache.thrift.TException;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to
- * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like
- * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as
- * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
- * 
- * @since 1.6.1
- */
-public class RpcWrapper {
-
-  public static <T> T service(final T instance) {
-    InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) {
-      @Override
-      public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
-        try {
-          return super.invoke(obj, method, args);
-        } catch (RuntimeException e) {
-          String msg = e.getMessage();
-          LoggerFactory.getLogger(instance.getClass()).error(msg, e);
-          throw new TException(msg);
-        } catch (Error e) {
-          String msg = e.getMessage();
-          LoggerFactory.getLogger(instance.getClass()).error(msg, e);
-          throw new TException(msg);
-        }
-      }
-    };
-
-    @SuppressWarnings("unchecked")
-    T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler);
-    return proxiedInstance;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java
deleted file mode 100644
index 2962a52..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-
-import org.apache.accumulo.core.util.TBufferedSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-// Thrift-959 removed the small buffer from TSocket; this adds it back for servers
-public class TBufferedServerSocket extends TServerTransport {
-  
-  // expose acceptImpl
-  static class TServerSocket extends org.apache.thrift.transport.TServerSocket {
-    public TServerSocket(ServerSocket serverSocket) {
-      super(serverSocket);
-    }
-    
-    public TSocket acceptImplPublic() throws TTransportException {
-      return acceptImpl();
-    }
-  }
-  
-  final TServerSocket impl;
-  final int bufferSize;
-  
-  public TBufferedServerSocket(ServerSocket serverSocket, int bufferSize) {
-    this.impl = new TServerSocket(serverSocket);
-    this.bufferSize = bufferSize;
-  }
-  
-  @Override
-  public void listen() throws TTransportException {
-    impl.listen();
-  }
-  
-  @Override
-  public void close() {
-    impl.close();
-  }
-  
-  // Wrap accepted sockets using buffered IO
-  @Override
-  protected TTransport acceptImpl() throws TTransportException {
-    TSocket sock = impl.acceptImplPublic();
-    try {
-      return new TBufferedSocket(sock, this.bufferSize);
-    } catch (IOException e) {
-      throw new TTransportException(e);
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
deleted file mode 100644
index d1cdd8e..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.accumulo.server.util;
-
-import org.apache.log4j.Logger;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TTransportException;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-/**
- * Wrapper around ServerSocketChannel.
- * 
- * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
- * The only change (apart from the logging statements) is the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket.
- */
-public class TNonblockingServerSocket extends TNonblockingServerTransport {
-  private static final Logger log = Logger.getLogger(TNonblockingServerTransport.class.getName());
-
-  /**
-   * This channel is where all the nonblocking magic happens.
-   */
-  private ServerSocketChannel serverSocketChannel = null;
-
-  /**
-   * Underlying ServerSocket object
-   */
-  private ServerSocket serverSocket_ = null;
-
-  /**
-   * Timeout for client sockets from accept
-   */
-  private int clientTimeout_ = 0;
-
-  /**
-   * Creates just a port listening server socket
-   */
-  public TNonblockingServerSocket(int port) throws TTransportException {
-    this(port, 0);
-  }
-
-  /**
-   * Creates just a port listening server socket
-   */
-  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
-    this(new InetSocketAddress(port), clientTimeout);
-  }
-
-  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
-    this(bindAddr, 0);
-  }
-
-  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
-    clientTimeout_ = clientTimeout;
-    try {
-      serverSocketChannel = ServerSocketChannel.open();
-      serverSocketChannel.configureBlocking(false);
-
-      // Make server socket
-      serverSocket_ = serverSocketChannel.socket();
-      // Prevent 2MSL delay problem on server restarts
-      serverSocket_.setReuseAddress(true);
-      // Bind to listening port
-      serverSocket_.bind(bindAddr);
-    } catch (IOException ioe) {
-      serverSocket_ = null;
-      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
-    }
-  }
-
-  public void listen() throws TTransportException {
-    // Make sure not to block on accept
-    if (serverSocket_ != null) {
-      try {
-        serverSocket_.setSoTimeout(0);
-      } catch (SocketException sx) {
-        log.error("SocketException caused by serverSocket in listen()", sx);
-      }
-    }
-  }
-
-  protected TNonblockingSocket acceptImpl() throws TTransportException {
-    if (serverSocket_ == null) {
-      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
-    }
-    try {
-      SocketChannel socketChannel = serverSocketChannel.accept();
-      if (socketChannel == null) {
-        return null;
-      }
-
-      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
-      tsocket.setTimeout(clientTimeout_);
-      return tsocket;
-    } catch (IOException iox) {
-      throw new TTransportException(iox);
-    }
-  }
-
-  public void registerSelector(Selector selector) {
-    try {
-      // Register the server socket channel, indicating an interest in
-      // accepting new connections
-      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
-    } catch (ClosedChannelException e) {
-      // this shouldn't happen, ideally...
-      // TODO: decide what to do with this.
-    }
-  }
-
-  public void close() {
-    if (serverSocket_ != null) {
-      try {
-        serverSocket_.close();
-      } catch (IOException iox) {
-        log.warn("WARNING: Could not close server socket: " + iox.getMessage());
-      }
-      serverSocket_ = null;
-    }
-  }
-
-  public void interrupt() {
-    // The thread-safeness of this is dubious, but Java documentation suggests
-    // that it is safe to do this from a different thread context
-    close();
-  }
-
-  public int getPort() {
-    return serverSocket_.getLocalPort();
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
deleted file mode 100644
index f1156d4..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
-import org.apache.accumulo.core.util.SimpleThreadPool;
-import org.apache.accumulo.core.util.SslConnectionParams;
-import org.apache.accumulo.core.util.TBufferedSocket;
-import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.metrics.Metrics;
-import org.apache.accumulo.server.metrics.MetricsFactory;
-import org.apache.accumulo.server.metrics.ThriftMetrics;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-import com.google.common.net.HostAndPort;
-
-public class TServerUtils {
-  private static final Logger log = Logger.getLogger(TServerUtils.class);
-
-  public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
-
-  public static class ServerAddress {
-    public final TServer server;
-    public final HostAndPort address;
-
-    public ServerAddress(TServer server, HostAndPort address) {
-      this.server = server;
-      this.address = address;
-    }
-  }
-
-  /**
-   * Start a server, at the given port, or higher, if that port is not available.
-   *
-   * @param portHintProperty
-   *          the port to attempt to open, can be zero, meaning "any available port"
-   * @param processor
-   *          the service to be started
-   * @param serverName
-   *          the name of the class that is providing the service
-   * @param threadName
-   *          name this service's thread for better debugging
-   * @return the server object created, and the port actually used
-   * @throws UnknownHostException
-   *           when we don't know our own address
-   */
-  public static ServerAddress startServer(AccumuloServerContext service, String address, Property portHintProperty, TProcessor processor, String serverName,
-      String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
-      throws UnknownHostException {
-    int portHint = service.getConfiguration().getPort(portHintProperty);
-    int minThreads = 2;
-    if (minThreadProperty != null)
-      minThreads = service.getConfiguration().getCount(minThreadProperty);
-    long timeBetweenThreadChecks = 1000;
-    if (timeBetweenThreadChecksProperty != null)
-      timeBetweenThreadChecks = service.getConfiguration().getTimeInMillis(timeBetweenThreadChecksProperty);
-    long maxMessageSize = 10 * 1000 * 1000;
-    if (maxMessageSizeProperty != null)
-      maxMessageSize = service.getConfiguration().getMemoryInBytes(maxMessageSizeProperty);
-    boolean portSearch = false;
-    if (portSearchProperty != null)
-      portSearch = service.getConfiguration().getBoolean(portSearchProperty);
-    // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
-    TServerUtils.TimedProcessor timedProcessor = new TServerUtils.TimedProcessor(service.getConfiguration(), processor, serverName, threadName);
-    Random random = new Random();
-    for (int j = 0; j < 100; j++) {
-
-      // Are we going to slide around, looking for an open port?
-      int portsToSearch = 1;
-      if (portSearch)
-        portsToSearch = 1000;
-
-      for (int i = 0; i < portsToSearch; i++) {
-        int port = portHint + i;
-        if (portHint != 0 && i > 0)
-          port = 1024 + random.nextInt(65535 - 1024);
-        if (port > 65535)
-          port = 1024 + port % (65535 - 1024);
-        try {
-          HostAndPort addr = HostAndPort.fromParts(address, port);
-          return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads,
-              service.getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), timeBetweenThreadChecks, maxMessageSize,
-              service.getServerSslParams(), service.getClientTimeoutInMillis());
-        } catch (TTransportException ex) {
-          log.error("Unable to start TServer", ex);
-          if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
-            // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
-            // TTransportException, and with a TSocket created by TSSLTransportFactory, it
-            // comes through as caused by a BindException.
-            log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
-            UtilWaitThread.sleep(250);
-          } else {
-            // thrift is passing up a nested exception that isn't a BindException,
-            // so no reason to believe retrying on a different port would help.
-            log.error("Unable to start TServer", ex);
-            break;
-          }
-        }
-      }
-    }
-    throw new UnknownHostException("Unable to find a listen port");
-  }
-
-  public static class TimedProcessor implements TProcessor {
-
-    final TProcessor other;
-    Metrics metrics = null;
-    long idleStart = 0;
-
-    TimedProcessor(AccumuloConfiguration conf, TProcessor next, String serverName, String threadName) {
-      this.other = next;
-      // Register the metrics MBean
-      MetricsFactory factory = new MetricsFactory(conf);
-      metrics = factory.createThriftMetrics(serverName, threadName);
-      try {
-        metrics.register();
-      } catch (Exception e) {
-        log.error("Exception registering MBean with MBean Server", e);
-      }
-      idleStart = System.currentTimeMillis();
-    }
-
-    @Override
-    public boolean process(TProtocol in, TProtocol out) throws TException {
-      long now = 0;
-      if (metrics.isEnabled()) {
-        now = System.currentTimeMillis();
-        metrics.add(ThriftMetrics.idle, (now - idleStart));
-      }
-      try {
-        return other.process(in, out);
-      } finally {
-        if (metrics.isEnabled()) {
-          idleStart = System.currentTimeMillis();
-          metrics.add(ThriftMetrics.execute, idleStart - now);
-        }
-      }
-    }
-  }
-
-  public static class ClientInfoProcessorFactory extends TProcessorFactory {
-
-    public ClientInfoProcessorFactory(TProcessor processor) {
-      super(processor);
-    }
-
-    @Override
-    public TProcessor getProcessor(TTransport trans) {
-      if (trans instanceof TBufferedSocket) {
-        TBufferedSocket tsock = (TBufferedSocket) trans;
-        clientAddress.set(tsock.getClientString());
-      } else if (trans instanceof TSocket) {
-        TSocket tsock = (TSocket) trans;
-        clientAddress.set(tsock.getSocket().getInetAddress().getHostAddress() + ":" + tsock.getSocket().getPort());
-      } else {
-        log.warn("Unable to extract clientAddress from transport of type " + trans.getClass());
-      }
-      return super.getProcessor(trans);
-    }
-  }
-
-  public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName,
-      final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
-    TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
-    CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
-    options.protocolFactory(ThriftUtil.protocolFactory());
-    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
-    options.maxReadBufferBytes = maxMessageSize;
-    options.stopTimeoutVal(5);
-    /*
-     * Create our own very special thread pool.
-     */
-    final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
-    // periodically adjust the number of threads we need by checking how busy our threads are
-    SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
-      @Override
-      public void run() {
-        if (pool.getCorePoolSize() <= pool.getActiveCount()) {
-          int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
-          log.info("Increasing server thread pool size on " + serverName + " to " + larger);
-          pool.setMaximumPoolSize(larger);
-          pool.setCorePoolSize(larger);
-        } else {
-          if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
-            int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
-            if (smaller != pool.getCorePoolSize()) {
-              // ACCUMULO-2997 there is a race condition here... the active count could be higher by the time
-              // we decrease the core pool size... so the active count could end up higher than
-              // the core pool size, in which case everything will be queued... the increase case
-              // should handle this and prevent deadlock
-              log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
-              pool.setCorePoolSize(smaller);
-            }
-          }
-        }
-      }
-    }, timeBetweenThreadChecks, timeBetweenThreadChecks);
-    options.executorService(pool);
-    options.processorFactory(new TProcessorFactory(processor));
-    if (address.getPort() == 0) {
-      address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
-    }
-    return new ServerAddress(new CustomNonBlockingServer(options), address);
-  }
-
-  public static ServerAddress createThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads)
-      throws TTransportException {
-
-    // if port is zero, then we must bind to get the port number
-    ServerSocket sock;
-    try {
-      sock = ServerSocketChannel.open().socket();
-      sock.setReuseAddress(true);
-      sock.bind(new InetSocketAddress(address.getHostText(), address.getPort()));
-      address = HostAndPort.fromParts(address.getHostText(), sock.getLocalPort());
-    } catch (IOException ex) {
-      throw new TTransportException(ex);
-    }
-    TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024);
-    return new ServerAddress(createThreadPoolServer(transport, processor), address);
-  }
-
-  public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
-    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
-    options.protocolFactory(ThriftUtil.protocolFactory());
-    options.transportFactory(ThriftUtil.transportFactory());
-    options.processorFactory(new ClientInfoProcessorFactory(processor));
-    return new TThreadPoolServer(options);
-  }
-
-  public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
-      throws TTransportException {
-    org.apache.thrift.transport.TServerSocket transport;
-    try {
-      transport = ThriftUtil.getServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
-    } catch (UnknownHostException e) {
-      throw new TTransportException(e);
-    }
-    if (address.getPort() == 0) {
-      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
-    }
-    return new ServerAddress(createThreadPoolServer(transport, processor), address);
-  }
-
-  public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads,
-      long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
-    return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
-        timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
-  }
-
-  public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
-    int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
-
-    ServerAddress serverAddress;
-    if (sslParams != null) {
-      serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
-    } else {
-      serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
-    }
-    final TServer finalServer = serverAddress.server;
-    Runnable serveTask = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          finalServer.serve();
-        } catch (Error e) {
-          Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
-        }
-      }
-    };
-    serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
-    Thread thread = new Daemon(serveTask, threadName);
-    thread.start();
-    // check for the special "bind to everything address"
-    if (serverAddress.address.getHostText().equals("0.0.0.0")) {
-      // can't get the address from the bind, so we'll do our best to invent our hostname
-      try {
-        serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
-      } catch (UnknownHostException e) {
-        throw new TTransportException(e);
-      }
-    }
-    return serverAddress;
-  }
-
-  // Existing connections will keep our thread running: reach in with reflection and insist that they shutdown.
-  public static void stopTServer(TServer s) {
-    if (s == null)
-      return;
-    s.stop();
-    try {
-      Field f = s.getClass().getDeclaredField("executorService_");
-      f.setAccessible(true);
-      ExecutorService es = (ExecutorService) f.get(s);
-      es.shutdownNow();
-    } catch (Exception e) {
-      TServerUtils.log.error("Unable to call shutdownNow", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index a822b92..337c055 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -17,9 +17,12 @@
 package org.apache.accumulo.server.util;
 
 import java.util.concurrent.ExecutorService;
+
+import org.apache.accumulo.server.thrift.TServerUtils;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.junit.Test;
+
 import static org.junit.Assert.*;
 import static org.easymock.EasyMock.*;
 


Mime
View raw message