incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/3] git commit: Moving classes around from one package to another and extracted the commands from blur-core and made a new project called blur-command. There is a META-INF folder that contains a file to tell the blur server to load the commands.
Date Tue, 09 Sep 2014 22:08:00 GMT
Moving classes around from one package to another and extracted the commands from blur-core and made a new project called blur-command.  There is a META-INF folder that contains a file to tell the blur server to load the commands.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/e97a120d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/e97a120d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/e97a120d

Branch: refs/heads/master
Commit: e97a120dfbc5d9b43fce47ca9b703889f9564c69
Parents: c7a934a
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Sep 9 18:07:46 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Sep 9 18:07:46 2014 -0400

----------------------------------------------------------------------
 blur-command/pom.xml                            | 249 +++++++++++++++++
 .../org/apache/blur/command/AddDocument.java    |  55 ++++
 .../org/apache/blur/command/DocumentCount.java  |  40 +++
 .../blur/command/DocumentCountCombiner.java     |  66 +++++
 .../blur/command/DocumentCountNoCombine.java    |  55 ++++
 .../blur/command/TestBlurObjectCommand.java     |  73 +++++
 .../command/TestBlurObjectCommandUsing.java     |  41 +++
 .../org/apache/blur/command/WaitForSeconds.java |  43 +++
 .../services/org.apache.blur.command.Commands   |  16 ++
 .../main/java/org/apache/blur/command/Args.java |  47 ++++
 .../apache/blur/command/BaseCommandManager.java | 146 ++++++++++
 .../java/org/apache/blur/command/BlurArray.java | 248 +++++++++++++++++
 .../org/apache/blur/command/BlurObject.java     | 263 ++++++++++++++++++
 .../org/apache/blur/command/ClusterCommand.java |  27 ++
 .../org/apache/blur/command/ClusterContext.java |  61 +++++
 .../java/org/apache/blur/command/Command.java   |  35 +++
 .../org/apache/blur/command/CommandUtil.java    | 168 ++++++++++++
 .../blur/command/ControllerClusterContext.java  | 266 ++++++++++++++++++
 .../blur/command/ControllerCommandManager.java  |  90 +++++++
 .../org/apache/blur/command/IndexContext.java   |  42 +++
 .../blur/command/IndexReadCombiningCommand.java |  28 ++
 .../apache/blur/command/IndexReadCommand.java   |  25 ++
 .../apache/blur/command/IndexWriteCommand.java  |  27 ++
 .../apache/blur/command/ObjectArrayPacking.java | 165 ++++++++++++
 .../java/org/apache/blur/command/Response.java  |  63 +++++
 .../java/org/apache/blur/command/Server.java    |  70 +++++
 .../java/org/apache/blur/command/Shard.java     |  70 +++++
 .../blur/command/ShardCommandManager.java       | 205 ++++++++++++++
 .../apache/blur/command/ShardResultFuture.java  |  64 +++++
 .../apache/blur/command/TimeoutException.java   |  32 +++
 .../blur/command/annotation/Argument.java       |  28 ++
 .../blur/command/annotation/Arguments.java      |  26 ++
 .../org/apache/blur/manager/command/Args.java   |  47 ----
 .../manager/command/BaseCommandManager.java     | 126 ---------
 .../apache/blur/manager/command/BlurArray.java  | 248 -----------------
 .../apache/blur/manager/command/BlurObject.java | 263 ------------------
 .../blur/manager/command/ClusterCommand.java    |  27 --
 .../blur/manager/command/ClusterContext.java    |  61 -----
 .../blur/manager/command/CommandUtil.java       | 168 ------------
 .../command/ControllerClusterContext.java       | 267 -------------------
 .../command/ControllerCommandManager.java       |  91 -------
 .../blur/manager/command/IndexContext.java      |  42 ---
 .../command/IndexReadCombiningCommand.java      |  28 --
 .../blur/manager/command/IndexReadCommand.java  |  25 --
 .../blur/manager/command/IndexWriteCommand.java |  27 --
 .../manager/command/ObjectArrayPacking.java     | 165 ------------
 .../apache/blur/manager/command/Response.java   |  63 -----
 .../org/apache/blur/manager/command/Server.java |  70 -----
 .../org/apache/blur/manager/command/Shard.java  |  70 -----
 .../manager/command/ShardCommandManager.java    | 206 --------------
 .../blur/manager/command/ShardResultFuture.java |  64 -----
 .../blur/manager/command/TimeoutException.java  |  32 ---
 .../manager/command/annotation/Argument.java    |  28 --
 .../manager/command/annotation/Arguments.java   |  26 --
 .../blur/manager/command/cmds/AddDocument.java  |  54 ----
 .../blur/manager/command/cmds/BaseCommand.java  |  35 ---
 .../manager/command/cmds/DocumentCount.java     |  39 ---
 .../command/cmds/DocumentCountCombiner.java     |  65 -----
 .../command/cmds/DocumentCountNoCombine.java    |  54 ----
 .../command/cmds/TestBlurObjectCommand.java     |  72 -----
 .../cmds/TestBlurObjectCommandUsing.java        |  41 ---
 .../manager/command/cmds/WaitForSeconds.java    |  42 ---
 .../blur/thrift/BlurControllerServer.java       |  14 +-
 .../org/apache/blur/thrift/BlurShardServer.java |  14 +-
 .../blur/thrift/ThriftBlurControllerServer.java |   2 +-
 .../blur/thrift/ThriftBlurShardServer.java      |   2 +-
 .../blur/manager/command/BlurArrayTest.java     |   1 +
 pom.xml                                         |   2 +
 68 files changed, 2853 insertions(+), 2562 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-command/pom.xml
----------------------------------------------------------------------
diff --git a/blur-command/pom.xml b/blur-command/pom.xml
new file mode 100644
index 0000000..7c8417a
--- /dev/null
+++ b/blur-command/pom.xml
@@ -0,0 +1,249 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.blur</groupId>
+		<artifactId>blur</artifactId>
+		<version>0.2.4-incubating-SNAPSHOT</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+	<groupId>org.apache.blur</groupId>
+	<artifactId>blur-command</artifactId>
+	<version>${projectVersion}</version>
+	<packaging>jar</packaging>
+	<name>Blur Command</name>
+	<description>The Blur command module contains Blur Command implementations.</description>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <version>${commons-logging.version}</version>
+        </dependency>
+		<dependency>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+			<version>${zookeeper.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.mail</groupId>
+					<artifactId>mail</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>${junit.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>${slf4j.version}</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>${slf4j.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.json</groupId>
+			<artifactId>json</artifactId>
+			<version>${json.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>${log4j.version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.mail</groupId>
+					<artifactId>mail</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.yammer.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+		<dependency>
+  			<groupId>com.yammer.metrics</groupId>
+  			<artifactId>metrics-ganglia</artifactId>
+  			<version>${metrics-ganglia.version}</version>
+		</dependency>
+		<dependency>
+		  <groupId>com.yammer.metrics</groupId>
+		  <artifactId>metrics-graphite</artifactId>
+		  <version>${metrics-graphite.version}</version>
+		</dependency>
+		<dependency>
+ 			<groupId>com.google.guava</groupId>
+ 			<artifactId>guava</artifactId>
+ 			<version>${guava.version}</version>
+ 		</dependency>
+		<dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+		<dependency>
+			<groupId>com.yammer.metrics</groupId>
+			<artifactId>metrics-servlet</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+	</dependencies>
+
+
+	<repositories>
+		<repository>
+			<id>libdir</id>
+			<url>file://${basedir}/../lib</url>
+		</repository>
+	</repositories>
+
+	<build>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-surefire-plugin</artifactId>
+					<configuration>
+						<argLine>-XX:+UseConcMarkSweepGC -Xmx1g -Xms1g</argLine>
+						<forkCount>2</forkCount>
+						<forkMode>always</forkMode>
+						<reuseForks>false</reuseForks>
+						<systemPropertyVariables>
+							<blur.tmp.dir>${project.build.directory}/target/tmp</blur.tmp.dir>
+						</systemPropertyVariables>
+					</configuration>
+				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-compiler-plugin</artifactId>
+					<configuration>
+						<source>1.6</source>
+						<target>1.6</target>
+					</configuration>
+				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-jar-plugin</artifactId>
+					<executions>
+						<execution>
+							<goals>
+								<goal>test-jar</goal>
+							</goals>
+						</execution>
+					</executions>
+				</plugin>
+				<plugin>
+					<artifactId>maven-assembly-plugin</artifactId>
+					<configuration>
+						<descriptorRefs>
+							<descriptorRef>jar-with-dependencies</descriptorRef>
+						</descriptorRefs>
+					</configuration>
+
+					<executions>
+						<execution>
+							<id>make-assembly</id>
+							<phase>package</phase>
+							<goals>
+								<goal>attached</goal>
+							</goals>
+						</execution>
+					</executions>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+	<profiles>
+		<profile>
+			<id>hadoop1</id>
+			<activation>
+				<property>
+					<name>hadoop1</name>
+				</property>
+			</activation>
+        <properties>
+            <projectVersion>${project.parent.version}-hadoop1</projectVersion>
+        </properties>
+		</profile>
+		<profile>
+			<id>hadoop2-mr1</id>
+			<activation>
+				<property>
+					<name>hadoop2-mr1</name>
+				</property>
+			</activation>
+        <properties>
+            <projectVersion>${project.parent.version}-hadoop2-mr1</projectVersion>
+        </properties>
+		</profile>
+		<profile>
+			<id>hadoop2</id>
+			<activation>
+				<property>
+					<name>hadoop2</name>
+				</property>
+			</activation>
+        <properties>
+            <projectVersion>${project.parent.version}-hadoop2</projectVersion>
+        </properties>
+		</profile>
+	</profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-command/src/main/java/org/apache/blur/command/AddDocument.java
----------------------------------------------------------------------
diff --git a/blur-command/src/main/java/org/apache/blur/command/AddDocument.java b/blur-command/src/main/java/org/apache/blur/command/AddDocument.java
new file mode 100644
index 0000000..6b1c05a
--- /dev/null
+++ b/blur-command/src/main/java/org/apache/blur/command/AddDocument.java
@@ -0,0 +1,55 @@
+package org.apache.blur.command;
+
+import java.io.IOException;
+
+import org.apache.blur.command.Args;
+import org.apache.blur.command.Command;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.IndexWriteCommand;
+import org.apache.blur.command.annotation.Argument;
+import org.apache.blur.command.annotation.Arguments;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+@SuppressWarnings("serial")
+@Arguments({
+    @Argument(name = "shard", value = "The shard id that the addDocument command is to be applied."),
+    @Argument(name = "doc", value = "Is a map of string key to string values that will be converted "
+        + "into a Lucene Document via the FieldManager and added to the index.") })
+public class AddDocument extends Command implements IndexWriteCommand<Void> {
+
+  @Override // name of this command: "addDoc"
+  public String getName() {
+    return "addDoc";
+  }
+
+  @Override // adds the Document built from the context args to the writer; returns null (Void)
+  public Void execute(IndexContext context, IndexWriter writer) throws IOException {
+    Args args = context.getArgs();
+    Document doc = getDoc(args); // NOTE: getDoc is still a stub, so this call currently always throws
+    writer.addDocument(doc);
+    return null;
+  }
+
+  private Document getDoc(Args args) {
+    throw new RuntimeException("Not Implemented"); // args-to-Document conversion not written yet
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-command/src/main/java/org/apache/blur/command/DocumentCount.java
----------------------------------------------------------------------
diff --git a/blur-command/src/main/java/org/apache/blur/command/DocumentCount.java b/blur-command/src/main/java/org/apache/blur/command/DocumentCount.java
new file mode 100644
index 0000000..62b4e04
--- /dev/null
+++ b/blur-command/src/main/java/org/apache/blur/command/DocumentCount.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+
+import org.apache.blur.command.Command;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.IndexReadCommand;
+
+@SuppressWarnings("serial")
+public class DocumentCount extends Command implements IndexReadCommand<Integer> {
+
+  private static final String COMMAND_NAME = "docCount";
+
+  @Override // returns numDocs() of the reader supplied by the context
+  public Integer execute(IndexContext context) throws IOException {
+    return context.getIndexReader().numDocs();
+  }
+
+  @Override // name of this command: "docCount"
+  public String getName() {
+    return COMMAND_NAME;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-command/src/main/java/org/apache/blur/command/DocumentCountCombiner.java
----------------------------------------------------------------------
diff --git a/blur-command/src/main/java/org/apache/blur/command/DocumentCountCombiner.java b/blur-command/src/main/java/org/apache/blur/command/DocumentCountCombiner.java
new file mode 100644
index 0000000..dd397b7
--- /dev/null
+++ b/blur-command/src/main/java/org/apache/blur/command/DocumentCountCombiner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.blur.command.Command;
+import org.apache.blur.command.ClusterCommand;
+import org.apache.blur.command.ClusterContext;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.IndexReadCombiningCommand;
+import org.apache.blur.command.Server;
+import org.apache.blur.command.Shard;
+
+@SuppressWarnings("serial")
+public class DocumentCountCombiner extends Command implements ClusterCommand<Long>,
+    IndexReadCombiningCommand<Integer, Long> {
+
+  private static final String COMMAND_NAME = "docCountAggregate";
+
+  @Override // returns numDocs() of the reader supplied by the context
+  public Integer execute(IndexContext context) throws IOException {
+    return context.getIndexReader().numDocs();
+  }
+
+  @Override // folds the Integer held for each shard into a single long sum
+  public Long combine(Map<Shard, Integer> results) throws IOException {
+    long sum = 0;
+    for (Entry<Shard, Integer> perShard : results.entrySet()) {
+      sum += perShard.getValue();
+    }
+    return sum;
+  }
+
+  @Override // sums the Long that readServers reports for each server
+  public Long clusterExecute(ClusterContext context) throws IOException {
+    long sum = 0;
+    Map<Server, Long> perServer = context.readServers(null, DocumentCountCombiner.class);
+    for (Long count : perServer.values()) {
+      sum += count;
+    }
+    return sum;
+  }
+
+  @Override
+  public String getName() {
+    return COMMAND_NAME;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-command/src/main/java/org/apache/blur/command/DocumentCountNoCombine.java
----------------------------------------------------------------------
diff --git a/blur-command/src/main/java/org/apache/blur/command/DocumentCountNoCombine.java b/blur-command/src/main/java/org/apache/blur/command/DocumentCountNoCombine.java
new file mode 100644
index 0000000..d3de1f6
--- /dev/null
+++ b/blur-command/src/main/java/org/apache/blur/command/DocumentCountNoCombine.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.blur.command.Command;
+import org.apache.blur.command.ClusterCommand;
+import org.apache.blur.command.ClusterContext;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.IndexReadCommand;
+import org.apache.blur.command.Shard;
+
+@SuppressWarnings("serial")
+public class DocumentCountNoCombine extends Command implements IndexReadCommand<Integer>, ClusterCommand<Long> {
+
+  private static final String COMMAND_NAME = "docCountNoCombine";
+
+  @Override // returns numDocs() of the reader supplied by the context
+  public Integer execute(IndexContext context) throws IOException {
+    return context.getIndexReader().numDocs();
+  }
+
+  @Override // sums the Integer that readIndexes reports for each shard
+  public Long clusterExecute(ClusterContext context) throws IOException {
+    long sum = 0;
+    Map<Shard, Integer> perShard = context.readIndexes(null, DocumentCountNoCombine.class);
+    for (Entry<Shard, Integer> shardCount : perShard.entrySet()) {
+      sum += shardCount.getValue();
+    }
+    return sum;
+  }
+
+  @Override
+  public String getName() {
+    return COMMAND_NAME;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-command/src/main/java/org/apache/blur/command/TestBlurObjectCommand.java
----------------------------------------------------------------------
diff --git a/blur-command/src/main/java/org/apache/blur/command/TestBlurObjectCommand.java b/blur-command/src/main/java/org/apache/blur/command/TestBlurObjectCommand.java
new file mode 100644
index 0000000..e7de338
--- /dev/null
+++ b/blur-command/src/main/java/org/apache/blur/command/TestBlurObjectCommand.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.blur.command.Command;
+import org.apache.blur.command.BlurObject;
+import org.apache.blur.command.ClusterCommand;
+import org.apache.blur.command.ClusterContext;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.IndexReadCombiningCommand;
+import org.apache.blur.command.Server;
+import org.apache.blur.command.Shard;
+
+@SuppressWarnings("serial")
+public class TestBlurObjectCommand extends Command implements IndexReadCombiningCommand<BlurObject, BlurObject>,
+    ClusterCommand<BlurObject> {
+
+  @Override // name of this command: "testBlurObject"
+  public String getName() {
+    return "testBlurObject";
+  }
+
+  @Override // stores numDocs() of the context's reader in a new BlurObject under "docCount"
+  public BlurObject execute(IndexContext context) throws IOException {
+    BlurObject result = new BlurObject();
+    int docCount = context.getIndexReader().numDocs();
+    result.accumulate("docCount", docCount);
+    return result;
+  }
+
+  @Override // adds up the "docCount" integer of every shard result
+  public BlurObject combine(Map<Shard, BlurObject> results) throws IOException {
+    BlurObject combined = new BlurObject();
+    long sum = 0;
+    for (Entry<Shard, BlurObject> perShard : results.entrySet()) {
+      BlurObject shardResult = perShard.getValue();
+      sum += shardResult.getInteger("docCount");
+    }
+    combined.put("docCount", sum);
+    return combined;
+  }
+
+  @Override // adds up the "docCount" long of every server result returned by readServers
+  public BlurObject clusterExecute(ClusterContext context) throws IOException {
+    BlurObject clusterResult = new BlurObject();
+    Map<Server, BlurObject> perServer = context.readServers(null, TestBlurObjectCommand.class);
+    long sum = 0;
+    for (BlurObject serverResult : perServer.values()) {
+      sum += serverResult.getLong("docCount");
+    }
+    clusterResult.put("docCount", sum);
+    return clusterResult;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-command/src/main/java/org/apache/blur/command/TestBlurObjectCommandUsing.java
----------------------------------------------------------------------
diff --git a/blur-command/src/main/java/org/apache/blur/command/TestBlurObjectCommandUsing.java b/blur-command/src/main/java/org/apache/blur/command/TestBlurObjectCommandUsing.java
new file mode 100644
index 0000000..c2a7a80
--- /dev/null
+++ b/blur-command/src/main/java/org/apache/blur/command/TestBlurObjectCommandUsing.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+
+import org.apache.blur.command.BlurObject;
+import org.apache.blur.command.ObjectArrayPacking;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClientManager;
+import org.apache.blur.thrift.Connection;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Response;
+
+public class TestBlurObjectCommandUsing {
+
+  // Demo client: calls execute("test", "testBlurObject", null) on localhost:40010 and prints the unpacked reply.
+  public static void main(String[] args) throws BlurException, TException, IOException {
+    Client blur = BlurClientManager.getClientPool().getClient(new Connection("localhost:40010"));
+    Response reply = blur.execute("test", "testBlurObject", null);
+    Object payload = ObjectArrayPacking.unpack(reply.getValue().getBlurObject());
+    BlurObject unpacked = (BlurObject) payload;
+    System.out.println(unpacked); // sample output: {"docCount":10005}
+    System.out.println(unpacked.getLong("docCount")); // sample output: 10005
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-command/src/main/java/org/apache/blur/command/WaitForSeconds.java
----------------------------------------------------------------------
diff --git a/blur-command/src/main/java/org/apache/blur/command/WaitForSeconds.java b/blur-command/src/main/java/org/apache/blur/command/WaitForSeconds.java
new file mode 100644
index 0000000..833384b
--- /dev/null
+++ b/blur-command/src/main/java/org/apache/blur/command/WaitForSeconds.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+
+import org.apache.blur.command.Command;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.IndexReadCommand;
+
+@SuppressWarnings("serial")
+public class WaitForSeconds extends Command implements IndexReadCommand<Boolean> {
+
+  @Override // sleeps 30 seconds, then returns true; an interrupt is rethrown as IOException
+  public Boolean execute(IndexContext context) throws IOException {
+    try {
+      Thread.sleep(30000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status cleared by sleep()
+      throw new IOException(e);
+    }
+    return true;
+  }
+
+  @Override // name of this command: "wait"
+  public String getName() {
+    return "wait";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-command/src/main/resources/META-INF/services/org.apache.blur.command.Commands
----------------------------------------------------------------------
diff --git a/blur-command/src/main/resources/META-INF/services/org.apache.blur.command.Commands b/blur-command/src/main/resources/META-INF/services/org.apache.blur.command.Commands
new file mode 100644
index 0000000..e10270e
--- /dev/null
+++ b/blur-command/src/main/resources/META-INF/services/org.apache.blur.command.Commands
@@ -0,0 +1,16 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+org.apache.blur.command.DocumentCount
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/Args.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/Args.java b/blur-core/src/main/java/org/apache/blur/command/Args.java
new file mode 100644
index 0000000..ec14700
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/Args.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Mutable bag of named argument values, backed by a plain {@link HashMap}.
+ * Values are stored as {@link Object} and cast back to whatever type the
+ * caller asks for, so type mismatches only surface at the call site.
+ */
+public class Args {
+
+  private final Map<String, Object> _values = new HashMap<String, Object>();
+
+  /**
+   * Returns the value stored under <code>name</code>, cast (unchecked) to the
+   * type the caller expects.
+   *
+   * @param name
+   *          the argument name.
+   * @return the stored value, or <code>null</code> when nothing is stored
+   *         under that name.
+   */
+  @SuppressWarnings("unchecked")
+  public <T> T get(String name) {
+    Object stored = _values.get(name);
+    return (T) stored;
+  }
+
+  /**
+   * Like {@link #get(String)}, but falls back to <code>defaultValue</code>
+   * when the lookup yields <code>null</code>.
+   *
+   * @param name
+   *          the argument name.
+   * @param defaultValue
+   *          the value to return when no non-null value is stored.
+   * @return the stored value, or <code>defaultValue</code>.
+   */
+  public <T> T get(String name, T defaultValue) {
+    T found = get(name);
+    return found != null ? found : defaultValue;
+  }
+
+  /**
+   * Stores <code>value</code> under <code>name</code>, replacing any previous
+   * value for that name.
+   */
+  public <T> void set(String name, T value) {
+    _values.put(name, value);
+  }
+
+  /**
+   * @return the live backing map (not a copy), so changes made through it are
+   *         visible to this object.
+   */
+  public Map<String, Object> getValues() {
+    return _values;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
new file mode 100644
index 0000000..cea496b
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
@@ -0,0 +1,238 @@
+package org.apache.blur.command;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Base class for command managers. On construction it discovers command
+ * classes listed in <code>META-INF/services/org.apache.blur.command.Commands</code>
+ * resources, registers one instance of each by name, and creates the two
+ * thread pools used to run commands. Callables submitted through
+ * {@link #submitCallable(Callable)} are tracked by a generated execution id so
+ * that a caller whose wait timed out can pick the result up again through
+ * {@link #reconnect(String)}.
+ */
+public class BaseCommandManager implements Closeable {
+
+  private static final String META_INF_SERVICES_ORG_APACHE_BLUR_COMMAND_COMMANDS = "META-INF/services/org.apache.blur.command.Commands";
+
+  private final static Log LOG = LogFactory.getLog(BaseCommandManager.class);
+
+  protected final ExecutorService _executorService;
+  protected final Map<String, Command> _command = new ConcurrentHashMap<String, Command>();
+  protected final Map<Class<? extends Command>, String> _commandNameLookup = new ConcurrentHashMap<Class<? extends Command>, String>();
+  protected final ExecutorService _executorServiceDriver;
+  // NOTE(review): entries are added by submitCallable but never removed in this
+  // class, so the map grows with every submitted command unless a subclass
+  // prunes it - confirm and add cleanup if not.
+  protected final ConcurrentHashMap<String, Future<Response>> _runningMap = new ConcurrentHashMap<String, Future<Response>>();
+  protected final long _connectionTimeout;
+
+  /**
+   * Registers the commands found on the classpath and creates the executors.
+   *
+   * @param threadCount
+   *          the thread count passed to each of the two thread pools.
+   * @param connectionTimeout
+   *          timeout in milliseconds; callers wait at most half of this for a
+   *          command before a {@link TimeoutException} is raised.
+   * @throws IOException
+   *           if a command resource cannot be read or a listed class cannot be
+   *           loaded or instantiated.
+   */
+  public BaseCommandManager(int threadCount, long connectionTimeout) throws IOException {
+    lookForCommandsToRegister();
+    _executorService = Executors.newThreadPool("command-", threadCount);
+    _executorServiceDriver = Executors.newThreadPool("command-driver-", threadCount);
+    // Wait only half of the configured timeout (presumably so the server
+    // answers before the client side gives up - confirm).
+    _connectionTimeout = connectionTimeout / 2;
+  }
+
+  /**
+   * Loads every classpath resource named
+   * {@link #META_INF_SERVICES_ORG_APACHE_BLUR_COMMAND_COMMANDS} as a properties
+   * file and registers each key as a command class name.
+   */
+  private void lookForCommandsToRegister() throws IOException {
+    Enumeration<URL> systemResources = ClassLoader
+        .getSystemResources(META_INF_SERVICES_ORG_APACHE_BLUR_COMMAND_COMMANDS);
+    Properties properties = new Properties();
+    while (systemResources.hasMoreElements()) {
+      URL url = systemResources.nextElement();
+      InputStream inputStream = url.openStream();
+      try {
+        properties.load(inputStream);
+      } finally {
+        // Close the stream even when load() fails so the handle is not leaked.
+        inputStream.close();
+      }
+    }
+    Set<Object> keySet = properties.keySet();
+    for (Object o : keySet) {
+      String classNameToRegister = o.toString();
+      Class<? extends Command> commandClass;
+      try {
+        // asSubclass verifies the type up front instead of relying on an
+        // unchecked cast that would only fail later inside register().
+        commandClass = Class.forName(classNameToRegister).asSubclass(Command.class);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      } catch (ClassCastException e) {
+        throw new IOException("Class [" + classNameToRegister + "] listed in ["
+            + META_INF_SERVICES_ORG_APACHE_BLUR_COMMAND_COMMANDS + "] is not a Command.", e);
+      }
+      register(commandClass);
+    }
+  }
+
+  /**
+   * Waits again for a command that was previously submitted.
+   *
+   * @param executionId
+   *          the id of the execution to wait for (the id carried by the
+   *          {@link TimeoutException} raised for that command).
+   * @return the command's response.
+   * @throws IOException
+   *           if the id is unknown, or the command failed, was cancelled or
+   *           the wait was interrupted.
+   * @throws TimeoutException
+   *           if the command is still not finished within the wait period.
+   */
+  public Response reconnect(String executionId) throws IOException, TimeoutException {
+    Future<Response> future = _runningMap.get(executionId);
+    if (future == null) {
+      throw new IOException("Command id [" + executionId + "] did not find any executing commands.");
+    }
+    return waitForResponse(executionId, future);
+  }
+
+  /**
+   * Submits the callable to the driver executor, records its future under a
+   * freshly generated execution id and waits for the response.
+   *
+   * @param callable
+   *          the work to run.
+   * @return the response produced by the callable.
+   * @throws IOException
+   *           if the callable failed, was cancelled or the wait was
+   *           interrupted.
+   * @throws TimeoutException
+   *           carrying the execution id, if the callable is not finished
+   *           within the wait period.
+   */
+  protected Response submitCallable(Callable<Response> callable) throws IOException, TimeoutException {
+    String executionId = UUID.randomUUID().toString();
+    Future<Response> future = _executorServiceDriver.submit(callable);
+    _runningMap.put(executionId, future);
+    return waitForResponse(executionId, future);
+  }
+
+  /**
+   * Shared wait logic for {@link #submitCallable(Callable)} and
+   * {@link #reconnect(String)}: blocks for at most {@link #_connectionTimeout}
+   * milliseconds and translates the failure modes of {@link Future#get}.
+   */
+  private Response waitForResponse(String executionId, Future<Response> future) throws IOException,
+      TimeoutException {
+    try {
+      return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
+    } catch (CancellationException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status before translating the exception so the
+      // interruption is not lost to callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } catch (java.util.concurrent.TimeoutException e) {
+      LOG.info("Timeout of command [{0}]", executionId);
+      throw new TimeoutException(executionId);
+    }
+  }
+
+  /**
+   * Shuts down both executors immediately via
+   * {@link ExecutorService#shutdownNow()}.
+   */
+  @Override
+  public void close() throws IOException {
+    _executorService.shutdownNow();
+    _executorServiceDriver.shutdownNow();
+  }
+
+  /**
+   * Instantiates the command class through its no-argument constructor and
+   * registers the instance under {@link Command#getName()}, replacing any
+   * command previously registered under the same name.
+   *
+   * @param commandClass
+   *          the command implementation to register.
+   * @throws IOException
+   *           if the class cannot be instantiated.
+   */
+  public void register(Class<? extends Command> commandClass) throws IOException {
+    try {
+      Command command = commandClass.newInstance();
+      _command.put(command.getName(), command);
+      _commandNameLookup.put(commandClass, command.getName());
+      LOG.info("Command [{0}] from class [{1}] registered.", command.getName(), commandClass.getName());
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * @return the registered command instance for the name, or <code>null</code>
+   *         if no command is registered under it.
+   */
+  protected Command getCommandObject(String commandName) {
+    return _command.get(commandName);
+  }
+
+  /**
+   * @return the name the given command class was registered under, or
+   *         <code>null</code> if the class has not been registered.
+   */
+  protected String getCommandName(Class<? extends Command> clazz) {
+    return _commandNameLookup.get(clazz);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/BlurArray.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/BlurArray.java b/blur-core/src/main/java/org/apache/blur/command/BlurArray.java
new file mode 100644
index 0000000..6bb5d30
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/BlurArray.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Ordered, growable list of loosely typed values. Elements are stored as
+ * {@link Object}; the typed getters simply cast, so asking for the wrong type
+ * results in a {@link ClassCastException}. Nested {@link BlurObject} and
+ * {@link BlurArray} values are rendered recursively by {@link #toString(int)}.
+ */
+public class BlurArray {
+
+  private final List<Object> _values = new ArrayList<Object>();
+
+  /** Creates an empty array. */
+  public BlurArray() {
+
+  }
+
+  /**
+   * Creates a shallow copy: the new array gets its own list, but the elements
+   * themselves are shared with <code>array</code>.
+   */
+  public BlurArray(BlurArray array) {
+    _values.addAll(array._values);
+  }
+
+  /** Removes all elements. */
+  public void clear() {
+    _values.clear();
+  }
+
+  // Typed convenience accessors. Each getXxx(index) casts the element at the
+  // index to the requested type, each put(value) appends and each
+  // put(index, value) stores at a position; all of them delegate to the
+  // Object based methods further down.
+
+  public String getString(int index) {
+    return (String) getObject(index);
+  }
+
+  public void put(String value) {
+    put((Object) value);
+  }
+
+  public void put(int index, String value) {
+    put(index, (Object) value);
+  }
+
+  public Integer getInteger(int index) {
+    return (Integer) getObject(index);
+  }
+
+  public void put(Integer value) {
+    put((Object) value);
+  }
+
+  public void put(int index, Integer value) {
+    put(index, (Object) value);
+  }
+
+  public Short getShort(int index) {
+    return (Short) getObject(index);
+  }
+
+  public void put(Short value) {
+    put((Object) value);
+  }
+
+  public void put(int index, Short value) {
+    put(index, (Object) value);
+  }
+
+  public Long getLong(int index) {
+    return (Long) getObject(index);
+  }
+
+  public void put(Long value) {
+    put((Object) value);
+  }
+
+  public void put(int index, Long value) {
+    put(index, (Object) value);
+  }
+
+  public Double getDouble(int index) {
+    return (Double) getObject(index);
+  }
+
+  public void put(Double value) {
+    put((Object) value);
+  }
+
+  public void put(int index, Double value) {
+    put(index, (Object) value);
+  }
+
+  public Float getFloat(int index) {
+    return (Float) getObject(index);
+  }
+
+  public void put(Float value) {
+    put((Object) value);
+  }
+
+  public void put(int index, Float value) {
+    put(index, (Object) value);
+  }
+
+  public byte[] getBinary(int index) {
+    return (byte[]) getObject(index);
+  }
+
+  public void put(byte[] value) {
+    put((Object) value);
+  }
+
+  public void put(int index, byte[] value) {
+    put(index, (Object) value);
+  }
+
+  public Boolean getBoolean(int index) {
+    return (Boolean) getObject(index);
+  }
+
+  public void put(Boolean value) {
+    put((Object) value);
+  }
+
+  public void put(int index, Boolean value) {
+    put(index, (Object) value);
+  }
+
+  public BlurObject getBlurObject(int index) {
+    return (BlurObject) getObject(index);
+  }
+
+  public void put(BlurObject value) {
+    put((Object) value);
+  }
+
+  public void put(int index, BlurObject value) {
+    put(index, (Object) value);
+  }
+
+  public BlurArray getBlurArray(int index) {
+    return (BlurArray) getObject(index);
+  }
+
+  public void put(BlurArray value) {
+    put((Object) value);
+  }
+
+  public void put(int index, BlurArray value) {
+    put(index, (Object) value);
+  }
+
+  /** Appends the value to the end of the array. */
+  public void put(Object value) {
+    BlurObject.checkType(value);
+    _values.add(value);
+  }
+
+  /**
+   * Stores the value at the given index, first padding the array with
+   * <code>null</code> elements when it is shorter than <code>index + 1</code>.
+   */
+  public void put(int index, Object value) {
+    BlurObject.checkType(value);
+    int sizeNeeded = index + 1;
+    while (_values.size() < sizeNeeded) {
+      _values.add(null);
+    }
+    _values.set(index, value);
+  }
+
+  /** Compact, single line rendering; same as <code>toString(0)</code>. */
+  @Override
+  public String toString() {
+    return toString(0);
+  }
+
+  /**
+   * Renders the array as <code>[v1,v2,...]</code>.
+   *
+   * @param i
+   *          indentation level; <code>0</code> yields a single line, a positive
+   *          value puts every element on its own line indented by
+   *          <code>i</code> spaces (nested values use <code>i + 1</code>).
+   * @return the textual form; <code>null</code> elements are written as
+   *         <code>null</code>.
+   */
+  public String toString(int i) {
+    StringBuilder builder = new StringBuilder();
+    builder.append('[');
+    boolean comma = false;
+    for (Object value : _values) {
+      if (comma) {
+        builder.append(',');
+      }
+      comma = true;
+      if (i > 0) {
+        builder.append('\n');
+        appendSpaces(builder, i);
+      }
+      if (value == null) {
+        // put(int, Object) pads gaps with null, so a null element is a
+        // normal state; render it here rather than depending on
+        // BlurObject.stringify to cope with it.
+        builder.append("null");
+      } else if (value instanceof BlurObject) {
+        builder.append(((BlurObject) value).toString(i > 0 ? i + 1 : 0));
+      } else if (value instanceof BlurArray) {
+        builder.append(((BlurArray) value).toString(i > 0 ? i + 1 : 0));
+      } else {
+        builder.append(BlurObject.stringify(value));
+      }
+    }
+    if (i > 0) {
+      builder.append('\n');
+      appendSpaces(builder, i - 1);
+    }
+    builder.append(']');
+    return builder.toString();
+  }
+
+  /** Appends <code>count</code> space characters to the builder. */
+  private static void appendSpaces(StringBuilder builder, int count) {
+    for (int j = 0; j < count; j++) {
+      builder.append(' ');
+    }
+  }
+
+  /** @return the number of elements, including <code>null</code> padding. */
+  public int length() {
+    return _values.size();
+  }
+
+  /** @return the raw element at the index. */
+  public Object getObject(int i) {
+    return _values.get(i);
+  }
+
+  /** @return the element at the index, cast (unchecked) to the caller's type. */
+  @SuppressWarnings("unchecked")
+  public <T> T get(int i) {
+    return (T) _values.get(i);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + _values.hashCode();
+    return result;
+  }
+
+  /**
+   * Two arrays are equal when they are of the same class and their element
+   * lists are equal.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurArray other = (BlurArray) obj;
+    return _values.equals(other._values);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/BlurObject.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/BlurObject.java b/blur-core/src/main/java/org/apache/blur/command/BlurObject.java
new file mode 100644
index 0000000..139c7de
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/BlurObject.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+/**
+ * Map of names to loosely typed values, kept sorted by name in a
+ * {@link TreeMap}. Values are stored as {@link Object}; the typed getters
+ * simply cast, so asking for the wrong type results in a
+ * {@link ClassCastException}. {@link #toString(int)} renders a JSON-like
+ * form in which string values are quoted but not escaped.
+ */
+public class BlurObject {
+
+  private final static char[] hexArray = "0123456789ABCDEF".toCharArray();
+
+  private final Map<String, Object> _valueMap = new TreeMap<String, Object>();
+
+  /** Creates an empty object. */
+  public BlurObject() {
+
+  }
+
+  /**
+   * Creates a shallow copy: the new object gets its own map, but the values
+   * themselves are shared with <code>object</code>.
+   */
+  public BlurObject(BlurObject object) {
+    _valueMap.putAll(object._valueMap);
+  }
+
+  // Typed convenience methods. For every supported type there is an
+  // accumulate(name, value) and a put(name, value) that delegate to the
+  // Object based methods further down, and a getXxx(name) that casts the
+  // stored value to the requested type.
+
+  public void accumulate(String name, String value) {
+    accumulate(name, (Object) value);
+  }
+
+  public void put(String name, String value) {
+    put(name, (Object) value);
+  }
+
+  public String getString(String name) {
+    return (String) _valueMap.get(name);
+  }
+
+  public void accumulate(String name, Integer value) {
+    accumulate(name, (Object) value);
+  }
+
+  public void put(String name, Integer value) {
+    put(name, (Object) value);
+  }
+
+  public Integer getInteger(String name) {
+    return (Integer) _valueMap.get(name);
+  }
+
+  public void accumulate(String name, Short value) {
+    accumulate(name, (Object) value);
+  }
+
+  public void put(String name, Short value) {
+    put(name, (Object) value);
+  }
+
+  public Short getShort(String name) {
+    return (Short) _valueMap.get(name);
+  }
+
+  public void accumulate(String name, Long value) {
+    accumulate(name, (Object) value);
+  }
+
+  public void put(String name, Long value) {
+    put(name, (Object) value);
+  }
+
+  public Long getLong(String name) {
+    return (Long) _valueMap.get(name);
+  }
+
+  public void accumulate(String name, Double value) {
+    accumulate(name, (Object) value);
+  }
+
+  public void put(String name, Double value) {
+    put(name, (Object) value);
+  }
+
+  public Double getDouble(String name) {
+    return (Double) _valueMap.get(name);
+  }
+
+  public void accumulate(String name, Float value) {
+    accumulate(name, (Object) value);
+  }
+
+  public void put(String name, Float value) {
+    put(name, (Object) value);
+  }
+
+  public Float getFloat(String name) {
+    return (Float) _valueMap.get(name);
+  }
+
+  public void accumulate(String name, byte[] value) {
+    accumulate(name, (Object) value);
+  }
+
+  public void put(String name, byte[] value) {
+    put(name, (Object) value);
+  }
+
+  public byte[] getBinary(String name) {
+    return (byte[]) _valueMap.get(name);
+  }
+
+  public void accumulate(String name, Boolean value) {
+    accumulate(name, (Object) value);
+  }
+
+  public void put(String name, Boolean value) {
+    put(name, (Object) value);
+  }
+
+  public Boolean getBoolean(String name) {
+    return (Boolean) _valueMap.get(name);
+  }
+
+  public void accumulate(String name, BlurObject value) {
+    accumulate(name, (Object) value);
+  }
+
+  public void put(String name, BlurObject value) {
+    put(name, (Object) value);
+  }
+
+  public BlurObject getBlurObject(String name) {
+    return (BlurObject) _valueMap.get(name);
+  }
+
+  public void accumulate(String name, BlurArray value) {
+    accumulate(name, (Object) value);
+  }
+
+  public void put(String name, BlurArray value) {
+    put(name, (Object) value);
+  }
+
+  public BlurArray getBlurArray(String name) {
+    return (BlurArray) _valueMap.get(name);
+  }
+
+  /**
+   * Adds a value under the name without discarding what is already stored.
+   * When nothing (or <code>null</code>) is stored the value is stored as is;
+   * when a {@link BlurArray} is stored the value is appended to it; otherwise
+   * the stored value and the new value are wrapped, in that order, in a new
+   * {@link BlurArray} that replaces the stored value.
+   */
+  public void accumulate(String name, Object value) {
+    checkType(value);
+    Object object = _valueMap.get(name);
+    if (object == null) {
+      _valueMap.put(name, value);
+    } else {
+      if (object instanceof BlurArray) {
+        BlurArray array = (BlurArray) object;
+        array.put(value);
+      } else {
+        BlurArray array = new BlurArray();
+        array.put(object);
+        array.put(value);
+        _valueMap.put(name, array);
+      }
+    }
+  }
+
+  /**
+   * Hook for validating values before they are stored. It is currently empty,
+   * so every value is accepted.
+   */
+  public static void checkType(Object value) {
+
+  }
+
+  /** Stores the value under the name, replacing any previous value. */
+  public void put(String name, Object value) {
+    checkType(value);
+    _valueMap.put(name, value);
+  }
+
+  /** Compact, single line rendering; same as <code>toString(0)</code>. */
+  @Override
+  public String toString() {
+    return toString(0);
+  }
+
+  /**
+   * Renders the object as <code>{"name":value,...}</code> in key order.
+   *
+   * @param i
+   *          indentation level; <code>0</code> yields a single line, a positive
+   *          value puts every entry on its own line indented by <code>i</code>
+   *          spaces (nested values use <code>i + 1</code>).
+   * @return the textual form.
+   * @throws RuntimeException
+   *           if a value has a type that {@link #stringify(Object)} rejects.
+   */
+  public String toString(int i) {
+    StringBuilder builder = new StringBuilder();
+    builder.append('{');
+    boolean comma = false;
+    for (Entry<String, Object> e : _valueMap.entrySet()) {
+      if (comma) {
+        builder.append(',');
+      }
+      comma = true;
+      if (i > 0) {
+        builder.append('\n');
+        appendSpaces(builder, i);
+      }
+      builder.append(stringify(e.getKey()));
+      builder.append(':');
+      Object value = e.getValue();
+      if (value instanceof BlurObject) {
+        builder.append(((BlurObject) value).toString(i > 0 ? i + 1 : 0));
+      } else if (value instanceof BlurArray) {
+        builder.append(((BlurArray) value).toString(i > 0 ? i + 1 : 0));
+      } else {
+        builder.append(stringify(value));
+      }
+    }
+    if (i > 0) {
+      builder.append('\n');
+      appendSpaces(builder, i - 1);
+    }
+    builder.append('}');
+    return builder.toString();
+  }
+
+  /** Appends <code>count</code> space characters to the builder. */
+  private static void appendSpaces(StringBuilder builder, int count) {
+    for (int j = 0; j < count; j++) {
+      builder.append(' ');
+    }
+  }
+
+  /**
+   * @return the bytes as upper case hexadecimal, two characters per byte.
+   */
+  public static String toHexString(byte[] bs) {
+    char[] hexChars = new char[bs.length * 2];
+    for (int j = 0; j < bs.length; j++) {
+      int v = bs[j] & 0xFF;
+      hexChars[j * 2] = hexArray[v >>> 4];
+      hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+    }
+    return new String(hexChars);
+  }
+
+  /**
+   * Converts a leaf value to its textual form: numbers and booleans via
+   * <code>toString()</code>, byte arrays as hexadecimal, strings wrapped in
+   * double quotes (without escaping) and <code>null</code> as the literal
+   * <code>null</code>.
+   *
+   * @throws RuntimeException
+   *           if the value is of any other type.
+   */
+  public static Object stringify(Object o) {
+    if (o == null) {
+      // put(name, null) and the null padding done by BlurArray.put(int,
+      // Object) are both accepted, so rendering must not fail on null.
+      return "null";
+    } else if (o instanceof Number) {
+      return o.toString();
+    } else if (o instanceof byte[]) {
+      return toHexString((byte[]) o);
+    } else if (o instanceof Boolean) {
+      return o.toString();
+    } else if (o instanceof String) {
+      return "\"" + o.toString() + "\"";
+    } else {
+      throw new RuntimeException("Cannot stringify object [" + o + "]");
+    }
+  }
+
+  /** @return an iterator over the names, in sorted order. */
+  public Iterator<String> keys() {
+    return _valueMap.keySet().iterator();
+  }
+
+  /** @return the value stored under the name, cast (unchecked) to the caller's type. */
+  @SuppressWarnings("unchecked")
+  public <T> T get(String name) {
+    return (T) _valueMap.get(name);
+  }
+
+  /** @return the raw value stored under the name, or <code>null</code>. */
+  public Object getObject(String name) {
+    return _valueMap.get(name);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/ClusterCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ClusterCommand.java b/blur-core/src/main/java/org/apache/blur/command/ClusterCommand.java
new file mode 100644
index 0000000..7d65c85
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ClusterCommand.java
@@ -0,0 +1,44 @@
+package org.apache.blur.command;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * A command that is run against a {@link ClusterContext} and produces a result
+ * of type <code>T</code>. Implementations must be {@link Serializable} and
+ * {@link Cloneable}.
+ *
+ * @param <T>
+ *          the type of result the command produces.
+ */
+public interface ClusterCommand<T> extends Serializable, Cloneable {
+
+  /**
+   * Executes the command.
+   *
+   * @param context
+   *          the context handed to the command for this execution.
+   * @return the command's result.
+   * @throws IOException
+   *           if the command fails.
+   */
+  T clusterExecute(ClusterContext context) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/ClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ClusterContext.java
new file mode 100644
index 0000000..8a1f6f7
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ClusterContext.java
@@ -0,0 +1,88 @@
+package org.apache.blur.command;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.server.TableContext;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Execution context handed to a {@link ClusterCommand}. It exposes the
+ * command arguments and configuration objects and declares the operations a
+ * cluster level command can use to run index level commands, either blocking
+ * for the result or obtaining a {@link Future}.
+ * <p>
+ * Every method is abstract; the descriptions below are derived from the
+ * signatures alone and should be confirmed against the implementing class.
+ */
+public abstract class ClusterContext {
+
+  /** @return the arguments supplied for the current command execution. */
+  public abstract Args getArgs();
+
+  /** @return the {@link TableContext} associated with this execution. */
+  public abstract TableContext getTableContext();
+
+  /** @return the {@link BlurConfiguration} in effect. */
+  public abstract BlurConfiguration getBlurConfiguration();
+
+  /** @return the Hadoop {@link Configuration} in effect. */
+  public abstract Configuration getConfiguration();
+
+  /**
+   * Runs the given read command and returns one result per {@link Shard}
+   * (presumably every shard of the table; confirm with the implementation).
+   */
+  public abstract <T> Map<Shard, T> readIndexes(Args args, Class<? extends IndexReadCommand<T>> clazz)
+      throws IOException;
+
+  /** Asynchronous variant of {@link #readIndexes(Args, Class)}; results arrive as {@link Future}s. */
+  public abstract <T> Map<Shard, Future<T>> readIndexesAsync(Args args, Class<? extends IndexReadCommand<T>> clazz)
+      throws IOException;
+
+  /** Runs the given read command for a single {@link Shard} and returns its result. */
+  public abstract <T> T readIndex(Shard shard, Args args, Class<? extends IndexReadCommand<T>> clazz)
+      throws IOException;
+
+  /** Asynchronous variant of {@link #readIndex(Shard, Args, Class)}. */
+  public abstract <T> Future<T> readIndexAsync(Shard shard, Args args, Class<? extends IndexReadCommand<T>> clazz)
+      throws IOException;
+
+  /**
+   * Runs the given combining read command and returns one result per
+   * {@link Server} (presumably every server hosting the table; confirm).
+   */
+  public abstract <T> Map<Server, T> readServers(Args args, Class<? extends IndexReadCombiningCommand<?, T>> clazz)
+      throws IOException;
+
+  /** Asynchronous variant of {@link #readServers(Args, Class)}. */
+  public abstract <T> Map<Server, Future<T>> readServersAsync(Args args,
+      Class<? extends IndexReadCombiningCommand<?, T>> clazz) throws IOException;
+
+  /** Runs the given write command and returns its result. */
+  public abstract <T> T writeIndex(Args args, Class<? extends IndexWriteCommand<T>> clazz) throws IOException;
+
+  /** Asynchronous variant of {@link #writeIndex(Args, Class)}. */
+  public abstract <T> Future<T> writeIndexAsync(Args args, Class<? extends IndexWriteCommand<T>> clazz)
+      throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/Command.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/Command.java b/blur-core/src/main/java/org/apache/blur/command/Command.java
new file mode 100644
index 0000000..a063629
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/Command.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.Serializable;
+
+/**
+ * Base class for commands. Every command exposes a name through
+ * {@link #getName()} and can be duplicated with {@link #clone()}.
+ */
+@SuppressWarnings("serial")
+public abstract class Command implements Serializable, Cloneable {
+
+  /**
+   * @return the name that identifies this command.
+   */
+  public abstract String getName();
+
+  /**
+   * Creates a copy of this command via {@link Object#clone()}, i.e. a
+   * field-by-field (shallow) copy.
+   *
+   * @return the copy, typed as {@link Command}.
+   * @throws RuntimeException
+   *           wrapping the {@link CloneNotSupportedException} should
+   *           {@link Object#clone()} ever reject the call.
+   */
+  @Override
+  public Command clone() {
+    final Object copy;
+    try {
+      copy = super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+    return (Command) copy;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java b/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
new file mode 100644
index 0000000..1dd6a91
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
@@ -0,0 +1,168 @@
+package org.apache.blur.command;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.Arguments;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Value;
+import org.apache.blur.thrift.generated.ValueObject;
+import org.apache.blur.thrift.generated.ValueObject._Fields;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Static conversion helpers between the command package's own types
+ * ({@link Response}, {@link Args}, {@link Shard}, {@link Server}) and their
+ * Thrift generated counterparts in {@code org.apache.blur.thrift.generated}.
+ */
+public class CommandUtil {
+
+  /**
+   * Converts a command {@link Response} into its Thrift form.
+   * <p>
+   * An aggregated response is stored as a single value. Otherwise the server
+   * results are used when they are present, and the shard results are used
+   * when the server results are {@code null}.
+   * 
+   * @param response
+   *          the response to convert.
+   * @return the populated Thrift response.
+   * @throws BlurException
+   *           if a contained value cannot be converted.
+   */
+  public static org.apache.blur.thrift.generated.Response fromObjectToThrift(Response response) throws BlurException {
+    org.apache.blur.thrift.generated.Response converted = new org.apache.blur.thrift.generated.Response();
+    if (response.isAggregatedResults()) {
+      converted.setValue(toValueObject(response.getServerResult()));
+    } else {
+      Map<Server, Object> serverResults = response.getServerResults();
+      if (serverResults == null) {
+        Map<org.apache.blur.thrift.generated.Shard, ValueObject> fromObjectToThrift = fromObjectToThrift(response
+            .getShardResults());
+        converted.setShardToValue(fromObjectToThrift);
+      } else {
+        Map<org.apache.blur.thrift.generated.Server, ValueObject> fromObjectToThrift = fromObjectToThrift(serverResults);
+        converted.setServerToValue(fromObjectToThrift);
+      }
+    }
+    return converted;
+  }
+
+  /**
+   * Converts a map keyed by {@link Shard} or {@link Server} into a map keyed by
+   * the matching Thrift type, converting every value with
+   * {@link #toValueObject(Object)}.
+   * <p>
+   * Entries whose key is neither a {@link Shard} nor a {@link Server} are
+   * skipped.
+   * 
+   * @param map
+   *          the results to convert.
+   * @return a new map holding the converted entries.
+   * @throws BlurException
+   *           if a value cannot be converted.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T, R> Map<R, ValueObject> fromObjectToThrift(Map<T, Object> map) throws BlurException {
+    Map<R, ValueObject> result = new HashMap<R, ValueObject>();
+    for (Entry<T, Object> e : map.entrySet()) {
+      T key = e.getKey();
+      if (key instanceof Shard) {
+        Shard shard = (Shard) key;
+        result.put((R) new org.apache.blur.thrift.generated.Shard(shard.getShard()), toValueObject(e.getValue()));
+      } else if (key instanceof Server) {
+        Server server = (Server) key;
+        result.put((R) new org.apache.blur.thrift.generated.Server(server.getServer()), toValueObject(e.getValue()));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Wraps a scalar in a Thrift {@link Value}.
+   * <p>
+   * Supported inputs are {@code null}, {@link Long}, {@link String},
+   * {@link Integer}, {@link Boolean}, {@link Short}, {@code byte[]},
+   * {@link Double} and {@link Float}.
+   * 
+   * @param o
+   *          the object to wrap, may be {@code null}.
+   * @return the value holding {@code o}, or the null marker when {@code o} is
+   *         {@code null}.
+   * @throws BlurException
+   *           if the type of {@code o} is not supported.
+   */
+  public static Value toValue(Object o) throws BlurException {
+    Value value = new Value();
+    if (o == null) {
+      value.setNullValue(true);
+      return value;
+    }
+    if (o instanceof Long) {
+      value.setLongValue((Long) o);
+      return value;
+    } else if (o instanceof String) {
+      value.setStringValue((String) o);
+      return value;
+    } else if (o instanceof Integer) {
+      value.setIntValue((Integer) o);
+      return value;
+    } else if (o instanceof Boolean) {
+      value.setBooleanValue((Boolean) o);
+      return value;
+    } else if (o instanceof Short) {
+      value.setShortValue((Short) o);
+      return value;
+    } else if (o instanceof byte[]) {
+      value.setBinaryValue((byte[]) o);
+      return value;
+    } else if (o instanceof Double) {
+      value.setDoubleValue((Double) o);
+      return value;
+    } else if (o instanceof Float) {
+      value.setFloatValue((Float) o);
+      return value;
+    }
+    throw new BException("Object [{0}] not supported.", o);
+  }
+
+  /**
+   * Wraps an object in a Thrift {@link ValueObject}. {@link BlurObject} and
+   * {@link BlurArray} instances are packed with {@link ObjectArrayPacking};
+   * everything else, including {@code null}, goes through
+   * {@link #toValue(Object)}.
+   * 
+   * @param o
+   *          the object to wrap, may be {@code null}.
+   * @return the wrapped object.
+   * @throws BlurException
+   *           if the object cannot be converted.
+   */
+  public static ValueObject toValueObject(Object o) throws BlurException {
+    ValueObject valueObject = new ValueObject();
+    if (o instanceof BlurObject || o instanceof BlurArray) {
+      valueObject.setBlurObject(ObjectArrayPacking.pack(o));
+    } else {
+      // instanceof is false for null, so null lands here and becomes the null
+      // marker produced by toValue.
+      valueObject.setValue(toValue(o));
+    }
+    return valueObject;
+  }
+
+  /**
+   * Converts Thrift {@link Arguments} into {@link Args}.
+   * 
+   * @param arguments
+   *          the arguments to convert, may be {@code null}.
+   * @return {@code null} when {@code arguments} is {@code null}, otherwise an
+   *         {@link Args} holding every converted entry (empty when the
+   *         arguments carry no values).
+   */
+  public static Args toArgs(Arguments arguments) {
+    if (arguments == null) {
+      return null;
+    }
+    Args args = new Args();
+    Map<String, ValueObject> values = arguments.getValues();
+    if (values == null) {
+      // toArguments only calls putToValues per entry, so an empty Args yields
+      // an Arguments whose values map was never populated. Treat that as "no
+      // arguments" instead of failing with a NullPointerException.
+      return args;
+    }
+    for (Entry<String, ValueObject> e : values.entrySet()) {
+      args.set(e.getKey(), toObject(e.getValue()));
+    }
+    return args;
+  }
+
+  /**
+   * Unwraps a Thrift {@link Value}.
+   * 
+   * @param value
+   *          the value to unwrap.
+   * @return {@code null} when the null marker is set, otherwise the value of
+   *         the field that is set.
+   */
+  public static Object toObject(Value value) {
+    if (value.isSetNullValue()) {
+      return null;
+    }
+    return value.getFieldValue();
+  }
+
+  /**
+   * Converts {@link Args} into Thrift {@link Arguments}.
+   * 
+   * @param args
+   *          the args to convert, may be {@code null}.
+   * @return {@code null} when {@code args} is {@code null}, otherwise the
+   *         converted arguments.
+   * @throws BlurException
+   *           if a value cannot be converted.
+   */
+  public static Arguments toArguments(Args args) throws BlurException {
+    if (args == null) {
+      return null;
+    }
+    Arguments arguments = new Arguments();
+    Set<Entry<String, Object>> entrySet = args.getValues().entrySet();
+    for (Entry<String, Object> e : entrySet) {
+      arguments.putToValues(e.getKey(), toValueObject(e.getValue()));
+    }
+    return arguments;
+  }
+
+  /**
+   * Converts a Thrift shard-to-value map into a map keyed by {@link Shard}
+   * with unwrapped values.
+   * 
+   * @param shardToValue
+   *          the Thrift results to convert.
+   * @return a new map holding one entry per input entry.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> Map<Shard, T> fromThriftToObject(
+      Map<org.apache.blur.thrift.generated.Shard, ValueObject> shardToValue) {
+    Map<Shard, T> result = new HashMap<Shard, T>();
+    for (Entry<org.apache.blur.thrift.generated.Shard, ValueObject> e : shardToValue.entrySet()) {
+      result.put(new Shard(e.getKey().getShard()), (T) CommandUtil.toObject(e.getValue()));
+    }
+    return result;
+  }
+
+  /**
+   * Unwraps a Thrift {@link ValueObject}.
+   * 
+   * @param valueObject
+   *          the object to unwrap.
+   * @return the unwrapped scalar for a {@code VALUE}, or the unpacked object
+   *         for a {@code BLUR_OBJECT}.
+   * @throws RuntimeException
+   *           if no field is set or the set field is not recognized.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T toObject(ValueObject valueObject) {
+    _Fields field = valueObject.getSetField();
+    if (field == null) {
+      // Switching on a null enum would fail with a bare NullPointerException;
+      // report the unset union explicitly instead.
+      throw new RuntimeException("Type unknown, no field set on ValueObject.");
+    }
+    switch (field) {
+    case VALUE:
+      return (T) toObject(valueObject.getValue());
+    case BLUR_OBJECT:
+      return (T) ObjectArrayPacking.unpack(valueObject.getBlurObject());
+    default:
+      throw new RuntimeException("Type unknown.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
new file mode 100644
index 0000000..5a03806
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
@@ -0,0 +1,266 @@
+package org.apache.blur.command;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClientManager;
+import org.apache.blur.thrift.ClientPool;
+import org.apache.blur.thrift.Connection;
+import org.apache.blur.thrift.generated.Arguments;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Response;
+import org.apache.blur.thrift.generated.TimeoutException;
+import org.apache.blur.thrift.generated.ValueObject;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * {@link ClusterContext} that executes commands by calling the servers named
+ * in a table layout through Thrift clients taken from the
+ * {@link BlurClientManager} client pool. {@link #close()} hands the clients
+ * back to the pool.
+ */
+public class ControllerClusterContext extends ClusterContext implements Closeable {
+
+  private final static Log LOG = LogFactory.getLog(ControllerClusterContext.class);
+
+  private final Args _args;
+  private final TableContext _tableContext;
+  private final Map<Server, Client> _clientMap;
+  private final ExecutorService _executorService;
+  private final ControllerCommandManager _manager;
+  private final Map<String, String> _tableLayout;
+
+  /**
+   * Creates the context and acquires one client per server of the layout.
+   * 
+   * @param tableContext
+   *          the context of the table the command runs against.
+   * @param args
+   *          the arguments of the command.
+   * @param tableLayout
+   *          the layout of the table, shard name to server string.
+   * @param executorService
+   *          the executor used to run the remote calls.
+   * @param manager
+   *          the manager used to resolve command names.
+   * @throws IOException
+   *           if a client cannot be obtained or refreshed.
+   */
+  public ControllerClusterContext(TableContext tableContext, Args args, Map<String, String> tableLayout,
+      ExecutorService executorService, ControllerCommandManager manager) throws IOException {
+    _tableContext = tableContext;
+    _args = args;
+    _clientMap = getBlurClientsForTable(_tableContext.getTable(), tableLayout);
+    _executorService = executorService;
+    _manager = manager;
+    _tableLayout = tableLayout;
+  }
+
+  /**
+   * Obtains and refreshes one pooled client for every distinct server in the
+   * layout.
+   * 
+   * @param table
+   *          the table name (not used by this implementation).
+   * @param tableLayout
+   *          the layout of the table, shard name to server string.
+   * @return the clients keyed by server.
+   * @throws IOException
+   *           if a client cannot be obtained or refreshed; clients acquired
+   *           before the failure are returned to the pool first.
+   */
+  public Map<Server, Client> getBlurClientsForTable(String table, Map<String, String> tableLayout) throws IOException {
+    Map<Server, Client> clients = new HashMap<Server, Client>();
+    // The layout maps shard -> server, so a server string is repeated once per
+    // shard it hosts. Fetch a single client per distinct server; fetching one
+    // per shard would take extra clients out of the pool that are never
+    // returned.
+    Set<String> servers = new HashSet<String>(tableLayout.values());
+    boolean success = false;
+    try {
+      for (String serverStr : servers) {
+        try {
+          Client client = BlurClientManager.getClientPool().getClient(new Connection(serverStr));
+          client.refresh();
+          clients.put(new Server(serverStr), client);
+        } catch (TException e) {
+          throw new IOException(e);
+        }
+      }
+      success = true;
+      return clients;
+    } finally {
+      if (!success) {
+        // NOTE(review): the client whose refresh failed is not in the map and
+        // is not handed back here - confirm how the pool expects broken
+        // clients to be disposed of.
+        returnClients(clients);
+      }
+    }
+  }
+
+  @Override
+  public Args getArgs() {
+    return _args;
+  }
+
+  @Override
+  public TableContext getTableContext() {
+    return _tableContext;
+  }
+
+  /**
+   * Runs the command on every server and waits for the result of every shard.
+   * 
+   * @throws IOException
+   *           wrapping the first failure, or if the wait is interrupted.
+   */
+  @Override
+  public <T> Map<Shard, T> readIndexes(Args args, Class<? extends IndexReadCommand<T>> clazz) throws IOException {
+    Map<Shard, Future<T>> futures = readIndexesAsync(args, clazz);
+    Map<Shard, T> result = new HashMap<Shard, T>();
+    return processFutures(clazz, futures, result);
+  }
+
+  /**
+   * Runs the command on every server and waits for the result of every
+   * server.
+   * 
+   * @throws IOException
+   *           wrapping the first failure, or if the wait is interrupted.
+   */
+  @Override
+  public <T> Map<Server, T> readServers(Args args, Class<? extends IndexReadCombiningCommand<?, T>> clazz)
+      throws IOException {
+    Map<Server, Future<T>> futures = readServersAsync(args, clazz);
+    Map<Server, T> result = new HashMap<Server, T>();
+    return processFutures(clazz, futures, result);
+  }
+
+  /**
+   * Not implemented.
+   * 
+   * @throws RuntimeException
+   *           always.
+   */
+  @Override
+  public <T> T writeIndex(Args args, Class<? extends IndexWriteCommand<T>> clazz) {
+    throw new RuntimeException("Not Implemented");
+  }
+
+  /**
+   * Returns every client held by this context to the client pool.
+   */
+  @Override
+  public void close() throws IOException {
+    returnClients(_clientMap);
+  }
+
+  /**
+   * Hands each client of the given map back to the client pool.
+   */
+  private static void returnClients(Map<Server, Client> clients) throws IOException {
+    ClientPool clientPool = BlurClientManager.getClientPool();
+    for (Entry<Server, Client> e : clients.entrySet()) {
+      clientPool.returnClient(new Connection(e.getKey().getServer()), e.getValue());
+    }
+  }
+
+  /**
+   * Submits one remote call per server and returns one future per shard. The
+   * futures of all shards on a server share that server's call.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> Map<Shard, Future<T>> readIndexesAsync(final Args args, Class<? extends IndexReadCommand<T>> clazz) {
+    final String commandName = _manager.getCommandName((Class<? extends Command>) clazz);
+    Map<Shard, Future<T>> futureMap = new HashMap<Shard, Future<T>>();
+    for (Entry<Server, Client> e : _clientMap.entrySet()) {
+      Server server = e.getKey();
+      final Client client = e.getValue();
+      Future<Map<Shard, T>> future = _executorService.submit(new Callable<Map<Shard, T>>() {
+        @Override
+        public Map<Shard, T> call() throws Exception {
+          Arguments arguments = CommandUtil.toArguments(args);
+          Response response = waitForResponse(client, getTable(), commandName, arguments);
+          Map<Shard, Object> shardToValue = CommandUtil.fromThriftToObject(response.getShardToValue());
+          return (Map<Shard, T>) shardToValue;
+        }
+      });
+      Set<Shard> shards = getShardsOnServer(server);
+      for (Shard shard : shards) {
+        futureMap.put(shard, new ShardResultFuture<T>(shard, future));
+      }
+    }
+    return futureMap;
+  }
+
+  /**
+   * Executes the command on the given client and, whenever the call times
+   * out, reconnects with the execution id carried by the timeout until a
+   * response arrives.
+   * 
+   * @return the response of the command.
+   * @throws TException
+   *           any failure other than a {@link TimeoutException}.
+   */
+  protected static Response waitForResponse(Client client, String table, String commandName, Arguments arguments)
+      throws TException {
+    // TODO This should likely be changed to run off of an AtomicBoolean used
+    // for the status of commands.
+    String executionId = null;
+    while (true) {
+      try {
+        if (executionId == null) {
+          return client.execute(table, commandName, arguments);
+        } else {
+          return client.reconnect(executionId);
+        }
+      } catch (TimeoutException e) {
+        // Only a timeout is retried; every other exception propagates to the
+        // caller unchanged.
+        executionId = e.getExecutionId();
+        LOG.info("Execution fetch timed out, reconnecting using [{0}].", executionId);
+      }
+    }
+  }
+
+  /**
+   * Collects the shards that the table layout assigns to the given server.
+   */
+  private Set<Shard> getShardsOnServer(Server server) {
+    Set<Shard> shards = new HashSet<Shard>();
+    for (Entry<String, String> e : _tableLayout.entrySet()) {
+      String value = e.getValue();
+      if (value.equals(server.getServer())) {
+        shards.add(new Shard(e.getKey()));
+      }
+    }
+    return shards;
+  }
+
+  private String getTable() {
+    return getTableContext().getTable();
+  }
+
+  /**
+   * Submits one remote call per server and returns one future per server.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> Map<Server, Future<T>> readServersAsync(final Args args,
+      Class<? extends IndexReadCombiningCommand<?, T>> clazz) {
+    final String commandName = _manager.getCommandName((Class<? extends Command>) clazz);
+    Map<Server, Future<T>> futureMap = new HashMap<Server, Future<T>>();
+    for (Entry<Server, Client> e : _clientMap.entrySet()) {
+      Server server = e.getKey();
+      final Client client = e.getValue();
+      Future<T> future = _executorService.submit(new Callable<T>() {
+        @Override
+        public T call() throws Exception {
+          Arguments arguments = CommandUtil.toArguments(args);
+          Response response = waitForResponse(client, getTable(), commandName, arguments);
+          ValueObject valueObject = response.getValue();
+          return (T) CommandUtil.toObject(valueObject);
+        }
+      });
+      futureMap.put(server, future);
+    }
+    return futureMap;
+  }
+
+  /**
+   * Not implemented yet.
+   * 
+   * @return always {@code null}.
+   */
+  @Override
+  public <T> Future<T> writeIndexAsync(Args args, Class<? extends IndexWriteCommand<T>> clazz) {
+    // NOTE(review): writeIndex throws "Not Implemented" while this returns
+    // null; callers have to null check until this is implemented.
+    return null;
+  }
+
+  /**
+   * Waits for every future and copies the results into {@code result}.
+   * Failures are logged; after all futures were visited the first failure is
+   * rethrown.
+   * 
+   * @return {@code result}, populated with one entry per successful future.
+   * @throws IOException
+   *           wrapping the first failure, or the interruption if the wait was
+   *           interrupted.
+   */
+  private <K, T> Map<K, T> processFutures(Class<?> clazz, Map<K, Future<T>> futures, Map<K, T> result)
+      throws IOException {
+    Throwable firstError = null;
+    for (Entry<K, Future<T>> e : futures.entrySet()) {
+      K key = e.getKey();
+      Future<T> future = e.getValue();
+      T value;
+      try {
+        value = future.get();
+        result.put(key, value);
+      } catch (InterruptedException ex) {
+        // Restore the flag so code further up the stack can still observe the
+        // interruption.
+        Thread.currentThread().interrupt();
+        throw new IOException(ex);
+      } catch (ExecutionException ex) {
+        Throwable cause = ex.getCause();
+        if (firstError == null) {
+          firstError = cause;
+        }
+        // Pass the cause along so failures after the first one are not lost.
+        LOG.error("Unknown error while executing command [{0}] on server or shard [{1}]", cause, clazz, key);
+      }
+    }
+    if (firstError != null) {
+      throw new IOException(firstError);
+    }
+    return result;
+  }
+
+  @Override
+  public BlurConfiguration getBlurConfiguration() {
+    return _tableContext.getBlurConfiguration();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return _tableContext.getConfiguration();
+  }
+
+  /**
+   * Waits for the result of {@link #readIndexAsync(Shard, Args, Class)}.
+   * 
+   * @throws IOException
+   *           wrapping the failure of the call, or the interruption if the
+   *           wait was interrupted.
+   */
+  @Override
+  public <T> T readIndex(Shard shard, Args args, Class<? extends IndexReadCommand<T>> clazz) throws IOException {
+    Future<T> future = readIndexAsync(shard, args, clazz);
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      // Restore the flag so code further up the stack can still observe the
+      // interruption.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    }
+  }
+
+  /**
+   * Not implemented.
+   * 
+   * @throws RuntimeException
+   *           always.
+   */
+  @Override
+  public <T> Future<T> readIndexAsync(Shard shard, Args args, Class<? extends IndexReadCommand<T>> clazz)
+      throws IOException {
+    throw new RuntimeException("Not Implemented.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
new file mode 100644
index 0000000..cc445e9
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
@@ -0,0 +1,90 @@
+package org.apache.blur.command;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.blur.server.TableContext;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * Command manager that resolves a command by name and runs it against a
+ * {@link ControllerClusterContext}, dispatching on the kind of command.
+ */
+@SuppressWarnings("unchecked")
+public class ControllerCommandManager extends BaseCommandManager {
+
+  public ControllerCommandManager(int threadCount, long connectionTimeout) throws IOException {
+    super(threadCount, connectionTimeout);
+  }
+
+  /**
+   * Executes the named command for the given table.
+   * 
+   * @param tableContext
+   *          the context of the table the command runs against.
+   * @param commandName
+   *          the name of the command to run.
+   * @param args
+   *          the arguments of the command.
+   * @param tableLayout
+   *          the layout of the table, shard name to server string.
+   * @return the response produced by the command.
+   * @throws IOException
+   *           if no command with that name exists or the context cannot be
+   *           created.
+   * @throws TimeoutException
+   *           declared for {@code submitCallable}.
+   */
+  public Response execute(TableContext tableContext, String commandName, final Args args, Map<String, String> tableLayout)
+      throws IOException, TimeoutException {
+    // Resolve the command before creating the context: the context takes
+    // clients out of the pool, and they would never be returned if the lookup
+    // failed afterwards.
+    final Command command = getCommandObject(commandName);
+    if (command == null) {
+      throw new IOException("Command with name [" + commandName + "] not found.");
+    }
+    final ControllerClusterContext context = createCommandContext(tableContext, args, tableLayout);
+    return submitCallable(new Callable<Response>() {
+      @Override
+      public Response call() throws Exception {
+        try {
+          // For those commands that do not implement cluster command, run them
+          // in a base impl.
+          if (command instanceof ClusterCommand) {
+            return executeClusterCommand(context, command);
+          } else if (command instanceof IndexReadCombiningCommand) {
+            return executeIndexReadCombiningCommand(args, context, command);
+          } else if (command instanceof IndexReadCommand) {
+            return executeIndexReadCommand(args, context, command);
+          } else if (command instanceof IndexWriteCommand) {
+            return executeIndexWriteCommand(args, context, command);
+          } else {
+            throw new IOException("Command type of [" + command.getClass() + "] not supported.");
+          }
+        } finally {
+          // Hand the pooled clients back once the command has finished.
+          // NOTE(review): assumes a ClusterCommand has waited for any futures
+          // it started before returning - confirm.
+          context.close();
+        }
+      }
+    });
+  }
+
+  /**
+   * Runs a {@link ClusterCommand} and wraps its result as an aggregate
+   * response.
+   */
+  private Response executeClusterCommand(ClusterContext context, Command command) throws IOException {
+    ClusterCommand<Object> clusterCommand = (ClusterCommand<Object>) command;
+    Object object = clusterCommand.clusterExecute(context);
+    return Response.createNewAggregateResponse(object);
+  }
+
+  /**
+   * Runs an {@link IndexWriteCommand} through the context and wraps its result
+   * as an aggregate response.
+   */
+  private Response executeIndexWriteCommand(Args args, ClusterContext context, Command command) throws IOException {
+    Class<? extends IndexWriteCommand<Object>> clazz = (Class<? extends IndexWriteCommand<Object>>) command.getClass();
+    Object object = context.writeIndex(args, clazz);
+    return Response.createNewAggregateResponse(object);
+  }
+
+  /**
+   * Runs an {@link IndexReadCommand} through the context and wraps the
+   * per-shard results as a shard response.
+   */
+  private Response executeIndexReadCommand(Args args, ClusterContext context, Command command) throws IOException {
+    Class<? extends IndexReadCommand<Object>> clazz = (Class<? extends IndexReadCommand<Object>>) command.getClass();
+    Map<Shard, Object> result = context.readIndexes(args, clazz);
+    return Response.createNewShardResponse(result);
+  }
+
+  /**
+   * Runs an {@link IndexReadCombiningCommand} through the context and wraps
+   * the per-server results as a server response.
+   */
+  private Response executeIndexReadCombiningCommand(Args args, ClusterContext context, Command command)
+      throws IOException {
+    Class<? extends IndexReadCombiningCommand<Object, Object>> clazz = (Class<? extends IndexReadCombiningCommand<Object, Object>>) command
+        .getClass();
+    Map<Server, Object> result = context.readServers(args, clazz);
+    return Response.createNewServerResponse(result);
+  }
+
+  /**
+   * Creates the context a command is executed with. The caller is responsible
+   * for closing it.
+   */
+  private ControllerClusterContext createCommandContext(TableContext tableContext, Args args,
+      Map<String, String> tableLayout) throws IOException {
+    return new ControllerClusterContext(tableContext, args, tableLayout, _executorService, this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/IndexContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/IndexContext.java b/blur-core/src/main/java/org/apache/blur/command/IndexContext.java
new file mode 100644
index 0000000..64f7643
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/IndexContext.java
@@ -0,0 +1,42 @@
+package org.apache.blur.command;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.server.TableContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Context object passed to the {@code execute} methods of the index commands.
+ * It bundles the accessors declared below; implementations supply the actual
+ * objects.
+ */
+public abstract class IndexContext {
+
+  /**
+   * @return the {@link Args} of this context.
+   */
+  public abstract Args getArgs();
+
+  /**
+   * @return the {@link Shard} of this context.
+   */
+  public abstract Shard getShard();
+
+  /**
+   * @return the {@link TableContext} of this context.
+   */
+  public abstract TableContext getTableContext();
+
+  /**
+   * @return the {@link BlurConfiguration} of this context.
+   */
+  public abstract BlurConfiguration getBlurConfiguration();
+
+  /**
+   * @return the Hadoop {@link Configuration} of this context.
+   */
+  public abstract Configuration getConfiguration();
+
+  /**
+   * @return the Lucene {@link IndexReader} of this context.
+   */
+  public abstract IndexReader getIndexReader();
+
+  /**
+   * @return the Lucene {@link IndexSearcher} of this context.
+   */
+  public abstract IndexSearcher getIndexSearcher();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/IndexReadCombiningCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/IndexReadCombiningCommand.java b/blur-core/src/main/java/org/apache/blur/command/IndexReadCombiningCommand.java
new file mode 100644
index 0000000..59486a1
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/IndexReadCombiningCommand.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A read command with two steps: {@link #execute(IndexContext)} yields a
+ * result of type {@code T1}, and {@link #combine(Map)} turns results keyed by
+ * {@link Shard} into a single {@code T2}.
+ * 
+ * @param <T1>
+ *          the type returned by {@code execute}.
+ * @param <T2>
+ *          the type returned by {@code combine}.
+ */
+public interface IndexReadCombiningCommand<T1, T2> {
+
+  /**
+   * Runs the command with the given context.
+   * 
+   * @param context
+   *          the context to run with.
+   * @return the result of the run.
+   * @throws IOException
+   *           if the run fails.
+   */
+  T1 execute(IndexContext context) throws IOException;
+
+  /**
+   * Combines results keyed by shard into one value.
+   * 
+   * @param results
+   *          the results to combine.
+   * @return the combined value.
+   * @throws IOException
+   *           if combining fails.
+   */
+  T2 combine(Map<Shard, T1> results) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/IndexReadCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/IndexReadCommand.java b/blur-core/src/main/java/org/apache/blur/command/IndexReadCommand.java
new file mode 100644
index 0000000..c9e5901
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/IndexReadCommand.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+
+/**
+ * A read command that produces a result of type {@code T} from an
+ * {@link IndexContext}.
+ * 
+ * @param <T>
+ *          the type returned by {@code execute}.
+ */
+public interface IndexReadCommand<T> {
+
+  /**
+   * Runs the command with the given context.
+   * 
+   * @param context
+   *          the context to run with.
+   * @return the result of the run.
+   * @throws IOException
+   *           if the run fails.
+   */
+  T execute(IndexContext context) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e97a120d/blur-core/src/main/java/org/apache/blur/command/IndexWriteCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/IndexWriteCommand.java b/blur-core/src/main/java/org/apache/blur/command/IndexWriteCommand.java
new file mode 100644
index 0000000..67ce100
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/IndexWriteCommand.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.io.IOException;
+
+import org.apache.lucene.index.IndexWriter;
+
+/**
+ * A write command that produces a result of type {@code T} from an
+ * {@link IndexContext} and a Lucene {@link IndexWriter}.
+ * 
+ * @param <T>
+ *          the type returned by {@code execute}.
+ */
+public interface IndexWriteCommand<T> {
+
+  /**
+   * Runs the command with the given context and writer.
+   * 
+   * @param context
+   *          the context to run with.
+   * @param writer
+   *          the index writer handed to the command.
+   * @return the result of the run.
+   * @throws IOException
+   *           if the run fails.
+   */
+  T execute(IndexContext context, IndexWriter writer) throws IOException;
+
+}


Mime
View raw message