incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [5/6] Massive change to arguments in the new command platform. Arguments are now passed by annotating fields on the commands themselves, so the Args object was removed.
Date Wed, 01 Oct 2014 01:49:48 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java b/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
index 28f4f80..3f7bf6a 100644
--- a/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
@@ -1,10 +1,13 @@
 package org.apache.blur.command;
 
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.blur.command.annotation.OptionalArgument;
+import org.apache.blur.command.annotation.RequiredArgument;
 import org.apache.blur.thrift.BException;
 import org.apache.blur.thrift.generated.Arguments;
 import org.apache.blur.thrift.generated.BlurException;
@@ -106,23 +109,25 @@ public class CommandUtil {
       valueObject.setValue(toValue(o));
     } else if (o instanceof BlurObject || o instanceof BlurArray) {
       valueObject.setBlurObject(ObjectArrayPacking.pack(o));
+    } else if (o instanceof Set) {
+      throw new RuntimeException("Not implemented.");
     } else {
       valueObject.setValue(toValue(o));
     }
     return valueObject;
   }
 
-  public static Args toArgs(Arguments arguments) {
+  public static BlurObject toBlurObject(Arguments arguments) {
     if (arguments == null) {
       return null;
     }
-    Args args = new Args();
+    BlurObject blurObject = new BlurObject();
     Map<String, ValueObject> values = arguments.getValues();
     Set<Entry<String, ValueObject>> entrySet = values.entrySet();
     for (Entry<String, ValueObject> e : entrySet) {
-      args.set(e.getKey(), toObject(e.getValue()));
+      blurObject.put(e.getKey(), toObject(e.getValue()));
     }
-    return args;
+    return blurObject;
   }
 
   public static Object toObject(Value value) {
@@ -132,18 +137,6 @@ public class CommandUtil {
     return value.getFieldValue();
   }
 
-  public static Arguments toArguments(Args args) throws BlurException {
-    if (args == null) {
-      return null;
-    }
-    Arguments arguments = new Arguments();
-    Set<Entry<String, Object>> entrySet = args.getValues().entrySet();
-    for (Entry<String, Object> e : entrySet) {
-      arguments.putToValues(e.getKey(), toValueObject(e.getValue()));
-    }
-    return arguments;
-  }
-
   @SuppressWarnings("unchecked")
   public static <T> Map<Shard, T> fromThriftToObjectShard(
       Map<org.apache.blur.thrift.generated.Shard, ValueObject> shardToValue) {
@@ -194,4 +187,57 @@ public class CommandUtil {
       throw new RuntimeException("Not supported.");
     }
   }
+
+  public static Arguments toArguments(Command<?> command) {
+    Class<?> clazz = command.getClass();
+    Arguments arguments = new Arguments();
+    addArguments(clazz, arguments, command);
+    return arguments;
+  }
+
+  private static void addArguments(Class<?> clazz, Arguments arguments, Command<?> command) {
+    if (!(clazz.equals(Command.class))) {
+      addArguments(clazz.getSuperclass(), arguments, command);
+    }
+    Field[] fields = clazz.getDeclaredFields();
+    for (Field field : fields) {
+      RequiredArgument requiredArgument = field.getAnnotation(RequiredArgument.class);
+      if (requiredArgument != null) {
+        field.setAccessible(true);
+        String name = field.getName();
+        try {
+          Object o = field.get(command);
+          if (o != null) {
+            arguments.putToValues(name, CommandUtil.toValueObject(o));
+          } else {
+            throw new IllegalArgumentException("Field [" + name + "] is required.");
+          }
+        } catch (IllegalArgumentException e) {
+          throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        } catch (BlurException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      OptionalArgument optionalArgument = field.getAnnotation(OptionalArgument.class);
+      if (optionalArgument != null) {
+        field.setAccessible(true);
+        String name = field.getName();
+        try {
+          Object o = field.get(command);
+          if (o != null) {
+            arguments.putToValues(name, CommandUtil.toValueObject(o));
+          }
+        } catch (IllegalArgumentException e) {
+          throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        } catch (BlurException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
index 7ac3566..3e53067 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
@@ -49,16 +49,14 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
 
   private final static Log LOG = LogFactory.getLog(ControllerClusterContext.class);
 
-  private final Args _args;
   private final TableContextFactory _tableContextFactory;
   private final Map<Server, Client> _clientMap;
   private final ControllerCommandManager _manager;
   private final LayoutFactory _layoutFactory;
 
-  public ControllerClusterContext(TableContextFactory tableContextFactory, LayoutFactory layoutFactory, Args args,
+  public ControllerClusterContext(TableContextFactory tableContextFactory, LayoutFactory layoutFactory,
       ControllerCommandManager manager) throws IOException {
     _tableContextFactory = tableContextFactory;
-    _args = args;
     _clientMap = getBlurClientsForCluster(layoutFactory.getServerConnections());
     _manager = manager;
     _layoutFactory = layoutFactory;
@@ -79,27 +77,22 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
   }
 
   @Override
-  public Args getArgs() {
-    return _args;
-  }
-
-  @Override
   public TableContext getTableContext(String table) throws IOException {
     return _tableContextFactory.getTableContext(table);
   }
 
   @Override
-  public <T> Map<Shard, T> readIndexes(Args args, Class<? extends IndexRead<T>> clazz) throws IOException {
-    Map<Shard, Future<T>> futures = readIndexesAsync(args, clazz);
+  public <T> Map<Shard, T> readIndexes(IndexRead<T> command) throws IOException {
+    Map<Shard, Future<T>> futures = readIndexesAsync(command);
     Map<Shard, T> result = new HashMap<Shard, T>();
-    return processFutures(clazz, futures, result);
+    return processFutures((Command<?>) command, futures, result);
   }
 
   @Override
-  public <T> Map<Server, T> readServers(Args args, Class<? extends IndexReadCombining<?, T>> clazz) throws IOException {
-    Map<Server, Future<T>> futures = readServersAsync(args, clazz);
+  public <T> Map<Server, T> readServers(ServerRead<?, T> command) throws IOException {
+    Map<Server, Future<T>> futures = readServersAsync(command);
     Map<Server, T> result = new HashMap<Server, T>();
-    return processFutures(clazz, futures, result);
+    return processFutures((Command<?>) command, futures, result);
   }
 
   @Override
@@ -112,23 +105,22 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
 
   @SuppressWarnings("unchecked")
   @Override
-  public <T> Map<Shard, Future<T>> readIndexesAsync(final Args args, Class<? extends IndexRead<T>> clazz)
-      throws IOException {
-    final String commandName = _manager.getCommandName((Class<? extends Command<?>>) clazz);
-    Command<?> command = _manager.getCommandObject(commandName);
+  public <T> Map<Shard, Future<T>> readIndexesAsync(IndexRead<T> cmd) throws IOException {
+    final Command<?> command = (Command<?>) cmd;
+    _manager.validate(command);
     Map<Shard, Future<T>> futureMap = new HashMap<Shard, Future<T>>();
-    Set<String> tables = _manager.getTables(command, args);
-    Map<String, Set<Shard>> shards = _manager.getShards(_tableContextFactory, command, args, tables);
-    Map<Server, Client> clientMap = getClientMap(command, args, tables, shards);
+    Set<String> tables = command.routeTables(this);
+    Set<Shard> shards = command.routeShards(this, tables);
+    Map<Server, Client> clientMap = getClientMap(command, tables, shards);
 
+    final Arguments arguments = _manager.toArguments(command);
     for (Entry<Server, Client> e : clientMap.entrySet()) {
       Server server = e.getKey();
       final Client client = e.getValue();
       Future<Map<Shard, T>> future = _manager.submitToExecutorService(new Callable<Map<Shard, T>>() {
         @Override
         public Map<Shard, T> call() throws Exception {
-          Arguments arguments = CommandUtil.toArguments(args);
-          Response response = waitForResponse(client, commandName, arguments);
+          Response response = waitForResponse(client, command, arguments);
           Map<Shard, Object> shardToValue = CommandUtil.fromThriftToObjectShard(response.getShardToValue());
           return (Map<Shard, T>) shardToValue;
         }
@@ -140,10 +132,9 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     return futureMap;
   }
 
-  private Map<Server, Client> getClientMap(Command<?> command, Args args, Set<String> tables,
-      Map<String, Set<Shard>> shards) throws IOException {
+  private Map<Server, Client> getClientMap(Command<?> command, Set<String> tables, Set<Shard> shards)
+      throws IOException {
     Map<Server, Client> result = new HashMap<Server, Client>();
-
     for (Entry<Server, Client> e : _clientMap.entrySet()) {
       Server server = e.getKey();
       if (_layoutFactory.isValidServer(server, tables, shards)) {
@@ -153,14 +144,14 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     return result;
   }
 
-  protected static Response waitForResponse(Client client, String commandName, Arguments arguments) throws TException {
+  protected static Response waitForResponse(Client client, Command<?> command, Arguments arguments) throws TException {
     // TODO This should likely be changed to run of a AtomicBoolean used for
     // the status of commands.
     String executionId = null;
     while (true) {
       try {
         if (executionId == null) {
-          return client.execute(commandName, arguments);
+          return client.execute(command.getName(), arguments);
         } else {
           return client.reconnect(executionId);
         }
@@ -175,8 +166,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     }
   }
 
-  private Set<Shard> getShardsOnServer(Server server, Set<String> tables, Map<String, Set<Shard>> shards)
-      throws IOException {
+  private Set<Shard> getShardsOnServer(Server server, Set<String> tables, Set<Shard> shards) throws IOException {
     Set<Shard> serverLayout = _layoutFactory.getServerLayout(server);
     Set<Shard> result = new HashSet<Shard>();
     for (Shard shard : serverLayout) {
@@ -187,37 +177,35 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     return result;
   }
 
-  private boolean isValid(Shard shard, Set<String> tables, Map<String, Set<Shard>> shards) {
+  private boolean isValid(Shard shard, Set<String> tables, Set<Shard> shards) {
     String table = shard.getTable();
     if (!tables.contains(table)) {
       return false;
     }
-    Set<Shard> shardSet = shards.get(table);
-    if (shardSet.isEmpty()) {
+    if (shards.isEmpty()) {
       return true;
     } else {
-      return shardSet.contains(shard);
+      return shards.contains(shard);
     }
   }
 
   @SuppressWarnings("unchecked")
   @Override
-  public <T> Map<Server, Future<T>> readServersAsync(final Args args, Class<? extends IndexReadCombining<?, T>> clazz)
-      throws IOException {
-    final String commandName = _manager.getCommandName((Class<? extends Command<?>>) clazz);
-    Command<?> command = _manager.getCommandObject(commandName);
+  public <T> Map<Server, Future<T>> readServersAsync(ServerRead<?, T> cmd) throws IOException {
+    final Command<?> command = (Command<?>) cmd;
+    _manager.validate(command);
     Map<Server, Future<T>> futureMap = new HashMap<Server, Future<T>>();
-    Set<String> tables = _manager.getTables(command, args);
-    Map<String, Set<Shard>> shards = _manager.getShards(_tableContextFactory, command, args, tables);
-    Map<Server, Client> clientMap = getClientMap(command, args, tables, shards);
+    Set<String> tables = command.routeTables(this);
+    Set<Shard> shards = command.routeShards(this, tables);
+    Map<Server, Client> clientMap = getClientMap(command, tables, shards);
+    final Arguments arguments = _manager.toArguments(command);
     for (Entry<Server, Client> e : clientMap.entrySet()) {
       Server server = e.getKey();
       final Client client = e.getValue();
       Future<T> future = _manager.submitToExecutorService(new Callable<T>() {
         @Override
         public T call() throws Exception {
-          Arguments arguments = CommandUtil.toArguments(args);
-          Response response = waitForResponse(client, commandName, arguments);
+          Response response = waitForResponse(client, command, arguments);
           ValueObject valueObject = response.getValue();
           return (T) CommandUtil.toObject(valueObject);
         }
@@ -227,7 +215,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     return futureMap;
   }
 
-  private <K, T> Map<K, T> processFutures(Class<?> clazz, Map<K, Future<T>> futures, Map<K, T> result)
+  private <K, T> Map<K, T> processFutures(Command<?> command, Map<K, Future<T>> futures, Map<K, T> result)
       throws IOException {
     Throwable firstError = null;
     for (Entry<K, Future<T>> e : futures.entrySet()) {
@@ -244,7 +232,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
         if (firstError == null) {
           firstError = cause;
         }
-        LOG.error("Unknown call while executing command [{0}] on server or shard [{1}]", clazz, key);
+        LOG.error("Unknown call while executing command [{0}] on server or shard [{1}]", command, key);
       }
     }
     if (firstError != null) {
@@ -259,8 +247,8 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
   }
 
   @Override
-  public <T> T readIndex(Args args, Class<? extends IndexRead<T>> clazz) throws IOException {
-    Future<T> future = readIndexAsync(args, clazz);
+  public <T> T readIndex(IndexRead<T> command) throws IOException {
+    Future<T> future = readIndexAsync(command);
     try {
       return future.get();
     } catch (InterruptedException e) {
@@ -271,7 +259,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
   }
 
   @Override
-  public <T> Future<T> readIndexAsync(Args args, Class<? extends IndexRead<T>> clazz) throws IOException {
+  public <T> Future<T> readIndexAsync(IndexRead<T> command) throws IOException {
     throw new RuntimeException("Not Implemented.");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
index 1d166d4..815e0ef 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
@@ -5,6 +5,12 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 
 import org.apache.blur.BlurConfiguration;
+import org.apache.blur.command.commandtype.ClusterExecuteCommand;
+import org.apache.blur.command.commandtype.ClusterExecuteServerReadCommand;
+import org.apache.blur.command.commandtype.ClusterServerReadCommand;
+import org.apache.blur.command.commandtype.ClusterIndexReadCommand;
+import org.apache.blur.command.commandtype.ServerReadCommand;
+import org.apache.blur.command.commandtype.IndexReadCommand;
 import org.apache.blur.server.LayoutFactory;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.server.TableContextFactory;
@@ -35,9 +41,9 @@ public class ControllerCommandManager extends BaseCommandManager {
   }
 
   public Response execute(final TableContextFactory tableContextFactory, LayoutFactory layoutFactory,
-      String commandName, final Args args) throws IOException, TimeoutException, ExceptionCollector {
-    final ClusterContext context = createCommandContext(tableContextFactory, layoutFactory, args);
-    final Command<?> command = getCommandObject(commandName);
+      String commandName, ArgumentOverlay argumentOverlay) throws IOException, TimeoutException, ExceptionCollector {
+    final ClusterContext context = createCommandContext(tableContextFactory, layoutFactory);
+    final Command<?> command = getCommandObject(commandName, argumentOverlay);
     if (command == null) {
       throw new IOException("Command with name [" + commandName + "] not found.");
     }
@@ -48,19 +54,19 @@ public class ControllerCommandManager extends BaseCommandManager {
         // a base impl.
 
         if (command instanceof IndexReadCommand) {
-          return executeIndexReadCommand(args, context, command);
+          return executeIndexReadCommand(context, command);
         }
-        if (command instanceof IndexReadCombiningCommand) {
-          return executeIndexReadCombiningCommand(args, context, command);
+        if (command instanceof ServerReadCommand) {
+          return executeIndexReadCombiningCommand(context, command);
         }
-        if (command instanceof ClusterReadCommand) {
+        if (command instanceof ClusterIndexReadCommand) {
           throw new RuntimeException("Not implemented");
         }
-        if (command instanceof ClusterReadCombiningCommand) {
-          CombiningContext combiningContext = getCombiningContext(tableContextFactory, args);
-          return executeClusterReadCombiningCommand(args, context, command, combiningContext);
+        if (command instanceof ClusterServerReadCommand) {
+          CombiningContext combiningContext = getCombiningContext(tableContextFactory);
+          return executeClusterReadCombiningCommand(context, command, combiningContext);
         }
-        if (command instanceof ClusterExecuteReadCombiningCommand) {
+        if (command instanceof ClusterExecuteServerReadCommand) {
           return executeClusterCommand(context, command);
         }
         if (command instanceof ClusterExecuteCommand) {
@@ -73,7 +79,7 @@ public class ControllerCommandManager extends BaseCommandManager {
     });
   }
 
-  private CombiningContext getCombiningContext(final TableContextFactory tableContextFactory, final Args args) {
+  private CombiningContext getCombiningContext(final TableContextFactory tableContextFactory) {
     return new CombiningContext() {
 
       @Override
@@ -85,48 +91,37 @@ public class ControllerCommandManager extends BaseCommandManager {
       public BlurConfiguration getBlurConfiguration(String table) throws IOException {
         return getTableContext(table).getBlurConfiguration();
       }
-
-      @Override
-      public Args getArgs() {
-        return args;
-      }
     };
   }
 
   private Response executeClusterCommand(ClusterContext context, Command<?> command) throws IOException,
       InterruptedException {
-    ClusterExecuteReadCombiningCommand<Object> clusterCommand = (ClusterExecuteReadCombiningCommand<Object>) command;
+    ClusterExecuteServerReadCommand<Object> clusterCommand = (ClusterExecuteServerReadCommand<Object>) command;
     Object object = clusterCommand.clusterExecute(context);
     return Response.createNewAggregateResponse(object);
   }
 
-  private Response executeIndexReadCommand(Args args, ClusterContext context, Command<?> command) throws IOException {
-    Class<? extends IndexReadCommand<Object>> clazz = (Class<? extends IndexReadCommand<Object>>) command.getClass();
-    Map<Shard, Object> result = context.readIndexes(args, clazz);
+  private Response executeIndexReadCommand(ClusterContext context, Command<?> command) throws IOException {
+    Map<Shard, Object> result = context.readIndexes((IndexReadCommand<Object>) command);
     return Response.createNewShardResponse(result);
   }
 
-  private Response executeClusterReadCombiningCommand(Args args, ClusterContext context, Command<?> command,
+  private Response executeClusterReadCombiningCommand(ClusterContext context, Command<?> command,
       CombiningContext combiningContext) throws IOException, InterruptedException {
-    Class<? extends ClusterReadCombiningCommand<Object>> clazz = (Class<? extends ClusterReadCombiningCommand<Object>>) command
-        .getClass();
-    Map<Server, Object> results = context.readServers(args, clazz);
-    ClusterReadCombiningCommand<Object> clusterReadCombiningCommand = (ClusterReadCombiningCommand<Object>) command;
+    Map<Server, Object> results = context.readServers((ClusterServerReadCommand<Object>) command);
+    ClusterServerReadCommand<Object> clusterReadCombiningCommand = (ClusterServerReadCommand<Object>) command;
     Object result = clusterReadCombiningCommand.combine(combiningContext, (Map<? extends Location<?>, Object>) results);
     return Response.createNewAggregateResponse(result);
   }
 
-  private Response executeIndexReadCombiningCommand(Args args, ClusterContext context, Command<?> command)
-      throws IOException {
-    Class<? extends IndexReadCombiningCommand<Object, Object>> clazz = (Class<? extends IndexReadCombiningCommand<Object, Object>>) command
-        .getClass();
-    Map<Server, Object> result = context.readServers(args, clazz);
+  private Response executeIndexReadCombiningCommand(ClusterContext context, Command<?> command) throws IOException {
+    Map<Server, Object> result = context.readServers((ServerReadCommand<Object, Object>) command);
     return Response.createNewServerResponse(result);
   }
 
-  private ClusterContext createCommandContext(TableContextFactory tableContextFactory, LayoutFactory layoutFactory,
-      Args args) throws IOException {
-    return new ControllerClusterContext(tableContextFactory, layoutFactory, args, this);
+  private ClusterContext createCommandContext(TableContextFactory tableContextFactory, LayoutFactory layoutFactory)
+      throws IOException {
+    return new ControllerClusterContext(tableContextFactory, layoutFactory, this);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/IndexReadCombining.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/IndexReadCombining.java b/blur-core/src/main/java/org/apache/blur/command/IndexReadCombining.java
deleted file mode 100644
index ff01dbc..0000000
--- a/blur-core/src/main/java/org/apache/blur/command/IndexReadCombining.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.command;
-
-import java.io.IOException;
-import java.util.Map;
-
-public interface IndexReadCombining<T1, T2> {
-  
-  T1 execute(IndexContext context) throws IOException, InterruptedException;
-
-  T2 combine(CombiningContext context, Map<? extends Location<?>, T1> results) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/IndexReadCombiningCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/IndexReadCombiningCommand.java b/blur-core/src/main/java/org/apache/blur/command/IndexReadCombiningCommand.java
deleted file mode 100644
index c37f8b8..0000000
--- a/blur-core/src/main/java/org/apache/blur/command/IndexReadCombiningCommand.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.command;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.generated.BlurException;
-
-public abstract class IndexReadCombiningCommand<T1, T2> extends Command<Map<Server, T2>> implements
-    IndexReadCombining<T1, T2> {
-
-  public abstract T1 execute(IndexContext context) throws IOException, InterruptedException;
-
-  public abstract T2 combine(CombiningContext context, Map<? extends Location<?>, T1> results) throws IOException,
-      InterruptedException;
-
-  @Override
-  public Map<Server, T2> run(Args arguments) throws IOException {
-    try {
-      return CommandRunner.run(this, arguments);
-    } catch (BlurException e) {
-      throw new IOException(e);
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public Map<Server, T2> run(Args arguments, String connectionStr) throws IOException {
-    try {
-      return CommandRunner.run(this, arguments, connectionStr);
-    } catch (BlurException e) {
-      throw new IOException(e);
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/IndexReadCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/IndexReadCommand.java b/blur-core/src/main/java/org/apache/blur/command/IndexReadCommand.java
deleted file mode 100644
index fbeb116..0000000
--- a/blur-core/src/main/java/org/apache/blur/command/IndexReadCommand.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.command;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-
-public abstract class IndexReadCommand<T> extends Command<Map<Shard, T>> implements IndexRead<T> {
-
-  public abstract T execute(IndexContext context) throws IOException, InterruptedException;
-
-  @Override
-  public Map<Shard, T> run(Args arguments) throws IOException {
-    try {
-      return CommandRunner.run(this, arguments);
-    } catch (BlurException e) {
-      throw new IOException(e);
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public Map<Shard, T> run(Args arguments, String connectionStr) throws IOException {
-    try {
-      return CommandRunner.run(this, arguments, connectionStr);
-    } catch (BlurException e) {
-      throw new IOException(e);
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public Map<Shard, T> run(Args arguments, Iface client) throws IOException {
-    try {
-      return CommandRunner.run(this, arguments, client);
-    } catch (BlurException e) {
-      throw new IOException(e);
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/ServerRead.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ServerRead.java b/blur-core/src/main/java/org/apache/blur/command/ServerRead.java
new file mode 100644
index 0000000..f26c2c0
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ServerRead.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface ServerRead<T1, T2> {
+  
+  T1 execute(IndexContext context) throws IOException, InterruptedException;
+
+  T2 combine(CombiningContext context, Map<? extends Location<?>, T1> results) throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
index 4c3b006..d22df33 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -46,24 +47,24 @@ public class ShardCommandManager extends BaseCommandManager {
     _indexServer = indexServer;
   }
 
-  public Response execute(final TableContextFactory tableContextFactory, final String commandName, final Args args)
-      throws IOException, TimeoutException, ExceptionCollector {
+  public Response execute(final TableContextFactory tableContextFactory, final String commandName,
+      final ArgumentOverlay argumentOverlay) throws IOException, TimeoutException, ExceptionCollector {
     final ShardServerContext shardServerContext = getShardServerContext();
     Callable<Response> callable = new Callable<Response>() {
       @Override
       public Response call() throws Exception {
-        Command<?> command = getCommandObject(commandName);
+        Command<?> command = getCommandObject(commandName, argumentOverlay);
         if (command == null) {
           throw new IOException("Command with name [" + commandName + "] not found.");
         }
-        if (command instanceof IndexRead || command instanceof IndexReadCombining) {
-          return toResponse(executeReadCommand(shardServerContext, command, tableContextFactory, args), command,
-              getServerContext(args, tableContextFactory));
+        if (command instanceof IndexRead || command instanceof ServerRead) {
+          return toResponse(executeReadCommand(shardServerContext, command, tableContextFactory), command,
+              getServerContext(tableContextFactory));
         }
         throw new IOException("Command type of [" + command.getClass() + "] not supported.");
       }
 
-      private CombiningContext getServerContext(final Args args, final TableContextFactory tableContextFactory) {
+      private CombiningContext getServerContext(final TableContextFactory tableContextFactory) {
         return new CombiningContext() {
 
           @Override
@@ -72,11 +73,6 @@ public class ShardCommandManager extends BaseCommandManager {
           }
 
           @Override
-          public Args getArgs() {
-            return args;
-          }
-
-          @Override
           public BlurConfiguration getBlurConfiguration(String table) throws IOException {
             return getTableContext(table).getBlurConfiguration();
           }
@@ -97,8 +93,8 @@ public class ShardCommandManager extends BaseCommandManager {
   @SuppressWarnings("unchecked")
   private Response toResponse(Map<Shard, Object> results, Command<?> command, CombiningContext serverContext)
       throws IOException, InterruptedException {
-    if (command instanceof IndexReadCombining) {
-      IndexReadCombining<Object, Object> primitiveCommandAggregator = (IndexReadCombining<Object, Object>) command;
+    if (command instanceof ServerRead) {
+      ServerRead<Object, Object> primitiveCommandAggregator = (ServerRead<Object, Object>) command;
       Object object = primitiveCommandAggregator.combine(serverContext, results);
       return Response.createNewAggregateResponse(object);
     }
@@ -106,17 +102,28 @@ public class ShardCommandManager extends BaseCommandManager {
   }
 
   private Map<Shard, Object> executeReadCommand(ShardServerContext shardServerContext, Command<?> command,
-      final TableContextFactory tableContextFactory, final Args args) throws IOException, ExceptionCollector {
-    Set<String> tables = getTables(command, args);
+      final TableContextFactory tableContextFactory) throws IOException, ExceptionCollector {
+    BaseContext context = new BaseContext() {
+      @Override
+      public TableContext getTableContext(String table) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public BlurConfiguration getBlurConfiguration(String table) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+    };
+    Set<String> tables = command.routeTables(context);
     if (tables.isEmpty()) {
       throw new IOException("At least one table needs to specified.");
     }
-    Map<String, Set<Shard>> shardMap = getShards(tableContextFactory, command, args, tables);
+    Map<String, Set<Shard>> shardMap = toMap(command.routeShards(context, tables));
 
     Map<Shard, Future<?>> futureMap = new HashMap<Shard, Future<?>>();
     for (String table : tables) {
       Set<Shard> shardSet = shardMap.get(table);
-      boolean checkShards = !shardSet.isEmpty();
+      boolean checkShards = shardSet == null ? false : !shardSet.isEmpty();
 
       Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
       for (Entry<String, BlurIndex> e : indexes.entrySet()) {
@@ -131,11 +138,10 @@ public class ShardCommandManager extends BaseCommandManager {
         Callable<Object> callable;
         if (command instanceof IndexRead) {
           final IndexRead<?> readCommand = (IndexRead<?>) command.clone();
-          callable = getCallable(shardServerContext, tableContextFactory, table, args, shard, blurIndex, readCommand);
-        } else if (command instanceof IndexReadCombining) {
-          final IndexReadCombining<?, ?> readCombiningCommand = (IndexReadCombining<?, ?>) command.clone();
-          callable = getCallable(shardServerContext, tableContextFactory, table, args, shard, blurIndex,
-              readCombiningCommand);
+          callable = getCallable(shardServerContext, tableContextFactory, table, shard, blurIndex, readCommand);
+        } else if (command instanceof ServerRead) {
+          final ServerRead<?, ?> readCombiningCommand = (ServerRead<?, ?>) command.clone();
+          callable = getCallable(shardServerContext, tableContextFactory, table, shard, blurIndex, readCombiningCommand);
         } else {
           throw new IOException("Command type of [" + command.getClass() + "] not supported.");
         }
@@ -168,9 +174,21 @@ public class ShardCommandManager extends BaseCommandManager {
     return resultMap;
   }
 
+  private Map<String, Set<Shard>> toMap(Set<Shard> shards) {
+    Map<String, Set<Shard>> byTable = new HashMap<String, Set<Shard>>(); // table name -> its shards
+    for (Shard s : shards) {
+      String table = s.getTable();
+      if (!byTable.containsKey(table)) {
+        byTable.put(table, new TreeSet<Shard>());
+      }
+      byTable.get(table).add(s);
+    }
+    return byTable;
+  }
+
   private Callable<Object> getCallable(final ShardServerContext shardServerContext,
-      final TableContextFactory tableContextFactory, final String table, final Args args, final Shard shard,
-      final BlurIndex blurIndex, final IndexReadCombining<?, ?> readCombiningCommand) {
+      final TableContextFactory tableContextFactory, final String table, final Shard shard, final BlurIndex blurIndex,
+      final ServerRead<?, ?> readCombiningCommand) {
     return new Callable<Object>() {
       @Override
       public Object call() throws Exception {
@@ -180,14 +198,14 @@ public class ShardCommandManager extends BaseCommandManager {
           searcher = blurIndex.getIndexSearcher();
           shardServerContext.setIndexSearcherClosable(table, shardId, searcher);
         }
-        return readCombiningCommand.execute(new ShardIndexContext(tableContextFactory, table, shard, searcher, args));
+        return readCombiningCommand.execute(new ShardIndexContext(tableContextFactory, table, shard, searcher));
       }
     };
   }
 
   private Callable<Object> getCallable(final ShardServerContext shardServerContext,
-      final TableContextFactory tableContextFactory, final String table, final Args args, final Shard shard,
-      final BlurIndex blurIndex, final IndexRead<?> readCommand) {
+      final TableContextFactory tableContextFactory, final String table, final Shard shard, final BlurIndex blurIndex,
+      final IndexRead<?> readCommand) {
     return new Callable<Object>() {
       @Override
       public Object call() throws Exception {
@@ -198,7 +216,7 @@ public class ShardCommandManager extends BaseCommandManager {
           searcher = blurIndex.getIndexSearcher();
           shardServerContext.setIndexSearcherClosable(table, shardId, searcher);
         }
-        return readCommand.execute(new ShardIndexContext(tableContextFactory, table, shard, searcher, args));
+        return readCommand.execute(new ShardIndexContext(tableContextFactory, table, shard, searcher));
       }
     };
   }
@@ -207,22 +225,14 @@ public class ShardCommandManager extends BaseCommandManager {
 
     private final Shard _shard;
     private final IndexSearcher _searcher;
-    private final Args _args;
     private final TableContextFactory _tableContextFactory;
     private final String _table;
 
-    public ShardIndexContext(TableContextFactory tableContextFactory, String table, Shard shard,
-        IndexSearcher searcher, Args args) {
+    public ShardIndexContext(TableContextFactory tableContextFactory, String table, Shard shard, IndexSearcher searcher) {
       _tableContextFactory = tableContextFactory;
       _table = table;
       _shard = shard;
       _searcher = searcher;
-      _args = args;
-    }
-
-    @Override
-    public Args getArgs() {
-      return _args;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/ShardRoute.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ShardRoute.java b/blur-core/src/main/java/org/apache/blur/command/ShardRoute.java
deleted file mode 100644
index 7f59857..0000000
--- a/blur-core/src/main/java/org/apache/blur/command/ShardRoute.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.blur.command;
-
-import java.util.Set;
-
-import org.apache.blur.server.TableContext;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-public interface ShardRoute {
-
-  Set<Shard> resolveShards(TableContext tableContext, Args args);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/TableRoute.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/TableRoute.java b/blur-core/src/main/java/org/apache/blur/command/TableRoute.java
deleted file mode 100644
index bc3e7eb..0000000
--- a/blur-core/src/main/java/org/apache/blur/command/TableRoute.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.blur.command;
-
-import java.util.Set;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-public interface TableRoute {
-
-  Set<String> resolveTables(Args args);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/annotation/Argument.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/annotation/Argument.java b/blur-core/src/main/java/org/apache/blur/command/annotation/Argument.java
deleted file mode 100644
index e083326..0000000
--- a/blur-core/src/main/java/org/apache/blur/command/annotation/Argument.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.blur.command.annotation;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-@Retention(RetentionPolicy.RUNTIME)
-public @interface Argument {
-  String name();
-
-  String value();
-  
-  Class<?> type();
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/annotation/OptionalArgument.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/annotation/OptionalArgument.java b/blur-core/src/main/java/org/apache/blur/command/annotation/OptionalArgument.java
new file mode 100644
index 0000000..66e57a3
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/annotation/OptionalArgument.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.annotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME) // kept in the class file and readable through reflection
+public @interface OptionalArgument {
+  String value() default ""; // text describing the argument; may be omitted (defaults to "")
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/annotation/OptionalArguments.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/annotation/OptionalArguments.java b/blur-core/src/main/java/org/apache/blur/command/annotation/OptionalArguments.java
deleted file mode 100644
index c978bf2..0000000
--- a/blur-core/src/main/java/org/apache/blur/command/annotation/OptionalArguments.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.blur.command.annotation;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-@Retention(RetentionPolicy.RUNTIME)
-public @interface OptionalArguments {
-  Argument[] value();
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/annotation/RequiredArgument.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/annotation/RequiredArgument.java b/blur-core/src/main/java/org/apache/blur/command/annotation/RequiredArgument.java
new file mode 100644
index 0000000..110fd72
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/annotation/RequiredArgument.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.annotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME) // kept in the class file and readable through reflection
+public @interface RequiredArgument {
+  String value() default ""; // text describing the argument; may be omitted (defaults to "")
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/annotation/RequiredArguments.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/annotation/RequiredArguments.java b/blur-core/src/main/java/org/apache/blur/command/annotation/RequiredArguments.java
deleted file mode 100644
index fdba1cd..0000000
--- a/blur-core/src/main/java/org/apache/blur/command/annotation/RequiredArguments.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.blur.command.annotation;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-@Retention(RetentionPolicy.RUNTIME)
-public @interface RequiredArguments {
-  Argument[] value();
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteCommand.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteCommand.java
new file mode 100644
index 0000000..5ba0863
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteCommand.java
@@ -0,0 +1,65 @@
+package org.apache.blur.command.commandtype;
+
+import java.io.IOException;
+
+import org.apache.blur.command.ClusterContext;
+import org.apache.blur.command.Command;
+import org.apache.blur.command.CommandRunner;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public abstract class ClusterExecuteCommand<T> extends Command<T> {
+  /** Subclass hook that receives the ClusterContext and returns the command's result. */
+  public abstract T clusterExecute(ClusterContext context) throws IOException, InterruptedException;
+  /** Delegates to CommandRunner.run(this), rethrowing BlurException/TException as IOException. */
+  @Override
+  public T run() throws IOException {
+    try {
+      return CommandRunner.run(this);
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+  /** As run(), but forwards the given connection string to CommandRunner. */
+  @Override
+  public T run(String connectionStr) throws IOException {
+    try {
+      return CommandRunner.run(this, connectionStr);
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+  /** As run(), but forwards the given Blur.Iface client to CommandRunner. */
+  @Override
+  public T run(Iface client) throws IOException {
+    try {
+      return CommandRunner.run(this, client);
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteServerReadCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteServerReadCommand.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteServerReadCommand.java
new file mode 100644
index 0000000..58d206b
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteServerReadCommand.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.blur.command.commandtype;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.blur.command.ClusterContext;
+import org.apache.blur.command.CombiningContext;
+import org.apache.blur.command.Command;
+import org.apache.blur.command.CommandRunner;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.ServerRead;
+import org.apache.blur.command.Location;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+
+public abstract class ClusterExecuteServerReadCommand<T> extends Command<T> implements ServerRead<T, T> {
+  /** Index-level step declared by ServerRead; yields a T for the given index context. */
+  public abstract T execute(IndexContext context) throws IOException, InterruptedException;
+  /** Combining step declared by ServerRead; merges the results keyed by location into one T. */
+  public abstract T combine(CombiningContext context, Map<? extends Location<?>, T> results) throws IOException,
+      InterruptedException;
+  /** Subclass hook that receives the ClusterContext and returns the command's result. */
+  public abstract T clusterExecute(ClusterContext context) throws IOException, InterruptedException;
+  /** Delegates to CommandRunner.run(this), rethrowing BlurException/TException as IOException. */
+  @Override
+  public T run() throws IOException {
+    try {
+      return CommandRunner.run(this);
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+  /** As run(), but forwards the given connection string to CommandRunner. */
+  @Override
+  public T run(String connectionStr) throws IOException {
+    try {
+      return CommandRunner.run(this, connectionStr);
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+  /** As run(), but forwards the given Blur.Iface client to CommandRunner. */
+  @Override
+  public T run(Iface client) throws IOException {
+    try {
+      return CommandRunner.run(this, client);
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteServerReadCommandSingleTable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteServerReadCommandSingleTable.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteServerReadCommandSingleTable.java
new file mode 100644
index 0000000..a0e87bb
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterExecuteServerReadCommandSingleTable.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.commandtype;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.blur.command.BaseContext;
+import org.apache.blur.command.Shard;
+import org.apache.blur.command.annotation.OptionalArgument;
+import org.apache.blur.command.annotation.RequiredArgument;
+
+public abstract class ClusterExecuteServerReadCommandSingleTable<T> extends ClusterExecuteServerReadCommand<T> {
+  // Routing state: one required table name plus an optional set of shard ids within it.
+  @RequiredArgument("The name of the table.")
+  private String table;
+
+  @OptionalArgument("The shard ids (e.g. shard-0000000).")
+  private Set<String> shards;
+
+  @Override
+  public Set<String> routeTables(BaseContext context) {
+    return asSet(table);
+  }
+
+  /** Returns one Shard per configured shard id, or an empty set when none were configured. */
+  @Override
+  public Set<Shard> routeShards(BaseContext context, Set<String> tables) {
+    if (shards == null) {
+      return Collections.<Shard> emptySet();
+    }
+    Set<Shard> result = new TreeSet<Shard>();
+    for (String shard : shards) {
+      result.add(new Shard(table, shard));
+    }
+    return result;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public Set<String> getShards() {
+    return shards;
+  }
+
+  public void setShards(Set<String> shards) {
+    this.shards = shards;
+  }
+  /** Adds a single shard id, creating the backing set on first use. */
+  public void addShard(String shard) {
+    if (shards == null) {
+      shards = new TreeSet<String>();
+    }
+    shards.add(shard);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterIndexReadCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterIndexReadCommand.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterIndexReadCommand.java
new file mode 100644
index 0000000..ad526d2
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterIndexReadCommand.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.blur.command.commandtype;
+
+import java.io.IOException;
+
+import org.apache.blur.command.ClusterContext;
+import org.apache.blur.command.Command;
+import org.apache.blur.command.CommandRunner;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.IndexRead;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+
+public abstract class ClusterIndexReadCommand<T1, T2> extends Command<T2> implements IndexRead<T1> {
+  /** Index-level step declared by IndexRead; yields a T1 for the given index context. */
+  public abstract T1 execute(IndexContext context) throws IOException, InterruptedException;
+  /** Subclass hook that receives the ClusterContext and returns the command's T2 result. */
+  public abstract T2 clusterExecute(ClusterContext context) throws IOException, InterruptedException;
+  /** Delegates to CommandRunner.run(this), rethrowing BlurException/TException as IOException. */
+  @Override
+  public T2 run() throws IOException {
+    try {
+      return CommandRunner.run(this);
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+  /** As run(), but forwards the given connection string to CommandRunner. */
+  @Override
+  public T2 run(String connectionStr) throws IOException {
+    try {
+      return CommandRunner.run(this, connectionStr);
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+  /** As run(), but forwards the given Blur.Iface client to CommandRunner. */
+  @Override
+  public T2 run(Iface client) throws IOException {
+    try {
+      return CommandRunner.run(this, client);
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterIndexReadCommandSingleTable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterIndexReadCommandSingleTable.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterIndexReadCommandSingleTable.java
new file mode 100644
index 0000000..dc38c77
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterIndexReadCommandSingleTable.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.commandtype;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.blur.command.BaseContext;
+import org.apache.blur.command.Shard;
+import org.apache.blur.command.annotation.OptionalArgument;
+import org.apache.blur.command.annotation.RequiredArgument;
+
+/**
+ * Base class for cluster index-read commands that target a single table and,
+ * optionally, a subset of that table's shards.
+ *
+ * @param <T1> result type of {@code execute(IndexContext)}
+ * @param <T2> result type of {@code clusterExecute(ClusterContext)}
+ */
+public abstract class ClusterIndexReadCommandSingleTable<T1, T2> extends ClusterIndexReadCommand<T1, T2> {
+
+  // NOTE(review): the annotated fields below appear to be filled in from the
+  // command arguments; do not rename them without checking the argument binding.
+  @RequiredArgument("The name of the table.")
+  private String table;
+
+  @OptionalArgument("The shard ids (e.g. shard-0000000).")
+  private Set<String> shards;
+
+  /**
+   * Routes this command to its single configured table.
+   *
+   * @param context not used by this implementation
+   * @return the value of {@code asSet(table)}
+   */
+  @Override
+  public Set<String> routeTables(BaseContext context) {
+    return asSet(table);
+  }
+
+  /**
+   * Builds the shards this command is routed to, all within the configured table.
+   *
+   * @param context not used by this implementation
+   * @param tables not used by this implementation
+   * @return an empty set when no shard ids have been set, otherwise a
+   *         {@link TreeSet} holding one {@link Shard} per configured shard id
+   */
+  @Override
+  public Set<Shard> routeShards(BaseContext context, Set<String> tables) {
+    if (shards == null) {
+      // Typed accessor for the shared immutable empty set; replaces the raw
+      // Collections.EMPTY_SET constant so no unchecked suppression is needed.
+      return Collections.<Shard> emptySet();
+    }
+    Set<Shard> result = new TreeSet<Shard>();
+    for (String shard : shards) {
+      result.add(new Shard(table, shard));
+    }
+    return result;
+  }
+
+  /** @return the table name, or {@code null} if none has been set */
+  public String getTable() {
+    return table;
+  }
+
+  /** @param table the name of the table this command runs against */
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  /** @return the configured shard ids, or {@code null} if none have been set */
+  public Set<String> getShards() {
+    return shards;
+  }
+
+  /** @param shards the shard ids to use; the set is stored as given, not copied */
+  public void setShards(Set<String> shards) {
+    this.shards = shards;
+  }
+
+  /**
+   * Adds one shard id, creating the backing {@link TreeSet} on first use.
+   *
+   * @param shard the shard id to add
+   */
+  public void addShard(String shard) {
+    if (shards == null) {
+      shards = new TreeSet<String>();
+    }
+    shards.add(shard);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterServerReadCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterServerReadCommand.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterServerReadCommand.java
new file mode 100644
index 0000000..ec582f7
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterServerReadCommand.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.commandtype;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.blur.command.CombiningContext;
+import org.apache.blur.command.Command;
+import org.apache.blur.command.CommandRunner;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.ServerRead;
+import org.apache.blur.command.Location;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+
+/**
+ * Base class for {@link ServerRead} commands whose per-index result and
+ * combined result share the same type. Each {@code run} variant delegates to
+ * {@link CommandRunner} and rethrows {@link BlurException} and
+ * {@link TException} wrapped in an {@link IOException}.
+ *
+ * @param <T> the type returned by {@code execute}, {@code combine} and {@code run}
+ */
+public abstract class ClusterServerReadCommand<T> extends Command<T> implements ServerRead<T, T> {
+
+  /** Executes this command against the given index context. */
+  public abstract T execute(IndexContext context) throws IOException, InterruptedException;
+
+  /** Combines the per-location {@code results} into a single value. */
+  public abstract T combine(CombiningContext context, Map<? extends Location<?>, T> results) throws IOException,
+      InterruptedException;
+
+  /**
+   * Runs this command via {@code CommandRunner.run(this)}.
+   *
+   * @throws IOException when the runner raises a {@link BlurException} or
+   *           {@link TException}; the original exception is kept as the cause
+   */
+  @Override
+  public T run() throws IOException {
+    T outcome;
+    try {
+      outcome = CommandRunner.run(this);
+    } catch (BlurException blurFailure) {
+      throw new IOException(blurFailure);
+    } catch (TException thriftFailure) {
+      throw new IOException(thriftFailure);
+    }
+    return outcome;
+  }
+
+  /**
+   * Runs this command via {@code CommandRunner.run(this, connectionStr)}.
+   *
+   * @param connectionStr passed through to {@link CommandRunner} unchanged
+   * @throws IOException when the runner raises a {@link BlurException} or
+   *           {@link TException}; the original exception is kept as the cause
+   */
+  @Override
+  public T run(String connectionStr) throws IOException {
+    T outcome;
+    try {
+      outcome = CommandRunner.run(this, connectionStr);
+    } catch (BlurException blurFailure) {
+      throw new IOException(blurFailure);
+    } catch (TException thriftFailure) {
+      throw new IOException(thriftFailure);
+    }
+    return outcome;
+  }
+
+  /**
+   * Runs this command via {@code CommandRunner.run(this, client)}.
+   *
+   * @param client passed through to {@link CommandRunner} unchanged
+   * @throws IOException when the runner raises a {@link BlurException} or
+   *           {@link TException}; the original exception is kept as the cause
+   */
+  @Override
+  public T run(Iface client) throws IOException {
+    T outcome;
+    try {
+      outcome = CommandRunner.run(this, client);
+    } catch (BlurException blurFailure) {
+      throw new IOException(blurFailure);
+    } catch (TException thriftFailure) {
+      throw new IOException(thriftFailure);
+    }
+    return outcome;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterServerReadCommandSingleTable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterServerReadCommandSingleTable.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterServerReadCommandSingleTable.java
new file mode 100644
index 0000000..1762127
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/ClusterServerReadCommandSingleTable.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.commandtype;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.blur.command.BaseContext;
+import org.apache.blur.command.Shard;
+import org.apache.blur.command.annotation.OptionalArgument;
+import org.apache.blur.command.annotation.RequiredArgument;
+
+/**
+ * Base class for cluster server-read commands that target a single table and,
+ * optionally, a subset of that table's shards.
+ *
+ * @param <T> result type of both {@code execute} and {@code combine}
+ */
+public abstract class ClusterServerReadCommandSingleTable<T> extends ClusterServerReadCommand<T> {
+
+  // NOTE(review): the annotated fields below appear to be filled in from the
+  // command arguments; do not rename them without checking the argument binding.
+  @RequiredArgument("The name of the table.")
+  private String table;
+
+  @OptionalArgument("The shard ids (e.g. shard-0000000).")
+  private Set<String> shards;
+
+  /**
+   * Routes this command to its single configured table.
+   *
+   * @param context not used by this implementation
+   * @return the value of {@code asSet(table)}
+   */
+  @Override
+  public Set<String> routeTables(BaseContext context) {
+    return asSet(table);
+  }
+
+  /**
+   * Builds the shards this command is routed to, all within the configured table.
+   *
+   * @param context not used by this implementation
+   * @param tables not used by this implementation
+   * @return an empty set when no shard ids have been set, otherwise a
+   *         {@link TreeSet} holding one {@link Shard} per configured shard id
+   */
+  @Override
+  public Set<Shard> routeShards(BaseContext context, Set<String> tables) {
+    if (shards == null) {
+      // Typed accessor for the shared immutable empty set; replaces the raw
+      // Collections.EMPTY_SET constant so no unchecked suppression is needed.
+      return Collections.<Shard> emptySet();
+    }
+    Set<Shard> result = new TreeSet<Shard>();
+    for (String shard : shards) {
+      result.add(new Shard(table, shard));
+    }
+    return result;
+  }
+
+  /** @return the table name, or {@code null} if none has been set */
+  public String getTable() {
+    return table;
+  }
+
+  /** @param table the name of the table this command runs against */
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  /** @return the configured shard ids, or {@code null} if none have been set */
+  public Set<String> getShards() {
+    return shards;
+  }
+
+  /** @param shards the shard ids to use; the set is stored as given, not copied */
+  public void setShards(Set<String> shards) {
+    this.shards = shards;
+  }
+
+  /**
+   * Adds one shard id, creating the backing {@link TreeSet} on first use.
+   *
+   * @param shard the shard id to add
+   */
+  public void addShard(String shard) {
+    if (shards == null) {
+      shards = new TreeSet<String>();
+    }
+    shards.add(shard);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/IndexReadCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/IndexReadCommand.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/IndexReadCommand.java
new file mode 100644
index 0000000..5d2b75d
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/IndexReadCommand.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.commandtype;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.blur.command.Command;
+import org.apache.blur.command.CommandRunner;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.IndexRead;
+import org.apache.blur.command.Shard;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+
+/**
+ * Base class for {@link IndexRead} commands whose {@code run} result is a map
+ * from {@link Shard} to the per-index result. Each {@code run} variant
+ * delegates to {@link CommandRunner} and rethrows {@link BlurException} and
+ * {@link TException} wrapped in an {@link IOException}.
+ *
+ * @param <T> the type returned by {@code execute}
+ */
+public abstract class IndexReadCommand<T> extends Command<Map<Shard, T>> implements IndexRead<T> {
+
+  /** Executes this command against the given index context. */
+  public abstract T execute(IndexContext context) throws IOException, InterruptedException;
+
+  /**
+   * Runs this command via {@code CommandRunner.run(this)}.
+   *
+   * @throws IOException when the runner raises a {@link BlurException} or
+   *           {@link TException}; the original exception is kept as the cause
+   */
+  @Override
+  public Map<Shard, T> run() throws IOException {
+    Map<Shard, T> outcome;
+    try {
+      outcome = CommandRunner.run(this);
+    } catch (BlurException blurFailure) {
+      throw new IOException(blurFailure);
+    } catch (TException thriftFailure) {
+      throw new IOException(thriftFailure);
+    }
+    return outcome;
+  }
+
+  /**
+   * Runs this command via {@code CommandRunner.run(this, connectionStr)}.
+   *
+   * @param connectionStr passed through to {@link CommandRunner} unchanged
+   * @throws IOException when the runner raises a {@link BlurException} or
+   *           {@link TException}; the original exception is kept as the cause
+   */
+  @Override
+  public Map<Shard, T> run(String connectionStr) throws IOException {
+    Map<Shard, T> outcome;
+    try {
+      outcome = CommandRunner.run(this, connectionStr);
+    } catch (BlurException blurFailure) {
+      throw new IOException(blurFailure);
+    } catch (TException thriftFailure) {
+      throw new IOException(thriftFailure);
+    }
+    return outcome;
+  }
+
+  /**
+   * Runs this command via {@code CommandRunner.run(this, client)}.
+   *
+   * @param client passed through to {@link CommandRunner} unchanged
+   * @throws IOException when the runner raises a {@link BlurException} or
+   *           {@link TException}; the original exception is kept as the cause
+   */
+  @Override
+  public Map<Shard, T> run(Iface client) throws IOException {
+    Map<Shard, T> outcome;
+    try {
+      outcome = CommandRunner.run(this, client);
+    } catch (BlurException blurFailure) {
+      throw new IOException(blurFailure);
+    } catch (TException thriftFailure) {
+      throw new IOException(thriftFailure);
+    }
+    return outcome;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/IndexReadCommandSingleTable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/IndexReadCommandSingleTable.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/IndexReadCommandSingleTable.java
new file mode 100644
index 0000000..5129311
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/IndexReadCommandSingleTable.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.commandtype;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.blur.command.BaseContext;
+import org.apache.blur.command.Shard;
+import org.apache.blur.command.annotation.OptionalArgument;
+import org.apache.blur.command.annotation.RequiredArgument;
+
+/**
+ * Base class for index-read commands that target a single table and,
+ * optionally, a subset of that table's shards.
+ *
+ * @param <T> result type of {@code execute(IndexContext)}
+ */
+public abstract class IndexReadCommandSingleTable<T> extends IndexReadCommand<T> {
+
+  // NOTE(review): the annotated fields below appear to be filled in from the
+  // command arguments; do not rename them without checking the argument binding.
+  @RequiredArgument("The name of the table.")
+  private String table;
+
+  @OptionalArgument("The shard ids (e.g. shard-0000000).")
+  private Set<String> shards;
+
+  /**
+   * Routes this command to its single configured table.
+   *
+   * @param context not used by this implementation
+   * @return the value of {@code asSet(table)}
+   */
+  @Override
+  public Set<String> routeTables(BaseContext context) {
+    return asSet(table);
+  }
+
+  /**
+   * Builds the shards this command is routed to, all within the configured table.
+   *
+   * @param context not used by this implementation
+   * @param tables not used by this implementation
+   * @return an empty set when no shard ids have been set, otherwise a
+   *         {@link TreeSet} holding one {@link Shard} per configured shard id
+   */
+  @Override
+  public Set<Shard> routeShards(BaseContext context, Set<String> tables) {
+    if (shards == null) {
+      // Typed accessor for the shared immutable empty set; replaces the raw
+      // Collections.EMPTY_SET constant so no unchecked suppression is needed.
+      return Collections.<Shard> emptySet();
+    }
+    Set<Shard> result = new TreeSet<Shard>();
+    for (String shard : shards) {
+      result.add(new Shard(table, shard));
+    }
+    return result;
+  }
+
+  /** @return the table name, or {@code null} if none has been set */
+  public String getTable() {
+    return table;
+  }
+
+  /** @param table the name of the table this command runs against */
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  /** @return the configured shard ids, or {@code null} if none have been set */
+  public Set<String> getShards() {
+    return shards;
+  }
+
+  /** @param shards the shard ids to use; the set is stored as given, not copied */
+  public void setShards(Set<String> shards) {
+    this.shards = shards;
+  }
+
+  /**
+   * Adds one shard id, creating the backing {@link TreeSet} on first use.
+   *
+   * @param shard the shard id to add
+   */
+  public void addShard(String shard) {
+    if (shards == null) {
+      shards = new TreeSet<String>();
+    }
+    shards.add(shard);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/ServerReadCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/ServerReadCommand.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/ServerReadCommand.java
new file mode 100644
index 0000000..7a553e9
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/ServerReadCommand.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.commandtype;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.blur.command.CombiningContext;
+import org.apache.blur.command.Command;
+import org.apache.blur.command.CommandRunner;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.ServerRead;
+import org.apache.blur.command.Location;
+import org.apache.blur.command.Server;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+
+/**
+ * Base class for {@link ServerRead} commands whose {@code run} result is a map
+ * from {@link Server} to the combined result. Each {@code run} variant
+ * delegates to {@link CommandRunner} and rethrows {@link BlurException} and
+ * {@link TException} wrapped in an {@link IOException}.
+ *
+ * @param <T1> the type returned by {@code execute}
+ * @param <T2> the type returned by {@code combine}
+ */
+public abstract class ServerReadCommand<T1, T2> extends Command<Map<Server, T2>> implements
+    ServerRead<T1, T2> {
+
+  /** Executes this command against the given index context. */
+  public abstract T1 execute(IndexContext context) throws IOException, InterruptedException;
+
+  /** Combines the per-location {@code results} into a single value. */
+  public abstract T2 combine(CombiningContext context, Map<? extends Location<?>, T1> results) throws IOException,
+      InterruptedException;
+
+  /**
+   * Runs this command via {@code CommandRunner.run(this)}.
+   *
+   * @throws IOException when the runner raises a {@link BlurException} or
+   *           {@link TException}; the original exception is kept as the cause
+   */
+  @Override
+  public Map<Server, T2> run() throws IOException {
+    Map<Server, T2> outcome;
+    try {
+      outcome = CommandRunner.run(this);
+    } catch (BlurException blurFailure) {
+      throw new IOException(blurFailure);
+    } catch (TException thriftFailure) {
+      throw new IOException(thriftFailure);
+    }
+    return outcome;
+  }
+
+  /**
+   * Runs this command via {@code CommandRunner.run(this, connectionStr)}.
+   *
+   * @param connectionStr passed through to {@link CommandRunner} unchanged
+   * @throws IOException when the runner raises a {@link BlurException} or
+   *           {@link TException}; the original exception is kept as the cause
+   */
+  @Override
+  public Map<Server, T2> run(String connectionStr) throws IOException {
+    Map<Server, T2> outcome;
+    try {
+      outcome = CommandRunner.run(this, connectionStr);
+    } catch (BlurException blurFailure) {
+      throw new IOException(blurFailure);
+    } catch (TException thriftFailure) {
+      throw new IOException(thriftFailure);
+    }
+    return outcome;
+  }
+
+  /**
+   * Runs this command via {@code CommandRunner.run(this, client)}.
+   *
+   * @param client passed through to {@link CommandRunner} unchanged
+   * @throws IOException when the runner raises a {@link BlurException} or
+   *           {@link TException}; the original exception is kept as the cause
+   */
+  @Override
+  public Map<Server, T2> run(Iface client) throws IOException {
+    Map<Server, T2> outcome;
+    try {
+      outcome = CommandRunner.run(this, client);
+    } catch (BlurException blurFailure) {
+      throw new IOException(blurFailure);
+    } catch (TException thriftFailure) {
+      throw new IOException(thriftFailure);
+    }
+    return outcome;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/command/commandtype/ServerReadCommandSingleTable.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/commandtype/ServerReadCommandSingleTable.java b/blur-core/src/main/java/org/apache/blur/command/commandtype/ServerReadCommandSingleTable.java
new file mode 100644
index 0000000..6a7f07a
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/commandtype/ServerReadCommandSingleTable.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.commandtype;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.blur.command.BaseContext;
+import org.apache.blur.command.Shard;
+import org.apache.blur.command.annotation.OptionalArgument;
+import org.apache.blur.command.annotation.RequiredArgument;
+
+/**
+ * Base class for server-read commands that target a single table and,
+ * optionally, a subset of that table's shards.
+ *
+ * @param <T1> result type of {@code execute(IndexContext)}
+ * @param <T2> result type of {@code combine}
+ */
+public abstract class ServerReadCommandSingleTable<T1, T2> extends ServerReadCommand<T1, T2> {
+
+  // NOTE(review): the annotated fields below appear to be filled in from the
+  // command arguments; do not rename them without checking the argument binding.
+  @RequiredArgument("The name of the table.")
+  private String table;
+
+  @OptionalArgument("The shard ids (e.g. shard-0000000).")
+  private Set<String> shards;
+
+  /**
+   * Routes this command to its single configured table.
+   *
+   * @param context not used by this implementation
+   * @return the value of {@code asSet(table)}
+   */
+  @Override
+  public Set<String> routeTables(BaseContext context) {
+    return asSet(table);
+  }
+
+  /**
+   * Builds the shards this command is routed to, all within the configured table.
+   *
+   * @param context not used by this implementation
+   * @param tables not used by this implementation
+   * @return an empty set when no shard ids have been set, otherwise a
+   *         {@link TreeSet} holding one {@link Shard} per configured shard id
+   */
+  @Override
+  public Set<Shard> routeShards(BaseContext context, Set<String> tables) {
+    if (shards == null) {
+      // Typed accessor for the shared immutable empty set; replaces the raw
+      // Collections.EMPTY_SET constant so no unchecked suppression is needed.
+      return Collections.<Shard> emptySet();
+    }
+    Set<Shard> result = new TreeSet<Shard>();
+    for (String shard : shards) {
+      result.add(new Shard(table, shard));
+    }
+    return result;
+  }
+
+  /** @return the table name, or {@code null} if none has been set */
+  public String getTable() {
+    return table;
+  }
+
+  /** @param table the name of the table this command runs against */
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  /** @return the configured shard ids, or {@code null} if none have been set */
+  public Set<String> getShards() {
+    return shards;
+  }
+
+  /** @param shards the shard ids to use; the set is stored as given, not copied */
+  public void setShards(Set<String> shards) {
+    this.shards = shards;
+  }
+
+  /**
+   * Adds one shard id, creating the backing {@link TreeSet} on first use.
+   *
+   * @param shard the shard id to add
+   */
+  public void addShard(String shard) {
+    if (shards == null) {
+      shards = new TreeSet<String>();
+    }
+    shards.add(shard);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/server/LayoutFactory.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/LayoutFactory.java b/blur-core/src/main/java/org/apache/blur/server/LayoutFactory.java
index 6202811..6211a9a 100644
--- a/blur-core/src/main/java/org/apache/blur/server/LayoutFactory.java
+++ b/blur-core/src/main/java/org/apache/blur/server/LayoutFactory.java
@@ -17,7 +17,6 @@
 package org.apache.blur.server;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.blur.command.Server;
@@ -30,6 +29,6 @@ public interface LayoutFactory {
 
   Set<Connection> getServerConnections() throws IOException;
 
-  boolean isValidServer(Server server, Set<String> tables, Map<String, Set<Shard>> shards);
+  boolean isValidServer(Server server, Set<String> tables, Set<Shard> shards);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27107abf/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 4cbae31..32967a1 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 
+import org.apache.blur.command.ArgumentOverlay;
+import org.apache.blur.command.BlurObject;
 import org.apache.blur.command.CommandUtil;
 import org.apache.blur.command.ControllerCommandManager;
 import org.apache.blur.command.ExecutionId;
@@ -1515,13 +1517,8 @@ public class BlurControllerServer extends TableAdmin implements Iface {
   public org.apache.blur.thrift.generated.Response execute(String commandName, Arguments arguments)
       throws BlurException, TException {
     try {
-      // TableContext tableContext = getTableContext(table);
-      // Map<String, String> tableLayout = getTableLayout(table);
-      Response response = _commandManager.execute(getTableContextFactory(), getLayoutFactory(), commandName,
-          CommandUtil.toArgs(arguments));
-      // Response response = _commandManager
-      // .execute(tableContext, commandName, CommandUtil.toArgs(arguments),
-      // tableLayout);
+      BlurObject args = CommandUtil.toBlurObject(arguments);
+      Response response = _commandManager.execute(getTableContextFactory(), getLayoutFactory(), commandName, new ArgumentOverlay(args));
       return CommandUtil.fromObjectToThrift(response);
     } catch (Exception e) {
       if (e instanceof org.apache.blur.command.TimeoutException) {
@@ -1594,20 +1591,19 @@ public class BlurControllerServer extends TableAdmin implements Iface {
       }
 
       @Override
-      public boolean isValidServer(Server server, Set<String> tables, Map<String, Set<Shard>> shards) {
+      public boolean isValidServer(Server server, Set<String> tables, Set<Shard> shards) {
         for (String table : tables) {
           String cluster = _clusterStatus.getCluster(true, table);
           List<String> onlineShardServers = _clusterStatus.getOnlineShardServers(true, cluster);
           if (!onlineShardServers.contains(server.getServer())) {
             return false;
           }
-          Set<Shard> shardSet = shards.get(table);
-          if (shardSet.isEmpty()) {
+          if (shards == null || shards.isEmpty()) {
             return true;
           }
           Map<String, Map<String, String>> layout = _shardServerLayout.get();
           Map<String, String> shardIdToServerMap = layout.get(table);
-          for (Shard shard : shardSet) {
+          for (Shard shard : shards) {
             String serverId = shardIdToServerMap.get(shard.getShard());
             if (serverId.equals(server.getServer())) {
               return true;


Mime
View raw message