incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Blur command status can now be returned through the Thrift API, and command execution can now be interrupted.
Date Mon, 11 Jan 2016 19:59:33 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master c9a99b4d1 -> 197936246


Blur command status can now be returned through the Thrift API, and command execution can now be interrupted.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/19793624
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/19793624
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/19793624

Branch: refs/heads/master
Commit: 19793624635b0e857dcebf6315bf085f84264c2d
Parents: c9a99b4
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Jan 11 14:59:22 2016 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Jan 11 14:59:22 2016 -0500

----------------------------------------------------------------------
 .../apache/blur/command/BaseCommandManager.java |  91 ++++++++--
 .../blur/command/CommandStatusStateEnum.java    |  21 ---
 .../blur/command/ControllerClusterContext.java  |   9 +-
 .../blur/command/ControllerCommandManager.java  |  11 +-
 .../org/apache/blur/command/ResponseFuture.java |  15 +-
 .../blur/command/ShardCommandManager.java       |  41 +++--
 .../org/apache/blur/manager/IndexManager.java   |  12 +-
 .../blur/manager/status/QueryStatusManager.java |  55 +++---
 .../apache/blur/server/FilteredBlurServer.java  |   5 +-
 .../blur/thrift/BlurControllerServer.java       | 176 ++++++++++++++-----
 .../org/apache/blur/thrift/BlurShardServer.java |  25 +--
 .../blur/thrift/ThriftBlurControllerServer.java |  10 +-
 .../blur/thrift/ThriftBlurShardServer.java      |  12 +-
 .../blur/command/ShardCommandManagerTest.java   |  22 ++-
 .../org/apache/blur/command/WaitForSeconds.java |  12 ++
 .../apache/blur/manager/IndexManagerTest.java   |  10 +-
 .../server/cache/ThriftCacheServerTest.java     |   4 +-
 17 files changed, 367 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
index edee060..ad542ef 100644
--- a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
@@ -10,6 +10,7 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -20,6 +21,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -29,6 +31,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.command.annotation.Description;
 import org.apache.blur.concurrent.Executors;
@@ -36,6 +39,9 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.thrift.generated.Arguments;
 import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.CommandStatus;
+import org.apache.blur.thrift.generated.CommandStatusState;
+import org.apache.blur.thrift.generated.User;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -85,9 +91,11 @@ public abstract class BaseCommandManager implements Closeable {
   protected final Configuration _configuration;
   protected final BlurObjectSerDe _serDe = new BlurObjectSerDe();
   protected final long _runningCacheTombstoneTime = TimeUnit.SECONDS.toMillis(60);
+  protected final String _serverName;
 
   public BaseCommandManager(File tmpPath, String commandPath, int workerThreadCount, int driverThreadCount,
-      long connectionTimeout, Configuration configuration) throws IOException {
+      long connectionTimeout, Configuration configuration, String serverName) throws IOException {
+    _serverName = serverName;
     _configuration = configuration;
     lookForCommandsToRegisterInClassPath();
     _tmpPath = tmpPath;
@@ -125,8 +133,70 @@ public abstract class BaseCommandManager implements Closeable {
     }
   }
 
-  public List<String> commandStatusList(CommandStatusStateEnum commandStatus) {
-    throw new RuntimeException("Not implemented.");
+  public CommandStatus getCommandStatus(String commandExecutionId) {
+    CommandStatus cso = findCommandStatusObject(commandExecutionId, _workerRunningMap.values());
+    if (cso != null) {
+      return cso;
+    }
+    return findCommandStatusObject(commandExecutionId, _driverRunningMap.values());
+  }
+
+  private CommandStatus findCommandStatusObject(String commandExecutionId, Collection<ResponseFuture<?>> values) {
+    Map<String, Map<CommandStatusState, Long>> serverStateMap = new HashMap<String, Map<CommandStatusState, Long>>();
+    CommandStatus commandStatus = null;
+    for (ResponseFuture<?> responseFuture : values) {
+      Command<?> commandExecuting = responseFuture.getCommandExecuting();
+      if (commandExecuting.getCommandExecutionId().equals(commandExecutionId)) {
+        if (commandStatus == null) {
+          CommandStatus originalCommandStatusObject = responseFuture.getOriginalCommandStatusObject();
+          String commandName = responseFuture.getCommandExecuting().getName();
+          Arguments arguments = originalCommandStatusObject.getArguments();
+          User user = originalCommandStatusObject.getUser();
+          commandStatus = new CommandStatus(commandExecutionId, commandName, arguments, serverStateMap, user);
+        }
+
+        CommandStatusState commandStatusStateEnum = getCommandStatusStateEnum(responseFuture);
+        Map<CommandStatusState, Long> map = serverStateMap.get(_serverName);
+        if (map == null) {
+          serverStateMap.put(_serverName, map = new HashMap<CommandStatusState, Long>());
+        }
+        Long l = map.get(commandStatusStateEnum);
+        if (l == null) {
+          map.put(commandStatusStateEnum, 1L);
+        } else {
+          map.put(commandStatusStateEnum, 1L + l);
+        }
+      }
+    }
+    return commandStatus;
+  }
+
+  public List<String> commandStatusList() {
+    Set<String> result = new TreeSet<String>();
+    result.addAll(getStatusList(_workerRunningMap.values()));
+    result.addAll(getStatusList(_driverRunningMap.values()));
+    return new ArrayList<String>(result);
+  }
+
+  private List<String> getStatusList(Collection<ResponseFuture<?>> values) {
+    List<String> result = new ArrayList<String>();
+    for (ResponseFuture<?> responseFuture : values) {
+      Command<?> commandExecuting = responseFuture.getCommandExecuting();
+      result.add(commandExecuting.getCommandExecutionId());
+    }
+    return result;
+  }
+
+  private CommandStatusState getCommandStatusStateEnum(ResponseFuture<?> responseFuture) {
+    if (responseFuture.isCancelled()) {
+      return CommandStatusState.INTERRUPTED;
+    } else {
+      if (responseFuture.isDone()) {
+        return CommandStatusState.COMPLETE;
+      } else {
+        return CommandStatusState.RUNNING;
+      }
+    }
   }
 
   private TimerTask getTimerTaskForRemovalOfOldCommands(final Map<Long, ResponseFuture<?>> runningMap) {
@@ -240,7 +310,7 @@ public abstract class BaseCommandManager implements Closeable {
   protected void copyLocal(FileSystem fileSystem, FileStatus fileStatus, File destDir) throws IOException {
     Path path = fileStatus.getPath();
     File file = new File(destDir, path.getName());
-    if (fileStatus.isDir()) {
+    if (fileStatus.isDirectory()) {
       if (!file.mkdirs()) {
         LOG.error("Error while trying to create a sub directory [{0}].", file.getAbsolutePath());
         throw new IOException("Error while trying to create a sub directory [" + file.getAbsolutePath() + "].");
@@ -259,7 +329,7 @@ public abstract class BaseCommandManager implements Closeable {
   }
 
   protected BigInteger checkContents(FileStatus fileStatus, FileSystem fileSystem) throws IOException {
-    if (fileStatus.isDir()) {
+    if (fileStatus.isDirectory()) {
       LOG.debug("Scanning directory [{0}].", fileStatus.getPath());
       BigInteger count = BigInteger.ZERO;
       Path path = fileStatus.getPath();
@@ -329,12 +399,12 @@ public abstract class BaseCommandManager implements Closeable {
     }
   }
 
-  protected Response submitDriverCallable(Callable<Response> callable, Command<?> commandExecuting) throws IOException,
-      TimeoutException, ExceptionCollector {
+  protected Response submitDriverCallable(Callable<Response> callable, Command<?> commandExecuting,
+      CommandStatus originalCommandStatusObject, AtomicBoolean running) throws IOException, TimeoutException, ExceptionCollector {
     Future<Response> future = _executorServiceDriver.submit(callable);
     Long instanceExecutionId = getInstanceExecutionId();
     _driverRunningMap.put(instanceExecutionId, new ResponseFuture<Response>(_runningCacheTombstoneTime, future,
-        commandExecuting));
+        commandExecuting, originalCommandStatusObject,running));
     try {
       return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
     } catch (CancellationException e) {
@@ -368,11 +438,12 @@ public abstract class BaseCommandManager implements Closeable {
     }
   }
 
-  protected <T> Future<T> submitToExecutorService(Callable<T> callable, Command<?> commandExecuting) {
+  protected <T> Future<T> submitToExecutorService(Callable<T> callable, Command<?> commandExecuting,
+      CommandStatus originalCommandStatusObject, AtomicBoolean running) {
     Future<T> future = _executorServiceWorker.submit(callable);
     Long instanceExecutionId = getInstanceExecutionId();
     _workerRunningMap.put(instanceExecutionId, new ResponseFuture<T>(_runningCacheTombstoneTime, future,
-        commandExecuting));
+        commandExecuting, originalCommandStatusObject, running));
     return future;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/command/CommandStatusStateEnum.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/CommandStatusStateEnum.java b/blur-core/src/main/java/org/apache/blur/command/CommandStatusStateEnum.java
deleted file mode 100644
index 575bb9d..0000000
--- a/blur-core/src/main/java/org/apache/blur/command/CommandStatusStateEnum.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.command;
-
-public enum CommandStatusStateEnum {
-  RUNNING, INTERRUPTED, COMPLETE, BACK_PRESSURE_INTERRUPTED;
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
index 192f743..b7a1a63 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
@@ -11,6 +11,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
@@ -25,6 +26,7 @@ import org.apache.blur.thrift.Connection;
 import org.apache.blur.thrift.generated.Arguments;
 import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.CommandStatus;
 import org.apache.blur.thrift.generated.Response;
 import org.apache.blur.thrift.generated.TimeoutException;
 import org.apache.blur.thrift.generated.ValueObject;
@@ -131,6 +133,8 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     Map<Server, Client> clientMap = getClientMap(command, tables, shards);
 
     final Arguments arguments = _manager.toArguments(command);
+
+    CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, null);
     for (Entry<Server, Client> e : clientMap.entrySet()) {
       Server server = e.getKey();
       final Client client = e.getValue();
@@ -142,7 +146,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
           Map<Shard, Object> shardToValue = CommandUtil.fromThriftSupportedObjects(shardToThriftValue, _serDe);
           return (Map<Shard, T>) shardToValue;
         }
-      }, command);
+      }, command, originalCommandStatusObject, new AtomicBoolean(true));
       for (Shard shard : getShardsOnServer(server, tables, shards)) {
         futureMap.put(shard, new ShardResultFuture<T>(shard, future));
       }
@@ -222,6 +226,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     Set<Shard> shards = command.routeShards(this, tables);
     Map<Server, Client> clientMap = getClientMap(command, tables, shards);
     final Arguments arguments = _manager.toArguments(command);
+    CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, null);
     for (Entry<Server, Client> e : clientMap.entrySet()) {
       Server server = e.getKey();
       final Client client = e.getValue();
@@ -233,7 +238,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
           Object thriftObject = CommandUtil.toObject(valueObject);
           return (T) _serDe.fromSupportedThriftObject(thriftObject);
         }
-      }, command);
+      }, command, originalCommandStatusObject, new AtomicBoolean(true));
       futureMap.put(server, future);
     }
     return futureMap;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
index b11d09a..86d90cd 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
@@ -4,6 +4,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.command.commandtype.ClusterExecuteCommand;
@@ -14,6 +15,7 @@ import org.apache.blur.command.commandtype.ServerReadCommand;
 import org.apache.blur.server.LayoutFactory;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.server.TableContextFactory;
+import org.apache.blur.thrift.generated.CommandStatus;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 
@@ -37,12 +39,13 @@ import org.apache.hadoop.conf.Configuration;
 public class ControllerCommandManager extends BaseCommandManager {
 
   public ControllerCommandManager(File tmpPath, String commandPath, int workerThreadCount, int driverThreadCount,
-      long connectionTimeout, Configuration configuration) throws IOException {
-    super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout, configuration);
+      long connectionTimeout, Configuration configuration, String serverName) throws IOException {
+    super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout, configuration, serverName);
   }
 
   public Response execute(final TableContextFactory tableContextFactory, LayoutFactory layoutFactory,
-      String commandName, ArgumentOverlay argumentOverlay) throws IOException, TimeoutException, ExceptionCollector {
+      String commandName, ArgumentOverlay argumentOverlay, CommandStatus originalCommandStatusObject)
+      throws IOException, TimeoutException, ExceptionCollector {
     final ControllerClusterContext context = createCommandContext(tableContextFactory, layoutFactory);
     final Command<?> command = getCommandObject(commandName, argumentOverlay);
     if (command == null) {
@@ -78,7 +81,7 @@ public class ControllerCommandManager extends BaseCommandManager {
         }
       }
 
-    }, command);
+    }, command, originalCommandStatusObject, new AtomicBoolean(true));
   }
 
   private CombiningContext getCombiningContext(final TableContextFactory tableContextFactory) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
index eb9b30d..a5a629e 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
@@ -20,19 +20,31 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.blur.thrift.generated.CommandStatus;
+
 public class ResponseFuture<T> implements Future<T> {
 
   private final Future<T> _future;
   private final AtomicLong _timeWhenNotRunningObserved = new AtomicLong();
   private final long _tombstone;
   private final Command<?> _commandExecuting;
+  private final CommandStatus _originalCommandStatusObject;
+  private final AtomicBoolean _running;
 
-  public ResponseFuture(long tombstone, Future<T> future, Command<?> commandExecuting) {
+  public ResponseFuture(long tombstone, Future<T> future, Command<?> commandExecuting,
+      CommandStatus originalCommandStatusObject, AtomicBoolean running) {
     _tombstone = tombstone;
     _future = future;
     _commandExecuting = commandExecuting;
+    _originalCommandStatusObject = originalCommandStatusObject;
+    _running = running;
+  }
+
+  public CommandStatus getOriginalCommandStatusObject() {
+    return _originalCommandStatusObject;
   }
 
   public Command<?> getCommandExecuting() {
@@ -40,6 +52,7 @@ public class ResponseFuture<T> implements Future<T> {
   }
 
   public boolean cancel(boolean mayInterruptIfRunning) {
+    _running.set(false);
     return _future.cancel(mayInterruptIfRunning);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
index d518523..5806874 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
@@ -26,14 +26,17 @@ import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.lucene.search.IndexSearcherCloseable;
+import org.apache.blur.manager.IndexManager;
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.server.ShardServerContext;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.server.TableContextFactory;
+import org.apache.blur.thrift.generated.CommandStatus;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.lucene.index.IndexReader;
 
@@ -42,14 +45,16 @@ public class ShardCommandManager extends BaseCommandManager {
   private final IndexServer _indexServer;
 
   public ShardCommandManager(IndexServer indexServer, File tmpPath, String commandPath, int workerThreadCount,
-      int driverThreadCount, long connectionTimeout, Configuration configuration) throws IOException {
-    super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout, configuration);
+      int driverThreadCount, long connectionTimeout, Configuration configuration, String serverName) throws IOException {
+    super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout, configuration, serverName);
     _indexServer = indexServer;
   }
 
   public Response execute(final TableContextFactory tableContextFactory, final String commandName,
-      final ArgumentOverlay argumentOverlay) throws IOException, TimeoutException, ExceptionCollector {
+      final ArgumentOverlay argumentOverlay, CommandStatus originalCommandStatusObject) throws IOException,
+      TimeoutException, ExceptionCollector {
     final ShardServerContext shardServerContext = getShardServerContext();
+    AtomicBoolean running = new AtomicBoolean(true);
     final Command<?> command = getCommandObject(commandName, argumentOverlay);
     Callable<Response> callable = new Callable<Response>() {
       @Override
@@ -58,8 +63,9 @@ public class ShardCommandManager extends BaseCommandManager {
           throw new IOException("Command with name [" + commandName + "] not found.");
         }
         if (command instanceof IndexRead || command instanceof ServerRead) {
-          return toResponse(executeReadCommand(shardServerContext, command, tableContextFactory), command,
-              getServerContext(tableContextFactory));
+          return toResponse(
+              executeReadCommand(shardServerContext, command, tableContextFactory, originalCommandStatusObject, running),
+              command, getServerContext(tableContextFactory));
         }
         throw new IOException("Command type of [" + command.getClass() + "] not supported.");
       }
@@ -79,7 +85,7 @@ public class ShardCommandManager extends BaseCommandManager {
         };
       }
     };
-    return submitDriverCallable(callable, command);
+    return submitDriverCallable(callable, command, originalCommandStatusObject, running);
   }
 
   private ShardServerContext getShardServerContext() {
@@ -102,7 +108,8 @@ public class ShardCommandManager extends BaseCommandManager {
   }
 
   private Map<Shard, Object> executeReadCommand(ShardServerContext shardServerContext, Command<?> command,
-      final TableContextFactory tableContextFactory) throws IOException, ExceptionCollector {
+      final TableContextFactory tableContextFactory, CommandStatus originalCommandStatusObject, AtomicBoolean running)
+      throws IOException, ExceptionCollector {
     BaseContext context = new BaseContext() {
       @Override
       public TableContext getTableContext(String table) throws IOException {
@@ -139,14 +146,15 @@ public class ShardCommandManager extends BaseCommandManager {
         Command<?> clone = command.clone();
         if (clone instanceof IndexRead) {
           final IndexRead<?> readCommand = (IndexRead<?>) clone;
-          callable = getCallable(shardServerContext, tableContextFactory, table, shard, blurIndex, readCommand);
+          callable = getCallable(shardServerContext, tableContextFactory, table, shard, blurIndex, readCommand, running);
         } else if (clone instanceof ServerRead) {
           final ServerRead<?, ?> readCombiningCommand = (ServerRead<?, ?>) clone;
-          callable = getCallable(shardServerContext, tableContextFactory, table, shard, blurIndex, readCombiningCommand);
+          callable = getCallable(shardServerContext, tableContextFactory, table, shard, blurIndex,
+              readCombiningCommand, running);
         } else {
           throw new IOException("Command type of [" + clone.getClass() + "] not supported.");
         }
-        Future<Object> future = submitToExecutorService(callable, clone);
+        Future<Object> future = submitToExecutorService(callable, clone, originalCommandStatusObject, running);
         futureMap.put(shard, future);
       }
     }
@@ -189,7 +197,7 @@ public class ShardCommandManager extends BaseCommandManager {
 
   private Callable<Object> getCallable(final ShardServerContext shardServerContext,
       final TableContextFactory tableContextFactory, final String table, final Shard shard, final BlurIndex blurIndex,
-      final ServerRead<?, ?> readCombiningCommand) {
+      final ServerRead<?, ?> readCombiningCommand, AtomicBoolean running) {
     return new Callable<Object>() {
       @Override
       public Object call() throws Exception {
@@ -199,14 +207,15 @@ public class ShardCommandManager extends BaseCommandManager {
           searcher = blurIndex.getIndexSearcher();
           shardServerContext.setIndexSearcherClosable(table, shardId, searcher);
         }
-        return readCombiningCommand.execute(new ShardIndexContext(tableContextFactory, table, shard, searcher));
+        return readCombiningCommand
+            .execute(new ShardIndexContext(tableContextFactory, table, shard, searcher, running));
       }
     };
   }
 
   private Callable<Object> getCallable(final ShardServerContext shardServerContext,
       final TableContextFactory tableContextFactory, final String table, final Shard shard, final BlurIndex blurIndex,
-      final IndexRead<?> readCommand) {
+      final IndexRead<?> readCommand, AtomicBoolean running) {
     return new Callable<Object>() {
       @Override
       public Object call() throws Exception {
@@ -217,7 +226,7 @@ public class ShardCommandManager extends BaseCommandManager {
           searcher = blurIndex.getIndexSearcher();
           shardServerContext.setIndexSearcherClosable(table, shardId, searcher);
         }
-        return readCommand.execute(new ShardIndexContext(tableContextFactory, table, shard, searcher));
+        return readCommand.execute(new ShardIndexContext(tableContextFactory, table, shard, searcher, running));
       }
     };
   }
@@ -229,11 +238,13 @@ public class ShardCommandManager extends BaseCommandManager {
     private final TableContextFactory _tableContextFactory;
     private final String _table;
 
-    public ShardIndexContext(TableContextFactory tableContextFactory, String table, Shard shard, IndexSearcherCloseable searcher) {
+    public ShardIndexContext(TableContextFactory tableContextFactory, String table, Shard shard,
+        IndexSearcherCloseable searcher, AtomicBoolean running) {
       _tableContextFactory = tableContextFactory;
       _table = table;
       _shard = shard;
       _searcher = searcher;
+      IndexManager.resetExitableReader(getIndexReader(), running);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index de225d5..cca43a8 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -166,7 +166,7 @@ public class IndexManager {
   private final ExecutorService _facetExecutor;
   private final ExecutorService _mutateExecutor;
 
-  private final QueryStatusManager _statusManager = new QueryStatusManager();
+  private final QueryStatusManager _statusManager;
   private final AtomicBoolean _closed = new AtomicBoolean(false);
   private final BlurPartitioner _blurPartitioner = new BlurPartitioner();
   private final BlurFilterCache _filterCache;
@@ -184,8 +184,9 @@ public class IndexManager {
   public static AtomicBoolean DEBUG_RUN_SLOW = new AtomicBoolean(false);
 
   public IndexManager(IndexServer indexServer, ClusterStatus clusterStatus, BlurFilterCache filterCache,
-      int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, long statusCleanupTimerDelay,
-      int facetThreadCount, DeepPagingCache deepPagingCache, MemoryAllocationWatcher memoryAllocationWatcher) {
+      int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, int facetThreadCount,
+      DeepPagingCache deepPagingCache, MemoryAllocationWatcher memoryAllocationWatcher, QueryStatusManager statusManager) {
+    _statusManager = statusManager;
     _memoryAllocationWatcher = memoryAllocationWatcher;
     _deepPagingCache = deepPagingCache;
     _indexServer = indexServer;
@@ -219,8 +220,6 @@ public class IndexManager {
       _facetExecutor = Executors.newThreadPool(new SynchronousQueue<Runnable>(), "facet-execution", facetThreadCount);
     }
 
-    _statusManager.setStatusCleanupTimerDelay(statusCleanupTimerDelay);
-    _statusManager.init();
     LOG.info("Init Complete");
 
   }
@@ -228,7 +227,6 @@ public class IndexManager {
   public synchronized void close() {
     if (!_closed.get()) {
       _closed.set(true);
-      _statusManager.close();
       _executor.shutdownNow();
       _mutateExecutor.shutdownNow();
       if (_facetExecutor != null) {
@@ -1296,7 +1294,7 @@ public class IndexManager {
 
   }
 
-  private static boolean resetExitableReader(IndexReader indexReader, AtomicBoolean running) {
+  public static boolean resetExitableReader(IndexReader indexReader, AtomicBoolean running) {
     if (indexReader instanceof ExitableReader) {
       ExitableReader exitableReader = (ExitableReader) indexReader;
       exitableReader.setRunning(running);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
index bb419b2..a86858a 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
@@ -16,6 +16,7 @@ package org.apache.blur.manager.status;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -24,7 +25,7 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.blur.log.Log;
@@ -36,18 +37,19 @@ import org.apache.blur.thrift.generated.User;
 import org.apache.blur.utils.GCAction;
 import org.apache.blur.utils.GCWatcher;
 
-public class QueryStatusManager {
+public class QueryStatusManager implements Closeable {
 
   private static final Log LOG = LogFactory.getLog(QueryStatusManager.class);
   private static final Object CONSTANT_VALUE = new Object();
 
-  private Timer statusCleanupTimer;
-  private long statusCleanupTimerDelay = TimeUnit.SECONDS.toMillis(10);
-  private ConcurrentHashMap<QueryStatus, Object> currentQueryStatusCollection = new ConcurrentHashMap<QueryStatus, Object>();
-
-  public void init() {
-    statusCleanupTimer = new Timer("Query-Status-Cleanup", true);
-    statusCleanupTimer.schedule(new TimerTask() {
+  private final Timer _statusCleanupTimer;
+  private final long _statusCleanupTimerDelay;
+  private final ConcurrentMap<QueryStatus, Object> _currentQueryStatusCollection = new ConcurrentHashMap<QueryStatus, Object>();
+  
+  public QueryStatusManager(long statusCleanupTimerDelay) {
+    _statusCleanupTimerDelay = statusCleanupTimerDelay;
+    _statusCleanupTimer = new Timer("Query-Status-Cleanup", true);
+    _statusCleanupTimer.schedule(new TimerTask() {
       @Override
       public void run() {
         try {
@@ -56,7 +58,7 @@ public class QueryStatusManager {
           LOG.error("Unknown error while trying to cleanup finished queries.", e);
         }
       }
-    }, statusCleanupTimerDelay, statusCleanupTimerDelay);
+    }, _statusCleanupTimerDelay, _statusCleanupTimerDelay);
     GCWatcher.registerAction(new GCAction() {
       @Override
       public void takeAction() throws Exception {
@@ -65,14 +67,15 @@ public class QueryStatusManager {
     });
   }
 
+  @Override
   public void close() {
-    statusCleanupTimer.cancel();
-    statusCleanupTimer.purge();
+    _statusCleanupTimer.cancel();
+    _statusCleanupTimer.purge();
   }
 
   public QueryStatus newQueryStatus(String table, BlurQuery blurQuery, int maxNumberOfThreads, AtomicBoolean running, User user) {
-    QueryStatus queryStatus = new QueryStatus(statusCleanupTimerDelay, table, blurQuery, running, user);
-    currentQueryStatusCollection.put(queryStatus, CONSTANT_VALUE);
+    QueryStatus queryStatus = new QueryStatus(_statusCleanupTimerDelay, table, blurQuery, running, user);
+    _currentQueryStatusCollection.put(queryStatus, CONSTANT_VALUE);
     return queryStatus;
   }
 
@@ -81,27 +84,23 @@ public class QueryStatusManager {
   }
 
   private void cleanupFinishedQueryStatuses() {
-    LOG.debug("QueryStatus Start count [{0}].", currentQueryStatusCollection.size());
-    Iterator<QueryStatus> iterator = currentQueryStatusCollection.keySet().iterator();
+    LOG.debug("QueryStatus Start count [{0}].", _currentQueryStatusCollection.size());
+    Iterator<QueryStatus> iterator = _currentQueryStatusCollection.keySet().iterator();
     while (iterator.hasNext()) {
       QueryStatus status = iterator.next();
       if (status.isValidForCleanUp()) {
-        currentQueryStatusCollection.remove(status);
+        _currentQueryStatusCollection.remove(status);
       }
     }
-    LOG.debug("QueryStatus Finish count [{0}].", currentQueryStatusCollection.size());
+    LOG.debug("QueryStatus Finish count [{0}].", _currentQueryStatusCollection.size());
   }
 
   public long getStatusCleanupTimerDelay() {
-    return statusCleanupTimerDelay;
-  }
-
-  public void setStatusCleanupTimerDelay(long statusCleanupTimerDelay) {
-    this.statusCleanupTimerDelay = statusCleanupTimerDelay;
+    return _statusCleanupTimerDelay;
   }
 
   public void cancelQuery(String table, String uuid) {
-    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+    for (QueryStatus status : _currentQueryStatusCollection.keySet()) {
       String userUuid = status.getUserUuid();
       if (userUuid != null && userUuid.equals(uuid) && status.getTable().equals(table)) {
         status.cancelQuery();
@@ -111,7 +110,7 @@ public class QueryStatusManager {
 
   public List<BlurQueryStatus> currentQueries(String table) {
     List<BlurQueryStatus> result = new ArrayList<BlurQueryStatus>();
-    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+    for (QueryStatus status : _currentQueryStatusCollection.keySet()) {
       if (status.getTable().equals(table)) {
         result.add(status.getQueryStatus());
       }
@@ -120,7 +119,7 @@ public class QueryStatusManager {
   }
 
   public BlurQueryStatus queryStatus(String table, String uuid) {
-    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+    for (QueryStatus status : _currentQueryStatusCollection.keySet()) {
       String userUuid = status.getUserUuid();
       if (userUuid != null && userUuid.equals(uuid) && status.getTable().equals(table)) {
         return status.getQueryStatus();
@@ -131,7 +130,7 @@ public class QueryStatusManager {
 
   public List<String> queryStatusIdList(String table) {
     Set<String> ids = new HashSet<String>();
-    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+    for (QueryStatus status : _currentQueryStatusCollection.keySet()) {
       if (status.getTable().equals(table)) {
         if (status.getUserUuid() != null) {
           ids.add(status.getUserUuid());  
@@ -143,7 +142,7 @@ public class QueryStatusManager {
 
   public void stopAllQueriesForBackPressure() {
     LOG.warn("Stopping all queries for back pressure.");
-    for (QueryStatus status : currentQueryStatusCollection.keySet()) {
+    for (QueryStatus status : _currentQueryStatusCollection.keySet()) {
       QueryState state = status.getQueryStatus().getState();
       if (state == QueryState.RUNNING) {
         status.stopQueryForBackPressure();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
index ff40e5a..1e819ce 100644
--- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
@@ -31,7 +31,6 @@ import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.ColumnDefinition;
 import org.apache.blur.thrift.generated.CommandDescriptor;
 import org.apache.blur.thrift.generated.CommandStatus;
-import org.apache.blur.thrift.generated.CommandStatusState;
 import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.Level;
 import org.apache.blur.thrift.generated.Metric;
@@ -261,9 +260,9 @@ public class FilteredBlurServer implements Iface {
   }
 
   @Override
-  public List<String> commandStatusList(int startingAt, short fetch, CommandStatusState state) throws BlurException,
+  public List<String> commandStatusList(int startingAt, short fetch) throws BlurException,
       TException {
-    return _iface.commandStatusList(startingAt, fetch, state);
+    return _iface.commandStatusList(startingAt, fetch);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 8ee32af..f43be5b 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -112,9 +112,9 @@ import org.apache.blur.utils.BlurExecutorCompletionService;
 import org.apache.blur.utils.BlurIterator;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.ForkJoin;
-import org.apache.blur.utils.ShardUtil;
 import org.apache.blur.utils.ForkJoin.Merger;
 import org.apache.blur.utils.ForkJoin.ParallelCall;
+import org.apache.blur.utils.ShardUtil;
 import org.apache.blur.zookeeper.WatchChildren;
 import org.apache.blur.zookeeper.WatchChildren.OnChange;
 import org.apache.blur.zookeeper.WatchNodeExistance;
@@ -1514,8 +1514,9 @@ public class BlurControllerServer extends TableAdmin implements Iface {
       throws BlurException, TException {
     try {
       BlurObject args = CommandUtil.toBlurObject(arguments);
+      CommandStatus originalCommandStatusObject = new CommandStatus(null, commandName, arguments, null, null);
       Response response = _commandManager.execute(getTableContextFactory(), getLayoutFactory(), commandName,
-          new ArgumentOverlay(args, _serDe));
+          new ArgumentOverlay(args, _serDe), originalCommandStatusObject);
       return CommandUtil.fromObjectToThrift(response, _serDe);
     } catch (Exception e) {
       if (e instanceof org.apache.blur.command.TimeoutException) {
@@ -1659,44 +1660,6 @@ public class BlurControllerServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public List<String> commandStatusList(int startingAt, short fetch, CommandStatusState state) throws BlurException,
-      TException {
-    throw new BException("Not Implemented");
-  }
-
-  @Override
-  public CommandStatus commandStatus(String commandExecutionId) throws BlurException, TException {
-    throw new BException("Not Implemented");
-  }
-
-  @Override
-  public void commandCancel(String commandExecutionId) throws BlurException, TException {
-    throw new BException("Not Implemented");
-  }
-
-  // @Override
-  // public void bulkMutateStart(final String bulkId) throws BlurException,
-  // TException {
-  // String cluster = getCluster(table);
-  // try {
-  // scatter(cluster, new BlurCommand<Void>() {
-  // @Override
-  // public Void call(Client client) throws BlurException, TException {
-  // client.bulkMutateStart(bulkId);
-  // return null;
-  // }
-  // });
-  // } catch (Exception e) {
-  // LOG.error("Unknown error while trying to get start a bulk mutate [{0}] [{1}]",
-  // e, bulkId);
-  // if (e instanceof BlurException) {
-  // throw (BlurException) e;
-  // }
-  // throw new BException(e.getMessage(), e);
-  // }
-  // }
-  //
-  @Override
   public void bulkMutateAdd(final String bulkId, final RowMutation mutation) throws BlurException, TException {
     try {
       String table = mutation.getTable();
@@ -1873,4 +1836,137 @@ public class BlurControllerServer extends TableAdmin implements Iface {
       throw new BException("Unknown error while trying to validate indexes for table [{0}]", e, table);
     }
   }
+
+  @Override
+  public List<String> commandStatusList(int startingAt, short fetch) throws BlurException, TException {
+    try {
+      List<String> shardClusterList = shardClusterList();
+      SortedSet<String> result = new TreeSet<String>(_commandManager.commandStatusList()); // controller-local ids
+      for (String cluster : shardClusterList) {
+        result.addAll(scatterGather(cluster, new BlurCommand<List<String>>() {
+          @Override
+          public List<String> call(Client client) throws BlurException, TException {
+            return client.commandStatusList(0, Short.MAX_VALUE); // request the full list; paging happens below
+          }
+        }, new Merger<List<String>>() {
+          @Override
+          public List<String> merge(BlurExecutorCompletionService<List<String>> service) throws BlurException {
+            SortedSet<String> ids = new TreeSet<String>(); // sorted set also drops duplicate ids
+            while (service.getRemainingCount() > 0) {
+              Future<List<String>> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
+              ids.addAll(service.getResultThrowException(future));
+            }
+            return new ArrayList<String>(ids);
+          }
+        }));
+      }
+      int end = Math.max(0, Math.min(result.size(), fetch)); // exclusive end index, as in BlurShardServer
+      return new ArrayList<String>(result).subList(Math.max(0, Math.min(startingAt, end)), end);
+    } catch (Exception e) {
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public CommandStatus commandStatus(final String commandExecutionId) throws BlurException, TException {
+    try {
+      List<String> shardClusterList = shardClusterList();
+      CommandStatus commandStatus = _commandManager.getCommandStatus(commandExecutionId); // controller-local status
+      for (String cluster : shardClusterList) {
+        CommandStatus cs = scatterGather(cluster, new BlurCommand<CommandStatus>() {
+          @Override
+          public CommandStatus call(Client client) throws BlurException, TException {
+            return client.commandStatus(commandExecutionId);
+          }
+        }, new Merger<CommandStatus>() {
+          @Override
+          public CommandStatus merge(BlurExecutorCompletionService<CommandStatus> service) throws BlurException {
+            CommandStatus merged = null; // stays null if no call returns a status
+            while (service.getRemainingCount() > 0) {
+              Future<CommandStatus> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
+              merged = mergeCommandStatus(merged, service.getResultThrowException(future));
+            }
+            return merged;
+          }
+        });
+        commandStatus = mergeCommandStatus(commandStatus, cs); // fold this cluster's result into the total
+      }
+      return commandStatus; // may be null when neither the controller nor any cluster reported a status
+    } catch (Exception e) {
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  private static CommandStatus mergeCommandStatus(CommandStatus cs1, CommandStatus cs2) {
+    // When one side is missing the other side is the answer (null if both are missing).
+    if (cs1 == null) {
+      return cs2;
+    }
+    if (cs2 == null) {
+      return cs1;
+    }
+    // Both present: combine the per-server state maps and take the remaining
+    // fields (execution id, command name, arguments, user) from the first status.
+    Map<String, Map<CommandStatusState, Long>> combined = mergeServerStateMap(cs1.getServerStateMap(),
+        cs2.getServerStateMap());
+    return new CommandStatus(cs1.getExecutionId(), cs1.getCommandName(), cs1.getArguments(), combined, cs1.getUser());
+  }
+
+  private static Map<String, Map<CommandStatusState, Long>> mergeServerStateMap(
+      Map<String, Map<CommandStatusState, Long>> serverStateMap1,
+      Map<String, Map<CommandStatusState, Long>> serverStateMap2) {
+    Map<String, Map<CommandStatusState, Long>> result = new HashMap<String, Map<CommandStatusState, Long>>();
+    if (serverStateMap1 != null) { // a null map contributes nothing instead of failing the merge
+      result.putAll(serverStateMap1); // seed with map 1 so its counts are not lost
+    }
+    if (serverStateMap2 != null) {
+      for (Entry<String, Map<CommandStatusState, Long>> e : serverStateMap2.entrySet()) {
+        result.put(e.getKey(), mergeCommandStatusState(result.get(e.getKey()), e.getValue())); // map 1 + map 2
+      }
+    }
+    return result; // never null; empty when both inputs are null or empty
+  }
+
+  private static Map<CommandStatusState, Long> mergeCommandStatusState(Map<CommandStatusState, Long> css1,
+      Map<CommandStatusState, Long> css2) {
+    if (css1 == null) {
+      // Left side absent: reuse the right side, or a fresh empty map when both are absent.
+      return css2 == null ? new HashMap<CommandStatusState, Long>() : css2;
+    }
+    if (css2 == null) {
+      return css1;
+    }
+    // Both present: copy the left counts, then add the right counts state by state.
+    Map<CommandStatusState, Long> totals = new HashMap<CommandStatusState, Long>(css1);
+    for (Entry<CommandStatusState, Long> entry : css2.entrySet()) {
+      CommandStatusState state = entry.getKey();
+      Long incoming = entry.getValue();
+      Long existing = totals.get(state);
+      if (existing != null) {
+        totals.put(state, existing + incoming);
+      } else {
+        totals.put(state, incoming);
+      }
+    }
+    return totals;
+  }
+
+  @Override
+  public void commandCancel(final String commandExecutionId) throws BlurException, TException {
+    try {
+      List<String> shardClusterList = shardClusterList();
+      _commandManager.cancelCommand(commandExecutionId); // cancel on this controller's command manager first
+      for (String cluster : shardClusterList) {
+        scatter(cluster, new BlurCommand<Void>() { // then issue the same cancel through scatter for each cluster
+          @Override
+          public Void call(Client client) throws BlurException, TException {
+            client.commandCancel(commandExecutionId);
+            return null;
+          }
+        });
+      }
+    } catch (Exception e) {
+      throw new BException(e.getMessage(), e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 625ed26..08b4400 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicLongArray;
 import org.apache.blur.command.ArgumentOverlay;
 import org.apache.blur.command.BlurObject;
 import org.apache.blur.command.BlurObjectSerDe;
-import org.apache.blur.command.CommandStatusStateEnum;
 import org.apache.blur.command.CommandUtil;
 import org.apache.blur.command.Response;
 import org.apache.blur.command.ShardCommandManager;
@@ -59,7 +58,6 @@ import org.apache.blur.thrift.generated.BlurQueryStatus;
 import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.CommandDescriptor;
 import org.apache.blur.thrift.generated.CommandStatus;
-import org.apache.blur.thrift.generated.CommandStatusState;
 import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.HighlightOptions;
 import org.apache.blur.thrift.generated.Query;
@@ -71,6 +69,7 @@ import org.apache.blur.thrift.generated.Status;
 import org.apache.blur.thrift.generated.TableStats;
 import org.apache.blur.thrift.generated.TimeoutException;
 import org.apache.blur.thrift.generated.User;
+import org.apache.blur.user.UserContext;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.QueryCache;
@@ -615,7 +614,10 @@ public class BlurShardServer extends TableAdmin implements Iface {
         }
       };
       BlurObject args = CommandUtil.toBlurObject(arguments);
-      Response response = _commandManager.execute(tableContextFactory, commandName, new ArgumentOverlay(args, _serDe));
+      User thriftUser = UserConverter.toThriftUser(UserContext.getUser());
+      CommandStatus originalCommandStatusObject = new CommandStatus(null, commandName, arguments, null, thriftUser);
+      Response response = _commandManager.execute(tableContextFactory, commandName, new ArgumentOverlay(args, _serDe),
+          originalCommandStatusObject);
       return CommandUtil.fromObjectToThrift(response, _serDe);
     } catch (Exception e) {
       if (e instanceof org.apache.blur.command.TimeoutException) {
@@ -670,11 +672,10 @@ public class BlurShardServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public List<String> commandStatusList(int startingAt, short fetch, CommandStatusState state) throws BlurException,
-      TException {
+  public List<String> commandStatusList(int startingAt, short fetch) throws BlurException, TException {
     try {
-      List<String> ids = _commandManager.commandStatusList(toCommandStatus(state));
-      return ids.subList(startingAt, fetch);
+      List<String> ids = _commandManager.commandStatusList();
+      return ids.subList(startingAt, Math.min(ids.size(), fetch));
     } catch (Exception e) {
       throw new BException(e.getMessage(), e);
     }
@@ -682,7 +683,11 @@ public class BlurShardServer extends TableAdmin implements Iface {
 
   @Override
   public CommandStatus commandStatus(String commandExecutionId) throws BlurException, TException {
-    throw new BException("Not Implemented");
+    try {
+      return _commandManager.getCommandStatus(commandExecutionId);
+    } catch (Exception e) {
+      throw new BException(e.getMessage(), e);
+    }
   }
 
   @Override
@@ -694,10 +699,6 @@ public class BlurShardServer extends TableAdmin implements Iface {
     }
   }
 
-  private CommandStatusStateEnum toCommandStatus(CommandStatusState state) {
-    return CommandStatusStateEnum.valueOf(state.name());
-  }
-
   @Override
   public void bulkMutateAdd(String bulkId, RowMutation rowMutation) throws BlurException, TException {
     String table = rowMutation.getTable();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index a1c0f88..2df447c 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -110,7 +110,7 @@ public class ThriftBlurControllerServer extends ThriftServer {
     Configuration config = BlurUtil.newHadoopConfiguration(configuration);
     TableContext.setSystemBlurConfiguration(configuration);
     TableContext.setSystemConfiguration(config);
-    
+
     Thread.setDefaultUncaughtExceptionHandler(new SimpleUncaughtExceptionHandler());
     String bindAddress = configuration.get(BLUR_CONTROLLER_BIND_ADDRESS);
     int configBindPort = configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1);
@@ -127,12 +127,11 @@ public class ThriftBlurControllerServer extends ThriftServer {
     String nodeName = getNodeName(configuration, BLUR_CONTROLLER_HOSTNAME);
     nodeName = nodeName + ":" + instanceBindPort;
     configuration.set(BLUR_NODENAME, nodeName);
-    
 
     BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);
 
     final ZooKeeper zooKeeper = setupZookeeper(configuration, null);
-    
+
     final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(zooKeeper, configuration, config);
 
     int timeout = configuration.getInt(BLUR_CONTROLLER_SHARD_CONNECTION_TIMEOUT, 60000);
@@ -157,7 +156,7 @@ public class ThriftBlurControllerServer extends ThriftServer {
     }
     final ControllerCommandManager controllerCommandManager = new ControllerCommandManager(tmpPath, commandPath,
         numberOfControllerWorkerCommandThreads, numberOfControllerDriverCommandThreads, Connection.DEFAULT_TIMEOUT,
-        config);
+        config, nodeName);
 
     final BlurControllerServer controllerServer = new BlurControllerServer();
     controllerServer.setClient(client);
@@ -187,7 +186,8 @@ public class ThriftBlurControllerServer extends ThriftServer {
     Trace.setStorage(traceStorage);
     Trace.setNodeName(nodeName);
 
-    List<ServerSecurityFilter> serverSecurity = getServerSecurityList(configuration, ServerSecurityFilterFactory.ServerType.CONTROLLER);
+    List<ServerSecurityFilter> serverSecurity = getServerSecurityList(configuration,
+        ServerSecurityFilterFactory.ServerType.CONTROLLER);
 
     Iface iface = BlurUtil.wrapFilteredBlurServer(configuration, controllerServer, false);
     iface = ServerSecurityUtil.applySecurity(iface, serverSecurity, false);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index b7f969c..0b4e290 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -88,6 +88,7 @@ import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
 import org.apache.blur.manager.indexserver.DistributedIndexServer;
 import org.apache.blur.manager.indexserver.DistributedLayoutFactory;
 import org.apache.blur.manager.indexserver.DistributedLayoutFactoryImpl;
+import org.apache.blur.manager.status.QueryStatusManager;
 import org.apache.blur.memory.MemoryAllocationWatcher;
 import org.apache.blur.memory.Watcher;
 import org.apache.blur.metrics.JSONReporter;
@@ -252,9 +253,11 @@ public class ThriftBlurShardServer extends ThriftServer {
       }
     };
 
+    QueryStatusManager statusManager = new QueryStatusManager(statusCleanupTimerDelay);
+
     final IndexManager indexManager = new IndexManager(indexServer, clusterStatus, filterCache, maxHeapPerRowFetch,
-        fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay, facetThreadCount,
-        deepPagingCache, memoryAllocationWatcher);
+        fetchCount, indexManagerThreadCount, mutateThreadCount, facetThreadCount, deepPagingCache,
+        memoryAllocationWatcher, statusManager);
 
     File tmpPath = getTmpPath(configuration);
     int numberOfShardWorkerCommandThreads = configuration.getInt(BLUR_SHARD_COMMAND_WORKER_THREADS, 16);
@@ -266,7 +269,8 @@ public class ThriftBlurShardServer extends ThriftServer {
       LOG.info("Command Path was not set.");
     }
     final ShardCommandManager commandManager = new ShardCommandManager(indexServer, tmpPath, commandPath,
-        numberOfShardWorkerCommandThreads, numberOfShardDriverCommandThreads, Connection.DEFAULT_TIMEOUT, config);
+        numberOfShardWorkerCommandThreads, numberOfShardDriverCommandThreads, Connection.DEFAULT_TIMEOUT, config,
+        nodeName);
 
     clusterStatus.registerActionOnTableStateChange(new Action() {
       @Override
@@ -361,7 +365,7 @@ public class ThriftBlurShardServer extends ThriftServer {
         ThreadWatcher threadWatcher = ThreadWatcher.instance();
         quietClose(streamServer, makeCloseable(hdfsKeyValueTimer), makeCloseable(indexImporterTimer),
             makeCloseable(indexBulkTimer), blockCacheDirectoryFactory, commandManager, traceStorage, server,
-            shardServer, indexManager, indexServer, threadWatcher, clusterStatus, zooKeeper, httpServer);
+            shardServer, indexManager, statusManager, indexServer, threadWatcher, clusterStatus, zooKeeper, httpServer);
       }
     };
     server.setShutdown(shutdown);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
index 862c509..64bb2d0 100644
--- a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
@@ -44,6 +44,7 @@ import org.apache.blur.server.TableContext;
 import org.apache.blur.server.TableContextFactory;
 import org.apache.blur.thrift.generated.Arguments;
 import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.CommandStatus;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
@@ -83,7 +84,7 @@ public class ShardCommandManagerTest {
   @Before
   public void setup() throws IOException {
     _config = new Configuration();
-    _manager = new ShardCommandManager(getIndexServer(), null, null, 10, 10, 1000, _config);
+    _manager = new ShardCommandManager(getIndexServer(), null, null, 10, 10, 1000, _config, "test");
   }
 
   @After
@@ -136,12 +137,14 @@ public class ShardCommandManagerTest {
       output.close();
     }
     ShardCommandManager manager = new ShardCommandManager(getIndexServer(), _tmpPath, _commandPath, 10, 10, 1000,
-        _config);
+        _config, "test");
     {
       BlurObject args = new BlurObject();
       args.put("table", "test");
       ArgumentOverlay argumentOverlay = new ArgumentOverlay(args, new BlurObjectSerDe());
-      Response response = manager.execute(getTableContextFactory(), "test", argumentOverlay);
+      CommandStatus originalCommandStatusObject = new CommandStatus(null, "test", null, null, null);
+      Response response = manager.execute(getTableContextFactory(), "test", argumentOverlay,
+          originalCommandStatusObject);
       Map<Shard, Object> shardResults = response.getShardResults();
       for (Object o : shardResults.values()) {
         assertEquals("test1", o);
@@ -163,7 +166,9 @@ public class ShardCommandManagerTest {
       BlurObject args = new BlurObject();
       args.put("table", "test");
       ArgumentOverlay argumentOverlay = new ArgumentOverlay(args, new BlurObjectSerDe());
-      Response response = manager.execute(getTableContextFactory(), "test", argumentOverlay);
+      CommandStatus originalCommandStatusObject = new CommandStatus(null, "test", null, null, null);
+      Response response = manager.execute(getTableContextFactory(), "test", argumentOverlay,
+          originalCommandStatusObject);
       Map<Shard, Object> shardResults = response.getShardResults();
       for (Object o : shardResults.values()) {
         assertEquals("test2", o);
@@ -205,7 +210,8 @@ public class ShardCommandManagerTest {
       try {
         if (instanceExecutionId == null) {
           TableContextFactory tableContextFactory = getTableContextFactory();
-          response = _manager.execute(tableContextFactory, "wait", argumentOverlay);
+          CommandStatus originalCommandStatusObject = new CommandStatus(null, "test", null, null, null);
+          response = _manager.execute(tableContextFactory, "wait", argumentOverlay, originalCommandStatusObject);
         } else {
           response = _manager.reconnect(instanceExecutionId);
         }
@@ -225,7 +231,8 @@ public class ShardCommandManagerTest {
     ArgumentOverlay argumentOverlay = new ArgumentOverlay(args, new BlurObjectSerDe());
     TableContextFactory tableContextFactory = getTableContextFactory();
     try {
-      _manager.execute(tableContextFactory, "error", argumentOverlay);
+      CommandStatus originalCommandStatusObject = new CommandStatus(null, "test", null, null, null);
+      _manager.execute(tableContextFactory, "error", argumentOverlay, originalCommandStatusObject);
       fail();
     } catch (ExceptionCollector e) {
       Throwable t = e.getCause();
@@ -263,7 +270,8 @@ public class ShardCommandManagerTest {
           try {
             Response response;
             if (instanceExecutionId == null) {
-              response = _manager.execute(tableContextFactory, "wait", argumentOverlay);
+              CommandStatus originalCommandStatusObject = new CommandStatus(null, "test", null, null, null);
+              response = _manager.execute(tableContextFactory, "wait", argumentOverlay, originalCommandStatusObject);
             } else {
               response = _manager.reconnect(instanceExecutionId);
             }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/test/java/org/apache/blur/command/WaitForSeconds.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/command/WaitForSeconds.java b/blur-core/src/test/java/org/apache/blur/command/WaitForSeconds.java
index 3216d7f..338527b 100644
--- a/blur-core/src/test/java/org/apache/blur/command/WaitForSeconds.java
+++ b/blur-core/src/test/java/org/apache/blur/command/WaitForSeconds.java
@@ -21,14 +21,26 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.command.annotation.OptionalArgument;
 import org.apache.blur.command.commandtype.IndexReadCommandSingleTable;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 
 public class WaitForSeconds extends IndexReadCommandSingleTable<Boolean> {
 
+  private static final Log LOG = LogFactory.getLog(WaitForSeconds.class);
+
+  public static void main(String[] args) throws IOException {
+    WaitForSeconds waitForSecond = new WaitForSeconds();
+    waitForSecond.setSeconds(30);
+    waitForSecond.setTable("test1");
+    waitForSecond.run("localhost:40010");
+  }
+
   @OptionalArgument("The number of seconds to sleep, the default is 30 seconds.")
   private int seconds = 30;
 
   @Override
   public Boolean execute(IndexContext context) throws IOException, InterruptedException {
+    LOG.info(Thread.currentThread().isInterrupted());
     Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index e5bd42d..893d65b 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -51,6 +51,7 @@ import org.apache.blur.lucene.search.DeepPagingCache;
 import org.apache.blur.manager.clusterstatus.ClusterStatus;
 import org.apache.blur.manager.indexserver.LocalIndexServer;
 import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.manager.status.QueryStatusManager;
 import org.apache.blur.memory.MemoryAllocationWatcher;
 import org.apache.blur.memory.Watcher;
 import org.apache.blur.server.TableContext;
@@ -106,6 +107,8 @@ public class IndexManagerTest {
   private IndexManager indexManager;
   private File base;
 
+  private QueryStatusManager _statusManager;
+
   @Before
   public void setUp() throws BlurException, IOException, InterruptedException {
     TableContext.clear();
@@ -128,8 +131,10 @@ public class IndexManagerTest {
 
     BlurFilterCache filterCache = new DefaultBlurFilterCache(new BlurConfiguration());
     long statusCleanupTimerDelay = 1000;
-    indexManager = new IndexManager(server, getClusterStatus(tableDescriptor), filterCache, 10000000, 100, 1, 1,
-        statusCleanupTimerDelay, 0, new DeepPagingCache(), NOTHING);
+    _statusManager = new QueryStatusManager(statusCleanupTimerDelay);
+
+    indexManager = new IndexManager(server, getClusterStatus(tableDescriptor), filterCache, 10000000, 100, 1, 1, 0,
+        new DeepPagingCache(), NOTHING, _statusManager);
     setupData();
   }
 
@@ -231,6 +236,7 @@ public class IndexManagerTest {
   @After
   public void teardown() {
     if (indexManager != null) {
+      _statusManager.close();
       indexManager.close();
       indexManager = null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/19793624/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java b/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
index 73ef9ed..29bb600 100644
--- a/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java
@@ -41,7 +41,6 @@ import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.ColumnDefinition;
 import org.apache.blur.thrift.generated.CommandDescriptor;
 import org.apache.blur.thrift.generated.CommandStatus;
-import org.apache.blur.thrift.generated.CommandStatusState;
 import org.apache.blur.thrift.generated.FetchResult;
 import org.apache.blur.thrift.generated.Level;
 import org.apache.blur.thrift.generated.Metric;
@@ -463,8 +462,7 @@ public class ThriftCacheServerTest {
       }
 
       @Override
-      public List<String> commandStatusList(int startingAt, short fetch, CommandStatusState state)
-          throws BlurException, TException {
+      public List<String> commandStatusList(int startingAt, short fetch) throws BlurException, TException {
         throw new RuntimeException("Not implemented.");
       }
 


Mime
View raw message