incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Working on BLUR-79.
Date Mon, 06 May 2013 11:15:30 GMT
Working on BLUR-79.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/827ba0c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/827ba0c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/827ba0c4

Branch: refs/heads/0.1.5
Commit: 827ba0c485899ee79d7d03fc1687a12243001692
Parents: f475982
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun May 5 15:31:17 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun May 5 15:31:17 2013 -0400

----------------------------------------------------------------------
 .../java/org/apache/blur/thrift/ThriftServer.java  |   23 +-
 .../thrift/server/AbstractNonblockingServer.java   |  575 +++++++++++++
 .../org/apache/blur/thrift/server/Invocation.java  |   20 +
 .../thrift/server/TThreadedSelectorServer.java     |  653 +++++++++++++++
 4 files changed, 1258 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/827ba0c4/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
index c1ec8ce..49e6b9a 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
@@ -32,11 +32,11 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
 import org.apache.blur.thrift.generated.Blur;
 import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.server.TThreadedSelectorServer;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TTransportException;
 
 public class ThriftServer {
@@ -106,17 +106,14 @@ public class ThriftServer {
   public void start() throws TTransportException {
     _executorService = Executors.newThreadPool("thrift-processors", _threadCount);
     Blur.Processor<Blur.Iface> processor = new Blur.Processor<Blur.Iface>(_iface);
-     TServerSocket serverTransport = new
-     TServerSocket(getBindInetSocketAddress(_configuration));
-    
-     TThreadPoolServer.Args args = new
-     TThreadPoolServer.Args(serverTransport);
-     
-     args.processor(processor);
-     args.executorService(_executorService);
-     args.transportFactory(new TFramedTransport.Factory());
-     args.protocolFactory(new TBinaryProtocol.Factory(true, true));
-     _server = new TThreadPoolServer(args);
+
+    TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(getBindInetSocketAddress(_configuration));
+    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverTransport);
+    args.processor(processor);
+    args.executorService(_executorService);
+    args.transportFactory(new TFramedTransport.Factory());
+    args.protocolFactory(new TBinaryProtocol.Factory(true, true));
+    _server = new TThreadedSelectorServer(args);
     LOG.info("Starting server [{0}]", _nodeName);
     _server.serve();
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/827ba0c4/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java b/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
new file mode 100644
index 0000000..6fe721f
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/AbstractNonblockingServer.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thrift.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServer.AbstractServerArgs;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides common methods and classes used by nonblocking TServer
+ * implementations.
+ */
+public abstract class AbstractNonblockingServer extends TServer {
+  protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
+
+  public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
+    public long maxReadBufferBytes = Long.MAX_VALUE;
+
+    public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
+      super(transport);
+      transportFactory(new TFramedTransport.Factory());
+    }
+  }
+
+  /**
+   * The maximum amount of memory we will allocate to client IO buffers at a
+   * time. Without this limit, the server will gladly allocate client buffers
+   * right into an out of memory exception, rather than waiting.
+   */
+  private final long MAX_READ_BUFFER_BYTES;
+
+  /**
+   * How many bytes are currently allocated to read buffers.
+   */
+  private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
+
+  public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
+    super(args);
+    MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
+  }
+
+  /**
+   * Begin accepting connections and processing invocations.
+   */
+  public void serve() {
+    // start any IO threads
+    if (!startThreads()) {
+      return;
+    }
+
+    // start listening, or exit
+    if (!startListening()) {
+      return;
+    }
+
+    setServing(true);
+
+    // this will block while we serve
+    waitForShutdown();
+
+    setServing(false);
+
+    // do a little cleanup
+    stopListening();
+  }
+
+  /**
+   * Starts any threads required for serving.
+   * 
+   * @return true if everything went ok, false if threads could not be started.
+   */
+  protected abstract boolean startThreads();
+
+  /**
+   * A method that will block until when threads handling the serving have been
+   * shut down.
+   */
+  protected abstract void waitForShutdown();
+
+  /**
+   * Have the server transport start accepting connections.
+   * 
+   * @return true if we started listening successfully, false if something went
+   *         wrong.
+   */
+  protected boolean startListening() {
+    try {
+      serverTransport_.listen();
+      return true;
+    } catch (TTransportException ttx) {
+      LOGGER.error("Failed to start listening on server socket!", ttx);
+      return false;
+    }
+  }
+
+  /**
+   * Stop listening for connections.
+   */
+  protected void stopListening() {
+    serverTransport_.close();
+  }
+
+  /**
+   * Perform an invocation. This method could behave several different ways -
+   * invoke immediately inline, queue for separate execution, etc.
+   * 
+   * @return true if invocation was successfully requested, which is not a
+   *         guarantee that invocation has completed. False if the request
+   *         failed.
+   */
+  protected abstract boolean requestInvoke(FrameBuffer frameBuffer);
+
+  /**
+   * An abstract thread that handles selecting on a set of transports and
+   * {@link FrameBuffer FrameBuffers} associated with selected keys
+   * corresponding to requests.
+   */
+  protected abstract class AbstractSelectThread extends Thread {
+    protected final Selector selector;
+
+    // List of FrameBuffers that want to change their selection interests.
+    protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
+
+    public AbstractSelectThread() throws IOException {
+      this.selector = SelectorProvider.provider().openSelector();
+    }
+
+    /**
+     * If the selector is blocked, wake it up.
+     */
+    public void wakeupSelector() {
+      selector.wakeup();
+    }
+
+    /**
+     * Add FrameBuffer to the list of select interest changes and wake up the
+     * selector if it's blocked. When the select() call exits, it'll give the
+     * FrameBuffer a chance to change its interests.
+     */
+    public void requestSelectInterestChange(FrameBuffer frameBuffer) {
+      synchronized (selectInterestChanges) {
+        selectInterestChanges.add(frameBuffer);
+      }
+      // wakeup the selector, if it's currently blocked.
+      selector.wakeup();
+    }
+
+    /**
+     * Check to see if there are any FrameBuffers that have switched their
+     * interest type from read to write or vice versa.
+     */
+    protected void processInterestChanges() {
+      synchronized (selectInterestChanges) {
+        for (FrameBuffer fb : selectInterestChanges) {
+          fb.changeSelectInterests();
+        }
+        selectInterestChanges.clear();
+      }
+    }
+
+    /**
+     * Do the work required to read from a readable client. If the frame is
+     * fully read, then invoke the method call.
+     */
+    protected void handleRead(SelectionKey key) {
+      FrameBuffer buffer = (FrameBuffer) key.attachment();
+      if (!buffer.read()) {
+        cleanupSelectionKey(key);
+        return;
+      }
+
+      // if the buffer's frame read is complete, invoke the method.
+      if (buffer.isFrameFullyRead()) {
+        if (!requestInvoke(buffer)) {
+          cleanupSelectionKey(key);
+        }
+      }
+    }
+
+    /**
+     * Let a writable client get written, if there's data to be written.
+     */
+    protected void handleWrite(SelectionKey key) {
+      FrameBuffer buffer = (FrameBuffer) key.attachment();
+      if (!buffer.write()) {
+        cleanupSelectionKey(key);
+      }
+    }
+
+    /**
+     * Do connection-close cleanup on a given SelectionKey.
+     */
+    protected void cleanupSelectionKey(SelectionKey key) {
+      // remove the records from the two maps
+      FrameBuffer buffer = (FrameBuffer) key.attachment();
+      if (buffer != null) {
+        // close the buffer
+        buffer.close();
+      }
+      // cancel the selection key
+      key.cancel();
+    }
+  } // SelectThread
+
+  /**
+   * Possible states for the FrameBuffer state machine.
+   */
+  private enum FrameBufferState {
+    // in the midst of reading the frame size off the wire
+    READING_FRAME_SIZE,
+    // reading the actual frame data now, but not all the way done yet
+    READING_FRAME,
+    // completely read the frame, so an invocation can now happen
+    READ_FRAME_COMPLETE,
+    // waiting to get switched to listening for write events
+    AWAITING_REGISTER_WRITE,
+    // started writing response data, not fully complete yet
+    WRITING,
+    // another thread wants this framebuffer to go back to reading
+    AWAITING_REGISTER_READ,
+    // we want our transport and selection key invalidated in the selector
+    // thread
+    AWAITING_CLOSE
+  }
+
+  /**
+   * Class that implements a sort of state machine around the interaction with a
+   * client and an invoker. It manages reading the frame size and frame data,
+   * getting it handed off as wrapped transports, and then the writing of
+   * response data back to the client. In the process it manages flipping the
+   * read and write bits on the selection key for its client.
+   */
+  protected class FrameBuffer {
+    // the actual transport hooked up to the client.
+    public final TNonblockingTransport trans_;
+
+    // the SelectionKey that corresponds to our transport
+    private final SelectionKey selectionKey_;
+
+    // the SelectThread that owns the registration of our transport
+    private final AbstractSelectThread selectThread_;
+
+    // where in the process of reading/writing are we?
+    private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
+
+    // the ByteBuffer we'll be using to write and read, depending on the state
+    private ByteBuffer buffer_;
+
+    private final TByteArrayOutputStream response_;
+    
+    // the frame that the TTransport should wrap.
+    private final TMemoryInputTransport frameTrans_;
+    
+    // the transport that should be used to connect to clients
+    private final TTransport inTrans_;
+    
+    private final TTransport outTrans_;
+    
+    // the input protocol to use on frames
+    private final TProtocol inProt_;
+    
+    // the output protocol to use on frames
+    private final TProtocol outProt_;
+    
+    // context associated with this connection
+    private final ServerContext context_;
+
+    public FrameBuffer(final TNonblockingTransport trans,
+        final SelectionKey selectionKey,
+        final AbstractSelectThread selectThread) {
+      trans_ = trans;
+      selectionKey_ = selectionKey;
+      selectThread_ = selectThread;
+      buffer_ = ByteBuffer.allocate(4);
+
+      frameTrans_ = new TMemoryInputTransport();
+      response_ = new TByteArrayOutputStream();
+      inTrans_ = inputTransportFactory_.getTransport(frameTrans_);
+      outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
+      inProt_ = inputProtocolFactory_.getProtocol(inTrans_);
+      outProt_ = outputProtocolFactory_.getProtocol(outTrans_);
+
+      if (eventHandler_ != null) {
+        context_ = eventHandler_.createContext(inProt_, outProt_);
+      } else {
+        context_  = null;
+      }
+    }
+
+    /**
+     * Give this FrameBuffer a chance to read. The selector loop should have
+     * received a read event for this FrameBuffer.
+     * 
+     * @return true if the connection should live on, false if it should be
+     *         closed
+     */
+    public boolean read() {
+      if (state_ == FrameBufferState.READING_FRAME_SIZE) {
+        // try to read the frame size completely
+        if (!internalRead()) {
+          return false;
+        }
+
+        // if the frame size has been read completely, then prepare to read the
+        // actual frame.
+        if (buffer_.remaining() == 0) {
+          // pull out the frame size as an integer.
+          int frameSize = buffer_.getInt(0);
+          if (frameSize <= 0) {
+            LOGGER.error("Read an invalid frame size of " + frameSize
+                + ". Are you using TFramedTransport on the client side?");
+            return false;
+          }
+
+          // if this frame will always be too large for this server, log the
+          // error and close the connection.
+          if (frameSize > MAX_READ_BUFFER_BYTES) {
+            LOGGER.error("Read a frame size of " + frameSize
+                + ", which is bigger than the maximum allowable buffer size for ALL connections.");
+            return false;
+          }
+
+          // if this frame will push us over the memory limit, then return.
+          // with luck, more memory will free up the next time around.
+          if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
+            return true;
+          }
+
+          // increment the amount of memory allocated to read buffers
+          readBufferBytesAllocated.addAndGet(frameSize + 4);
+
+          // reallocate the readbuffer as a frame-sized buffer
+          buffer_ = ByteBuffer.allocate(frameSize + 4);
+          buffer_.putInt(frameSize);
+
+          state_ = FrameBufferState.READING_FRAME;
+        } else {
+          // this skips the check of READING_FRAME state below, since we can't
+          // possibly go on to that state if there's data left to be read at
+          // this one.
+          return true;
+        }
+      }
+
+      // it is possible to fall through from the READING_FRAME_SIZE section
+      // to READING_FRAME if there's already some frame data available once
+      // READING_FRAME_SIZE is complete.
+
+      if (state_ == FrameBufferState.READING_FRAME) {
+        if (!internalRead()) {
+          return false;
+        }
+
+        // since we're already in the select loop here for sure, we can just
+        // modify our selection key directly.
+        if (buffer_.remaining() == 0) {
+          // get rid of the read select interests
+          selectionKey_.interestOps(0);
+          state_ = FrameBufferState.READ_FRAME_COMPLETE;
+        }
+
+        return true;
+      }
+
+      // if we fall through to this point, then the state must be invalid.
+      LOGGER.error("Read was called but state is invalid (" + state_ + ")");
+      return false;
+    }
+
+    /**
+     * Give this FrameBuffer a chance to write its output to the final client.
+     */
+    public boolean write() {
+      if (state_ == FrameBufferState.WRITING) {
+        try {
+          if (trans_.write(buffer_) < 0) {
+            return false;
+          }
+        } catch (IOException e) {
+          LOGGER.warn("Got an IOException during write!", e);
+          return false;
+        }
+
+        // we're done writing. now we need to switch back to reading.
+        if (buffer_.remaining() == 0) {
+          prepareRead();
+        }
+        return true;
+      }
+
+      LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
+      return false;
+    }
+
+    /**
+     * Give this FrameBuffer a chance to set its interest to write, once data
+     * has come in.
+     */
+    public void changeSelectInterests() {
+      if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
+        // set the OP_WRITE interest
+        selectionKey_.interestOps(SelectionKey.OP_WRITE);
+        state_ = FrameBufferState.WRITING;
+      } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
+        prepareRead();
+      } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
+        close();
+        selectionKey_.cancel();
+      } else {
+        LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
+      }
+    }
+
+    /**
+     * Shut the connection down.
+     */
+    public void close() {
+      // if we're being closed due to an error, we might have allocated a
+      // buffer that we need to subtract for our memory accounting.
+      if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE) {
+        readBufferBytesAllocated.addAndGet(-buffer_.array().length);
+      }
+      trans_.close();
+      if (eventHandler_ != null) {
+        eventHandler_.deleteContext(context_, inProt_, outProt_);
+      }
+    }
+
+    /**
+     * Check if this FrameBuffer has a full frame read.
+     */
+    public boolean isFrameFullyRead() {
+      return state_ == FrameBufferState.READ_FRAME_COMPLETE;
+    }
+
+    /**
+     * After the processor has processed the invocation, whatever thread is
+     * managing invocations should call this method on this FrameBuffer so we
+     * know it's time to start trying to write again. Also, if it turns out that
+     * there actually isn't any data in the response buffer, we'll skip trying
+     * to write and instead go back to reading.
+     */
+    public void responseReady() {
+      // the read buffer is definitely no longer in use, so we will decrement
+      // our read buffer count. we do this here as well as in close because
+      // we'd like to free this read memory up as quickly as possible for other
+      // clients.
+      readBufferBytesAllocated.addAndGet(-buffer_.array().length);
+
+      if (response_.len() == 0) {
+        // go straight to reading again. this was probably an oneway method
+        state_ = FrameBufferState.AWAITING_REGISTER_READ;
+        buffer_ = null;
+      } else {
+        buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
+
+        // set state that we're waiting to be switched to write. we do this
+        // asynchronously through requestSelectInterestChange() because there is
+        // a possibility that we're not in the main thread, and thus currently
+        // blocked in select(). (this functionality is in place for the sake of
+        // the HsHa server.)
+        state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
+      }
+      requestSelectInterestChange();
+    }
+
+    /**
+     * Actually invoke the method signified by this FrameBuffer.
+     */
+    public void invoke() {
+      frameTrans_.reset(buffer_.array());
+      response_.reset();
+      
+      try {
+        if (eventHandler_ != null) {
+          eventHandler_.processContext(context_, inTrans_, outTrans_);
+        }
+        processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
+        responseReady();
+        return;
+      } catch (TException te) {
+        LOGGER.warn("Exception while invoking!", te);
+      } catch (Throwable t) {
+        LOGGER.error("Unexpected throwable while invoking!", t);
+      }
+      // This will only be reached when there is a throwable.
+      state_ = FrameBufferState.AWAITING_CLOSE;
+      requestSelectInterestChange();
+    }
+
+    /**
+     * Perform a read into buffer.
+     * 
+     * @return true if the read succeeded, false if there was an error or the
+     *         connection closed.
+     */
+    private boolean internalRead() {
+      try {
+        if (trans_.read(buffer_) < 0) {
+          return false;
+        }
+        return true;
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException in internalRead!", e);
+        return false;
+      }
+    }
+
+    /**
+     * We're done writing, so reset our interest ops and change state
+     * accordingly.
+     */
+    private void prepareRead() {
+      // we can set our interest directly without using the queue because
+      // we're in the select thread.
+      selectionKey_.interestOps(SelectionKey.OP_READ);
+      // get ready for another go-around
+      buffer_ = ByteBuffer.allocate(4);
+      state_ = FrameBufferState.READING_FRAME_SIZE;
+    }
+
+    /**
+     * When this FrameBuffer needs to change its select interests and execution
+     * might not be in its select thread, then this method will make sure the
+     * interest change gets done when the select thread wakes back up. When the
+     * current thread is this FrameBuffer's select thread, then it just does the
+     * interest change immediately.
+     */
+    private void requestSelectInterestChange() {
+      if (Thread.currentThread() == this.selectThread_) {
+        changeSelectInterests();
+      } else {
+        this.selectThread_.requestSelectInterestChange(this);
+      }
+    }
+  } // FrameBuffer
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/827ba0c4/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java b/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java
new file mode 100644
index 0000000..57cc085
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/Invocation.java
@@ -0,0 +1,20 @@
+package org.apache.blur.thrift.server;
+
+import org.apache.blur.thrift.server.AbstractNonblockingServer.FrameBuffer;
+
+/**
+ * An Invocation represents a method call that is prepared to execute, given an
+ * idle worker thread. It contains the input and output protocols the thread's
+ * processor should use to perform the usual Thrift invocation.
+ */
+class Invocation implements Runnable {
+  private final FrameBuffer frameBuffer;
+
+  public Invocation(final FrameBuffer frameBuffer) {
+    this.frameBuffer = frameBuffer;
+  }
+
+  public void run() {
+    frameBuffer.invoke();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/827ba0c4/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java b/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java
new file mode 100644
index 0000000..8936b74
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thrift/server/TThreadedSelectorServer.java
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thrift.server;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Half-Sync/Half-Async server with a separate pool of threads to handle
+ * non-blocking I/O. Accepts are handled on a single thread, and a configurable
+ * number of nonblocking selector threads manage reading and writing of client
+ * connections. A synchronous worker thread pool handles processing of requests.
+ * 
+ * Performs better than TNonblockingServer/THsHaServer in multi-core
+ * environments when the bottleneck is CPU on the single selector thread
+ * handling I/O. In addition, because the accept handling is decoupled from
+ * reads/writes and invocation, the server has better ability to handle back-
+ * pressure from new connections (e.g. stop accepting when busy).
+ * 
+ * Like TNonblockingServer, it relies on the use of TFramedTransport.
+ */
+public class TThreadedSelectorServer extends AbstractNonblockingServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName());
+
+  public static class Args extends AbstractNonblockingServerArgs<Args> {
+
+    /**
+     * Determines the strategy for handling new accepted connections.
+     */
+    public static enum AcceptPolicy {
+      /**
+       * Require accepted connection registration to be handled by the executor.
+       * If the worker pool is saturated, further accepts will be closed
+       * immediately. Slightly increases latency due to an extra scheduling.
+       */
+      FAIR_ACCEPT,
+      /**
+       * Handle the accepts as fast as possible, disregarding the status of the
+       * executor service.
+       */
+      FAST_ACCEPT
+    }
+
+    /** Number of selector threads the server starts; defaults to 2. */
+    public int selectorThreads = 2;
+    /**
+     * Size of the fixed worker pool that is created when no executor service
+     * is supplied; defaults to 5. With 0 no pool is created and invocations
+     * run on the thread that requests them.
+     */
+    private int workerThreads = 5;
+    /** Amount of time (in {@link #stopTimeoutUnit}) to wait for the invoker to terminate. */
+    private int stopTimeoutVal = 60;
+    /** Unit of {@link #stopTimeoutVal}; defaults to seconds. */
+    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+    /** Caller-supplied executor for dispatched requests; null means "use the default". */
+    private ExecutorService executorService = null;
+    /**
+     * Capacity of the queue each selector thread uses to receive accepted
+     * connections; defaults to 4.
+     */
+    private int acceptQueueSizePerThread = 4;
+    /** How accepted connections are handed to selector threads; defaults to FAST_ACCEPT. */
+    private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;
+
+    /**
+     * @param transport
+     *          the non-blocking server transport passed on to the base args
+     */
+    public Args(TNonblockingServerTransport transport) {
+      super(transport);
+    }
+
+    /** @return the configured number of selector threads */
+    public int getSelectorThreads() {
+      return this.selectorThreads;
+    }
+
+    /** Sets the number of selector threads and returns this object for chaining. */
+    public Args selectorThreads(int i) {
+      this.selectorThreads = i;
+      return this;
+    }
+
+    /** @return the configured default worker pool size */
+    public int getWorkerThreads() {
+      return this.workerThreads;
+    }
+
+    /** Sets the default worker pool size and returns this object for chaining. */
+    public Args workerThreads(int i) {
+      this.workerThreads = i;
+      return this;
+    }
+
+    /** @return the shutdown timeout value */
+    public int getStopTimeoutVal() {
+      return this.stopTimeoutVal;
+    }
+
+    /** Sets the shutdown timeout value and returns this object for chaining. */
+    public Args stopTimeoutVal(int stopTimeoutVal) {
+      this.stopTimeoutVal = stopTimeoutVal;
+      return this;
+    }
+
+    /** @return the unit of the shutdown timeout */
+    public TimeUnit getStopTimeoutUnit() {
+      return this.stopTimeoutUnit;
+    }
+
+    /** Sets the unit of the shutdown timeout and returns this object for chaining. */
+    public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
+      this.stopTimeoutUnit = stopTimeoutUnit;
+      return this;
+    }
+
+    /** @return the caller-supplied executor service, or null if none was set */
+    public ExecutorService getExecutorService() {
+      return this.executorService;
+    }
+
+    /** Sets the executor service used for invocations and returns this object for chaining. */
+    public Args executorService(ExecutorService executorService) {
+      this.executorService = executorService;
+      return this;
+    }
+
+    /** @return the accept queue capacity per selector thread */
+    public int getAcceptQueueSizePerThread() {
+      return this.acceptQueueSizePerThread;
+    }
+
+    /** Sets the accept queue capacity per selector thread and returns this object for chaining. */
+    public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) {
+      this.acceptQueueSizePerThread = acceptQueueSizePerThread;
+      return this;
+    }
+
+    /** @return the configured accept policy */
+    public AcceptPolicy getAcceptPolicy() {
+      return this.acceptPolicy;
+    }
+
+    /** Sets the accept policy and returns this object for chaining. */
+    public Args acceptPolicy(AcceptPolicy acceptPolicy) {
+      this.acceptPolicy = acceptPolicy;
+      return this;
+    }
+
+    /**
+     * Checks the numeric settings.
+     *
+     * @throws IllegalArgumentException
+     *           if selectorThreads or acceptQueueSizePerThread is not positive,
+     *           or workerThreads is negative
+     */
+    public void validate() {
+      if (this.selectorThreads < 1) {
+        throw new IllegalArgumentException("selectorThreads must be positive.");
+      }
+      if (this.workerThreads < 0) {
+        throw new IllegalArgumentException("workerThreads must be non-negative.");
+      }
+      if (this.acceptQueueSizePerThread < 1) {
+        throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
+      }
+    }
+  }
+
+  // Flag for stopping the server
+  // Please see THRIFT-1795 for the usage of this flag
+  private volatile boolean stopped_ = false;
+
+  // The thread handling all accepts
+  private AcceptThread acceptThread;
+
+  // Threads handling events on client transports
+  private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
+
+  // This wraps all the functionality of queueing and thread pool management
+  // for the passing of Invocations from the selector thread(s) to the workers
+  // (if any).
+  private final ExecutorService invoker;
+
+  private final Args args;
+
+  /**
+   * Builds the server from the given configuration. The arguments are
+   * validated, and the invoker is the executor service supplied in
+   * {@code args} or, when none was supplied, the default one.
+   */
+  public TThreadedSelectorServer(Args args) {
+    super(args);
+    args.validate();
+    if (args.executorService != null) {
+      invoker = args.executorService;
+    } else {
+      invoker = createDefaultExecutor(args);
+    }
+    this.args = args;
+  }
+
+  /**
+   * Creates the configured number of selector threads and the accept thread,
+   * then starts the selector threads followed by the accept thread.
+   * 
+   * @return true if all threads were created and started, false if an
+   *         IOException occurred while setting them up.
+   */
+  @Override
+  protected boolean startThreads() {
+    try {
+      for (int created = 0; created < args.selectorThreads; created++) {
+        SelectorThread selectorThread = new SelectorThread(args.acceptQueueSizePerThread);
+        selectorThreads.add(selectorThread);
+      }
+
+      TNonblockingServerTransport listenTransport = (TNonblockingServerTransport) serverTransport_;
+      SelectorThreadLoadBalancer balancer = createSelectorThreadLoadBalancer(selectorThreads);
+      acceptThread = new AcceptThread(listenTransport, balancer);
+
+      // Selector threads go first so they are running before connections are accepted.
+      Iterator<SelectorThread> toStart = selectorThreads.iterator();
+      while (toStart.hasNext()) {
+        toStart.next().start();
+      }
+      acceptThread.start();
+    } catch (IOException e) {
+      LOGGER.error("Failed to start threads!", e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Waits for the accept and selector threads to finish, then shuts down the
+   * invoker pool. An interrupt while waiting is logged and the pool shutdown
+   * still runs.
+   */
+  @Override
+  protected void waitForShutdown() {
+    try {
+      joinThreads();
+    } catch (InterruptedException interrupted) {
+      // The join was cut short, so the IO threads may still be running.
+      LOGGER.error("Interrupted while joining threads!", interrupted);
+    }
+
+    gracefullyShutdownInvokerPool();
+  }
+
+  protected void joinThreads() throws InterruptedException {
+    // Block until the accept thread, and then every selector thread, has exited.
+    acceptThread.join();
+    Iterator<SelectorThread> pending = selectorThreads.iterator();
+    while (pending.hasNext()) {
+      pending.next().join();
+    }
+  }
+
+  /**
+   * Sets the stop flag, stops listening, and wakes up the accept and selector
+   * threads so they can observe the flag.
+   */
+  @Override
+  public void stop() {
+    stopped_ = true;
+
+    // Stop queuing connect attempts asap
+    stopListening();
+
+    if (acceptThread != null) {
+      acceptThread.wakeupSelector();
+    }
+
+    if (selectorThreads == null) {
+      return;
+    }
+    for (SelectorThread thread : selectorThreads) {
+      if (thread == null) {
+        continue;
+      }
+      thread.wakeupSelector();
+    }
+  }
+
+  /**
+   * Shuts down the invoker executor, if there is one, and waits up to the
+   * configured stop timeout for it to terminate.
+   */
+  protected void gracefullyShutdownInvokerPool() {
+    // The invoker is null when workerThreads is 0 and no executor service was
+    // supplied (see createDefaultExecutor); there is then no pool to shut down.
+    if (invoker == null) {
+      return;
+    }
+
+    // try to gracefully shut down the executor service
+    invoker.shutdown();
+
+    // Loop until awaitTermination finally does return without an interrupted
+    // exception. If we don't do this, then we'll shut down prematurely. We want
+    // to let the executorService clear its task queue, closing client sockets
+    // appropriately.
+    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
+    long now = System.currentTimeMillis();
+    while (timeoutMS >= 0) {
+      try {
+        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+        break;
+      } catch (InterruptedException ix) {
+        // Retry with whatever is left of the timeout budget.
+        long newnow = System.currentTimeMillis();
+        timeoutMS -= (newnow - now);
+        now = newnow;
+      }
+    }
+  }
+
+  /**
+   * Hands the invocation for {@code frameBuffer} to the invoker service rather
+   * than running it immediately. When there is no invoker, the invocation runs
+   * inline on the calling thread.
+   * 
+   * @return false if the invoker rejected the task, true otherwise.
+   */
+  @Override
+  protected boolean requestInvoke(FrameBuffer frameBuffer) {
+    Runnable invocation = getRunnable(frameBuffer);
+
+    if (invoker == null) {
+      // Invoke on the caller's thread
+      invocation.run();
+      return true;
+    }
+
+    try {
+      invoker.execute(invocation);
+    } catch (RejectedExecutionException rx) {
+      LOGGER.warn("ExecutorService rejected execution!", rx);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Wraps the frame buffer in the task that requestInvoke executes.
+   */
+  protected Runnable getRunnable(FrameBuffer frameBuffer) {
+    Runnable invocation = new Invocation(frameBuffer);
+    return invocation;
+  }
+
+  /**
+   * Builds the invoker used when the caller did not supply one: a fixed thread
+   * pool of {@code workerThreads} threads, or null when that count is not
+   * positive.
+   */
+  protected static ExecutorService createDefaultExecutor(Args options) {
+    if (options.workerThreads <= 0) {
+      return null;
+    }
+    return Executors.newFixedThreadPool(options.workerThreads);
+  }
+
+  /**
+   * Creates the queue for accepted connections: bounded to {@code queueSize}
+   * entries, or unbounded when {@code queueSize} is 0.
+   */
+  private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
+    if (queueSize != 0) {
+      return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
+    }
+    // Unbounded queue
+    return new LinkedBlockingQueue<TNonblockingTransport>();
+  }
+
+  /**
+   * The thread that selects on the server transport (listen socket) and accepts
+   * new connections to hand off to the IO selector threads
+   */
+  protected class AcceptThread extends Thread {
+
+    // The listen socket to accept on
+    private final TNonblockingServerTransport serverTransport;
+    private final Selector acceptSelector;
+
+    private final SelectorThreadLoadBalancer threadChooser;
+
+    /**
+     * Set up the AcceptThead
+     * 
+     * @throws IOException
+     */
+    public AcceptThread(TNonblockingServerTransport serverTransport,
+        SelectorThreadLoadBalancer threadChooser) throws IOException {
+      this.serverTransport = serverTransport;
+      this.threadChooser = threadChooser;
+      this.acceptSelector = SelectorProvider.provider().openSelector();
+      this.serverTransport.registerSelector(acceptSelector);
+    }
+
+    /**
+     * The work loop. Selects on the server transport and accepts. If there was
+     * a server transport that had blocking accepts, and returned on blocking
+     * client transports, that should be used instead
+     */
+    public void run() {
+      try {
+        if (eventHandler_ != null) {
+          eventHandler_.preServe();
+        }
+
+        while (!stopped_) {
+          select();
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        // This will wake up the selector threads
+        TThreadedSelectorServer.this.stop();
+      }
+    }
+
+    /**
+     * If the selector is blocked, wake it up.
+     */
+    public void wakeupSelector() {
+      acceptSelector.wakeup();
+    }
+
+    /**
+     * Select and process IO events appropriately: If there are connections to
+     * be accepted, accept them.
+     */
+    private void select() {
+      try {
+        // wait for connect events.
+        acceptSelector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            continue;
+          }
+
+          if (key.isAcceptable()) {
+            handleAccept();
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    /**
+     * Accept a new connection.
+     */
+    private void handleAccept() {
+      final TNonblockingTransport client = doAccept();
+      if (client != null) {
+        // Pass this connection to a selector thread
+        final SelectorThread targetThread = threadChooser.nextThread();
+
+        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
+          doAddAccept(targetThread, client);
+        } else {
+          // FAIR_ACCEPT
+          try {
+            invoker.submit(new Runnable() {
+              public void run() {
+                doAddAccept(targetThread, client);
+              }
+            });
+          } catch (RejectedExecutionException rx) {
+            LOGGER.warn("ExecutorService rejected accept registration!", rx);
+            // close immediately
+            client.close();
+          }
+        }
+      }
+    }
+
+    private TNonblockingTransport doAccept() {
+      try {
+        return (TNonblockingTransport) serverTransport.accept();
+      } catch (TTransportException tte) {
+        // something went wrong accepting.
+        LOGGER.warn("Exception trying to accept!", tte);
+        return null;
+      }
+    }
+
+    private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
+      if (!thread.addAcceptedConnection(client)) {
+        client.close();
+      }
+    }
+  } // AcceptThread
+
+  /**
+   * The SelectorThread(s) will be doing all the selecting on accepted active
+   * connections.
+   */
+  protected class SelectorThread extends AbstractSelectThread {
+
+    // Accepted connections added by the accept thread.
+    private final BlockingQueue<TNonblockingTransport> acceptedQueue;
+
+    /**
+     * Set up the SelectorThread with an unbounded queue for incoming accepts.
+     * 
+     * @throws IOException
+     *           if a selector cannot be created
+     */
+    public SelectorThread() throws IOException {
+      this(new LinkedBlockingQueue<TNonblockingTransport>());
+    }
+
+    /**
+     * Set up the SelectorThread with an bounded queue for incoming accepts.
+     * 
+     * @throws IOException
+     *           if a selector cannot be created
+     */
+    public SelectorThread(int maxPendingAccepts) throws IOException {
+      this(createDefaultAcceptQueue(maxPendingAccepts));
+    }
+
+    /**
+     * Set up the SelectorThread with a specified queue for connections.
+     * 
+     * @param acceptedQueue
+     *          The BlockingQueue implementation for holding incoming accepted
+     *          connections.
+     * @throws IOException
+     *           if a selector cannot be created.
+     */
+    public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws
IOException {
+      this.acceptedQueue = acceptedQueue;
+    }
+
+    /**
+     * Hands off an accepted connection to be handled by this thread. This
+     * method will block if the queue for new connections is at capacity.
+     * 
+     * @param accepted
+     *          The connection that has been accepted.
+     * @return true if the connection has been successfully added.
+     */
+    public boolean addAcceptedConnection(TNonblockingTransport accepted) {
+      try {
+        acceptedQueue.put(accepted);
+      } catch (InterruptedException e) {
+        LOGGER.warn("Interrupted while adding accepted connection!", e);
+        return false;
+      }
+      selector.wakeup();
+      return true;
+    }
+
+    /**
+     * The work loop. Handles selecting (read/write IO), dispatching, and
+     * managing the selection preferences of all existing connections.
+     */
+    public void run() {
+      try {
+        while (!stopped_) {
+          select();
+          processAcceptedConnections();
+          processInterestChanges();
+        }
+        for (SelectionKey selectionKey : selector.keys()) {
+          cleanupSelectionKey(selectionKey);
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        // This will wake up the accept thread and the other selector threads
+        TThreadedSelectorServer.this.stop();
+      }
+    }
+
+    /**
+     * Select and process IO events appropriately: If there are existing
+     * connections with data waiting to be read, read it, buffering until a
+     * whole frame has been read. If there are any pending responses, buffer
+     * them until their target client is available, and then send the data.
+     */
+    private void select() {
+      try {
+        // wait for io events.
+        selector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            cleanupSelectionKey(key);
+            continue;
+          }
+
+          if (key.isReadable()) {
+            // deal with reads
+            handleRead(key);
+          } else if (key.isWritable()) {
+            // deal with writes
+            handleWrite(key);
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    private void processAcceptedConnections() {
+      // Register accepted connections
+      while (!stopped_) {
+        TNonblockingTransport accepted = acceptedQueue.poll();
+        if (accepted == null) {
+          break;
+        }
+        registerAccepted(accepted);
+      }
+    }
+
+    private void registerAccepted(TNonblockingTransport accepted) {
+      SelectionKey clientKey = null;
+      try {
+        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
+
+        FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this);
+        clientKey.attach(frameBuffer);
+      } catch (IOException e) {
+        LOGGER.warn("Failed to register accepted connection to selector!", e);
+        if (clientKey != null) {
+          cleanupSelectionKey(clientKey);
+        }
+        accepted.close();
+      }
+    }
+  } // SelectorThread
+
+  /**
+   * Builds the load balancer the accept thread uses to spread newly accepted
+   * connections over the given selector threads.
+   */
+  protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(
+      Collection<? extends SelectorThread> threads) {
+    SelectorThreadLoadBalancer balancer = new SelectorThreadLoadBalancer(threads);
+    return balancer;
+  }
+
+  /**
+   * Hands out selector threads in round-robin order for new connections.
+   */
+  protected class SelectorThreadLoadBalancer {
+    // Fixed snapshot of the threads to cycle through
+    private final Collection<? extends SelectorThread> threads;
+    // Current position in the cycle; replaced with a fresh iterator once exhausted
+    private Iterator<? extends SelectorThread> nextThreadIterator;
+
+    /**
+     * @param threads
+     *          the selector threads to cycle through; copied on construction
+     * @throws IllegalArgumentException
+     *           if {@code threads} is empty
+     */
+    public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
+      if (threads.isEmpty()) {
+        throw new IllegalArgumentException("At least one selector thread is required");
+      }
+      ArrayList<T> snapshot = new ArrayList<T>(threads);
+      this.threads = Collections.unmodifiableList(snapshot);
+      this.nextThreadIterator = this.threads.iterator();
+    }
+
+    /**
+     * @return the next selector thread in the cycle, starting over from the
+     *         first one after the last has been handed out
+     */
+    public SelectorThread nextThread() {
+      if (nextThreadIterator.hasNext()) {
+        return nextThreadIterator.next();
+      }
+      // Reached the end: start the next lap.
+      nextThreadIterator = threads.iterator();
+      return nextThreadIterator.next();
+    }
+  }
+}
\ No newline at end of file


Mime
View raw message