incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [6/7] git commit: Removing non-blocking server, using only threaded thrift servers now.
Date Mon, 07 Dec 2015 21:15:25 GMT
Removing non-blocking server, using only threaded thrift servers now.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/5853d86e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/5853d86e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/5853d86e

Branch: refs/heads/v2_command
Commit: 5853d86e9d7fe3792693c140ba4ac961d438216d
Parents: 5c0cfd5
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Dec 7 15:09:31 2015 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Dec 7 15:09:31 2015 -0500

----------------------------------------------------------------------
 .../apache/blur/server/BlurServerContext.java   |   9 +-
 .../blur/thrift/ThriftBlurControllerServer.java |   5 +-
 .../blur/thrift/ThriftBlurShardServer.java      |   2 +-
 .../org/apache/blur/thrift/ThriftServer.java    |  64 +-
 .../server/AbstractNonblockingServer.java       | 614 -----------------
 .../apache/blur/thrift/server/Invocation.java   |  36 -
 .../thrift/server/TThreadedSelectorServer.java  | 659 -------------------
 .../apache/blur/thrift/server/ThriftTrace.java  |  23 -
 .../apache/blur/thrift/server/ThriftTracer.java |  37 --
 9 files changed, 28 insertions(+), 1421 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-core/src/main/java/org/apache/blur/server/BlurServerContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/BlurServerContext.java b/blur-core/src/main/java/org/apache/blur/server/BlurServerContext.java
index 542f057..3290f90 100644
--- a/blur-core/src/main/java/org/apache/blur/server/BlurServerContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/BlurServerContext.java
@@ -22,10 +22,8 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.thirdparty.thrift_0_9_0.server.ServerContext;
 import org.apache.blur.thrift.generated.User;
-import org.apache.blur.thrift.server.ThriftTrace;
-import org.apache.blur.thrift.server.ThriftTracer;
 
-public class BlurServerContext implements ServerContext, ThriftTrace {
+public class BlurServerContext implements ServerContext {
 
   private static final Log LOG = LogFactory.getLog(BlurServerContext.class);
 
@@ -94,9 +92,4 @@ public class BlurServerContext implements ServerContext, ThriftTrace {
     _traceRequestId = null;
   }
 
-  @Override
-  public ThriftTracer getTracer(String name) {
-    return ThriftTracer.NOTHING;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index a1c0f88..a6312bd 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -118,7 +118,7 @@ public class ThriftBlurControllerServer extends ThriftServer {
     if (configBindPort == 0) {
       instanceBindPort = 0;
     }
-    TServerTransport serverTransport = ThriftServer.getTServerTransport(bindAddress, instanceBindPort, configuration);
+    TServerTransport serverTransport = ThriftServer.getTServerTransport(bindAddress, instanceBindPort);
     instanceBindPort = ThriftServer.getBindingPort(serverTransport);
 
     LOG.info("Controller Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":"
@@ -187,7 +187,8 @@ public class ThriftBlurControllerServer extends ThriftServer {
     Trace.setStorage(traceStorage);
     Trace.setNodeName(nodeName);
 
-    List<ServerSecurityFilter> serverSecurity = getServerSecurityList(configuration, ServerSecurityFilterFactory.ServerType.CONTROLLER);
+    List<ServerSecurityFilter> serverSecurity = getServerSecurityList(configuration,
+        ServerSecurityFilterFactory.ServerType.CONTROLLER);
 
     Iface iface = BlurUtil.wrapFilteredBlurServer(configuration, controllerServer, false);
     iface = ServerSecurityUtil.applySecurity(iface, serverSecurity, false);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index b7f969c..47772ff 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -159,7 +159,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     if (configBindPort == 0) {
       instanceBindPort = 0;
     }
-    TServerTransport serverTransport = ThriftServer.getTServerTransport(bindAddress, instanceBindPort, configuration);
+    TServerTransport serverTransport = ThriftServer.getTServerTransport(bindAddress, instanceBindPort);
     instanceBindPort = ThriftServer.getBindingPort(serverTransport);
 
     Set<Entry<String, String>> set = configuration.getProperties().entrySet();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
index 51451fe..e69f2be 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
@@ -60,22 +60,19 @@ import org.apache.blur.server.ServerSecurityFilter;
 import org.apache.blur.server.ServerSecurityFilterFactory;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TBinaryProtocol;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TCompactProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolFactory;
 import org.apache.blur.thirdparty.thrift_0_9_0.server.TServer;
 import org.apache.blur.thirdparty.thrift_0_9_0.server.TServerEventHandler;
 import org.apache.blur.thirdparty.thrift_0_9_0.server.TThreadPoolServer;
 import org.apache.blur.thirdparty.thrift_0_9_0.server.TThreadPoolServer.Args;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerSocket;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerTransport;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TServerSocket;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TServerTransport;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportFactory;
 import org.apache.blur.thrift.generated.Blur;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.sasl.SaslHelper;
-import org.apache.blur.thrift.sasl.TSaslServerTransport;
-import org.apache.blur.thrift.server.TThreadedSelectorServer;
-import org.apache.blur.thrift.server.TThreadedSelectorServer.Args.AcceptPolicy;
 import org.apache.blur.trace.LogTraceStorage;
 import org.apache.blur.trace.TraceStorage;
 import org.apache.blur.trace.hdfs.HdfsTraceStorage;
@@ -302,34 +299,27 @@ public class ThriftServer {
 
   public void start() throws TTransportException, IOException {
     Blur.Processor<Blur.Iface> processor = new Blur.Processor<Blur.Iface>(_iface);
+    _executorService = Executors.newThreadPool("thrift-processors", _threadCount, false);
+    TProtocolFactory protocolFactory;
+    TTransportFactory transportFactory;
+
     if (SaslHelper.isSaslEnabled(_configuration)) {
-      _executorService = Executors.newThreadPool("thrift-processors", _threadCount, false);
-      TSaslServerTransport.Factory saslTransportFactory = SaslHelper.getTSaslServerTransportFactory(_configuration);
-      Args args = new TThreadPoolServer.Args(_serverTransport);
-      args.executorService(_executorService);
-      args.processor(processor);
-      args.protocolFactory(new TCompactProtocol.Factory());
-      args.transportFactory(saslTransportFactory);
-
-      _server = new TThreadPoolServer(args);
-      _server.setServerEventHandler(_eventHandler);
+      transportFactory = SaslHelper.getTSaslServerTransportFactory(_configuration);
+      protocolFactory = new TCompactProtocol.Factory();
     } else {
-      _executorService = Executors.newThreadPool("thrift-processors", _threadCount);
-      TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(
-          (TNonblockingServerTransport) _serverTransport);
-      args.processor(processor);
-      args.executorService(_executorService);
-      args.transportFactory(new TFramedTransport.Factory(_maxFrameSize));
-      args.protocolFactory(new TBinaryProtocol.Factory(true, true));
-      args.selectorThreads = _selectorThreads;
-      args.maxReadBufferBytes = _maxReadBufferBytes;
-      args.acceptQueueSizePerThread(_acceptQueueSizePerThread);
-      args.acceptPolicy(AcceptPolicy.FAIR_ACCEPT);
-
-      _server = new TThreadedSelectorServer(args);
-      _server.setServerEventHandler(_eventHandler);
+      transportFactory = new TFramedTransport.Factory(_maxFrameSize);
+      protocolFactory = new TBinaryProtocol.Factory(true, true);
     }
 
+    Args args = new TThreadPoolServer.Args(_serverTransport);
+    args.executorService(_executorService);
+    args.processor(processor);
+    args.protocolFactory(protocolFactory);
+    args.transportFactory(transportFactory);
+
+    _server = new TThreadPoolServer(args);
+    _server.setServerEventHandler(_eventHandler);
+
     LOG.info("Starting server [{0}]", _nodeName);
     _server.serve();
   }
@@ -437,21 +427,13 @@ public class ThriftServer {
     this._configuration = configuration;
   }
 
-  public static TServerTransport getTServerTransport(String bindAddress, int bindPort, BlurConfiguration configuration)
-      throws TTransportException {
+  public static TServerTransport getTServerTransport(String bindAddress, int bindPort) throws TTransportException {
     InetSocketAddress bindInetSocketAddress = getBindInetSocketAddress(bindAddress, bindPort);
-    if (SaslHelper.isSaslEnabled(configuration)) {
-      return new TServerSocket(bindInetSocketAddress);
-    } else {
-      return new TNonblockingServerSocket(bindInetSocketAddress);
-    }
+    return new TServerSocket(bindInetSocketAddress);
   }
 
   public static int getBindingPort(TServerTransport serverTransport) {
-    if (serverTransport instanceof TNonblockingServerSocket) {
-      TNonblockingServerSocket nonblockingServerSocket = (TNonblockingServerSocket) serverTransport;
-      return nonblockingServerSocket.getServerSocket().getLocalPort();
-    } else if (serverTransport instanceof TServerSocket) {
+    if (serverTransport instanceof TServerSocket) {
       TServerSocket serverSocket = (TServerSocket) serverTransport;
       return serverSocket.getServerSocket().getLocalPort();
     } else {
@@ -499,7 +481,7 @@ public class ThriftServer {
     int sessionTimeout = conf.getInt(BLUR_ZOOKEEPER_TIMEOUT, BLUR_ZOOKEEPER_TIMEOUT_DEFAULT);
     int slash = zkConnectionStr.indexOf('/');
 
-    if ((slash != -1) && (slash != zkConnectionStr.length()-1)) {
+    if ((slash != -1) && (slash != zkConnectionStr.length() - 1)) {
       ZooKeeper rootZk = ZkUtils.newZooKeeper(zkConnectionStr.substring(0, slash), sessionTimeout);
       String rootPath = zkConnectionStr.substring(slash, zkConnectionStr.length());
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
deleted file mode 100644
index 667c2a4..0000000
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
+++ /dev/null
@@ -1,614 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.blur.thrift.server;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.blur.thirdparty.thrift_0_9_0.TByteArrayOutputStream;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
-import org.apache.blur.thirdparty.thrift_0_9_0.server.ServerContext;
-import org.apache.blur.thirdparty.thrift_0_9_0.server.TServer;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TMemoryInputTransport;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerTransport;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingTransport;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransport;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides common methods and classes used by nonblocking TServer
- * implementations.
- */
-public abstract class AbstractNonblockingServer extends TServer {
-  protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
-
-  public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends
-      AbstractServerArgs<T> {
-    public long maxReadBufferBytes = Long.MAX_VALUE;
-
-    public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
-      super(transport);
-      transportFactory(new TFramedTransport.Factory());
-    }
-  }
-
-  /**
-   * The maximum amount of memory we will allocate to client IO buffers at a
-   * time. Without this limit, the server will gladly allocate client buffers
-   * right into an out of memory exception, rather than waiting.
-   */
-  private final long MAX_READ_BUFFER_BYTES;
-
-  /**
-   * How many bytes are currently allocated to read buffers.
-   */
-  private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
-
-  public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
-    super(args);
-    MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
-  }
-
-  /**
-   * Begin accepting connections and processing invocations.
-   */
-  public void serve() {
-    // start any IO threads
-    if (!startThreads()) {
-      return;
-    }
-
-    // start listening, or exit
-    if (!startListening()) {
-      return;
-    }
-
-    setServing(true);
-
-    // this will block while we serve
-    waitForShutdown();
-
-    setServing(false);
-
-    // do a little cleanup
-    stopListening();
-  }
-
-  /**
-   * Starts any threads required for serving.
-   * 
-   * @return true if everything went ok, false if threads could not be started.
-   */
-  protected abstract boolean startThreads();
-
-  /**
-   * A method that will block until when threads handling the serving have been
-   * shut down.
-   */
-  protected abstract void waitForShutdown();
-
-  /**
-   * Have the server transport start accepting connections.
-   * 
-   * @return true if we started listening successfully, false if something went
-   *         wrong.
-   */
-  protected boolean startListening() {
-    try {
-      serverTransport_.listen();
-      return true;
-    } catch (TTransportException ttx) {
-      LOGGER.error("Failed to start listening on server socket!", ttx);
-      return false;
-    }
-  }
-
-  /**
-   * Stop listening for connections.
-   */
-  protected void stopListening() {
-    serverTransport_.close();
-  }
-
-  /**
-   * Perform an invocation. This method could behave several different ways -
-   * invoke immediately inline, queue for separate execution, etc.
-   * 
-   * @return true if invocation was successfully requested, which is not a
-   *         guarantee that invocation has completed. False if the request
-   *         failed.
-   */
-  protected abstract boolean requestInvoke(FrameBuffer frameBuffer);
-
-  /**
-   * An abstract thread that handles selecting on a set of transports and
-   * {@link FrameBuffer FrameBuffers} associated with selected keys
-   * corresponding to requests.
-   */
-  protected abstract class AbstractSelectThread extends Thread {
-    protected final Selector selector;
-
-    // List of FrameBuffers that want to change their selection interests.
-    protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
-
-    public AbstractSelectThread() throws IOException {
-      this.selector = SelectorProvider.provider().openSelector();
-    }
-
-    /**
-     * If the selector is blocked, wake it up.
-     */
-    public void wakeupSelector() {
-      selector.wakeup();
-    }
-
-    /**
-     * Add FrameBuffer to the list of select interest changes and wake up the
-     * selector if it's blocked. When the select() call exits, it'll give the
-     * FrameBuffer a chance to change its interests.
-     */
-    public void requestSelectInterestChange(FrameBuffer frameBuffer) {
-      synchronized (selectInterestChanges) {
-        selectInterestChanges.add(frameBuffer);
-      }
-      // wakeup the selector, if it's currently blocked.
-      selector.wakeup();
-    }
-
-    /**
-     * Check to see if there are any FrameBuffers that have switched their
-     * interest type from read to write or vice versa.
-     */
-    protected void processInterestChanges() {
-      synchronized (selectInterestChanges) {
-        for (FrameBuffer fb : selectInterestChanges) {
-          fb.changeSelectInterests();
-        }
-        selectInterestChanges.clear();
-      }
-    }
-
-    /**
-     * Do the work required to read from a readable client. If the frame is
-     * fully read, then invoke the method call.
-     */
-    protected void handleRead(SelectionKey key) {
-      FrameBuffer buffer = (FrameBuffer) key.attachment();
-      ThriftTracer readTracer = ThriftTracer.NOTHING;
-      if (buffer.context_ != null) {
-        ServerContext context = buffer.context_;
-        if (context instanceof ThriftTrace) {
-          ThriftTrace thriftTrace = (ThriftTrace) context;
-          readTracer = thriftTrace.getTracer("thrift - handle read");
-        }
-      }
-      readTracer.start();
-      try {
-        if (!buffer.read()) {
-          cleanupSelectionKey(key);
-          return;
-        }
-      } finally {
-        readTracer.end();
-      }
-
-      // if the buffer's frame read is complete, invoke the method.
-      if (buffer.isFrameFullyRead()) {
-        ThriftTracer processTracer = ThriftTracer.NOTHING;
-        if (buffer.context_ != null) {
-          ServerContext context = buffer.context_;
-          if (context instanceof ThriftTrace) {
-            ThriftTrace thriftTrace = (ThriftTrace) context;
-            processTracer = thriftTrace.getTracer("thrift - handle request");
-          }
-        }
-        processTracer.start();
-        try {
-          if (!requestInvoke(buffer)) {
-            cleanupSelectionKey(key);
-          }
-        } finally {
-          processTracer.end();
-        }
-      }
-
-    }
-
-    /**
-     * Let a writable client get written, if there's data to be written.
-     */
-    protected void handleWrite(SelectionKey key) {
-      FrameBuffer buffer = (FrameBuffer) key.attachment();
-      ThriftTracer writeTracer = ThriftTracer.NOTHING;
-      if (buffer.context_ != null) {
-        ServerContext context = buffer.context_;
-        if (context instanceof ThriftTrace) {
-          ThriftTrace thriftTrace = (ThriftTrace) context;
-          writeTracer = thriftTrace.getTracer("thrift - handle write");
-        }
-      }
-      writeTracer.start();
-      try {
-        if (!buffer.write()) {
-          cleanupSelectionKey(key);
-        }
-      } finally {
-        writeTracer.end();
-      }
-    }
-
-    /**
-     * Do connection-close cleanup on a given SelectionKey.
-     */
-    protected void cleanupSelectionKey(SelectionKey key) {
-      // remove the records from the two maps
-      FrameBuffer buffer = (FrameBuffer) key.attachment();
-      if (buffer != null) {
-        // close the buffer
-        buffer.close();
-      }
-      // cancel the selection key
-      key.cancel();
-    }
-  } // SelectThread
-
-  /**
-   * Possible states for the FrameBuffer state machine.
-   */
-  private enum FrameBufferState {
-    // in the midst of reading the frame size off the wire
-    READING_FRAME_SIZE,
-    // reading the actual frame data now, but not all the way done yet
-    READING_FRAME,
-    // completely read the frame, so an invocation can now happen
-    READ_FRAME_COMPLETE,
-    // waiting to get switched to listening for write events
-    AWAITING_REGISTER_WRITE,
-    // started writing response data, not fully complete yet
-    WRITING,
-    // another thread wants this framebuffer to go back to reading
-    AWAITING_REGISTER_READ,
-    // we want our transport and selection key invalidated in the selector
-    // thread
-    AWAITING_CLOSE
-  }
-
-  /**
-   * Class that implements a sort of state machine around the interaction with a
-   * client and an invoker. It manages reading the frame size and frame data,
-   * getting it handed off as wrapped transports, and then the writing of
-   * response data back to the client. In the process it manages flipping the
-   * read and write bits on the selection key for its client.
-   */
-  protected class FrameBuffer {
-    // the actual transport hooked up to the client.
-    public final TNonblockingTransport trans_;
-
-    // the SelectionKey that corresponds to our transport
-    private final SelectionKey selectionKey_;
-
-    // the SelectThread that owns the registration of our transport
-    private final AbstractSelectThread selectThread_;
-
-    // where in the process of reading/writing are we?
-    private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
-
-    // the ByteBuffer we'll be using to write and read, depending on the state
-    private ByteBuffer buffer_;
-
-    private final TByteArrayOutputStream response_;
-
-    // the frame that the TTransport should wrap.
-    private final TMemoryInputTransport frameTrans_;
-
-    // the transport that should be used to connect to clients
-    private final TTransport inTrans_;
-
-    private final TTransport outTrans_;
-
-    // the input protocol to use on frames
-    private final TProtocol inProt_;
-
-    // the output protocol to use on frames
-    private final TProtocol outProt_;
-
-    // context associated with this connection
-    private final ServerContext context_;
-
-    public FrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey,
-        final AbstractSelectThread selectThread) {
-      trans_ = trans;
-      selectionKey_ = selectionKey;
-      selectThread_ = selectThread;
-      buffer_ = ByteBuffer.allocate(4);
-
-      frameTrans_ = new TMemoryInputTransport();
-      response_ = new TByteArrayOutputStream();
-      inTrans_ = inputTransportFactory_.getTransport(frameTrans_);
-      outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
-      inProt_ = inputProtocolFactory_.getProtocol(inTrans_);
-      outProt_ = outputProtocolFactory_.getProtocol(outTrans_);
-
-      if (eventHandler_ != null) {
-        context_ = eventHandler_.createContext(inProt_, outProt_, selectionKey);
-      } else {
-        context_ = null;
-      }
-    }
-
-    /**
-     * Give this FrameBuffer a chance to read. The selector loop should have
-     * received a read event for this FrameBuffer.
-     * 
-     * @return true if the connection should live on, false if it should be
-     *         closed
-     */
-    public boolean read() {
-      if (state_ == FrameBufferState.READING_FRAME_SIZE) {
-        // try to read the frame size completely
-        if (!internalRead()) {
-          return false;
-        }
-
-        // if the frame size has been read completely, then prepare to read the
-        // actual frame.
-        if (buffer_.remaining() == 0) {
-          // pull out the frame size as an integer.
-          int frameSize = buffer_.getInt(0);
-          if (frameSize <= 0) {
-            LOGGER.error("Read an invalid frame size of " + frameSize
-                + ". Are you using TFramedTransport on the client side?");
-            return false;
-          }
-
-          // if this frame will always be too large for this server, log the
-          // error and close the connection.
-          if (frameSize > MAX_READ_BUFFER_BYTES) {
-            LOGGER.error("Read a frame size of " + frameSize
-                + ", which is bigger than the maximum allowable buffer size for ALL connections.");
-            return false;
-          }
-
-          // if this frame will push us over the memory limit, then return.
-          // with luck, more memory will free up the next time around.
-          if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
-            return true;
-          }
-
-          // increment the amount of memory allocated to read buffers
-          readBufferBytesAllocated.addAndGet(frameSize + 4);
-
-          // reallocate the readbuffer as a frame-sized buffer
-          buffer_ = ByteBuffer.allocate(frameSize + 4);
-          buffer_.putInt(frameSize);
-
-          state_ = FrameBufferState.READING_FRAME;
-        } else {
-          // this skips the check of READING_FRAME state below, since we can't
-          // possibly go on to that state if there's data left to be read at
-          // this one.
-          return true;
-        }
-      }
-
-      // it is possible to fall through from the READING_FRAME_SIZE section
-      // to READING_FRAME if there's already some frame data available once
-      // READING_FRAME_SIZE is complete.
-
-      if (state_ == FrameBufferState.READING_FRAME) {
-        if (!internalRead()) {
-          return false;
-        }
-
-        // since we're already in the select loop here for sure, we can just
-        // modify our selection key directly.
-        if (buffer_.remaining() == 0) {
-          // get rid of the read select interests
-          selectionKey_.interestOps(0);
-          state_ = FrameBufferState.READ_FRAME_COMPLETE;
-        }
-
-        return true;
-      }
-
-      // if we fall through to this point, then the state must be invalid.
-      LOGGER.error("Read was called but state is invalid (" + state_ + ")");
-      return false;
-    }
-
-    /**
-     * Give this FrameBuffer a chance to write its output to the final client.
-     */
-    public boolean write() {
-      if (state_ == FrameBufferState.WRITING) {
-        try {
-          if (trans_.write(buffer_) < 0) {
-            return false;
-          }
-        } catch (IOException e) {
-          LOGGER.warn("Got an IOException during write!", e);
-          return false;
-        }
-
-        // we're done writing. now we need to switch back to reading.
-        if (buffer_.remaining() == 0) {
-          prepareRead();
-        }
-        return true;
-      }
-
-      LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
-      return false;
-    }
-
-    /**
-     * Give this FrameBuffer a chance to set its interest to write, once data
-     * has come in.
-     */
-    public void changeSelectInterests() {
-      if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
-        // set the OP_WRITE interest
-        selectionKey_.interestOps(SelectionKey.OP_WRITE);
-        state_ = FrameBufferState.WRITING;
-      } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
-        prepareRead();
-      } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
-        close();
-        selectionKey_.cancel();
-      } else {
-        LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
-      }
-    }
-
-    /**
-     * Shut the connection down.
-     */
-    public void close() {
-      // if we're being closed due to an error, we might have allocated a
-      // buffer that we need to subtract for our memory accounting.
-      if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE) {
-        readBufferBytesAllocated.addAndGet(-buffer_.array().length);
-      }
-      trans_.close();
-      if (eventHandler_ != null) {
-        eventHandler_.deleteContext(context_, inProt_, outProt_);
-      }
-    }
-
-    /**
-     * Check if this FrameBuffer has a full frame read.
-     */
-    public boolean isFrameFullyRead() {
-      return state_ == FrameBufferState.READ_FRAME_COMPLETE;
-    }
-
-    /**
-     * After the processor has processed the invocation, whatever thread is
-     * managing invocations should call this method on this FrameBuffer so we
-     * know it's time to start trying to write again. Also, if it turns out that
-     * there actually isn't any data in the response buffer, we'll skip trying
-     * to write and instead go back to reading.
-     */
-    public void responseReady() {
-      // the read buffer is definitely no longer in use, so we will decrement
-      // our read buffer count. we do this here as well as in close because
-      // we'd like to free this read memory up as quickly as possible for other
-      // clients.
-      readBufferBytesAllocated.addAndGet(-buffer_.array().length);
-
-      if (response_.len() == 0) {
-        // go straight to reading again. this was probably an oneway method
-        state_ = FrameBufferState.AWAITING_REGISTER_READ;
-        buffer_ = null;
-      } else {
-        buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
-
-        // set state that we're waiting to be switched to write. we do this
-        // asynchronously through requestSelectInterestChange() because there is
-        // a possibility that we're not in the main thread, and thus currently
-        // blocked in select(). (this functionality is in place for the sake of
-        // the HsHa server.)
-        state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
-      }
-      requestSelectInterestChange();
-    }
-
-    /**
-     * Actually invoke the method signified by this FrameBuffer.
-     */
-    public void invoke() {
-      frameTrans_.reset(buffer_.array());
-      response_.reset();
-
-      try {
-        if (eventHandler_ != null) {
-          eventHandler_.processContext(context_, inTrans_, outTrans_);
-        }
-        processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
-        responseReady();
-        return;
-      } catch (TException te) {
-        LOGGER.warn("Exception while invoking!", te);
-      } catch (Throwable t) {
-        LOGGER.error("Unexpected throwable while invoking!", t);
-      }
-      // This will only be reached when there is a throwable.
-      state_ = FrameBufferState.AWAITING_CLOSE;
-      requestSelectInterestChange();
-    }
-
-    /**
-     * Perform a read into buffer.
-     * 
-     * @return true if the read succeeded, false if there was an error or the
-     *         connection closed.
-     */
-    private boolean internalRead() {
-      try {
-        if (trans_.read(buffer_) < 0) {
-          return false;
-        }
-        return true;
-      } catch (IOException e) {
-        LOGGER.warn("Got an IOException in internalRead!", e);
-        return false;
-      }
-    }
-
-    /**
-     * We're done writing, so reset our interest ops and change state
-     * accordingly.
-     */
-    private void prepareRead() {
-      // we can set our interest directly without using the queue because
-      // we're in the select thread.
-      selectionKey_.interestOps(SelectionKey.OP_READ);
-      // get ready for another go-around
-      buffer_ = ByteBuffer.allocate(4);
-      state_ = FrameBufferState.READING_FRAME_SIZE;
-    }
-
-    /**
-     * When this FrameBuffer needs to change its select interests and execution
-     * might not be in its select thread, then this method will make sure the
-     * interest change gets done when the select thread wakes back up. When the
-     * current thread is this FrameBuffer's select thread, then it just does the
-     * interest change immediately.
-     */
-    private void requestSelectInterestChange() {
-      if (Thread.currentThread() == this.selectThread_) {
-        changeSelectInterests();
-      } else {
-        this.selectThread_.requestSelectInterestChange(this);
-      }
-    }
-  } // FrameBuffer
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java
deleted file mode 100644
index fe71e11..0000000
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.blur.thrift.server;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.blur.thrift.server.AbstractNonblockingServer.FrameBuffer;
-
-/**
- * An Invocation represents a method call that is prepared to execute, given an
- * idle worker thread. It contains the input and output protocols the thread's
- * processor should use to perform the usual Thrift invocation.
- */
-class Invocation implements Runnable {
-  private final FrameBuffer frameBuffer;
-
-  public Invocation(final FrameBuffer frameBuffer) {
-    this.frameBuffer = frameBuffer;
-  }
-
-  public void run() {
-    frameBuffer.invoke();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java
deleted file mode 100644
index b58b063..0000000
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java
+++ /dev/null
@@ -1,659 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.blur.thrift.server;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerTransport;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingTransport;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Half-Sync/Half-Async server with a separate pool of threads to handle
- * non-blocking I/O. Accepts are handled on a single thread, and a configurable
- * number of nonblocking selector threads manage reading and writing of client
- * connections. A synchronous worker thread pool handles processing of requests.
- * 
- * Performs better than TNonblockingServer/THsHaServer in multi-core
- * environments when the bottleneck is CPU on the single selector thread
- * handling I/O. In addition, because the accept handling is decoupled from
- * reads/writes and invocation, the server has better ability to handle back-
- * pressure from new connections (e.g. stop accepting when busy).
- * 
- * Like TNonblockingServer, it relies on the use of TFramedTransport.
- */
-public class TThreadedSelectorServer extends AbstractNonblockingServer {
-  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName());
-
-  private static int selectorThreadCount;
-
-  public static class Args extends AbstractNonblockingServerArgs<Args> {
-
-    /** The number of threads for selecting on already-accepted connections */
-    public int selectorThreads = 2;
-    /**
-     * The size of the executor service (if none is specified) that will handle
-     * invocations. This may be set to 0, in which case invocations will be
-     * handled directly on the selector threads (as is in TNonblockingServer)
-     */
-    private int workerThreads = 5;
-    /** Time to wait for server to stop gracefully */
-    private int stopTimeoutVal = 60;
-    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
-    /** The ExecutorService for handling dispatched requests */
-    private ExecutorService executorService = null;
-    /**
-     * The size of the blocking queue per selector thread for passing accepted
-     * connections to the selector thread
-     */
-    private int acceptQueueSizePerThread = 4;
-
-    /**
-     * Determines the strategy for handling new accepted connections.
-     */
-    public static enum AcceptPolicy {
-      /**
-       * Require accepted connection registration to be handled by the executor.
-       * If the worker pool is saturated, further accepts will be closed
-       * immediately. Slightly increases latency due to an extra scheduling.
-       */
-      FAIR_ACCEPT,
-      /**
-       * Handle the accepts as fast as possible, disregarding the status of the
-       * executor service.
-       */
-      FAST_ACCEPT
-    }
-
-    private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;
-
-    public Args(TNonblockingServerTransport transport) {
-      super(transport);
-    }
-
-    public Args selectorThreads(int i) {
-      selectorThreads = i;
-      return this;
-    }
-
-    public int getSelectorThreads() {
-      return selectorThreads;
-    }
-
-    public Args workerThreads(int i) {
-      workerThreads = i;
-      return this;
-    }
-
-    public int getWorkerThreads() {
-      return workerThreads;
-    }
-
-    public int getStopTimeoutVal() {
-      return stopTimeoutVal;
-    }
-
-    public Args stopTimeoutVal(int stopTimeoutVal) {
-      this.stopTimeoutVal = stopTimeoutVal;
-      return this;
-    }
-
-    public TimeUnit getStopTimeoutUnit() {
-      return stopTimeoutUnit;
-    }
-
-    public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
-      this.stopTimeoutUnit = stopTimeoutUnit;
-      return this;
-    }
-
-    public ExecutorService getExecutorService() {
-      return executorService;
-    }
-
-    public Args executorService(ExecutorService executorService) {
-      this.executorService = executorService;
-      return this;
-    }
-
-    public int getAcceptQueueSizePerThread() {
-      return acceptQueueSizePerThread;
-    }
-
-    public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) {
-      this.acceptQueueSizePerThread = acceptQueueSizePerThread;
-      return this;
-    }
-
-    public AcceptPolicy getAcceptPolicy() {
-      return acceptPolicy;
-    }
-
-    public Args acceptPolicy(AcceptPolicy acceptPolicy) {
-      this.acceptPolicy = acceptPolicy;
-      return this;
-    }
-
-    public void validate() {
-      if (selectorThreads <= 0) {
-        throw new IllegalArgumentException("selectorThreads must be positive.");
-      }
-      if (workerThreads < 0) {
-        throw new IllegalArgumentException("workerThreads must be non-negative.");
-      }
-      if (acceptQueueSizePerThread <= 0) {
-        throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
-      }
-    }
-  }
-
-  // Flag for stopping the server
-  // Please see THRIFT-1795 for the usage of this flag
-  private volatile boolean stopped_ = false;
-
-  // The thread handling all accepts
-  private AcceptThread acceptThread;
-
-  // Threads handling events on client transports
-  private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
-
-  // This wraps all the functionality of queueing and thread pool management
-  // for the passing of Invocations from the selector thread(s) to the workers
-  // (if any).
-  private final ExecutorService invoker;
-
-  private final Args args;
-
-  /**
-   * Create the server with the specified Args configuration
-   */
-  public TThreadedSelectorServer(Args args) {
-    super(args);
-    args.validate();
-    invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
-    this.args = args;
-  }
-
-  /**
-   * Start the accept and selector threads running to deal with clients.
-   * 
-   * @return true if everything went ok, false if we couldn't start for some
-   *         reason.
-   */
-  @Override
-  protected boolean startThreads() {
-    try {
-      for (int i = 0; i < args.selectorThreads; ++i) {
-        selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
-      }
-      acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
-          createSelectorThreadLoadBalancer(selectorThreads));
-      for (SelectorThread thread : selectorThreads) {
-        thread.start();
-      }
-      acceptThread.start();
-      return true;
-    } catch (IOException e) {
-      LOGGER.error("Failed to start threads!", e);
-      return false;
-    }
-  }
-
-  /**
-   * Joins the accept and selector threads and shuts down the executor service.
-   */
-  @Override
-  protected void waitForShutdown() {
-    try {
-      joinThreads();
-    } catch (InterruptedException e) {
-      // Non-graceful shutdown occurred
-      LOGGER.error("Interrupted while joining threads!", e);
-    }
-    gracefullyShutdownInvokerPool();
-  }
-
-  protected void joinThreads() throws InterruptedException {
-    // wait until the io threads exit
-    acceptThread.join();
-    for (SelectorThread thread : selectorThreads) {
-      thread.join();
-    }
-  }
-
-  /**
-   * Stop serving and shut everything down.
-   */
-  @Override
-  public void stop() {
-    stopped_ = true;
-
-    // Stop queuing connect attempts asap
-    stopListening();
-
-    if (acceptThread != null) {
-      acceptThread.wakeupSelector();
-    }
-    if (selectorThreads != null) {
-      for (SelectorThread thread : selectorThreads) {
-        if (thread != null)
-          thread.wakeupSelector();
-      }
-    }
-  }
-
-  protected void gracefullyShutdownInvokerPool() {
-    // try to gracefully shut down the executor service
-    invoker.shutdown();
-
-    // Loop until awaitTermination finally does return without an interrupted
-    // exception. If we don't do this, then we'll shut down prematurely. We want
-    // to let the executorService clear its task queue, closing client sockets
-    // appropriately.
-    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
-    long now = System.currentTimeMillis();
-    while (timeoutMS >= 0) {
-      try {
-        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
-        break;
-      } catch (InterruptedException ix) {
-        long newnow = System.currentTimeMillis();
-        timeoutMS -= (newnow - now);
-        now = newnow;
-      }
-    }
-  }
-
-  /**
-   * We override the standard invoke method here to queue the invocation for
-   * invoker service instead of immediately invoking. If there is no thread
-   * pool, handle the invocation inline on this thread
-   */
-  @Override
-  protected boolean requestInvoke(FrameBuffer frameBuffer) {
-    Runnable invocation = getRunnable(frameBuffer);
-    if (invoker != null) {
-      try {
-        invoker.execute(invocation);
-        return true;
-      } catch (RejectedExecutionException rx) {
-        LOGGER.warn("ExecutorService rejected execution!", rx);
-        return false;
-      }
-    } else {
-      // Invoke on the caller's thread
-      invocation.run();
-      return true;
-    }
-  }
-
-  protected Runnable getRunnable(FrameBuffer frameBuffer) {
-    return new Invocation(frameBuffer);
-  }
-
-  /**
-   * Helper to create the invoker if one is not specified
-   */
-  protected static ExecutorService createDefaultExecutor(Args options) {
-    return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null;
-  }
-
-  private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
-    if (queueSize == 0) {
-      // Unbounded queue
-      return new LinkedBlockingQueue<TNonblockingTransport>();
-    }
-    return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
-  }
-
-  /**
-   * The thread that selects on the server transport (listen socket) and accepts
-   * new connections to hand off to the IO selector threads
-   */
-  protected class AcceptThread extends Thread {
-
-    // The listen socket to accept on
-    private final TNonblockingServerTransport serverTransport;
-    private final Selector acceptSelector;
-
-    private final SelectorThreadLoadBalancer threadChooser;
-
-    /**
-     * Set up the AcceptThread
-     * 
-     * @throws IOException
-     */
-    public AcceptThread(TNonblockingServerTransport serverTransport, SelectorThreadLoadBalancer threadChooser)
-        throws IOException {
-      this.serverTransport = serverTransport;
-      this.threadChooser = threadChooser;
-      this.acceptSelector = SelectorProvider.provider().openSelector();
-      this.serverTransport.registerSelector(acceptSelector);
-    }
-
-    /**
-     * The work loop. Selects on the server transport and accepts. If there was
-     * a server transport that had blocking accepts, and returned on blocking
-     * client transports, that should be used instead
-     */
-    public void run() {
-      try {
-        if (eventHandler_ != null) {
-          eventHandler_.preServe();
-        }
-
-        while (!stopped_) {
-          select();
-        }
-      } catch (Throwable t) {
-        LOGGER.error("run() exiting due to uncaught error", t);
-      } finally {
-        // This will wake up the selector threads
-        TThreadedSelectorServer.this.stop();
-      }
-    }
-
-    /**
-     * If the selector is blocked, wake it up.
-     */
-    public void wakeupSelector() {
-      acceptSelector.wakeup();
-    }
-
-    /**
-     * Select and process IO events appropriately: If there are connections to
-     * be accepted, accept them.
-     */
-    private void select() {
-      try {
-        // wait for connect events.
-        acceptSelector.select();
-
-        // process the io events we received
-        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
-        while (!stopped_ && selectedKeys.hasNext()) {
-          SelectionKey key = selectedKeys.next();
-          selectedKeys.remove();
-
-          // skip if not valid
-          if (!key.isValid()) {
-            continue;
-          }
-
-          if (key.isAcceptable()) {
-            handleAccept();
-          } else {
-            LOGGER.warn("Unexpected state in select! " + key.interestOps());
-          }
-        }
-      } catch (IOException e) {
-        LOGGER.warn("Got an IOException while selecting!", e);
-      }
-    }
-
-    /**
-     * Accept a new connection.
-     */
-    private void handleAccept() {
-      final TNonblockingTransport client = doAccept();
-      if (client != null) {
-        // Pass this connection to a selector thread
-        final SelectorThread targetThread = threadChooser.nextThread();
-
-        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
-          doAddAccept(targetThread, client);
-        } else {
-          // FAIR_ACCEPT
-          try {
-            invoker.submit(new Runnable() {
-              public void run() {
-                doAddAccept(targetThread, client);
-              }
-            });
-          } catch (RejectedExecutionException rx) {
-            LOGGER.warn("ExecutorService rejected accept registration!", rx);
-            // close immediately
-            client.close();
-          }
-        }
-      }
-    }
-
-    private TNonblockingTransport doAccept() {
-      try {
-        return (TNonblockingTransport) serverTransport.accept();
-      } catch (TTransportException tte) {
-        // something went wrong accepting.
-        LOGGER.warn("Exception trying to accept!", tte);
-        return null;
-      }
-    }
-
-    private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
-      if (!thread.addAcceptedConnection(client)) {
-        client.close();
-      }
-    }
-  } // AcceptThread
-
-  /**
-   * The SelectorThread(s) will be doing all the selecting on accepted active
-   * connections.
-   */
-  protected class SelectorThread extends AbstractSelectThread {
-
-    // Accepted connections added by the accept thread.
-    private final BlockingQueue<TNonblockingTransport> acceptedQueue;
-
-    /**
-     * Set up the SelectorThread with an unbounded queue for incoming accepts.
-     * 
-     * @throws IOException
-     *           if a selector cannot be created
-     */
-    public SelectorThread() throws IOException {
-      this(new LinkedBlockingQueue<TNonblockingTransport>());
-    }
-
-    /**
-     * Set up the SelectorThread with a bounded queue for incoming accepts.
-     * 
-     * @throws IOException
-     *           if a selector cannot be created
-     */
-    public SelectorThread(int maxPendingAccepts) throws IOException {
-      this(createDefaultAcceptQueue(maxPendingAccepts));
-    }
-
-    /**
-     * Set up the SelectorThread with a specified queue for connections.
-     * 
-     * @param acceptedQueue
-     *          The BlockingQueue implementation for holding incoming accepted
-     *          connections.
-     * @throws IOException
-     *           if a selector cannot be created.
-     */
-    public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
-      this.acceptedQueue = acceptedQueue;
-    }
-
-    /**
-     * Hands off an accepted connection to be handled by this thread. This
-     * method will block if the queue for new connections is at capacity.
-     * 
-     * @param accepted
-     *          The connection that has been accepted.
-     * @return true if the connection has been successfully added.
-     */
-    public boolean addAcceptedConnection(TNonblockingTransport accepted) {
-      try {
-        acceptedQueue.put(accepted);
-      } catch (InterruptedException e) {
-        LOGGER.warn("Interrupted while adding accepted connection!", e);
-        return false;
-      }
-      selector.wakeup();
-      return true;
-    }
-
-    /**
-     * The work loop. Handles selecting (read/write IO), dispatching, and
-     * managing the selection preferences of all existing connections.
-     */
-    public void run() {
-      try {
-        Thread thread = Thread.currentThread();
-        if (thread.getName().startsWith("Thread-")) {
-          thread.setName("TThreadedSelectorServer-SelectorThread-" + selectorThreadCount++);
-        }
-        while (!stopped_) {
-          select();
-          processAcceptedConnections();
-          processInterestChanges();
-        }
-        for (SelectionKey selectionKey : selector.keys()) {
-          cleanupSelectionKey(selectionKey);
-        }
-      } catch (Throwable t) {
-        LOGGER.error("run() exiting due to uncaught error", t);
-      } finally {
-        // This will wake up the accept thread and the other selector threads
-        TThreadedSelectorServer.this.stop();
-      }
-    }
-
-    /**
-     * Select and process IO events appropriately: If there are existing
-     * connections with data waiting to be read, read it, buffering until a
-     * whole frame has been read. If there are any pending responses, buffer
-     * them until their target client is available, and then send the data.
-     */
-    private void select() {
-      try {
-        // wait for io events.
-        selector.select();
-
-        // process the io events we received
-        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
-        while (!stopped_ && selectedKeys.hasNext()) {
-          SelectionKey key = selectedKeys.next();
-          selectedKeys.remove();
-
-          // skip if not valid
-          if (!key.isValid()) {
-            cleanupSelectionKey(key);
-            continue;
-          }
-
-          if (key.isReadable()) {
-            // deal with reads
-            handleRead(key);
-          } else if (key.isWritable()) {
-            // deal with writes
-            handleWrite(key);
-          } else {
-            LOGGER.warn("Unexpected state in select! " + key.interestOps());
-          }
-        }
-      } catch (IOException e) {
-        LOGGER.warn("Got an IOException while selecting!", e);
-      }
-    }
-
-    private void processAcceptedConnections() {
-      // Register accepted connections
-      while (!stopped_) {
-        TNonblockingTransport accepted = acceptedQueue.poll();
-        if (accepted == null) {
-          break;
-        }
-        registerAccepted(accepted);
-      }
-    }
-
-    private void registerAccepted(TNonblockingTransport accepted) {
-      SelectionKey clientKey = null;
-      try {
-        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
-
-        FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this);
-        clientKey.attach(frameBuffer);
-      } catch (IOException e) {
-        LOGGER.warn("Failed to register accepted connection to selector!", e);
-        if (clientKey != null) {
-          cleanupSelectionKey(clientKey);
-        }
-        accepted.close();
-      }
-    }
-  } // SelectorThread
-
-  /**
-   * Creates a SelectorThreadLoadBalancer to be used by the accept thread for
-   * assigning newly accepted connections across the threads.
-   */
-  protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
-    return new SelectorThreadLoadBalancer(threads);
-  }
-
-  /**
-   * A round robin load balancer for choosing selector threads for new
-   * connections.
-   */
-  protected class SelectorThreadLoadBalancer {
-    private final Collection<? extends SelectorThread> threads;
-    private Iterator<? extends SelectorThread> nextThreadIterator;
-
-    public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
-      if (threads.isEmpty()) {
-        throw new IllegalArgumentException("At least one selector thread is required");
-      }
-      this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
-      nextThreadIterator = this.threads.iterator();
-    }
-
-    public SelectorThread nextThread() {
-      // Choose a selector thread (round robin)
-      if (!nextThreadIterator.hasNext()) {
-        nextThreadIterator = threads.iterator();
-      }
-      return nextThreadIterator.next();
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTrace.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTrace.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTrace.java
deleted file mode 100644
index 3a42983..0000000
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTrace.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.thrift.server;
-
-public interface ThriftTrace {
-
-  ThriftTracer getTracer(String name);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5853d86e/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTracer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTracer.java b/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTracer.java
deleted file mode 100644
index c8a9937..0000000
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/server/ThriftTracer.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.thrift.server;
-
-public abstract class ThriftTracer {
-
-  public static final ThriftTracer NOTHING = new ThriftTracer() {
-    @Override
-    public void start() {
-      
-    }
-    
-    @Override
-    public void end() {
-      
-    }
-  };
-
-  public abstract void start();
-
-  public abstract void end();
-
-}


Mime
View raw message