incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cr...@apache.org
Subject [33/45] git commit: Fixing the client and removing a weak map in the base command manager to allow for long running commands to timeout and then allow clients to reconnect to those running commands.
Date Sun, 26 Oct 2014 17:55:31 GMT
Fixing the client and removing a weak map in the base command manager to allow long-running
commands to time out and then allow clients to reconnect to those running commands.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/08461e91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/08461e91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/08461e91

Branch: refs/heads/blur-384-random-port-cleanup
Commit: 08461e914c4108aaadd246c8a67c68ae4df4147c
Parents: 72dccb8
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Oct 14 21:45:22 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Oct 14 21:45:22 2014 -0400

----------------------------------------------------------------------
 .../apache/blur/command/BaseCommandManager.java | 86 ++++++--------------
 .../org/apache/blur/command/CommandRunner.java  | 18 +++-
 .../org/apache/blur/command/ResponseFuture.java | 72 ++++++++++++++++
 .../ReconnectWhileCommandIsRunningIntTests.java | 39 +++++++++
 4 files changed, 150 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/08461e91/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
index 7b73ab0..1f13e48 100644
--- a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
@@ -14,6 +14,7 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Timer;
@@ -71,7 +72,7 @@ public abstract class BaseCommandManager implements Closeable {
   protected final Map<String, BigInteger> _commandLoadTime = new ConcurrentHashMap<String, BigInteger>();
   protected final Map<String, Command<?>> _command = new ConcurrentHashMap<String, Command<?>>();
   protected final Map<Class<? extends Command<?>>, String> _commandNameLookup = new ConcurrentHashMap<Class<? extends Command<?>>, String>();
-  protected final ConcurrentMap<ExecutionId, Future<Response>> _runningMap;
+  protected final ConcurrentMap<ExecutionId, ResponseFuture> _runningMap = new MapMaker().makeMap();
   protected final long _connectionTimeout;
   protected final String _tmpPath;
   protected final String _commandPath;
@@ -80,6 +81,7 @@ public abstract class BaseCommandManager implements Closeable {
   protected final Map<Path, BigInteger> _commandPathLastChange = new ConcurrentHashMap<Path, BigInteger>();
   protected final Configuration _configuration;
   protected final BlurObjectSerDe _serDe = new BlurObjectSerDe();
+  protected final long _runningCacheTombstoneTime = TimeUnit.SECONDS.toMillis(60);
 
   public BaseCommandManager(String tmpPath, String commandPath, int workerThreadCount, int driverThreadCount,
       long connectionTimeout, Configuration configuration) throws IOException {
@@ -90,18 +92,34 @@ public abstract class BaseCommandManager implements Closeable {
     _executorService = Executors.newThreadPool("command-worker-", workerThreadCount);
     _executorServiceDriver = Executors.newThreadPool("command-driver-", driverThreadCount);
     _connectionTimeout = connectionTimeout / 2;
-    _runningMap = new MapMaker().weakKeys().makeMap();
+    _timer = new Timer("BaseCommandManager-Timer", true);
+    _timer.schedule(getTimerTaskForRemovalOfOldCommands(), _pollingPeriod, _pollingPeriod);
     if (_tmpPath == null || _commandPath == null) {
-      _timer = null;
       LOG.info("Tmp Path [{0}] or Command Path [{1}] is null so the automatic command reload will be disabled.",
           _tmpPath, _commandPath);
     } else {
       loadNewCommandsFromCommandPath();
-      _timer = new Timer("Command-Loader", true);
       _timer.schedule(getNewCommandTimerTask(), _pollingPeriod, _pollingPeriod);
     }
   }
 
+  private TimerTask getTimerTaskForRemovalOfOldCommands() {
+    return new TimerTask() {
+      @Override
+      public void run() {
+        Set<Entry<ExecutionId, ResponseFuture>> entrySet = _runningMap.entrySet();
+        for (Entry<ExecutionId, ResponseFuture> e : entrySet) {
+          ExecutionId executionId = e.getKey();
+          ResponseFuture responseFuture = e.getValue();
+          if (!responseFuture.isRunning() && responseFuture.hasExpired()) {
+            LOG.info("Removing old execution id [{0}]", executionId);
+            _runningMap.remove(executionId);
+          }
+        }
+      }
+    };
+  }
+
   public Map<String, BigInteger> getCommands() {
     return new HashMap<String, BigInteger>(_commandLoadTime);
   }
@@ -116,64 +134,6 @@ public abstract class BaseCommandManager implements Closeable {
     return command.getOptionalArguments();
   }
 
-  protected Map<String, String> getArguments(String commandName, boolean optional) {
-
-    // @RequiredArguments({ @Argument(name = "table", value =
-    // "The name of the table to execute the document count command.", type =
-    // String.class) })
-    // @OptionalArguments({ @Argument(name = "shard", value =
-    // "The shard id to execute the document count command.", type =
-    // String.class) })
-
-    throw new RuntimeException("not implemented");
-    // Command<?> command = getCommandObject(commandName);
-    // if (command == null) {
-    // return null;
-    // }
-    // Class<Command<?>> clazz = (Class<Command<?>>) command.getClass();
-    // Map<String, String> arguments = new TreeMap<String, String>();
-    // Argument[] args = getArgumentArray(clazz, optional);
-    // addArguments(arguments, args);
-    // if (optional) {
-    // if (!(command instanceof ShardRoute)) {
-    // Argument[] argumentArray = getArgumentArray(Command.class, optional);
-    // addArguments(arguments, argumentArray);
-    // }
-    // } else {
-    // if (!(command instanceof TableRoute)) {
-    // Argument[] argumentArray = getArgumentArray(Command.class, optional);
-    // addArguments(arguments, argumentArray);
-    // }
-    // }
-    // return arguments;
-  }
-
-  // private void addArguments(Map<String, String> arguments, Argument[] args) {
-  // if (args != null) {
-  // for (Argument argument : args) {
-  // Class<?> type = argument.type();
-  // arguments.put(argument.name(), ("(" + type.getSimpleName() + ") " +
-  // argument.value()).trim());
-  // }
-  // }
-  // }
-  //
-  // protected Argument[] getArgumentArray(Class<?> clazz, boolean optional) {
-  // if (optional) {
-  // OptionalArguments arguments = clazz.getAnnotation(OptionalArguments.class);
-  // if (arguments == null) {
-  // return null;
-  // }
-  // return arguments.value();
-  // } else {
-  // RequiredArguments arguments = clazz.getAnnotation(RequiredArguments.class);
-  // if (arguments == null) {
-  // return null;
-  // }
-  // return arguments.value();
-  // }
-  // }
-
   protected TimerTask getNewCommandTimerTask() {
     return new TimerTask() {
       @Override
@@ -336,7 +296,7 @@ public abstract class BaseCommandManager implements Closeable {
     Future<Response> future = _executorServiceDriver.submit(executionContext.wrapCallable(callable));
     executionContext.registerDriverFuture(future);
     ExecutionId executionId = executionContext.getExecutionId();
-    _runningMap.put(executionId, future);
+    _runningMap.put(executionId, new ResponseFuture(_runningCacheTombstoneTime, future));
     try {
       return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
     } catch (CancellationException e) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/08461e91/blur-core/src/main/java/org/apache/blur/command/CommandRunner.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/CommandRunner.java b/blur-core/src/main/java/org/apache/blur/command/CommandRunner.java
index b281f05..9818bc8 100644
--- a/blur-core/src/main/java/org/apache/blur/command/CommandRunner.java
+++ b/blur-core/src/main/java/org/apache/blur/command/CommandRunner.java
@@ -193,15 +193,29 @@ public class CommandRunner {
       IOException, BlurException, TimeoutException, TException {
     List<Connection> connections = new ArrayList<Connection>(Arrays.asList(connectionsArray));
     Collections.shuffle(connections);
+    BlurObjectSerDe serde = new BlurObjectSerDe();
     for (Connection connection : connections) {
       if (BlurClientManager.isBadConnection(connection)) {
         continue;
       }
       ClientPool clientPool = BlurClientManager.getClientPool();
       Client client = clientPool.getClient(connection);
+
       try {
-        BlurObjectSerDe serde = new BlurObjectSerDe();
-        Response response = client.execute(command.getName(), CommandUtil.toArguments(command, serde));
+        String executionId = null;
+        Response response;
+        INNER: while (true) {
+          try {
+            if (executionId == null) {
+              response = client.execute(command.getName(), CommandUtil.toArguments(command, serde));
+            } else {
+              response = client.reconnect(executionId);
+            }
+            break INNER;
+          } catch (TimeoutException te) {
+            executionId = te.getExecutionId();
+          }
+        }
         Object thriftObject = CommandUtil.fromThriftResponseToObject(response);
         return serde.fromSupportedThriftObject(thriftObject);
       } finally {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/08461e91/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
new file mode 100644
index 0000000..ca75b3e
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ResponseFuture implements Future<Response> {
+
+  private final Future<Response> _future;
+  private final AtomicLong _timeWhenNotRunningObserved = new AtomicLong();
+  private final long _tombstone;
+
+  public ResponseFuture(long tombstone, Future<Response> future) {
+    _tombstone = tombstone;
+    _future = future;
+  }
+
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    return _future.cancel(mayInterruptIfRunning);
+  }
+
+  public boolean isCancelled() {
+    return _future.isCancelled();
+  }
+
+  public boolean isDone() {
+    return _future.isDone();
+  }
+
+  public Response get() throws InterruptedException, ExecutionException {
+    return _future.get();
+  }
+
+  public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+    return _future.get(timeout, unit);
+  }
+
+  public boolean isRunning() {
+    if (isDone() || isCancelled()) {
+      _timeWhenNotRunningObserved.compareAndSet(0, System.currentTimeMillis());
+      return false;
+    }
+    return true;
+  }
+
+  public boolean hasExpired() {
+    long timeWhenNotRunningObserved = _timeWhenNotRunningObserved.get();
+    if (timeWhenNotRunningObserved + _tombstone < System.currentTimeMillis()) {
+      return true;
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/08461e91/blur-core/src/test/java/org/apache/blur/command/ReconnectWhileCommandIsRunningIntTests.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/command/ReconnectWhileCommandIsRunningIntTests.java b/blur-core/src/test/java/org/apache/blur/command/ReconnectWhileCommandIsRunningIntTests.java
new file mode 100644
index 0000000..a38c5ab
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/command/ReconnectWhileCommandIsRunningIntTests.java
@@ -0,0 +1,39 @@
+package org.apache.blur.command;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BaseClusterTest;
+import org.apache.blur.thrift.TableGen;
+import org.apache.blur.thrift.generated.BlurException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class ReconnectWhileCommandIsRunningIntTests extends BaseClusterTest {
+
+//  @Test  Slow test
+  public void testWaitForSecondsIntTest() throws BlurException, TException, IOException, InterruptedException {
+    final String tableName = "testWaitForSecondsIntTest";
+    TableGen.define(tableName).cols("test", "col1").addRows(100, 20, "r1", "rec-###", "value").build(getClient());
+    WaitForSeconds command = new WaitForSeconds();
+    command.setTable(tableName);
+    assertNotNull(command.run(getClient()));
+  }
+}


Mime
View raw message