incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [21/92] [abbrv] [partial] Fixed BLUR-126.
Date Tue, 11 Jun 2013 02:41:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadPoolServer.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadPoolServer.java
new file mode 100644
index 0000000..fc744f1
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadPoolServer.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.server;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.TProcessor;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TServerTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Server which uses Java's built-in thread pool management to spawn off
+ * a worker pool that handles each accepted client connection on its own
+ * worker task.
+ */
+public class TThreadPoolServer extends TServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName());
+
+  public static class Args extends AbstractServerArgs<Args> {
+    public int minWorkerThreads = 5;
+    public int maxWorkerThreads = Integer.MAX_VALUE;
+    public ExecutorService executorService;
+    public int stopTimeoutVal = 60;
+    public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+
+    public Args(TServerTransport transport) {
+      super(transport);
+    }
+
+    public Args minWorkerThreads(int n) {
+      minWorkerThreads = n;
+      return this;
+    }
+
+    public Args maxWorkerThreads(int n) {
+      maxWorkerThreads = n;
+      return this;
+    }
+
+    public Args executorService(ExecutorService executorService) {
+      this.executorService = executorService;
+      return this;
+    }
+  }
+
+  // Executor service for handling client connections
+  private ExecutorService executorService_;
+
+  // Flag for stopping the server
+  private volatile boolean stopped_;
+
+  private final TimeUnit stopTimeoutUnit;
+
+  private final long stopTimeoutVal;
+
+  public TThreadPoolServer(Args args) {
+    super(args);
+
+    stopTimeoutUnit = args.stopTimeoutUnit;
+    stopTimeoutVal = args.stopTimeoutVal;
+
+    executorService_ = args.executorService != null ?
+        args.executorService : createDefaultExecutorService(args);
+  }
+
+  private static ExecutorService createDefaultExecutorService(Args args) {
+    SynchronousQueue<Runnable> executorQueue =
+      new SynchronousQueue<Runnable>();
+    return new ThreadPoolExecutor(args.minWorkerThreads,
+                                  args.maxWorkerThreads,
+                                  60,
+                                  TimeUnit.SECONDS,
+                                  executorQueue);
+  }
+
+
+  public void serve() {
+    try {
+      serverTransport_.listen();
+    } catch (TTransportException ttx) {
+      LOGGER.error("Error occurred during listening.", ttx);
+      return;
+    }
+
+    // Run the preServe event
+    if (eventHandler_ != null) {
+      eventHandler_.preServe();
+    }
+
+    stopped_ = false;
+    setServing(true);
+    while (!stopped_) {
+      int failureCount = 0;
+      try {
+        TTransport client = serverTransport_.accept();
+        WorkerProcess wp = new WorkerProcess(client);
+        executorService_.execute(wp);
+      } catch (TTransportException ttx) {
+        if (!stopped_) {
+          ++failureCount;
+          LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
+        }
+      }
+    }
+
+    executorService_.shutdown();
+
+    // Loop until awaitTermination finally does return without an interrupted
+    // exception. If we don't do this, then we'll shut down prematurely. We want
+    // to let the executorService clear its task queue, closing client sockets
+    // appropriately.
+    long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
+    long now = System.currentTimeMillis();
+    while (timeoutMS >= 0) {
+      try {
+        executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+        break;
+      } catch (InterruptedException ix) {
+        long newnow = System.currentTimeMillis();
+        timeoutMS -= (newnow - now);
+        now = newnow;
+      }
+    }
+    setServing(false);
+  }
+
+  public void stop() {
+    stopped_ = true;
+    serverTransport_.interrupt();
+  }
+
+  private class WorkerProcess implements Runnable {
+
+    /**
+     * Client that this services.
+     */
+    private TTransport client_;
+
+    /**
+     * Creates a worker for the given client transport.
+     *
+     * @param client Transport to process
+     */
+    private WorkerProcess(TTransport client) {
+      client_ = client;
+    }
+
+    /**
+     * Loops on processing a client until stopped, process() returns false, or an error occurs
+     */
+    public void run() {
+      TProcessor processor = null;
+      TTransport inputTransport = null;
+      TTransport outputTransport = null;
+      TProtocol inputProtocol = null;
+      TProtocol outputProtocol = null;
+
+      TServerEventHandler eventHandler = null;
+      ServerContext connectionContext = null;
+
+      try {
+        processor = processorFactory_.getProcessor(client_);
+        inputTransport = inputTransportFactory_.getTransport(client_);
+        outputTransport = outputTransportFactory_.getTransport(client_);
+        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);	  
+
+        eventHandler = getEventHandler();
+        if (eventHandler != null) {
+          connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
+        }
+        // we check stopped_ before processing each request to make sure we're not
+        // supposed to be shutting down. this is necessary for graceful shutdown.
+        while (true) {
+
+            if (eventHandler != null) {
+              eventHandler.processContext(connectionContext, inputTransport, outputTransport);
+            }
+
+            if(stopped_ || !processor.process(inputProtocol, outputProtocol)) {
+              break;
+            }
+        }
+      } catch (TTransportException ttx) {
+        // Assume the client died and continue silently
+      } catch (TException tx) {
+        LOGGER.error("Thrift error occurred during processing of message.", tx);
+      } catch (Exception x) {
+        LOGGER.error("Error occurred during processing of message.", x);
+      }
+
+      if (eventHandler != null) {
+        eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
+      }
+
+      if (inputTransport != null) {
+        inputTransport.close();
+      }
+
+      if (outputTransport != null) {
+        outputTransport.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadedSelectorServer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadedSelectorServer.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadedSelectorServer.java
new file mode 100644
index 0000000..c96da82
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/server/TThreadedSelectorServer.java
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.server;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingServerTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Half-Sync/Half-Async server with a separate pool of threads to handle
+ * non-blocking I/O. Accepts are handled on a single thread, and a configurable
+ * number of nonblocking selector threads manage reading and writing of client
+ * connections. A synchronous worker thread pool handles processing of requests.
+ * 
+ * Performs better than TNonblockingServer/THsHaServer in multi-core
+ * environments when the bottleneck is CPU on the single selector thread
+ * handling I/O. In addition, because the accept handling is decoupled from
+ * reads/writes and invocation, the server has better ability to handle back-
+ * pressure from new connections (e.g. stop accepting when busy).
+ * 
+ * Like TNonblockingServer, it relies on the use of TFramedTransport.
+ */
+public class TThreadedSelectorServer extends AbstractNonblockingServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName());
+
+  public static class Args extends AbstractNonblockingServerArgs<Args> {
+
+    /** The number of threads for selecting on already-accepted connections */
+    public int selectorThreads = 2;
+    /**
+     * The size of the executor service (if none is specified) that will handle
+     * invocations. This may be set to 0, in which case invocations will be
+     * handled directly on the selector threads (as is in TNonblockingServer)
+     */
+    private int workerThreads = 5;
+    /** Time to wait for server to stop gracefully */
+    private int stopTimeoutVal = 60;
+    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+    /** The ExecutorService for handling dispatched requests */
+    private ExecutorService executorService = null;
+    /**
+     * The size of the blocking queue per selector thread for passing accepted
+     * connections to the selector thread
+     */
+    private int acceptQueueSizePerThread = 4;
+
+    /**
+     * Determines the strategy for handling new accepted connections.
+     */
+    public static enum AcceptPolicy {
+      /**
+       * Require accepted connection registration to be handled by the executor.
+       * If the worker pool is saturated, further accepts will be closed
+       * immediately. Slightly increases latency due to an extra scheduling.
+       */
+      FAIR_ACCEPT,
+      /**
+       * Handle the accepts as fast as possible, disregarding the status of the
+       * executor service.
+       */
+      FAST_ACCEPT
+    }
+
+    private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;
+
+    public Args(TNonblockingServerTransport transport) {
+      super(transport);
+    }
+
+    public Args selectorThreads(int i) {
+      selectorThreads = i;
+      return this;
+    }
+
+    public int getSelectorThreads() {
+      return selectorThreads;
+    }
+
+    public Args workerThreads(int i) {
+      workerThreads = i;
+      return this;
+    }
+
+    public int getWorkerThreads() {
+      return workerThreads;
+    }
+
+    public int getStopTimeoutVal() {
+      return stopTimeoutVal;
+    }
+
+    public Args stopTimeoutVal(int stopTimeoutVal) {
+      this.stopTimeoutVal = stopTimeoutVal;
+      return this;
+    }
+
+    public TimeUnit getStopTimeoutUnit() {
+      return stopTimeoutUnit;
+    }
+
+    public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
+      this.stopTimeoutUnit = stopTimeoutUnit;
+      return this;
+    }
+
+    public ExecutorService getExecutorService() {
+      return executorService;
+    }
+
+    public Args executorService(ExecutorService executorService) {
+      this.executorService = executorService;
+      return this;
+    }
+
+    public int getAcceptQueueSizePerThread() {
+      return acceptQueueSizePerThread;
+    }
+
+    public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) {
+      this.acceptQueueSizePerThread = acceptQueueSizePerThread;
+      return this;
+    }
+
+    public AcceptPolicy getAcceptPolicy() {
+      return acceptPolicy;
+    }
+
+    public Args acceptPolicy(AcceptPolicy acceptPolicy) {
+      this.acceptPolicy = acceptPolicy;
+      return this;
+    }
+
+    public void validate() {
+      if (selectorThreads <= 0) {
+        throw new IllegalArgumentException("selectorThreads must be positive.");
+      }
+      if (workerThreads < 0) {
+        throw new IllegalArgumentException("workerThreads must be non-negative.");
+      }
+      if (acceptQueueSizePerThread <= 0) {
+        throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
+      }
+    }
+  }
+
+  // Flag for stopping the server
+  private volatile boolean stopped_ = true;
+
+  // The thread handling all accepts
+  private AcceptThread acceptThread;
+
+  // Threads handling events on client transports
+  private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
+
+  // This wraps all the functionality of queueing and thread pool management
+  // for the passing of Invocations from the selector thread(s) to the workers
+  // (if any).
+  private final ExecutorService invoker;
+
+  private final Args args;
+
+  /**
+   * Create the server with the specified Args configuration
+   */
+  public TThreadedSelectorServer(Args args) {
+    super(args);
+    args.validate();
+    invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
+    this.args = args;
+  }
+
+  /**
+   * Start the accept and selector threads running to deal with clients.
+   * 
+   * @return true if everything went ok, false if we couldn't start for some
+   *         reason.
+   */
+  @Override
+  protected boolean startThreads() {
+    try {
+      for (int i = 0; i < args.selectorThreads; ++i) {
+        selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
+      }
+      acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
+        createSelectorThreadLoadBalancer(selectorThreads));
+      stopped_ = false;
+      for (SelectorThread thread : selectorThreads) {
+        thread.start();
+      }
+      acceptThread.start();
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Failed to start threads!", e);
+      return false;
+    }
+  }
+
+  /**
+   * Joins the accept and selector threads and shuts down the executor service.
+   */
+  @Override
+  protected void waitForShutdown() {
+    try {
+      joinThreads();
+    } catch (InterruptedException e) {
+      // Non-graceful shutdown occurred
+      LOGGER.error("Interrupted while joining threads!", e);
+    }
+    gracefullyShutdownInvokerPool();
+  }
+
+  protected void joinThreads() throws InterruptedException {
+    // wait until the io threads exit
+    acceptThread.join();
+    for (SelectorThread thread : selectorThreads) {
+      thread.join();
+    }
+  }
+
+  /**
+   * Stop serving and shut everything down.
+   */
+  @Override
+  public void stop() {
+    stopped_ = true;
+
+    // Stop queuing connect attempts asap
+    stopListening();
+
+    if (acceptThread != null) {
+      acceptThread.wakeupSelector();
+    }
+    if (selectorThreads != null) {
+      for (SelectorThread thread : selectorThreads) {
+        if (thread != null)
+          thread.wakeupSelector();
+      }
+    }
+  }
+
+  protected void gracefullyShutdownInvokerPool() {
+    // try to gracefully shut down the executor service
+    invoker.shutdown();
+
+    // Loop until awaitTermination finally does return without an interrupted
+    // exception. If we don't do this, then we'll shut down prematurely. We want
+    // to let the executorService clear its task queue, closing client sockets
+    // appropriately.
+    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
+    long now = System.currentTimeMillis();
+    while (timeoutMS >= 0) {
+      try {
+        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+        break;
+      } catch (InterruptedException ix) {
+        long newnow = System.currentTimeMillis();
+        timeoutMS -= (newnow - now);
+        now = newnow;
+      }
+    }
+  }
+
+  /**
+   * We override the standard invoke method here to queue the invocation for
+   * invoker service instead of immediately invoking. If there is no thread
+   * pool, handle the invocation inline on this thread
+   */
+  @Override
+  protected boolean requestInvoke(FrameBuffer frameBuffer) {
+    Runnable invocation = getRunnable(frameBuffer);
+    if (invoker != null) {
+      try {
+        invoker.execute(invocation);
+        return true;
+      } catch (RejectedExecutionException rx) {
+        LOGGER.warn("ExecutorService rejected execution!", rx);
+        return false;
+      }
+    } else {
+      // Invoke on the caller's thread
+      invocation.run();
+      return true;
+    }
+  }
+
+  protected Runnable getRunnable(FrameBuffer frameBuffer) {
+    return new Invocation(frameBuffer);
+  }
+
+  /**
+   * Helper to create the invoker if one is not specified
+   */
+  protected static ExecutorService createDefaultExecutor(Args options) {
+    return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null;
+  }
+
+  private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
+    if (queueSize == 0) {
+      // Unbounded queue
+      return new LinkedBlockingQueue<TNonblockingTransport>();
+    }
+    return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
+  }
+
+  /**
+   * The thread that selects on the server transport (listen socket) and accepts
+   * new connections to hand off to the IO selector threads
+   */
+  protected class AcceptThread extends Thread {
+
+    // The listen socket to accept on
+    private final TNonblockingServerTransport serverTransport;
+    private final Selector acceptSelector;
+
+    private final SelectorThreadLoadBalancer threadChooser;
+
+    /**
+     * Set up the AcceptThread
+     * 
+     * @throws IOException
+     */
+    public AcceptThread(TNonblockingServerTransport serverTransport,
+        SelectorThreadLoadBalancer threadChooser) throws IOException {
+      this.serverTransport = serverTransport;
+      this.threadChooser = threadChooser;
+      this.acceptSelector = SelectorProvider.provider().openSelector();
+      this.serverTransport.registerSelector(acceptSelector);
+    }
+
+    /**
+     * The work loop. Selects on the server transport and accepts. If there was
+     * a server transport that had blocking accepts, and returned on blocking
+     * client transports, that should be used instead
+     */
+    public void run() {
+      try {
+        while (!stopped_) {
+          select();
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        // This will wake up the selector threads
+        TThreadedSelectorServer.this.stop();
+      }
+    }
+
+    /**
+     * If the selector is blocked, wake it up.
+     */
+    public void wakeupSelector() {
+      acceptSelector.wakeup();
+    }
+
+    /**
+     * Select and process IO events appropriately: If there are connections to
+     * be accepted, accept them.
+     */
+    private void select() {
+      try {
+        // wait for connect events.
+        acceptSelector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            continue;
+          }
+
+          if (key.isAcceptable()) {
+            handleAccept();
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    /**
+     * Accept a new connection.
+     */
+    private void handleAccept() {
+      final TNonblockingTransport client = doAccept();
+      if (client != null) {
+        // Pass this connection to a selector thread
+        final SelectorThread targetThread = threadChooser.nextThread();
+
+        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
+          doAddAccept(targetThread, client);
+        } else {
+          // FAIR_ACCEPT
+          try {
+            invoker.submit(new Runnable() {
+              public void run() {
+                doAddAccept(targetThread, client);
+              }
+            });
+          } catch (RejectedExecutionException rx) {
+            LOGGER.warn("ExecutorService rejected accept registration!", rx);
+            // close immediately
+            client.close();
+          }
+        }
+      }
+    }
+
+    private TNonblockingTransport doAccept() {
+      try {
+        return (TNonblockingTransport) serverTransport.accept();
+      } catch (TTransportException tte) {
+        // something went wrong accepting.
+        LOGGER.warn("Exception trying to accept!", tte);
+        return null;
+      }
+    }
+
+    private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
+      if (!thread.addAcceptedConnection(client)) {
+        client.close();
+      }
+    }
+  } // AcceptThread
+
+  /**
+   * The SelectorThread(s) will be doing all the selecting on accepted active
+   * connections.
+   */
+  protected class SelectorThread extends AbstractSelectThread {
+
+    // Accepted connections added by the accept thread.
+    private final BlockingQueue<TNonblockingTransport> acceptedQueue;
+
+    /**
+     * Set up the SelectorThread with an unbounded queue for incoming accepts.
+     * 
+     * @throws IOException
+     *           if a selector cannot be created
+     */
+    public SelectorThread() throws IOException {
+      this(new LinkedBlockingQueue<TNonblockingTransport>());
+    }
+
+    /**
+     * Set up the SelectorThread with a bounded queue for incoming accepts.
+     * 
+     * @throws IOException
+     *           if a selector cannot be created
+     */
+    public SelectorThread(int maxPendingAccepts) throws IOException {
+      this(createDefaultAcceptQueue(maxPendingAccepts));
+    }
+
+    /**
+     * Set up the SelectorThread with a specified queue for connections.
+     * 
+     * @param acceptedQueue
+     *          The BlockingQueue implementation for holding incoming accepted
+     *          connections.
+     * @throws IOException
+     *           if a selector cannot be created.
+     */
+    public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
+      this.acceptedQueue = acceptedQueue;
+    }
+
+    /**
+     * Hands off an accepted connection to be handled by this thread. This
+     * method will block if the queue for new connections is at capacity.
+     * 
+     * @param accepted
+     *          The connection that has been accepted.
+     * @return true if the connection has been successfully added.
+     */
+    public boolean addAcceptedConnection(TNonblockingTransport accepted) {
+      try {
+        acceptedQueue.put(accepted);
+      } catch (InterruptedException e) {
+        LOGGER.warn("Interrupted while adding accepted connection!", e);
+        return false;
+      }
+      selector.wakeup();
+      return true;
+    }
+
+    /**
+     * The work loop. Handles selecting (read/write IO), dispatching, and
+     * managing the selection preferences of all existing connections.
+     */
+    public void run() {
+      try {
+        while (!stopped_) {
+          select();
+          processAcceptedConnections();
+          processInterestChanges();
+        }
+        for (SelectionKey selectionKey : selector.keys()) {
+          cleanupSelectionKey(selectionKey);
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        // This will wake up the accept thread and the other selector threads
+        TThreadedSelectorServer.this.stop();
+      }
+    }
+
+    /**
+     * Select and process IO events appropriately: If there are existing
+     * connections with data waiting to be read, read it, buffering until a
+     * whole frame has been read. If there are any pending responses, buffer
+     * them until their target client is available, and then send the data.
+     */
+    private void select() {
+      try {
+        // wait for io events.
+        selector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            cleanupSelectionKey(key);
+            continue;
+          }
+
+          if (key.isReadable()) {
+            // deal with reads
+            handleRead(key);
+          } else if (key.isWritable()) {
+            // deal with writes
+            handleWrite(key);
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    private void processAcceptedConnections() {
+      // Register accepted connections
+      while (!stopped_) {
+        TNonblockingTransport accepted = acceptedQueue.poll();
+        if (accepted == null) {
+          break;
+        }
+        registerAccepted(accepted);
+      }
+    }
+
+    private void registerAccepted(TNonblockingTransport accepted) {
+      SelectionKey clientKey = null;
+      try {
+        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
+
+        FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this);
+        clientKey.attach(frameBuffer);
+      } catch (IOException e) {
+        LOGGER.warn("Failed to register accepted connection to selector!", e);
+        if (clientKey != null) {
+          cleanupSelectionKey(clientKey);
+        }
+        accepted.close();
+      }
+    }
+  } // SelectorThread
+
+  /**
+   * Creates a SelectorThreadLoadBalancer to be used by the accept thread for
+   * assigning newly accepted connections across the threads.
+   */
+  protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
+    return new SelectorThreadLoadBalancer(threads);
+  }
+
+  /**
+   * A round robin load balancer for choosing selector threads for new
+   * connections.
+   */
+  protected class SelectorThreadLoadBalancer {
+    private final Collection<? extends SelectorThread> threads;
+    private Iterator<? extends SelectorThread> nextThreadIterator;
+
+    public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
+      if (threads.isEmpty()) {
+        throw new IllegalArgumentException("At least one selector thread is required");
+      }
+      this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
+      nextThreadIterator = this.threads.iterator();
+    }
+
+    public SelectorThread nextThread() {
+      // Choose a selector thread (round robin)
+      if (!nextThreadIterator.hasNext()) {
+        nextThreadIterator = threads.iterator();
+      }
+      return nextThreadIterator.next();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBuffer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBuffer.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBuffer.java
new file mode 100644
index 0000000..6922317
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBuffer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+/**
+ * Helper class that wraps a byte[] so that it can expand and be reused. Users
+ * should call resizeIfNecessary to make sure the buffer has suitable capacity,
+ * and then use the array as needed. Note that the internal array will grow at a
+ * rate slightly faster than the requested capacity with the (untested)
+ * objective of avoiding expensive buffer allocations and copies.
+ */
+public class AutoExpandingBuffer {
+  private byte[] array;
+
+  private final double growthCoefficient;
+
+  /**
+   * @param initialCapacity   size in bytes of the initial backing array
+   * @param growthCoefficient factor applied to the requested size whenever the
+   *                          array has to be reallocated; must be >= 1.0
+   * @throws IllegalArgumentException if growthCoefficient is below 1.0 or NaN
+   */
+  public AutoExpandingBuffer(int initialCapacity, double growthCoefficient) {
+    // Written as a negated ">=" so that NaN (for which every comparison is
+    // false) is rejected too; a NaN factor would otherwise make
+    // resizeIfNecessary compute a zero-length replacement array.
+    if (!(growthCoefficient >= 1.0)) {
+      throw new IllegalArgumentException("Growth coefficient must be >= 1.0");
+    }
+    array = new byte[initialCapacity];
+    this.growthCoefficient = growthCoefficient;
+  }
+
+  /**
+   * Ensures the backing array holds at least {@code size} bytes. When it is too
+   * small, a new array of {@code size * growthCoefficient} bytes is allocated
+   * and the existing contents are copied to its start.
+   *
+   * @param size minimum number of bytes the array must be able to hold
+   */
+  public void resizeIfNecessary(int size) {
+    if (array.length < size) {
+      byte[] newBuf = new byte[(int)(size * growthCoefficient)];
+      System.arraycopy(array, 0, newBuf, 0, array.length);
+      array = newBuf;
+    }
+  }
+
+  /**
+   * @return the current backing array; the reference changes whenever
+   *         resizeIfNecessary reallocates
+   */
+  public byte[] array() {
+    return array;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBufferReadTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBufferReadTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBufferReadTransport.java
new file mode 100644
index 0000000..9b9d82e
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBufferReadTransport.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * Read-only TTransport that serves bytes out of an AutoExpandingBuffer which
+ * is (re)loaded on demand through {@link #fill(TTransport, int)}.
+ */
+public class AutoExpandingBufferReadTransport extends TTransport {
+
+  private final AutoExpandingBuffer buf;
+
+  /** Index of the next unread byte. */
+  private int pos = 0;
+  /** Number of valid bytes loaded by the most recent fill. */
+  private int limit = 0;
+
+  public AutoExpandingBufferReadTransport(int initialCapacity, double overgrowthCoefficient) {
+    this.buf = new AutoExpandingBuffer(initialCapacity, overgrowthCoefficient);
+  }
+
+  /**
+   * Replaces the buffered content with exactly {@code length} bytes read from
+   * {@code inTrans} and rewinds the read position to the start.
+   *
+   * @param inTrans transport to pull the bytes from
+   * @param length  number of bytes to load
+   */
+  public void fill(TTransport inTrans, int length) throws TTransportException {
+    buf.resizeIfNecessary(length);
+    final byte[] dest = buf.array();
+    inTrans.readAll(dest, 0, length);
+    pos = 0;
+    limit = length;
+  }
+
+  @Override
+  public void close() {
+    // nothing to release
+  }
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void open() throws TTransportException {
+    // always open
+  }
+
+  /**
+   * Copies up to {@code len} of the remaining buffered bytes into
+   * {@code target} and advances the read position by the amount copied.
+   *
+   * @return the number of bytes copied
+   */
+  @Override
+  public final int read(byte[] target, int off, int len) throws TTransportException {
+    final int available = limit - pos;
+    final int count = Math.min(len, available);
+    System.arraycopy(buf.array(), pos, target, off, count);
+    pos += count;
+    return count;
+  }
+
+  /** Writing is not supported by this transport. */
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public final void consumeBuffer(int len) {
+    pos += len;
+  }
+
+  @Override
+  public final byte[] getBuffer() {
+    return buf.array();
+  }
+
+  @Override
+  public final int getBufferPosition() {
+    return pos;
+  }
+
+  @Override
+  public final int getBytesRemainingInBuffer() {
+    return limit - pos;
+  }
+}
+  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBufferWriteTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBufferWriteTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBufferWriteTransport.java
new file mode 100644
index 0000000..3b81697
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/AutoExpandingBufferWriteTransport.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * Write-only TTransport that appends everything written to it to an
+ * AutoExpandingBuffer.
+ */
+public final class AutoExpandingBufferWriteTransport extends TTransport {
+
+  private final AutoExpandingBuffer buf;
+  /** Number of bytes appended since construction or the last reset. */
+  private int pos;
+
+  public AutoExpandingBufferWriteTransport(int initialCapacity, double growthCoefficient) {
+    this.pos = 0;
+    this.buf = new AutoExpandingBuffer(initialCapacity, growthCoefficient);
+  }
+
+  @Override
+  public void close() {
+    // nothing to release
+  }
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void open() throws TTransportException {
+    // always open
+  }
+
+  /** Reading is not supported by this transport. */
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    throw new NotImplementedException();
+  }
+
+  /**
+   * Appends {@code len} bytes of {@code toWrite}, starting at {@code off}, to
+   * the buffer, growing it first when required.
+   */
+  @Override
+  public void write(byte[] toWrite, int off, int len) throws TTransportException {
+    final int end = pos + len;
+    buf.resizeIfNecessary(end);
+    System.arraycopy(toWrite, off, buf.array(), pos, len);
+    pos = end;
+  }
+
+  /**
+   * @return the buffer that collects the written bytes
+   */
+  public AutoExpandingBuffer getBuf() {
+    return buf;
+  }
+
+  /**
+   * @return the number of bytes written since the last reset
+   */
+  public int getPos() {
+    return pos;
+  }
+
+  /** Rewinds the write position so the buffer is reused from its start. */
+  public void reset() {
+    pos = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFastFramedTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFastFramedTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFastFramedTransport.java
new file mode 100644
index 0000000..d16ce08
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFastFramedTransport.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+/**
+ * This transport is wire compatible with {@link TFramedTransport}, but makes
+ * use of reusable, expanding read and write buffers in order to avoid
+ * allocating new byte[]s all the time. Since the buffers only expand, you
+ * should probably only use this transport if your messages are not too variably
+ * large, unless the persistent memory cost is not an issue.
+ *
+ * This implementation is NOT threadsafe.
+ */
+public class TFastFramedTransport extends TTransport {
+
+  /**
+   * Factory producing {@link TFastFramedTransport} instances that all share the
+   * same initial buffer capacity and max frame length.
+   */
+  public static class Factory extends TTransportFactory {
+    private final int initialCapacity;
+    private final int maxLength;
+
+    public Factory() {
+      this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
+    }
+
+    public Factory(int initialCapacity) {
+      this(initialCapacity, DEFAULT_MAX_LENGTH);
+    }
+
+    public Factory(int initialCapacity, int maxLength) {
+      this.initialCapacity = initialCapacity;
+      this.maxLength = maxLength;
+    }
+
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      return new TFastFramedTransport(trans,
+          initialCapacity,
+          maxLength);
+    }
+  }
+
+  /**
+   * How big should the default read and write buffers be?
+   */
+  public static final int DEFAULT_BUF_CAPACITY = 1024;
+  /**
+   * How big is the largest allowable frame? Defaults to 16,384,000 bytes
+   * (roughly 16MB).
+   */
+  public static final int DEFAULT_MAX_LENGTH = 16384000;
+
+  private final TTransport underlying;
+  private final AutoExpandingBufferWriteTransport writeBuffer;
+  private final AutoExpandingBufferReadTransport readBuffer;
+  /** Scratch space for encoding and decoding the 4-byte frame size. */
+  private final byte[] i32buf = new byte[4];
+  private final int maxLength;
+
+  /**
+   * Create a new {@link TFastFramedTransport}. Use the defaults
+   * for initial buffer size and max frame length.
+   * @param underlying Transport that real reads and writes will go through to.
+   */
+  public TFastFramedTransport(TTransport underlying) {
+    this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
+  }
+
+  /**
+   * Create a new {@link TFastFramedTransport}. Use the specified
+   * initial buffer capacity and the default max frame length.
+   * @param underlying Transport that real reads and writes will go through to.
+   * @param initialBufferCapacity The initial size of the read and write buffers.
+   * In practice, it's not critical to set this unless you know in advance that
+   * your messages are going to be very large.
+   */
+  public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) {
+    this(underlying, initialBufferCapacity, DEFAULT_MAX_LENGTH);
+  }
+
+  /**
+   * Create a new {@link TFastFramedTransport} with the specified initial
+   * buffer capacity and max frame length.
+   * @param underlying Transport that real reads and writes will go through to.
+   * @param initialBufferCapacity The initial size of the read and write buffers.
+   * In practice, it's not critical to set this unless you know in advance that
+   * your messages are going to be very large. (You can pass
+   * TFastFramedTransport.DEFAULT_BUF_CAPACITY if you're only
+   * using this constructor because you want to set the maxLength.)
+   * @param maxLength The max frame size you are willing to read. You can use
+   * this parameter to limit how much memory can be allocated.
+   */
+  public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
+    this.underlying = underlying;
+    this.maxLength = maxLength;
+    writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 1.5);
+    readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity, 1.5);
+  }
+
+  @Override
+  public void close() {
+    underlying.close();
+  }
+
+  @Override
+  public boolean isOpen() {
+    return underlying.isOpen();
+  }
+
+  @Override
+  public void open() throws TTransportException {
+    underlying.open();
+  }
+
+  /**
+   * Reads up to {@code len} bytes from the current frame, pulling the next
+   * frame from the underlying transport only once the current one has been
+   * fully consumed.
+   *
+   * @return the number of bytes copied into {@code buf}
+   * @throws TTransportException if reading the next frame fails or its size is
+   *         negative or larger than the configured max length
+   */
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    int got = readBuffer.read(buf, off, len);
+    // A zero result while bytes are still buffered means the caller asked for
+    // nothing; loading the next frame now would overwrite the unread bytes.
+    if (got > 0 || readBuffer.getBytesRemainingInBuffer() > 0) {
+      return got;
+    }
+
+    // Read another frame of data
+    readFrame();
+
+    return readBuffer.read(buf, off, len);
+  }
+
+  /**
+   * Reads the 4-byte frame size, validates it against the allowed range and
+   * loads that many bytes into the read buffer.
+   */
+  private void readFrame() throws TTransportException {
+    underlying.readAll(i32buf, 0, 4);
+    int size = TFramedTransport.decodeFrameSize(i32buf);
+
+    if (size < 0) {
+      throw new TTransportException("Read a negative frame size (" + size + ")!");
+    }
+
+    if (size > maxLength) {
+      throw new TTransportException("Frame size (" + size + ") larger than max length (" + maxLength + ")!");
+    }
+
+    readBuffer.fill(underlying, size);
+  }
+
+  /**
+   * Buffers the bytes locally; nothing reaches the underlying transport until
+   * {@link #flush()} is called.
+   */
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    writeBuffer.write(buf, off, len);
+  }
+
+  @Override
+  public void consumeBuffer(int len) {
+    readBuffer.consumeBuffer(len);
+  }
+
+  /**
+   * Sends everything written since the last flush as one frame (4-byte size
+   * followed by the payload), resets the write buffer and flushes the
+   * underlying transport.
+   */
+  @Override
+  public void flush() throws TTransportException {
+    int length = writeBuffer.getPos();
+    TFramedTransport.encodeFrameSize(length, i32buf);
+    underlying.write(i32buf, 0, 4);
+    underlying.write(writeBuffer.getBuf().array(), 0, length);
+    writeBuffer.reset();
+    underlying.flush();
+  }
+
+  @Override
+  public byte[] getBuffer() {
+    return readBuffer.getBuffer();
+  }
+
+  @Override
+  public int getBufferPosition() {
+    return readBuffer.getBufferPosition();
+  }
+
+  @Override
+  public int getBytesRemainingInBuffer() {
+    return readBuffer.getBytesRemainingInBuffer();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileProcessor.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileProcessor.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileProcessor.java
new file mode 100644
index 0000000..09c6bb6
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileProcessor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.TProcessor;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolFactory;
+
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.RandomAccessFile;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+
+/**
+ * FileProcessor: helps in processing files generated by TFileTransport.
+ * Port of original cpp implementation
+ *
+ * @author Joydeep Sen Sarma <jssarma@facebook.com>
+ */
+public class TFileProcessor {
+
+  private TProcessor processor_;
+  private TProtocolFactory inputProtocolFactory_;
+  private TProtocolFactory outputProtocolFactory_;
+  private TFileTransport inputTransport_;
+  private TTransport outputTransport_;
+
+  /**
+   * Builds a processor that uses one protocol factory for both the input and
+   * the output side.
+   */
+  public TFileProcessor(TProcessor processor, TProtocolFactory protocolFactory,
+                        TFileTransport inputTransport,
+                        TTransport outputTransport) {
+    this(processor, protocolFactory, protocolFactory, inputTransport, outputTransport);
+  }
+
+  /**
+   * Builds a processor with separate protocol factories for input and output.
+   */
+  public TFileProcessor(TProcessor processor, 
+                        TProtocolFactory inputProtocolFactory,
+                        TProtocolFactory outputProtocolFactory,
+                        TFileTransport inputTransport,
+                        TTransport outputTransport) {
+    processor_ = processor;
+    inputProtocolFactory_ = inputProtocolFactory;
+    outputProtocolFactory_ = outputProtocolFactory;
+    inputTransport_ = inputTransport;
+    outputTransport_ = outputTransport;
+  }
+
+  /**
+   * Keeps invoking the processor while the input transport's current chunk is
+   * not past {@code lastChunk}. An END_OF_FILE transport exception ends the
+   * loop quietly; every other exception propagates.
+   */
+  private void processUntil(int lastChunk) throws TException {
+    final TProtocol in = inputProtocolFactory_.getProtocol(inputTransport_);
+    final TProtocol out = outputProtocolFactory_.getProtocol(outputTransport_);
+    int chunk = inputTransport_.getCurChunk();
+
+    try {
+      while (chunk <= lastChunk) {
+        processor_.process(in, out);
+        chunk = inputTransport_.getCurChunk();
+      }
+    } catch (TTransportException e) {
+      // While working on the last chunk we may simply have run into EOF;
+      // that is the normal way to finish, anything else is a real failure.
+      if (e.getType() == TTransportException.END_OF_FILE) {
+        return;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Process from start to last chunk, both inclusive, where chunks begin
+   * from 0. A negative chunk number has the total number of chunks added to
+   * it before use.
+   *
+   * @param startChunkNum first chunk to be processed
+   * @param endChunkNum last chunk to be processed
+   * @throws TException if the end chunk lies before the start chunk
+   */
+  public void processChunk(int startChunkNum, int endChunkNum) throws TException {
+    final int numChunks = inputTransport_.getNumChunks();
+    if (endChunkNum < 0) {
+      endChunkNum += numChunks;
+    }
+    if (startChunkNum < 0) {
+      startChunkNum += numChunks;
+    }
+    if (endChunkNum < startChunkNum) {
+      throw new TException("endChunkNum " + endChunkNum + " is less than " + startChunkNum);
+    }
+
+    inputTransport_.seekToChunk(startChunkNum);
+    processUntil(endChunkNum);
+  }
+
+  /**
+   * Process a single chunk
+   *
+   * @param chunkNum chunk to be processed
+   */
+  public void processChunk(int chunkNum) throws TException {
+    processChunk(chunkNum, chunkNum);
+  }
+
+  /**
+   * Process the chunk the input transport is currently positioned in
+   */
+  public void processChunk() throws TException {
+    processChunk(inputTransport_.getCurChunk());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileTransport.java
new file mode 100644
index 0000000..57cff88
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileTransport.java
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * FileTransport implementation of the TTransport interface.
+ * Currently this is a straightforward port of the cpp implementation
+ * 
+ * It may make better sense to provide a basic stream access on top of the framed file format
+ * The FileTransport can then be a user of this framed file format with some additional logic
+ * for chunking.
+ *
+ * @author Joydeep Sen Sarma <jssarma@facebook.com>
+ */
+public class TFileTransport extends TTransport {
+
+  /**
+   * BufferedInputStream whose read position and buffered byte count can be
+   * reset to zero on demand.
+   */
+  public static class truncableBufferedInputStream extends BufferedInputStream {
+    public truncableBufferedInputStream(InputStream in) {
+      super(in);
+    }
+
+    public truncableBufferedInputStream(InputStream in, int size) {
+      super(in, size);
+    }
+
+    /** Marks the internal buffer as empty so the next read goes to the source. */
+    public void trunc() {
+      count = 0;
+      pos = 0;
+    }
+  }
+
+
+  public static class Event {
+    private byte[] buf_;
+    private int nread_;
+    private int navailable_;
+
+    /**
+     * Initialize an event. Initially, it has no valid contents
+     *
+     * @param buf byte array buffer to store event 
+     */
+    public Event(byte[] buf) {
+      buf_ = buf;
+      nread_ = navailable_ = 0;
+    }
+
+    public byte[] getBuf() { return buf_;}
+    public int getSize() { return buf_.length; }
+
+
+    public void setAvailable(int sz) { nread_ = 0; navailable_=sz;}
+    public int getRemaining() { return (navailable_ - nread_); }
+
+    public int emit(byte[] buf, int offset, int ndesired) {
+      if((ndesired == 0) || (ndesired > getRemaining()))
+        ndesired = getRemaining();
+
+      if(ndesired <= 0)
+        return (ndesired);
+
+      System.arraycopy(buf_, nread_, buf, offset, ndesired);
+      nread_ += ndesired;
+
+      return(ndesired);
+    }
+  };
+
+  public static class chunkState {
+    /**
+     * Chunk Size. Must be same across all implementations
+     */
+    public static final int DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
+
+    private int chunk_size_ = DEFAULT_CHUNK_SIZE;
+    private long offset_ = 0;
+
+    public chunkState() {}
+    public chunkState(int chunk_size) { chunk_size_ = chunk_size; }
+
+    public void skip(int size) {offset_ += size; }
+    public void seek(long offset) {offset_ = offset;}
+
+    public int getChunkSize() { return chunk_size_;}
+    public int getChunkNum() { return ((int)(offset_/chunk_size_));}
+    public int getRemaining() { return (chunk_size_ - ((int)(offset_ % chunk_size_)));}
+    public long getOffset() { return (offset_);}
+  }
+
+  /**
+   * What a read should do on hitting end of file: how long to sleep before
+   * trying again and how many times to try.
+   */
+  public static enum tailPolicy {
+
+    NOWAIT(0, 0),
+    WAIT_FOREVER(500, -1);
+
+    /**
+     * Time in milliseconds to sleep before next read
+     * If 0, no sleep
+     */
+    public final int timeout_;
+
+    /**
+     * Number of retries before giving up
+     * if 0, no retries
+     * if -1, retry forever
+     */
+    public final int retries_;
+
+    /**
+     * ctor for policy
+     *
+     * @param timeout sleep time for this particular policy
+     * @param retries number of retries
+     */
+    tailPolicy(int timeout, int retries) {
+      this.timeout_ = timeout;
+      this.retries_ = retries;
+    }
+  }
+
+  /**
+   * Current tailing policy
+   */
+  tailPolicy currentPolicy_ = tailPolicy.NOWAIT;
+
+
+  /** 
+   * Underlying file being read
+   */
+  protected TSeekableFile inputFile_ = null;
+
+  /** 
+   * Underlying outputStream 
+   */
+  protected OutputStream outputStream_ = null;
+
+
+  /**
+   * Event currently read in
+   */
+  Event currentEvent_ = null;
+
+  /**
+   * InputStream currently being used for reading
+   */
+  InputStream inputStream_ = null;
+
+  /**
+   * current Chunk state
+   */
+  chunkState cs = null;
+
+  /**
+   * Read timeout
+   */
+  private int readTimeout_ = 0;
+
+  /**
+   * is read only?
+   */
+  private boolean readOnly_ = false;
+
+  /**
+   * Returns the tailing policy applied when a read reaches end of file.
+   *
+   * @return current read policy
+   */
+  public tailPolicy getTailPolicy() {
+    return currentPolicy_;
+  }
+
+  /**
+   * Replaces the tailing policy applied when a read reaches end of file.
+   *
+   * @param policy New policy to set
+   * @return the policy that was in effect before this call
+   */
+  public tailPolicy setTailPolicy(tailPolicy policy) {
+    final tailPolicy previous = currentPolicy_;
+    currentPolicy_ = policy;
+    return previous;
+  }
+
+
+  /**
+   * Provides the stream to read the file through. An already existing stream
+   * is truncated and reused; otherwise a new one is created on top of the
+   * input file.
+   *
+   * @return input stream to read from file
+   * @throws TTransportException if the input file cannot supply a stream
+   */
+  private InputStream createInputStream() throws TTransportException {
+    if (inputStream_ != null) {
+      // Reuse the existing stream, discarding whatever it still has buffered.
+      ((truncableBufferedInputStream) inputStream_).trunc();
+      return inputStream_;
+    }
+
+    try {
+      return new truncableBufferedInputStream(inputFile_.getInputStream());
+    } catch (IOException iox) {
+      System.err.println("createInputStream: "+iox.getMessage());
+      throw new TTransportException(iox.getMessage(), iox);
+    }
+  }
+
+  /**
+   * Read (potentially tailing) an input stream. Keeps reading until
+   * {@code len} bytes have arrived, the policy's retry budget for EOF is
+   * used up, or the thread is interrupted while waiting between retries.
+   * Every byte read is also accounted for in the chunk state.
+   * 
+   * @param is InputStream to read from
+   * @param buf Buffer to read into
+   * @param off Offset in buffer to read into
+   * @param len Number of bytes to read
+   * @param tp  policy to use if we hit EOF
+   *
+   * @return number of bytes read; less than {@code len} when the read gave up
+   * @throws TTransportException on an I/O error or an unexpected return value
+   *         from the stream
+   */
+  private int tailRead(InputStream is, byte[] buf, 
+                       int off, int len, tailPolicy tp) throws TTransportException {
+    int orig_len = len;
+    try {
+      int retries = 0;
+      while(len > 0) {
+        int cnt = is.read(buf, off, len);
+        if(cnt > 0) {
+          off += cnt;
+          len -= cnt;
+          retries = 0;
+          cs.skip(cnt); // remember that we read so many bytes
+        } else if (cnt == -1) {
+          // EOF
+          retries++;
+
+          if((tp.retries_ != -1) && tp.retries_ < retries)
+            return (orig_len - len);
+
+          if(tp.timeout_ > 0) {
+            try {
+              Thread.sleep(tp.timeout_);
+            } catch(InterruptedException e) {
+              // Keep the interrupt visible to the caller and stop tailing;
+              // swallowing it would make a retry-forever policy impossible
+              // to cancel.
+              Thread.currentThread().interrupt();
+              return (orig_len - len);
+            }
+          }
+        } else {
+          // either non-zero or -1 is what the contract says!
+          throw new
+            TTransportException("Unexpected return from InputStream.read = "
+                                + cnt);
+        }
+      }
+    } catch (IOException iox) {
+      throw new TTransportException(iox.getMessage(), iox);
+    }
+
+    return(orig_len - len);
+  }
+
+  /**
+   * Event is corrupted. Do recovery
+   *
+   * @return true if recovery could be performed and we can read more data
+   *         false is returned only when nothing more can be read
+   */
+  private boolean performRecovery() throws TTransportException {
+    int numChunks = getNumChunks();
+    int curChunk = cs.getChunkNum();
+
+    if(curChunk >= (numChunks-1)) {
+      return false;
+    }
+    seekToChunk(curChunk+1);
+    return true;
+  }
+
+  /**
+   * Read event from underlying file into the current event buffer. Events
+   * are stored as a 4-byte size followed by that many payload bytes;
+   * zero-sized events are skipped.
+   *
+   * @return true if event could be read, false otherwise (on EOF)
+   * @throws TTransportException if the stored event size is negative or
+   *         larger than what is left of the current chunk, or if reading fails
+   */
+  private boolean readEvent() throws TTransportException {
+    byte[] ebytes = new byte[4];
+    int esize;
+    int nread;
+    int nrequested;
+
+    do {
+      // corner case. read to end of chunk
+      // (fewer than 4 bytes are left, so no size field fits; consume them)
+      nrequested = cs.getRemaining();
+      if(nrequested < 4) {
+        nread = tailRead(inputStream_, ebytes, 0, nrequested, currentPolicy_);
+        if(nread != nrequested) {
+          return(false);
+        }
+      }
+
+      // assuming serialized on little endian machine
+      nread = tailRead(inputStream_, ebytes, 0, 4, currentPolicy_);
+      if(nread != 4) {
+        return(false);
+      }
+
+      esize=0;
+      for(int i=3; i>=0; i--) {
+        int val = (0x000000ff & (int)ebytes[i]);
+        esize |= (val << (i*8));
+      }
+
+      // check if event is corrupted: the size must be non-negative (a set top
+      // bit decodes to a negative int) and must fit in the current chunk.
+      // No recovery is attempted; performRecovery() is not called from here.
+      if(esize < 0 || esize > cs.getRemaining()) {
+        throw new TTransportException("FileTransport error: bad event size");
+      }
+    } while (esize == 0);
+
+    // reset existing event or get a larger one
+    if(currentEvent_.getSize() < esize)
+      currentEvent_ = new Event(new byte [esize]);
+
+    // populate the event
+    byte[] buf = currentEvent_.getBuf();
+    nread = tailRead(inputStream_, buf, 0, esize, currentPolicy_);
+    if(nread != esize) {
+      return(false);
+    }
+    currentEvent_.setAvailable(esize);
+    return(true);
+  }
+
+  /**
+   * Reports whether the transport is open: the input stream must exist and,
+   * unless the transport is read-only, the output stream must exist as well.
+   *
+   * @return true if the transport is open by the rule above, false otherwise
+   */
+  public boolean isOpen() {
+    return ((inputStream_ != null) && (readOnly_ || (outputStream_ != null)));
+  }
+
+
+  /**
+   * Diverging from the cpp model and sticking to the TSocket model
+   * Files are not opened in ctor - but in explicit open call
+   *
+   * Sets up the input stream, a fresh chunk state and an initial event
+   * buffer, plus a buffered output stream unless the transport is read-only.
+   *
+   * @throws TTransportException if already open, or if a stream cannot be set up
+   */
+  public void open() throws TTransportException {
+    if (isOpen()) {
+      throw new TTransportException(TTransportException.ALREADY_OPEN);
+    }
+
+    try {
+      inputStream_ = createInputStream();
+      cs = new chunkState();
+      currentEvent_ = new Event(new byte [256]);
+
+      if (!readOnly_) {
+        final OutputStream rawOut = inputFile_.getOutputStream();
+        outputStream_ = new BufferedOutputStream(rawOut, 8192);
+      }
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.NOT_OPEN, iox);
+    }
+  }
+
+  /**
+   * Closes the transport. The input file and the output stream are closed if
+   * present; failures while closing are reported on stderr and otherwise
+   * ignored. Afterwards {@link #isOpen()} returns false.
+   */
+  public void close() {
+    if (inputFile_ != null) {
+      try {
+        inputFile_.close();
+      } catch (IOException iox) {
+        System.err.println("WARNING: Error closing input file: " +
+                           iox.getMessage());
+      }
+      inputFile_ = null;
+    }
+    // isOpen() is derived from inputStream_, so it has to be dropped too;
+    // otherwise a closed read-only transport would still claim to be open.
+    inputStream_ = null;
+    if (outputStream_ != null) {
+      try {
+        outputStream_.close();
+      } catch (IOException iox) {
+        System.err.println("WARNING: Error closing output stream: " +
+                           iox.getMessage());
+      }
+      outputStream_ = null;
+    }
+  }
+
+
+  /**
+   * File Transport ctor. The transport still has to be opened before use.
+   *
+   * @param path File path to read and write from
+   * @param readOnly Whether this is a read-only transport
+   */ 
+  public TFileTransport(final String path, boolean readOnly) throws IOException {
+    this.inputFile_ = new TStandardFile(path);
+    this.readOnly_ = readOnly;
+  }
+
+  /**
+   * File Transport ctor. The transport still has to be opened before use.
+   *
+   * @param inputFile open TSeekableFile to read/write from
+   * @param readOnly Whether this is a read-only transport
+   */
+  public TFileTransport(TSeekableFile inputFile, boolean readOnly) {
+    this.inputFile_ = inputFile;
+    this.readOnly_ = readOnly;
+  }
+
+
+  /**
+   * Reads exactly len bytes into buf by calling read() repeatedly. Cloned
+   * from TTransport.java:readAll(); the only difference is that an
+   * END_OF_FILE exception is thrown when read() reports that nothing more is
+   * available.
+   *
+   * @param buf Array to read into
+   * @param off Index to start reading at
+   * @param len Number of bytes to read
+   * @return The number of bytes read
+   * @throws TTransportException if read() returns a negative value, or with
+   *         END_OF_FILE if read() returns 0 before len bytes were obtained
+   */
+  public int readAll(byte[] buf, int off, int len)
+    throws TTransportException {
+    int total = 0;
+    while (total < len) {
+      final int chunk = read(buf, off + total, len - total);
+      if (chunk < 0) {
+        throw new TTransportException("Error in reading from file");
+      }
+      if (chunk == 0) {
+        throw new TTransportException(TTransportException.END_OF_FILE,
+                                      "End of File reached");
+      }
+      total += chunk;
+    }
+    return total;
+  }
+
+
+  /**
+   * Reads up to len bytes from the current event into buf, starting at
+   * offset off. When the current event has nothing remaining, the next event
+   * is loaded first.
+   *
+   * @param buf Array to read into
+   * @param off Index to start reading at
+   * @param len Maximum number of bytes to read
+   * @return The number of bytes actually read; 0 if no further event could
+   *         be read
+   * @throws TTransportException NOT_OPEN if the transport is not open, or if
+   *         there was an error reading data
+   */
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if (!isOpen()) {
+      throw new TTransportException(TTransportException.NOT_OPEN, 
+                                    "Must open before reading");
+    }
+
+    // Only fetch a new event once the current one is fully consumed.
+    final boolean exhausted = (currentEvent_.getRemaining() == 0);
+    if (exhausted && !readEvent()) {
+      return 0;
+    }
+
+    return currentEvent_.emit(buf, off, len);
+  }
+
+  public int getNumChunks() throws TTransportException {
+    if(!isOpen()) 
+      throw new TTransportException(TTransportException.NOT_OPEN, 
+                                    "Must open before getNumChunks");
+    try {
+      long len = inputFile_.length();
+      if(len == 0)
+        return 0;
+      else 
+        return (((int)(len/cs.getChunkSize())) + 1);
+
+    } catch (IOException iox) {
+      throw new TTransportException(iox.getMessage(), iox);
+    }
+  }
+
+  /**
+   * Returns the chunk number reported by the chunk state.
+   *
+   * @return the value of cs.getChunkNum()
+   * @throws TTransportException NOT_OPEN if the transport is not open
+   */
+  public int getCurChunk() throws TTransportException {
+    if (!isOpen()) {
+      throw new TTransportException(TTransportException.NOT_OPEN, 
+                                    "Must open before getCurChunk");
+    }
+    final int currentChunk = cs.getChunkNum();
+    return currentChunk;
+  }
+
+
+  /**
+   * Positions the transport at the start of the given chunk.
+   *
+   * A negative chunk counts back from the end (-1 is the last chunk). A
+   * negative value that is too large seeks to the beginning. A chunk at or
+   * beyond the number of chunks means "seek to end": the transport is placed
+   * on the last chunk and events are read and discarded until the file
+   * length observed at call time is reached. An empty file is left
+   * untouched.
+   *
+   * @param chunk index of the chunk to seek to (may be negative, see above)
+   * @throws TTransportException NOT_OPEN if the transport is not open, or
+   *         wrapping the IOException raised while querying or seeking the
+   *         file
+   */
+  public void seekToChunk(int chunk) throws TTransportException {
+    if(!isOpen()) 
+      throw new TTransportException(TTransportException.NOT_OPEN, 
+                                    "Must open before seeking");
+
+    int numChunks = getNumChunks();
+
+    // file is empty, seeking to chunk is pointless
+    if (numChunks == 0) {
+      return;
+    }
+
+    // negative indicates reverse seek (from the end)
+    if (chunk < 0) {
+      chunk += numChunks;
+    }
+
+    // too large a value for reverse seek, just seek to beginning
+    if (chunk < 0) {
+      chunk = 0;
+    }
+
+    long eofOffset=0;
+    boolean seekToEnd = (chunk >= numChunks);
+    if(seekToEnd) {
+      // Clamp to the last real chunk. "chunk - 1" is only correct when
+      // chunk == numChunks; for anything larger it pointed past the end of
+      // the file and the catch-up loop below never ran.
+      chunk = numChunks - 1;
+      try { eofOffset = inputFile_.length(); }
+      catch (IOException iox) {throw new TTransportException(iox.getMessage(),
+                                                             iox);}
+    }
+
+    // Compute the byte offset once, in long arithmetic, so the comparison
+    // below cannot overflow and agrees with the seek calls for large files.
+    final long chunkStart = (long)chunk*cs.getChunkSize();
+
+    if(chunkStart != cs.getOffset()) {
+      try { inputFile_.seek(chunkStart); } 
+      catch (IOException iox) {
+        System.err.println("createInputStream: "+iox.getMessage());
+        throw new TTransportException("Seek to chunk " +
+                                      chunk + " " +iox.getMessage(), iox);
+      }
+
+      cs.seek(chunkStart);
+      currentEvent_.setAvailable(0);
+      inputStream_ = createInputStream();
+    }
+
+    if(seekToEnd) {
+      // waiting forever here - otherwise we can hit EOF and end up
+      // having consumed partial data from the data stream.
+      tailPolicy old = setTailPolicy(tailPolicy.WAIT_FOREVER);
+      try {
+        while(cs.getOffset() < eofOffset) { readEvent(); }
+        currentEvent_.setAvailable(0);
+      } finally {
+        // Always restore the caller's policy, even if readEvent() failed.
+        setTailPolicy(old);
+      }
+    }
+  }
+
+  /**
+   * Seeks to the end of the file by asking seekToChunk() for the chunk index
+   * equal to the current number of chunks.
+   *
+   * @throws TTransportException NOT_OPEN if the transport is not open, or
+   *         whatever getNumChunks()/seekToChunk() raise
+   */
+  public void seekToEnd() throws TTransportException {
+    if (!isOpen()) {
+      throw new TTransportException(TTransportException.NOT_OPEN, 
+                                    "Must open before seeking");
+    }
+    final int onePastLastChunk = getNumChunks();
+    seekToChunk(onePastLastChunk);
+  }
+
+
+  /**
+   * Writing is not supported by this transport; the call always fails.
+   *
+   * @param buf The output data buffer (unused)
+   * @param off The offset to start writing from (unused)
+   * @param len The number of bytes to write (unused)
+   * @throws TTransportException always, with the message "Not Supported"
+   */
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    final String reason = "Not Supported";
+    throw new TTransportException(reason);
+  }
+
+  /**
+   * Flushing is not supported by this transport; the call always fails.
+   *
+   * @throws TTransportException always, with the message "Not Supported"
+   */
+  public void flush() throws TTransportException {
+    final String reason = "Not Supported";
+    throw new TTransportException(reason);
+  }
+
+  /**
+   * Test program: opens the file named by args[0] read-only, prints its
+   * chunk count, then seeks to randomly chosen chunks and reads from each.
+   *
+   * @param args args[0] is the file name; optional args[1] is the number of
+   *        random chunks to visit (default 10)
+   * @throws Exception if opening, seeking or reading fails
+   */
+  public static void main(String[] args) throws Exception {
+
+    int num_chunks = 10;
+
+    // printUsage() terminates the JVM, so args[0] is safe to use afterwards.
+    if((args.length < 1) || args[0].equals("--help")
+       || args[0].equals("-h") || args[0].equals("-?")) {
+      printUsage();
+    }
+
+    if(args.length > 1) {
+      try {
+        num_chunks = Integer.parseInt(args[1]);
+      } catch (Exception e) {
+        System.err.println("Cannot parse " + args[1]); 
+        printUsage();
+      }
+    }
+
+    TFileTransport t = new TFileTransport(args[0], true);
+    t.open();
+    try {
+      System.out.println("NumChunks="+t.getNumChunks());
+
+      Random r = new Random();
+      for(int j=0; j<num_chunks; j++) {
+        byte[] buf = new byte[4096];
+        // Random.nextInt(bound) throws IllegalArgumentException unless
+        // bound > 0, which happened for files with zero or one chunk. Clamp
+        // the bound to 1 so such files always pick chunk 0.
+        int bound = Math.max(1, t.getNumChunks()-1);
+        int cnum = r.nextInt(bound);
+        System.out.println("Reading chunk "+cnum);
+        t.seekToChunk(cnum);
+        for(int i=0; i<4096; i++) {
+          t.read(buf, 0, 4096);
+        }
+      }
+    } finally {
+      // Release the file even when a seek or read fails.
+      t.close();
+    }
+  }
+
+  private static void printUsage() {
+    System.err.println("Usage: TFileTransport <filename> [num_chunks]");
+    System.err.println("       (Opens and reads num_chunks chunks from file randomly)");
+    System.exit(1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFramedTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFramedTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFramedTransport.java
new file mode 100644
index 0000000..9968b34
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFramedTransport.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TByteArrayOutputStream;
+
+/**
+ * TFramedTransport is a buffered TTransport that ensures a fully read message
+ * every time by preceding messages with a 4-byte frame size.
+ */
+public class TFramedTransport extends TTransport {
+
+  protected static final int DEFAULT_MAX_LENGTH = 16384000;
+
+  private int maxLength_;
+
+  /**
+   * Underlying transport
+   */
+  private TTransport transport_ = null;
+
+  /**
+   * Buffer for output
+   */
+  private final TByteArrayOutputStream writeBuffer_ =
+    new TByteArrayOutputStream(1024);
+
+  /**
+   * Buffer for input
+   */
+  private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);
+
+  public static class Factory extends TTransportFactory {
+    private int maxLength_;
+
+    public Factory() {
+      maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+    }
+
+    public Factory(int maxLength) {
+      maxLength_ = maxLength;
+    }
+
+    @Override
+    public TTransport getTransport(TTransport base) {
+      return new TFramedTransport(base, maxLength_);
+    }
+  }
+
+  /**
+   * Constructor wraps around another transport
+   */
+  public TFramedTransport(TTransport transport, int maxLength) {
+    transport_ = transport;
+    maxLength_ = maxLength;
+  }
+
+  public TFramedTransport(TTransport transport) {
+    transport_ = transport;
+    maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+  }
+
+  public void open() throws TTransportException {
+    transport_.open();
+  }
+
+  public boolean isOpen() {
+    return transport_.isOpen();
+  }
+
+  public void close() {
+    transport_.close();
+  }
+
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if (readBuffer_ != null) {
+      int got = readBuffer_.read(buf, off, len);
+      if (got > 0) {
+        return got;
+      }
+    }
+
+    // Read another frame of data
+    readFrame();
+
+    return readBuffer_.read(buf, off, len);
+  }
+
+  @Override
+  public byte[] getBuffer() {
+    return readBuffer_.getBuffer();
+  }
+
+  @Override
+  public int getBufferPosition() {
+    return readBuffer_.getBufferPosition();
+  }
+
+  @Override
+  public int getBytesRemainingInBuffer() {
+    return readBuffer_.getBytesRemainingInBuffer();
+  }
+
+  @Override
+  public void consumeBuffer(int len) {
+    readBuffer_.consumeBuffer(len);
+  }
+
+  private final byte[] i32buf = new byte[4];
+
+  private void readFrame() throws TTransportException {
+    transport_.readAll(i32buf, 0, 4);
+    int size = decodeFrameSize(i32buf);
+
+    if (size < 0) {
+      throw new TTransportException("Read a negative frame size (" + size + ")!");
+    }
+
+    if (size > maxLength_) {
+      throw new TTransportException("Frame size (" + size + ") larger than max length (" + maxLength_ + ")!");
+    }
+
+    byte[] buff = new byte[size];
+    transport_.readAll(buff, 0, size);
+    readBuffer_.reset(buff);
+  }
+
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    writeBuffer_.write(buf, off, len);
+  }
+
+  @Override
+  public void flush() throws TTransportException {
+    byte[] buf = writeBuffer_.get();
+    int len = writeBuffer_.len();
+    writeBuffer_.reset();
+
+    encodeFrameSize(len, i32buf);
+    transport_.write(i32buf, 0, 4);
+    transport_.write(buf, 0, len);
+    transport_.flush();
+  }
+
+  public static final void encodeFrameSize(final int frameSize, final byte[] buf) {
+    buf[0] = (byte)(0xff & (frameSize >> 24));
+    buf[1] = (byte)(0xff & (frameSize >> 16));
+    buf[2] = (byte)(0xff & (frameSize >> 8));
+    buf[3] = (byte)(0xff & (frameSize));
+  }
+
+  public static final int decodeFrameSize(final byte[] buf) {
+    return 
+      ((buf[0] & 0xff) << 24) |
+      ((buf[1] & 0xff) << 16) |
+      ((buf[2] & 0xff) <<  8) |
+      ((buf[3] & 0xff));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/THttpClient.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/THttpClient.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/THttpClient.java
new file mode 100644
index 0000000..91a5b72
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/THttpClient.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+import java.net.URL;
+import java.net.HttpURLConnection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * HTTP implementation of the TTransport interface. Used for working with a
+ * Thrift web services implementation (using for example TServlet).
+ *
+ * This class offers two implementations of the HTTP transport.
+ * One uses HttpURLConnection instances, the other HttpClient from Apache
+ * Http Components.
+ * The chosen implementation depends on the constructor used to
+ * create the THttpClient instance.
+ * Using the THttpClient(String url) constructor or passing null as the
+ * HttpClient to THttpClient(String url, HttpClient client) will create an
+ * instance which will use HttpURLConnection.
+ *
+ * When using HttpClient, the following configuration leads to 5-15% 
+ * better performance than the HttpURLConnection implementation:
+ *
+ * http.protocol.version=HttpVersion.HTTP_1_1
+ * http.protocol.content-charset=UTF-8
+ * http.protocol.expect-continue=false
+ * http.connection.stalecheck=false
+ *
+ * Also note that under high load, the HttpURLConnection implementation
+ * may exhaust the open file descriptor limit.
+ *
+ * Writes are buffered in memory; flush() sends the buffered bytes as one
+ * POST request and makes the response body available to read().
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/THRIFT-970">THRIFT-970</a>
+ */
+
+public class THttpClient extends TTransport {
+
+  private URL url_ = null;
+
+  /** Bytes written since the last flush. */
+  private final ByteArrayOutputStream requestBuffer_ = new ByteArrayOutputStream();
+
+  /** Body of the most recent successful response, or null. */
+  private InputStream inputStream_ = null;
+
+  private int connectTimeout_ = 0;
+
+  private int readTimeout_ = 0;
+
+  private Map<String,String> customHeaders_ = null;
+
+  /** Target host for the HttpClient implementation; null otherwise. */
+  private final HttpHost host;
+  
+  /** HttpClient to use, or null to use HttpURLConnection. */
+  private final HttpClient client;
+  
+  /**
+   * Factory creating THttpClient instances for a fixed URL (and optional
+   * HttpClient).
+   */
+  public static class Factory extends TTransportFactory {
+    
+    private final String url;
+    private final HttpClient client;
+    
+    public Factory(String url) {
+      this.url = url;
+      this.client = null;
+    }
+
+    public Factory(String url, HttpClient client) {
+      this.url = url;
+      this.client = client;
+    }
+    
+    /**
+     * Creates a THttpClient for the configured URL; the trans argument is
+     * not used.
+     *
+     * @return the new transport, or null if the THttpClient constructor
+     *         throws a TTransportException
+     */
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      try {
+        if (null != client) {
+          return new THttpClient(url, client);
+        } else {
+          return new THttpClient(url);
+        }
+      } catch (TTransportException tte) {
+        // NOTE(review): callers receive null here and must check for it.
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Creates a transport that uses HttpURLConnection.
+   *
+   * @param url the endpoint URL
+   * @throws TTransportException wrapping the IOException raised while
+   *         parsing the URL
+   */
+  public THttpClient(String url) throws TTransportException {
+    try {
+      url_ = new URL(url);
+      this.client = null;
+      this.host = null;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  /**
+   * Creates a transport that uses the given HttpClient.
+   *
+   * @param url the endpoint URL
+   * @param client the HttpClient to send requests with
+   * @throws TTransportException wrapping the IOException raised while
+   *         parsing the URL
+   */
+  public THttpClient(String url, HttpClient client) throws TTransportException {
+    try {
+      url_ = new URL(url);
+      this.client = client;
+      this.host = new HttpHost(url_.getHost(), -1 == url_.getPort() ? url_.getDefaultPort() : url_.getPort(), url_.getProtocol());
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void setConnectTimeout(int timeout) {
+    connectTimeout_ = timeout;
+    if (null != this.client) {
+      // WARNING, this modifies the HttpClient params, this might have an impact elsewhere if the
+      // same HttpClient is used for something else.
+      client.getParams().setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, connectTimeout_);
+    }
+  }
+
+  public void setReadTimeout(int timeout) {
+    readTimeout_ = timeout;
+    if (null != this.client) {
+      // WARNING, this modifies the HttpClient params, this might have an impact elsewhere if the
+      // same HttpClient is used for something else.
+      client.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, readTimeout_);
+    }
+  }
+
+  public void setCustomHeaders(Map<String,String> headers) {
+    customHeaders_ = headers;
+  }
+
+  public void setCustomHeader(String key, String value) {
+    if (customHeaders_ == null) {
+      customHeaders_ = new HashMap<String, String>();
+    }
+    customHeaders_.put(key, value);
+  }
+
+  /** No-op: a connection is only made when flush() is called. */
+  public void open() {}
+
+  /** Closes and forgets the current response stream, if any. */
+  public void close() {
+    releaseResponseStream();
+  }
+
+  /**
+   * Closes the current response stream (ignoring close failures, since the
+   * stream is being discarded anyway) and clears the reference.
+   */
+  private void releaseResponseStream() {
+    if (null != inputStream_) {
+      try {
+        inputStream_.close();
+      } catch (IOException ignored) {
+        // Best effort only.
+      }
+      inputStream_ = null;
+    }
+  }
+
+  /** @return always true */
+  public boolean isOpen() {
+    return true;
+  }
+
+  /**
+   * Reads from the body of the most recent response.
+   *
+   * @return the number of bytes read
+   * @throws TTransportException if no response is available, if the response
+   *         is exhausted, or wrapping an IOException from the stream
+   */
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if (inputStream_ == null) {
+      throw new TTransportException("Response buffer is empty, no request.");
+    }
+    try {
+      int ret = inputStream_.read(buf, off, len);
+      if (ret == -1) {
+        throw new TTransportException("No more data available.");
+      }
+      return ret;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  /** Appends bytes to the in-memory request buffer; nothing is sent yet. */
+  public void write(byte[] buf, int off, int len) {
+    requestBuffer_.write(buf, off, len);
+  }
+
+  /**
+   * Sends the buffered request through the configured HttpClient and stores
+   * the fully read response body for read().
+   *
+   * @throws TTransportException if no HttpClient is set, if the status code
+   *         is not 200, or wrapping an IOException
+   */
+  private void flushUsingHttpClient() throws TTransportException {
+    
+    if (null == this.client) {
+      throw new TTransportException("Null HttpClient, aborting.");
+    }
+
+    // Extract request and reset buffer
+    byte[] data = requestBuffer_.toByteArray();
+    requestBuffer_.reset();
+
+    HttpPost post = null;
+    
+    InputStream is = null;
+    
+    try {      
+      // Set request to path + query string
+      post = new HttpPost(this.url_.getFile());
+      
+      //
+      // Headers are added to the HttpPost instance, not
+      // to HttpClient.
+      //
+      
+      post.setHeader("Content-Type", "application/x-thrift");
+      post.setHeader("Accept", "application/x-thrift");
+      post.setHeader("User-Agent", "Java/THttpClient/HC");
+      
+      if (null != customHeaders_) {
+        for (Map.Entry<String, String> header : customHeaders_.entrySet()) {
+          post.setHeader(header.getKey(), header.getValue());
+        }
+      }
+
+      post.setEntity(new ByteArrayEntity(data));
+      
+      HttpResponse response = this.client.execute(this.host, post);
+      int responseCode = response.getStatusLine().getStatusCode();
+
+      //      
+      // Retrieve the inputstream BEFORE checking the status code so
+      // resources get freed in the finally clause. A response without a
+      // body has a null entity; guard against it instead of failing with
+      // a NullPointerException.
+      //
+
+      if (null != response.getEntity()) {
+        is = response.getEntity().getContent();
+      }
+      
+      if (responseCode != HttpStatus.SC_OK) {
+        throw new TTransportException("HTTP Response code: " + responseCode);
+      }
+
+      // Read the responses into a byte array so we can release the connection
+      // early. This implies that the whole content will have to be read in
+      // memory, and that momentarily we might use up twice the memory (while the
+      // thrift struct is being read up the chain).
+      // Proceeding differently might lead to exhaustion of connections and thus
+      // to app failure.
+      
+      byte[] buf = new byte[1024];
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      
+      if (null != is) {
+        int len = 0;
+        do {
+          len = is.read(buf);
+          if (len > 0) {
+            baos.write(buf, 0, len);
+          }
+        } while (-1 != len);
+      }
+      
+      if (null != response.getEntity()) {
+        try {
+          // Indicate we're done with the content.
+          EntityUtils.consume(response.getEntity());
+        } catch (IOException ioe) {
+          // We ignore this exception, it might only mean the server has no
+          // keep-alive capability.
+        }
+      }
+
+      // Drop the previous response before installing the new one.
+      releaseResponseStream();
+      inputStream_ = new ByteArrayInputStream(baos.toByteArray());
+    } catch (IOException ioe) {
+      // Abort method so the connection gets released back to the connection manager
+      if (null != post) {
+        post.abort();
+      }
+      throw new TTransportException(ioe);
+    } finally {
+      if (null != is) {
+        // Close the entity's input stream, this will release the underlying connection
+        try {
+          is.close();
+        } catch (IOException ioe) {
+          throw new TTransportException(ioe);
+        }
+      }
+    }
+  }
+
+  /**
+   * Sends everything written since the last flush as a single POST request.
+   * Uses the HttpClient when one was supplied, otherwise HttpURLConnection.
+   * On success the response body becomes the source for read().
+   *
+   * @throws TTransportException if the status code is not 200, or wrapping
+   *         an IOException
+   */
+  public void flush() throws TTransportException {
+
+    if (null != this.client) {
+      flushUsingHttpClient();
+      return;
+    }
+
+    // Extract request and reset buffer
+    byte[] data = requestBuffer_.toByteArray();
+    requestBuffer_.reset();
+
+    HttpURLConnection connection = null;
+    // True once the response stream has been stored for read(); until then
+    // a failure must release the connection.
+    boolean responseKept = false;
+
+    try {
+      // Create connection object
+      connection = (HttpURLConnection)url_.openConnection();
+
+      // Timeouts, only if explicitly set
+      if (connectTimeout_ > 0) {
+        connection.setConnectTimeout(connectTimeout_);
+      }
+      if (readTimeout_ > 0) {
+        connection.setReadTimeout(readTimeout_);
+      }
+
+      // Make the request
+      connection.setRequestMethod("POST");
+      connection.setRequestProperty("Content-Type", "application/x-thrift");
+      connection.setRequestProperty("Accept", "application/x-thrift");
+      connection.setRequestProperty("User-Agent", "Java/THttpClient");
+      if (customHeaders_ != null) {
+        for (Map.Entry<String, String> header : customHeaders_.entrySet()) {
+          connection.setRequestProperty(header.getKey(), header.getValue());
+        }
+      }
+      connection.setDoOutput(true);
+      connection.connect();
+
+      // Close the request body stream once written so it is not leaked.
+      java.io.OutputStream requestBody = connection.getOutputStream();
+      try {
+        requestBody.write(data);
+      } finally {
+        requestBody.close();
+      }
+
+      int responseCode = connection.getResponseCode();
+      if (responseCode != HttpURLConnection.HTTP_OK) {
+        throw new TTransportException("HTTP Response code: " + responseCode);
+      }
+
+      // Read the responses. The previous response stream (if the caller did
+      // not close it) is released first so it does not linger.
+      InputStream responseBody = connection.getInputStream();
+      releaseResponseStream();
+      inputStream_ = responseBody;
+      responseKept = true;
+
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    } finally {
+      if (!responseKept && null != connection) {
+        // Failed request: free the connection instead of leaking it.
+        connection.disconnect();
+      }
+    }
+  }
+}


Mime
View raw message