incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [20/51] [partial] Fixed BLUR-126.
Date Thu, 06 Jun 2013 18:57:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TIOStreamTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TIOStreamTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TIOStreamTransport.java
new file mode 100644
index 0000000..b48bc91
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TIOStreamTransport.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This is the most commonly used base transport. It takes an InputStream
+ * and an OutputStream and uses those to perform all transport operations.
+ * This allows for compatibility with all the nice constructs Java already
+ * has to provide a variety of types of streams.
+ *
+ */
+public class TIOStreamTransport extends TTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName());
+
+  /** Underlying inputStream */
+  protected InputStream inputStream_ = null;
+
+  /** Underlying outputStream */
+  protected OutputStream outputStream_ = null;
+
+  /**
+   * Subclasses can invoke the default constructor and then assign the input
+   * streams in the open method.
+   */
+  protected TIOStreamTransport() {}
+
+  /**
+   * Input stream constructor.
+   *
+   * @param is Input stream to read from
+   */
+  public TIOStreamTransport(InputStream is) {
+    inputStream_ = is;
+  }
+
+  /**
+   * Output stream constructor.
+   *
+   * @param os Output stream to read from
+   */
+  public TIOStreamTransport(OutputStream os) {
+    outputStream_ = os;
+  }
+
+  /**
+   * Two-way stream constructor.
+   *
+   * @param is Input stream to read from
+   * @param os Output stream to read from
+   */
+  public TIOStreamTransport(InputStream is, OutputStream os) {
+    inputStream_ = is;
+    outputStream_ = os;
+  }
+
+  /**
+   * The streams must already be open at construction time, so this should
+   * always return true.
+   *
+   * @return true
+   */
+  public boolean isOpen() {
+    return true;
+  }
+
+  /**
+   * The streams must already be open. This method does nothing.
+   */
+  public void open() throws TTransportException {}
+
+  /**
+   * Closes both the input and output streams.
+   */
+  public void close() {
+    if (inputStream_ != null) {
+      try {
+        inputStream_.close();
+      } catch (IOException iox) {
+        LOGGER.warn("Error closing input stream.", iox);
+      }
+      inputStream_ = null;
+    }
+    if (outputStream_ != null) {
+      try {
+        outputStream_.close();
+      } catch (IOException iox) {
+        LOGGER.warn("Error closing output stream.", iox);
+      }
+      outputStream_ = null;
+    }
+  }
+
+  /**
+   * Reads from the underlying input stream if not null.
+   */
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if (inputStream_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream");
+    }
+    int bytesRead;
+    try {
+      bytesRead = inputStream_.read(buf, off, len);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+    if (bytesRead < 0) {
+      throw new TTransportException(TTransportException.END_OF_FILE);
+    }
+    return bytesRead;
+  }
+
+  /**
+   * Writes to the underlying output stream if not null.
+   */
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    if (outputStream_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
+    }
+    try {
+      outputStream_.write(buf, off, len);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+
+  /**
+   * Flushes the underlying output stream if not null.
+   */
+  public void flush() throws TTransportException {
+    if (outputStream_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");
+    }
+    try {
+      outputStream_.flush();
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryBuffer.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryBuffer.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryBuffer.java
new file mode 100644
index 0000000..ced983f
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryBuffer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TByteArrayOutputStream;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Memory buffer-based implementation of the TTransport interface.
+ */
+public class TMemoryBuffer extends TTransport {
+  /**
+   * Create a TMemoryBuffer with an initial buffer size of <i>size</i>. The
+   * internal buffer will grow as necessary to accommodate the size of the data
+   * being written to it.
+   */
+  public TMemoryBuffer(int size) {
+    arr_ = new TByteArrayOutputStream(size);
+  }
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void open() {
+    /* Do nothing */
+  }
+
+  @Override
+  public void close() {
+    /* Do nothing */
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) {
+    byte[] src = arr_.get();
+    int amtToRead = (len > arr_.len() - pos_ ? arr_.len() - pos_ : len);
+    if (amtToRead > 0) {
+      System.arraycopy(src, pos_, buf, off, amtToRead);
+      pos_ += amtToRead;
+    }
+    return amtToRead;
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) {
+    arr_.write(buf, off, len);
+  }
+
+  /**
+   * Output the contents of the memory buffer as a String, using the supplied
+   * encoding
+   * @param enc  the encoding to use
+   * @return the contents of the memory buffer as a String
+   */
+  public String toString(String enc) throws UnsupportedEncodingException {
+    return arr_.toString(enc);
+  }
+
+  public String inspect() {
+    String buf = "";
+    byte[] bytes = arr_.toByteArray();
+    for (int i = 0; i < bytes.length; i++) {
+      buf += (pos_ == i ? "==>" : "" ) + Integer.toHexString(bytes[i] & 0xff) + " ";
+    }
+    return buf;
+  }
+
+  // The contents of the buffer
+  private TByteArrayOutputStream arr_;
+
+  // Position to read next byte from
+  private int pos_;
+
+  public int length() {
+    return arr_.size();
+  }
+
+  public byte[] getArray() {
+    return arr_.get();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryInputTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryInputTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryInputTransport.java
new file mode 100644
index 0000000..1f26505
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TMemoryInputTransport.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+public final class TMemoryInputTransport extends TTransport {
+
+  private byte[] buf_;
+  private int pos_;
+  private int endPos_;
+
+  public TMemoryInputTransport() {
+  }
+
+  public TMemoryInputTransport(byte[] buf) {
+    reset(buf);
+  }
+
+  public TMemoryInputTransport(byte[] buf, int offset, int length) {
+    reset(buf, offset, length);
+  }
+
+  public void reset(byte[] buf) {
+    reset(buf, 0, buf.length);
+  }
+
+  public void reset(byte[] buf, int offset, int length) {
+    buf_ = buf;
+    pos_ = offset;
+    endPos_ = offset + length;
+  }
+
+  public void clear() {
+    buf_ = null;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void open() throws TTransportException {}
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    int bytesRemaining = getBytesRemainingInBuffer();
+    int amtToRead = (len > bytesRemaining ? bytesRemaining : len);
+    if (amtToRead > 0) {
+      System.arraycopy(buf_, pos_, buf, off, amtToRead);
+      consumeBuffer(amtToRead);
+    }
+    return amtToRead;
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    throw new UnsupportedOperationException("No writing allowed!");
+  }
+
+  @Override
+  public byte[] getBuffer() {
+    return buf_;
+  }
+
+  public int getBufferPosition() {
+    return pos_;
+  }
+
+  public int getBytesRemainingInBuffer() {
+    return endPos_ - pos_;
+  }
+
+  public void consumeBuffer(int len) {
+    pos_ += len;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java
new file mode 100644
index 0000000..560e5a3
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerSocket.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around ServerSocketChannel
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingServerTransport.class.getName());
+
+  /**
+   * This channel is where all the nonblocking magic happens.
+   */
+  private ServerSocketChannel serverSocketChannel = null;
+
+  /**
+   * Underlying ServerSocket object
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+    this(new InetSocketAddress(port), clientTimeout);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    clientTimeout_ = clientTimeout;
+    try {
+      serverSocketChannel = ServerSocketChannel.open();
+      serverSocketChannel.configureBlocking(false);
+
+      // Make server socket
+      serverSocket_ = serverSocketChannel.socket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(bindAddr);
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+    }
+  }
+
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        sx.printStackTrace();
+      }
+    }
+  }
+
+  protected TNonblockingSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      SocketChannel socketChannel = serverSocketChannel.accept();
+      if (socketChannel == null) {
+        return null;
+      }
+
+      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+      tsocket.setTimeout(clientTimeout_);
+      return tsocket;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void registerSelector(Selector selector) {
+    try {
+      // Register the server socket channel, indicating an interest in
+      // accepting new connections
+      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    } catch (ClosedChannelException e) {
+      // this shouldn't happen, ideally...
+      // TODO: decide what to do with this.
+    }
+  }
+
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        LOGGER.warn("WARNING: Could not close server socket: " + iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerTransport.java
new file mode 100644
index 0000000..96cc7fe
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingServerTransport.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.nio.channels.Selector;
+
+/**
+ * Server transport that can be operated in a nonblocking fashion.
+ */
+public abstract class TNonblockingServerTransport extends TServerTransport {
+
+  public abstract void registerSelector(Selector selector);
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingSocket.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingSocket.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingSocket.java
new file mode 100644
index 0000000..fb51798
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingSocket.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transport for use with async client.
+ */
+public class TNonblockingSocket extends TNonblockingTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingSocket.class.getName());
+
+  /**
+   * Host and port if passed in, used for lazy non-blocking connect.
+   */
+  private final SocketAddress socketAddress_;
+
+  private final SocketChannel socketChannel_;
+
+  public TNonblockingSocket(String host, int port) throws IOException {
+    this(host, port, 0);
+  }
+
+  /**
+   * Create a new nonblocking socket transport that will be connected to host:port.
+   * @param host
+   * @param port
+   * @throws TTransportException
+   * @throws IOException
+   */
+  public TNonblockingSocket(String host, int port, int timeout) throws IOException {
+    this(SocketChannel.open(), timeout, new InetSocketAddress(host, port));
+  }
+
+  /**
+   * Constructor that takes an already created socket.
+   *
+   * @param socketChannel Already created SocketChannel object
+   * @throws IOException if there is an error setting up the streams
+   */
+  public TNonblockingSocket(SocketChannel socketChannel) throws IOException {
+    this(socketChannel, 0, null);
+    if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected");
+  }
+
+  private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress)
+      throws IOException {
+    socketChannel_ = socketChannel;
+    socketAddress_ = socketAddress;
+
+    // make it a nonblocking channel
+    socketChannel.configureBlocking(false);
+
+    // set options
+    Socket socket = socketChannel.socket();
+    socket.setSoLinger(false, 0);
+    socket.setTcpNoDelay(true);
+    setTimeout(timeout);
+  }
+
+  /**
+   * Register the new SocketChannel with our Selector, indicating
+   * we'd like to be notified when it's ready for I/O.
+   *
+   * @param selector
+   * @return the selection key for this socket.
+   */
+  public SelectionKey registerSelector(Selector selector, int interests) throws IOException {
+    return socketChannel_.register(selector, interests);
+  }
+
+  /**
+   * Sets the socket timeout, although this implementation never uses blocking operations so it is unused.
+   *
+   * @param timeout Milliseconds timeout
+   */
+  public void setTimeout(int timeout) {
+    try {
+      socketChannel_.socket().setSoTimeout(timeout);
+    } catch (SocketException sx) {
+      LOGGER.warn("Could not set socket timeout.", sx);
+    }
+  }
+
+  /**
+   * Returns a reference to the underlying SocketChannel.
+   */
+  public SocketChannel getSocketChannel() {
+    return socketChannel_;
+  }
+
+  /**
+   * Checks whether the socket is connected.
+   */
+  public boolean isOpen() {
+    // isConnected() does not return false after close(), but isOpen() does
+    return socketChannel_.isOpen() && socketChannel_.isConnected();
+  }
+
+  /**
+   * Do not call, the implementation provides its own lazy non-blocking connect.
+   */
+  public void open() throws TTransportException {
+    throw new RuntimeException("open() is not implemented for TNonblockingSocket");
+  }
+
+  /**
+   * Perform a nonblocking read into buffer.
+   */
+  public int read(ByteBuffer buffer) throws IOException {
+    return socketChannel_.read(buffer);
+  }
+
+
+  /**
+   * Reads from the underlying input stream if not null.
+   */
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if ((socketChannel_.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
+      throw new TTransportException(TTransportException.NOT_OPEN,
+        "Cannot read from write-only socket channel");
+    }
+    try {
+      return socketChannel_.read(ByteBuffer.wrap(buf, off, len));
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+
+  /**
+   * Perform a nonblocking write of the data in buffer;
+   */
+  public int write(ByteBuffer buffer) throws IOException {
+    return socketChannel_.write(buffer);
+  }
+
+  /**
+   * Writes to the underlying output stream if not null.
+   */
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    if ((socketChannel_.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
+      throw new TTransportException(TTransportException.NOT_OPEN,
+        "Cannot write to write-only socket channel");
+    }
+    try {
+      socketChannel_.write(ByteBuffer.wrap(buf, off, len));
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+
+  /**
+   * Noop.
+   */
+  public void flush() throws TTransportException {
+    // Not supported by SocketChannel.
+  }
+
+  /**
+   * Closes the socket.
+   */
+  public void close() {
+    try {
+      socketChannel_.close();
+    } catch (IOException iox) {
+      LOGGER.warn("Could not close socket.", iox);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public boolean startConnect() throws IOException {
+    return socketChannel_.connect(socketAddress_);
+  }
+
+  /** {@inheritDoc} */
+  public boolean finishConnect() throws IOException {
+    return socketChannel_.finishConnect();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingTransport.java
new file mode 100644
index 0000000..21df369
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TNonblockingTransport.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+
+public abstract class TNonblockingTransport extends TTransport {
+
+  /**
+   * Non-blocking connection initialization.
+   * @see java.nio.channels.SocketChannel#connect(SocketAddress remote)
+   */
+  public abstract boolean startConnect() throws IOException;
+
+  /**
+   * Non-blocking connection completion.
+   * @see java.nio.channels.SocketChannel#finishConnect()
+   */
+  public abstract boolean finishConnect() throws IOException;
+
+  public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
+
+  public abstract int read(ByteBuffer buffer) throws IOException;
+
+  public abstract int write(ByteBuffer buffer) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSSLTransportFactory.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSSLTransportFactory.java
new file mode 100644
index 0000000..f2e9ba5
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSSLTransportFactory.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.FileInputStream;
+import java.net.InetAddress;
+import java.security.KeyStore;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+/**
+ *  A Factory for providing and setting up Client and Server SSL wrapped 
+ *  TSocket and TServerSocket
+ */
+public class TSSLTransportFactory {
+
+  /**
+   * Get a SSL wrapped TServerSocket bound to the specified port. In this
+   * configuration the default settings are used. Default settings are retrieved
+   * from System properties that are set.
+   * 
+   * Example system properties:
+   * -Djavax.net.ssl.trustStore=<truststore location>
+   * -Djavax.net.ssl.trustStorePassword=password
+   * -Djavax.net.ssl.keyStore=<keystore location>
+   * -Djavax.net.ssl.keyStorePassword=password
+   * 
+   * @param port
+   * @return A SSL wrapped TServerSocket
+   * @throws TTransportException
+   */
+  public static TServerSocket getServerSocket(int port) throws TTransportException {
+    return getServerSocket(port, 0); 
+  }
+
+  /**
+   * Get a default SSL wrapped TServerSocket bound to the specified port
+   * 
+   * @param port
+   * @param clientTimeout
+   * @return A SSL wrapped TServerSocket
+   * @throws TTransportException
+   */
+  public static TServerSocket getServerSocket(int port, int clientTimeout) throws TTransportException {
+    return getServerSocket(port, clientTimeout, false, null);
+  }
+
+  /**
+   * Get a default SSL wrapped TServerSocket bound to the specified port and interface
+   * 
+   * @param port
+   * @param clientTimeout
+   * @param ifAddress
+   * @return A SSL wrapped TServerSocket
+   * @throws TTransportException
+   */
+  public static TServerSocket getServerSocket(int port, int clientTimeout, boolean clientAuth, InetAddress ifAddress) throws TTransportException {
+    SSLServerSocketFactory factory = (SSLServerSocketFactory) SSLServerSocketFactory.getDefault();
+    return createServer(factory, port, clientTimeout, clientAuth, ifAddress, null); 
+  }
+
+  /**
+   * Get a configured SSL wrapped TServerSocket bound to the specified port and interface. 
+   * Here the TSSLTransportParameters are used to set the values for the algorithms, keystore, 
+   * truststore and other settings
+   * 
+   * @param port
+   * @param clientTimeout
+   * @param ifAddress
+   * @param params
+   * @return A SSL wrapped TServerSocket
+   * @throws TTransportException
+   */
+  public static TServerSocket getServerSocket(int port, int clientTimeout, InetAddress ifAddress, TSSLTransportParameters params) throws TTransportException {
+    if (params == null || !(params.isKeyStoreSet || params.isTrustStoreSet)) {
+      throw new TTransportException("Either one of the KeyStore or TrustStore must be set for SSLTransportParameters");
+    }
+
+    SSLContext ctx = createSSLContext(params);
+    return createServer(ctx.getServerSocketFactory(), port, clientTimeout, params.clientAuth, ifAddress, params);
+  }
+
+  private static TServerSocket createServer(SSLServerSocketFactory factory, int port, int timeout, boolean clientAuth,
+                                    InetAddress ifAddress, TSSLTransportParameters params) throws TTransportException {
+    try {
+      SSLServerSocket serverSocket = (SSLServerSocket) factory.createServerSocket(port, 100, ifAddress);
+      serverSocket.setSoTimeout(timeout);
+      serverSocket.setNeedClientAuth(clientAuth);
+      if (params != null && params.cipherSuites != null) {
+        serverSocket.setEnabledCipherSuites(params.cipherSuites);
+      }
+      return new TServerSocket(serverSocket, timeout);
+    } catch (Exception e) {
+      throw new TTransportException("Could not bind to port " + port, e);
+    }
+  }
+
+  /**
+   * Get a default SSL wrapped TSocket connected to the specified host and port. All
+   * the client methods return a bound connection. So there is no need to call open() on the 
+   * TTransport.
+   * 
+   * @param host
+   * @param port
+   * @param timeout
+   * @return A SSL wrapped TSocket
+   * @throws TTransportException
+   */
+  public static TSocket getClientSocket(String host, int port, int timeout) throws TTransportException {
+    SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
+    return createClient(factory, host, port, timeout);
+  }
+
+  /**
+   * Get a default SSL wrapped TSocket connected to the specified host and port.
+   * 
+   * @param host
+   * @param port
+   * @return A SSL wrapped TSocket
+   * @throws TTransportException
+   */
+  public static TSocket getClientSocket(String host, int port) throws TTransportException {
+    return getClientSocket(host, port, 0);
+  }
+
+  /**
+   * Get a custom configured SSL wrapped TSocket. The SSL settings are obtained from the 
+   * passed in TSSLTransportParameters.
+   * 
+   * @param host
+   * @param port
+   * @param timeout
+   * @param params
+   * @return A SSL wrapped TSocket
+   * @throws TTransportException
+   */
+  public static TSocket getClientSocket(String host, int port, int timeout, TSSLTransportParameters params) throws TTransportException {
+    if (params == null || !(params.isKeyStoreSet || params.isTrustStoreSet)) {
+      throw new TTransportException("Either one of the KeyStore or TrustStore must be set for SSLTransportParameters");
+    }
+
+    SSLContext ctx = createSSLContext(params);
+    return createClient(ctx.getSocketFactory(), host, port, timeout);
+  }
+
+  private static SSLContext createSSLContext(TSSLTransportParameters params) throws TTransportException {
+    SSLContext ctx;
+    try {
+      ctx = SSLContext.getInstance(params.protocol);
+      TrustManagerFactory tmf = null;
+      KeyManagerFactory kmf = null;
+
+      if (params.isTrustStoreSet) {
+        tmf = TrustManagerFactory.getInstance(params.trustManagerType);
+        KeyStore ts = KeyStore.getInstance(params.trustStoreType);
+        ts.load(new FileInputStream(params.trustStore), params.trustPass.toCharArray());
+        tmf.init(ts);
+      }
+
+      if (params.isKeyStoreSet) {
+        kmf = KeyManagerFactory.getInstance(params.keyManagerType);
+        KeyStore ks = KeyStore.getInstance(params.keyStoreType);
+        ks.load(new FileInputStream(params.keyStore), params.keyPass.toCharArray());
+        kmf.init(ks, params.keyPass.toCharArray());
+      }
+
+      if (params.isKeyStoreSet && params.isTrustStoreSet) {
+        ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+      }
+      else if (params.isKeyStoreSet) {
+        ctx.init(kmf.getKeyManagers(), null, null);
+      }
+      else {
+        ctx.init(null, tmf.getTrustManagers(), null);
+      }
+
+    } catch (Exception e) {
+      throw new TTransportException("Error creating the transport", e);
+    }
+    return ctx;
+  }
+
+  private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
+    try {
+      SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
+      socket.setSoTimeout(timeout);
+      return new TSocket(socket);
+    } catch (Exception e) {
+      throw new TTransportException("Could not connect to " + host + " on port " + port, e);
+    }
+  }
+
+
+  /**
+   * A Class to hold all the SSL parameters
+   */
+  public static class TSSLTransportParameters {
+    protected String protocol = "TLS";
+    protected String keyStore;
+    protected String keyPass;
+    protected String keyManagerType = KeyManagerFactory.getDefaultAlgorithm();
+    protected String keyStoreType = "JKS";
+    protected String trustStore;
+    protected String trustPass;
+    protected String trustManagerType = TrustManagerFactory.getDefaultAlgorithm();
+    protected String trustStoreType = "JKS";
+    protected String[] cipherSuites;
+    protected boolean clientAuth = false;
+    protected boolean isKeyStoreSet = false;
+    protected boolean isTrustStoreSet = false;
+
+    public TSSLTransportParameters() {}
+
+    /**
+     * Create parameters specifying the protocol and cipher suites
+     * 
+     * @param protocol The specific protocol (TLS/SSL) can be specified with versions
+     * @param cipherSuites
+     */
+    public TSSLTransportParameters(String protocol, String[] cipherSuites) {
+      this(protocol, cipherSuites, false);
+    }
+
+    /**
+     * Create parameters specifying the protocol, cipher suites and if client authentication
+     * is required
+     * 
+     * @param protocol The specific protocol (TLS/SSL) can be specified with versions
+     * @param cipherSuites
+     * @param clientAuth
+     */
+    public TSSLTransportParameters(String protocol, String[] cipherSuites, boolean clientAuth) {
+      if (protocol != null) {
+        this.protocol = protocol;
+      }
+      this.cipherSuites = cipherSuites;
+      this.clientAuth = clientAuth;
+    }
+
+    /**
+     * Set the keystore, password, certificate type and the store type
+     * 
+     * @param keyStore Location of the Keystore on disk
+     * @param keyPass Keystore password
+     * @param keyManagerType The default is X509
+     * @param keyStoreType The default is JKS
+     */
+    public void setKeyStore(String keyStore, String keyPass, String keyManagerType, String keyStoreType) {
+      this.keyStore = keyStore;
+      this.keyPass = keyPass;
+      if (keyManagerType != null) {
+        this.keyManagerType = keyManagerType;
+      }
+      if (keyStoreType != null) {
+        this.keyStoreType = keyStoreType;
+      }
+      isKeyStoreSet = true;
+    }
+
+    /**
+     * Set the keystore and password
+     * 
+     * @param keyStore Location of the Keystore on disk
+     * @param keyPass Keystore password
+     */
+    public void setKeyStore(String keyStore, String keyPass) {
+      setKeyStore(keyStore, keyPass, null, null);
+    }
+
+    /**
+     * Set the truststore, password, certificate type and the store type
+     * 
+     * @param trustStore Location of the Truststore on disk
+     * @param trustPass Truststore password
+     * @param trustManagerType The default is X509
+     * @param trustStoreType The default is JKS
+     */
+    public void setTrustStore(String trustStore, String trustPass, String trustManagerType, String trustStoreType) {
+      this.trustStore = trustStore;
+      this.trustPass = trustPass;
+      if (trustManagerType != null) {
+        this.trustManagerType = trustManagerType;
+      }
+      if (trustStoreType != null) {
+        this.trustStoreType = trustStoreType;
+      }
+      isTrustStoreSet = true;
+    }
+
+    /**
+     * Set the truststore and password
+     * 
+     * @param trustStore Location of the Truststore on disk
+     * @param trustPass Truststore password
+     */
+    public void setTrustStore(String trustStore, String trustPass) {
+      setTrustStore(trustStore, trustPass, null, null);
+    }
+
+    /**
+     * Set if client authentication is required
+     * 
+     * @param clientAuth
+     */
+    public void requireClientAuth(boolean clientAuth) {
+      this.clientAuth = clientAuth;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslClientTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslClientTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslClientTransport.java
new file mode 100644
index 0000000..48d485e
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslClientTransport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.util.Map;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps another Thrift <code>TTransport</code>, but performs SASL client
+ * negotiation on the call to <code>open()</code>. This class will wrap ensuing
+ * communication over it, if a SASL QOP is negotiated with the other party.
+ */
+public class TSaslClientTransport extends TSaslTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TSaslClientTransport.class);
+
+  /**
+   * The name of the mechanism this client supports.
+   */
+  private final String mechanism;
+
+  /**
+   * Uses the given <code>SaslClient</code>.
+   * 
+   * @param saslClient
+   *          The <code>SaslClient</code> to use for the subsequent SASL
+   *          negotiation.
+   * @param transport
+   *          Transport underlying this one.
+   */
+  public TSaslClientTransport(SaslClient saslClient, TTransport transport) {
+    super(saslClient, transport);
+    mechanism = saslClient.getMechanismName();
+  }
+
+  /**
+   * Creates a <code>SaslClient</code> using the given SASL-specific parameters.
+   * See the Java documentation for <code>Sasl.createSaslClient</code> for the
+   * details of the parameters.
+   * 
+   * @param transport
+   *          The underlying Thrift transport.
+   * @throws SaslException
+   */
+  public TSaslClientTransport(String mechanism, String authorizationId, String protocol,
+      String serverName, Map<String, String> props, CallbackHandler cbh, TTransport transport)
+      throws SaslException {
+    super(Sasl.createSaslClient(new String[] { mechanism }, authorizationId, protocol, serverName,
+        props, cbh), transport);
+    this.mechanism = mechanism;
+  }
+
+
+  @Override
+  protected SaslRole getRole() {
+    return SaslRole.CLIENT;
+  }
+
+  /**
+   * Performs the client side of the initial portion of the Thrift SASL
+   * protocol. Generates and sends the initial response to the server, including
+   * which mechanism this client wants to use.
+   */
+  @Override
+  protected void handleSaslStartMessage() throws TTransportException, SaslException {
+    SaslClient saslClient = getSaslClient();
+
+    byte[] initialResponse = new byte[0];
+    if (saslClient.hasInitialResponse())
+      initialResponse = saslClient.evaluateChallenge(initialResponse);
+
+    LOGGER.debug("Sending mechanism name {} and initial response of length {}", mechanism,
+        initialResponse.length);
+
+    byte[] mechanismBytes = mechanism.getBytes();
+    sendSaslMessage(NegotiationStatus.START,
+                    mechanismBytes);
+    // Send initial response
+    sendSaslMessage(saslClient.isComplete() ? NegotiationStatus.COMPLETE : NegotiationStatus.OK,
+                    initialResponse);
+    underlyingTransport.flush();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslServerTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslServerTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslServerTransport.java
new file mode 100644
index 0000000..9d5c955
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslServerTransport.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.lang.ref.WeakReference;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps another Thrift <code>TTransport</code>, but performs SASL server
+ * negotiation on the call to <code>open()</code>. This class will wrap ensuing
+ * communication over it, if a SASL QOP is negotiated with the other party.
+ */
+public class TSaslServerTransport extends TSaslTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TSaslServerTransport.class);
+
+  /**
+   * Mapping from SASL mechanism name -> all the parameters required to
+   * instantiate a SASL server.
+   */
+  private Map<String, TSaslServerDefinition> serverDefinitionMap = new HashMap<String, TSaslServerDefinition>();
+
+  /**
+   * Contains all the parameters used to define a SASL server implementation.
+   */
+  private static class TSaslServerDefinition {
+    public String mechanism;
+    public String protocol;
+    public String serverName;
+    public Map<String, String> props;
+    public CallbackHandler cbh;
+
+    public TSaslServerDefinition(String mechanism, String protocol, String serverName,
+        Map<String, String> props, CallbackHandler cbh) {
+      this.mechanism = mechanism;
+      this.protocol = protocol;
+      this.serverName = serverName;
+      this.props = props;
+      this.cbh = cbh;
+    }
+  }
+
+  /**
+   * Uses the given underlying transport. Assumes that addServerDefinition is
+   * called later.
+   * 
+   * @param transport
+   *          Transport underlying this one.
+   */
+  public TSaslServerTransport(TTransport transport) {
+    super(transport);
+  }
+
+  /**
+   * Creates a <code>SaslServer</code> using the given SASL-specific parameters.
+   * See the Java documentation for <code>Sasl.createSaslServer</code> for the
+   * details of the parameters.
+   * 
+   * @param transport
+   *          The underlying Thrift transport.
+   */
+  public TSaslServerTransport(String mechanism, String protocol, String serverName,
+      Map<String, String> props, CallbackHandler cbh, TTransport transport) {
+    super(transport);
+    addServerDefinition(mechanism, protocol, serverName, props, cbh);
+  }
+
+  private TSaslServerTransport(Map<String, TSaslServerDefinition> serverDefinitionMap, TTransport transport) {
+    super(transport);
+    this.serverDefinitionMap.putAll(serverDefinitionMap);
+  }
+
+  /**
+   * Add a supported server definition to this transport. See the Java
+   * documentation for <code>Sasl.createSaslServer</code> for the details of the
+   * parameters.
+   */
+  public void addServerDefinition(String mechanism, String protocol, String serverName,
+      Map<String, String> props, CallbackHandler cbh) {
+    serverDefinitionMap.put(mechanism, new TSaslServerDefinition(mechanism, protocol, serverName,
+        props, cbh));
+  }
+
+  @Override
+  protected SaslRole getRole() {
+    return SaslRole.SERVER;
+  }
+
+  /**
+   * Performs the server side of the initial portion of the Thrift SASL protocol.
+   * Receives the initial response from the client, creates a SASL server using
+   * the mechanism requested by the client (if this server supports it), and
+   * sends the first challenge back to the client.
+   */
+  @Override
+  protected void handleSaslStartMessage() throws TTransportException, SaslException {
+    SaslResponse message = receiveSaslMessage();
+
+    LOGGER.debug("Received start message with status {}", message.status);
+    if (message.status != NegotiationStatus.START) {
+      sendAndThrowMessage(NegotiationStatus.ERROR, "Expecting START status, received " + message.status);
+    }
+
+    // Get the mechanism name.
+    String mechanismName = new String(message.payload);
+    TSaslServerDefinition serverDefinition = serverDefinitionMap.get(mechanismName);
+    LOGGER.debug("Received mechanism name '{}'", mechanismName);
+
+    if (serverDefinition == null) {
+      sendAndThrowMessage(NegotiationStatus.BAD, "Unsupported mechanism type " + mechanismName);
+    }
+    SaslServer saslServer = Sasl.createSaslServer(serverDefinition.mechanism,
+        serverDefinition.protocol, serverDefinition.serverName, serverDefinition.props,
+        serverDefinition.cbh);
+    setSaslServer(saslServer);
+  }
+
+  /**
+   * <code>TTransportFactory</code> to create
+   * <code>TSaslServerTransports<c/ode>. Ensures that a given
+   * underlying <code>TTransport</code> instance receives the same
+   * <code>TSaslServerTransport</code>. This is kind of an awful hack to work
+   * around the fact that Thrift is designed assuming that
+   * <code>TTransport</code> instances are stateless, and thus the existing
+   * <code>TServers</code> use different <code>TTransport</code> instances for
+   * input and output.
+   */
+  public static class Factory extends TTransportFactory {
+
+    /**
+     * This is the implementation of the awful hack described above.
+     * <code>WeakHashMap</code> is used to ensure that we don't leak memory.
+     */
+    private static Map<TTransport, WeakReference<TSaslServerTransport>> transportMap =
+      Collections.synchronizedMap(new WeakHashMap<TTransport, WeakReference<TSaslServerTransport>>());
+
+    /**
+     * Mapping from SASL mechanism name -> all the parameters required to
+     * instantiate a SASL server.
+     */
+    private Map<String, TSaslServerDefinition> serverDefinitionMap = new HashMap<String, TSaslServerDefinition>();
+
+    /**
+     * Create a new Factory. Assumes that <code>addServerDefinition</code> will
+     * be called later.
+     */
+    public Factory() {
+      super();
+    }
+
+    /**
+     * Create a new <code>Factory</code>, initially with the single server
+     * definition given. You may still call <code>addServerDefinition</code>
+     * later. See the Java documentation for <code>Sasl.createSaslServer</code>
+     * for the details of the parameters.
+     */
+    public Factory(String mechanism, String protocol, String serverName,
+        Map<String, String> props, CallbackHandler cbh) {
+      super();
+      addServerDefinition(mechanism, protocol, serverName, props, cbh);
+    }
+
+    /**
+     * Add a supported server definition to the transports created by this
+     * factory. See the Java documentation for
+     * <code>Sasl.createSaslServer</code> for the details of the parameters.
+     */
+    public void addServerDefinition(String mechanism, String protocol, String serverName,
+        Map<String, String> props, CallbackHandler cbh) {
+      serverDefinitionMap.put(mechanism, new TSaslServerDefinition(mechanism, protocol, serverName,
+          props, cbh));
+    }
+
+    /**
+     * Get a new <code>TSaslServerTransport</code> instance, or reuse the
+     * existing one if a <code>TSaslServerTransport</code> has already been
+     * created before using the given <code>TTransport</code> as an underlying
+     * transport. This ensures that a given underlying transport instance
+     * receives the same <code>TSaslServerTransport</code>.
+     */
+    @Override
+    public TTransport getTransport(TTransport base) {
+      WeakReference<TSaslServerTransport> ret = transportMap.get(base);
+      if (ret == null || ret.get() == null) {
+        LOGGER.debug("transport map does not contain key", base);
+        ret = new WeakReference<TSaslServerTransport>(new TSaslServerTransport(serverDefinitionMap, base));
+        try {
+          ret.get().open();
+        } catch (TTransportException e) {
+          LOGGER.debug("failed to open server transport", e);
+          throw new RuntimeException(e);
+        }
+        transportMap.put(base, ret); // No need for putIfAbsent().
+                                     // Concurrent calls to getTransport() will pass in different TTransports.
+      } else {
+        LOGGER.debug("transport map does contain key {}", base);
+      }
+      return ret.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslTransport.java
new file mode 100644
index 0000000..453562e
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSaslTransport.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.EncodingUtils;
+import org.apache.blur.thirdparty.thrift_0_9_0.TByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A superclass for SASL client/server thrift transports. A subclass need only
+ * implement the <code>open</open> method.
+ */
+abstract class TSaslTransport extends TTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TSaslTransport.class);
+
+  protected static final int DEFAULT_MAX_LENGTH = 0x7FFFFFFF;
+
+  protected static final int MECHANISM_NAME_BYTES = 1;
+  protected static final int STATUS_BYTES = 1;
+  protected static final int PAYLOAD_LENGTH_BYTES = 4;
+
+  protected static enum SaslRole {
+    SERVER, CLIENT;
+  }
+
+  /**
+   * Status bytes used during the initial Thrift SASL handshake.
+   */
+  protected static enum NegotiationStatus {
+    START((byte)0x01),
+    OK((byte)0x02),
+    BAD((byte)0x03),
+    ERROR((byte)0x04),
+    COMPLETE((byte)0x05);
+
+    private final byte value;
+
+    private static final Map<Byte, NegotiationStatus> reverseMap =
+      new HashMap<Byte, NegotiationStatus>();
+    static {
+      for (NegotiationStatus s : NegotiationStatus.class.getEnumConstants()) {
+        reverseMap.put(s.getValue(), s);
+      }
+    }
+
+    private NegotiationStatus(byte val) {
+      this.value = val;
+    }
+
+    public byte getValue() {
+      return value;
+    }
+
+    public static NegotiationStatus byValue(byte val) {
+      return reverseMap.get(val);
+    }
+  }
+
+  /**
+   * Transport underlying this one.
+   */
+  protected TTransport underlyingTransport;
+
+  /**
+   * Either a SASL client or a SASL server.
+   */
+  private SaslParticipant sasl;
+
+  /**
+   * Whether or not we should wrap/unwrap reads/writes. Determined by whether or
+   * not a QOP is negotiated during the SASL handshake.
+   */
+  private boolean shouldWrap = false;
+
+  /**
+   * Buffer for input.
+   */
+  private TMemoryInputTransport readBuffer = new TMemoryInputTransport();
+
+  /**
+   * Buffer for output.
+   */
+  private final TByteArrayOutputStream writeBuffer = new TByteArrayOutputStream(1024);
+
+  /**
+   * Create a TSaslTransport. It's assumed that setSaslServer will be called
+   * later to initialize the SASL endpoint underlying this transport.
+   * 
+   * @param underlyingTransport
+   *          The thrift transport which this transport is wrapping.
+   */
+  protected TSaslTransport(TTransport underlyingTransport) {
+    this.underlyingTransport = underlyingTransport;
+  }
+
+  /**
+   * Create a TSaslTransport which acts as a client.
+   * 
+   * @param saslClient
+   *          The <code>SaslClient</code> which this transport will use for SASL
+   *          negotiation.
+   * @param underlyingTransport
+   *          The thrift transport which this transport is wrapping.
+   */
+  protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) {
+    sasl = new SaslParticipant(saslClient);
+    this.underlyingTransport = underlyingTransport;
+  }
+
+  protected void setSaslServer(SaslServer saslServer) {
+    sasl = new SaslParticipant(saslServer);
+  }
+
+  // Used to read the status byte and payload length.
+  private final byte[] messageHeader = new byte[STATUS_BYTES + PAYLOAD_LENGTH_BYTES];
+
+  /**
+   * Send a complete Thrift SASL message.
+   * 
+   * @param status
+   *          The status to send.
+   * @param payload
+   *          The data to send as the payload of this message.
+   * @throws TTransportException
+   */
+  protected void sendSaslMessage(NegotiationStatus status, byte[] payload) throws TTransportException {
+    if (payload == null)
+      payload = new byte[0];
+
+    messageHeader[0] = status.getValue();
+    EncodingUtils.encodeBigEndian(payload.length, messageHeader, STATUS_BYTES);
+
+    if (LOGGER.isDebugEnabled())
+      LOGGER.debug(getRole() + ": Writing message with status {} and payload length {}",
+                   status, payload.length);
+    underlyingTransport.write(messageHeader);
+    underlyingTransport.write(payload);
+    underlyingTransport.flush();
+  }
+
+  /**
+   * Read a complete Thrift SASL message.
+   * 
+   * @return The SASL status and payload from this message.
+   * @throws TTransportException
+   *           Thrown if there is a failure reading from the underlying
+   *           transport, or if a status code of BAD or ERROR is encountered.
+   */
+  protected SaslResponse receiveSaslMessage() throws TTransportException {
+    underlyingTransport.readAll(messageHeader, 0, messageHeader.length);
+
+    byte statusByte = messageHeader[0];
+    byte[] payload = new byte[EncodingUtils.decodeBigEndian(messageHeader, STATUS_BYTES)];
+    underlyingTransport.readAll(payload, 0, payload.length);
+
+    NegotiationStatus status = NegotiationStatus.byValue(statusByte);
+    if (status == null) {
+      sendAndThrowMessage(NegotiationStatus.ERROR, "Invalid status " + statusByte);
+    } else if (status == NegotiationStatus.BAD || status == NegotiationStatus.ERROR) {
+      try {
+        String remoteMessage = new String(payload, "UTF-8");
+        throw new TTransportException("Peer indicated failure: " + remoteMessage);
+      } catch (UnsupportedEncodingException e) {
+        throw new TTransportException(e);
+      }
+    }
+
+    if (LOGGER.isDebugEnabled())
+      LOGGER.debug(getRole() + ": Received message with status {} and payload length {}",
+                   status, payload.length);
+    return new SaslResponse(status, payload);
+  }
+
+  /**
+   * Send a Thrift SASL message with the given status (usually BAD or ERROR) and
+   * string message, and then throw a TTransportException with the given
+   * message.
+   * 
+   * @param status
+   *          The Thrift SASL status code to send. Usually BAD or ERROR.
+   * @param message
+   *          The optional message to send to the other side.
+   * @throws TTransportException
+   *           Always thrown with the message provided.
+   */
+  protected void sendAndThrowMessage(NegotiationStatus status, String message) throws TTransportException {
+    try {
+      sendSaslMessage(status, message.getBytes());
+    } catch (Exception e) {
+      LOGGER.warn("Could not send failure response", e);
+      message += "\nAlso, could not send response: " + e.toString();
+    }
+    throw new TTransportException(message);
+  }
+
+  /**
+   * Implemented by subclasses to start the Thrift SASL handshake process. When
+   * this method completes, the <code>SaslParticipant</code> in this class is
+   * assumed to be initialized.
+   * 
+   * @throws TTransportException
+   * @throws SaslException
+   */
+  abstract protected void handleSaslStartMessage() throws TTransportException, SaslException;
+
+  protected abstract SaslRole getRole();
+
+  /**
+   * Opens the underlying transport if it's not already open and then performs
+   * SASL negotiation. If a QOP is negotiated during this SASL handshake, it used
+   * for all communication on this transport after this call is complete.
+   */
+  @Override
+  public void open() throws TTransportException {
+    LOGGER.debug("opening transport {}", this);
+    if (sasl != null && sasl.isComplete())
+      throw new TTransportException("SASL transport already open");
+
+    if (!underlyingTransport.isOpen())
+      underlyingTransport.open();
+
+    try {
+      // Negotiate a SASL mechanism. The client also sends its
+      // initial response, or an empty one.
+      handleSaslStartMessage();
+      LOGGER.debug("{}: Start message handled", getRole());
+
+      SaslResponse message = null;
+      while (!sasl.isComplete()) {
+        message = receiveSaslMessage();
+        if (message.status != NegotiationStatus.COMPLETE &&
+            message.status != NegotiationStatus.OK) {
+          throw new TTransportException("Expected COMPLETE or OK, got " + message.status);
+        }
+
+        byte[] challenge = sasl.evaluateChallengeOrResponse(message.payload);
+
+        // If we are the client, and the server indicates COMPLETE, we don't need to
+        // send back any further response.
+        if (message.status == NegotiationStatus.COMPLETE &&
+            getRole() == SaslRole.CLIENT) {
+          LOGGER.debug("{}: All done!", getRole());
+          break;
+        }
+
+        sendSaslMessage(sasl.isComplete() ? NegotiationStatus.COMPLETE : NegotiationStatus.OK,
+                        challenge);
+      }
+      LOGGER.debug("{}: Main negotiation loop complete", getRole());
+
+      assert sasl.isComplete();
+
+      // If we're the client, and we're complete, but the server isn't
+      // complete yet, we need to wait for its response. This will occur
+      // with ANONYMOUS auth, for example, where we send an initial response
+      // and are immediately complete.
+      if (getRole() == SaslRole.CLIENT &&
+          (message == null || message.status == NegotiationStatus.OK)) {
+        LOGGER.debug("{}: SASL Client receiving last message", getRole());
+        message = receiveSaslMessage();
+        if (message.status != NegotiationStatus.COMPLETE) {
+          throw new TTransportException(
+            "Expected SASL COMPLETE, but got " + message.status);
+        }
+      }
+    } catch (SaslException e) {
+      try {
+        LOGGER.error("SASL negotiation failure", e);
+        sendAndThrowMessage(NegotiationStatus.BAD, e.getMessage());
+      } finally {
+        underlyingTransport.close();
+      }
+    }
+
+    String qop = (String) sasl.getNegotiatedProperty(Sasl.QOP);
+    if (qop != null && !qop.equalsIgnoreCase("auth"))
+      shouldWrap = true;
+  }
+
+  /**
+   * Get the underlying <code>SaslClient</code>.
+   * 
+   * @return The <code>SaslClient</code>, or <code>null</code> if this transport
+   *         is backed by a <code>SaslServer</code>.
+   */
+  public SaslClient getSaslClient() {
+    return sasl.saslClient;
+  }
+
+  /**
+   * Get the underlying transport that Sasl is using.
+   * @return The <code>TTransport</code> transport
+   */
+   public TTransport getUnderlyingTransport() {
+     return underlyingTransport;
+   }
+
+  /**
+   * Get the underlying <code>SaslServer</code>.
+   * 
+   * @return The <code>SaslServer</code>, or <code>null</code> if this transport
+   *         is backed by a <code>SaslClient</code>.
+   */
+  public SaslServer getSaslServer() {
+    return sasl.saslServer;
+  }
+
+  /**
+   * Read a 4-byte word from the underlying transport and interpret it as an
+   * integer.
+   * 
+   * @return The length prefix of the next SASL message to read.
+   * @throws TTransportException
+   *           Thrown if reading from the underlying transport fails.
+   */
+  protected int readLength() throws TTransportException {
+    byte[] lenBuf = new byte[4];
+    underlyingTransport.readAll(lenBuf, 0, lenBuf.length);
+    return EncodingUtils.decodeBigEndian(lenBuf);
+  }
+
+  /**
+   * Write the given integer as 4 bytes to the underlying transport.
+   * 
+   * @param length
+   *          The length prefix of the next SASL message to write.
+   * @throws TTransportException
+   *           Thrown if writing to the underlying transport fails.
+   */
+  protected void writeLength(int length) throws TTransportException {
+    byte[] lenBuf = new byte[4];
+    TFramedTransport.encodeFrameSize(length, lenBuf);
+    underlyingTransport.write(lenBuf);
+  }
+
+  // Below is the SASL implementation of the TTransport interface.
+
+  /**
+   * Closes the underlying transport and disposes of the SASL implementation
+   * underlying this transport.
+   */
+  @Override
+  public void close() {
+    underlyingTransport.close();
+    try {
+      sasl.dispose();
+    } catch (SaslException e) {
+      // Not much we can do here.
+    }
+  }
+
+  /**
+   * True if the underlying transport is open and the SASL handshake is
+   * complete.
+   */
+  @Override
+  public boolean isOpen() {
+    return underlyingTransport.isOpen() && sasl != null && sasl.isComplete();
+  }
+
+  /**
+   * Read from the underlying transport. Unwraps the contents if a QOP was
+   * negotiated during the SASL handshake.
+   */
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if (!isOpen())
+      throw new TTransportException("SASL authentication not complete");
+
+    int got = readBuffer.read(buf, off, len);
+    if (got > 0) {
+      return got;
+    }
+
+    // Read another frame of data
+    try {
+      readFrame();
+    } catch (SaslException e) {
+      throw new TTransportException(e);
+    }
+
+    return readBuffer.read(buf, off, len);
+  }
+
+  /**
+   * Read a single frame of data from the underlying transport, unwrapping if
+   * necessary.
+   * 
+   * @throws TTransportException
+   *           Thrown if there's an error reading from the underlying transport.
+   * @throws SaslException
+   *           Thrown if there's an error unwrapping the data.
+   */
+  private void readFrame() throws TTransportException, SaslException {
+    int dataLength = readLength();
+
+    if (dataLength < 0)
+      throw new TTransportException("Read a negative frame size (" + dataLength + ")!");
+
+    byte[] buff = new byte[dataLength];
+    LOGGER.debug("{}: reading data length: {}", getRole(), dataLength);
+    underlyingTransport.readAll(buff, 0, dataLength);
+    if (shouldWrap) {
+      buff = sasl.unwrap(buff, 0, buff.length);
+      LOGGER.debug("data length after unwrap: {}", buff.length);
+    }
+    readBuffer.reset(buff);
+  }
+
+  /**
+   * Write to the underlying transport.
+   */
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    if (!isOpen())
+      throw new TTransportException("SASL authentication not complete");
+
+    writeBuffer.write(buf, off, len);
+  }
+
+  /**
+   * Flushes to the underlying transport. Wraps the contents if a QOP was
+   * negotiated during the SASL handshake.
+   */
+  @Override
+  public void flush() throws TTransportException {
+    byte[] buf = writeBuffer.get();
+    int dataLength = writeBuffer.len();
+    writeBuffer.reset();
+
+    if (shouldWrap) {
+      LOGGER.debug("data length before wrap: {}", dataLength);
+      try {
+        buf = sasl.wrap(buf, 0, dataLength);
+      } catch (SaslException e) {
+        throw new TTransportException(e);
+      }
+      dataLength = buf.length;
+    }
+    LOGGER.debug("writing data length: {}", dataLength);
+    writeLength(dataLength);
+    underlyingTransport.write(buf, 0, dataLength);
+    underlyingTransport.flush();
+  }
+
+  /**
+   * Used exclusively by readSaslMessage to return both a status and data.
+   */
+  protected static class SaslResponse {
+    public NegotiationStatus status;
+    public byte[] payload;
+
+    public SaslResponse(NegotiationStatus status, byte[] payload) {
+      this.status = status;
+      this.payload = payload;
+    }
+  }
+
+  /**
+   * Used to abstract over the <code>SaslServer</code> and
+   * <code>SaslClient</code> classes, which share a lot of their interface, but
+   * unfortunately don't share a common superclass.
+   */
+  private static class SaslParticipant {
+    // One of these will always be null.
+    public SaslServer saslServer;
+    public SaslClient saslClient;
+
+    public SaslParticipant(SaslServer saslServer) {
+      this.saslServer = saslServer;
+    }
+
+    public SaslParticipant(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException {
+      if (saslClient != null) {
+        return saslClient.evaluateChallenge(challengeOrResponse);
+      } else {
+        return saslServer.evaluateResponse(challengeOrResponse);
+      }
+    }
+
+    public boolean isComplete() {
+      if (saslClient != null)
+        return saslClient.isComplete();
+      else
+        return saslServer.isComplete();
+    }
+
+    public void dispose() throws SaslException {
+      if (saslClient != null)
+        saslClient.dispose();
+      else
+        saslServer.dispose();
+    }
+
+    public byte[] unwrap(byte[] buf, int off, int len) throws SaslException {
+      if (saslClient != null)
+        return saslClient.unwrap(buf, off, len);
+      else
+        return saslServer.unwrap(buf, off, len);
+    }
+
+    public byte[] wrap(byte[] buf, int off, int len) throws SaslException {
+      if (saslClient != null)
+        return saslClient.wrap(buf, off, len);
+      else
+        return saslServer.wrap(buf, off, len);
+    }
+
+    public Object getNegotiatedProperty(String propName) {
+      if (saslClient != null)
+        return saslClient.getNegotiatedProperty(propName);
+      else
+        return saslServer.getNegotiatedProperty(propName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSeekableFile.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSeekableFile.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSeekableFile.java
new file mode 100644
index 0000000..f797a2d
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSeekableFile.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+
+public interface TSeekableFile {
+
+  public InputStream getInputStream() throws IOException;
+  public OutputStream getOutputStream() throws IOException;
+  public void close() throws IOException;
+  public long length() throws IOException;
+  public void seek(long pos) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TServerSocket.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TServerSocket.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TServerSocket.java
new file mode 100644
index 0000000..1d6ea74
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TServerSocket.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * Wrapper around ServerSocket for Thrift.
+ *
+ */
+public class TServerSocket extends TServerTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TServerSocket.class.getName());
+
+  /**
+   * Underlying ServerSocket object
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates a server socket from underlying socket object
+   */
+  public TServerSocket(ServerSocket serverSocket) {
+    this(serverSocket, 0);
+  }
+
+  /**
+   * Creates a server socket from underlying socket object
+   */
+  public TServerSocket(ServerSocket serverSocket, int clientTimeout) {
+    serverSocket_ = serverSocket;
+    clientTimeout_ = clientTimeout;
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TServerSocket(int port, int clientTimeout) throws TTransportException {
+    this(new InetSocketAddress(port), clientTimeout);
+  }
+
+  public TServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  public TServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    clientTimeout_ = clientTimeout;
+    try {
+      // Make server socket
+      serverSocket_ = new ServerSocket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(bindAddr);
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+    }
+  }
+
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        LOGGER.error("Could not set socket timeout.", sx);
+      }
+    }
+  }
+
+  protected TSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      Socket result = serverSocket_.accept();
+      TSocket result2 = new TSocket(result);
+      result2.setTimeout(clientTimeout_);
+      return result2;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        LOGGER.warn("Could not close server socket.", iox);
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+  public ServerSocket getServerSocket() {
+    return serverSocket_;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TServerTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TServerTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TServerTransport.java
new file mode 100644
index 0000000..d4a0a69
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TServerTransport.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+/**
+ * Server transport. Object which provides client transports.
+ *
+ */
+public abstract class TServerTransport {
+
+  public abstract void listen() throws TTransportException;
+
+  public final TTransport accept() throws TTransportException {
+    TTransport transport = acceptImpl();
+    if (transport == null) {
+      throw new TTransportException("accept() may not return NULL");
+    }
+    return transport;
+  }
+
+  public abstract void close();
+
+  protected abstract TTransport acceptImpl() throws TTransportException;
+
+  /**
+   * Optional method implementation. This signals to the server transport
+   * that it should break out of any accept() or listen() that it is currently
+   * blocked on. This method, if implemented, MUST be thread safe, as it may
+   * be called from a different thread context than the other TServerTransport
+   * methods.
+   */
+  public void interrupt() {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSocket.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSocket.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSocket.java
new file mode 100644
index 0000000..8f3a66a
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TSocket.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * Socket implementation of the TTransport interface. To be commented soon!
+ *
+ */
+public class TSocket extends TIOStreamTransport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TSocket.class.getName());
+
+  /**
+   * Wrapped Socket object
+   */
+  private Socket socket_ = null;
+
+  /**
+   * Remote host
+   */
+  private String host_  = null;
+
+  /**
+   * Remote port
+   */
+  private int port_ = 0;
+
+  /**
+   * Socket timeout
+   */
+  private int timeout_ = 0;
+
+  /**
+   * Constructor that takes an already created socket.
+   *
+   * @param socket Already created socket object
+   * @throws TTransportException if there is an error setting up the streams
+   */
+  public TSocket(Socket socket) throws TTransportException {
+    socket_ = socket;
+    try {
+      socket_.setSoLinger(false, 0);
+      socket_.setTcpNoDelay(true);
+    } catch (SocketException sx) {
+      LOGGER.warn("Could not configure socket.", sx);
+    }
+
+    if (isOpen()) {
+      try {
+        inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
+        outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
+      } catch (IOException iox) {
+        close();
+        throw new TTransportException(TTransportException.NOT_OPEN, iox);
+      }
+    }
+  }
+
+  /**
+   * Creates a new unconnected socket that will connect to the given host
+   * on the given port.
+   *
+   * @param host Remote host
+   * @param port Remote port
+   */
+  public TSocket(String host, int port) {
+    this(host, port, 0);
+  }
+
+  /**
+   * Creates a new unconnected socket that will connect to the given host
+   * on the given port.
+   *
+   * @param host    Remote host
+   * @param port    Remote port
+   * @param timeout Socket timeout
+   */
+  public TSocket(String host, int port, int timeout) {
+    host_ = host;
+    port_ = port;
+    timeout_ = timeout;
+    initSocket();
+  }
+
+  /**
+   * Initializes the socket object
+   */
+  private void initSocket() {
+    socket_ = new Socket();
+    try {
+      socket_.setSoLinger(false, 0);
+      socket_.setTcpNoDelay(true);
+      socket_.setSoTimeout(timeout_);
+    } catch (SocketException sx) {
+      LOGGER.error("Could not configure socket.", sx);
+    }
+  }
+
+  /**
+   * Sets the socket timeout
+   *
+   * @param timeout Milliseconds timeout
+   */
+  public void setTimeout(int timeout) {
+    timeout_ = timeout;
+    try {
+      socket_.setSoTimeout(timeout);
+    } catch (SocketException sx) {
+      LOGGER.warn("Could not set socket timeout.", sx);
+    }
+  }
+
+  /**
+   * Returns a reference to the underlying socket.
+   */
+  public Socket getSocket() {
+    if (socket_ == null) {
+      initSocket();
+    }
+    return socket_;
+  }
+
+  /**
+   * Checks whether the socket is connected.
+   */
+  public boolean isOpen() {
+    if (socket_ == null) {
+      return false;
+    }
+    return socket_.isConnected();
+  }
+
+  /**
+   * Connects the socket, creating a new socket object if necessary.
+   */
+  public void open() throws TTransportException {
+    if (isOpen()) {
+      throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
+    }
+
+    if (host_.length() == 0) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
+    }
+    if (port_ <= 0) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port.");
+    }
+
+    if (socket_ == null) {
+      initSocket();
+    }
+
+    try {
+      socket_.connect(new InetSocketAddress(host_, port_), timeout_);
+      inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
+      outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
+    } catch (IOException iox) {
+      close();
+      throw new TTransportException(TTransportException.NOT_OPEN, iox);
+    }
+  }
+
+  /**
+   * Closes the socket.
+   */
+  public void close() {
+    // Close the underlying streams
+    super.close();
+
+    // Close the socket
+    if (socket_ != null) {
+      try {
+        socket_.close();
+      } catch (IOException iox) {
+        LOGGER.warn("Could not close socket.", iox);
+      }
+      socket_ = null;
+    }
+  }
+
+}


Mime
View raw message