incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: The reconnecting to a running command after a timeout is working in shard server. Still need to roll off old results from map.
Date Wed, 03 Sep 2014 13:41:10 GMT
Reconnecting to a running command after a timeout now works in the shard server.  Old results
still need to be rolled off the map of running commands.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/6dc0a83b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/6dc0a83b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/6dc0a83b

Branch: refs/heads/master
Commit: 6dc0a83bff7eb284d323127e14429ab43b6d37ca
Parents: 74cdbb6
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Sep 3 09:06:50 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Sep 3 09:06:50 2014 -0400

----------------------------------------------------------------------
 .../manager/command/BaseCommandManager.java     |  5 ++
 .../blur/manager/command/CommandUtil.java       | 18 ++++-
 .../manager/command/ShardCommandManager.java    | 74 ++++++++++++++++----
 .../blur/manager/command/TimeoutException.java  | 32 +++++++++
 .../manager/command/cmds/WaitForSeconds.java    | 42 +++++++++++
 .../org/apache/blur/thrift/BlurShardServer.java | 17 ++++-
 .../blur/thrift/ThriftBlurShardServer.java      |  2 +-
 .../apache/blur/thrift/util/CommandExample.java | 41 +++++++----
 8 files changed, 202 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6dc0a83b/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
index e8ba098..71ee694 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
@@ -12,6 +12,7 @@ import org.apache.blur.manager.command.cmds.DocumentCount;
 import org.apache.blur.manager.command.cmds.DocumentCountCombiner;
 import org.apache.blur.manager.command.cmds.DocumentCountNoCombine;
 import org.apache.blur.manager.command.cmds.TestBlurObjectCommand;
+import org.apache.blur.manager.command.cmds.WaitForSeconds;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -35,18 +36,22 @@ public class BaseCommandManager implements Closeable {
   protected final ExecutorService _executorService;
   protected final Map<String, BaseCommand> _command = new ConcurrentHashMap<String,
BaseCommand>();
   protected final Map<Class<? extends BaseCommand>, String> _commandNameLookup
= new ConcurrentHashMap<Class<? extends BaseCommand>, String>();
+  protected final ExecutorService _executorServiceDriver;
 
   public BaseCommandManager(int threadCount) throws IOException {
     register(DocumentCount.class);
     register(DocumentCountNoCombine.class);
     register(DocumentCountCombiner.class);
     register(TestBlurObjectCommand.class);
+    register(WaitForSeconds.class);
     _executorService = Executors.newThreadPool("command-", threadCount);
+    _executorServiceDriver = Executors.newThreadPool("command-driver-", threadCount);
   }
 
   @Override
   public void close() throws IOException {
     _executorService.shutdownNow();
+    _executorServiceDriver.shutdownNow();
   }
 
   public void register(Class<? extends BaseCommand> commandClass) throws IOException
{

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6dc0a83b/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java b/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
index 725b3cc..3bd3309 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
@@ -80,6 +80,21 @@ public class CommandUtil {
     } else if (o instanceof Integer) {
       value.setIntValue((Integer) o);
       return value;
+    } else if (o instanceof Boolean) {
+      value.setBooleanValue((Boolean) o);
+      return value;
+    } else if (o instanceof Short) {
+      value.setShortValue((Short) o);
+      return value;
+    } else if (o instanceof byte[]) {
+      value.setBinaryValue((byte[]) o);
+      return value;
+    } else if (o instanceof Double) {
+      value.setDoubleValue((Double) o);
+      return value;
+    } else if (o instanceof Float) {
+      value.setFloatValue((Float) o);
+      return value;
     }
     throw new BException("Object [{0}] not supported.", o);
   }
@@ -129,7 +144,8 @@ public class CommandUtil {
   }
 
   @SuppressWarnings("unchecked")
-  public static <T> Map<Shard, T> fromThriftToObject(Map<org.apache.blur.thrift.generated.Shard,
ValueObject> shardToValue) {
+  public static <T> Map<Shard, T> fromThriftToObject(
+      Map<org.apache.blur.thrift.generated.Shard, ValueObject> shardToValue) {
     Map<Shard, T> result = new HashMap<Shard, T>();
     for (Entry<org.apache.blur.thrift.generated.Shard, ValueObject> e : shardToValue.entrySet())
{
       result.put(new Shard(e.getKey().getShard()), (T) CommandUtil.toObject(e.getValue()));

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6dc0a83b/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java b/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
index 1790b30..50b520b 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ShardCommandManager.java
@@ -20,9 +20,13 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.manager.IndexServer;
@@ -38,23 +42,64 @@ import org.apache.lucene.search.IndexSearcher;
 public class ShardCommandManager extends BaseCommandManager {
 
   private final IndexServer _indexServer;
+  private final ConcurrentHashMap<String, Future<Response>> _runningMap = new
ConcurrentHashMap<String, Future<Response>>();
+  private final long _connectionTimeout;
 
-  public ShardCommandManager(IndexServer indexServer, int threadCount) throws IOException
{
+  public ShardCommandManager(IndexServer indexServer, int threadCount, long connectionTimeout)
throws IOException {
     super(threadCount);
     _indexServer = indexServer;
+    _connectionTimeout = connectionTimeout / 2;
   }
 
-  public Response execute(TableContext tableContext, String commandName, Args args) throws
IOException {
-    BaseCommand command = getCommandObject(commandName);
-    if (command == null) {
-      throw new IOException("Command with name [" + commandName + "] not found.");
+  public Response reconnect(String executionId) throws IOException, TimeoutException {
+    Future<Response> future = _runningMap.get(executionId);
+    if (future == null) {
+      throw new IOException("Command id [" + executionId + "] did not find any executing
commands.");
     }
-    if (command instanceof IndexReadCommand || command instanceof IndexReadCombiningCommand)
{
-      return toResponse(executeReadCommand(command, tableContext, args), command);
-    } else if (command instanceof IndexWriteCommand) {
-      return toResponse(executeReadWriteCommand((IndexWriteCommand<?>) command, tableContext,
args), command);
+    try {
+      return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
+    } catch (CancellationException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } catch (java.util.concurrent.TimeoutException e) {
+      throw new TimeoutException(executionId);
+    }
+  }
+
+  public Response execute(final TableContext tableContext, final String commandName, final
Args args)
+      throws IOException, TimeoutException {
+    final ShardServerContext shardServerContext = ShardServerContext.getShardServerContext();
+    String uuid = UUID.randomUUID().toString();
+    Future<Response> future = _executorServiceDriver.submit(new Callable<Response>()
{
+      @Override
+      public Response call() throws Exception {
+        BaseCommand command = getCommandObject(commandName);
+        if (command == null) {
+          throw new IOException("Command with name [" + commandName + "] not found.");
+        }
+        if (command instanceof IndexReadCommand || command instanceof IndexReadCombiningCommand)
{
+          return toResponse(executeReadCommand(shardServerContext, command, tableContext,
args), command);
+        } else if (command instanceof IndexWriteCommand) {
+          return toResponse(executeReadWriteCommand(shardServerContext, command, tableContext,
args), command);
+        }
+        throw new IOException("Command type of [" + command.getClass() + "] not supported.");
+      }
+    });
+    _runningMap.put(uuid, future);
+    try {
+      return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
+    } catch (CancellationException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } catch (java.util.concurrent.TimeoutException e) {
+      throw new TimeoutException(uuid);
     }
-    throw new IOException("Command type of [" + command.getClass() + "] not supported.");
   }
 
   @SuppressWarnings("unchecked")
@@ -67,15 +112,15 @@ public class ShardCommandManager extends BaseCommandManager {
     return Response.createNewShardResponse(results);
   }
 
-  private Map<Shard, Object> executeReadWriteCommand(IndexWriteCommand<?> command,
TableContext tableContext, Args args) {
+  private Map<Shard, Object> executeReadWriteCommand(ShardServerContext shardServerContext,
BaseCommand command,
+      TableContext tableContext, Args args) {
     return null;
   }
 
-  private Map<Shard, Object> executeReadCommand(BaseCommand command, final TableContext
tableContext, final Args args)
-      throws IOException {
+  private Map<Shard, Object> executeReadCommand(ShardServerContext shardServerContext,
BaseCommand command,
+      final TableContext tableContext, final Args args) throws IOException {
     Map<String, BlurIndex> indexes = _indexServer.getIndexes(tableContext.getTable());
     Map<String, Future<?>> futureMap = new HashMap<String, Future<?>>();
-    ShardServerContext shardServerContext = ShardServerContext.getShardServerContext();
     for (Entry<String, BlurIndex> e : indexes.entrySet()) {
       String shardId = e.getKey();
       final Shard shard = new Shard(shardId);
@@ -194,4 +239,5 @@ public class ShardCommandManager extends BaseCommandManager {
     }
 
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6dc0a83b/blur-core/src/main/java/org/apache/blur/manager/command/TimeoutException.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/TimeoutException.java b/blur-core/src/main/java/org/apache/blur/manager/command/TimeoutException.java
new file mode 100644
index 0000000..9e44839
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/TimeoutException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.command;
+
+@SuppressWarnings("serial")
+public class TimeoutException extends Exception {
+
+  private final String _executionId;
+
+  public TimeoutException(String executionId) {
+    _executionId = executionId;
+  }
+
+  public String getExecutionId() {
+    return _executionId;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6dc0a83b/blur-core/src/main/java/org/apache/blur/manager/command/cmds/WaitForSeconds.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/cmds/WaitForSeconds.java b/blur-core/src/main/java/org/apache/blur/manager/command/cmds/WaitForSeconds.java
new file mode 100644
index 0000000..3ffde62
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/cmds/WaitForSeconds.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.command.cmds;
+
+import java.io.IOException;
+
+import org.apache.blur.manager.command.IndexContext;
+import org.apache.blur.manager.command.IndexReadCommand;
+
+@SuppressWarnings("serial")
+public class WaitForSeconds extends BaseCommand implements IndexReadCommand<Boolean>
{
+
+  @Override
+  public Boolean execute(IndexContext context) throws IOException {
+    try {
+      Thread.sleep(30000);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+    return true;
+  }
+
+  @Override
+  public String getName() {
+    return "wait";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6dc0a83b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index e54b53e..66159f0 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -597,6 +597,9 @@ public class BlurShardServer extends TableAdmin implements Iface {
       Response response = _commandManager.execute(getTableContext(table), commandName, CommandUtil.toArgs(arguments));
       return CommandUtil.fromObjectToThrift(response);
     } catch (Exception e) {
+      if (e instanceof org.apache.blur.manager.command.TimeoutException) {
+        throw new TimeoutException(((org.apache.blur.manager.command.TimeoutException) e).getExecutionId());
+      }
       LOG.error("Unknown error while trying to execute command [{0}] for table [{1}]", e,
commandName, table);
       if (e instanceof BlurException) {
         throw (BlurException) e;
@@ -616,7 +619,19 @@ public class BlurShardServer extends TableAdmin implements Iface {
   @Override
   public org.apache.blur.thrift.generated.Response reconnect(String executionId) throws BlurException,
       TimeoutException, TException {
-    throw new BException("Not implemented yet.");
+    try {
+      Response response = _commandManager.reconnect(executionId);
+      return CommandUtil.fromObjectToThrift(response);
+    } catch (Exception e) {
+      if (e instanceof org.apache.blur.manager.command.TimeoutException) {
+        throw new TimeoutException(((org.apache.blur.manager.command.TimeoutException) e).getExecutionId());
+      }
+      LOG.error("Unknown error while trying to reconnect to executing command [{0}]", e,
executionId);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6dc0a83b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index d516bb6..6cddf17 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -229,7 +229,7 @@ public class ThriftBlurShardServer extends ThriftServer {
         fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay,
facetThreadCount,
         deepPagingCache);
 
-    final ShardCommandManager commandManager = new ShardCommandManager(indexServer, 16);
+    final ShardCommandManager commandManager = new ShardCommandManager(indexServer, 16, Connection.DEFAULT_TIMEOUT);
 
     final BlurShardServer shardServer = new BlurShardServer();
     shardServer.setCommandManager(commandManager);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6dc0a83b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
index c19fb1f..7161ed0 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
@@ -24,22 +24,39 @@ import org.apache.blur.thrift.Connection;
 import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Response;
+import org.apache.blur.thrift.generated.TimeoutException;
 
 public class CommandExample {
 
   public static void main(String[] args) throws BlurException, TException, IOException {
-    Client client = BlurClientManager.getClientPool().getClient(new Connection("localhost:40010"));
-
-    System.out.println(client.execute("test", "docCount", null));
-    System.out.println(client.execute("test", "docCountNoCombine", null));
-    {
-      Response response = client.execute("test", "docCountAggregate", null);
-      long count = response.getValue().getValue().getLongValue();
-      System.out.println(count);
-    }
-    {
-      Response response = client.execute("test", "testBlurObject", null);
-      System.out.println(response);
+    Client client = BlurClientManager.getClientPool().getClient(new Connection("localhost:40020"));
+    String executionId = null;
+    while (true) {
+      try {
+        Response response;
+        if (executionId == null) {
+          response = client.execute("test", "wait", null);
+        } else {
+          System.out.println("Reconecting...");
+          response = client.reconnect(executionId);
+        }
+        System.out.println(response);
+        break;
+      } catch (TimeoutException ex) {
+        executionId = ex.getExecutionId();
+      }
     }
+
+    // System.out.println(client.execute("test", "docCount", null));
+    // System.out.println(client.execute("test", "docCountNoCombine", null));
+    // {
+    // Response response = client.execute("test", "docCountAggregate", null);
+    // long count = response.getValue().getValue().getLongValue();
+    // System.out.println(count);
+    // }
+    // {
+    // Response response = client.execute("test", "testBlurObject", null);
+    // System.out.println(response);
+    // }
   }
 }


Mime
View raw message