incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [13/19] Thrift re-package, sorry for the huge commit.
Date Thu, 23 May 2013 19:39:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileTransport.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileTransport.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileTransport.java
new file mode 100644
index 0000000..57cff88
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFileTransport.java
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * FileTransport implementation of the TTransport interface.
+ * Currently this is a straightforward port of the cpp implementation
+ * 
+ * It may make better sense to provide a basic stream access on top of the framed file format
+ * The FileTransport can then be a user of this framed file format with some additional logic
+ * for chunking.
+ *
+ * @author Joydeep Sen Sarma <jssarma@facebook.com>
+ */
+public class TFileTransport extends TTransport {
+
+  public static class truncableBufferedInputStream extends BufferedInputStream {
+    public void trunc() {
+      pos = count = 0;
+    }        
+    public truncableBufferedInputStream(InputStream in) {
+      super(in);
+    }
+    public truncableBufferedInputStream(InputStream in, int size) {
+      super(in, size);
+    }
+  }
+
+
+  public static class Event {
+    private byte[] buf_;
+    private int nread_;
+    private int navailable_;
+
+    /**
+     * Initialize an event. Initially, it has no valid contents
+     *
+     * @param buf byte array buffer to store event 
+     */
+    public Event(byte[] buf) {
+      buf_ = buf;
+      nread_ = navailable_ = 0;
+    }
+
+    public byte[] getBuf() { return buf_;}
+    public int getSize() { return buf_.length; }
+
+
+    public void setAvailable(int sz) { nread_ = 0; navailable_=sz;}
+    public int getRemaining() { return (navailable_ - nread_); }
+
+    public int emit(byte[] buf, int offset, int ndesired) {
+      if((ndesired == 0) || (ndesired > getRemaining()))
+        ndesired = getRemaining();
+
+      if(ndesired <= 0)
+        return (ndesired);
+
+      System.arraycopy(buf_, nread_, buf, offset, ndesired);
+      nread_ += ndesired;
+
+      return(ndesired);
+    }
+  };
+
+  public static class chunkState {
+    /**
+     * Chunk Size. Must be same across all implementations
+     */
+    public static final int DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
+
+    private int chunk_size_ = DEFAULT_CHUNK_SIZE;
+    private long offset_ = 0;
+
+    public chunkState() {}
+    public chunkState(int chunk_size) { chunk_size_ = chunk_size; }
+
+    public void skip(int size) {offset_ += size; }
+    public void seek(long offset) {offset_ = offset;}
+
+    public int getChunkSize() { return chunk_size_;}
+    public int getChunkNum() { return ((int)(offset_/chunk_size_));}
+    public int getRemaining() { return (chunk_size_ - ((int)(offset_ % chunk_size_)));}
+    public long getOffset() { return (offset_);}
+  }
+
+  public static enum tailPolicy {
+
+    NOWAIT(0, 0),
+      WAIT_FOREVER(500, -1);
+
+    /**
+     * Time in milliseconds to sleep before next read
+     * If 0, no sleep
+     */
+    public final int timeout_;
+
+    /**
+     * Number of retries before giving up
+     * if 0, no retries
+     * if -1, retry forever
+     */
+    public final int retries_;
+
+    /**
+     * ctor for policy
+     *
+     * @param timeout sleep time for this particular policy
+     * @param retries number of retries
+     */
+
+    tailPolicy(int timeout, int retries) {
+      timeout_ = timeout;
+      retries_ = retries;
+    }
+  }
+
+  /**
+   * Current tailing policy
+   */
+  tailPolicy currentPolicy_ = tailPolicy.NOWAIT;
+
+
+  /** 
+   * Underlying file being read
+   */
+  protected TSeekableFile inputFile_ = null;
+
+  /** 
+   * Underlying outputStream 
+   */
+  protected OutputStream outputStream_ = null;
+
+
+  /**
+   * Event currently read in
+   */
+  Event currentEvent_ = null;
+
+  /**
+   * InputStream currently being used for reading
+   */
+  InputStream inputStream_ = null;
+
+  /**
+   * current Chunk state
+   */
+  chunkState cs = null;
+
+  /**
+   * Read timeout
+   */
+  private int readTimeout_ = 0;
+
+  /**
+   * is read only?
+   */
+  private boolean readOnly_ = false;
+
+  /**
+   * Get File Tailing Policy
+   * 
+   * @return current read policy
+   */
+  public tailPolicy getTailPolicy() {
+    return (currentPolicy_);
+  }
+
+  /**
+   * Set file Tailing Policy
+   * 
+   * @param policy New policy to set
+   * @return Old policy
+   */
+  public tailPolicy setTailPolicy(tailPolicy policy) {
+    tailPolicy old = currentPolicy_;
+    currentPolicy_ = policy;
+    return (old);
+  }
+
+
+  /**
+   * Initialize read input stream
+   * 
+   * @return input stream to read from file
+   */
+  private InputStream createInputStream() throws TTransportException {
+    InputStream is;
+    try {
+      if(inputStream_ != null) {
+        ((truncableBufferedInputStream)inputStream_).trunc();
+        is = inputStream_;
+      } else {
+        is = new truncableBufferedInputStream(inputFile_.getInputStream());
+      }
+    } catch (IOException iox) {
+      System.err.println("createInputStream: "+iox.getMessage());
+      throw new TTransportException(iox.getMessage(), iox);
+    }
+    return(is);
+  }
+
+  /**
+   * Read (potentially tailing) an input stream
+   * 
+   * @param is InputStream to read from
+   * @param buf Buffer to read into
+   * @param off Offset in buffer to read into
+   * @param len Number of bytes to read
+   * @param tp  policy to use if we hit EOF
+   *
+   * @return number of bytes read
+   */
+  private int tailRead(InputStream is, byte[] buf, 
+                       int off, int len, tailPolicy tp) throws TTransportException {
+    int orig_len = len;
+    try {
+      int retries = 0;
+      while(len > 0) {
+        int cnt = is.read(buf, off, len);
+        if(cnt > 0) {
+          off += cnt;
+          len -= cnt;
+          retries = 0;
+          cs.skip(cnt); // remember that we read so many bytes
+        } else if (cnt == -1) {
+          // EOF
+          retries++;
+
+          if((tp.retries_ != -1) && tp.retries_ < retries)
+            return (orig_len - len);
+
+          if(tp.timeout_ > 0) {
+            try {Thread.sleep(tp.timeout_);} catch(InterruptedException e) {}
+          }
+        } else {
+          // either non-zero or -1 is what the contract says!
+          throw new
+            TTransportException("Unexpected return from InputStream.read = "
+                                + cnt);
+        }
+      }
+    } catch (IOException iox) {
+      throw new TTransportException(iox.getMessage(), iox);
+    }
+
+    return(orig_len - len);
+  }
+
+  /**
+   * Event is corrupted. Do recovery
+   *
+   * @return true if recovery could be performed and we can read more data
+   *         false is returned only when nothing more can be read
+   */
+  private boolean performRecovery() throws TTransportException {
+    int numChunks = getNumChunks();
+    int curChunk = cs.getChunkNum();
+
+    if(curChunk >= (numChunks-1)) {
+      return false;
+    }
+    seekToChunk(curChunk+1);
+    return true;
+  }
+
+  /**
+   * Read event from underlying file
+   *
+   * @return true if event could be read, false otherwise (on EOF)
+   */
+  private boolean readEvent() throws TTransportException {
+    byte[] ebytes = new byte[4];
+    int esize;
+    int nread;
+    int nrequested;
+
+    retry:
+    do {
+      // corner case. read to end of chunk
+      nrequested = cs.getRemaining();
+      if(nrequested < 4) {
+        nread = tailRead(inputStream_, ebytes, 0, nrequested, currentPolicy_);
+        if(nread != nrequested) {
+          return(false);
+        }
+      }
+
+      // assuming serialized on little endian machine
+      nread = tailRead(inputStream_, ebytes, 0, 4, currentPolicy_);
+      if(nread != 4) {
+        return(false);
+      }
+
+      esize=0;
+      for(int i=3; i>=0; i--) {
+        int val = (0x000000ff & (int)ebytes[i]);
+        esize |= (val << (i*8));
+      }
+
+      // check if event is corrupted and do recovery as required
+      if(esize > cs.getRemaining()) {
+        throw new TTransportException("FileTransport error: bad event size");
+        /*        
+                  if(performRecovery()) {
+                  esize=0;
+                  } else {
+                  return false;
+                  }
+        */
+      }
+    } while (esize == 0);
+
+    // reset existing event or get a larger one
+    if(currentEvent_.getSize() < esize)
+      currentEvent_ = new Event(new byte [esize]);
+
+    // populate the event
+    byte[] buf = currentEvent_.getBuf();
+    nread = tailRead(inputStream_, buf, 0, esize, currentPolicy_);
+    if(nread != esize) {
+      return(false);
+    }
+    currentEvent_.setAvailable(esize);
+    return(true);
+  }
+
+  /**
+   * open if both input/output open unless readonly
+   *
+   * @return true
+   */
+  public boolean isOpen() {
+    return ((inputStream_ != null) && (readOnly_ || (outputStream_ != null)));
+  }
+
+
+  /**
+   * Diverging from the cpp model and sticking to the TSocket model
+   * Files are not opened in ctor - but in explicit open call
+   */
+  public void open() throws TTransportException {
+    if (isOpen()) 
+      throw new TTransportException(TTransportException.ALREADY_OPEN);
+
+    try {
+      inputStream_ = createInputStream();
+      cs = new chunkState();
+      currentEvent_ = new Event(new byte [256]);
+
+      if(!readOnly_)
+        outputStream_ = new BufferedOutputStream(inputFile_.getOutputStream(), 8192);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.NOT_OPEN, iox);
+    }
+  }
+
+  /**
+   * Closes the transport.
+   */
+  public void close() {
+    if (inputFile_ != null) {
+      try {
+        inputFile_.close();
+      } catch (IOException iox) {
+        System.err.println("WARNING: Error closing input file: " +
+                           iox.getMessage());
+      }
+      inputFile_ = null;
+    }
+    if (outputStream_ != null) {
+      try {
+        outputStream_.close();
+      } catch (IOException iox) {
+        System.err.println("WARNING: Error closing output stream: " +
+                           iox.getMessage());
+      }
+      outputStream_ = null;
+    }
+  }
+
+
+  /**
+   * File Transport ctor
+   *
+   * @param path File path to read and write from
+   * @param readOnly Whether this is a read-only transport
+   */ 
+  public TFileTransport(final String path, boolean readOnly) throws IOException {
+    inputFile_ = new TStandardFile(path);
+    readOnly_ = readOnly;
+  }
+
+  /**
+   * File Transport ctor
+   *
+   * @param inputFile open TSeekableFile to read/write from
+   * @param readOnly Whether this is a read-only transport
+   */
+  public TFileTransport(TSeekableFile inputFile, boolean readOnly) {
+    inputFile_ = inputFile;
+    readOnly_ = readOnly;
+  }
+
+
+  /**
+   * Cloned from TTransport.java:readAll(). Only difference is throwing an EOF exception
+   * where one is detected.
+   */
+  public int readAll(byte[] buf, int off, int len)
+    throws TTransportException {
+    int got = 0;
+    int ret = 0;
+    while (got < len) {
+      ret = read(buf, off+got, len-got);
+      if (ret < 0) {
+        throw new TTransportException("Error in reading from file");
+      }
+      if(ret == 0) {
+        throw new TTransportException(TTransportException.END_OF_FILE,
+                                      "End of File reached");
+      }
+      got += ret;
+    }
+    return got;
+  }
+
+
+  /**
+   * Reads up to len bytes into buffer buf, starting at offset off.
+   *
+   * @param buf Array to read into
+   * @param off Index to start reading at
+   * @param len Maximum number of bytes to read
+   * @return The number of bytes actually read
+   * @throws TTransportException if there was an error reading data
+   */
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if(!isOpen()) 
+      throw new TTransportException(TTransportException.NOT_OPEN, 
+                                    "Must open before reading");
+
+    if(currentEvent_.getRemaining() == 0) {
+      if(!readEvent())
+        return(0);
+    }
+
+    int nread = currentEvent_.emit(buf, off, len);
+    return nread;
+  }
+
+  public int getNumChunks() throws TTransportException {
+    if(!isOpen()) 
+      throw new TTransportException(TTransportException.NOT_OPEN, 
+                                    "Must open before getNumChunks");
+    try {
+      long len = inputFile_.length();
+      if(len == 0)
+        return 0;
+      else 
+        return (((int)(len/cs.getChunkSize())) + 1);
+
+    } catch (IOException iox) {
+      throw new TTransportException(iox.getMessage(), iox);
+    }
+  }
+
+  public int getCurChunk() throws TTransportException {
+    if(!isOpen()) 
+      throw new TTransportException(TTransportException.NOT_OPEN, 
+                                    "Must open before getCurChunk");
+    return (cs.getChunkNum());
+
+  }
+
+
+  public void seekToChunk(int chunk) throws TTransportException {
+    if(!isOpen()) 
+      throw new TTransportException(TTransportException.NOT_OPEN, 
+                                    "Must open before seeking");
+
+    int numChunks = getNumChunks();
+
+    // file is empty, seeking to chunk is pointless
+    if (numChunks == 0) {
+      return;
+    }
+
+    // negative indicates reverse seek (from the end)
+    if (chunk < 0) {
+      chunk += numChunks;
+    }
+
+    // too large a value for reverse seek, just seek to beginning
+    if (chunk < 0) {
+      chunk = 0;
+    }
+
+    long eofOffset=0;
+    boolean seekToEnd = (chunk >= numChunks);
+    if(seekToEnd) {
+      chunk = chunk - 1;
+      try { eofOffset = inputFile_.length(); }
+      catch (IOException iox) {throw new TTransportException(iox.getMessage(),
+                                                             iox);}
+    }
+
+    if(chunk*cs.getChunkSize() != cs.getOffset()) {
+      try { inputFile_.seek((long)chunk*cs.getChunkSize()); } 
+      catch (IOException iox) {
+        System.err.println("createInputStream: "+iox.getMessage());
+        throw new TTransportException("Seek to chunk " +
+                                      chunk + " " +iox.getMessage(), iox);
+      }
+
+      cs.seek((long)chunk*cs.getChunkSize());
+      currentEvent_.setAvailable(0);
+      inputStream_ = createInputStream();
+    }
+
+    if(seekToEnd) {
+      // waiting forever here - otherwise we can hit EOF and end up
+      // having consumed partial data from the data stream.
+      tailPolicy old = setTailPolicy(tailPolicy.WAIT_FOREVER);
+      while(cs.getOffset() < eofOffset) { readEvent(); }
+      currentEvent_.setAvailable(0);
+      setTailPolicy(old);
+    }
+  }
+
+  public void seekToEnd() throws TTransportException {
+    if(!isOpen()) 
+      throw new TTransportException(TTransportException.NOT_OPEN, 
+                                    "Must open before seeking");
+    seekToChunk(getNumChunks());
+  }
+
+
+  /**
+   * Writes up to len bytes from the buffer.
+   *
+   * @param buf The output data buffer
+   * @param off The offset to start writing from
+   * @param len The number of bytes to write
+   * @throws TTransportException if there was an error writing data
+   */
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    throw new TTransportException("Not Supported");
+  }
+
+  /**
+   * Flush any pending data out of a transport buffer.
+   *
+   * @throws TTransportException if there was an error writing out data.
+   */
+  public void flush() throws TTransportException {
+    throw new TTransportException("Not Supported");
+  }
+
+  /**
+   * test program
+   * 
+   */
+  public static void main(String[] args) throws Exception {
+
+    int num_chunks = 10;
+
+    if((args.length < 1) || args[0].equals("--help")
+       || args[0].equals("-h") || args[0].equals("-?")) {
+      printUsage();
+    }
+
+    if(args.length > 1) {
+      try {
+        num_chunks = Integer.parseInt(args[1]);
+      } catch (Exception e) {
+        System.err.println("Cannot parse " + args[1]); 
+        printUsage();
+      }
+    }
+
+    TFileTransport t = new TFileTransport(args[0], true);
+    t.open();
+    System.out.println("NumChunks="+t.getNumChunks());
+
+    Random r = new Random();
+    for(int j=0; j<num_chunks; j++) {
+      byte[] buf = new byte[4096];
+      int cnum = r.nextInt(t.getNumChunks()-1);
+      System.out.println("Reading chunk "+cnum);
+      t.seekToChunk(cnum);
+      for(int i=0; i<4096; i++) {
+        t.read(buf, 0, 4096);
+      }
+    }
+  }
+
+  private static void printUsage() {
+    System.err.println("Usage: TFileTransport <filename> [num_chunks]");
+    System.err.println("       (Opens and reads num_chunks chunks from file randomly)");
+    System.exit(1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFramedTransport.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFramedTransport.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFramedTransport.java
new file mode 100644
index 0000000..9968b34
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TFramedTransport.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TByteArrayOutputStream;
+
+/**
+ * TFramedTransport is a buffered TTransport that ensures a fully read message
+ * every time by preceding messages with a 4-byte frame size.
+ */
+public class TFramedTransport extends TTransport {
+
+  protected static final int DEFAULT_MAX_LENGTH = 16384000;
+
+  private int maxLength_;
+
+  /**
+   * Underlying transport
+   */
+  private TTransport transport_ = null;
+
+  /**
+   * Buffer for output
+   */
+  private final TByteArrayOutputStream writeBuffer_ =
+    new TByteArrayOutputStream(1024);
+
+  /**
+   * Buffer for input
+   */
+  private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);
+
+  public static class Factory extends TTransportFactory {
+    private int maxLength_;
+
+    public Factory() {
+      maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+    }
+
+    public Factory(int maxLength) {
+      maxLength_ = maxLength;
+    }
+
+    @Override
+    public TTransport getTransport(TTransport base) {
+      return new TFramedTransport(base, maxLength_);
+    }
+  }
+
+  /**
+   * Constructor wraps around another transport
+   */
+  public TFramedTransport(TTransport transport, int maxLength) {
+    transport_ = transport;
+    maxLength_ = maxLength;
+  }
+
+  public TFramedTransport(TTransport transport) {
+    transport_ = transport;
+    maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+  }
+
+  public void open() throws TTransportException {
+    transport_.open();
+  }
+
+  public boolean isOpen() {
+    return transport_.isOpen();
+  }
+
+  public void close() {
+    transport_.close();
+  }
+
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if (readBuffer_ != null) {
+      int got = readBuffer_.read(buf, off, len);
+      if (got > 0) {
+        return got;
+      }
+    }
+
+    // Read another frame of data
+    readFrame();
+
+    return readBuffer_.read(buf, off, len);
+  }
+
+  @Override
+  public byte[] getBuffer() {
+    return readBuffer_.getBuffer();
+  }
+
+  @Override
+  public int getBufferPosition() {
+    return readBuffer_.getBufferPosition();
+  }
+
+  @Override
+  public int getBytesRemainingInBuffer() {
+    return readBuffer_.getBytesRemainingInBuffer();
+  }
+
+  @Override
+  public void consumeBuffer(int len) {
+    readBuffer_.consumeBuffer(len);
+  }
+
+  private final byte[] i32buf = new byte[4];
+
+  private void readFrame() throws TTransportException {
+    transport_.readAll(i32buf, 0, 4);
+    int size = decodeFrameSize(i32buf);
+
+    if (size < 0) {
+      throw new TTransportException("Read a negative frame size (" + size + ")!");
+    }
+
+    if (size > maxLength_) {
+      throw new TTransportException("Frame size (" + size + ") larger than max length (" + maxLength_ + ")!");
+    }
+
+    byte[] buff = new byte[size];
+    transport_.readAll(buff, 0, size);
+    readBuffer_.reset(buff);
+  }
+
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    writeBuffer_.write(buf, off, len);
+  }
+
+  @Override
+  public void flush() throws TTransportException {
+    byte[] buf = writeBuffer_.get();
+    int len = writeBuffer_.len();
+    writeBuffer_.reset();
+
+    encodeFrameSize(len, i32buf);
+    transport_.write(i32buf, 0, 4);
+    transport_.write(buf, 0, len);
+    transport_.flush();
+  }
+
+  public static final void encodeFrameSize(final int frameSize, final byte[] buf) {
+    buf[0] = (byte)(0xff & (frameSize >> 24));
+    buf[1] = (byte)(0xff & (frameSize >> 16));
+    buf[2] = (byte)(0xff & (frameSize >> 8));
+    buf[3] = (byte)(0xff & (frameSize));
+  }
+
+  public static final int decodeFrameSize(final byte[] buf) {
+    return 
+      ((buf[0] & 0xff) << 24) |
+      ((buf[1] & 0xff) << 16) |
+      ((buf[2] & 0xff) <<  8) |
+      ((buf[3] & 0xff));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/THttpClient.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/THttpClient.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/THttpClient.java
new file mode 100644
index 0000000..91a5b72
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/THttpClient.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+import java.net.URL;
+import java.net.HttpURLConnection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * HTTP implementation of the TTransport interface. Used for working with a
+ * Thrift web services implementation (using for example TServlet).
+ *
+ * This class offers two implementations of the HTTP transport.
+ * One uses HttpURLConnection instances, the other HttpClient from Apache
+ * Http Components.
+ * The chosen implementation depends on the constructor used to
+ * create the THttpClient instance.
+ * Using the THttpClient(String url) constructor or passing null as the
+ * HttpClient to THttpClient(String url, HttpClient client) will create an
+ * instance which will use HttpURLConnection.
+ *
+ * When using HttpClient, the following configuration leads to 5-15% 
+ * better performance than the HttpURLConnection implementation:
+ *
+ * http.protocol.version=HttpVersion.HTTP_1_1
+ * http.protocol.content-charset=UTF-8
+ * http.protocol.expect-continue=false
+ * http.connection.stalecheck=false
+ *
+ * Also note that under high load, the HttpURLConnection implementation
+ * may exhaust the open file descriptor limit.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/THRIFT-970">THRIFT-970</a>
+ */
+
+public class THttpClient extends TTransport {
+
+  private URL url_ = null;
+
+  private final ByteArrayOutputStream requestBuffer_ = new ByteArrayOutputStream();
+
+  private InputStream inputStream_ = null;
+
+  private int connectTimeout_ = 0;
+
+  private int readTimeout_ = 0;
+
+  private Map<String,String> customHeaders_ = null;
+
+  private final HttpHost host;
+  
+  private final HttpClient client;
+  
+  public static class Factory extends TTransportFactory {
+    
+    private final String url;
+    private final HttpClient client;
+    
+    public Factory(String url) {
+      this.url = url;
+      this.client = null;
+    }
+
+    public Factory(String url, HttpClient client) {
+      this.url = url;
+      this.client = client;
+    }
+    
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      try {
+        if (null != client) {
+          return new THttpClient(url, client);
+        } else {
+          return new THttpClient(url);
+        }
+      } catch (TTransportException tte) {
+        return null;
+      }
+    }
+  }
+
+  public THttpClient(String url) throws TTransportException {
+    try {
+      url_ = new URL(url);
+      this.client = null;
+      this.host = null;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public THttpClient(String url, HttpClient client) throws TTransportException {
+    try {
+      url_ = new URL(url);
+      this.client = client;
+      this.host = new HttpHost(url_.getHost(), -1 == url_.getPort() ? url_.getDefaultPort() : url_.getPort(), url_.getProtocol());
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void setConnectTimeout(int timeout) {
+    connectTimeout_ = timeout;
+    if (null != this.client) {
+      // WARNING, this modifies the HttpClient params, this might have an impact elsewhere if the
+      // same HttpClient is used for something else.
+      client.getParams().setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, connectTimeout_);
+    }
+  }
+
+  public void setReadTimeout(int timeout) {
+    readTimeout_ = timeout;
+    if (null != this.client) {
+      // WARNING, this modifies the HttpClient params, this might have an impact elsewhere if the
+      // same HttpClient is used for something else.
+      client.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, readTimeout_);
+    }
+  }
+
+  public void setCustomHeaders(Map<String,String> headers) {
+    customHeaders_ = headers;
+  }
+
+  public void setCustomHeader(String key, String value) {
+    if (customHeaders_ == null) {
+      customHeaders_ = new HashMap<String, String>();
+    }
+    customHeaders_.put(key, value);
+  }
+
+  public void open() {}
+
+  public void close() {
+    if (null != inputStream_) {
+      try {
+        inputStream_.close();
+      } catch (IOException ioe) {
+        ;
+      }
+      inputStream_ = null;
+    }
+  }
+
+  public boolean isOpen() {
+    return true;
+  }
+
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if (inputStream_ == null) {
+      throw new TTransportException("Response buffer is empty, no request.");
+    }
+    try {
+      int ret = inputStream_.read(buf, off, len);
+      if (ret == -1) {
+        throw new TTransportException("No more data available.");
+      }
+      return ret;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void write(byte[] buf, int off, int len) {
+    requestBuffer_.write(buf, off, len);
+  }
+
+  private void flushUsingHttpClient() throws TTransportException {
+    
+    if (null == this.client) {
+      throw new TTransportException("Null HttpClient, aborting.");
+    }
+
+    // Extract request and reset buffer
+    byte[] data = requestBuffer_.toByteArray();
+    requestBuffer_.reset();
+
+    HttpPost post = null;
+    
+    InputStream is = null;
+    
+    try {      
+      // Set request to path + query string
+      post = new HttpPost(this.url_.getFile());
+      
+      //
+      // Headers are added to the HttpPost instance, not
+      // to HttpClient.
+      //
+      
+      post.setHeader("Content-Type", "application/x-thrift");
+      post.setHeader("Accept", "application/x-thrift");
+      post.setHeader("User-Agent", "Java/THttpClient/HC");
+      
+      if (null != customHeaders_) {
+        for (Map.Entry<String, String> header : customHeaders_.entrySet()) {
+          post.setHeader(header.getKey(), header.getValue());
+        }
+      }
+
+      post.setEntity(new ByteArrayEntity(data));
+      
+      HttpResponse response = this.client.execute(this.host, post);
+      int responseCode = response.getStatusLine().getStatusCode();
+
+      //      
+      // Retrieve the inputstream BEFORE checking the status code so
+      // resources get freed in the finally clause.
+      //
+
+      is = response.getEntity().getContent();
+      
+      if (responseCode != HttpStatus.SC_OK) {
+        throw new TTransportException("HTTP Response code: " + responseCode);
+      }
+
+      // Read the responses into a byte array so we can release the connection
+      // early. This implies that the whole content will have to be read in
+      // memory, and that momentarily we might use up twice the memory (while the
+      // thrift struct is being read up the chain).
+      // Proceeding differently might lead to exhaustion of connections and thus
+      // to app failure.
+      
+      byte[] buf = new byte[1024];
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      
+      int len = 0;
+      do {
+        len = is.read(buf);
+        if (len > 0) {
+          baos.write(buf, 0, len);
+        }
+      } while (-1 != len);
+      
+      try {
+        // Indicate we're done with the content.
+        EntityUtils.consume(response.getEntity());
+      } catch (IOException ioe) {
+        // We ignore this exception, it might only mean the server has no
+        // keep-alive capability.
+      }
+            
+      inputStream_ = new ByteArrayInputStream(baos.toByteArray());
+    } catch (IOException ioe) {
+      // Abort method so the connection gets released back to the connection manager
+      if (null != post) {
+        post.abort();
+      }
+      throw new TTransportException(ioe);
+    } finally {
+      if (null != is) {
+        // Close the entity's input stream, this will release the underlying connection
+        try {
+          is.close();
+        } catch (IOException ioe) {
+          throw new TTransportException(ioe);
+        }
+      }
+    }
+  }
+
+  public void flush() throws TTransportException {
+
+    if (null != this.client) {
+      flushUsingHttpClient();
+      return;
+    }
+
+    // Extract request and reset buffer
+    byte[] data = requestBuffer_.toByteArray();
+    requestBuffer_.reset();
+
+    try {
+      // Create connection object
+      HttpURLConnection connection = (HttpURLConnection)url_.openConnection();
+
+      // Timeouts, only if explicitly set
+      if (connectTimeout_ > 0) {
+        connection.setConnectTimeout(connectTimeout_);
+      }
+      if (readTimeout_ > 0) {
+        connection.setReadTimeout(readTimeout_);
+      }
+
+      // Make the request
+      connection.setRequestMethod("POST");
+      connection.setRequestProperty("Content-Type", "application/x-thrift");
+      connection.setRequestProperty("Accept", "application/x-thrift");
+      connection.setRequestProperty("User-Agent", "Java/THttpClient");
+      if (customHeaders_ != null) {
+        for (Map.Entry<String, String> header : customHeaders_.entrySet()) {
+          connection.setRequestProperty(header.getKey(), header.getValue());
+        }
+      }
+      connection.setDoOutput(true);
+      connection.connect();
+      connection.getOutputStream().write(data);
+
+      int responseCode = connection.getResponseCode();
+      if (responseCode != HttpURLConnection.HTTP_OK) {
+        throw new TTransportException("HTTP Response code: " + responseCode);
+      }
+
+      // Read the responses
+      inputStream_ = connection.getInputStream();
+
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TIOStreamTransport.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TIOStreamTransport.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TIOStreamTransport.java
new file mode 100644
index 0000000..b48bc91
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TIOStreamTransport.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This is the most commonly used base transport. It takes an InputStream
+ * and an OutputStream and uses those to perform all transport operations.
+ * This allows for compatibility with all the nice constructs Java already
+ * has to provide a variety of types of streams.
+ *
+ */
+public class TIOStreamTransport extends TTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName());
+
+  /** Underlying inputStream */
+  protected InputStream inputStream_ = null;
+
+  /** Underlying outputStream */
+  protected OutputStream outputStream_ = null;
+
+  /**
+   * Subclasses can invoke the default constructor and then assign the input
+   * streams in the open method.
+   */
+  protected TIOStreamTransport() {}
+
+  /**
+   * Input stream constructor.
+   *
+   * @param is Input stream to read from
+   */
+  public TIOStreamTransport(InputStream is) {
+    inputStream_ = is;
+  }
+
+  /**
+   * Output stream constructor.
+   *
+   * @param os Output stream to read from
+   */
+  public TIOStreamTransport(OutputStream os) {
+    outputStream_ = os;
+  }
+
+  /**
+   * Two-way stream constructor.
+   *
+   * @param is Input stream to read from
+   * @param os Output stream to read from
+   */
+  public TIOStreamTransport(InputStream is, OutputStream os) {
+    inputStream_ = is;
+    outputStream_ = os;
+  }
+
+  /**
+   * The streams must already be open at construction time, so this should
+   * always return true.
+   *
+   * @return true
+   */
+  public boolean isOpen() {
+    return true;
+  }
+
+  /**
+   * The streams must already be open. This method does nothing.
+   */
+  public void open() throws TTransportException {}
+
+  /**
+   * Closes both the input and output streams.
+   */
+  public void close() {
+    if (inputStream_ != null) {
+      try {
+        inputStream_.close();
+      } catch (IOException iox) {
+        LOGGER.warn("Error closing input stream.", iox);
+      }
+      inputStream_ = null;
+    }
+    if (outputStream_ != null) {
+      try {
+        outputStream_.close();
+      } catch (IOException iox) {
+        LOGGER.warn("Error closing output stream.", iox);
+      }
+      outputStream_ = null;
+    }
+  }
+
+  /**
+   * Reads from the underlying input stream if not null.
+   */
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if (inputStream_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream");
+    }
+    int bytesRead;
+    try {
+      bytesRead = inputStream_.read(buf, off, len);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+    if (bytesRead < 0) {
+      throw new TTransportException(TTransportException.END_OF_FILE);
+    }
+    return bytesRead;
+  }
+
+  /**
+   * Writes to the underlying output stream if not null.
+   */
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    if (outputStream_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
+    }
+    try {
+      outputStream_.write(buf, off, len);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+
+  /**
+   * Flushes the underlying output stream if not null.
+   */
+  public void flush() throws TTransportException {
+    if (outputStream_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");
+    }
+    try {
+      outputStream_.flush();
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryBuffer.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryBuffer.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryBuffer.java
new file mode 100644
index 0000000..ced983f
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryBuffer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TByteArrayOutputStream;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Memory buffer-based implementation of the TTransport interface.
+ */
+public class TMemoryBuffer extends TTransport {
+  /**
+   * Create a TMemoryBuffer with an initial buffer size of <i>size</i>. The
+   * internal buffer will grow as necessary to accommodate the size of the data
+   * being written to it.
+   */
+  public TMemoryBuffer(int size) {
+    arr_ = new TByteArrayOutputStream(size);
+  }
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void open() {
+    /* Do nothing */
+  }
+
+  @Override
+  public void close() {
+    /* Do nothing */
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) {
+    byte[] src = arr_.get();
+    int amtToRead = (len > arr_.len() - pos_ ? arr_.len() - pos_ : len);
+    if (amtToRead > 0) {
+      System.arraycopy(src, pos_, buf, off, amtToRead);
+      pos_ += amtToRead;
+    }
+    return amtToRead;
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) {
+    arr_.write(buf, off, len);
+  }
+
+  /**
+   * Output the contents of the memory buffer as a String, using the supplied
+   * encoding
+   * @param enc  the encoding to use
+   * @return the contents of the memory buffer as a String
+   */
+  public String toString(String enc) throws UnsupportedEncodingException {
+    return arr_.toString(enc);
+  }
+
+  public String inspect() {
+    String buf = "";
+    byte[] bytes = arr_.toByteArray();
+    for (int i = 0; i < bytes.length; i++) {
+      buf += (pos_ == i ? "==>" : "" ) + Integer.toHexString(bytes[i] & 0xff) + " ";
+    }
+    return buf;
+  }
+
+  // The contents of the buffer
+  private TByteArrayOutputStream arr_;
+
+  // Position to read next byte from
+  private int pos_;
+
+  public int length() {
+    return arr_.size();
+  }
+
+  public byte[] getArray() {
+    return arr_.get();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryInputTransport.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryInputTransport.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryInputTransport.java
new file mode 100644
index 0000000..1f26505
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryInputTransport.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+public final class TMemoryInputTransport extends TTransport {
+
+  private byte[] buf_;
+  private int pos_;
+  private int endPos_;
+
+  public TMemoryInputTransport() {
+  }
+
+  public TMemoryInputTransport(byte[] buf) {
+    reset(buf);
+  }
+
+  public TMemoryInputTransport(byte[] buf, int offset, int length) {
+    reset(buf, offset, length);
+  }
+
+  public void reset(byte[] buf) {
+    reset(buf, 0, buf.length);
+  }
+
+  public void reset(byte[] buf, int offset, int length) {
+    buf_ = buf;
+    pos_ = offset;
+    endPos_ = offset + length;
+  }
+
+  public void clear() {
+    buf_ = null;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void open() throws TTransportException {}
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    int bytesRemaining = getBytesRemainingInBuffer();
+    int amtToRead = (len > bytesRemaining ? bytesRemaining : len);
+    if (amtToRead > 0) {
+      System.arraycopy(buf_, pos_, buf, off, amtToRead);
+      consumeBuffer(amtToRead);
+    }
+    return amtToRead;
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    throw new UnsupportedOperationException("No writing allowed!");
+  }
+
+  @Override
+  public byte[] getBuffer() {
+    return buf_;
+  }
+
+  public int getBufferPosition() {
+    return pos_;
+  }
+
+  public int getBytesRemainingInBuffer() {
+    return endPos_ - pos_;
+  }
+
+  public void consumeBuffer(int len) {
+    pos_ += len;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java
new file mode 100644
index 0000000..560e5a3
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around ServerSocketChannel
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingServerTransport.class.getName());
+
+  /**
+   * This channel is where all the nonblocking magic happens.
+   */
+  private ServerSocketChannel serverSocketChannel = null;
+
+  /**
+   * Underlying ServerSocket object
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+    this(new InetSocketAddress(port), clientTimeout);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    clientTimeout_ = clientTimeout;
+    try {
+      serverSocketChannel = ServerSocketChannel.open();
+      serverSocketChannel.configureBlocking(false);
+
+      // Make server socket
+      serverSocket_ = serverSocketChannel.socket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(bindAddr);
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+    }
+  }
+
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        sx.printStackTrace();
+      }
+    }
+  }
+
+  protected TNonblockingSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      SocketChannel socketChannel = serverSocketChannel.accept();
+      if (socketChannel == null) {
+        return null;
+      }
+
+      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+      tsocket.setTimeout(clientTimeout_);
+      return tsocket;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void registerSelector(Selector selector) {
+    try {
+      // Register the server socket channel, indicating an interest in
+      // accepting new connections
+      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    } catch (ClosedChannelException e) {
+      // this shouldn't happen, ideally...
+      // TODO: decide what to do with this.
+    }
+  }
+
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        LOGGER.warn("WARNING: Could not close server socket: " + iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerTransport.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerTransport.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerTransport.java
new file mode 100644
index 0000000..96cc7fe
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerTransport.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.nio.channels.Selector;
+
+/**
+ * Server transport that can be operated in a nonblocking fashion.
+ */
+public abstract class TNonblockingServerTransport extends TServerTransport {
+
+  public abstract void registerSelector(Selector selector);
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingSocket.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingSocket.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingSocket.java
new file mode 100644
index 0000000..fb51798
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingSocket.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transport for use with async client.
+ */
+public class TNonblockingSocket extends TNonblockingTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingSocket.class.getName());
+
+  /**
+   * Host and port if passed in, used for lazy non-blocking connect.
+   */
+  private final SocketAddress socketAddress_;
+
+  private final SocketChannel socketChannel_;
+
+  public TNonblockingSocket(String host, int port) throws IOException {
+    this(host, port, 0);
+  }
+
+  /**
+   * Create a new nonblocking socket transport that will be connected to host:port.
+   * @param host
+   * @param port
+   * @throws TTransportException
+   * @throws IOException
+   */
+  public TNonblockingSocket(String host, int port, int timeout) throws IOException {
+    this(SocketChannel.open(), timeout, new InetSocketAddress(host, port));
+  }
+
+  /**
+   * Constructor that takes an already created socket.
+   *
+   * @param socketChannel Already created SocketChannel object
+   * @throws IOException if there is an error setting up the streams
+   */
+  public TNonblockingSocket(SocketChannel socketChannel) throws IOException {
+    this(socketChannel, 0, null);
+    if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected");
+  }
+
+  private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress)
+      throws IOException {
+    socketChannel_ = socketChannel;
+    socketAddress_ = socketAddress;
+
+    // make it a nonblocking channel
+    socketChannel.configureBlocking(false);
+
+    // set options
+    Socket socket = socketChannel.socket();
+    socket.setSoLinger(false, 0);
+    socket.setTcpNoDelay(true);
+    setTimeout(timeout);
+  }
+
+  /**
+   * Register the new SocketChannel with our Selector, indicating
+   * we'd like to be notified when it's ready for I/O.
+   *
+   * @param selector
+   * @return the selection key for this socket.
+   */
+  public SelectionKey registerSelector(Selector selector, int interests) throws IOException {
+    return socketChannel_.register(selector, interests);
+  }
+
+  /**
+   * Sets the socket timeout, although this implementation never uses blocking operations so it is unused.
+   *
+   * @param timeout Milliseconds timeout
+   */
+  public void setTimeout(int timeout) {
+    try {
+      socketChannel_.socket().setSoTimeout(timeout);
+    } catch (SocketException sx) {
+      LOGGER.warn("Could not set socket timeout.", sx);
+    }
+  }
+
+  /**
+   * Returns a reference to the underlying SocketChannel.
+   */
+  public SocketChannel getSocketChannel() {
+    return socketChannel_;
+  }
+
+  /**
+   * Checks whether the socket is connected.
+   */
+  public boolean isOpen() {
+    // isConnected() does not return false after close(), but isOpen() does
+    return socketChannel_.isOpen() && socketChannel_.isConnected();
+  }
+
+  /**
+   * Do not call, the implementation provides its own lazy non-blocking connect.
+   */
+  public void open() throws TTransportException {
+    throw new RuntimeException("open() is not implemented for TNonblockingSocket");
+  }
+
+  /**
+   * Perform a nonblocking read into buffer.
+   */
+  public int read(ByteBuffer buffer) throws IOException {
+    return socketChannel_.read(buffer);
+  }
+
+
+  /**
+   * Reads from the underlying input stream if not null.
+   */
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if ((socketChannel_.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
+      throw new TTransportException(TTransportException.NOT_OPEN,
+        "Cannot read from write-only socket channel");
+    }
+    try {
+      return socketChannel_.read(ByteBuffer.wrap(buf, off, len));
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+
+  /**
+   * Perform a nonblocking write of the data in buffer;
+   */
+  public int write(ByteBuffer buffer) throws IOException {
+    return socketChannel_.write(buffer);
+  }
+
+  /**
+   * Writes to the underlying output stream if not null.
+   */
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    if ((socketChannel_.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
+      throw new TTransportException(TTransportException.NOT_OPEN,
+        "Cannot write to write-only socket channel");
+    }
+    try {
+      socketChannel_.write(ByteBuffer.wrap(buf, off, len));
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+
+  /**
+   * Noop.
+   */
+  public void flush() throws TTransportException {
+    // Not supported by SocketChannel.
+  }
+
+  /**
+   * Closes the socket.
+   */
+  public void close() {
+    try {
+      socketChannel_.close();
+    } catch (IOException iox) {
+      LOGGER.warn("Could not close socket.", iox);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public boolean startConnect() throws IOException {
+    return socketChannel_.connect(socketAddress_);
+  }
+
+  /** {@inheritDoc} */
+  public boolean finishConnect() throws IOException {
+    return socketChannel_.finishConnect();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingTransport.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingTransport.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingTransport.java
new file mode 100644
index 0000000..21df369
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingTransport.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+
+public abstract class TNonblockingTransport extends TTransport {
+
+  /**
+   * Non-blocking connection initialization.
+   * @see java.nio.channels.SocketChannel#connect(SocketAddress remote)
+   */
+  public abstract boolean startConnect() throws IOException;
+
+  /**
+   * Non-blocking connection completion.
+   * @see java.nio.channels.SocketChannel#finishConnect()
+   */
+  public abstract boolean finishConnect() throws IOException;
+
+  public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
+
+  public abstract int read(ByteBuffer buffer) throws IOException;
+
+  public abstract int write(ByteBuffer buffer) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSSLTransportFactory.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSSLTransportFactory.java
new file mode 100644
index 0000000..f2e9ba5
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSSLTransportFactory.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.FileInputStream;
+import java.net.InetAddress;
+import java.security.KeyStore;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+/**
+ *  A Factory for providing and setting up Client and Server SSL wrapped 
+ *  TSocket and TServerSocket
+ */
+public class TSSLTransportFactory {
+
+  /**
+   * Get a SSL wrapped TServerSocket bound to the specified port. In this
+   * configuration the default settings are used. Default settings are retrieved
+   * from System properties that are set.
+   * 
+   * Example system properties:
+   * -Djavax.net.ssl.trustStore=<truststore location>
+   * -Djavax.net.ssl.trustStorePassword=password
+   * -Djavax.net.ssl.keyStore=<keystore location>
+   * -Djavax.net.ssl.keyStorePassword=password
+   * 
+   * @param port
+   * @return A SSL wrapped TServerSocket
+   * @throws TTransportException
+   */
+  public static TServerSocket getServerSocket(int port) throws TTransportException {
+    return getServerSocket(port, 0); 
+  }
+
+  /**
+   * Get a default SSL wrapped TServerSocket bound to the specified port
+   * 
+   * @param port
+   * @param clientTimeout
+   * @return A SSL wrapped TServerSocket
+   * @throws TTransportException
+   */
+  public static TServerSocket getServerSocket(int port, int clientTimeout) throws TTransportException {
+    return getServerSocket(port, clientTimeout, false, null);
+  }
+
+  /**
+   * Get a default SSL wrapped TServerSocket bound to the specified port and interface
+   * 
+   * @param port
+   * @param clientTimeout
+   * @param ifAddress
+   * @return A SSL wrapped TServerSocket
+   * @throws TTransportException
+   */
+  public static TServerSocket getServerSocket(int port, int clientTimeout, boolean clientAuth, InetAddress ifAddress) throws TTransportException {
+    SSLServerSocketFactory factory = (SSLServerSocketFactory) SSLServerSocketFactory.getDefault();
+    return createServer(factory, port, clientTimeout, clientAuth, ifAddress, null); 
+  }
+
+  /**
+   * Get a configured SSL wrapped TServerSocket bound to the specified port and interface. 
+   * Here the TSSLTransportParameters are used to set the values for the algorithms, keystore, 
+   * truststore and other settings
+   * 
+   * @param port
+   * @param clientTimeout
+   * @param ifAddress
+   * @param params
+   * @return A SSL wrapped TServerSocket
+   * @throws TTransportException
+   */
+  public static TServerSocket getServerSocket(int port, int clientTimeout, InetAddress ifAddress, TSSLTransportParameters params) throws TTransportException {
+    if (params == null || !(params.isKeyStoreSet || params.isTrustStoreSet)) {
+      throw new TTransportException("Either one of the KeyStore or TrustStore must be set for SSLTransportParameters");
+    }
+
+    SSLContext ctx = createSSLContext(params);
+    return createServer(ctx.getServerSocketFactory(), port, clientTimeout, params.clientAuth, ifAddress, params);
+  }
+
+  private static TServerSocket createServer(SSLServerSocketFactory factory, int port, int timeout, boolean clientAuth,
+                                    InetAddress ifAddress, TSSLTransportParameters params) throws TTransportException {
+    try {
+      SSLServerSocket serverSocket = (SSLServerSocket) factory.createServerSocket(port, 100, ifAddress);
+      serverSocket.setSoTimeout(timeout);
+      serverSocket.setNeedClientAuth(clientAuth);
+      if (params != null && params.cipherSuites != null) {
+        serverSocket.setEnabledCipherSuites(params.cipherSuites);
+      }
+      return new TServerSocket(serverSocket, timeout);
+    } catch (Exception e) {
+      throw new TTransportException("Could not bind to port " + port, e);
+    }
+  }
+
+  /**
+   * Get a default SSL wrapped TSocket connected to the specified host and port. All
+   * the client methods return a bound connection. So there is no need to call open() on the 
+   * TTransport.
+   * 
+   * @param host
+   * @param port
+   * @param timeout
+   * @return A SSL wrapped TSocket
+   * @throws TTransportException
+   */
+  public static TSocket getClientSocket(String host, int port, int timeout) throws TTransportException {
+    SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
+    return createClient(factory, host, port, timeout);
+  }
+
+  /**
+   * Get a default SSL wrapped TSocket connected to the specified host and port.
+   * 
+   * @param host
+   * @param port
+   * @return A SSL wrapped TSocket
+   * @throws TTransportException
+   */
+  public static TSocket getClientSocket(String host, int port) throws TTransportException {
+    return getClientSocket(host, port, 0);
+  }
+
+  /**
+   * Get a custom configured SSL wrapped TSocket. The SSL settings are obtained from the 
+   * passed in TSSLTransportParameters.
+   * 
+   * @param host
+   * @param port
+   * @param timeout
+   * @param params
+   * @return A SSL wrapped TSocket
+   * @throws TTransportException
+   */
+  public static TSocket getClientSocket(String host, int port, int timeout, TSSLTransportParameters params) throws TTransportException {
+    if (params == null || !(params.isKeyStoreSet || params.isTrustStoreSet)) {
+      throw new TTransportException("Either one of the KeyStore or TrustStore must be set for SSLTransportParameters");
+    }
+
+    SSLContext ctx = createSSLContext(params);
+    return createClient(ctx.getSocketFactory(), host, port, timeout);
+  }
+
+  private static SSLContext createSSLContext(TSSLTransportParameters params) throws TTransportException {
+    SSLContext ctx;
+    try {
+      ctx = SSLContext.getInstance(params.protocol);
+      TrustManagerFactory tmf = null;
+      KeyManagerFactory kmf = null;
+
+      if (params.isTrustStoreSet) {
+        tmf = TrustManagerFactory.getInstance(params.trustManagerType);
+        KeyStore ts = KeyStore.getInstance(params.trustStoreType);
+        ts.load(new FileInputStream(params.trustStore), params.trustPass.toCharArray());
+        tmf.init(ts);
+      }
+
+      if (params.isKeyStoreSet) {
+        kmf = KeyManagerFactory.getInstance(params.keyManagerType);
+        KeyStore ks = KeyStore.getInstance(params.keyStoreType);
+        ks.load(new FileInputStream(params.keyStore), params.keyPass.toCharArray());
+        kmf.init(ks, params.keyPass.toCharArray());
+      }
+
+      if (params.isKeyStoreSet && params.isTrustStoreSet) {
+        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+      }
+      else if (params.isKeyStoreSet) {
+        ctx.init(kmf.getKeyManagers(), null, null);
+      }
+      else {
+        ctx.init(null, tmf.getTrustManagers(), null);
+      }
+
+    } catch (Exception e) {
+      throw new TTransportException("Error creating the transport", e);
+    }
+    return ctx;
+  }
+
+  private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
+    try {
+      SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
+      socket.setSoTimeout(timeout);
+      return new TSocket(socket);
+    } catch (Exception e) {
+      throw new TTransportException("Could not connect to " + host + " on port " + port, e);
+    }
+  }
+
+
+  /**
+   * A Class to hold all the SSL parameters
+   */
+  public static class TSSLTransportParameters {
+    protected String protocol = "TLS";
+    protected String keyStore;
+    protected String keyPass;
+    protected String keyManagerType = KeyManagerFactory.getDefaultAlgorithm();
+    protected String keyStoreType = "JKS";
+    protected String trustStore;
+    protected String trustPass;
+    protected String trustManagerType = TrustManagerFactory.getDefaultAlgorithm();
+    protected String trustStoreType = "JKS";
+    protected String[] cipherSuites;
+    protected boolean clientAuth = false;
+    protected boolean isKeyStoreSet = false;
+    protected boolean isTrustStoreSet = false;
+
+    public TSSLTransportParameters() {}
+
+    /**
+     * Create parameters specifying the protocol and cipher suites
+     * 
+     * @param protocol The specific protocol (TLS/SSL) can be specified with versions
+     * @param cipherSuites
+     */
+    public TSSLTransportParameters(String protocol, String[] cipherSuites) {
+      this(protocol, cipherSuites, false);
+    }
+
+    /**
+     * Create parameters specifying the protocol, cipher suites and if client authentication
+     * is required
+     * 
+     * @param protocol The specific protocol (TLS/SSL) can be specified with versions
+     * @param cipherSuites
+     * @param clientAuth
+     */
+    public TSSLTransportParameters(String protocol, String[] cipherSuites, boolean clientAuth) {
+      if (protocol != null) {
+        this.protocol = protocol;
+      }
+      this.cipherSuites = cipherSuites;
+      this.clientAuth = clientAuth;
+    }
+
+    /**
+     * Set the keystore, password, certificate type and the store type
+     * 
+     * @param keyStore Location of the Keystore on disk
+     * @param keyPass Keystore password
+     * @param keyManagerType The default is X509
+     * @param keyStoreType The default is JKS
+     */
+    public void setKeyStore(String keyStore, String keyPass, String keyManagerType, String keyStoreType) {
+      this.keyStore = keyStore;
+      this.keyPass = keyPass;
+      if (keyManagerType != null) {
+        this.keyManagerType = keyManagerType;
+      }
+      if (keyStoreType != null) {
+        this.keyStoreType = keyStoreType;
+      }
+      isKeyStoreSet = true;
+    }
+
+    /**
+     * Set the keystore and password
+     * 
+     * @param keyStore Location of the Keystore on disk
+     * @param keyPass Keystore password
+     */
+    public void setKeyStore(String keyStore, String keyPass) {
+      setKeyStore(keyStore, keyPass, null, null);
+    }
+
+    /**
+     * Set the truststore, password, certificate type and the store type
+     * 
+     * @param trustStore Location of the Truststore on disk
+     * @param trustPass Truststore password
+     * @param trustManagerType The default is X509
+     * @param trustStoreType The default is JKS
+     */
+    public void setTrustStore(String trustStore, String trustPass, String trustManagerType, String trustStoreType) {
+      this.trustStore = trustStore;
+      this.trustPass = trustPass;
+      if (trustManagerType != null) {
+        this.trustManagerType = trustManagerType;
+      }
+      if (trustStoreType != null) {
+        this.trustStoreType = trustStoreType;
+      }
+      isTrustStoreSet = true;
+    }
+
+    /**
+     * Set the truststore and password
+     * 
+     * @param trustStore Location of the Truststore on disk
+     * @param trustPass Truststore password
+     */
+    public void setTrustStore(String trustStore, String trustPass) {
+      setTrustStore(trustStore, trustPass, null, null);
+    }
+
+    /**
+     * Set if client authentication is required
+     * 
+     * @param clientAuth
+     */
+    public void requireClientAuth(boolean clientAuth) {
+      this.clientAuth = clientAuth;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9f63daff/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslClientTransport.java
----------------------------------------------------------------------
diff --git a/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslClientTransport.java b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslClientTransport.java
new file mode 100644
index 0000000..48d485e
--- /dev/null
+++ b/src/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslClientTransport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.util.Map;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps another Thrift <code>TTransport</code>, but performs SASL client
+ * negotiation on the call to <code>open()</code>. This class will wrap ensuing
+ * communication over it, if a SASL QOP is negotiated with the other party.
+ */
+public class TSaslClientTransport extends TSaslTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TSaslClientTransport.class);
+
+  /**
+   * The name of the mechanism this client supports.
+   */
+  private final String mechanism;
+
+  /**
+   * Uses the given <code>SaslClient</code>.
+   * 
+   * @param saslClient
+   *          The <code>SaslClient</code> to use for the subsequent SASL
+   *          negotiation.
+   * @param transport
+   *          Transport underlying this one.
+   */
+  public TSaslClientTransport(SaslClient saslClient, TTransport transport) {
+    super(saslClient, transport);
+    mechanism = saslClient.getMechanismName();
+  }
+
+  /**
+   * Creates a <code>SaslClient</code> using the given SASL-specific parameters.
+   * See the Java documentation for <code>Sasl.createSaslClient</code> for the
+   * details of the parameters.
+   * 
+   * @param transport
+   *          The underlying Thrift transport.
+   * @throws SaslException
+   */
+  public TSaslClientTransport(String mechanism, String authorizationId, String protocol,
+      String serverName, Map<String, String> props, CallbackHandler cbh, TTransport transport)
+      throws SaslException {
+    super(Sasl.createSaslClient(new String[] { mechanism }, authorizationId, protocol, serverName,
+        props, cbh), transport);
+    this.mechanism = mechanism;
+  }
+
+
+  @Override
+  protected SaslRole getRole() {
+    return SaslRole.CLIENT;
+  }
+
+  /**
+   * Performs the client side of the initial portion of the Thrift SASL
+   * protocol. Generates and sends the initial response to the server, including
+   * which mechanism this client wants to use.
+   */
+  @Override
+  protected void handleSaslStartMessage() throws TTransportException, SaslException {
+    SaslClient saslClient = getSaslClient();
+
+    byte[] initialResponse = new byte[0];
+    if (saslClient.hasInitialResponse())
+      initialResponse = saslClient.evaluateChallenge(initialResponse);
+
+    LOGGER.debug("Sending mechanism name {} and initial response of length {}", mechanism,
+        initialResponse.length);
+
+    byte[] mechanismBytes = mechanism.getBytes();
+    sendSaslMessage(NegotiationStatus.START,
+                    mechanismBytes);
+    // Send initial response
+    sendSaslMessage(saslClient.isComplete() ? NegotiationStatus.COMPLETE : NegotiationStatus.OK,
+                    initialResponse);
+    underlyingTransport.flush();
+  }
+}


Mime
View raw message