incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Moving the execution logic to the server from the command itself.
Date Fri, 25 Jul 2014 13:39:03 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/blur-platform d96892e4f -> 067b8d4fb


Moving the execution logic to the server from the command itself.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/067b8d4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/067b8d4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/067b8d4f

Branch: refs/heads/blur-platform
Commit: 067b8d4fbc74828145a24b5f742f7df93fc250ed
Parents: d96892e
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Jul 25 09:38:49 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Jul 25 09:38:49 2014 -0400

----------------------------------------------------------------------
 .../apache/blur/server/platform/Command.java    | 58 ++++----------------
 .../server/platform/CommandShardServer.java     | 45 ++++++++++++++-
 2 files changed, 52 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/067b8d4f/blur-core/src/main/java/org/apache/blur/server/platform/Command.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/platform/Command.java b/blur-core/src/main/java/org/apache/blur/server/platform/Command.java
index b1031b2..3da463e 100644
--- a/blur-core/src/main/java/org/apache/blur/server/platform/Command.java
+++ b/blur-core/src/main/java/org/apache/blur/server/platform/Command.java
@@ -18,22 +18,15 @@ package org.apache.blur.server.platform;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 import org.apache.blur.manager.writer.BlurIndex;
 
 public abstract class Command<T1, T2> implements Serializable {
 
-  private static final long serialVersionUID = 8496197672871317639L;
+  private static final long serialVersionUID = 1L;
 
-  private ExecutorService _executorService;
   private Object[] _args;
 
   public Object[] getArgs() {
@@ -44,50 +37,19 @@ public abstract class Command<T1, T2> implements Serializable {
     _args = args;
   }
 
-  public void setExecutorService(ExecutorService executorService) {
-    _executorService = executorService;
-  }
-
-  public T2 process(Map<String, Map<String, BlurIndex>> indexes) throws CommandException, IOException {
-    List<Future<T1>> futures = new ArrayList<Future<T1>>();
-    for (Entry<String, Map<String, BlurIndex>> tableEntry : indexes.entrySet()) {
-      Map<String, BlurIndex> shards = tableEntry.getValue();
-      for (Entry<String, BlurIndex> shardEntry : shards.entrySet()) {
-        final BlurIndex blurIndex = shardEntry.getValue();
-        futures.add(_executorService.submit(new Callable<T1>() {
-          @Override
-          public T1 call() throws Exception {
-            return Command.this.call(blurIndex);
-          }
-        }));
-      }
-    }
-
-    CommandException commandException = new CommandException();
-    boolean error = false;
-    List<T1> results = new ArrayList<T1>();
-    for (Future<T1> future : futures) {
-      try {
-        results.add(future.get());
-      } catch (InterruptedException e) {
-        commandException.addSuppressed(e);
-        error = true;
-      } catch (ExecutionException e) {
-        commandException.addSuppressed(e.getCause());
-        error = true;
-      }
-    }
-    if (error) {
-      throw commandException;
-    }
-    return mergeIntermediate(results);
-
-  }
-
   public abstract T2 mergeFinal(List<T2> results) throws IOException;
 
   public abstract T2 mergeIntermediate(List<T1> results) throws IOException;
 
   public abstract T1 call(BlurIndex blurIndex) throws IOException;
 
+  public Callable<T1> createCallable(final BlurIndex blurIndex) {
+    return new Callable<T1>() {
+      @Override
+      public T1 call() throws Exception {
+        return Command.this.call(blurIndex);
+      }
+    };
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/067b8d4f/blur-core/src/main/java/org/apache/blur/server/platform/CommandShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/platform/CommandShardServer.java b/blur-core/src/main/java/org/apache/blur/server/platform/CommandShardServer.java
index 24524f7..df33745 100644
--- a/blur-core/src/main/java/org/apache/blur/server/platform/CommandShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/platform/CommandShardServer.java
@@ -19,11 +19,17 @@ package org.apache.blur.server.platform;
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.writer.BlurIndex;
@@ -52,8 +58,40 @@ public class CommandShardServer implements Closeable {
   public <T1, T2> T2 execute(Set<String> tables, Command<T1, T2> command, Set<String> tablesToExecute, Object... args)
       throws CommandException, IOException {
     command.setArgs(args);
-    command.setExecutorService(_executorService);
-    return command.process(getBlurIndexes(tables, tablesToExecute));
+    return execute(command, getBlurIndexes(tables, tablesToExecute));
+  }
+
+  private <T1, T2> T2 execute(Command<T1, T2> command, Map<String, Map<String, BlurIndex>> indexes) throws IOException,
+      CommandException {
+    List<Future<T1>> futures = new ArrayList<Future<T1>>();
+    for (Entry<String, Map<String, BlurIndex>> tableEntry : indexes.entrySet()) {
+      Map<String, BlurIndex> shards = tableEntry.getValue();
+      for (Entry<String, BlurIndex> shardEntry : shards.entrySet()) {
+        final BlurIndex blurIndex = shardEntry.getValue();
+        Callable<T1> callable = command.createCallable(blurIndex);
+        futures.add(_executorService.submit(callable));
+      }
+    }
+
+    CommandException commandException = new CommandException();
+    boolean error = false;
+    List<T1> results = new ArrayList<T1>();
+    for (Future<T1> future : futures) {
+      try {
+        results.add(future.get());
+      } catch (InterruptedException e) {
+        commandException.addSuppressed(e);
+        error = true;
+      } catch (ExecutionException e) {
+        commandException.addSuppressed(e.getCause());
+        error = true;
+      }
+    }
+    if (error) {
+      throw commandException;
+    }
+    return command.mergeIntermediate(results);
+
   }
 
   private Map<String, Map<String, BlurIndex>> getBlurIndexes(Set<String> tables, Set<String> tablesToExecute)
@@ -81,7 +119,8 @@ public class CommandShardServer implements Closeable {
     return blurCommandResponse;
   }
 
-  public AdHocByteCodeCommandResponse execute(Set<String> tables, AdHocByteCodeCommandRequest commandRequest) throws BlurException, IOException, CommandException {
+  public AdHocByteCodeCommandResponse execute(Set<String> tables, AdHocByteCodeCommandRequest commandRequest)
+      throws BlurException, IOException, CommandException {
     // @TODO handle libraries
     Set<String> tablesToInvoke = commandRequest.getTablesToInvoke();
     Map<String, ByteBuffer> classData = commandRequest.getClassData();


Mime
View raw message