accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [2/3] accumulo git commit: ACCUMULO-3394 Change thrift package to rpc and make an rpc package in core too
Date Tue, 09 Dec 2014 05:15:39 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
index 3a435c7..09ae4f4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
@@ -20,8 +20,8 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.util.SslConnectionParams;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.security.SystemCredentials;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 593d9b7..f338db8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.trace.wrappers.TraceRunnable;
@@ -59,7 +60,6 @@ import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.StopWatch;
-import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index d392bde..5eea41c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@ -31,12 +31,12 @@ import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.util.Halt;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
index cd25d49..9822d0f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
@@ -26,11 +26,11 @@ import java.util.SortedMap;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/ClientInfoProcessorFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ClientInfoProcessorFactory.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ClientInfoProcessorFactory.java
new file mode 100644
index 0000000..5f630c2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ClientInfoProcessorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.accumulo.core.rpc.TBufferedSocket;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sets the address of a client in a ThreadLocal to allow for more informative log messages.
+ */
+public class ClientInfoProcessorFactory extends TProcessorFactory {
+  private static final Logger log = LoggerFactory.getLogger(ClientInfoProcessorFactory.class);
+
+  private final ThreadLocal<String> clientAddress;
+
+  public ClientInfoProcessorFactory(ThreadLocal<String> clientAddress, TProcessor processor) {
+    super(processor);
+    this.clientAddress = clientAddress;
+  }
+
+  @Override
+  public TProcessor getProcessor(TTransport trans) {
+    if (trans instanceof TBufferedSocket) {
+      TBufferedSocket tsock = (TBufferedSocket) trans;
+      clientAddress.set(tsock.getClientString());
+    } else if (trans instanceof TSocket) {
+      TSocket tsock = (TSocket) trans;
+      clientAddress.set(tsock.getSocket().getInetAddress().getHostAddress() + ":" + tsock.getSocket().getPort());
+    } else {
+      log.warn("Unable to extract clientAddress from transport of type {}", trans.getClass());
+    }
+    return super.getProcessor(trans);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
new file mode 100644
index 0000000..21f55b3
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * This class implements a custom non-blocking thrift server, incorporating the {@link THsHaServer} features, and overriding the underlying
+ * {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
+ * {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with
+ * one that reveals the client address from its transport.
+ *
+ * <p>
+ * The justification for this is explained in https://issues.apache.org/jira/browse/ACCUMULO-1691, and is needed due to the repeated regressions:
+ * <ul>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-958</li>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-1464</li>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-2173</li>
+ * </ul>
+ *
+ * <p>
+ * This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
+ * instantiating a custom FrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and
+ * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer}. Because of this, any change in the implementation upstream will require a review
+ * of this implementation here, to ensure any new bugfixes/features in the upstream Thrift class are also applied here, at least until
+ * https://issues.apache.org/jira/browse/THRIFT-2173 is implemented. In the meantime, the maven-enforcer-plugin ensures that Thrift remains at version 0.9.1,
+ * which has been reviewed and tested.
+ */
+public class CustomNonBlockingServer extends THsHaServer {
+
+  private static final Logger LOGGER = Logger.getLogger(CustomNonBlockingServer.class);
+  private SelectAcceptThread selectAcceptThread_;
+  private volatile boolean stopped_ = false;
+
+  public CustomNonBlockingServer(Args args) {
+    super(args);
+  }
+
+  @Override
+  protected Runnable getRunnable(final FrameBuffer frameBuffer) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        if (frameBuffer instanceof CustomNonblockingFrameBuffer) {
+          TNonblockingTransport trans = ((CustomNonblockingFrameBuffer) frameBuffer).getTransport();
+          if (trans instanceof TNonblockingSocket) {
+            TNonblockingSocket tsock = (TNonblockingSocket) trans;
+            Socket sock = tsock.getSocketChannel().socket();
+            TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+          }
+        }
+        frameBuffer.invoke();
+      }
+    };
+  }
+
+  @Override
+  protected boolean startThreads() {
+    // start the selector
+    try {
+      selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
+      selectAcceptThread_.start();
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Failed to start selector thread!", e);
+      return false;
+    }
+  }
+
+  @Override
+  public void stop() {
+    stopped_ = true;
+    if (selectAcceptThread_ != null) {
+      selectAcceptThread_.wakeupSelector();
+    }
+  }
+
+  @Override
+  public boolean isStopped() {
+    return selectAcceptThread_.isStopped();
+  }
+
+  @Override
+  protected void joinSelector() {
+    // wait until the selector thread exits
+    try {
+      selectAcceptThread_.join();
+    } catch (InterruptedException e) {
+      // for now, just silently ignore. technically this means we'll have less of
+      // a graceful shutdown as a result.
+    }
+  }
+
+  private interface CustomNonblockingFrameBuffer {
+    TNonblockingTransport getTransport();
+  }
+
+  private class CustomAsyncFrameBuffer extends AsyncFrameBuffer implements CustomNonblockingFrameBuffer {
+    private TNonblockingTransport trans;
+
+    public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+      super(trans, selectionKey, selectThread);
+      this.trans = trans;
+    }
+
+    @Override
+    public TNonblockingTransport getTransport() {
+      return trans;
+    }
+  }
+
+  private class CustomFrameBuffer extends FrameBuffer implements CustomNonblockingFrameBuffer {
+    private TNonblockingTransport trans;
+
+    public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+      super(trans, selectionKey, selectThread);
+      this.trans = trans;
+    }
+
+    @Override
+    public TNonblockingTransport getTransport() {
+      return trans;
+    }
+  }
+
+  // @formatter:off
+  private class SelectAcceptThread extends AbstractSelectThread {
+
+    // The server transport on which new client transports will be accepted
+    private final TNonblockingServerTransport serverTransport;
+
+    /**
+     * Set up the thread that will handle the non-blocking accepts, reads, and
+     * writes.
+     */
+    public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
+    throws IOException {
+      this.serverTransport = serverTransport;
+      serverTransport.registerSelector(selector);
+    }
+
+    public boolean isStopped() {
+      return stopped_;
+    }
+
+    /**
+     * The work loop. Handles both selecting (all IO operations) and managing
+     * the selection preferences of all existing connections.
+     */
+    @Override
+    public void run() {
+      try {
+        if (eventHandler_ != null) {
+          eventHandler_.preServe();
+        }
+
+        while (!stopped_) {
+          select();
+          processInterestChanges();
+        }
+        for (SelectionKey selectionKey : selector.keys()) {
+          cleanupSelectionKey(selectionKey);
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        stopped_ = true;
+      }
+    }
+
+    /**
+     * Select and process IO events appropriately:
+     * If there are connections to be accepted, accept them.
+     * If there are existing connections with data waiting to be read, read it,
+     * buffering until a whole frame has been read.
+     * If there are any pending responses, buffer them until their target client
+     * is available, and then send the data.
+     */
+    private void select() {
+      try {
+        // wait for io events.
+        selector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            cleanupSelectionKey(key);
+            continue;
+          }
+
+          // if the key is marked Accept, then it has to be the server
+          // transport.
+          if (key.isAcceptable()) {
+            handleAccept();
+          } else if (key.isReadable()) {
+            // deal with reads
+            handleRead(key);
+          } else if (key.isWritable()) {
+            // deal with writes
+            handleWrite(key);
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    /**
+     * Accept a new connection.
+     */
+    @SuppressWarnings("unused")
+    private void handleAccept() throws IOException {
+      SelectionKey clientKey = null;
+      TNonblockingTransport client = null;
+      try {
+        // accept the connection
+        client = (TNonblockingTransport)serverTransport.accept();
+        clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
+
+        // add this key to the map
+          FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
+                  new CustomAsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
+                  new CustomFrameBuffer(client, clientKey,SelectAcceptThread.this);
+
+          clientKey.attach(frameBuffer);
+      } catch (TTransportException tte) {
+        // something went wrong accepting.
+        LOGGER.warn("Exception trying to accept!", tte);
+        tte.printStackTrace();
+        if (clientKey != null) cleanupSelectionKey(clientKey);
+        if (client != null) client.close();
+      }
+    }
+  } // SelectAcceptThread
+  // @formatter:on
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
new file mode 100644
index 0000000..7b34986
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
+import org.apache.accumulo.core.trace.wrappers.TraceWrap;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to
+ * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like
+ * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as
+ * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
+ *
+ * @since 1.6.1
+ */
+public class RpcWrapper {
+
+  public static <T> T service(final T instance) {
+    InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) {
+      @Override
+      public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
+        try {
+          return super.invoke(obj, method, args);
+        } catch (RuntimeException e) {
+          String msg = e.getMessage();
+          LoggerFactory.getLogger(instance.getClass()).error(msg, e);
+          throw new TException(msg);
+        } catch (Error e) {
+          String msg = e.getMessage();
+          LoggerFactory.getLogger(instance.getClass()).error(msg, e);
+          throw new TException(msg);
+        }
+      }
+    };
+
+    @SuppressWarnings("unchecked")
+    T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler);
+    return proxiedInstance;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
new file mode 100644
index 0000000..b655287
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.thrift.server.TServer;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Encapsulate a Thrift server and the address, host and port, to which it is bound.
+ */
+public class ServerAddress {
+  public final TServer server;
+  public final HostAndPort address;
+
+  public ServerAddress(TServer server, HostAndPort address) {
+    this.server = server;
+    this.address = address;
+  }
+
+  public TServer getServer() {
+    return server;
+  }
+
+  public HostAndPort getAddress() {
+    return address;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/TBufferedServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TBufferedServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TBufferedServerSocket.java
new file mode 100644
index 0000000..2887f48
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TBufferedServerSocket.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import org.apache.accumulo.core.rpc.TBufferedSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+// Thrift-959 removed the small buffer from TSocket; this adds it back for servers
+public class TBufferedServerSocket extends TServerTransport {
+
+  // expose acceptImpl
+  static class TServerSocket extends org.apache.thrift.transport.TServerSocket {
+    public TServerSocket(ServerSocket serverSocket) {
+      super(serverSocket);
+    }
+
+    public TSocket acceptImplPublic() throws TTransportException {
+      return acceptImpl();
+    }
+  }
+
+  final TServerSocket impl;
+  final int bufferSize;
+
+  public TBufferedServerSocket(ServerSocket serverSocket, int bufferSize) {
+    this.impl = new TServerSocket(serverSocket);
+    this.bufferSize = bufferSize;
+  }
+
+  @Override
+  public void listen() throws TTransportException {
+    impl.listen();
+  }
+
+  @Override
+  public void close() {
+    impl.close();
+  }
+
+  // Wrap accepted sockets using buffered IO
+  @Override
+  protected TTransport acceptImpl() throws TTransportException {
+    TSocket sock = impl.acceptImplPublic();
+    try {
+      return new TBufferedSocket(sock, this.bufferSize);
+    } catch (IOException e) {
+      throw new TTransportException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TNonblockingServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TNonblockingServerSocket.java
new file mode 100644
index 0000000..3afe149
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TNonblockingServerSocket.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.server.rpc;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Wrapper around ServerSocketChannel.
+ *
+ * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
+ * The only change (apart from the logging statements) is the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket.
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+  private static final Logger log = Logger.getLogger(TNonblockingServerTransport.class.getName());
+
+  /**
+   * This channel is where all the nonblocking magic happens.
+   */
+  private ServerSocketChannel serverSocketChannel = null;
+
+  /**
+   * Underlying ServerSocket object
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+    this(new InetSocketAddress(port), clientTimeout);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    clientTimeout_ = clientTimeout;
+    try {
+      serverSocketChannel = ServerSocketChannel.open();
+      serverSocketChannel.configureBlocking(false);
+
+      // Make server socket
+      serverSocket_ = serverSocketChannel.socket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(bindAddr);
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+    }
+  }
+
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        log.error("SocketException caused by serverSocket in listen()", sx);
+      }
+    }
+  }
+
+  protected TNonblockingSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      SocketChannel socketChannel = serverSocketChannel.accept();
+      if (socketChannel == null) {
+        return null;
+      }
+
+      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+      tsocket.setTimeout(clientTimeout_);
+      return tsocket;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void registerSelector(Selector selector) {
+    try {
+      // Register the server socket channel, indicating an interest in
+      // accepting new connections
+      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    } catch (ClosedChannelException e) {
+      // this shouldn't happen, ideally...
+      // TODO: decide what to do with this.
+    }
+  }
+
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        log.warn("WARNING: Could not close server socket: " + iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+  public int getPort() {
+    return serverSocket_.getLocalPort();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
new file mode 100644
index 0000000..ebf2c84
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.Field;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.net.HostAndPort;
+
+public class TServerUtils {
+  private static final Logger log = Logger.getLogger(TServerUtils.class);
+
+  public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
+
+  /**
+   * Start a server, at the given port, or higher, if that port is not available.
+   *
+   * @param portHintProperty
+   *          the port to attempt to open, can be zero, meaning "any available port"
+   * @param processor
+   *          the service to be started
+   * @param serverName
+   *          the name of the class that is providing the service
+   * @param threadName
+   *          name this service's thread for better debugging
+   * @return the server object created, and the port actually used
+   * @throws UnknownHostException
+   *           when we don't know our own address
+   */
+  public static ServerAddress startServer(AccumuloServerContext service, String address, Property portHintProperty, TProcessor processor, String serverName,
+      String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
+      throws UnknownHostException {
+    int portHint = service.getConfiguration().getPort(portHintProperty);
+    int minThreads = 2;
+    if (minThreadProperty != null)
+      minThreads = service.getConfiguration().getCount(minThreadProperty);
+    long timeBetweenThreadChecks = 1000;
+    if (timeBetweenThreadChecksProperty != null)
+      timeBetweenThreadChecks = service.getConfiguration().getTimeInMillis(timeBetweenThreadChecksProperty);
+    long maxMessageSize = 10 * 1000 * 1000;
+    if (maxMessageSizeProperty != null)
+      maxMessageSize = service.getConfiguration().getMemoryInBytes(maxMessageSizeProperty);
+    boolean portSearch = false;
+    if (portSearchProperty != null)
+      portSearch = service.getConfiguration().getBoolean(portSearchProperty);
+    // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
+    TimedProcessor timedProcessor = new TimedProcessor(service.getConfiguration(), processor, serverName, threadName);
+    Random random = new Random();
+    for (int j = 0; j < 100; j++) {
+
+      // Are we going to slide around, looking for an open port?
+      int portsToSearch = 1;
+      if (portSearch)
+        portsToSearch = 1000;
+
+      for (int i = 0; i < portsToSearch; i++) {
+        int port = portHint + i;
+        if (portHint != 0 && i > 0)
+          port = 1024 + random.nextInt(65535 - 1024);
+        if (port > 65535)
+          port = 1024 + port % (65535 - 1024);
+        try {
+          HostAndPort addr = HostAndPort.fromParts(address, port);
+          return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads,
+              service.getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), timeBetweenThreadChecks, maxMessageSize,
+              service.getServerSslParams(), service.getClientTimeoutInMillis());
+        } catch (TTransportException ex) {
+          log.error("Unable to start TServer", ex);
+          if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
+            // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
+            // TTransportException, and with a TSocket created by TSSLTransportFactory, it
+            // comes through as caused by a BindException.
+            log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
+            UtilWaitThread.sleep(250);
+          } else {
+            // thrift is passing up a nested exception that isn't a BindException,
+            // so no reason to believe retrying on a different port would help.
+            log.error("Unable to start TServer", ex);
+            break;
+          }
+        }
+      }
+    }
+    throw new UnknownHostException("Unable to find a listen port");
+  }
+
+  /**
+   * Create a NonBlockingServer with a custom thread pool that can dynamically resize itself.
+   */
+  public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName,
+      final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+    TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
+    CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
+    options.protocolFactory(ThriftUtil.protocolFactory());
+    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+    options.maxReadBufferBytes = maxMessageSize;
+    options.stopTimeoutVal(5);
+    /*
+     * Create our own very special thread pool.
+     */
+    final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+    // periodically adjust the number of threads we need by checking how busy our threads are
+    SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
+      @Override
+      public void run() {
+        if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+          int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+          log.info("Increasing server thread pool size on " + serverName + " to " + larger);
+          pool.setMaximumPoolSize(larger);
+          pool.setCorePoolSize(larger);
+        } else {
+          if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+            int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
+            if (smaller != pool.getCorePoolSize()) {
+              // ACCUMULO-2997 there is a race condition here... the active count could be higher by the time
+              // we decrease the core pool size... so the active count could end up higher than
+              // the core pool size, in which case everything will be queued... the increase case
+              // should handle this and prevent deadlock
+              log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
+              pool.setCorePoolSize(smaller);
+            }
+          }
+        }
+      }
+    }, timeBetweenThreadChecks, timeBetweenThreadChecks);
+    options.executorService(pool);
+    options.processorFactory(new TProcessorFactory(processor));
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
+    }
+    return new ServerAddress(new CustomNonBlockingServer(options), address);
+  }
+
+  public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
+    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+    options.protocolFactory(ThriftUtil.protocolFactory());
+    options.transportFactory(ThriftUtil.transportFactory());
+    options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
+    return new TThreadPoolServer(options);
+  }
+
+  public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
+      throws TTransportException {
+    org.apache.thrift.transport.TServerSocket transport;
+    try {
+      transport = ThriftUtil.getServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
+    } catch (UnknownHostException e) {
+      throw new TTransportException(e);
+    }
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+    }
+    return new ServerAddress(createThreadPoolServer(transport, processor), address);
+  }
+
+  public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads,
+      long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+    return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
+        timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
+  }
+
+  /**
+   * Start the appropriate Thrift server (SSL or non-blocking server) for the given parameters. Non-null SSL parameters will cause an SSL server to be started.
+   *
+   * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
+   */
+  public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
+    int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+
+    ServerAddress serverAddress;
+    if (sslParams != null) {
+      serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
+    } else {
+      serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+    }
+    final TServer finalServer = serverAddress.server;
+    Runnable serveTask = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          finalServer.serve();
+        } catch (Error e) {
+          Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+        }
+      }
+    };
+    serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
+    Thread thread = new Daemon(serveTask, threadName);
+    thread.start();
+    // check for the special "bind to everything address"
+    if (serverAddress.address.getHostText().equals("0.0.0.0")) {
+      // can't get the address from the bind, so we'll do our best to invent our hostname
+      try {
+        serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
+      } catch (UnknownHostException e) {
+        throw new TTransportException(e);
+      }
+    }
+    return serverAddress;
+  }
+
+  // Existing connections will keep our thread running: reach in with reflection and insist that they shutdown.
+  public static void stopTServer(TServer s) {
+    if (s == null)
+      return;
+    s.stop();
+    try {
+      Field f = s.getClass().getDeclaredField("executorService_");
+      f.setAccessible(true);
+      ExecutorService es = (ExecutorService) f.get(s);
+      es.shutdownNow();
+    } catch (Exception e) {
+      TServerUtils.log.error("Unable to call shutdownNow", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
new file mode 100644
index 0000000..a842572
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.server.metrics.Metrics;
+import org.apache.accumulo.server.metrics.MetricsFactory;
+import org.apache.accumulo.server.metrics.ThriftMetrics;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TProcessor} which tracks the duration of an RPC and adds it to the metrics subsystem.
+ */
+public class TimedProcessor implements TProcessor {
+  private static final Logger log = LoggerFactory.getLogger(TimedProcessor.class);
+
+  private final TProcessor other;
+  private final Metrics metrics;
+  private long idleStart = 0;
+
+  public TimedProcessor(AccumuloConfiguration conf, TProcessor next, String serverName, String threadName) {
+    this.other = next;
+    // Register the metrics MBean
+    MetricsFactory factory = new MetricsFactory(conf);
+    metrics = factory.createThriftMetrics(serverName, threadName);
+    try {
+      metrics.register();
+    } catch (Exception e) {
+      log.error("Exception registering MBean with MBean Server", e);
+    }
+    idleStart = System.currentTimeMillis();
+  }
+
+  @Override
+  public boolean process(TProtocol in, TProtocol out) throws TException {
+    long now = 0;
+    final boolean metricsEnabled = metrics.isEnabled();
+    if (metricsEnabled) {
+      now = System.currentTimeMillis();
+      metrics.add(ThriftMetrics.idle, (now - idleStart));
+    }
+    try {
+      return other.process(in, out);
+    } finally {
+      if (metricsEnabled) {
+        idleStart = System.currentTimeMillis();
+        metrics.add(ThriftMetrics.execute, idleStart - now);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
index e09f7fd..a2afeac 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
@@ -44,10 +44,10 @@ import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.security.handler.Authenticator;
 import org.apache.accumulo.server.security.handler.Authorizor;
 import org.apache.accumulo.server.security.handler.PermissionHandler;
-import org.apache.accumulo.server.thrift.TServerUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
deleted file mode 100644
index 208fde5..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.thrift;
-
-import org.apache.accumulo.core.util.TBufferedSocket;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Sets the address of a client in a ThreadLocal to allow for more informative log messages.
- */
-public class ClientInfoProcessorFactory extends TProcessorFactory {
-  private static final Logger log = LoggerFactory.getLogger(ClientInfoProcessorFactory.class);
-
-  private final ThreadLocal<String> clientAddress;
-
-  public ClientInfoProcessorFactory(ThreadLocal<String> clientAddress, TProcessor processor) {
-    super(processor);
-    this.clientAddress = clientAddress;
-  }
-
-  @Override
-  public TProcessor getProcessor(TTransport trans) {
-    if (trans instanceof TBufferedSocket) {
-      TBufferedSocket tsock = (TBufferedSocket) trans;
-      clientAddress.set(tsock.getClientString());
-    } else if (trans instanceof TSocket) {
-      TSocket tsock = (TSocket) trans;
-      clientAddress.set(tsock.getSocket().getInetAddress().getHostAddress() + ":" + tsock.getSocket().getPort());
-    } else {
-      log.warn("Unable to extract clientAddress from transport of type {}", trans.getClass());
-    }
-    return super.getProcessor(trans);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
deleted file mode 100644
index a96f7b5..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.thrift;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.nio.channels.SelectionKey;
-import java.util.Iterator;
-
-import org.apache.log4j.Logger;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TNonblockingTransport;
-import org.apache.thrift.transport.TTransportException;
-
-/**
- * This class implements a custom non-blocking thrift server, incorporating the {@link THsHaServer} features, and overriding the underlying
- * {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
- * {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with
- * one that reveals the client address from its transport.
- *
- * <p>
- * The justification for this is explained in https://issues.apache.org/jira/browse/ACCUMULO-1691, and is needed due to the repeated regressions:
- * <ul>
- * <li>https://issues.apache.org/jira/browse/THRIFT-958</li>
- * <li>https://issues.apache.org/jira/browse/THRIFT-1464</li>
- * <li>https://issues.apache.org/jira/browse/THRIFT-2173</li>
- * </ul>
- *
- * <p>
- * This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
- * instantiating a custom FrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and
- * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer}. Because of this, any change in the implementation upstream will require a review
- * of this implementation here, to ensure any new bugfixes/features in the upstream Thrift class are also applied here, at least until
- * https://issues.apache.org/jira/browse/THRIFT-2173 is implemented. In the meantime, the maven-enforcer-plugin ensures that Thrift remains at version 0.9.1,
- * which has been reviewed and tested.
- */
-public class CustomNonBlockingServer extends THsHaServer {
-
-  private static final Logger LOGGER = Logger.getLogger(CustomNonBlockingServer.class);
-  private SelectAcceptThread selectAcceptThread_;
-  private volatile boolean stopped_ = false;
-
-  public CustomNonBlockingServer(Args args) {
-    super(args);
-  }
-
-  @Override
-  protected Runnable getRunnable(final FrameBuffer frameBuffer) {
-    return new Runnable() {
-      @Override
-      public void run() {
-        if (frameBuffer instanceof CustomNonblockingFrameBuffer) {
-          TNonblockingTransport trans = ((CustomNonblockingFrameBuffer) frameBuffer).getTransport();
-          if (trans instanceof TNonblockingSocket) {
-            TNonblockingSocket tsock = (TNonblockingSocket) trans;
-            Socket sock = tsock.getSocketChannel().socket();
-            TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
-          }
-        }
-        frameBuffer.invoke();
-      }
-    };
-  }
-
-  @Override
-  protected boolean startThreads() {
-    // start the selector
-    try {
-      selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
-      selectAcceptThread_.start();
-      return true;
-    } catch (IOException e) {
-      LOGGER.error("Failed to start selector thread!", e);
-      return false;
-    }
-  }
-
-  @Override
-  public void stop() {
-    stopped_ = true;
-    if (selectAcceptThread_ != null) {
-      selectAcceptThread_.wakeupSelector();
-    }
-  }
-
-  @Override
-  public boolean isStopped() {
-    return selectAcceptThread_.isStopped();
-  }
-
-  @Override
-  protected void joinSelector() {
-    // wait until the selector thread exits
-    try {
-      selectAcceptThread_.join();
-    } catch (InterruptedException e) {
-      // for now, just silently ignore. technically this means we'll have less of
-      // a graceful shutdown as a result.
-    }
-  }
-
-  private interface CustomNonblockingFrameBuffer {
-    TNonblockingTransport getTransport();
-  }
-
-  private class CustomAsyncFrameBuffer extends AsyncFrameBuffer implements CustomNonblockingFrameBuffer {
-    private TNonblockingTransport trans;
-
-    public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
-      super(trans, selectionKey, selectThread);
-      this.trans = trans;
-    }
-
-    @Override
-    public TNonblockingTransport getTransport() {
-      return trans;
-    }
-  }
-
-  private class CustomFrameBuffer extends FrameBuffer implements CustomNonblockingFrameBuffer {
-    private TNonblockingTransport trans;
-
-    public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
-      super(trans, selectionKey, selectThread);
-      this.trans = trans;
-    }
-
-    @Override
-    public TNonblockingTransport getTransport() {
-      return trans;
-    }
-  }
-
-  // @formatter:off
-  private class SelectAcceptThread extends AbstractSelectThread {
-
-    // The server transport on which new client transports will be accepted
-    private final TNonblockingServerTransport serverTransport;
-
-    /**
-     * Set up the thread that will handle the non-blocking accepts, reads, and
-     * writes.
-     */
-    public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
-    throws IOException {
-      this.serverTransport = serverTransport;
-      serverTransport.registerSelector(selector);
-    }
-
-    public boolean isStopped() {
-      return stopped_;
-    }
-
-    /**
-     * The work loop. Handles both selecting (all IO operations) and managing
-     * the selection preferences of all existing connections.
-     */
-    @Override
-    public void run() {
-      try {
-        if (eventHandler_ != null) {
-          eventHandler_.preServe();
-        }
-
-        while (!stopped_) {
-          select();
-          processInterestChanges();
-        }
-        for (SelectionKey selectionKey : selector.keys()) {
-          cleanupSelectionKey(selectionKey);
-        }
-      } catch (Throwable t) {
-        LOGGER.error("run() exiting due to uncaught error", t);
-      } finally {
-        stopped_ = true;
-      }
-    }
-
-    /**
-     * Select and process IO events appropriately:
-     * If there are connections to be accepted, accept them.
-     * If there are existing connections with data waiting to be read, read it,
-     * buffering until a whole frame has been read.
-     * If there are any pending responses, buffer them until their target client
-     * is available, and then send the data.
-     */
-    private void select() {
-      try {
-        // wait for io events.
-        selector.select();
-
-        // process the io events we received
-        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
-        while (!stopped_ && selectedKeys.hasNext()) {
-          SelectionKey key = selectedKeys.next();
-          selectedKeys.remove();
-
-          // skip if not valid
-          if (!key.isValid()) {
-            cleanupSelectionKey(key);
-            continue;
-          }
-
-          // if the key is marked Accept, then it has to be the server
-          // transport.
-          if (key.isAcceptable()) {
-            handleAccept();
-          } else if (key.isReadable()) {
-            // deal with reads
-            handleRead(key);
-          } else if (key.isWritable()) {
-            // deal with writes
-            handleWrite(key);
-          } else {
-            LOGGER.warn("Unexpected state in select! " + key.interestOps());
-          }
-        }
-      } catch (IOException e) {
-        LOGGER.warn("Got an IOException while selecting!", e);
-      }
-    }
-
-    /**
-     * Accept a new connection.
-     */
-    @SuppressWarnings("unused")
-    private void handleAccept() throws IOException {
-      SelectionKey clientKey = null;
-      TNonblockingTransport client = null;
-      try {
-        // accept the connection
-        client = (TNonblockingTransport)serverTransport.accept();
-        clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
-
-        // add this key to the map
-          FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
-                  new CustomAsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
-                  new CustomFrameBuffer(client, clientKey,SelectAcceptThread.this);
-
-          clientKey.attach(frameBuffer);
-      } catch (TTransportException tte) {
-        // something went wrong accepting.
-        LOGGER.warn("Exception trying to accept!", tte);
-        tte.printStackTrace();
-        if (clientKey != null) cleanupSelectionKey(clientKey);
-        if (client != null) client.close();
-      }
-    }
-  } // SelectAcceptThread
-  // @formatter:on
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
deleted file mode 100644
index 53ed709..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.thrift;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-
-import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
-import org.apache.accumulo.core.trace.wrappers.TraceWrap;
-import org.apache.thrift.TApplicationException;
-import org.apache.thrift.TException;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to
- * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like
- * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as
- * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
- *
- * @since 1.6.1
- */
-public class RpcWrapper {
-
-  public static <T> T service(final T instance) {
-    InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) {
-      @Override
-      public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
-        try {
-          return super.invoke(obj, method, args);
-        } catch (RuntimeException e) {
-          String msg = e.getMessage();
-          LoggerFactory.getLogger(instance.getClass()).error(msg, e);
-          throw new TException(msg);
-        } catch (Error e) {
-          String msg = e.getMessage();
-          LoggerFactory.getLogger(instance.getClass()).error(msg, e);
-          throw new TException(msg);
-        }
-      }
-    };
-
-    @SuppressWarnings("unchecked")
-    T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler);
-    return proxiedInstance;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
deleted file mode 100644
index f52951e..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.thrift;
-
-import org.apache.thrift.server.TServer;
-
-import com.google.common.net.HostAndPort;
-
-/**
- * Encapsulate a Thrift server and the address, host and port, to which it is bound.
- */
-public class ServerAddress {
-  public final TServer server;
-  public final HostAndPort address;
-
-  public ServerAddress(TServer server, HostAndPort address) {
-    this.server = server;
-    this.address = address;
-  }
-
-  public TServer getServer() {
-    return server;
-  }
-
-  public HostAndPort getAddress() {
-    return address;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
deleted file mode 100644
index 5534313..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.thrift;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-
-import org.apache.accumulo.core.util.TBufferedSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-// Thrift-959 removed the small buffer from TSocket; this adds it back for servers
-public class TBufferedServerSocket extends TServerTransport {
-
-  // expose acceptImpl
-  static class TServerSocket extends org.apache.thrift.transport.TServerSocket {
-    public TServerSocket(ServerSocket serverSocket) {
-      super(serverSocket);
-    }
-
-    public TSocket acceptImplPublic() throws TTransportException {
-      return acceptImpl();
-    }
-  }
-
-  final TServerSocket impl;
-  final int bufferSize;
-
-  public TBufferedServerSocket(ServerSocket serverSocket, int bufferSize) {
-    this.impl = new TServerSocket(serverSocket);
-    this.bufferSize = bufferSize;
-  }
-
-  @Override
-  public void listen() throws TTransportException {
-    impl.listen();
-  }
-
-  @Override
-  public void close() {
-    impl.close();
-  }
-
-  // Wrap accepted sockets using buffered IO
-  @Override
-  protected TTransport acceptImpl() throws TTransportException {
-    TSocket sock = impl.acceptImplPublic();
-    try {
-      return new TBufferedSocket(sock, this.bufferSize);
-    } catch (IOException e) {
-      throw new TTransportException(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
deleted file mode 100644
index 77c5ca6..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.accumulo.server.thrift;
-
-import org.apache.log4j.Logger;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TTransportException;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-/**
- * Wrapper around ServerSocketChannel.
- *
- * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
- * The only change (apart from the logging statements) is the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket.
- */
-public class TNonblockingServerSocket extends TNonblockingServerTransport {
-  private static final Logger log = Logger.getLogger(TNonblockingServerTransport.class.getName());
-
-  /**
-   * This channel is where all the nonblocking magic happens.
-   */
-  private ServerSocketChannel serverSocketChannel = null;
-
-  /**
-   * Underlying ServerSocket object
-   */
-  private ServerSocket serverSocket_ = null;
-
-  /**
-   * Timeout for client sockets from accept
-   */
-  private int clientTimeout_ = 0;
-
-  /**
-   * Creates just a port listening server socket
-   */
-  public TNonblockingServerSocket(int port) throws TTransportException {
-    this(port, 0);
-  }
-
-  /**
-   * Creates just a port listening server socket
-   */
-  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
-    this(new InetSocketAddress(port), clientTimeout);
-  }
-
-  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
-    this(bindAddr, 0);
-  }
-
-  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
-    clientTimeout_ = clientTimeout;
-    try {
-      serverSocketChannel = ServerSocketChannel.open();
-      serverSocketChannel.configureBlocking(false);
-
-      // Make server socket
-      serverSocket_ = serverSocketChannel.socket();
-      // Prevent 2MSL delay problem on server restarts
-      serverSocket_.setReuseAddress(true);
-      // Bind to listening port
-      serverSocket_.bind(bindAddr);
-    } catch (IOException ioe) {
-      serverSocket_ = null;
-      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
-    }
-  }
-
-  public void listen() throws TTransportException {
-    // Make sure not to block on accept
-    if (serverSocket_ != null) {
-      try {
-        serverSocket_.setSoTimeout(0);
-      } catch (SocketException sx) {
-        log.error("SocketException caused by serverSocket in listen()", sx);
-      }
-    }
-  }
-
-  protected TNonblockingSocket acceptImpl() throws TTransportException {
-    if (serverSocket_ == null) {
-      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
-    }
-    try {
-      SocketChannel socketChannel = serverSocketChannel.accept();
-      if (socketChannel == null) {
-        return null;
-      }
-
-      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
-      tsocket.setTimeout(clientTimeout_);
-      return tsocket;
-    } catch (IOException iox) {
-      throw new TTransportException(iox);
-    }
-  }
-
-  public void registerSelector(Selector selector) {
-    try {
-      // Register the server socket channel, indicating an interest in
-      // accepting new connections
-      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
-    } catch (ClosedChannelException e) {
-      // this shouldn't happen, ideally...
-      // TODO: decide what to do with this.
-    }
-  }
-
-  public void close() {
-    if (serverSocket_ != null) {
-      try {
-        serverSocket_.close();
-      } catch (IOException iox) {
-        log.warn("WARNING: Could not close server socket: " + iox.getMessage());
-      }
-      serverSocket_ = null;
-    }
-  }
-
-  public void interrupt() {
-    // The thread-safeness of this is dubious, but Java documentation suggests
-    // that it is safe to do this from a different thread context
-    close();
-  }
-
-  public int getPort() {
-    return serverSocket_.getLocalPort();
-  }
-}


Mime
View raw message