incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Command timeouts are now handled through the controller.
Date Wed, 03 Sep 2014 14:04:14 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 6dc0a83bf -> dc5c5670a


Command timeouts are now handled through the controller.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/dc5c5670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/dc5c5670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/dc5c5670

Branch: refs/heads/master
Commit: dc5c5670a63cca66ed4b14ef8e8b58d3450ddd91
Parents: 6dc0a83
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Sep 3 10:04:30 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Sep 3 10:04:30 2014 -0400

----------------------------------------------------------------------
 .../manager/command/BaseCommandManager.java     | 52 +++++++++++++++++++-
 .../command/ControllerClusterContext.java       | 34 +++++++++++--
 .../command/ControllerCommandManager.java       | 45 +++++++++--------
 .../manager/command/ShardCommandManager.java    | 45 ++---------------
 .../blur/thrift/BlurControllerServer.java       | 17 ++++++-
 .../blur/thrift/ThriftBlurControllerServer.java |  3 +-
 .../java/org/apache/blur/thrift/Connection.java |  2 +-
 .../apache/blur/thrift/util/CommandExample.java |  2 +-
 8 files changed, 130 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dc5c5670/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
b/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
index 71ee694..eb1cbcc 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
@@ -3,10 +3,18 @@ package org.apache.blur.manager.command;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.command.cmds.BaseCommand;
 import org.apache.blur.manager.command.cmds.DocumentCount;
 import org.apache.blur.manager.command.cmds.DocumentCountCombiner;
@@ -33,12 +41,16 @@ import org.apache.blur.manager.command.cmds.WaitForSeconds;
 
 public class BaseCommandManager implements Closeable {
 
+  private final static Log LOG = LogFactory.getLog(BaseCommandManager.class);
+
   protected final ExecutorService _executorService;
   protected final Map<String, BaseCommand> _command = new ConcurrentHashMap<String,
BaseCommand>();
   protected final Map<Class<? extends BaseCommand>, String> _commandNameLookup
= new ConcurrentHashMap<Class<? extends BaseCommand>, String>();
   protected final ExecutorService _executorServiceDriver;
+  protected final ConcurrentHashMap<String, Future<Response>> _runningMap = new
ConcurrentHashMap<String, Future<Response>>();
+  protected final long _connectionTimeout;
 
-  public BaseCommandManager(int threadCount) throws IOException {
+  public BaseCommandManager(int threadCount, long connectionTimeout) throws IOException {
     register(DocumentCount.class);
     register(DocumentCountNoCombine.class);
     register(DocumentCountCombiner.class);
@@ -46,6 +58,44 @@ public class BaseCommandManager implements Closeable {
     register(WaitForSeconds.class);
     _executorService = Executors.newThreadPool("command-", threadCount);
     _executorServiceDriver = Executors.newThreadPool("command-driver-", threadCount);
+    _connectionTimeout = connectionTimeout / 2;
+  }
+
+  public Response reconnect(String executionId) throws IOException, TimeoutException {
+    Future<Response> future = _runningMap.get(executionId);
+    if (future == null) {
+      throw new IOException("Command id [" + executionId + "] did not find any executing
commands.");
+    }
+    try {
+      return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
+    } catch (CancellationException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } catch (java.util.concurrent.TimeoutException e) {
+      LOG.info("Timeout of command [{0}]", executionId);
+      throw new TimeoutException(executionId);
+    }
+  }
+
+  protected Response submitCallable(Callable<Response> callable) throws IOException,
TimeoutException {
+    String executionId = UUID.randomUUID().toString();
+    Future<Response> future = _executorServiceDriver.submit(callable);
+    _runningMap.put(executionId, future);
+    try {
+      return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
+    } catch (CancellationException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } catch (java.util.concurrent.TimeoutException e) {
+      LOG.info("Timeout of command [{0}]", executionId);
+      throw new TimeoutException(executionId);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dc5c5670/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
index 5152683..9de500d 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
@@ -17,13 +17,15 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.command.cmds.BaseCommand;
 import org.apache.blur.server.TableContext;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.BlurClientManager;
 import org.apache.blur.thrift.ClientPool;
 import org.apache.blur.thrift.Connection;
 import org.apache.blur.thrift.generated.Arguments;
 import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Response;
+import org.apache.blur.thrift.generated.TimeoutException;
 import org.apache.blur.thrift.generated.ValueObject;
 import org.apache.hadoop.conf.Configuration;
 
@@ -70,8 +72,9 @@ public class ControllerClusterContext extends ClusterContext implements
Closeabl
     for (String serverStr : tableLayout.values()) {
       try {
         Client client = BlurClientManager.getClientPool().getClient(new Connection(serverStr));
+        client.refresh();
         clients.put(new Server(serverStr), client);
-      } catch (TTransportException e) {
+      } catch (TException e) {
         throw new IOException(e);
       }
     }
@@ -128,7 +131,7 @@ public class ControllerClusterContext extends ClusterContext implements
Closeabl
         @Override
         public Map<Shard, T> call() throws Exception {
           Arguments arguments = CommandUtil.toArguments(args);
-          Response response = client.execute(getTable(), commandName, arguments);
+          Response response = waitForResponse(client, getTable(), commandName, arguments);
           Map<Shard, Object> shardToValue = CommandUtil.fromThriftToObject(response.getShardToValue());
           return (Map<Shard, T>) shardToValue;
         }
@@ -141,6 +144,29 @@ public class ControllerClusterContext extends ClusterContext implements
Closeabl
     return futureMap;
   }
 
+  protected static Response waitForResponse(Client client, String table, String commandName,
Arguments arguments)
+      throws TException {
+    // TODO This should likely be changed to run of a AtomicBoolean used for
+    // the status of commands.
+    String executionId = null;
+    while (true) {
+      try {
+        if (executionId == null) {
+          return client.execute(table, commandName, arguments);
+        } else {
+          return client.reconnect(executionId);
+        }
+      } catch (BlurException e) {
+        throw e;
+      } catch (TimeoutException e) {
+        executionId = e.getExecutionId();
+        LOG.info("Execution fetch timed out, reconnecting using [{0}].", executionId);
+      } catch (TException e) {
+        throw e;
+      }
+    }
+  }
+
   private Set<Shard> getShardsOnServer(Server server) {
     Set<Shard> shards = new HashSet<Shard>();
     for (Entry<String, String> e : _tableLayout.entrySet()) {
@@ -169,7 +195,7 @@ public class ControllerClusterContext extends ClusterContext implements
Closeabl
         @Override
         public T call() throws Exception {
           Arguments arguments = CommandUtil.toArguments(args);
-          Response response = client.execute(getTable(), commandName, arguments);
+          Response response = waitForResponse(client, getTable(), commandName, arguments);
           ValueObject valueObject = response.getValue();
           return (T) CommandUtil.toObject(valueObject);
         }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dc5c5670/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
index 33a6b45..6a1ebb8 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
@@ -2,6 +2,7 @@ package org.apache.blur.manager.command;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.apache.blur.manager.command.cmds.BaseCommand;
 import org.apache.blur.server.TableContext;
@@ -25,31 +26,35 @@ import org.apache.blur.server.TableContext;
 @SuppressWarnings("unchecked")
 public class ControllerCommandManager extends BaseCommandManager {
 
-  public ControllerCommandManager(int threadCount) throws IOException {
-    super(threadCount);
+  public ControllerCommandManager(int threadCount, long connectionTimeout) throws IOException
{
+    super(threadCount, connectionTimeout);
   }
 
-  public Response execute(TableContext tableContext, String commandName, Args args, Map<String,
String> tableLayout)
-      throws IOException {
-    ClusterContext context = createCommandContext(tableContext, args, tableLayout);
-    BaseCommand command = getCommandObject(commandName);
+  public Response execute(TableContext tableContext, String commandName, final Args args,
Map<String, String> tableLayout)
+      throws IOException, TimeoutException {
+    final ClusterContext context = createCommandContext(tableContext, args, tableLayout);
+    final BaseCommand command = getCommandObject(commandName);
     if (command == null) {
       throw new IOException("Command with name [" + commandName + "] not found.");
     }
-    // For those commands that do not implement cluster command, run them in a
-    // base impl.
-
-    if (command instanceof ClusterCommand) {
-      return executeClusterCommand(context, command);
-    } else if (command instanceof IndexReadCombiningCommand) {
-      return executeIndexReadCombiningCommand(args, context, command);
-    } else if (command instanceof IndexReadCommand) {
-      return executeIndexReadCommand(args, context, command);
-    } else if (command instanceof IndexWriteCommand) {
-      return executeIndexWriteCommand(args, context, command);
-    } else {
-      throw new IOException("Command type of [" + command.getClass() + "] not supported.");
-    }
+    return submitCallable(new Callable<Response>() {
+      @Override
+      public Response call() throws Exception {
+        // For those commands that do not implement cluster command, run them in a
+        // base impl.
+        if (command instanceof ClusterCommand) {
+          return executeClusterCommand(context, command);
+        } else if (command instanceof IndexReadCombiningCommand) {
+          return executeIndexReadCombiningCommand(args, context, command);
+        } else if (command instanceof IndexReadCommand) {
+          return executeIndexReadCommand(args, context, command);
+        } else if (command instanceof IndexWriteCommand) {
+          return executeIndexWriteCommand(args, context, command);
+        } else {
+          throw new IOException("Command type of [" + command.getClass() + "] not supported.");
+        }
+      }
+    });
   }
 
   private Response executeClusterCommand(ClusterContext context, BaseCommand command) throws
IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dc5c5670/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
b/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
index 50b520b..6a89394 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
@@ -20,13 +20,9 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.manager.IndexServer;
@@ -42,38 +38,16 @@ import org.apache.lucene.search.IndexSearcher;
 public class ShardCommandManager extends BaseCommandManager {
 
   private final IndexServer _indexServer;
-  private final ConcurrentHashMap<String, Future<Response>> _runningMap = new
ConcurrentHashMap<String, Future<Response>>();
-  private final long _connectionTimeout;
 
   public ShardCommandManager(IndexServer indexServer, int threadCount, long connectionTimeout)
throws IOException {
-    super(threadCount);
+    super(threadCount, connectionTimeout);
     _indexServer = indexServer;
-    _connectionTimeout = connectionTimeout / 2;
-  }
-
-  public Response reconnect(String executionId) throws IOException, TimeoutException {
-    Future<Response> future = _runningMap.get(executionId);
-    if (future == null) {
-      throw new IOException("Command id [" + executionId + "] did not find any executing
commands.");
-    }
-    try {
-      return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
-    } catch (CancellationException e) {
-      throw new IOException(e);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } catch (ExecutionException e) {
-      throw new IOException(e.getCause());
-    } catch (java.util.concurrent.TimeoutException e) {
-      throw new TimeoutException(executionId);
-    }
   }
 
   public Response execute(final TableContext tableContext, final String commandName, final
Args args)
       throws IOException, TimeoutException {
     final ShardServerContext shardServerContext = ShardServerContext.getShardServerContext();
-    String uuid = UUID.randomUUID().toString();
-    Future<Response> future = _executorServiceDriver.submit(new Callable<Response>()
{
+    Callable<Response> callable = new Callable<Response>() {
       @Override
       public Response call() throws Exception {
         BaseCommand command = getCommandObject(commandName);
@@ -87,19 +61,8 @@ public class ShardCommandManager extends BaseCommandManager {
         }
         throw new IOException("Command type of [" + command.getClass() + "] not supported.");
       }
-    });
-    _runningMap.put(uuid, future);
-    try {
-      return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
-    } catch (CancellationException e) {
-      throw new IOException(e);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    } catch (ExecutionException e) {
-      throw new IOException(e.getCause());
-    } catch (java.util.concurrent.TimeoutException e) {
-      throw new TimeoutException(uuid);
-    }
+    };
+    return submitCallable(callable);
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dc5c5670/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index f6ce7f9..96f7872 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -1512,6 +1512,9 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
           tableLayout);
       return CommandUtil.fromObjectToThrift(response);
     } catch (Exception e) {
+      if (e instanceof org.apache.blur.manager.command.TimeoutException) {
+        throw new TimeoutException(((org.apache.blur.manager.command.TimeoutException) e).getExecutionId());
+      }
       LOG.error("Unknown error while trying to execute command [{0}] for table [{1}]", e,
commandName, table);
       if (e instanceof BlurException) {
         throw (BlurException) e;
@@ -1527,7 +1530,19 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
   @Override
   public org.apache.blur.thrift.generated.Response reconnect(String executionId) throws BlurException,
       TimeoutException, TException {
-    throw new BException("Not implemented yet.");
+    try {
+      Response response = _commandManager.reconnect(executionId);
+      return CommandUtil.fromObjectToThrift(response);
+    } catch (Exception e) {
+      if (e instanceof org.apache.blur.manager.command.TimeoutException) {
+        throw new TimeoutException(((org.apache.blur.manager.command.TimeoutException) e).getExecutionId());
+      }
+      LOG.error("Unknown error while trying to reconnect to executing command [{0}]", e,
executionId);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dc5c5670/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index c17235f..d9aad1f 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -128,7 +128,8 @@ public class ThriftBlurControllerServer extends ThriftServer {
     int timeout = configuration.getInt(BLUR_CONTROLLER_SHARD_CONNECTION_TIMEOUT, 60000);
     BlurControllerServer.BlurClient client = new BlurControllerServer.BlurClientRemote(timeout);
 
-    final ControllerCommandManager controllerCommandManager = new ControllerCommandManager(16);
+    final ControllerCommandManager controllerCommandManager = new ControllerCommandManager(16,
+        Connection.DEFAULT_TIMEOUT);
 
     final BlurControllerServer controllerServer = new BlurControllerServer();
     controllerServer.setClient(client);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dc5c5670/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java b/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
index 7145a83..c976a47 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/Connection.java
@@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class Connection {
 
-  public final static int DEFAULT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
+  public final static int DEFAULT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(10);
 
   private final String _host;
   private final int _port;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dc5c5670/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
index 7161ed0..22de826 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
@@ -29,7 +29,7 @@ import org.apache.blur.thrift.generated.TimeoutException;
 public class CommandExample {
 
   public static void main(String[] args) throws BlurException, TException, IOException {
-    Client client = BlurClientManager.getClientPool().getClient(new Connection("localhost:40020"));
+    Client client = BlurClientManager.getClientPool().getClient(new Connection("localhost:40010"));
     String executionId = null;
     while (true) {
       try {


Mime
View raw message