incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: More API changes to the platform.
Date Thu, 28 Aug 2014 16:52:13 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master a63a79a1e -> c0bb9f9fd


More API changes to the platform.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/c0bb9f9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/c0bb9f9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/c0bb9f9f

Branch: refs/heads/master
Commit: c0bb9f9fdb6fce3905e5af1023c641ec48573377
Parents: a63a79a
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Aug 28 12:51:04 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Aug 28 12:51:04 2014 -0400

----------------------------------------------------------------------
 .../manager/command/BaseCommandManager.java     | 57 ++++++++++++
 .../blur/manager/command/ClusterCommand.java    |  2 +-
 .../blur/manager/command/ClusterContext.java    | 36 ++++++++
 .../blur/manager/command/CommandContext.java    | 29 ------
 .../blur/manager/command/CommandUtil.java       |  7 +-
 .../command/ControllerCommandManager.java       | 18 +++-
 .../blur/manager/command/IndexContext.java      | 37 ++++++++
 .../command/IndexReadCombiningCommand.java      |  9 +-
 .../blur/manager/command/IndexReadCommand.java  |  4 +-
 .../blur/manager/command/IndexWriteCommand.java |  3 +-
 .../apache/blur/manager/command/Response.java   |  8 +-
 .../manager/command/ShardCommandManager.java    | 96 ++++++++++++--------
 .../command/primitive/DocumentCount.java        |  7 +-
 .../primitive/DocumentCountAggregator.java      | 22 ++---
 .../org/apache/blur/thrift/BlurShardServer.java | 11 ++-
 15 files changed, 232 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
b/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
new file mode 100644
index 0000000..4bd2626
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
@@ -0,0 +1,57 @@
+package org.apache.blur.manager.command;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.manager.command.primitive.BaseCommand;
+import org.apache.blur.manager.command.primitive.DocumentCount;
+import org.apache.blur.manager.command.primitive.DocumentCountAggregator;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class BaseCommandManager implements Closeable {
+  
+  protected final ExecutorService _executorService;
+  protected final Map<String, BaseCommand> _command = new ConcurrentHashMap<String,
BaseCommand>();
+
+  public BaseCommandManager(int threadCount) throws IOException {
+    register(DocumentCount.class);
+    register(DocumentCountAggregator.class);
+    _executorService = Executors.newThreadPool("command-", threadCount);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _executorService.shutdownNow();
+  }
+
+  public void register(Class<? extends BaseCommand> commandClass) throws IOException
{
+    try {
+      BaseCommand command = commandClass.newInstance();
+      _command.put(command.getName(), command);
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
index 2d04e8a..28567bf 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
@@ -21,6 +21,6 @@ import java.io.Serializable;
 
 public interface ClusterCommand<T> extends Serializable, Cloneable {
 
-  T clusterExecute(Args args, CommandContext context);
+  T clusterExecute(ClusterContext context);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java
new file mode 100644
index 0000000..2a901db
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java
@@ -0,0 +1,36 @@
+package org.apache.blur.manager.command;
+
+import java.util.Map;
+
+import org.apache.blur.server.TableContext;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public abstract class ClusterContext {
+
+  public abstract Args getArgs();
+
+  public abstract TableContext getTableContext();
+
+  public abstract <T> Map<Shard, T> readIndexes(Args args, Class<? extends
IndexReadCommand<T>> clazz);
+
+  public abstract <T> Map<Server, T> readServers(Args args, Class<? extends
IndexReadCombiningCommand<?, T>> clazz);
+
+  public abstract <T> T writeIndex(Args args, Class<? extends IndexWriteCommand<T>>
clazz);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/CommandContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/CommandContext.java b/blur-core/src/main/java/org/apache/blur/manager/command/CommandContext.java
deleted file mode 100644
index 1ded8cb..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/command/CommandContext.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.blur.manager.command;
-
-import java.util.Map;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-public abstract class CommandContext {
-
-  public abstract <T> Map<Shard, T> readIndexes(Args args, Class<? extends
IndexReadCommand<T>> clazz);
-
-  public abstract <T> Map<Server, T> readServers(Args args, Class<? extends
IndexReadCombiningCommand<?, T>> clazz);
-
-  public abstract <T> T writeIndex(Args args, Class<? extends IndexWriteCommand<T>>
clazz);
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java b/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
index ee28a61..c35107f 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
@@ -39,10 +39,11 @@ public class CommandUtil {
     return converted;
   }
 
-  public static Map<String, Value> convert(Map<String, Object> map) throws BlurException
{
+  public static Map<String, Value> convert(Map<Shard, Object> map) throws BlurException
{
     Map<String, Value> result = new HashMap<String, Value>();
-    for (Entry<String, Object> e : map.entrySet()) {
-      result.put(e.getKey(), toValue(e.getValue()));
+    for (Entry<Shard, Object> e : map.entrySet()) {
+      // @TODO need to make different setters for shard and server results
+      result.put(e.getKey().getShard(), toValue(e.getValue()));
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
index a3503dd..318a851 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
@@ -1,5 +1,9 @@
 package org.apache.blur.manager.command;
 
+import java.io.IOException;
+
+import org.apache.blur.manager.command.primitive.BaseCommand;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with this
@@ -17,22 +21,26 @@ package org.apache.blur.manager.command;
  * the License.
  */
 
-public class ControllerCommandManager {
+public class ControllerCommandManager extends BaseCommandManager {
+
+  public ControllerCommandManager(int threadCount) throws IOException {
+    super(threadCount);
+  }
 
   public Response execute(String table, String commandName, Args args) {
-    CommandContext context = createCommandContext(table);
-    ClusterCommand clusterCommand = getCommand(commandName);
+    ClusterContext context = createCommandContext(table);
+    BaseCommand command = getCommand(commandName);
     
     // For those commands that do not implement cluster command, run them in a base impl.
     
     throw new RuntimeException("Not Implemented");
   }
 
-  private ClusterCommand getCommand(String commandName) {
+  private BaseCommand getCommand(String commandName) {
     throw new RuntimeException("Not Implemented");
   }
 
-  private CommandContext createCommandContext(String table) {
+  private ClusterContext createCommandContext(String table) {
     throw new RuntimeException("Not Implemented");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/IndexContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/IndexContext.java b/blur-core/src/main/java/org/apache/blur/manager/command/IndexContext.java
new file mode 100644
index 0000000..a555e94
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/IndexContext.java
@@ -0,0 +1,37 @@
+package org.apache.blur.manager.command;
+
+import org.apache.blur.server.TableContext;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public abstract class IndexContext {
+
+  public abstract Args getArgs();
+
+  public abstract TableContext getTableContext();
+  
+  public abstract Shard getShard();
+
+  public abstract IndexReader getIndexReader();
+
+  public abstract IndexSearcher getIndexSearcher();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCombiningCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCombiningCommand.java
b/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCombiningCommand.java
index 6cf7e9d..1b433ab 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCombiningCommand.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCombiningCommand.java
@@ -17,15 +17,12 @@
 package org.apache.blur.manager.command;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.lucene.search.IndexSearcher;
+import java.util.Map;
 
 public interface IndexReadCombiningCommand<T1, T2> {
 
-  T1 execute(Args args, IndexSearcher searcher) throws IOException;
+  T1 execute(IndexContext context) throws IOException;
 
-  T2 combine(Iterator<Entry<String, T1>> it) throws IOException;
+  T2 combine(Map<Shard, T1> results) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCommand.java
b/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCommand.java
index c7646d1..d4c156c 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCommand.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/IndexReadCommand.java
@@ -18,10 +18,8 @@ package org.apache.blur.manager.command;
 
 import java.io.IOException;
 
-import org.apache.lucene.search.IndexSearcher;
-
 public interface IndexReadCommand<T> {
 
-  T execute(Args args, IndexSearcher searcher) throws IOException;
+  T execute(IndexContext context) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/IndexWriteCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/IndexWriteCommand.java
b/blur-core/src/main/java/org/apache/blur/manager/command/IndexWriteCommand.java
index 01e49f6..893638b 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/IndexWriteCommand.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/IndexWriteCommand.java
@@ -19,10 +19,9 @@ package org.apache.blur.manager.command;
 import java.io.IOException;
 
 import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.search.IndexSearcher;
 
 public interface IndexWriteCommand<T> {
 
-  public abstract T execute(Args args, IndexSearcher searcher, IndexWriter writer) throws
IOException;
+  public abstract T execute(IndexContext context, IndexWriter writer) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/Response.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/Response.java b/blur-core/src/main/java/org/apache/blur/manager/command/Response.java
index f5bce02..1cf5b72 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/Response.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/Response.java
@@ -20,11 +20,11 @@ import java.util.Map;
 
 public class Response {
 
-  private final Map<String, Object> _shardResults;
+  private final Map<Shard, Object> _shardResults;
   private final Object _serverResult;
   private final boolean _aggregatedResults;
 
-  private Response(Map<String, Object> shardResults, Object serverResult, boolean aggregatedResults)
{
+  private Response(Map<Shard, Object> shardResults, Object serverResult, boolean aggregatedResults)
{
     _shardResults = shardResults;
     _serverResult = serverResult;
     _aggregatedResults = aggregatedResults;
@@ -34,7 +34,7 @@ public class Response {
     return _aggregatedResults;
   }
 
-  public Map<String, Object> getShardResults() {
+  public Map<Shard, Object> getShardResults() {
     return _shardResults;
   }
 
@@ -46,7 +46,7 @@ public class Response {
     return new Response(null, object, true);
   }
 
-  public static Response createNewResponse(Map<String, Object> map) {
+  public static Response createNewResponse(Map<Shard, Object> map) {
     return new Response(map, null, false);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
b/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
index 7bd2e70..d48b6b6 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
@@ -16,84 +16,65 @@
  */
 package org.apache.blur.manager.command;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
-import org.apache.blur.concurrent.Executors;
 import org.apache.blur.manager.IndexServer;
-import org.apache.blur.manager.command.primitive.DocumentCount;
-import org.apache.blur.manager.command.primitive.DocumentCountAggregator;
 import org.apache.blur.manager.command.primitive.BaseCommand;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.server.IndexSearcherClosable;
+import org.apache.blur.server.TableContext;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
 
-public class ShardCommandManager implements Closeable {
+public class ShardCommandManager extends BaseCommandManager {
 
   private final IndexServer _indexServer;
-  private final ExecutorService _executorService;
-  private final Map<String, BaseCommand> _command = new ConcurrentHashMap<String,
BaseCommand>();
 
   public ShardCommandManager(IndexServer indexServer, int threadCount) throws IOException
{
-    register(DocumentCount.class);
-    register(DocumentCountAggregator.class);
+    super(threadCount);
     _indexServer = indexServer;
-    _executorService = Executors.newThreadPool("command-", threadCount);
   }
 
-  private void register(Class<? extends BaseCommand> commandClass) throws IOException
{
-    try {
-      BaseCommand command = commandClass.newInstance();
-      _command.put(command.getName(), command);
-    } catch (InstantiationException e) {
-      throw new IOException(e);
-    } catch (IllegalAccessException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public Response execute(String table, String commandName, Args args) throws IOException
{
+  public Response execute(TableContext tableContext, String commandName, Args args) throws
IOException {
     BaseCommand command = getCommandObject(commandName);
     if (command == null) {
       throw new IOException("Command with name [" + commandName + "] not found.");
     }
     if (command instanceof IndexReadCommand) {
-      return toResponse(executeReadCommand(command, table, args), command);
+      return toResponse(executeReadCommand(command, tableContext, args), command);
     } else if (command instanceof IndexWriteCommand) {
-      return toResponse(executeReadWriteCommand((IndexWriteCommand<?>) command, table,
args), command);
+      return toResponse(executeReadWriteCommand((IndexWriteCommand<?>) command, tableContext,
args), command);
     }
     throw new IOException("Command type of [" + command.getClass() + "] not supported.");
   }
 
   @SuppressWarnings("unchecked")
-  private Response toResponse(Map<String, Object> results, BaseCommand command) throws
IOException {
+  private Response toResponse(Map<Shard, Object> results, BaseCommand command) throws
IOException {
     if (command instanceof IndexReadCombiningCommand) {
       IndexReadCombiningCommand<Object, Object> primitiveCommandAggregator = (IndexReadCombiningCommand<Object,
Object>) command;
-      Iterator<Entry<String, Object>> iterator = results.entrySet().iterator();
-      Object object = primitiveCommandAggregator.combine(iterator);
+      Object object = primitiveCommandAggregator.combine(results);
       return Response.createNewAggregateResponse(object);
     }
     return Response.createNewResponse(results);
   }
 
-  private Map<String, Object> executeReadWriteCommand(IndexWriteCommand<?> command,
String table, Args args) {
+  private Map<Shard, Object> executeReadWriteCommand(IndexWriteCommand<?> command,
TableContext tableContext, Args args) {
     return null;
   }
 
-  private Map<String, Object> executeReadCommand(BaseCommand command, String table,
final Args args)
+  private Map<Shard, Object> executeReadCommand(BaseCommand command, final TableContext
tableContext, final Args args)
       throws IOException {
-    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+    Map<String, BlurIndex> indexes = _indexServer.getIndexes(tableContext.getTable());
     Map<String, Future<?>> futureMap = new HashMap<String, Future<?>>();
     for (Entry<String, BlurIndex> e : indexes.entrySet()) {
       String shardId = e.getKey();
+      final Shard shard = new Shard(shardId);
       final BlurIndex blurIndex = e.getValue();
       final IndexReadCommand<?> readCommand = (IndexReadCommand<?>) command.clone();
       Future<Object> future = _executorService.submit(new Callable<Object>()
{
@@ -101,7 +82,7 @@ public class ShardCommandManager implements Closeable {
         public Object call() throws Exception {
           IndexSearcherClosable searcher = blurIndex.getIndexSearcher();
           try {
-            return readCommand.execute(args, searcher);
+            return readCommand.execute(new ShardIndexContext(tableContext, shard, searcher,
args));
           } finally {
             searcher.close();
           }
@@ -109,7 +90,7 @@ public class ShardCommandManager implements Closeable {
       });
       futureMap.put(shardId, future);
     }
-    Map<String, Object> resultMap = new HashMap<String, Object>();
+    Map<Shard, Object> resultMap = new HashMap<Shard, Object>();
     for (Entry<String, Future<?>> e : futureMap.entrySet()) {
       Future<?> future = e.getValue();
       Object object;
@@ -120,7 +101,7 @@ public class ShardCommandManager implements Closeable {
       } catch (ExecutionException ex) {
         throw new IOException(ex.getCause());
       }
-      resultMap.put(e.getKey(), object);
+      resultMap.put(new Shard(e.getKey()), object);
     }
     return resultMap;
   }
@@ -129,9 +110,44 @@ public class ShardCommandManager implements Closeable {
     return _command.get(commandName);
   }
 
-  @Override
-  public void close() throws IOException {
-    _executorService.shutdownNow();
-  }
+  static class ShardIndexContext extends IndexContext {
+
+    private final TableContext _tableContext;
+    private final Shard _shard;
+    private final IndexSearcher _searcher;
+    private final Args _args;
+
+    public ShardIndexContext(TableContext tableContext, Shard shard, IndexSearcher searcher,
Args args) {
+      _tableContext = tableContext;
+      _shard = shard;
+      _searcher = searcher;
+      _args = args;
+    }
+
+    @Override
+    public Args getArgs() {
+      return _args;
+    }
 
+    @Override
+    public IndexReader getIndexReader() {
+      return getIndexSearcher().getIndexReader();
+    }
+
+    @Override
+    public IndexSearcher getIndexSearcher() {
+      return _searcher;
+    }
+
+    @Override
+    public TableContext getTableContext() {
+      return _tableContext;
+    }
+
+    @Override
+    public Shard getShard() {
+      return _shard;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/primitive/DocumentCount.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/primitive/DocumentCount.java
b/blur-core/src/main/java/org/apache/blur/manager/command/primitive/DocumentCount.java
index 5bb37cf..f1cd904 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/primitive/DocumentCount.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/primitive/DocumentCount.java
@@ -18,9 +18,8 @@ package org.apache.blur.manager.command.primitive;
 
 import java.io.IOException;
 
-import org.apache.blur.manager.command.Args;
+import org.apache.blur.manager.command.IndexContext;
 import org.apache.blur.manager.command.IndexReadCommand;
-import org.apache.lucene.search.IndexSearcher;
 
 @SuppressWarnings("serial")
 public class DocumentCount extends BaseCommand implements IndexReadCommand<Integer>
{
@@ -33,8 +32,8 @@ public class DocumentCount extends BaseCommand implements IndexReadCommand<Integ
   }
 
   @Override
-  public Integer execute(Args args, IndexSearcher searcher) throws IOException {
-    return (int) searcher.getIndexReader().numDocs();
+  public Integer execute(IndexContext context) throws IOException {
+    return context.getIndexReader().numDocs();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/manager/command/primitive/DocumentCountAggregator.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/primitive/DocumentCountAggregator.java
b/blur-core/src/main/java/org/apache/blur/manager/command/primitive/DocumentCountAggregator.java
index cf4533e..5739f1b 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/primitive/DocumentCountAggregator.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/primitive/DocumentCountAggregator.java
@@ -17,16 +17,15 @@
 package org.apache.blur.manager.command.primitive;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.blur.manager.command.Args;
 import org.apache.blur.manager.command.ClusterCommand;
-import org.apache.blur.manager.command.CommandContext;
+import org.apache.blur.manager.command.ClusterContext;
+import org.apache.blur.manager.command.IndexContext;
 import org.apache.blur.manager.command.IndexReadCombiningCommand;
 import org.apache.blur.manager.command.Server;
-import org.apache.lucene.search.IndexSearcher;
+import org.apache.blur.manager.command.Shard;
 
 @SuppressWarnings("serial")
 public class DocumentCountAggregator extends BaseCommand implements ClusterCommand<Long>,
@@ -40,23 +39,22 @@ public class DocumentCountAggregator extends BaseCommand implements ClusterComma
   }
 
   @Override
-  public Integer execute(Args args, IndexSearcher searcher) throws IOException {
-    return (int) searcher.getIndexReader().numDocs();
+  public Integer execute(IndexContext context) throws IOException {
+    return context.getIndexReader().numDocs();
   }
 
   @Override
-  public Long combine(Iterator<Entry<String, Integer>> it) throws IOException
{
+  public Long combine(Map<Shard, Integer> results) throws IOException {
     long total = 0;
-    while (it.hasNext()) {
-      total += it.next().getValue();
+    for (Integer i : results.values()) {
+      total += i;
     }
     return total;
   }
 
   @Override
-  public Long clusterExecute(Args args, CommandContext context) {
-    // where the key is the server hostname
-    Map<Server, Long> results = context.readServers(args, DocumentCountAggregator.class);
+  public Long clusterExecute(ClusterContext context) {
+    Map<Server, Long> results = context.readServers(null, DocumentCountAggregator.class);
     long total = 0;
     for (Entry<Server, Long> e : results.entrySet()) {
       total += e.getValue();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c0bb9f9f/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index d776fc0..2b48c15 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -37,13 +36,13 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.BlurQueryChecker;
 import org.apache.blur.manager.IndexManager;
 import org.apache.blur.manager.IndexServer;
-import org.apache.blur.manager.command.Args;
 import org.apache.blur.manager.command.CommandUtil;
-import org.apache.blur.manager.command.ShardCommandManager;
 import org.apache.blur.manager.command.Response;
+import org.apache.blur.manager.command.ShardCommandManager;
 import org.apache.blur.manager.results.BlurResultIterable;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.server.ShardServerContext;
+import org.apache.blur.server.TableContext;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.generated.Arguments;
 import org.apache.blur.thrift.generated.Blur.Iface;
@@ -61,7 +60,6 @@ import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.Status;
 import org.apache.blur.thrift.generated.TableStats;
 import org.apache.blur.thrift.generated.User;
-import org.apache.blur.thrift.generated.Value;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.QueryCache;
@@ -595,7 +593,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
   public org.apache.blur.thrift.generated.Response execute(String table, String commandName,
Arguments arguments)
       throws BlurException, TException {
     try {
-      Response response = _commandManager.execute(table, commandName, CommandUtil.convert(arguments));
+      Response response = _commandManager.execute(getTableContext(table), commandName, CommandUtil.convert(arguments));
       return CommandUtil.convert(response);
     } catch (Exception e) {
       LOG.error("Unknown error while trying to execute command [{0}] for table [{1}]", e,
commandName, table);
@@ -606,6 +604,9 @@ public class BlurShardServer extends TableAdmin implements Iface {
     }
   }
 
+  private TableContext getTableContext(final String table) {
+    return TableContext.create(_clusterStatus.getTableDescriptor(true, _clusterStatus.getCluster(true,
table), table));
+  }
 
   public void setCommandManager(ShardCommandManager commandManager) {
     _commandManager = commandManager;


Mime
View raw message