incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: More updates to add the new command platform to the controller. Still a work in progress.
Date Thu, 28 Aug 2014 12:14:20 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 00821b81c -> 2fed63d6a


More updates to add the new command platform to the controller.  Still a work in progress.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/2fed63d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/2fed63d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/2fed63d6

Branch: refs/heads/master
Commit: 2fed63d6a21fabd5e33bbdad2ba3a44e2589fe49
Parents: 00821b8
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Aug 28 08:13:53 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Aug 28 08:13:53 2014 -0400

----------------------------------------------------------------------
 .../blur/manager/command/ClusterCommand.java    |  38 +++++
 .../blur/manager/command/CommandContext.java    |  26 ++++
 .../blur/manager/command/CommandManager.java    | 140 -------------------
 .../blur/manager/command/CommandUtil.java       |  88 ++++++++++++
 .../command/ControllerCommandManager.java       |  36 +++++
 .../manager/command/ShardCommandManager.java    | 140 +++++++++++++++++++
 .../manager/command/cluster/DocumentCount.java  |  42 ++++++
 .../cluster/DocumentCountAggregator.java        |  47 +++++++
 .../blur/thrift/BlurControllerServer.java       |  21 ++-
 .../org/apache/blur/thrift/BlurShardServer.java |  67 +--------
 .../blur/thrift/ThriftBlurShardServer.java      |   4 +-
 11 files changed, 443 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
new file mode 100644
index 0000000..f2bf9d3
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
@@ -0,0 +1,38 @@
+package org.apache.blur.manager.command;
+
+import java.io.Serializable;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+@SuppressWarnings("serial")
+public abstract class ClusterCommand implements Serializable, Cloneable {
+
+  public abstract String getName();
+
+  @Override
+  public ClusterCommand clone() {
+    try {
+      return (ClusterCommand) super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public abstract Response execute(Args args, CommandContext context);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/manager/command/CommandContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/CommandContext.java b/blur-core/src/main/java/org/apache/blur/manager/command/CommandContext.java
new file mode 100644
index 0000000..145a94b
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/CommandContext.java
@@ -0,0 +1,26 @@
+package org.apache.blur.manager.command;
+
+import java.util.Map;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public abstract class CommandContext {
+
+  public abstract <T> Map<String, T> execute(Args args, String string);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/manager/command/CommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/CommandManager.java b/blur-core/src/main/java/org/apache/blur/manager/command/CommandManager.java
deleted file mode 100644
index 2247599..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/CommandManager.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.manager.command;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.apache.blur.concurrent.Executors;
-import org.apache.blur.manager.IndexServer;
-import org.apache.blur.manager.command.primitive.DocumentCount;
-import org.apache.blur.manager.command.primitive.DocumentCountAggregator;
-import org.apache.blur.manager.command.primitive.PrimitiveCommand;
-import org.apache.blur.manager.command.primitive.PrimitiveCommandAggregator;
-import org.apache.blur.manager.command.primitive.ReadCommand;
-import org.apache.blur.manager.command.primitive.ReadWriteCommand;
-import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.server.IndexSearcherClosable;
-
-public class CommandManager implements Closeable {
-
-  private final IndexServer _indexServer;
-  private final ExecutorService _executorService;
-  private final Map<String, PrimitiveCommand> _command = new ConcurrentHashMap<String,
PrimitiveCommand>();
-
-  public CommandManager(IndexServer indexServer, int threadCount) throws IOException {
-    register(DocumentCount.class);
-    register(DocumentCountAggregator.class);
-    _indexServer = indexServer;
-    _executorService = Executors.newThreadPool("command-", threadCount);
-  }
-
-  private void register(Class<? extends PrimitiveCommand> commandClass) throws IOException
{
-    try {
-      PrimitiveCommand command = commandClass.newInstance();
-      _command.put(command.getName(), command);
-    } catch (InstantiationException e) {
-      throw new IOException(e);
-    } catch (IllegalAccessException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public Response execute(String table, String commandName, Args args) throws IOException
{
-    PrimitiveCommand command = getCommandObject(commandName);
-    if (command == null) {
-      throw new IOException("Command with name [" + commandName + "] not found.");
-    }
-    if (command instanceof ReadCommand) {
-      return toResponse(executeReadCommand((ReadCommand<?>) command, table, args),
command);
-    } else if (command instanceof ReadWriteCommand) {
-      return toResponse(executeReadWriteCommand((ReadWriteCommand<?>) command, table,
args), command);
-    }
-    throw new IOException("Command type of [" + command.getClass() + "] not supported.");
-  }
-
-  @SuppressWarnings("unchecked")
-  private Response toResponse(Map<String, Object> results, PrimitiveCommand command)
throws IOException {
-    if (command instanceof PrimitiveCommandAggregator) {
-      PrimitiveCommandAggregator<Object, Object> primitiveCommandAggregator = (PrimitiveCommandAggregator<Object,
Object>) command;
-      Iterator<Entry<String, Object>> iterator = results.entrySet().iterator();
-      Object object = primitiveCommandAggregator.aggregate(iterator);
-      return Response.createNewAggregateResponse(object);
-    }
-    return Response.createNewResponse(results);
-  }
-
-  private Map<String, Object> executeReadWriteCommand(ReadWriteCommand<?> command,
String table, Args args) {
-    return null;
-  }
-
-  private Map<String, Object> executeReadCommand(ReadCommand<?> command, String
table, final Args args)
-      throws IOException {
-    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
-    Map<String, Future<?>> futureMap = new HashMap<String, Future<?>>();
-    for (Entry<String, BlurIndex> e : indexes.entrySet()) {
-      String shardId = e.getKey();
-      final BlurIndex blurIndex = e.getValue();
-      final ReadCommand<?> readCommand = command.clone();
-      Future<Object> future = _executorService.submit(new Callable<Object>()
{
-        @Override
-        public Object call() throws Exception {
-          IndexSearcherClosable searcher = blurIndex.getIndexSearcher();
-          try {
-            return readCommand.execute(args, searcher);
-          } finally {
-            searcher.close();
-          }
-        }
-      });
-      futureMap.put(shardId, future);
-    }
-    Map<String, Object> resultMap = new HashMap<String, Object>();
-    for (Entry<String, Future<?>> e : futureMap.entrySet()) {
-      Future<?> future = e.getValue();
-      Object object;
-      try {
-        object = future.get();
-      } catch (InterruptedException ex) {
-        throw new IOException(ex);
-      } catch (ExecutionException ex) {
-        throw new IOException(ex.getCause());
-      }
-      resultMap.put(e.getKey(), object);
-    }
-    return resultMap;
-  }
-
-  private PrimitiveCommand getCommandObject(String commandName) {
-    return _command.get(commandName);
-  }
-
-  @Override
-  public void close() throws IOException {
-    _executorService.shutdownNow();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java b/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
new file mode 100644
index 0000000..ee28a61
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
@@ -0,0 +1,88 @@
+package org.apache.blur.manager.command;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.Arguments;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Value;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class CommandUtil {
+
+  public static org.apache.blur.thrift.generated.Response convert(Response response) throws
BlurException {
+    org.apache.blur.thrift.generated.Response converted = new org.apache.blur.thrift.generated.Response();
+    if (response.isAggregatedResults()) {
+      converted.setValue(toValue(response.getServerResult()));
+    } else {
+      converted.setShardToValue(convert(response.getShardResults()));
+    }
+    return converted;
+  }
+
+  public static Map<String, Value> convert(Map<String, Object> map) throws BlurException
{
+    Map<String, Value> result = new HashMap<String, Value>();
+    for (Entry<String, Object> e : map.entrySet()) {
+      result.put(e.getKey(), toValue(e.getValue()));
+    }
+    return result;
+  }
+
+  public static Value toValue(Object o) throws BlurException {
+    Value value = new Value();
+    if (o == null) {
+      value.setNullValue(true);
+      return value;
+    }
+    if (o instanceof Long) {
+      value.setLongValue((Long) o);
+      return value;
+    } else if (o instanceof String) {
+      value.setStringValue((String) o);
+      return value;
+    } else if (o instanceof Integer) {
+      value.setIntValue((Integer) o);
+      return value;
+    }
+    throw new BException("Object [{0}] not supported.", o);
+  }
+
+  public static Args convert(Arguments arguments) {
+    if (arguments == null) {
+      return null;
+    }
+    Args args = new Args();
+    Map<String, Value> values = arguments.getValues();
+    Set<Entry<String, Value>> entrySet = values.entrySet();
+    for (Entry<String, Value> e : entrySet) {
+      args.set(e.getKey(), toObject(e.getValue()));
+    }
+    return args;
+  }
+
+  public static Object toObject(Value value) {
+    if (value.getNullValue()) {
+      return null;
+    }
+    return value.getFieldValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
new file mode 100644
index 0000000..5286b41
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
@@ -0,0 +1,36 @@
+package org.apache.blur.manager.command;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class ControllerCommandManager {
+
+  public Response execute(String table, String commandName, Args args) {
+    CommandContext context = createCommandContext(table);
+    ClusterCommand clusterCommand = getCommand(commandName);
+    return clusterCommand.execute(args, context);
+  }
+
+  private ClusterCommand getCommand(String commandName) {
+    throw new RuntimeException("Not Implemented");
+  }
+
+  private CommandContext createCommandContext(String table) {
+    throw new RuntimeException("Not Implemented");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
b/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
new file mode 100644
index 0000000..3ba1b32
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.command;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.manager.IndexServer;
+import org.apache.blur.manager.command.primitive.DocumentCount;
+import org.apache.blur.manager.command.primitive.DocumentCountAggregator;
+import org.apache.blur.manager.command.primitive.PrimitiveCommand;
+import org.apache.blur.manager.command.primitive.PrimitiveCommandAggregator;
+import org.apache.blur.manager.command.primitive.ReadCommand;
+import org.apache.blur.manager.command.primitive.ReadWriteCommand;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.server.IndexSearcherClosable;
+
+public class ShardCommandManager implements Closeable {
+
+  private final IndexServer _indexServer;
+  private final ExecutorService _executorService;
+  private final Map<String, PrimitiveCommand> _command = new ConcurrentHashMap<String,
PrimitiveCommand>();
+
+  public ShardCommandManager(IndexServer indexServer, int threadCount) throws IOException
{
+    register(DocumentCount.class);
+    register(DocumentCountAggregator.class);
+    _indexServer = indexServer;
+    _executorService = Executors.newThreadPool("command-", threadCount);
+  }
+
+  private void register(Class<? extends PrimitiveCommand> commandClass) throws IOException
{
+    try {
+      PrimitiveCommand command = commandClass.newInstance();
+      _command.put(command.getName(), command);
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public Response execute(String table, String commandName, Args args) throws IOException
{
+    PrimitiveCommand command = getCommandObject(commandName);
+    if (command == null) {
+      throw new IOException("Command with name [" + commandName + "] not found.");
+    }
+    if (command instanceof ReadCommand) {
+      return toResponse(executeReadCommand((ReadCommand<?>) command, table, args),
command);
+    } else if (command instanceof ReadWriteCommand) {
+      return toResponse(executeReadWriteCommand((ReadWriteCommand<?>) command, table,
args), command);
+    }
+    throw new IOException("Command type of [" + command.getClass() + "] not supported.");
+  }
+
+  @SuppressWarnings("unchecked")
+  private Response toResponse(Map<String, Object> results, PrimitiveCommand command)
throws IOException {
+    if (command instanceof PrimitiveCommandAggregator) {
+      PrimitiveCommandAggregator<Object, Object> primitiveCommandAggregator = (PrimitiveCommandAggregator<Object,
Object>) command;
+      Iterator<Entry<String, Object>> iterator = results.entrySet().iterator();
+      Object object = primitiveCommandAggregator.aggregate(iterator);
+      return Response.createNewAggregateResponse(object);
+    }
+    return Response.createNewResponse(results);
+  }
+
+  private Map<String, Object> executeReadWriteCommand(ReadWriteCommand<?> command,
String table, Args args) {
+    return null;
+  }
+
+  private Map<String, Object> executeReadCommand(ReadCommand<?> command, String
table, final Args args)
+      throws IOException {
+    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+    Map<String, Future<?>> futureMap = new HashMap<String, Future<?>>();
+    for (Entry<String, BlurIndex> e : indexes.entrySet()) {
+      String shardId = e.getKey();
+      final BlurIndex blurIndex = e.getValue();
+      final ReadCommand<?> readCommand = command.clone();
+      Future<Object> future = _executorService.submit(new Callable<Object>()
{
+        @Override
+        public Object call() throws Exception {
+          IndexSearcherClosable searcher = blurIndex.getIndexSearcher();
+          try {
+            return readCommand.execute(args, searcher);
+          } finally {
+            searcher.close();
+          }
+        }
+      });
+      futureMap.put(shardId, future);
+    }
+    Map<String, Object> resultMap = new HashMap<String, Object>();
+    for (Entry<String, Future<?>> e : futureMap.entrySet()) {
+      Future<?> future = e.getValue();
+      Object object;
+      try {
+        object = future.get();
+      } catch (InterruptedException ex) {
+        throw new IOException(ex);
+      } catch (ExecutionException ex) {
+        throw new IOException(ex.getCause());
+      }
+      resultMap.put(e.getKey(), object);
+    }
+    return resultMap;
+  }
+
+  private PrimitiveCommand getCommandObject(String commandName) {
+    return _command.get(commandName);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _executorService.shutdownNow();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/manager/command/cluster/DocumentCount.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/cluster/DocumentCount.java
b/blur-core/src/main/java/org/apache/blur/manager/command/cluster/DocumentCount.java
new file mode 100644
index 0000000..c5fc7d6
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/cluster/DocumentCount.java
@@ -0,0 +1,42 @@
+package org.apache.blur.manager.command.cluster;
+
+import java.util.Map;
+
+import org.apache.blur.manager.command.Args;
+import org.apache.blur.manager.command.ClusterCommand;
+import org.apache.blur.manager.command.CommandContext;
+import org.apache.blur.manager.command.Response;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+@SuppressWarnings("serial")
+public class DocumentCount extends ClusterCommand {
+
+  @Override
+  public String getName() {
+    return "docCount";
+  }
+
+  @Override
+  public Response execute(Args args, CommandContext context) {
+    // where the key is the shard in the table
+    Map<String, Object> results = context.execute(args, "docCount");
+    return Response.createNewResponse(results);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/manager/command/cluster/DocumentCountAggregator.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/cluster/DocumentCountAggregator.java
b/blur-core/src/main/java/org/apache/blur/manager/command/cluster/DocumentCountAggregator.java
new file mode 100644
index 0000000..a7789dc
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/cluster/DocumentCountAggregator.java
@@ -0,0 +1,47 @@
+package org.apache.blur.manager.command.cluster;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.blur.manager.command.Args;
+import org.apache.blur.manager.command.ClusterCommand;
+import org.apache.blur.manager.command.CommandContext;
+import org.apache.blur.manager.command.Response;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+@SuppressWarnings("serial")
+public class DocumentCountAggregator extends ClusterCommand {
+
+  @Override
+  public String getName() {
+    return "docCountAggregate";
+  }
+
+  @Override
+  public Response execute(Args args, CommandContext context) {
+    // where the key is the server hostname
+    Map<String, Long> results = context.execute(args, "docCountAggregate");
+    long total = 0;
+    for (Entry<String, Long> e : results.entrySet()) {
+      total += e.getValue();
+    }
+    return Response.createNewAggregateResponse(total);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 5fda94a..9c6120b 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -53,6 +53,9 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.BlurPartitioner;
 import org.apache.blur.manager.BlurQueryChecker;
 import org.apache.blur.manager.IndexManager;
+import org.apache.blur.manager.command.CommandUtil;
+import org.apache.blur.manager.command.ControllerCommandManager;
+import org.apache.blur.manager.command.Response;
 import org.apache.blur.manager.indexserver.DistributedLayout;
 import org.apache.blur.manager.indexserver.DistributedLayoutFactory;
 import org.apache.blur.manager.indexserver.DistributedLayoutFactoryImpl;
@@ -83,7 +86,6 @@ import org.apache.blur.thrift.generated.ErrorType;
 import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.HighlightOptions;
 import org.apache.blur.thrift.generated.Query;
-import org.apache.blur.thrift.generated.Response;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.Schema;
 import org.apache.blur.thrift.generated.Selector;
@@ -193,6 +195,7 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
   private Timer _preconnectTimer;
   private Timer _tableContextWarmupTimer;
   private long _tableLayoutTimeoutNanos = TimeUnit.SECONDS.toNanos(30);
+  private ControllerCommandManager _commandManager;
 
   public void init() throws KeeperException, InterruptedException {
     setupZookeeper();
@@ -1495,8 +1498,20 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
   }
 
   @Override
-  public Response execute(String table, String commandName, Arguments arguments) throws BlurException,
TException {
-    throw new BlurException("not implemented", null, ErrorType.UNKNOWN);
+  public org.apache.blur.thrift.generated.Response  execute(String table, String commandName,
Arguments arguments) throws BlurException, TException {
+    try {
+      Response response = _commandManager.execute(table, commandName, CommandUtil.convert(arguments));
+      return CommandUtil.convert(response);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to execute command [{0}] for table [{1}]", e,
commandName, table);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
   }
 
+  public void setCommandManager(ControllerCommandManager commandManager) {
+    _commandManager = commandManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index afe50bd..d776fc0 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -38,7 +38,8 @@ import org.apache.blur.manager.BlurQueryChecker;
 import org.apache.blur.manager.IndexManager;
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.command.Args;
-import org.apache.blur.manager.command.CommandManager;
+import org.apache.blur.manager.command.CommandUtil;
+import org.apache.blur.manager.command.ShardCommandManager;
 import org.apache.blur.manager.command.Response;
 import org.apache.blur.manager.results.BlurResultIterable;
 import org.apache.blur.manager.writer.BlurIndex;
@@ -81,7 +82,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
   private ExecutorService _dataFetch;
   private String _cluster = BlurConstants.BLUR_CLUSTER;
   private int _dataFetchThreadCount = 32;
-  private CommandManager _commandManager;
+  private ShardCommandManager _commandManager;
 
   public void init() throws BlurException {
     _queryCache = new QueryCache("shard-cache", _maxQueryCacheElements, _maxTimeToLive);
@@ -594,8 +595,8 @@ public class BlurShardServer extends TableAdmin implements Iface {
   public org.apache.blur.thrift.generated.Response execute(String table, String commandName,
Arguments arguments)
       throws BlurException, TException {
     try {
-      Response response = _commandManager.execute(table, commandName, convert(arguments));
-      return convert(response);
+      Response response = _commandManager.execute(table, commandName, CommandUtil.convert(arguments));
+      return CommandUtil.convert(response);
     } catch (Exception e) {
       LOG.error("Unknown error while trying to execute command [{0}] for table [{1}]", e,
commandName, table);
       if (e instanceof BlurException) {
@@ -605,64 +606,8 @@ public class BlurShardServer extends TableAdmin implements Iface {
     }
   }
 
-  private org.apache.blur.thrift.generated.Response convert(Response response) throws BlurException
{
-    org.apache.blur.thrift.generated.Response converted = new org.apache.blur.thrift.generated.Response();
-    if (response.isAggregatedResults()) {
-      converted.setValue(toValue(response.getServerResult()));
-    } else {
-      converted.setShardToValue(convert(response.getShardResults()));
-    }
-    return converted;
-  }
-
-  private Map<String, Value> convert(Map<String, Object> map) throws BlurException
{
-    Map<String, Value> result = new HashMap<String, Value>();
-    for (Entry<String, Object> e : map.entrySet()) {
-      result.put(e.getKey(), toValue(e.getValue()));
-    }
-    return result;
-  }
-
-  private Value toValue(Object o) throws BlurException {
-    Value value = new Value();
-    if (o == null) {
-      value.setNullValue(true);
-      return value;
-    }
-    if (o instanceof Long) {
-      value.setLongValue((Long) o);
-      return value;
-    } else if (o instanceof String) {
-      value.setStringValue((String) o);
-      return value;
-    } else if (o instanceof Integer) {
-      value.setIntValue((Integer) o);
-      return value;
-    }
-    throw new BException("Object [{0}] not supported.", o);
-  }
-
-  private Args convert(Arguments arguments) {
-    if (arguments == null) {
-      return null;
-    }
-    Args args = new Args();
-    Map<String, Value> values = arguments.getValues();
-    Set<Entry<String, Value>> entrySet = values.entrySet();
-    for (Entry<String, Value> e : entrySet) {
-      args.set(e.getKey(), toObject(e.getValue()));
-    }
-    return args;
-  }
-
-  private Object toObject(Value value) {
-    if (value.getNullValue()) {
-      return null;
-    }
-    return value.getFieldValue();
-  }
 
-  public void setCommandManager(CommandManager commandManager) {
+  public void setCommandManager(ShardCommandManager commandManager) {
     _commandManager = commandManager;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fed63d6/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 754a70e..d516bb6 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -70,7 +70,7 @@ import org.apache.blur.manager.BlurQueryChecker;
 import org.apache.blur.manager.DefaultBlurFilterCache;
 import org.apache.blur.manager.IndexManager;
 import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
-import org.apache.blur.manager.command.CommandManager;
+import org.apache.blur.manager.command.ShardCommandManager;
 import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.manager.indexserver.BlurServerShutDown;
 import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
@@ -229,7 +229,7 @@ public class ThriftBlurShardServer extends ThriftServer {
         fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay,
facetThreadCount,
         deepPagingCache);
 
-    final CommandManager commandManager = new CommandManager(indexServer, 16);
+    final ShardCommandManager commandManager = new ShardCommandManager(indexServer, 16);
 
     final BlurShardServer shardServer = new BlurShardServer();
     shardServer.setCommandManager(commandManager);


Mime
View raw message