incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/3] Moving classes around from one package to another and extracted the commands from blur-core and made a new project called blur-command. There is a META-INF folder that contains a file to tell the blur server to load the commands.
Date Tue, 09 Sep 2014 22:07:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/ObjectArrayPacking.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ObjectArrayPacking.java b/blur-core/src/main/java/org/apache/blur/command/ObjectArrayPacking.java
new file mode 100644
index 0000000..7fa263d
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ObjectArrayPacking.java
@@ -0,0 +1,165 @@
+package org.apache.blur.command;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurObjectType;
+import org.apache.blur.thrift.generated.BlurPackedObject;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class ObjectArrayPacking {
+
+  public static void main(String[] args) throws BlurException {
+    BlurObject object = newBlurObject();
+    System.out.println(object.toString(1));
+
+    List<BlurPackedObject> packedVersion = pack(object);
+
+    int index = 0;
+    for (BlurPackedObject packedObject : packedVersion) {
+      System.out.println(index + " " + packedObject);
+      index++;
+    }
+
+    BlurObject object2 = (BlurObject) unpack(packedVersion);
+    System.out.println(object2.toString(1));
+    System.out.println(object2.toString());
+  }
+
+  public static List<BlurPackedObject> pack(Object object) throws BlurException {
+    List<BlurPackedObject> packed = new ArrayList<BlurPackedObject>();
+    pack(-1, object, packed);
+    return packed;
+  }
+
+  public static Object unpack(List<BlurPackedObject> packedVersion) {
+    int size = packedVersion.size();
+    Object[] objects = new Object[size];
+    for (int i = 0; i < size; i++) {
+      BlurPackedObject packedObject = packedVersion.get(i);
+      switch (packedObject.type) {
+      case MAP:
+        objects[i] = new BlurObject();
+        break;
+      case LIST:
+        objects[i] = new BlurArray();
+        break;
+      case VALUE:
+        objects[i] = CommandUtil.toObject(packedObject.value);
+        break;
+      case NAME:
+        objects[i] = CommandUtil.toObject(packedObject.value);
+        break;
+      default:
+        throw new RuntimeException();
+      }
+    }
+
+    for (int i = 0; i < size; i++) {
+      BlurPackedObject packedObject = packedVersion.get(i);
+      switch (packedObject.type) {
+      case NAME:
+        break;
+      case MAP:
+      case LIST:
+      case VALUE:
+        addValue(i, objects, packedVersion);
+        break;
+      default:
+        throw new RuntimeException();
+      }
+    }
+    return objects[0];
+  }
+
+  private static void addValue(int index, Object[] objects, List<BlurPackedObject> packedVersion) {
+    BlurPackedObject packedObject = packedVersion.get(index);
+    int parentId = packedObject.parentId;
+    if (parentId == -1) {
+      // root
+      return;
+    }
+    Object value = objects[index];
+    BlurPackedObject po = packedVersion.get(parentId);
+    if (po.type == BlurObjectType.NAME) {
+      BlurObject map = (BlurObject) objects[po.parentId];
+      String key = (String) CommandUtil.toObject(po.value);
+      map.put(key, value);
+    } else if (po.type == BlurObjectType.LIST) {
+      BlurArray array = (BlurArray) objects[parentId];
+      array.put(value);
+    } else {
+      throw new RuntimeException();
+    }
+  }
+
+  private static void pack(int parentId, Object object, List<BlurPackedObject> packed) throws BlurException {
+    if (object instanceof BlurObject) {
+      int id = packed.size();
+      BlurPackedObject packedObject = new BlurPackedObject(parentId, BlurObjectType.MAP, null);
+      packed.add(packedObject);
+      BlurObject blurObject = (BlurObject) object;
+      Iterator<String> keys = blurObject.keys();
+      while (keys.hasNext()) {
+        String key = keys.next();
+        Object o = blurObject.getObject(key);
+        BlurPackedObject po = new BlurPackedObject(id, BlurObjectType.NAME, CommandUtil.toValue(key));
+        int newId = packed.size();
+        packed.add(po);
+        pack(newId, o, packed);
+      }
+    } else if (object instanceof BlurArray) {
+      int id = packed.size();
+      BlurPackedObject packedObject = new BlurPackedObject(parentId, BlurObjectType.LIST, null);
+      packed.add(packedObject);
+      BlurArray array = (BlurArray) object;
+      int length = array.length();
+      for (int i = 0; i < length; i++) {
+        Object o = array.getObject(i);
+        pack(id, o, packed);
+      }
+    } else {
+      packed.add(new BlurPackedObject(parentId, BlurObjectType.VALUE, CommandUtil.toValue(object)));
+    }
+  }
+
+  private static BlurObject newBlurObject() {
+    BlurObject jsonObject = new BlurObject();
+    BlurObject node1 = new BlurObject();
+    node1.accumulate("f1", "v1");
+    node1.accumulate("f2", "v2a");
+    node1.accumulate("f2", "v2b");
+    jsonObject.accumulate("node1", node1);
+    BlurArray node2 = new BlurArray();
+    node2.put("val1");
+    node2.put("val2");
+    node2.put("val3");
+    jsonObject.put("node2", node2);
+
+    BlurObject node3 = new BlurObject();
+    BlurObject node3n1 = new BlurObject();
+    node3n1.put("a", "b");
+    node3.put("n1", node3n1);
+    jsonObject.put("node3", node3);
+    return jsonObject;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/Response.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/Response.java b/blur-core/src/main/java/org/apache/blur/command/Response.java
new file mode 100644
index 0000000..d88dfe4
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/Response.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.util.Map;
+
+public class Response {
+
+  private final Map<Shard, Object> _shardResults;
+  private final Map<Server, Object> _serverResults;
+  private final Object _serverResult;
+  private final boolean _aggregatedResults;
+
+  private Response(Map<Shard, Object> shardResults, Object serverResult, Map<Server, Object> serverResults,
+      boolean aggregatedResults) {
+    _shardResults = shardResults;
+    _serverResult = serverResult;
+    _aggregatedResults = aggregatedResults;
+    _serverResults = serverResults;
+  }
+
+  public boolean isAggregatedResults() {
+    return _aggregatedResults;
+  }
+
+  public Map<Shard, Object> getShardResults() {
+    return _shardResults;
+  }
+
+  public Object getServerResult() {
+    return _serverResult;
+  }
+
+  public Map<Server, Object> getServerResults() {
+    return _serverResults;
+  }
+
+  public static Response createNewAggregateResponse(Object object) {
+    return new Response(null, object, null, true);
+  }
+
+  public static Response createNewShardResponse(Map<Shard, Object> map) {
+    return new Response(map, null, null, false);
+  }
+
+  public static Response createNewServerResponse(Map<Server, Object> result) {
+    return new Response(null, null, result, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/Server.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/Server.java b/blur-core/src/main/java/org/apache/blur/command/Server.java
new file mode 100644
index 0000000..d559373
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/Server.java
@@ -0,0 +1,70 @@
+package org.apache.blur.command;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class Server implements Comparable<Server> {
+
+  private final String _server;
+
+  public Server(String server) {
+    _server = server;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_server == null) ? 0 : _server.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    Server other = (Server) obj;
+    if (_server == null) {
+      if (other._server != null)
+        return false;
+    } else if (!_server.equals(other._server))
+      return false;
+    return true;
+  }
+
+  public String getServer() {
+    return _server;
+  }
+
+  @Override
+  public int compareTo(Server o) {
+    if (o == null) {
+      return -1;
+    }
+    return _server.compareTo(o._server);
+  }
+
+  @Override
+  public String toString() {
+    return "Server [server=" + _server + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/Shard.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/Shard.java b/blur-core/src/main/java/org/apache/blur/command/Shard.java
new file mode 100644
index 0000000..421fde4
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/Shard.java
@@ -0,0 +1,70 @@
+package org.apache.blur.command;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class Shard implements Comparable<Shard> {
+
+  private final String _shard;
+
+  public Shard(String shard) {
+    _shard = shard;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_shard == null) ? 0 : _shard.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    Shard other = (Shard) obj;
+    if (_shard == null) {
+      if (other._shard != null)
+        return false;
+    } else if (!_shard.equals(other._shard))
+      return false;
+    return true;
+  }
+
+  public String getShard() {
+    return _shard;
+  }
+
+  @Override
+  public int compareTo(Shard o) {
+    if (o == null) {
+      return -1;
+    }
+    return _shard.compareTo(o._shard);
+  }
+
+  @Override
+  public String toString() {
+    return "Shard [shard=" + _shard + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
new file mode 100644
index 0000000..3a16ff6
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.manager.IndexServer;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.ShardServerContext;
+import org.apache.blur.server.TableContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+
+public class ShardCommandManager extends BaseCommandManager {
+
+  private final IndexServer _indexServer;
+
+  public ShardCommandManager(IndexServer indexServer, int threadCount, long connectionTimeout) throws IOException {
+    super(threadCount, connectionTimeout);
+    _indexServer = indexServer;
+  }
+
+  public Response execute(final TableContext tableContext, final String commandName, final Args args)
+      throws IOException, TimeoutException {
+    final ShardServerContext shardServerContext = ShardServerContext.getShardServerContext();
+    Callable<Response> callable = new Callable<Response>() {
+      @Override
+      public Response call() throws Exception {
+        Command command = getCommandObject(commandName);
+        if (command == null) {
+          throw new IOException("Command with name [" + commandName + "] not found.");
+        }
+        if (command instanceof IndexReadCommand || command instanceof IndexReadCombiningCommand) {
+          return toResponse(executeReadCommand(shardServerContext, command, tableContext, args), command);
+        } else if (command instanceof IndexWriteCommand) {
+          return toResponse(executeReadWriteCommand(shardServerContext, command, tableContext, args), command);
+        }
+        throw new IOException("Command type of [" + command.getClass() + "] not supported.");
+      }
+    };
+    return submitCallable(callable);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Response toResponse(Map<Shard, Object> results, Command command) throws IOException {
+    if (command instanceof IndexReadCombiningCommand) {
+      IndexReadCombiningCommand<Object, Object> primitiveCommandAggregator = (IndexReadCombiningCommand<Object, Object>) command;
+      Object object = primitiveCommandAggregator.combine(results);
+      return Response.createNewAggregateResponse(object);
+    }
+    return Response.createNewShardResponse(results);
+  }
+
+  private Map<Shard, Object> executeReadWriteCommand(ShardServerContext shardServerContext, Command command,
+      TableContext tableContext, Args args) {
+    return null;
+  }
+
+  private Map<Shard, Object> executeReadCommand(ShardServerContext shardServerContext, Command command,
+      final TableContext tableContext, final Args args) throws IOException {
+    Map<String, BlurIndex> indexes = _indexServer.getIndexes(tableContext.getTable());
+    Map<String, Future<?>> futureMap = new HashMap<String, Future<?>>();
+    for (Entry<String, BlurIndex> e : indexes.entrySet()) {
+      String shardId = e.getKey();
+      final Shard shard = new Shard(shardId);
+      final BlurIndex blurIndex = e.getValue();
+      Callable<Object> callable;
+      if (command instanceof IndexReadCommand) {
+        final IndexReadCommand<?> readCommand = (IndexReadCommand<?>) command.clone();
+        callable = getCallable(shardServerContext, tableContext, args, shard, blurIndex, readCommand);
+      } else if (command instanceof IndexReadCombiningCommand) {
+        final IndexReadCombiningCommand<?, ?> readCombiningCommand = (IndexReadCombiningCommand<?, ?>) command.clone();
+        callable = getCallable(shardServerContext, tableContext, args, shard, blurIndex, readCombiningCommand);
+      } else {
+        throw new IOException("Command type of [" + command.getClass() + "] not supported.");
+      }
+      Future<Object> future = _executorService.submit(callable);
+      futureMap.put(shardId, future);
+    }
+    Map<Shard, Object> resultMap = new HashMap<Shard, Object>();
+    for (Entry<String, Future<?>> e : futureMap.entrySet()) {
+      Future<?> future = e.getValue();
+      Object object;
+      try {
+        object = future.get();
+      } catch (InterruptedException ex) {
+        throw new IOException(ex);
+      } catch (ExecutionException ex) {
+        throw new IOException(ex.getCause());
+      }
+      resultMap.put(new Shard(e.getKey()), object);
+    }
+    return resultMap;
+  }
+
+  private Callable<Object> getCallable(final ShardServerContext shardServerContext, final TableContext tableContext,
+      final Args args, final Shard shard, final BlurIndex blurIndex,
+      final IndexReadCombiningCommand<?, ?> readCombiningCommand) {
+    return new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        String table = tableContext.getTable();
+        String shardId = shard.getShard();
+        IndexSearcherClosable searcher = shardServerContext.getIndexSearcherClosable(table, shardId);
+        if (searcher == null) {
+          searcher = blurIndex.getIndexSearcher();
+          shardServerContext.setIndexSearcherClosable(table, shardId, searcher);
+        }
+        return readCombiningCommand.execute(new ShardIndexContext(tableContext, shard, searcher, args));
+      }
+    };
+  }
+
+  private Callable<Object> getCallable(final ShardServerContext shardServerContext, final TableContext tableContext,
+      final Args args, final Shard shard, final BlurIndex blurIndex, final IndexReadCommand<?> readCommand) {
+    return new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        String table = tableContext.getTable();
+        String shardId = shard.getShard();
+        IndexSearcherClosable searcher = shardServerContext.getIndexSearcherClosable(table, shardId);
+        if (searcher == null) {
+          searcher = blurIndex.getIndexSearcher();
+          shardServerContext.setIndexSearcherClosable(table, shardId, searcher);
+        }
+        return readCommand.execute(new ShardIndexContext(tableContext, shard, searcher, args));
+      }
+    };
+  }
+
+  static class ShardIndexContext extends IndexContext {
+
+    private final TableContext _tableContext;
+    private final Shard _shard;
+    private final IndexSearcher _searcher;
+    private final Args _args;
+
+    public ShardIndexContext(TableContext tableContext, Shard shard, IndexSearcher searcher, Args args) {
+      _tableContext = tableContext;
+      _shard = shard;
+      _searcher = searcher;
+      _args = args;
+    }
+
+    @Override
+    public Args getArgs() {
+      return _args;
+    }
+
+    @Override
+    public IndexReader getIndexReader() {
+      return getIndexSearcher().getIndexReader();
+    }
+
+    @Override
+    public IndexSearcher getIndexSearcher() {
+      return _searcher;
+    }
+
+    @Override
+    public TableContext getTableContext() {
+      return _tableContext;
+    }
+
+    @Override
+    public Shard getShard() {
+      return _shard;
+    }
+
+    @Override
+    public BlurConfiguration getBlurConfiguration() {
+      return _tableContext.getBlurConfiguration();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return _tableContext.getConfiguration();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/ShardResultFuture.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ShardResultFuture.java b/blur-core/src/main/java/org/apache/blur/command/ShardResultFuture.java
new file mode 100644
index 0000000..56e5c43
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ShardResultFuture.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class ShardResultFuture<T> implements Future<T> {
+
+  private final Shard _shard;
+  private final Future<Map<Shard, T>> _future;
+
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    return _future.cancel(mayInterruptIfRunning);
+  }
+
+  public boolean isCancelled() {
+    return _future.isCancelled();
+  }
+
+  public boolean isDone() {
+    return _future.isDone();
+  }
+
+  public T get() throws InterruptedException, ExecutionException {
+    Map<Shard, T> map = _future.get();
+    if (map == null) {
+      return null;
+    }
+    return map.get(_shard);
+  }
+
+  public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+      TimeoutException {
+    Map<Shard, T> map = _future.get(timeout, unit);
+    if (map == null) {
+      return null;
+    }
+    return map.get(_shard);
+  }
+
+  public ShardResultFuture(Shard shard, Future<Map<Shard, T>> future) {
+    _shard = shard;
+    _future = future;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/TimeoutException.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/TimeoutException.java b/blur-core/src/main/java/org/apache/blur/command/TimeoutException.java
new file mode 100644
index 0000000..b7b4530
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/TimeoutException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+@SuppressWarnings("serial")
+public class TimeoutException extends Exception {
+
+  private final String _executionId;
+
+  public TimeoutException(String executionId) {
+    _executionId = executionId;
+  }
+
+  public String getExecutionId() {
+    return _executionId;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/annotation/Argument.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/annotation/Argument.java b/blur-core/src/main/java/org/apache/blur/command/annotation/Argument.java
new file mode 100644
index 0000000..0f68e1c
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/annotation/Argument.java
@@ -0,0 +1,28 @@
+package org.apache.blur.command.annotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Argument {
+  String name();
+
+  String value();
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/annotation/Arguments.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/annotation/Arguments.java b/blur-core/src/main/java/org/apache/blur/command/annotation/Arguments.java
new file mode 100644
index 0000000..35fa7de
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/annotation/Arguments.java
@@ -0,0 +1,26 @@
+package org.apache.blur.command.annotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Arguments {
+  Argument[] value();
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/Args.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/Args.java b/blur-core/src/main/java/org/apache/blur/manager/command/Args.java
deleted file mode 100644
index d851827..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/Args.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.command;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class Args {
-
-  private final Map<String, Object> _values = new HashMap<String, Object>();
-
-  @SuppressWarnings("unchecked")
-  public <T> T get(String name) {
-    return (T) _values.get(name);
-  }
-
-  public <T> T get(String name, T defaultValue) {
-    T t = get(name);
-    if (t == null) {
-      return defaultValue;
-    }
-    return t;
-  }
-
-  public <T> void set(String name, T value) {
-    _values.put(name, value);
-  }
-
-  public Map<String, Object> getValues() {
-    return _values;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
deleted file mode 100644
index eb1cbcc..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
+++ /dev/null
@@ -1,126 +0,0 @@
-package org.apache.blur.manager.command;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.blur.concurrent.Executors;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.manager.command.cmds.BaseCommand;
-import org.apache.blur.manager.command.cmds.DocumentCount;
-import org.apache.blur.manager.command.cmds.DocumentCountCombiner;
-import org.apache.blur.manager.command.cmds.DocumentCountNoCombine;
-import org.apache.blur.manager.command.cmds.TestBlurObjectCommand;
-import org.apache.blur.manager.command.cmds.WaitForSeconds;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-public class BaseCommandManager implements Closeable {
-
-  private final static Log LOG = LogFactory.getLog(BaseCommandManager.class);
-
-  protected final ExecutorService _executorService;
-  protected final Map<String, BaseCommand> _command = new ConcurrentHashMap<String, BaseCommand>();
-  protected final Map<Class<? extends BaseCommand>, String> _commandNameLookup = new ConcurrentHashMap<Class<? extends BaseCommand>, String>();
-  protected final ExecutorService _executorServiceDriver;
-  protected final ConcurrentHashMap<String, Future<Response>> _runningMap = new ConcurrentHashMap<String, Future<Response>>();
-  protected final long _connectionTimeout;
-
-  public BaseCommandManager(int threadCount, long connectionTimeout) throws IOException {
-    register(DocumentCount.class);
-    register(DocumentCountNoCombine.class);
-    register(DocumentCountCombiner.class);
-    register(TestBlurObjectCommand.class);
-    register(WaitForSeconds.class);
-    _executorService = Executors.newThreadPool("command-", threadCount);
-    _executorServiceDriver = Executors.newThreadPool("command-driver-", threadCount);
-    _connectionTimeout = connectionTimeout / 2;
-  }
-
-  public Response reconnect(String executionId) throws IOException, TimeoutException {
-    Future<Response> future = _runningMap.get(executionId);
-    if (future == null) {
-      throw new IOException("Command id [" + executionId + "] did not find any executing commands.");
-    }
-    try {
-      return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
-    } catch (CancellationException e) {
-      throw new IOException(e);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } catch (ExecutionException e) {
-      throw new IOException(e.getCause());
-    } catch (java.util.concurrent.TimeoutException e) {
-      LOG.info("Timeout of command [{0}]", executionId);
-      throw new TimeoutException(executionId);
-    }
-  }
-
-  protected Response submitCallable(Callable<Response> callable) throws IOException, TimeoutException {
-    String executionId = UUID.randomUUID().toString();
-    Future<Response> future = _executorServiceDriver.submit(callable);
-    _runningMap.put(executionId, future);
-    try {
-      return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
-    } catch (CancellationException e) {
-      throw new IOException(e);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } catch (ExecutionException e) {
-      throw new IOException(e.getCause());
-    } catch (java.util.concurrent.TimeoutException e) {
-      LOG.info("Timeout of command [{0}]", executionId);
-      throw new TimeoutException(executionId);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    _executorService.shutdownNow();
-    _executorServiceDriver.shutdownNow();
-  }
-
-  public void register(Class<? extends BaseCommand> commandClass) throws IOException {
-    try {
-      BaseCommand command = commandClass.newInstance();
-      _command.put(command.getName(), command);
-      _commandNameLookup.put(commandClass, command.getName());
-    } catch (InstantiationException e) {
-      throw new IOException(e);
-    } catch (IllegalAccessException e) {
-      throw new IOException(e);
-    }
-  }
-
-  protected BaseCommand getCommandObject(String commandName) {
-    return _command.get(commandName);
-  }
-
-  protected String getCommandName(Class<? extends BaseCommand> clazz) {
-    return _commandNameLookup.get(clazz);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/BlurArray.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/BlurArray.java b/blur-core/src/main/java/org/apache/blur/manager/command/BlurArray.java
deleted file mode 100644
index 93f30d3..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/BlurArray.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.command;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class BlurArray {
-
-  private List<Object> _values = new ArrayList<Object>();
-
-  public BlurArray() {
-
-  }
-
-  public BlurArray(BlurArray array) {
-    _values.addAll(array._values);
-  }
-
-  public void clear() {
-    _values.clear();
-  }
-
-  public String getString(int index) {
-    return (String) getObject(index);
-  }
-
-  public void put(String value) {
-    put((Object) value);
-  }
-
-  public void put(int index, String value) {
-    put(index, (Object) value);
-  }
-
-  public Integer getInteger(int index) {
-    return (Integer) getObject(index);
-  }
-
-  public void put(Integer value) {
-    put((Object) value);
-  }
-
-  public void put(int index, Integer value) {
-    put(index, (Object) value);
-  }
-
-  public Short getShort(int index) {
-    return (Short) getObject(index);
-  }
-
-  public void put(Short value) {
-    put((Object) value);
-  }
-
-  public void put(int index, Short value) {
-    put(index, (Object) value);
-  }
-
-  public Long getLong(int index) {
-    return (Long) getObject(index);
-  }
-
-  public void put(Long value) {
-    put((Object) value);
-  }
-
-  public void put(int index, Long value) {
-    put(index, (Object) value);
-  }
-
-  public Double getDouble(int index) {
-    return (Double) getObject(index);
-  }
-
-  public void put(Double value) {
-    put((Object) value);
-  }
-
-  public void put(int index, Double value) {
-    put(index, (Object) value);
-  }
-
-  public Float getFloat(int index) {
-    return (Float) getObject(index);
-  }
-
-  public void put(Float value) {
-    put((Object) value);
-  }
-
-  public void put(int index, Float value) {
-    put(index, (Object) value);
-  }
-
-  public byte[] getBinary(int index) {
-    return (byte[]) getObject(index);
-  }
-
-  public void put(byte[] value) {
-    put((Object) value);
-  }
-
-  public void put(int index, byte[] value) {
-    put(index, (Object) value);
-  }
-
-  public Boolean getBoolean(int index) {
-    return (Boolean) getObject(index);
-  }
-
-  public void put(Boolean value) {
-    put((Object) value);
-  }
-
-  public void put(int index, Boolean value) {
-    put(index, (Object) value);
-  }
-
-  public BlurObject getBlurObject(int index) {
-    return (BlurObject) getObject(index);
-  }
-
-  public void put(BlurObject value) {
-    put((Object) value);
-  }
-
-  public void put(int index, BlurObject value) {
-    put(index, (Object) value);
-  }
-
-  public BlurArray getBlurArray(int index) {
-    return (BlurArray) getObject(index);
-  }
-
-  public void put(BlurArray value) {
-    put((Object) value);
-  }
-
-  public void put(int index, BlurArray value) {
-    put(index, (Object) value);
-  }
-
-  public void put(Object value) {
-    BlurObject.checkType(value);
-    _values.add(value);
-  }
-
-  public void put(int index, Object value) {
-    BlurObject.checkType(value);
-    int sizeNeeded = index + 1;
-    while (_values.size() < sizeNeeded) {
-      _values.add(null);
-    }
-    _values.set(index, value);
-  }
-
-  @Override
-  public String toString() {
-    return toString(0);
-  }
-
-  public String toString(int i) {
-    StringBuilder builder = new StringBuilder();
-    builder.append('[');
-    boolean comma = false;
-    for (Object value : _values) {
-      if (comma) {
-        builder.append(',');
-      }
-      comma = true;
-      if (i > 0) {
-        builder.append('\n');
-        for (int j = 0; j < i; j++) {
-          builder.append(' ');
-        }
-      }
-      if (value instanceof BlurObject) {
-        builder.append(((BlurObject) value).toString(i > 0 ? i + 1 : 0));
-      } else if (value instanceof BlurArray) {
-        builder.append(((BlurArray) value).toString(i > 0 ? i + 1 : 0));
-      } else {
-        builder.append(BlurObject.stringify(value));
-      }
-    }
-    if (i > 0) {
-      builder.append('\n');
-      for (int j = 0; j < i - 1; j++) {
-        builder.append(' ');
-      }
-    }
-    builder.append(']');
-    return builder.toString();
-  }
-
-  public int length() {
-    return _values.size();
-  }
-
-  public Object getObject(int i) {
-    return _values.get(i);
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> T get(int i) {
-    return (T) _values.get(i);
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((_values == null) ? 0 : _values.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    BlurArray other = (BlurArray) obj;
-    if (_values == null) {
-      if (other._values != null)
-        return false;
-    } else if (!_values.equals(other._values))
-      return false;
-    return true;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/BlurObject.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/BlurObject.java b/blur-core/src/main/java/org/apache/blur/manager/command/BlurObject.java
deleted file mode 100644
index 72ed5b8..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/BlurObject.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.command;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-public class BlurObject {
-
-  private final static char[] hexArray = "0123456789ABCDEF".toCharArray();
-
-  private Map<String, Object> _valueMap = new TreeMap<String, Object>();
-
-  public BlurObject() {
-
-  }
-
-  public BlurObject(BlurObject object) {
-    _valueMap.putAll(object._valueMap);
-  }
-
-  public void accumulate(String name, String value) {
-    accumulate(name, (Object) value);
-  }
-
-  public void put(String name, String value) {
-    put(name, (Object) value);
-  }
-
-  public String getString(String name) {
-    return (String) _valueMap.get(name);
-  }
-
-  public void accumulate(String name, Integer value) {
-    accumulate(name, (Object) value);
-  }
-
-  public void put(String name, Integer value) {
-    put(name, (Object) value);
-  }
-
-  public Integer getInteger(String name) {
-    return (Integer) _valueMap.get(name);
-  }
-
-  public void accumulate(String name, Short value) {
-    accumulate(name, (Object) value);
-  }
-
-  public void put(String name, Short value) {
-    put(name, (Object) value);
-  }
-
-  public Short getShort(String name) {
-    return (Short) _valueMap.get(name);
-  }
-
-  public void accumulate(String name, Long value) {
-    accumulate(name, (Object) value);
-  }
-
-  public void put(String name, Long value) {
-    put(name, (Object) value);
-  }
-
-  public Long getLong(String name) {
-    return (Long) _valueMap.get(name);
-  }
-
-  public void accumulate(String name, Double value) {
-    accumulate(name, (Object) value);
-  }
-
-  public void put(String name, Double value) {
-    put(name, (Object) value);
-  }
-
-  public Double getDouble(String name) {
-    return (Double) _valueMap.get(name);
-  }
-
-  public void accumulate(String name, Float value) {
-    accumulate(name, (Object) value);
-  }
-
-  public void put(String name, Float value) {
-    put(name, (Object) value);
-  }
-
-  public Float getFloat(String name) {
-    return (Float) _valueMap.get(name);
-  }
-
-  public void accumulate(String name, byte[] value) {
-    accumulate(name, (Object) value);
-  }
-
-  public void put(String name, byte[] value) {
-    put(name, (Object) value);
-  }
-
-  public byte[] getBinary(String name) {
-    return (byte[]) _valueMap.get(name);
-  }
-
-  public void accumulate(String name, Boolean value) {
-    accumulate(name, (Object) value);
-  }
-
-  public void put(String name, Boolean value) {
-    put(name, (Object) value);
-  }
-
-  public Boolean getBoolean(String name) {
-    return (Boolean) _valueMap.get(name);
-  }
-
-  public void accumulate(String name, BlurObject value) {
-    accumulate(name, (Object) value);
-  }
-
-  public void put(String name, BlurObject value) {
-    put(name, (Object) value);
-  }
-
-  public BlurObject getBlurObject(String name) {
-    return (BlurObject) _valueMap.get(name);
-  }
-
-  public void accumulate(String name, BlurArray value) {
-    accumulate(name, (Object) value);
-  }
-
-  public void put(String name, BlurArray value) {
-    put(name, (Object) value);
-  }
-
-  public BlurArray getBlurArray(String name) {
-    return (BlurArray) _valueMap.get(name);
-  }
-
-  public void accumulate(String name, Object value) {
-    checkType(value);
-    Object object = _valueMap.get(name);
-    if (object == null) {
-      _valueMap.put(name, value);
-    } else {
-      if (object instanceof BlurArray) {
-        BlurArray array = (BlurArray) object;
-        array.put(value);
-      } else {
-        BlurArray array = new BlurArray();
-        array.put(object);
-        array.put(value);
-        _valueMap.put(name, array);
-      }
-    }
-  }
-
-  public static void checkType(Object value) {
-
-  }
-
-  public void put(String name, Object value) {
-    checkType(value);
-    _valueMap.put(name, value);
-  }
-
-  @Override
-  public String toString() {
-    return toString(0);
-  }
-
-  public String toString(int i) {
-    StringBuilder builder = new StringBuilder();
-    builder.append('{');
-    boolean comma = false;
-    for (Entry<String, Object> e : _valueMap.entrySet()) {
-      if (comma) {
-        builder.append(',');
-      }
-      comma = true;
-      if (i > 0) {
-        builder.append('\n');
-        for (int j = 0; j < i; j++) {
-          builder.append(' ');
-        }
-      }
-      builder.append(stringify(e.getKey()));
-      builder.append(':');
-      Object value = e.getValue();
-      if (value instanceof BlurObject) {
-        builder.append(((BlurObject) value).toString(i > 0 ? i + 1 : 0));
-      } else if (value instanceof BlurArray) {
-        builder.append(((BlurArray) value).toString(i > 0 ? i + 1 : 0));
-      } else {
-        builder.append(stringify(value));
-      }
-    }
-    if (i > 0) {
-      builder.append('\n');
-      for (int j = 0; j < i - 1; j++) {
-        builder.append(' ');
-      }
-    }
-    builder.append('}');
-    return builder.toString();
-  }
-
-  public static String toHexString(byte[] bs) {
-    char[] hexChars = new char[bs.length * 2];
-    for (int j = 0; j < bs.length; j++) {
-      int v = bs[j] & 0xFF;
-      hexChars[j * 2] = hexArray[v >>> 4];
-      hexChars[j * 2 + 1] = hexArray[v & 0x0F];
-    }
-    return new String(hexChars);
-  }
-
-  public static Object stringify(Object o) {
-    if (o instanceof Number) {
-      return o.toString();
-    } else if (o instanceof byte[]) {
-      return toHexString((byte[]) o);
-    } else if (o instanceof Boolean) {
-      return o.toString();
-    } else if (o instanceof String) {
-      return "\"" + o.toString() + "\"";
-    } else {
-      throw new RuntimeException("Cannot stringify object [" + o + "]");
-    }
-  }
-
-  public Iterator<String> keys() {
-    return _valueMap.keySet().iterator();
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> T get(String name) {
-    return (T) _valueMap.get(name);
-  }
-
-  public Object getObject(String name) {
-    return _valueMap.get(name);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
deleted file mode 100644
index 002fefe..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.blur.manager.command;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-public interface ClusterCommand<T> extends Serializable, Cloneable {
-
-  T clusterExecute(ClusterContext context) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java
deleted file mode 100644
index 1c541e5..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.apache.blur.manager.command;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.server.TableContext;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-public abstract class ClusterContext {
-
-  public abstract Args getArgs();
-
-  public abstract TableContext getTableContext();
-
-  public abstract BlurConfiguration getBlurConfiguration();
-
-  public abstract Configuration getConfiguration();
-
-  public abstract <T> Map<Shard, T> readIndexes(Args args, Class<? extends IndexReadCommand<T>> clazz)
-      throws IOException;
-
-  public abstract <T> Map<Shard, Future<T>> readIndexesAsync(Args args, Class<? extends IndexReadCommand<T>> clazz)
-      throws IOException;
-
-  public abstract <T> T readIndex(Shard shard, Args args, Class<? extends IndexReadCommand<T>> clazz)
-      throws IOException;
-
-  public abstract <T> Future<T> readIndexAsync(Shard shard, Args args, Class<? extends IndexReadCommand<T>> clazz)
-      throws IOException;
-
-  public abstract <T> Map<Server, T> readServers(Args args, Class<? extends IndexReadCombiningCommand<?, T>> clazz)
-      throws IOException;
-
-  public abstract <T> Map<Server, Future<T>> readServersAsync(Args args,
-      Class<? extends IndexReadCombiningCommand<?, T>> clazz) throws IOException;
-
-  public abstract <T> T writeIndex(Args args, Class<? extends IndexWriteCommand<T>> clazz) throws IOException;
-
-  public abstract <T> Future<T> writeIndexAsync(Args args, Class<? extends IndexWriteCommand<T>> clazz)
-      throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java b/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
deleted file mode 100644
index 3bd3309..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
+++ /dev/null
@@ -1,168 +0,0 @@
-package org.apache.blur.manager.command;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.blur.thrift.BException;
-import org.apache.blur.thrift.generated.Arguments;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.Value;
-import org.apache.blur.thrift.generated.ValueObject;
-import org.apache.blur.thrift.generated.ValueObject._Fields;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-public class CommandUtil {
-
-  public static org.apache.blur.thrift.generated.Response fromObjectToThrift(Response response) throws BlurException {
-    org.apache.blur.thrift.generated.Response converted = new org.apache.blur.thrift.generated.Response();
-    if (response.isAggregatedResults()) {
-      converted.setValue(toValueObject(response.getServerResult()));
-    } else {
-      Map<Server, Object> serverResults = response.getServerResults();
-      if (serverResults == null) {
-        Map<org.apache.blur.thrift.generated.Shard, ValueObject> fromObjectToThrift = fromObjectToThrift(response
-            .getShardResults());
-        converted.setShardToValue(fromObjectToThrift);
-      } else {
-        Map<org.apache.blur.thrift.generated.Server, ValueObject> fromObjectToThrift = fromObjectToThrift(serverResults);
-        converted.setServerToValue(fromObjectToThrift);
-      }
-    }
-    return converted;
-  }
-
-  @SuppressWarnings("unchecked")
-  public static <T, R> Map<R, ValueObject> fromObjectToThrift(Map<T, Object> map) throws BlurException {
-    Map<R, ValueObject> result = new HashMap<R, ValueObject>();
-    for (Entry<T, Object> e : map.entrySet()) {
-      T key = e.getKey();
-      if (key instanceof Shard) {
-        Shard shard = (Shard) key;
-        result.put((R) new org.apache.blur.thrift.generated.Shard(shard.getShard()), toValueObject(e.getValue()));
-      } else if (key instanceof Server) {
-        Server server = (Server) key;
-        result.put((R) new org.apache.blur.thrift.generated.Server(server.getServer()), toValueObject(e.getValue()));
-      }
-    }
-    return result;
-  }
-
-  public static Value toValue(Object o) throws BlurException {
-    Value value = new Value();
-    if (o == null) {
-      value.setNullValue(true);
-      return value;
-    }
-    if (o instanceof Long) {
-      value.setLongValue((Long) o);
-      return value;
-    } else if (o instanceof String) {
-      value.setStringValue((String) o);
-      return value;
-    } else if (o instanceof Integer) {
-      value.setIntValue((Integer) o);
-      return value;
-    } else if (o instanceof Boolean) {
-      value.setBooleanValue((Boolean) o);
-      return value;
-    } else if (o instanceof Short) {
-      value.setShortValue((Short) o);
-      return value;
-    } else if (o instanceof byte[]) {
-      value.setBinaryValue((byte[]) o);
-      return value;
-    } else if (o instanceof Double) {
-      value.setDoubleValue((Double) o);
-      return value;
-    } else if (o instanceof Float) {
-      value.setFloatValue((Float) o);
-      return value;
-    }
-    throw new BException("Object [{0}] not supported.", o);
-  }
-
-  public static ValueObject toValueObject(Object o) throws BlurException {
-    ValueObject valueObject = new ValueObject();
-    if (o == null) {
-      valueObject.setValue(toValue(o));
-    } else if (o instanceof BlurObject || o instanceof BlurArray) {
-      valueObject.setBlurObject(ObjectArrayPacking.pack(o));
-    } else {
-      valueObject.setValue(toValue(o));
-    }
-    return valueObject;
-  }
-
-  public static Args toArgs(Arguments arguments) {
-    if (arguments == null) {
-      return null;
-    }
-    Args args = new Args();
-    Map<String, ValueObject> values = arguments.getValues();
-    Set<Entry<String, ValueObject>> entrySet = values.entrySet();
-    for (Entry<String, ValueObject> e : entrySet) {
-      args.set(e.getKey(), toObject(e.getValue()));
-    }
-    return args;
-  }
-
-  public static Object toObject(Value value) {
-    if (value.isSetNullValue()) {
-      return null;
-    }
-    return value.getFieldValue();
-  }
-
-  public static Arguments toArguments(Args args) throws BlurException {
-    if (args == null) {
-      return null;
-    }
-    Arguments arguments = new Arguments();
-    Set<Entry<String, Object>> entrySet = args.getValues().entrySet();
-    for (Entry<String, Object> e : entrySet) {
-      arguments.putToValues(e.getKey(), toValueObject(e.getValue()));
-    }
-    return arguments;
-  }
-
-  @SuppressWarnings("unchecked")
-  public static <T> Map<Shard, T> fromThriftToObject(
-      Map<org.apache.blur.thrift.generated.Shard, ValueObject> shardToValue) {
-    Map<Shard, T> result = new HashMap<Shard, T>();
-    for (Entry<org.apache.blur.thrift.generated.Shard, ValueObject> e : shardToValue.entrySet()) {
-      result.put(new Shard(e.getKey().getShard()), (T) CommandUtil.toObject(e.getValue()));
-    }
-    return result;
-  }
-
-  @SuppressWarnings("unchecked")
-  public static <T> T toObject(ValueObject valueObject) {
-    _Fields field = valueObject.getSetField();
-    switch (field) {
-    case VALUE:
-      return (T) toObject(valueObject.getValue());
-    case BLUR_OBJECT:
-      return (T) ObjectArrayPacking.unpack(valueObject.getBlurObject());
-    default:
-      throw new RuntimeException("Type unknown.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
deleted file mode 100644
index 9de500d..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
+++ /dev/null
@@ -1,267 +0,0 @@
-package org.apache.blur.manager.command;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.manager.command.cmds.BaseCommand;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClientManager;
-import org.apache.blur.thrift.ClientPool;
-import org.apache.blur.thrift.Connection;
-import org.apache.blur.thrift.generated.Arguments;
-import org.apache.blur.thrift.generated.Blur.Client;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.Response;
-import org.apache.blur.thrift.generated.TimeoutException;
-import org.apache.blur.thrift.generated.ValueObject;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-public class ControllerClusterContext extends ClusterContext implements Closeable {
-
-  private final static Log LOG = LogFactory.getLog(ControllerClusterContext.class);
-
-  private final Args _args;
-  private final TableContext _tableContext;
-  private final Map<Server, Client> _clientMap;
-  private final ExecutorService _executorService;
-  private final ControllerCommandManager _manager;
-  private final Map<String, String> _tableLayout;
-
-  public ControllerClusterContext(TableContext tableContext, Args args, Map<String, String> tableLayout,
-      ExecutorService executorService, ControllerCommandManager manager) throws IOException {
-    _tableContext = tableContext;
-    _args = args;
-    _clientMap = getBlurClientsForTable(_tableContext.getTable(), tableLayout);
-    _executorService = executorService;
-    _manager = manager;
-    _tableLayout = tableLayout;
-  }
-
-  public Map<Server, Client> getBlurClientsForTable(String table, Map<String, String> tableLayout) throws IOException {
-    Map<Server, Client> clients = new HashMap<Server, Client>();
-    for (String serverStr : tableLayout.values()) {
-      try {
-        Client client = BlurClientManager.getClientPool().getClient(new Connection(serverStr));
-        client.refresh();
-        clients.put(new Server(serverStr), client);
-      } catch (TException e) {
-        throw new IOException(e);
-      }
-    }
-    return clients;
-  }
-
-  @Override
-  public Args getArgs() {
-    return _args;
-  }
-
-  @Override
-  public TableContext getTableContext() {
-    return _tableContext;
-  }
-
-  @Override
-  public <T> Map<Shard, T> readIndexes(Args args, Class<? extends IndexReadCommand<T>> clazz) throws IOException {
-    Map<Shard, Future<T>> futures = readIndexesAsync(args, clazz);
-    Map<Shard, T> result = new HashMap<Shard, T>();
-    return processFutures(clazz, futures, result);
-  }
-
-  @Override
-  public <T> Map<Server, T> readServers(Args args, Class<? extends IndexReadCombiningCommand<?, T>> clazz)
-      throws IOException {
-    Map<Server, Future<T>> futures = readServersAsync(args, clazz);
-    Map<Server, T> result = new HashMap<Server, T>();
-    return processFutures(clazz, futures, result);
-  }
-
-  @Override
-  public <T> T writeIndex(Args args, Class<? extends IndexWriteCommand<T>> clazz) {
-    throw new RuntimeException("Not Implemented");
-  }
-
-  @Override
-  public void close() throws IOException {
-    ClientPool clientPool = BlurClientManager.getClientPool();
-    for (Entry<Server, Client> e : _clientMap.entrySet()) {
-      clientPool.returnClient(new Connection(e.getKey().getServer()), e.getValue());
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public <T> Map<Shard, Future<T>> readIndexesAsync(final Args args, Class<? extends IndexReadCommand<T>> clazz) {
-    final String commandName = _manager.getCommandName((Class<? extends BaseCommand>) clazz);
-    Map<Shard, Future<T>> futureMap = new HashMap<Shard, Future<T>>();
-    for (Entry<Server, Client> e : _clientMap.entrySet()) {
-      Server server = e.getKey();
-      final Client client = e.getValue();
-      Future<Map<Shard, T>> future = _executorService.submit(new Callable<Map<Shard, T>>() {
-        @Override
-        public Map<Shard, T> call() throws Exception {
-          Arguments arguments = CommandUtil.toArguments(args);
-          Response response = waitForResponse(client, getTable(), commandName, arguments);
-          Map<Shard, Object> shardToValue = CommandUtil.fromThriftToObject(response.getShardToValue());
-          return (Map<Shard, T>) shardToValue;
-        }
-      });
-      Set<Shard> shards = getShardsOnServer(server);
-      for (Shard shard : shards) {
-        futureMap.put(shard, new ShardResultFuture<T>(shard, future));
-      }
-    }
-    return futureMap;
-  }
-
-  protected static Response waitForResponse(Client client, String table, String commandName, Arguments arguments)
-      throws TException {
-    // TODO This should likely be changed to run of a AtomicBoolean used for
-    // the status of commands.
-    String executionId = null;
-    while (true) {
-      try {
-        if (executionId == null) {
-          return client.execute(table, commandName, arguments);
-        } else {
-          return client.reconnect(executionId);
-        }
-      } catch (BlurException e) {
-        throw e;
-      } catch (TimeoutException e) {
-        executionId = e.getExecutionId();
-        LOG.info("Execution fetch timed out, reconnecting using [{0}].", executionId);
-      } catch (TException e) {
-        throw e;
-      }
-    }
-  }
-
-  private Set<Shard> getShardsOnServer(Server server) {
-    Set<Shard> shards = new HashSet<Shard>();
-    for (Entry<String, String> e : _tableLayout.entrySet()) {
-      String value = e.getValue();
-      if (value.equals(server.getServer())) {
-        shards.add(new Shard(e.getKey()));
-      }
-    }
-    return shards;
-  }
-
-  private String getTable() {
-    return getTableContext().getTable();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public <T> Map<Server, Future<T>> readServersAsync(final Args args,
-      Class<? extends IndexReadCombiningCommand<?, T>> clazz) {
-    final String commandName = _manager.getCommandName((Class<? extends BaseCommand>) clazz);
-    Map<Server, Future<T>> futureMap = new HashMap<Server, Future<T>>();
-    for (Entry<Server, Client> e : _clientMap.entrySet()) {
-      Server server = e.getKey();
-      final Client client = e.getValue();
-      Future<T> future = _executorService.submit(new Callable<T>() {
-        @Override
-        public T call() throws Exception {
-          Arguments arguments = CommandUtil.toArguments(args);
-          Response response = waitForResponse(client, getTable(), commandName, arguments);
-          ValueObject valueObject = response.getValue();
-          return (T) CommandUtil.toObject(valueObject);
-        }
-      });
-      futureMap.put(server, future);
-    }
-    return futureMap;
-  }
-
-  @Override
-  public <T> Future<T> writeIndexAsync(Args args, Class<? extends IndexWriteCommand<T>> clazz) {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  private <K, T> Map<K, T> processFutures(Class<?> clazz, Map<K, Future<T>> futures, Map<K, T> result)
-      throws IOException {
-    Throwable firstError = null;
-    for (Entry<K, Future<T>> e : futures.entrySet()) {
-      K key = e.getKey();
-      Future<T> future = e.getValue();
-      T value;
-      try {
-        value = future.get();
-        result.put(key, value);
-      } catch (InterruptedException ex) {
-        throw new IOException(ex);
-      } catch (ExecutionException ex) {
-        Throwable cause = ex.getCause();
-        if (firstError == null) {
-          firstError = cause;
-        }
-        LOG.error("Unknown call while executing command [{0}] on server or shard [{1}]", clazz, key);
-      }
-    }
-    if (firstError != null) {
-      throw new IOException(firstError);
-    }
-    return result;
-  }
-
-  @Override
-  public BlurConfiguration getBlurConfiguration() {
-    return _tableContext.getBlurConfiguration();
-  }
-
-  @Override
-  public Configuration getConfiguration() {
-    return _tableContext.getConfiguration();
-  }
-
-  @Override
-  public <T> T readIndex(Shard shard, Args args, Class<? extends IndexReadCommand<T>> clazz) throws IOException {
-    Future<T> future = readIndexAsync(shard, args, clazz);
-    try {
-      return future.get();
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } catch (ExecutionException e) {
-      throw new IOException(e.getCause());
-    }
-  }
-
-  @Override
-  public <T> Future<T> readIndexAsync(Shard shard, Args args, Class<? extends IndexReadCommand<T>> clazz)
-      throws IOException {
-    throw new RuntimeException("Not Implemented.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
deleted file mode 100644
index 6a1ebb8..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.blur.manager.command;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.blur.manager.command.cmds.BaseCommand;
-import org.apache.blur.server.TableContext;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-@SuppressWarnings("unchecked")
-public class ControllerCommandManager extends BaseCommandManager {
-
-  public ControllerCommandManager(int threadCount, long connectionTimeout) throws IOException {
-    super(threadCount, connectionTimeout);
-  }
-
-  public Response execute(TableContext tableContext, String commandName, final Args args, Map<String, String> tableLayout)
-      throws IOException, TimeoutException {
-    final ClusterContext context = createCommandContext(tableContext, args, tableLayout);
-    final BaseCommand command = getCommandObject(commandName);
-    if (command == null) {
-      throw new IOException("Command with name [" + commandName + "] not found.");
-    }
-    return submitCallable(new Callable<Response>() {
-      @Override
-      public Response call() throws Exception {
-        // For those commands that do not implement cluster command, run them in a
-        // base impl.
-        if (command instanceof ClusterCommand) {
-          return executeClusterCommand(context, command);
-        } else if (command instanceof IndexReadCombiningCommand) {
-          return executeIndexReadCombiningCommand(args, context, command);
-        } else if (command instanceof IndexReadCommand) {
-          return executeIndexReadCommand(args, context, command);
-        } else if (command instanceof IndexWriteCommand) {
-          return executeIndexWriteCommand(args, context, command);
-        } else {
-          throw new IOException("Command type of [" + command.getClass() + "] not supported.");
-        }
-      }
-    });
-  }
-
-  private Response executeClusterCommand(ClusterContext context, BaseCommand command) throws IOException {
-    ClusterCommand<Object> clusterCommand = (ClusterCommand<Object>) command;
-    Object object = clusterCommand.clusterExecute(context);
-    return Response.createNewAggregateResponse(object);
-  }
-
-  private Response executeIndexWriteCommand(Args args, ClusterContext context, BaseCommand command) throws IOException {
-    Class<? extends IndexWriteCommand<Object>> clazz = (Class<? extends IndexWriteCommand<Object>>) command.getClass();
-    Object object = context.writeIndex(args, clazz);
-    return Response.createNewAggregateResponse(object);
-  }
-
-  private Response executeIndexReadCommand(Args args, ClusterContext context, BaseCommand command) throws IOException {
-    Class<? extends IndexReadCommand<Object>> clazz = (Class<? extends IndexReadCommand<Object>>) command.getClass();
-    Map<Shard, Object> result = context.readIndexes(args, clazz);
-    return Response.createNewShardResponse(result);
-  }
-
-  private Response executeIndexReadCombiningCommand(Args args, ClusterContext context, BaseCommand command)
-      throws IOException {
-    Class<? extends IndexReadCombiningCommand<Object, Object>> clazz = (Class<? extends IndexReadCombiningCommand<Object, Object>>) command
-        .getClass();
-    Map<Server, Object> result = context.readServers(args, clazz);
-    return Response.createNewServerResponse(result);
-  }
-
-  private ClusterContext createCommandContext(TableContext tableContext, Args args, Map<String, String> tableLayout)
-      throws IOException {
-    return new ControllerClusterContext(tableContext, args, tableLayout, _executorService, this);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/IndexContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/IndexContext.java b/blur-core/src/main/java/org/apache/blur/manager/command/IndexContext.java
deleted file mode 100644
index 051a98e..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/IndexContext.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.blur.manager.command;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.server.TableContext;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-public abstract class IndexContext {
-
-  public abstract Args getArgs();
-
-  public abstract TableContext getTableContext();
-
-  public abstract Shard getShard();
-
-  public abstract IndexReader getIndexReader();
-
-  public abstract IndexSearcher getIndexSearcher();
-
-  public abstract BlurConfiguration getBlurConfiguration();
-
-  public abstract Configuration getConfiguration();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCombiningCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCombiningCommand.java b/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCombiningCommand.java
deleted file mode 100644
index 1b433ab..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCombiningCommand.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.command;
-
-import java.io.IOException;
-import java.util.Map;
-
-public interface IndexReadCombiningCommand<T1, T2> {
-
-  T1 execute(IndexContext context) throws IOException;
-
-  T2 combine(Map<Shard, T1> results) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCommand.java b/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCommand.java
deleted file mode 100644
index d4c156c..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCommand.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.command;
-
-import java.io.IOException;
-
-public interface IndexReadCommand<T> {
-
-  T execute(IndexContext context) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/manager/command/IndexWriteCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/IndexWriteCommand.java b/blur-core/src/main/java/org/apache/blur/manager/command/IndexWriteCommand.java
deleted file mode 100644
index 893638b..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/IndexWriteCommand.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.command;
-
-import java.io.IOException;
-
-import org.apache.lucene.index.IndexWriter;
-
-public interface IndexWriteCommand<T> {
-
-  public abstract T execute(IndexContext context, IndexWriter writer) throws IOException;
-
-}


Mime
View raw message