incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [19/51] [partial] Fixed BLUR-126.
Date Thu, 06 Jun 2013 18:57:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TStandardFile.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TStandardFile.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TStandardFile.java
new file mode 100644
index 0000000..4d8c3b3
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TStandardFile.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+
+/**
+ * {@link TSeekableFile} implementation that works on a file identified by a
+ * path. The file is opened read-only through a {@link RandomAccessFile} when
+ * the instance is constructed; length and seek operations are delegated to it.
+ */
+public class TStandardFile implements TSeekableFile {
+
+  protected String path_ = null;
+  protected RandomAccessFile inputFile_ = null;
+
+  /**
+   * Opens the file at the given path in read-only ("r") mode.
+   *
+   * @param path path of the file to open
+   * @throws IOException if the underlying {@link RandomAccessFile} cannot be created
+   */
+  public TStandardFile(String path) throws IOException {
+    this.path_ = path;
+    this.inputFile_ = new RandomAccessFile(path, "r");
+  }
+
+  /**
+   * Returns a new stream that reads through the file descriptor of the
+   * already opened file.
+   */
+  public InputStream getInputStream() throws IOException {
+    return new FileInputStream(this.inputFile_.getFD());
+  }
+
+  /**
+   * Returns a new stream that writes to the path this instance was created with.
+   */
+  public OutputStream getOutputStream() throws IOException {
+    return new FileOutputStream(this.path_);
+  }
+
+  /**
+   * Closes the read handle, if there is one.
+   */
+  public void close() throws IOException {
+    if (inputFile_ == null) {
+      return;
+    }
+    inputFile_.close();
+  }
+
+  /**
+   * Returns the length reported by the underlying {@link RandomAccessFile}.
+   */
+  public long length() throws IOException {
+    return this.inputFile_.length();
+  }
+
+  /**
+   * Moves the file pointer of the underlying {@link RandomAccessFile}.
+   *
+   * @param pos offset to seek to
+   */
+  public void seek(long pos) throws IOException {
+    this.inputFile_.seek(pos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransport.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransport.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransport.java
new file mode 100644
index 0000000..effd008
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransport.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+/**
+ * Base class for the I/O layer: a thin abstraction over the combined
+ * functionality of Java input and output streams. Subclasses supply the
+ * actual open/close/read/write behaviour; the buffer-related methods have
+ * defaults suitable for a transport without an accessible buffer.
+ */
+public abstract class TTransport {
+
+  /**
+   * Reports whether this transport is currently open.
+   *
+   * @return true if the transport is open
+   */
+  public abstract boolean isOpen();
+
+  /**
+   * Asks whether more data may be read. This default implementation simply
+   * returns {@link #isOpen()}.
+   *
+   * @return true if the remote side is still alive and feeding us
+   */
+  public boolean peek() {
+    return isOpen();
+  }
+
+  /**
+   * Opens the transport for reading and writing.
+   *
+   * @throws TTransportException if the transport could not be opened
+   */
+  public abstract void open()
+    throws TTransportException;
+
+  /**
+   * Closes the transport.
+   */
+  public abstract void close();
+
+  /**
+   * Reads at most len bytes into buf, beginning at index off.
+   *
+   * @param buf destination array
+   * @param off index in buf at which to start storing data
+   * @param len maximum number of bytes to read
+   * @return the number of bytes actually read
+   * @throws TTransportException if there was an error reading data
+   */
+  public abstract int read(byte[] buf, int off, int len)
+    throws TTransportException;
+
+  /**
+   * Reads exactly len bytes by calling {@link #read(byte[], int, int)}
+   * repeatedly until the requested amount has been received.
+   *
+   * @param buf destination array
+   * @param off index in buf at which to start storing data
+   * @param len number of bytes that must be read
+   * @return the number of bytes read, which equals len unless len is not positive
+   * @throws TTransportException if a read fails, or returns zero or a negative
+   *           count before len bytes have been received
+   */
+  public int readAll(byte[] buf, int off, int len)
+    throws TTransportException {
+    int total = 0;
+    while (total < len) {
+      final int n = read(buf, off + total, len - total);
+      if (n <= 0) {
+        // A non-positive count means no further progress can be made.
+        throw new TTransportException(
+            "Cannot read. Remote side has closed. Tried to read " + len
+                + " bytes, but only got " + total
+                + " bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)");
+      }
+      total += n;
+    }
+    return total;
+  }
+
+  /**
+   * Writes the complete array to the output.
+   *
+   * @param buf the data to write
+   * @throws TTransportException if an error occurs writing data
+   */
+  public void write(byte[] buf) throws TTransportException {
+    final int length = buf.length;
+    write(buf, 0, length);
+  }
+
+  /**
+   * Writes up to len bytes from the buffer.
+   *
+   * @param buf the data to write
+   * @param off index in buf of the first byte to write
+   * @param len number of bytes to write
+   * @throws TTransportException if there was an error writing data
+   */
+  public abstract void write(byte[] buf, int off, int len)
+    throws TTransportException;
+
+  /**
+   * Flushes any pending data out of a transport buffer. This default
+   * implementation does nothing.
+   *
+   * @throws TTransportException if there was an error writing out data
+   */
+  public void flush()
+    throws TTransportException {}
+
+  /**
+   * Gives direct access to the underlying buffer. This default
+   * implementation, intended for non-buffered transports, returns null.
+   *
+   * @return the underlying buffer, or null if this is not a buffered transport
+   */
+  public byte[] getBuffer() {
+    return null;
+  }
+
+  /**
+   * Returns the index within the underlying buffer of the next position that
+   * should be read from. This default implementation returns 0.
+   *
+   * @return the next read position within the underlying buffer
+   */
+  public int getBufferPosition() {
+    return 0;
+  }
+
+  /**
+   * Returns how many bytes remain in the underlying buffer. This default
+   * implementation returns -1.
+   *
+   * @return the number of bytes remaining in the underlying buffer, or -1 if
+   *         this is a non-buffered transport
+   */
+  public int getBytesRemainingInBuffer() {
+    return -1;
+  }
+
+  /**
+   * Consumes len bytes from the underlying buffer. This default
+   * implementation does nothing.
+   *
+   * @param len number of bytes to consume
+   */
+  public void consumeBuffer(int len) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransportException.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransportException.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransportException.java
new file mode 100644
index 0000000..87f5830
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransportException.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+
+/**
+ * Exception raised by transports. Besides the usual message and cause it
+ * carries an integer type code, which is {@link #UNKNOWN} unless a
+ * constructor taking a type is used.
+ */
+public class TTransportException extends TException {
+
+  private static final long serialVersionUID = 1L;
+
+  // Type codes returned by getType().
+  public static final int UNKNOWN = 0;
+  public static final int NOT_OPEN = 1;
+  public static final int ALREADY_OPEN = 2;
+  public static final int TIMED_OUT = 3;
+  public static final int END_OF_FILE = 4;
+
+  protected int type_ = UNKNOWN;
+
+  /** Creates an exception of type {@link #UNKNOWN} with no message or cause. */
+  public TTransportException() {
+    super();
+  }
+
+  /**
+   * @param type one of the type codes defined on this class
+   */
+  public TTransportException(int type) {
+    super();
+    this.type_ = type;
+  }
+
+  /**
+   * @param type one of the type codes defined on this class
+   * @param message the detail message
+   */
+  public TTransportException(int type, String message) {
+    super(message);
+    this.type_ = type;
+  }
+
+  /**
+   * @param message the detail message
+   */
+  public TTransportException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param type one of the type codes defined on this class
+   * @param cause the underlying cause
+   */
+  public TTransportException(int type, Throwable cause) {
+    super(cause);
+    this.type_ = type;
+  }
+
+  /**
+   * @param cause the underlying cause
+   */
+  public TTransportException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message the detail message
+   * @param cause the underlying cause
+   */
+  public TTransportException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * @param type one of the type codes defined on this class
+   * @param message the detail message
+   * @param cause the underlying cause
+   */
+  public TTransportException(int type, String message, Throwable cause) {
+    super(message, cause);
+    this.type_ = type;
+  }
+
+  /**
+   * @return the type code this exception was created with
+   */
+  public int getType() {
+    return this.type_;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransportFactory.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransportFactory.java b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransportFactory.java
new file mode 100644
index 0000000..dd44b17
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thirdparty/thrift_0_9_0/transport/TTransportFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.thirdparty.thrift_0_9_0.transport;
+
+/**
+ * Factory class used to create wrapped instances of Transports.
+ * This is used primarily in servers, which get Transports from
+ * a ServerTransport and then may want to wrap them (e.g. create
+ * a BufferedTransport from the underlying base transport).
+ *
+ * This base implementation performs no wrapping; subclasses are
+ * expected to override {@link #getTransport(TTransport)}.
+ */
+public class TTransportFactory {
+
+  /**
+   * Return a wrapped instance of the base Transport. This default
+   * implementation returns the given transport unchanged.
+   *
+   * @param trans The base transport
+   * @return Wrapped Transport (here, the same instance as trans)
+   */
+  public TTransport getTransport(TTransport trans) {
+    return trans;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/AbstractCommand.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/AbstractCommand.java b/blur-thrift/src/main/java/org/apache/blur/thrift/AbstractCommand.java
new file mode 100644
index 0000000..1a3cc8e
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/AbstractCommand.java
@@ -0,0 +1,86 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.BlurException;
+
+/**
+ * Base class for a unit of work that is executed against a CLIENT object and
+ * produces a result of type T. Commands are {@link Cloneable}.
+ */
+public abstract class AbstractCommand<CLIENT, T> implements Cloneable {
+
+  private boolean detachClient = false;
+
+  /**
+   * Tells whether this command detaches the client from the pool. When detach
+   * is true, the user of the call is responsible for handing the client back
+   * to the pool by calling returnClient on the {@link BlurClientManager}.
+   * 
+   * @return the current value of the detach client attribute.
+   */
+  public boolean isDetachClient() {
+    return this.detachClient;
+  }
+
+  /**
+   * Changes the detach client attribute.
+   * 
+   * @param detachClient
+   *          the new value.
+   */
+  public void setDetachClient(boolean detachClient) {
+    this.detachClient = detachClient;
+  }
+
+  /**
+   * Variant of the call that also receives the {@link Connection} object,
+   * which is not normally needed. When a subclass overrides this method, the
+   * call(CLIENT client) method is not invoked; the implementation here just
+   * delegates to it. Usually used in conjunction with the detachClient
+   * attribute.
+   * 
+   * @param client
+   *          the client.
+   * @param connection
+   *          the connection object.
+   * @return the result of the command.
+   * @throws BlurException
+   * @throws TException
+   */
+  public T call(CLIENT client, Connection connection) throws BlurException, TException {
+    final T result = call(client);
+    return result;
+  }
+
+  /**
+   * The work to execute with a CLIENT object. It will be retried if a
+   * {@link TException} is thrown (that type of exception is assumed to be a
+   * problem with the connection to the remote system).
+   * 
+   * @param client the client.
+   * @return the result of the command.
+   * @throws BlurException
+   * @throws TException
+   */
+  public abstract T call(CLIENT client) throws BlurException, TException;
+
+  /**
+   * Returns a copy of this command produced by {@link Object#clone()}.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public AbstractCommand<CLIENT, T> clone() {
+    final Object copy;
+    try {
+      copy = super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+    return (AbstractCommand<CLIENT, T>) copy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/AsyncClientPool.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/AsyncClientPool.java b/blur-thrift/src/main/java/org/apache/blur/thrift/AsyncClientPool.java
new file mode 100644
index 0000000..951002c
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/AsyncClientPool.java
@@ -0,0 +1,275 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.async.AsyncMethodCallback;
+import org.apache.blur.thirdparty.thrift_0_9_0.async.TAsyncClient;
+import org.apache.blur.thirdparty.thrift_0_9_0.async.TAsyncClientManager;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TBinaryProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolFactory;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingSocket;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TNonblockingTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Pool of Thrift {@link TAsyncClient} instances keyed by {@link Connection}.
+ * Clients are handed out through dynamic proxies created by
+ * {@link #getClient(Class, String)}; once the asynchronous call completes the
+ * client is put back into the pool, and when it fails its transport is closed
+ * and the per-host connection count is reduced.
+ */
+public class AsyncClientPool {
+
+  public static final Log LOG = LogFactory.getLog(AsyncClientPool.class);
+
+  public static final int DEFAULT_MAX_CONNECTIONS_PER_HOST = 5;
+  public static final int DEFAULT_CONNECTION_TIMEOUT = 60000;
+
+  private int _maxConnectionsPerHost;
+  private long _timeout;
+  // Milliseconds to wait on the idle queue per iteration while the host is at its connection limit.
+  private long _pollTime = 5;
+  // Number of live clients per host name.
+  private Map<String, AtomicInteger> _numberOfConnections = new ConcurrentHashMap<String, AtomicInteger>();
+
+  // Idle clients, per connection.
+  private Map<Connection, BlockingQueue<TAsyncClient>> _clientMap = new ConcurrentHashMap<Connection, BlockingQueue<TAsyncClient>>();
+  private Map<String, Constructor<?>> _constructorCache = new ConcurrentHashMap<String, Constructor<?>>();
+  private TProtocolFactory _protocolFactory;
+  private TAsyncClientManager _clientManager;
+  // Every transport opened by this pool that has not been closed yet; used by close().
+  private Collection<TNonblockingTransport> _transports = new LinkedBlockingQueue<TNonblockingTransport>();
+  // Reflective handle on TAsyncClient's "___transport" field.
+  private Field _transportField;
+
+  private Random random = new Random();
+
+  /**
+   * Creates a pool using {@link #DEFAULT_MAX_CONNECTIONS_PER_HOST} and
+   * {@link #DEFAULT_CONNECTION_TIMEOUT}.
+   */
+  public AsyncClientPool() throws IOException {
+    this(DEFAULT_MAX_CONNECTIONS_PER_HOST, DEFAULT_CONNECTION_TIMEOUT);
+  }
+
+  /**
+   * Creates a pool.
+   * 
+   * @param maxConnectionsPerHost
+   *          the maximum number of clients created per host.
+   * @param connectionTimeout
+   *          the timeout set on every client that is created.
+   * @throws IOException
+   *           if the {@link TAsyncClientManager} cannot be created.
+   */
+  public AsyncClientPool(int maxConnectionsPerHost, int connectionTimeout) throws IOException {
+    _clientManager = new TAsyncClientManager();
+    _protocolFactory = new TBinaryProtocol.Factory();
+    _maxConnectionsPerHost = maxConnectionsPerHost;
+    _timeout = connectionTimeout;
+    try {
+      _transportField = TAsyncClient.class.getDeclaredField("___transport");
+      _transportField.setAccessible(true);
+    } catch (SecurityException e) {
+      throw new RuntimeException(e);
+    } catch (NoSuchFieldException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Stops the client manager and closes every transport this pool has opened
+   * and not yet closed.
+   */
+  public void close() {
+    _clientManager.stop();
+    for (TNonblockingTransport transport : _transports) {
+      transport.close();
+    }
+  }
+
+  /**
+   * Gets a client instance that implements the AsyncIface interface that
+   * connects to the given connection string. One of the connections described
+   * by the connection string is picked at random.
+   * 
+   * @param <T>
+   * @param asyncIfaceClass
+   *          the AsyncIface interface to pool.
+   * @param connectionStr
+   *          the connection string.
+   * @return the client instance.
+   */
+  @SuppressWarnings("unchecked")
+  public <T> T getClient(final Class<T> asyncIfaceClass, final String connectionStr) {
+    List<Connection> connections = BlurClientManager.getConnections(connectionStr);
+    Collections.shuffle(connections, random);
+    // randomness ftw
+    final Connection connection = connections.get(0);
+    return (T) Proxy.newProxyInstance(asyncIfaceClass.getClassLoader(), new Class[] { asyncIfaceClass }, new InvocationHandler() {
+      @Override
+      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        return execute(new AsyncCall(asyncIfaceClass, method, args, connection));
+      }
+    });
+  }
+
+  /**
+   * Runs the call on a pooled (or new) client, replacing the caller's
+   * callback (the last argument) with one that also manages the client.
+   */
+  private Object execute(AsyncCall call) throws Exception {
+    AsyncMethodCallback<?> realCallback = getRealAsyncMethodCallback(call._args);
+    TAsyncClient client = newClient(call._clazz, call._connection);
+    AsyncMethodCallback<?> retryingCallback = wrapCallback(realCallback, client, call._connection);
+    resetArgs(call._args, retryingCallback);
+    return call._method.invoke(client, call._args);
+  }
+
+  /**
+   * Returns the idle-client queue for the connection, creating it on first use.
+   */
+  private synchronized BlockingQueue<TAsyncClient> getQueue(Connection connection) {
+    BlockingQueue<TAsyncClient> blockingQueue = _clientMap.get(connection);
+    if (blockingQueue == null) {
+      blockingQueue = new LinkedBlockingQueue<TAsyncClient>();
+      _clientMap.put(connection, blockingQueue);
+    }
+    return blockingQueue;
+  }
+
+  /**
+   * Puts a healthy client back into the idle queue. A client that reports an
+   * error is discarded instead: the host's connection count is reduced and its
+   * transport is closed and forgotten so that it does not leak.
+   */
+  private void returnClient(Connection connection, TAsyncClient client) throws InterruptedException {
+    if (!client.hasError()) {
+      getQueue(connection).put(client);
+    } else {
+      AtomicInteger counter = _numberOfConnections.get(connection.getHost());
+      if (counter != null) {
+        counter.decrementAndGet();
+      }
+      closeAndRemoveTransport(client);
+    }
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private AsyncMethodCallback<?> wrapCallback(AsyncMethodCallback<?> realCallback, TAsyncClient client, Connection connectionStr) {
+    return new ClientPoolAsyncMethodCallback(realCallback, client, this, connectionStr);
+  }
+
+  /** Stores the callback in the last argument slot. */
+  private void resetArgs(Object[] args, AsyncMethodCallback<?> callback) {
+    args[args.length - 1] = callback;
+  }
+
+  /** Reads the callback from the last argument slot. */
+  private AsyncMethodCallback<?> getRealAsyncMethodCallback(Object[] args) {
+    return (AsyncMethodCallback<?>) args[args.length - 1];
+  }
+
+  /**
+   * Obtains a client for the connection: an idle one if available, otherwise
+   * a newly created one. While the host is at its connection limit this
+   * method keeps polling the idle queue.
+   * 
+   * @param c
+   *          the AsyncIface interface; the client class name is derived from
+   *          it by replacing "$AsyncIface" with "$AsyncClient".
+   * @param connection
+   *          the connection to get a client for.
+   */
+  private TAsyncClient newClient(Class<?> c, Connection connection) throws InterruptedException {
+    BlockingQueue<TAsyncClient> blockingQueue = getQueue(connection);
+    TAsyncClient client = blockingQueue.poll();
+    if (client != null) {
+      return client;
+    }
+
+    AtomicInteger counter;
+    synchronized (_numberOfConnections) {
+      counter = _numberOfConnections.get(connection.getHost());
+      if (counter == null) {
+        counter = new AtomicInteger();
+        _numberOfConnections.put(connection.getHost(), counter);
+      }
+    }
+
+    synchronized (counter) {
+      int numOfConnections = counter.get();
+      while (numOfConnections >= _maxConnectionsPerHost) {
+        client = blockingQueue.poll(_pollTime, TimeUnit.MILLISECONDS);
+        if (client != null) {
+          return client;
+        }
+        LOG.debug("Waiting for client number of connection [" + numOfConnections + "], max connection per host [" + _maxConnectionsPerHost + "]");
+        numOfConnections = counter.get();
+      }
+      LOG.info("Creating a new client for [" + connection + "]");
+      String name = c.getName();
+      Constructor<?> constructor = _constructorCache.get(name);
+      if (constructor == null) {
+        String clientClassName = name.replace("$AsyncIface", "$AsyncClient");
+        try {
+          Class<?> clazz = Class.forName(clientClassName);
+          constructor = clazz.getConstructor(new Class[] { TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class });
+          _constructorCache.put(name, constructor);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+      TNonblockingSocket transport = null;
+      try {
+        transport = newTransport(connection);
+        client = (TAsyncClient) constructor.newInstance(new Object[] { _protocolFactory, _clientManager, transport });
+        client.setTimeout(_timeout);
+        // Remember the transport so that close() can actually release it.
+        _transports.add(transport);
+        counter.incrementAndGet();
+        return client;
+      } catch (Exception e) {
+        // Do not leak the socket when the client could not be built around it.
+        if (transport != null) {
+          transport.close();
+        }
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private TNonblockingSocket newTransport(Connection connection) throws IOException {
+    return new TNonblockingSocket(connection.getHost(), connection.getPort());
+  }
+
+  /**
+   * Callback handed to the Thrift client in place of the caller's callback.
+   * It forwards the outcome to the caller's callback and takes care of the
+   * pooled client.
+   */
+  private static class ClientPoolAsyncMethodCallback<T> implements AsyncMethodCallback<T> {
+
+    private AsyncMethodCallback<T> _realCallback;
+    private TAsyncClient _client;
+    private AsyncClientPool _pool;
+    private Connection _connection;
+
+    public ClientPoolAsyncMethodCallback(AsyncMethodCallback<T> realCallback, TAsyncClient client, AsyncClientPool pool, Connection connection) {
+      _realCallback = realCallback;
+      _client = client;
+      _pool = pool;
+      _connection = connection;
+    }
+
+    /** Forwards the response, then hands the client back to the pool. */
+    @Override
+    public void onComplete(T response) {
+      _realCallback.onComplete(response);
+      try {
+        _pool.returnClient(_connection, _client);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the calling thread before wrapping.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Reduces the host's connection count, forwards the error, then closes
+     * the client's transport.
+     */
+    @Override
+    public void onError(Exception exception) {
+      AtomicInteger counter = _pool._numberOfConnections.get(_connection.getHost());
+      if (counter != null) {
+        counter.decrementAndGet();
+      }
+      _realCallback.onError(exception);
+      _pool.closeAndRemoveTransport(_client);
+    }
+  }
+
+  /** Holder for everything needed to perform one proxied invocation. */
+  private static class AsyncCall {
+
+    Class<?> _clazz;
+    Method _method;
+    Object[] _args;
+    Connection _connection;
+
+    public AsyncCall(Class<?> clazz, Method method, Object[] args, Connection connection) {
+      _clazz = clazz;
+      _method = method;
+      _args = args;
+      _connection = connection;
+    }
+  }
+
+  /**
+   * Closes the transport held by the client (read reflectively) and stops
+   * tracking it.
+   */
+  private void closeAndRemoveTransport(TAsyncClient client) {
+    try {
+      TNonblockingTransport transport = (TNonblockingTransport) _transportField.get(client);
+      if (transport == null) {
+        return;
+      }
+      _transports.remove(transport);
+      transport.close();
+    } catch (IllegalArgumentException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/BException.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BException.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BException.java
new file mode 100644
index 0000000..f68a86f
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BException.java
@@ -0,0 +1,52 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
+import java.text.MessageFormat;
+
+import org.apache.blur.thrift.generated.BlurException;
+
+
+/**
+ * {@link BlurException} whose second constructor argument is filled with the
+ * stack trace of a causing {@link Throwable}, and whose message may be a
+ * {@link MessageFormat} pattern.
+ */
+public class BException extends BlurException {
+
+  private static final long serialVersionUID = 5846541677293727358L;
+
+  /**
+   * @param message the message, used as is.
+   * @param t the cause whose stack trace is attached; may be null.
+   */
+  public BException(String message, Throwable t) {
+    super(message, toString(t));
+  }
+
+  /**
+   * @param message a {@link MessageFormat} pattern.
+   * @param parameters the arguments substituted into the pattern.
+   */
+  public BException(String message, Object... parameters) {
+    this(MessageFormat.format(message, parameters), (Throwable) null);
+  }
+
+  /**
+   * @param message a {@link MessageFormat} pattern.
+   * @param t the cause whose stack trace is attached; may be null.
+   * @param parameters the arguments substituted into the pattern.
+   */
+  public BException(String message, Throwable t, Object... parameters) {
+    this(MessageFormat.format(message, parameters), t);
+  }
+
+  /**
+   * Renders the stack trace of the throwable as a string.
+   * 
+   * @param t the throwable; may be null.
+   * @return the printed stack trace, or null if t is null.
+   */
+  public static String toString(Throwable t) {
+    if (t == null) {
+      return null;
+    }
+    // Collect characters directly; going through bytes would depend on the
+    // platform default charset and could corrupt the text.
+    java.io.StringWriter buffer = new java.io.StringWriter();
+    PrintWriter writer = new PrintWriter(buffer);
+    t.printStackTrace(writer);
+    writer.close();
+    return buffer.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
new file mode 100644
index 0000000..92a0172
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
@@ -0,0 +1,99 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.commands.BlurCommand;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.Blur.Iface;
+
+/**
+ * Factory for {@link Iface} proxies whose calls are executed through
+ * {@link BlurClientManager} against one or more connections.
+ */
+public class BlurClient {
+
+  /**
+   * Handler behind the proxies created by this class: each interface call is
+   * wrapped in a {@link BlurCommand} and handed to
+   * {@link BlurClientManager#execute(List, AbstractCommand)}.
+   */
+  static class BlurClientInvocationHandler implements InvocationHandler {
+
+    private List<Connection> connections;
+
+    public BlurClientInvocationHandler(List<Connection> connections) {
+      this.connections = connections;
+    }
+
+    @Override
+    public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
+      BlurCommand<Object> reflectiveCall = new BlurCommand<Object>() {
+        @Override
+        public Object call(Client client) throws BlurException, TException {
+          try {
+            return method.invoke(client, args);
+          } catch (InvocationTargetException e) {
+            throw BlurClientInvocationHandler.unwrap(e);
+          } catch (IllegalArgumentException e) {
+            throw new RuntimeException(e);
+          } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+      return BlurClientManager.execute(connections, reflectiveCall);
+    }
+
+    /**
+     * Rethrows the target of a reflective call failure when it is a
+     * {@link BlurException} or {@link TException}; any other target is
+     * returned wrapped in a {@link RuntimeException} for the caller to throw.
+     */
+    private static RuntimeException unwrap(InvocationTargetException failure) throws BlurException, TException {
+      Throwable target = failure.getTargetException();
+      if (target instanceof BlurException) {
+        throw (BlurException) target;
+      }
+      if (target instanceof TException) {
+        throw (TException) target;
+      }
+      return new RuntimeException(target);
+    }
+
+  }
+
+  /**
+   * Returns a client interface to Blur based on the connectionStr.
+   * 
+   * <pre>
+   * Blur.Iface client = BlurClient.getClient(&quot;controller1:40010,controller2:40010&quot;);
+   * </pre>
+   * 
+   * The connectionStr also supports passing a proxy host/port (e.g. a SOCKS proxy configuration):
+   * 
+   * <pre>
+   * Blur.Iface client = BlurClient.getClient(&quot;host1:port/proxyhost1:proxyport&quot;);
+   * </pre>
+   * 
+   * @param connectionStr
+   *          - a comma-delimited list of host:port of Shard Controllers.
+   * @return a proxy implementing {@link Iface} that runs each call through
+   *         {@link BlurClientManager} using the parsed connections.
+   */
+  public static Iface getClient(String connectionStr) {
+    return getClient(BlurClientManager.getConnections(connectionStr));
+  }
+
+  /**
+   * Returns a client interface to Blur that uses the single given connection.
+   */
+  public static Iface getClient(Connection connection) {
+    List<Connection> single = Arrays.asList(connection);
+    return getClient(single);
+  }
+
+  /**
+   * Returns a client interface to Blur that uses the given connections.
+   */
+  public static Iface getClient(List<Connection> connections) {
+    ClassLoader loader = Iface.class.getClassLoader();
+    Class<?>[] interfaces = new Class<?>[] { Iface.class };
+    InvocationHandler handler = new BlurClientInvocationHandler(connections);
+    return (Iface) Proxy.newProxyInstance(loader, interfaces, handler);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
new file mode 100644
index 0000000..4e4c40c
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
@@ -0,0 +1,348 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.Proxy.Type;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TBinaryProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TSocket;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.apache.blur.thrift.generated.Blur;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Blur.Client;
+
+/**
+ * Static pool of {@link Blur.Client} instances keyed by {@link Connection},
+ * plus the retry/back-off logic used to run an {@link AbstractCommand} against
+ * a list of connections. Connections that fail are recorded as bad and are
+ * re-checked once a second by a daemon thread started when the class loads.
+ */
+public class BlurClientManager {
+
+  /** Placeholder value for the badConnections map, which is used as a set. */
+  private static final Object NULL = new Object();
+
+  private static final Log LOG = LogFactory.getLog(BlurClientManager.class);
+  private static final int MAX_RETRIES = 5;
+  private static final long BACK_OFF_TIME = TimeUnit.MILLISECONDS.toMillis(250);
+  private static final long MAX_BACK_OFF_TIME = TimeUnit.SECONDS.toMillis(10);
+  private static final long ONE_SECOND = TimeUnit.SECONDS.toMillis(1);
+
+  private static Map<Connection, BlockingQueue<Client>> clientPool = new ConcurrentHashMap<Connection, BlockingQueue<Client>>();
+  private static Thread daemon;
+  private static AtomicBoolean running = new AtomicBoolean(true);
+  private static Map<Connection, Object> badConnections = new ConcurrentHashMap<Connection, Object>();
+
+  static {
+    startDaemon();
+  }
+
+  /**
+   * Starts the daemon thread that, once a second, probes every connection
+   * currently marked bad and un-marks those that can be opened again.
+   */
+  private static void startDaemon() {
+    daemon = new Thread(new Runnable() {
+      private Set<Connection> good = new HashSet<Connection>();
+
+      @Override
+      public void run() {
+        while (running.get()) {
+          good.clear();
+          Set<Connection> badConns = badConnections.keySet();
+          for (Connection connection : badConns) {
+            if (isConnectionGood(connection)) {
+              good.add(connection);
+            }
+          }
+          for (Connection connection : good) {
+            badConnections.remove(connection);
+          }
+          try {
+            Thread.sleep(ONE_SECOND);
+          } catch (InterruptedException e) {
+            return;
+          }
+        }
+      }
+    });
+    daemon.setDaemon(true);
+    daemon.setName("Blur-Client-Manager-Connection-Checker");
+    daemon.start();
+  }
+
+  /**
+   * Checks a connection by obtaining a client for it and returning that client
+   * to the pool.
+   *
+   * @return true if a client could be obtained, false if obtaining it failed
+   *         with a {@link TTransportException} or {@link IOException}.
+   */
+  protected static boolean isConnectionGood(Connection connection) {
+    try {
+      returnClient(connection, getClient(connection));
+      return true;
+    } catch (TTransportException e) {
+      LOG.debug("Connection [{0}] is still bad.", connection);
+    } catch (IOException e) {
+      LOG.debug("Connection [{0}] is still bad.", connection);
+    }
+    return false;
+  }
+
+  /** Runs the command against a single connection with the default retry settings. */
+  public static <CLIENT, T> T execute(Connection connection, AbstractCommand<CLIENT, T> command) throws BlurException,
+      TException, IOException {
+    return execute(connection, command, MAX_RETRIES, BACK_OFF_TIME, MAX_BACK_OFF_TIME);
+  }
+
+  /** Runs the command against a single connection with explicit retry settings. */
+  public static <CLIENT, T> T execute(Connection connection, AbstractCommand<CLIENT, T> command, int maxRetries,
+      long backOffTime, long maxBackOffTime) throws BlurException, TException, IOException {
+    return execute(Arrays.asList(connection), command, maxRetries, backOffTime, maxBackOffTime);
+  }
+
+  /** Runs the command against the given connections with the default retry settings. */
+  public static <CLIENT, T> T execute(List<Connection> connections, AbstractCommand<CLIENT, T> command)
+      throws BlurException, TException, IOException {
+    return execute(connections, command, MAX_RETRIES, BACK_OFF_TIME, MAX_BACK_OFF_TIME);
+  }
+
+  /** Per-call working state used by execute. */
+  private static class LocalResources {
+    AtomicInteger retries = new AtomicInteger();
+    AtomicReference<Blur.Client> client = new AtomicReference<Client>();
+    List<Connection> shuffledConnections = new ArrayList<Connection>();
+    Random random = new Random();
+  }
+
+  /**
+   * Runs the command against the given connections, tried in random order and
+   * skipping connections currently marked bad. Transport failures are retried
+   * with back-off until {@code maxRetries} is exceeded, at which point the
+   * failure is rethrown.
+   *
+   * @param connections
+   *          the candidate connections.
+   * @param command
+   *          the command to call with a pooled client.
+   * @param maxRetries
+   *          number of retries allowed before a failure is rethrown.
+   * @param backOffTime
+   *          base back-off in milliseconds.
+   * @param maxBackOffTime
+   *          back-off in milliseconds reached at the last retry.
+   * @return the result of the command.
+   * @throws IOException
+   *           if obtaining a client keeps failing, or if no connection
+   *           produced a result after five passes over the list.
+   */
+  @SuppressWarnings("unchecked")
+  public static <CLIENT, T> T execute(List<Connection> connections, AbstractCommand<CLIENT, T> command, int maxRetries,
+      long backOffTime, long maxBackOffTime) throws BlurException, TException, IOException {
+    LocalResources localResources = new LocalResources();
+    AtomicReference<Client> client = localResources.client;
+    Random random = localResources.random;
+    AtomicInteger retries = localResources.retries;
+    List<Connection> shuffledConnections = localResources.shuffledConnections;
+
+    retries.set(0);
+    shuffledConnections.addAll(connections);
+
+    Collections.shuffle(shuffledConnections, random);
+    boolean allBad = true;
+    int connectionErrorCount = 0;
+    while (true) {
+      for (Connection connection : shuffledConnections) {
+        if (isBadConnection(connection)) {
+          continue;
+        }
+        client.set(null);
+        try {
+          client.set(getClient(connection));
+        } catch (IOException e) {
+          if (handleError(connection, client, retries, command, e, maxRetries, backOffTime, maxBackOffTime)) {
+            throw e;
+          } else {
+            markBadConnection(connection);
+            continue;
+          }
+        }
+        try {
+          T result = command.call((CLIENT) client.get(), connection);
+          allBad = false;
+          if (command.isDetachClient()) {
+            // if the is detach client is set then the command will return the
+            // client to the pool.
+            client.set(null);
+          }
+          return result;
+        } catch (RuntimeException e) {
+          // Transport failures may arrive wrapped in a RuntimeException; treat
+          // them like a directly thrown TTransportException.
+          Throwable cause = e.getCause();
+          if (cause instanceof TTransportException) {
+            TTransportException t = (TTransportException) cause;
+            if (handleError(connection, client, retries, command, t, maxRetries, backOffTime, maxBackOffTime)) {
+              throw t;
+            }
+          } else {
+            throw e;
+          }
+        } catch (TTransportException e) {
+          if (handleError(connection, client, retries, command, e, maxRetries, backOffTime, maxBackOffTime)) {
+            throw e;
+          }
+        } finally {
+          // handleError nulls the reference after trashing the client, so only
+          // healthy, non-detached clients are returned here.
+          if (client.get() != null) {
+            returnClient(connection, client);
+          }
+        }
+      }
+      if (allBad) {
+        connectionErrorCount++;
+        LOG.error("All connections are bad [" + connectionErrorCount + "].");
+        if (connectionErrorCount >= 5) {
+          throw new IOException("All connections are bad.");
+        }
+        try {
+          Thread.sleep(ONE_SECOND);
+        } catch (InterruptedException e) {
+          // Keep the interrupt visible to the caller.
+          Thread.currentThread().interrupt();
+          throw new BException("Unknown error.", e);
+        }
+      }
+    }
+  }
+
+  private static void markBadConnection(Connection connection) {
+    LOG.info("Marking bad connection [{0}]", connection);
+    badConnections.put(connection, NULL);
+  }
+
+  private static boolean isBadConnection(Connection connection) {
+    return badConnections.containsKey(connection);
+  }
+
+  /**
+   * Handles a failed attempt: trashes the pooled clients of the connection and
+   * marks it bad when a client was in use, then either gives up or backs off.
+   *
+   * @return true if the retries are exhausted and the caller should rethrow,
+   *         false after sleeping for the back-off period.
+   */
+  private static <CLIENT, T> boolean handleError(Connection connection, AtomicReference<Blur.Client> client,
+      AtomicInteger retries, AbstractCommand<CLIENT, T> command, Exception e, int maxRetries, long backOffTime,
+      long maxBackOffTime) {
+    if (client.get() != null) {
+      trashConnections(connection, client);
+      markBadConnection(connection);
+      client.set(null);
+    }
+    if (retries.get() > maxRetries) {
+      LOG.error("No more retries [{0}] out of [{1}]", retries, maxRetries);
+      return true;
+    }
+    LOG.error("Retrying call [{0}] retry [{1}] out of [{2}] message [{3}]", command, retries.get(), maxRetries,
+        e.getMessage());
+    sleep(backOffTime, maxBackOffTime, retries.get(), maxRetries);
+    retries.incrementAndGet();
+    return false;
+  }
+
+  /**
+   * Sleeps for a back-off that grows linearly from {@code backOffTime} at retry
+   * 0 to {@code maxBackOffTime} at retry {@code maxRetries}.
+   *
+   * @throws RuntimeException
+   *           wrapping the {@link InterruptedException} if the thread is
+   *           interrupted; the interrupt flag is restored first.
+   */
+  public static void sleep(long backOffTime, long maxBackOffTime, int retry, int maxRetries) {
+    // With no retries configured there is no slope to compute; dividing by
+    // maxRetries would throw ArithmeticException.
+    long extra = maxRetries > 0 ? (maxBackOffTime - backOffTime) / maxRetries : 0;
+    // Thread.sleep rejects negative values.
+    long sleep = Math.max(0L, backOffTime + (extra * retry));
+    LOG.info("Backing off call for [{0} ms]", sleep);
+    try {
+      Thread.sleep(sleep);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Parses the connection string and runs the command with explicit retry settings. */
+  public static <CLIENT, T> T execute(String connectionStr, AbstractCommand<CLIENT, T> command, int maxRetries,
+      long backOffTime, long maxBackOffTime) throws BlurException, TException, IOException {
+    return execute(getConnections(connectionStr), command, maxRetries, backOffTime, maxBackOffTime);
+  }
+
+  /**
+   * Splits a comma-delimited connection string and builds one
+   * {@link Connection} per element.
+   */
+  public static List<Connection> getConnections(String connectionStr) {
+    int start = 0;
+    int index = connectionStr.indexOf(',');
+    if (index >= 0) {
+      List<Connection> connections = new ArrayList<Connection>();
+      while (index >= 0) {
+        connections.add(new Connection(connectionStr.substring(start, index)));
+        start = index + 1;
+        index = connectionStr.indexOf(',', start);
+      }
+      connections.add(new Connection(connectionStr.substring(start)));
+      return connections;
+    }
+    return Arrays.asList(new Connection(connectionStr));
+  }
+
+  /** Parses the connection string and runs the command with the default retry settings. */
+  public static <CLIENT, T> T execute(String connectionStr, AbstractCommand<CLIENT, T> command) throws BlurException,
+      TException, IOException {
+    return execute(getConnections(connectionStr), command);
+  }
+
+  /** Returns the referenced client to the pool of the given connection. */
+  public static void returnClient(Connection connection, AtomicReference<Blur.Client> client) {
+    returnClient(connection, client.get());
+  }
+
+  /**
+   * Returns the client to the pool of the given connection, creating the pool
+   * entry if the connection has none yet.
+   */
+  public static void returnClient(Connection connection, Blur.Client client) {
+    try {
+      getQueue(connection).put(client);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Replaces the pool of the connection with an empty one and closes every
+   * client that was pooled, as well as the client currently in use.
+   */
+  private static void trashConnections(Connection connection, AtomicReference<Client> c) {
+    BlockingQueue<Client> previous;
+    synchronized (clientPool) {
+      previous = clientPool.put(connection, new LinkedBlockingQueue<Client>());
+    }
+
+    LOG.info("Trashing client for connections [{0}]", connection);
+    // A connection may have no pool yet, in which case there is nothing pooled to close.
+    if (previous != null) {
+      for (Client client : previous) {
+        close(client);
+      }
+    }
+    Client inUse = c.get();
+    if (inUse != null) {
+      close(inUse);
+    }
+  }
+
+  /** Closes the transports of the client's input and output protocols. */
+  public static void close(Client client) {
+    client.getInputProtocol().getTransport().close();
+    client.getOutputProtocol().getTransport().close();
+  }
+
+  /** Returns the pool of the connection, registering an empty one if absent. */
+  private static BlockingQueue<Client> getQueue(Connection connection) {
+    synchronized (clientPool) {
+      BlockingQueue<Client> queue = clientPool.get(connection);
+      if (queue == null) {
+        queue = new LinkedBlockingQueue<Client>();
+        clientPool.put(connection, queue);
+      }
+      return queue;
+    }
+  }
+
+  /**
+   * Returns a pooled client for the connection, or opens a new one when the
+   * pool is empty.
+   */
+  private static Client getClient(Connection connection) throws TTransportException, IOException {
+    // poll() instead of isEmpty() + take(): another thread may drain the queue
+    // between the two calls, which would leave take() blocked indefinitely.
+    Client pooled = getQueue(connection).poll();
+    if (pooled == null) {
+      return newClient(connection);
+    }
+    return pooled;
+  }
+
+  /**
+   * Opens a socket to the connection's host and port (through a SOCKS proxy
+   * when the connection is a proxy connection) and wraps it in a framed,
+   * binary-protocol client.
+   *
+   * @throws IOException
+   *           if the socket cannot be configured or connected.
+   * @throws TTransportException
+   *           if the transport cannot be created on the socket.
+   */
+  public static Client newClient(Connection connection) throws TTransportException, IOException {
+    String host = connection.getHost();
+    int port = connection.getPort();
+    Socket socket;
+    if (connection.isProxy()) {
+      Proxy proxy = new Proxy(Type.SOCKS, new InetSocketAddress(connection.getProxyHost(), connection.getProxyPort()));
+      socket = new Socket(proxy);
+    } else {
+      socket = new Socket();
+    }
+    boolean opened = false;
+    try {
+      socket.setTcpNoDelay(true);
+      socket.connect(new InetSocketAddress(host, port));
+      TSocket trans = new TSocket(socket);
+
+      TProtocol proto = new TBinaryProtocol(new TFramedTransport(trans));
+      Client client = new Client(proto);
+      opened = true;
+      return client;
+    } finally {
+      if (!opened) {
+        // Do not leak the socket when connecting or wrapping it fails.
+        try {
+          socket.close();
+        } catch (IOException ignored) {
+          // Best effort: the original failure is the one the caller needs.
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java b/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
new file mode 100644
index 0000000..e9ce24b
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
@@ -0,0 +1,130 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * Address of a Blur server, optionally reached through a proxy. Instances
+ * compare by host, port, proxy flag, proxy host and proxy port.
+ */
+public class Connection {
+
+  private String _host = null;
+  private int _port = -1;
+  private String _proxyHost = null;
+  private int _proxyPort = -1;
+  private boolean _proxy = false;
+
+  /**
+   * Parses a connection string of the form {@code host:port} or
+   * {@code host:port/proxyhost:proxyport}.
+   *
+   * @param connectionStr
+   *          the string to parse.
+   * @throws RuntimeException
+   *           if the string contains no {@code ':'}.
+   * @throws NumberFormatException
+   *           if a port is not an integer.
+   */
+  public Connection(String connectionStr) {
+    int index = connectionStr.indexOf(':');
+    if (index >= 0) {
+      int slashIndex = connectionStr.indexOf('/');
+      if (slashIndex > 0) {
+        _host = connectionStr.substring(0, index);
+        _port = Integer.parseInt(connectionStr.substring(index + 1, slashIndex));
+        int indexOfProxyPort = connectionStr.indexOf(':', slashIndex);
+        _proxyHost = connectionStr.substring(slashIndex + 1, indexOfProxyPort);
+        _proxyPort = Integer.parseInt(connectionStr.substring(indexOfProxyPort + 1));
+        // A parsed proxy must be flagged as one, as the four-argument
+        // constructor does; otherwise isProxy() reports false and the proxy
+        // settings are never used.
+        _proxy = true;
+      } else {
+        _host = connectionStr.substring(0, index);
+        _port = Integer.parseInt(connectionStr.substring(index + 1));
+      }
+    } else {
+      throw new RuntimeException("Connection string of [" + connectionStr + "] does not match 'host1:port' or 'host1:port/proxyhost1:proxyport'");
+    }
+  }
+
+  /** Creates a connection to {@code host:port} that goes through the given proxy. */
+  public Connection(String host, int port, String proxyHost, int proxyPort) {
+    _port = port;
+    _host = host;
+    _proxyHost = proxyHost;
+    _proxyPort = proxyPort;
+    _proxy = true;
+  }
+
+  /** Creates a direct (non-proxy) connection to {@code host:port}. */
+  public Connection(String host, int port) {
+    _port = port;
+    _host = host;
+  }
+
+  public String getHost() {
+    return _host;
+  }
+
+  public int getPort() {
+    return _port;
+  }
+
+  /** Returns true if this connection was configured with a proxy. */
+  public boolean isProxy() {
+    return _proxy;
+  }
+
+  /** Returns the proxy port, or -1 when no proxy is configured. */
+  public int getProxyPort() {
+    return _proxyPort;
+  }
+
+  /** Returns the proxy host, or null when no proxy is configured. */
+  public String getProxyHost() {
+    return _proxyHost;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_host == null) ? 0 : _host.hashCode());
+    result = prime * result + _port;
+    result = prime * result + (_proxy ? 1231 : 1237);
+    result = prime * result + ((_proxyHost == null) ? 0 : _proxyHost.hashCode());
+    result = prime * result + _proxyPort;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    Connection other = (Connection) obj;
+    if (_host == null) {
+      if (other._host != null)
+        return false;
+    } else if (!_host.equals(other._host))
+      return false;
+    if (_port != other._port)
+      return false;
+    if (_proxy != other._proxy)
+      return false;
+    if (_proxyHost == null) {
+      if (other._proxyHost != null)
+        return false;
+    } else if (!_proxyHost.equals(other._proxyHost))
+      return false;
+    if (_proxyPort != other._proxyPort)
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "Connection [_host=" + _host + ", _port=" + _port + ", _proxy=" + _proxy + ", _proxyHost=" + _proxyHost + ", _proxyPort=" + _proxyPort + "]";
+  }
+
+  /**
+   * Returns this connection in the string form accepted by
+   * {@link #Connection(String)}: {@code host:port}, followed by
+   * {@code /proxyhost:proxyport} when a proxy host is set.
+   */
+  public Object getConnectionStr() {
+    if (_proxyHost != null) {
+      return _host + ":" + _port + "/" + _proxyHost + ":" + _proxyPort;
+    }
+    return _host + ":" + _port;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/commands/BlurCommand.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/commands/BlurCommand.java b/blur-thrift/src/main/java/org/apache/blur/thrift/commands/BlurCommand.java
new file mode 100644
index 0000000..72f1f5f
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/commands/BlurCommand.java
@@ -0,0 +1,29 @@
+package org.apache.blur.thrift.commands;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.blur.thrift.AbstractCommand;
+import org.apache.blur.thrift.generated.Blur;
+
+/**
+ * Base class for commands that run against a {@link Blur.Client}: it fixes the
+ * client type parameter of {@link AbstractCommand} to {@code Blur.Client} and
+ * leaves the result type {@code T} to the subclass.
+ *
+ * @param <T>
+ *          the result type of the command.
+ */
+public abstract class BlurCommand<T> extends AbstractCommand<Blur.Client, T> {
+
+  /**
+   * Narrows the return type of the inherited {@code clone()} to
+   * {@code BlurCommand<T>} so callers do not need to cast.
+   */
+  @Override
+  public BlurCommand<T> clone() {
+    return (BlurCommand<T>) super.clone();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-thrift/src/main/java/org/apache/blur/thrift/generated/AlternateColumnDefinition.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/generated/AlternateColumnDefinition.java b/blur-thrift/src/main/java/org/apache/blur/thrift/generated/AlternateColumnDefinition.java
new file mode 100644
index 0000000..d470dd1
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/generated/AlternateColumnDefinition.java
@@ -0,0 +1,420 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.blur.thrift.generated;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+import org.apache.blur.thirdparty.thrift_0_9_0.scheme.IScheme;
+import org.apache.blur.thirdparty.thrift_0_9_0.scheme.SchemeFactory;
+import org.apache.blur.thirdparty.thrift_0_9_0.scheme.StandardScheme;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.scheme.TupleScheme;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TTupleProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolException;
+import org.apache.blur.thirdparty.thrift_0_9_0.EncodingUtils;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class AlternateColumnDefinition implements org.apache.blur.thirdparty.thrift_0_9_0.TBase<AlternateColumnDefinition, AlternateColumnDefinition._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.blur.thirdparty.thrift_0_9_0.protocol.TStruct STRUCT_DESC = new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TStruct("AlternateColumnDefinition");
+
+  private static final org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField ANALYZER_CLASS_NAME_FIELD_DESC = new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField("analyzerClassName", org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.STRING, (short)1);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new AlternateColumnDefinitionStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new AlternateColumnDefinitionTupleSchemeFactory());
+  }
+
+  /**
+   * 
+   */
+  public String analyzerClassName; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.blur.thirdparty.thrift_0_9_0.TFieldIdEnum {
+    /**
+     * 
+     */
+    ANALYZER_CLASS_NAME((short)1, "analyzerClassName");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // ANALYZER_CLASS_NAME
+          return ANALYZER_CLASS_NAME;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  public static final Map<_Fields, org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.ANALYZER_CLASS_NAME, new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData("analyzerClassName", org.apache.blur.thirdparty.thrift_0_9_0.TFieldRequirementType.DEFAULT, 
+        new org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldValueMetaData(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.blur.thirdparty.thrift_0_9_0.meta_data.FieldMetaData.addStructMetaDataMap(AlternateColumnDefinition.class, metaDataMap);
+  }
+
+  public AlternateColumnDefinition() {
+  }
+
+  public AlternateColumnDefinition(
+    String analyzerClassName)
+  {
+    this();
+    this.analyzerClassName = analyzerClassName;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public AlternateColumnDefinition(AlternateColumnDefinition other) {
+    if (other.isSetAnalyzerClassName()) {
+      this.analyzerClassName = other.analyzerClassName;
+    }
+  }
+
+  public AlternateColumnDefinition deepCopy() {
+    return new AlternateColumnDefinition(this);
+  }
+
+  @Override
+  public void clear() {
+    this.analyzerClassName = null;
+  }
+
+  /**
+   * 
+   */
+  public String getAnalyzerClassName() {
+    return this.analyzerClassName;
+  }
+
+  /**
+   * 
+   */
+  public AlternateColumnDefinition setAnalyzerClassName(String analyzerClassName) {
+    this.analyzerClassName = analyzerClassName;
+    return this;
+  }
+
+  public void unsetAnalyzerClassName() {
+    this.analyzerClassName = null;
+  }
+
+  /** Returns true if field analyzerClassName is set (has been assigned a value) and false otherwise */
+  public boolean isSetAnalyzerClassName() {
+    return this.analyzerClassName != null;
+  }
+
+  public void setAnalyzerClassNameIsSet(boolean value) {
+    if (!value) {
+      this.analyzerClassName = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case ANALYZER_CLASS_NAME:
+      if (value == null) {
+        unsetAnalyzerClassName();
+      } else {
+        setAnalyzerClassName((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case ANALYZER_CLASS_NAME:
+      return getAnalyzerClassName();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /**
+   * Reports whether the given field currently has a value assigned.
+   *
+   * @param field which field to query
+   * @return result of the matching typed isSet check
+   * @throws IllegalArgumentException if field is null
+   * @throws IllegalStateException if the constant is not handled by this method
+   */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case ANALYZER_CLASS_NAME:
+      return isSetAnalyzerClassName();
+    default:
+      throw new IllegalStateException();
+    }
+  }
+
+  /**
+   * Object-typed equality: anything that is not an AlternateColumnDefinition
+   * (including null) is unequal; otherwise the typed overload decides.
+   */
+  @Override
+  public boolean equals(Object that) {
+    // instanceof is false for null, so no separate null check is needed.
+    if (!(that instanceof AlternateColumnDefinition)) {
+      return false;
+    }
+    return this.equals((AlternateColumnDefinition)that);
+  }
+
+  /**
+   * Field-wise equality: two instances are equal when analyzerClassName is unset on
+   * both, or set on both to equal strings.
+   *
+   * @param that instance to compare with; null yields false
+   */
+  public boolean equals(AlternateColumnDefinition that) {
+    if (that == null) {
+      return false;
+    }
+
+    boolean mineSet = this.isSetAnalyzerClassName();
+    boolean theirsSet = that.isSetAnalyzerClassName();
+    if (mineSet != theirsSet) {
+      return false;
+    }
+
+    // Both unset counts as equal; otherwise both are set and the strings decide.
+    return !mineSet || this.analyzerClassName.equals(that.analyzerClassName);
+  }
+
+  /**
+   * Returns a hash code derived from analyzerClassName, the only field that equals
+   * compares. Equal instances therefore still produce equal hash codes, while
+   * instances with different names no longer all collide on a single constant value.
+   * An unset (null) field hashes to 0.
+   *
+   * Note: the field is mutable, so the hash changes when the field is reassigned;
+   * do not modify an instance while it is stored in a hash-based collection.
+   */
+  @Override
+  public int hashCode() {
+    return this.analyzerClassName == null ? 0 : this.analyzerClassName.hashCode();
+  }
+
+  /**
+   * Orders instances: first by runtime class name when the classes differ, then by
+   * whether analyzerClassName is set, and finally by the analyzerClassName values
+   * as compared by TBaseHelper.compareTo.
+   *
+   * @param other instance to compare against
+   * @return negative, zero or positive following the comparison steps above
+   */
+  public int compareTo(AlternateColumnDefinition other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int setOrder = Boolean.valueOf(isSetAnalyzerClassName()).compareTo(other.isSetAnalyzerClassName());
+    if (setOrder != 0) {
+      return setOrder;
+    }
+
+    if (!isSetAnalyzerClassName()) {
+      // Unset on both sides: nothing left to compare.
+      return 0;
+    }
+    return org.apache.blur.thirdparty.thrift_0_9_0.TBaseHelper.compareTo(this.analyzerClassName, other.analyzerClassName);
+  }
+
+  /**
+   * Looks up the field constant for a numeric id by delegating to
+   * _Fields.findByThriftId.
+   */
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol iprot) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol oprot) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  /**
+   * Renders this struct as {@code AlternateColumnDefinition(analyzerClassName:<value>)},
+   * printing {@code null} when the field is unset.
+   */
+  @Override
+  public String toString() {
+    // String concatenation already renders a null reference as "null", so neither an
+    // explicit null branch nor "first field" bookkeeping is needed for a single field.
+    return "AlternateColumnDefinition(analyzerClassName:" + this.analyzerClassName + ")";
+  }
+
+  /**
+   * Validation hook. The body contains no checks, so this method itself never
+   * throws; the two comments mark where such checks would go.
+   */
+  public void validate() throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  /**
+   * Java serialization hook: writes this struct to the object stream through a
+   * TCompactProtocol layered on a TIOStreamTransport.
+   *
+   * @throws java.io.IOException wrapping any TException raised while writing
+   */
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport transport =
+          new org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport(out);
+      org.apache.blur.thirdparty.thrift_0_9_0.protocol.TCompactProtocol protocol =
+          new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TCompactProtocol(transport);
+      write(protocol);
+    } catch (org.apache.blur.thirdparty.thrift_0_9_0.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  /**
+   * Java deserialization hook: reads this struct from the object stream through a
+   * TCompactProtocol layered on a TIOStreamTransport.
+   *
+   * @throws java.io.IOException wrapping any TException raised while reading
+   */
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport transport =
+          new org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport(in);
+      org.apache.blur.thirdparty.thrift_0_9_0.protocol.TCompactProtocol protocol =
+          new org.apache.blur.thirdparty.thrift_0_9_0.protocol.TCompactProtocol(transport);
+      read(protocol);
+    } catch (org.apache.blur.thirdparty.thrift_0_9_0.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class AlternateColumnDefinitionStandardSchemeFactory implements SchemeFactory {
+    public AlternateColumnDefinitionStandardScheme getScheme() {
+      return new AlternateColumnDefinitionStandardScheme();
+    }
+  }
+
+  private static class AlternateColumnDefinitionStandardScheme extends StandardScheme<AlternateColumnDefinition> {
+
+    public void read(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol iprot, AlternateColumnDefinition struct) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+      org.apache.blur.thirdparty.thrift_0_9_0.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // ANALYZER_CLASS_NAME
+            if (schemeField.type == org.apache.blur.thirdparty.thrift_0_9_0.protocol.TType.STRING) {
+              struct.analyzerClassName = iprot.readString();
+              struct.setAnalyzerClassNameIsSet(true);
+            } else { 
+              org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol oprot, AlternateColumnDefinition struct) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.analyzerClassName != null) {
+        oprot.writeFieldBegin(ANALYZER_CLASS_NAME_FIELD_DESC);
+        oprot.writeString(struct.analyzerClassName);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class AlternateColumnDefinitionTupleSchemeFactory implements SchemeFactory {
+    public AlternateColumnDefinitionTupleScheme getScheme() {
+      return new AlternateColumnDefinitionTupleScheme();
+    }
+  }
+
+  private static class AlternateColumnDefinitionTupleScheme extends TupleScheme<AlternateColumnDefinition> {
+
+    @Override
+    public void write(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol prot, AlternateColumnDefinition struct) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      BitSet optionals = new BitSet();
+      if (struct.isSetAnalyzerClassName()) {
+        optionals.set(0);
+      }
+      oprot.writeBitSet(optionals, 1);
+      if (struct.isSetAnalyzerClassName()) {
+        oprot.writeString(struct.analyzerClassName);
+      }
+    }
+
+    @Override
+    public void read(org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol prot, AlternateColumnDefinition struct) throws org.apache.blur.thirdparty.thrift_0_9_0.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      BitSet incoming = iprot.readBitSet(1);
+      if (incoming.get(0)) {
+        struct.analyzerClassName = iprot.readString();
+        struct.setAnalyzerClassNameIsSet(true);
+      }
+    }
+  }
+
+}
+


Mime
View raw message