incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding execution tracking so that command canceling can be implemented.
Date Wed, 10 Sep 2014 13:56:07 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master e97a120df -> 528ef3ff7


Adding execution tracking so that command canceling can be implemented.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/528ef3ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/528ef3ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/528ef3ff

Branch: refs/heads/master
Commit: 528ef3ff785b5f2b459c3896e477dbf4916f15d2
Parents: e97a120
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Sep 10 09:55:58 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Sep 10 09:55:58 2014 -0400

----------------------------------------------------------------------
 .../apache/blur/command/BaseCommandManager.java | 34 ++++---
 .../blur/command/ControllerClusterContext.java  |  9 +-
 .../blur/command/ControllerCommandManager.java  | 11 +--
 .../apache/blur/command/ExecutionContext.java   | 95 ++++++++++++++++++++
 .../org/apache/blur/command/ExecutionId.java    | 56 ++++++++++++
 .../blur/command/ShardCommandManager.java       |  4 +-
 .../apache/blur/thrift/util/CommandExample.java | 34 +++----
 7 files changed, 202 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/528ef3ff/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
index cea496b..6c25c28 100644
--- a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
@@ -8,10 +8,10 @@ import java.util.Enumeration;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -21,6 +21,8 @@ import org.apache.blur.concurrent.Executors;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 
+import com.google.common.collect.MapMaker;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with this
@@ -41,14 +43,14 @@ import org.apache.blur.log.LogFactory;
 public class BaseCommandManager implements Closeable {
 
   private static final String META_INF_SERVICES_ORG_APACHE_BLUR_COMMAND_COMMANDS = "META-INF/services/org.apache.blur.command.Commands";
+  private static final Log LOG = LogFactory.getLog(BaseCommandManager.class);
 
-  private final static Log LOG = LogFactory.getLog(BaseCommandManager.class);
-
-  protected final ExecutorService _executorService;
+  private final ExecutorService _executorService;
+  private final ExecutorService _executorServiceDriver;
+  
   protected final Map<String, Command> _command = new ConcurrentHashMap<String, Command>();
   protected final Map<Class<? extends Command>, String> _commandNameLookup = new ConcurrentHashMap<Class<? extends Command>, String>();
-  protected final ExecutorService _executorServiceDriver;
-  protected final ConcurrentHashMap<String, Future<Response>> _runningMap = new ConcurrentHashMap<String, Future<Response>>();
+  protected final ConcurrentMap<ExecutionId, Future<Response>> _runningMap;
   protected final long _connectionTimeout;
 
   public BaseCommandManager(int threadCount, long connectionTimeout) throws IOException {
@@ -56,6 +58,7 @@ public class BaseCommandManager implements Closeable {
     _executorService = Executors.newThreadPool("command-", threadCount);
     _executorServiceDriver = Executors.newThreadPool("command-driver-", threadCount);
     _connectionTimeout = connectionTimeout / 2;
+    _runningMap = new MapMaker().weakKeys().makeMap();
   }
 
   @SuppressWarnings("unchecked")
@@ -99,9 +102,11 @@ public class BaseCommandManager implements Closeable {
     }
   }
 
-  protected Response submitCallable(Callable<Response> callable) throws IOException, TimeoutException {
-    String executionId = UUID.randomUUID().toString();
-    Future<Response> future = _executorServiceDriver.submit(callable);
+  protected Response submitDriverCallable(Callable<Response> callable) throws IOException, TimeoutException {
+    ExecutionContext executionContext = ExecutionContext.create();
+    Future<Response> future = _executorServiceDriver.submit(executionContext.wrapCallable(callable));
+    executionContext.registerDriverFuture(future);
+    ExecutionId executionId = executionContext.getExecutionId();
     _runningMap.put(executionId, future);
     try {
       return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
@@ -112,11 +117,18 @@ public class BaseCommandManager implements Closeable {
     } catch (ExecutionException e) {
       throw new IOException(e.getCause());
     } catch (java.util.concurrent.TimeoutException e) {
-      LOG.info("Timeout of command [{0}]", executionId);
-      throw new TimeoutException(executionId);
+      LOG.info("Timeout of command [{0}]", executionId.getId());
+      throw new TimeoutException(executionId.getId());
     }
   }
 
+  protected <T> Future<T> submitToExecutorService(Callable<T> callable) {
+    ExecutionContext executionContext = ExecutionContext.get();
+    Future<T> future = _executorService.submit(executionContext.wrapCallable(callable));
+    executionContext.registerFuture(future);
+    return future;
+  }
+
   @Override
   public void close() throws IOException {
     _executorService.shutdownNow();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/528ef3ff/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
index 5a03806..fa694c1 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
@@ -9,7 +9,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.blur.BlurConfiguration;
@@ -52,16 +51,14 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
   private final Args _args;
   private final TableContext _tableContext;
   private final Map<Server, Client> _clientMap;
-  private final ExecutorService _executorService;
   private final ControllerCommandManager _manager;
   private final Map<String, String> _tableLayout;
 
   public ControllerClusterContext(TableContext tableContext, Args args, Map<String, String> tableLayout,
-      ExecutorService executorService, ControllerCommandManager manager) throws IOException {
+      ControllerCommandManager manager) throws IOException {
     _tableContext = tableContext;
     _args = args;
     _clientMap = getBlurClientsForTable(_tableContext.getTable(), tableLayout);
-    _executorService = executorService;
     _manager = manager;
     _tableLayout = tableLayout;
   }
@@ -126,7 +123,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     for (Entry<Server, Client> e : _clientMap.entrySet()) {
       Server server = e.getKey();
       final Client client = e.getValue();
-      Future<Map<Shard, T>> future = _executorService.submit(new Callable<Map<Shard, T>>() {
+      Future<Map<Shard, T>> future = _manager.submitToExecutorService(new Callable<Map<Shard, T>>() {
         @Override
         public Map<Shard, T> call() throws Exception {
           Arguments arguments = CommandUtil.toArguments(args);
@@ -190,7 +187,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     for (Entry<Server, Client> e : _clientMap.entrySet()) {
       Server server = e.getKey();
       final Client client = e.getValue();
-      Future<T> future = _executorService.submit(new Callable<T>() {
+      Future<T> future = _manager.submitToExecutorService(new Callable<T>() {
         @Override
         public T call() throws Exception {
           Arguments arguments = CommandUtil.toArguments(args);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/528ef3ff/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
index cc445e9..31a938d 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
@@ -29,17 +29,18 @@ public class ControllerCommandManager extends BaseCommandManager {
     super(threadCount, connectionTimeout);
   }
 
-  public Response execute(TableContext tableContext, String commandName, final Args args, Map<String, String> tableLayout)
-      throws IOException, TimeoutException {
+  public Response execute(TableContext tableContext, String commandName, final Args args,
+      Map<String, String> tableLayout) throws IOException, TimeoutException {
     final ClusterContext context = createCommandContext(tableContext, args, tableLayout);
     final Command command = getCommandObject(commandName);
     if (command == null) {
       throw new IOException("Command with name [" + commandName + "] not found.");
     }
-    return submitCallable(new Callable<Response>() {
+    return submitDriverCallable(new Callable<Response>() {
       @Override
       public Response call() throws Exception {
-        // For those commands that do not implement cluster command, run them in a
+        // For those commands that do not implement cluster command, run them in
+        // a
         // base impl.
         if (command instanceof ClusterCommand) {
           return executeClusterCommand(context, command);
@@ -84,7 +85,7 @@ public class ControllerCommandManager extends BaseCommandManager {
 
   private ClusterContext createCommandContext(TableContext tableContext, Args args, Map<String, String> tableLayout)
       throws IOException {
-    return new ControllerClusterContext(tableContext, args, tableLayout, _executorService, this);
+    return new ControllerClusterContext(tableContext, args, tableLayout, this);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/528ef3ff/blur-core/src/main/java/org/apache/blur/command/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ExecutionContext.java b/blur-core/src/main/java/org/apache/blur/command/ExecutionContext.java
new file mode 100644
index 0000000..e5d783c
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ExecutionContext.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+import com.google.common.collect.MapMaker;
+
+public class ExecutionContext {
+
+  private static final Log LOG = LogFactory.getLog(ExecutionContext.class);
+
+  private static ConcurrentMap<ExecutionId, ExecutionContext> _contextMap = new MapMaker().makeMap();
+  private static ThreadLocal<ExecutionId> _currentId = new ThreadLocal<ExecutionId>();
+
+  public static ExecutionContext create() {
+    ExecutionId executionId = new ExecutionId(UUID.randomUUID().toString());
+    ExecutionContext executionContext = new ExecutionContext(executionId);
+    _contextMap.put(executionId, executionContext);
+    _currentId.set(executionId);
+    return executionContext;
+  }
+
+  public static ExecutionContext get() {
+    ExecutionId executionId = _currentId.get();
+    return _contextMap.get(executionId);
+  }
+
+  private final ExecutionId _executionId;
+  private final AtomicBoolean _running;
+  private final List<Future<?>> _futures = new ArrayList<Future<?>>();
+  private Future<?> _driverFuture;
+
+  public ExecutionContext(ExecutionId executionId) {
+    _executionId = executionId;
+    _running = new AtomicBoolean(true);
+  }
+
+  public void registerFuture(Future<?> future) {
+    synchronized (_futures) {
+      _futures.add(future);
+    }
+  }
+
+  public <T> Callable<T> wrapCallable(final Callable<T> callable) {
+    return new Callable<T>() {
+      @Override
+      public T call() throws Exception {
+        _currentId.set(_executionId);
+        LOG.info("Executing in new thread [{0}]", _executionId.getId());
+        try {
+          return callable.call();
+        } finally {
+          _currentId.set(null);
+        }
+      }
+    };
+  }
+
+  public ExecutionId getExecutionId() {
+    return _executionId;
+  }
+
+  public AtomicBoolean getRunning() {
+    return _running;
+  }
+
+  public <T> void registerDriverFuture(Future<T> driverFuture) {
+    _driverFuture = driverFuture;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/528ef3ff/blur-core/src/main/java/org/apache/blur/command/ExecutionId.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ExecutionId.java b/blur-core/src/main/java/org/apache/blur/command/ExecutionId.java
new file mode 100644
index 0000000..a018d27
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ExecutionId.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+public class ExecutionId {
+
+  private final String _id;
+
+  public ExecutionId(String id) {
+    _id = id;
+  }
+
+  public String getId() {
+    return _id;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_id == null) ? 0 : _id.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ExecutionId other = (ExecutionId) obj;
+    if (_id == null) {
+      if (other._id != null)
+        return false;
+    } else if (!_id.equals(other._id))
+      return false;
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/528ef3ff/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
index 3a16ff6..6b5a1fd 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
@@ -61,7 +61,7 @@ public class ShardCommandManager extends BaseCommandManager {
         throw new IOException("Command type of [" + command.getClass() + "] not supported.");
       }
     };
-    return submitCallable(callable);
+    return submitDriverCallable(callable);
   }
 
   @SuppressWarnings("unchecked")
@@ -97,7 +97,7 @@ public class ShardCommandManager extends BaseCommandManager {
       } else {
         throw new IOException("Command type of [" + command.getClass() + "] not supported.");
       }
-      Future<Object> future = _executorService.submit(callable);
+      Future<Object> future = submitToExecutorService(callable);
       futureMap.put(shardId, future);
     }
     Map<Shard, Object> resultMap = new HashMap<Shard, Object>();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/528ef3ff/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
index 22de826..43a6909 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
@@ -30,24 +30,24 @@ public class CommandExample {
 
   public static void main(String[] args) throws BlurException, TException, IOException {
     Client client = BlurClientManager.getClientPool().getClient(new Connection("localhost:40010"));
-    String executionId = null;
-    while (true) {
-      try {
-        Response response;
-        if (executionId == null) {
-          response = client.execute("test", "wait", null);
-        } else {
-          System.out.println("Reconecting...");
-          response = client.reconnect(executionId);
-        }
-        System.out.println(response);
-        break;
-      } catch (TimeoutException ex) {
-        executionId = ex.getExecutionId();
-      }
-    }
+    // String executionId = null;
+    // while (true) {
+    // try {
+    // Response response;
+    // if (executionId == null) {
+    // response = client.execute("test", "wait", null);
+    // } else {
+    // System.out.println("Reconecting...");
+    // response = client.reconnect(executionId);
+    // }
+    // System.out.println(response);
+    // break;
+    // } catch (TimeoutException ex) {
+    // executionId = ex.getExecutionId();
+    // }
+    // }
 
-    // System.out.println(client.execute("test", "docCount", null));
+    System.out.println(client.execute("test2", "docCount", null));
     // System.out.println(client.execute("test", "docCountNoCombine", null));
     // {
     // Response response = client.execute("test", "docCountAggregate", null);


Mime
View raw message