incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: The new Adhoc command is working though there are a few things hard coded that need to be pulled into the API.
Date Thu, 24 Jul 2014 20:46:41 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/blur-platform 486d37b52 -> 753ab411c


The new Adhoc command is working, though there are a few hard-coded things that need to be
pulled into the API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/753ab411
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/753ab411
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/753ab411

Branch: refs/heads/blur-platform
Commit: 753ab411c3b3d1fc432b3de9335040274e2326a8
Parents: 486d37b
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jul 24 16:46:23 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jul 24 16:46:23 2014 -0400

----------------------------------------------------------------------
 .../apache/blur/server/platform/Command.java    | 32 +++----
 .../blur/server/platform/CommandClient.java     | 64 +++++++++++++-
 .../server/platform/CommandClientExample.java   | 54 ------------
 .../platform/CommandControllerServer.java       | 89 ++++++++++++++++++++
 .../server/platform/CommandShardServer.java     | 43 ++++------
 .../blur/server/platform/CommandUtils.java      | 29 +++++++
 .../blur/server/platform/TableShardKey.java     | 38 ---------
 .../blur/thrift/BlurControllerServer.java       | 42 ++++++++-
 .../org/apache/blur/thrift/BlurShardServer.java |  7 +-
 .../blur/thrift/ThriftBlurShardServer.java      |  6 +-
 .../java/org/apache/blur/utils/BlurUtil.java    | 10 ++-
 11 files changed, 268 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/server/platform/Command.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/platform/Command.java b/blur-core/src/main/java/org/apache/blur/server/platform/Command.java
index 2d97e0e..b1031b2 100644
--- a/blur-core/src/main/java/org/apache/blur/server/platform/Command.java
+++ b/blur-core/src/main/java/org/apache/blur/server/platform/Command.java
@@ -17,8 +17,8 @@
 package org.apache.blur.server.platform;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -29,7 +29,9 @@ import java.util.concurrent.Future;
 
 import org.apache.blur.manager.writer.BlurIndex;
 
-public abstract class Command<T1, T2> {
+public abstract class Command<T1, T2> implements Serializable {
+
+  private static final long serialVersionUID = 8496197672871317639L;
 
   private ExecutorService _executorService;
   private Object[] _args;
@@ -47,18 +49,15 @@ public abstract class Command<T1, T2> {
   }
 
   public T2 process(Map<String, Map<String, BlurIndex>> indexes) throws CommandException,
IOException {
-    List<Future<Map<TableShardKey, T1>>> futures = new ArrayList<Future<Map<TableShardKey,
T1>>>();
+    List<Future<T1>> futures = new ArrayList<Future<T1>>();
     for (Entry<String, Map<String, BlurIndex>> tableEntry : indexes.entrySet())
{
-      String table = tableEntry.getKey();
       Map<String, BlurIndex> shards = tableEntry.getValue();
       for (Entry<String, BlurIndex> shardEntry : shards.entrySet()) {
-        String shard = shardEntry.getKey();
         final BlurIndex blurIndex = shardEntry.getValue();
-        final TableShardKey tableShardKey = new TableShardKey(table, shard);
-        futures.add(_executorService.submit(new Callable<Map<TableShardKey, T1>>()
{
+        futures.add(_executorService.submit(new Callable<T1>() {
           @Override
-          public Map<TableShardKey, T1> call() throws Exception {
-            return processShard(tableShardKey, blurIndex);
+          public T1 call() throws Exception {
+            return Command.this.call(blurIndex);
           }
         }));
       }
@@ -66,10 +65,10 @@ public abstract class Command<T1, T2> {
 
     CommandException commandException = new CommandException();
     boolean error = false;
-    Map<TableShardKey, T1> results = new HashMap<TableShardKey, T1>();
-    for (Future<Map<TableShardKey, T1>> future : futures) {
+    List<T1> results = new ArrayList<T1>();
+    for (Future<T1> future : futures) {
       try {
-        results.putAll(future.get());
+        results.add(future.get());
       } catch (InterruptedException e) {
         commandException.addSuppressed(e);
         error = true;
@@ -81,13 +80,14 @@ public abstract class Command<T1, T2> {
     if (error) {
       throw commandException;
     }
-    return merge(results);
+    return mergeIntermediate(results);
 
   }
 
-  public abstract T2 merge(Map<TableShardKey, T1> results) throws IOException;
+  public abstract T2 mergeFinal(List<T2> results) throws IOException;
+
+  public abstract T2 mergeIntermediate(List<T1> results) throws IOException;
 
-  public abstract Map<TableShardKey, T1> processShard(TableShardKey tableShardKey,
BlurIndex blurIndex)
-      throws IOException;
+  public abstract T1 call(BlurIndex blurIndex) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/server/platform/CommandClient.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/platform/CommandClient.java b/blur-core/src/main/java/org/apache/blur/server/platform/CommandClient.java
index 69a48a9..fd049ad 100644
--- a/blur-core/src/main/java/org/apache/blur/server/platform/CommandClient.java
+++ b/blur-core/src/main/java/org/apache/blur/server/platform/CommandClient.java
@@ -16,14 +16,22 @@
  */
 package org.apache.blur.server.platform;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.generated.AdhocByteCodeCommandRequest;
 import org.apache.blur.thrift.generated.AdhocByteCodeCommandResponse;
@@ -32,9 +40,12 @@ import org.apache.blur.thrift.generated.BlurCommandRequest;
 import org.apache.blur.thrift.generated.BlurCommandResponse;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Value;
+import org.apache.commons.io.IOUtils;
 
 public class CommandClient {
 
+  private static final Log LOG = LogFactory.getLog(CommandClient.class);
+
   private final Iface _client;
 
   public CommandClient(Iface client) {
@@ -61,7 +72,7 @@ public class CommandClient {
 
   private void packCommandAndClasses(AdhocByteCodeCommandRequest request, Object[] args,
Object command)
       throws IOException {
-    request.setClassData(getClassData(getClass()));
+    request.setClassData(getClassData(command.getClass()));
     request.setInstanceData(CommandUtils.toBytesViaSerialization(command));
     request.setArguments(getArgs(args));
   }
@@ -78,10 +89,55 @@ public class CommandClient {
     return values;
   }
 
-  private Map<String, ByteBuffer> getClassData(Class<? extends CommandClient>
clazz) {
+  private Map<String, ByteBuffer> getClassData(Class<?> clazz) throws IOException
{
     String name = clazz.getName();
-    NOT IMPL
-    return null;
+    String r = "/" + name.replace('.', '/') + ".class";
+    URL url = getClass().getResource(r);
+    Map<String, ByteBuffer> map = new HashMap<String, ByteBuffer>();
+    packClassInfo(url, r, map);
+    return map;
+  }
+
+  private void packClassInfo(URL url, String baseName, Map<String, ByteBuffer> map)
throws IOException {
+    try {
+      URI uri = url.toURI();
+      String path = uri.toString();
+      LOG.info("Using path [{0}] to find class data to pack.", path);
+      if (path.endsWith(baseName)) {
+        String root = path.replace(baseName, "");
+        URI rootUri = new URI(root);
+        if (rootUri.getScheme().equals("file")) {
+          File file = new File(rootUri);
+          pack(rootUri, root, file, map);
+          return;
+        }
+      }
+      throw new IOException("Something bad has happened url [" + url + "] baseName [" + baseName
+ "]");
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+
   }
 
+  private void pack(URI rootUri, String root, File file, Map<String, ByteBuffer> map)
throws IOException {
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        pack(rootUri, root, f, map);
+      }
+    } else {
+      URI uri = file.toURI();
+      String classUri = uri.toString();
+      String classResourcePath = classUri.replace(root, "");
+      InputStream inputStream = getClass().getResourceAsStream(classResourcePath);
+      byte[] classData = IOUtils.toByteArray(inputStream);
+      inputStream.close();
+      String className = getClassName(classResourcePath);
+      LOG.info("Packing class [{0}] at uri [{1}].", className, classUri);
+      map.put(className, ByteBuffer.wrap(classData));
+    }
+  }
+
+  private String getClassName(String classResourcePath) {
+    return classResourcePath.substring(1).replace(".class", "").replace('/', '.');
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/server/platform/CommandClientExample.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/platform/CommandClientExample.java
b/blur-core/src/main/java/org/apache/blur/server/platform/CommandClientExample.java
deleted file mode 100644
index 7714060..0000000
--- a/blur-core/src/main/java/org/apache/blur/server/platform/CommandClientExample.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.server.platform;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-
-public class CommandClientExample {
-
-  public static void main(String[] args) throws BlurException, TException, IOException {
-    Iface client = BlurClient.getClient("localhost:40020");
-    CommandClient commandClient = new CommandClient(client);
-
-    String str = commandClient.execute("test", new Command<String, String>() {
-      @Override
-      public Map<TableShardKey, String> processShard(TableShardKey tableShardKey, BlurIndex
blurIndex)
-          throws IOException {
-        Map<TableShardKey, String> result = new HashMap<TableShardKey, String>();
-        result.put(tableShardKey, "hi");
-        return result;
-      }
-
-      @Override
-      public String merge(Map<TableShardKey, String> results) throws IOException {
-        return "hi";
-      }
-
-    });
-    
-    System.out.println(str);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/server/platform/CommandControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/platform/CommandControllerServer.java
b/blur-core/src/main/java/org/apache/blur/server/platform/CommandControllerServer.java
new file mode 100644
index 0000000..9e8d699
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/server/platform/CommandControllerServer.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.server.platform;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.blur.thrift.BException;
+import org.apache.blur.thrift.generated.AdhocByteCodeCommandRequest;
+import org.apache.blur.thrift.generated.AdhocByteCodeCommandResponse;
+import org.apache.blur.thrift.generated.BlurCommandRequest;
+import org.apache.blur.thrift.generated.BlurCommandResponse;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Value;
+
+public class CommandControllerServer implements Closeable {
+
+  @Override
+  public void close() throws IOException {
+    
+  }
+
+  public BlurCommandResponse merge(BlurCommandRequest request, List<BlurCommandResponse>
responses)
+      throws BlurException, IOException {
+    Object fieldValue = request.getFieldValue();
+    if (fieldValue instanceof AdhocByteCodeCommandRequest) {
+      AdhocByteCodeCommandRequest commandRequest = request.getAdhocByteCodeCommandRequest();
+      AdhocByteCodeCommandResponse commandResponse = merge(commandRequest, toAdhocByteCodeCommandResponses(responses));
+      BlurCommandResponse response = new BlurCommandResponse();
+      response.setAdhocByteCodeCommandResponse(commandResponse);
+      return response;
+    } else {
+      throw new BException("Not implemented.");
+    }
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private AdhocByteCodeCommandResponse merge(AdhocByteCodeCommandRequest commandRequest,
+      List<AdhocByteCodeCommandResponse> adhocByteCodeCommandResponses) throws BlurException,
IOException {
+    Map<String, ByteBuffer> classData = commandRequest.getClassData();
+    ClassLoader classLoader = CommandUtils.getClassLoader(classData);
+    Object[] args = CommandUtils.getArgs(classLoader, commandRequest.getArguments());
+    Command<?, ?> command = CommandUtils.toObjectViaSerialization(classLoader, commandRequest.getInstanceData());
+    command.setArgs(args);
+    List<?> results = getResults(classLoader, adhocByteCodeCommandResponses);
+    Object r = command.mergeFinal((List) results);
+    Value value = CommandUtils.toValue(r);
+    AdhocByteCodeCommandResponse adhocByteCodeCommandResponse = new AdhocByteCodeCommandResponse();
+    adhocByteCodeCommandResponse.setResult(value);
+    return adhocByteCodeCommandResponse;
+  }
+
+  private List<?> getResults(ClassLoader classLoader, List<AdhocByteCodeCommandResponse>
adhocByteCodeCommandResponses)
+      throws BlurException, IOException {
+    List<Object> result = new ArrayList<Object>();
+    for (AdhocByteCodeCommandResponse response : adhocByteCodeCommandResponses) {
+      Object object = CommandUtils.toObject(classLoader, response.getResult());
+      result.add(object);
+    }
+    return result;
+  }
+
+  private List<AdhocByteCodeCommandResponse> toAdhocByteCodeCommandResponses(List<BlurCommandResponse>
responses) {
+    List<AdhocByteCodeCommandResponse> result = new ArrayList<AdhocByteCodeCommandResponse>();
+    for (BlurCommandResponse r : responses) {
+      result.add(r.getAdhocByteCodeCommandResponse());
+    }
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/server/platform/CommandShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/platform/CommandShardServer.java
b/blur-core/src/main/java/org/apache/blur/server/platform/CommandShardServer.java
index 70643cc..49b3427 100644
--- a/blur-core/src/main/java/org/apache/blur/server/platform/CommandShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/platform/CommandShardServer.java
@@ -16,18 +16,18 @@
  */
 package org.apache.blur.server.platform;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.thirdparty.thrift_0_9_0.TBaseHelper;
 import org.apache.blur.thrift.BException;
 import org.apache.blur.thrift.generated.AdhocByteCodeCommandRequest;
 import org.apache.blur.thrift.generated.AdhocByteCodeCommandResponse;
@@ -35,9 +35,8 @@ import org.apache.blur.thrift.generated.BlurCommandRequest;
 import org.apache.blur.thrift.generated.BlurCommandResponse;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Value;
-import org.codehaus.janino.ByteArrayClassLoader;
 
-public class CommandShardServer {
+public class CommandShardServer implements Closeable {
 
   private final IndexServer _indexServer;
   private final ExecutorService _executorService;
@@ -47,6 +46,10 @@ public class CommandShardServer {
     _executorService = executorService;
   }
 
+  public CommandShardServer(IndexServer indexServer) {
+    this(indexServer, Executors.newCachedThreadPool());
+  }
+
   public <T1, T2> T2 execute(Set<String> tables, Command<T1, T2> command,
Set<String> tablesToExecute, Object... args)
       throws CommandException, IOException {
     command.setArgs(args);
@@ -67,7 +70,8 @@ public class CommandShardServer {
   public BlurCommandResponse execute(Set<String> tables, BlurCommandRequest request)
throws BlurException, IOException,
       CommandException {
     // @TODO deal with different command types.
-    Set<String> tablesToInvoke = request.getTablesToInvoke();
+    Set<String> tablesToInvoke = new HashSet<String>();
+    tablesToInvoke.add("test");
     Object fieldValue = request.getFieldValue();
     BlurCommandResponse blurCommandResponse = new BlurCommandResponse();
     if (fieldValue instanceof AdhocByteCodeCommandRequest) {
@@ -85,8 +89,8 @@ public class CommandShardServer {
     // @TODO handle libraries
 
     Map<String, ByteBuffer> classData = commandRequest.getClassData();
-    ClassLoader classLoader = getClassLoader(classData);
-    Object[] args = getArgs(classLoader, commandRequest.getArguments());
+    ClassLoader classLoader = CommandUtils.getClassLoader(classData);
+    Object[] args = CommandUtils.getArgs(classLoader, commandRequest.getArguments());
     Command<?, ?> command = CommandUtils.toObjectViaSerialization(classLoader, commandRequest.getInstanceData());
     Object object = execute(tables, command, tablesToInvoke, args);
     Value value = CommandUtils.toValue(object);
@@ -96,26 +100,9 @@ public class CommandShardServer {
     return adhocByteCodeCommandResponse;
   }
 
-  private ClassLoader getClassLoader(Map<String, ByteBuffer> classData) {
-    Map<String, byte[]> classDataMap = getClassDataMap(classData);
-    return new ByteArrayClassLoader(classDataMap);
-  }
-
-  private Map<String, byte[]> getClassDataMap(Map<String, ByteBuffer> classData)
{
-    Map<String, byte[]> map = new HashMap<String, byte[]>();
-    for (Entry<String, ByteBuffer> e : classData.entrySet()) {
-      map.put(e.getKey(), TBaseHelper.byteBufferToByteArray(e.getValue()));
-    }
-    return map;
-  }
-
-  private Object[] getArgs(ClassLoader classLoader, List<Value> arguments) throws BlurException,
IOException {
-    Object[] args = new Object[arguments.size()];
-    int i = 0;
-    for (Value argument : arguments) {
-      args[i++] = CommandUtils.toObject(classLoader, argument);
-    }
-    return args;
+  @Override
+  public void close() throws IOException {
+    _executorService.shutdownNow();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/server/platform/CommandUtils.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/platform/CommandUtils.java b/blur-core/src/main/java/org/apache/blur/server/platform/CommandUtils.java
index 43c86e0..369c075 100644
--- a/blur-core/src/main/java/org/apache/blur/server/platform/CommandUtils.java
+++ b/blur-core/src/main/java/org/apache/blur/server/platform/CommandUtils.java
@@ -23,11 +23,18 @@ import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.ObjectStreamClass;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
+import org.apache.blur.thirdparty.thrift_0_9_0.TBaseHelper;
 import org.apache.blur.thrift.BException;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Value;
 import org.apache.blur.thrift.generated.ValueType;
+import org.codehaus.janino.ByteArrayClassLoader;
 
 public class CommandUtils {
 
@@ -58,6 +65,28 @@ public class CommandUtils {
     }
   }
 
+  public static Object[] getArgs(ClassLoader classLoader, List<Value> arguments) throws
BlurException, IOException {
+    Object[] args = new Object[arguments.size()];
+    int i = 0;
+    for (Value argument : arguments) {
+      args[i++] = CommandUtils.toObject(classLoader, argument);
+    }
+    return args;
+  }
+
+  public static ClassLoader getClassLoader(Map<String, ByteBuffer> classData) {
+    Map<String, byte[]> classDataMap = getClassDataMap(classData);
+    return new ByteArrayClassLoader(classDataMap);
+  }
+
+  public static Map<String, byte[]> getClassDataMap(Map<String, ByteBuffer> classData)
{
+    Map<String, byte[]> map = new HashMap<String, byte[]>();
+    for (Entry<String, ByteBuffer> e : classData.entrySet()) {
+      map.put(e.getKey(), TBaseHelper.byteBufferToByteArray(e.getValue()));
+    }
+    return map;
+  }
+
   public static <T> T toObject(ClassLoader classLoader, Value value) throws BlurException,
IOException {
     ValueType type = value.getType();
     switch (type) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/server/platform/TableShardKey.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/platform/TableShardKey.java b/blur-core/src/main/java/org/apache/blur/server/platform/TableShardKey.java
deleted file mode 100644
index 2619633..0000000
--- a/blur-core/src/main/java/org/apache/blur/server/platform/TableShardKey.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.server.platform;
-
-public class TableShardKey {
-
-  private final String _table;
-
-  private final String _shard;
-
-  public TableShardKey(String table, String shard) {
-    _table = table;
-    _shard = shard;
-  }
-
-  public String getTable() {
-    return _table;
-  }
-
-  public String getShard() {
-    return _shard;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index acc4dfe..4438035 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -65,6 +65,7 @@ import org.apache.blur.manager.stats.MergerTableStats;
 import org.apache.blur.manager.status.MergerQueryStatusSingle;
 import org.apache.blur.server.ControllerServerContext;
 import org.apache.blur.server.TableContext;
+import org.apache.blur.server.platform.CommandControllerServer;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport;
@@ -194,6 +195,8 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
   private Timer _tableContextWarmupTimer;
   private long _tableLayoutTimeoutNanos = TimeUnit.SECONDS.toNanos(30);
 
+  private CommandControllerServer _commandControllerServer = new CommandControllerServer();
+
   public void init() throws KeeperException, InterruptedException {
     setupZookeeper();
     registerMyself();
@@ -1443,6 +1446,10 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
     _client = client;
   }
 
+  public void setCommandControllerServer(CommandControllerServer commandControllerServer)
{
+    _commandControllerServer = commandControllerServer;
+  }
+
   @Override
   public void optimize(final String table, final int numberOfSegmentsPerShard) throws BlurException,
TException {
     checkTable(table);
@@ -1510,8 +1517,39 @@ public class BlurControllerServer extends TableAdmin implements Iface
{
   }
 
   @Override
-  public BlurCommandResponse execute(BlurCommandRequest request) throws BlurException, TException
{
-    throw new BException("Not implemented.");
+  public BlurCommandResponse execute(final BlurCommandRequest request) throws BlurException,
TException {
+    try {
+      // @TODO pass cluster
+      return scatterGather("default", new BlurCommand<BlurCommandResponse>() {
+        @Override
+        public BlurCommandResponse call(Client client) throws BlurException, TException {
+          return client.execute(request);
+        }
+      }, new Merger<BlurCommandResponse>() {
+        @Override
+        public BlurCommandResponse merge(BlurExecutorCompletionService<BlurCommandResponse>
service)
+            throws BlurException {
+          List<BlurCommandResponse> results = new ArrayList<BlurCommandResponse>();
+          while (service.getRemainingCount() > 0) {
+            Future<BlurCommandResponse> future = service.poll(_defaultParallelCallTimeout,
TimeUnit.MILLISECONDS, true);
+            if (future != null) {
+              results.add(service.getResultThrowException(future));
+            }
+          }
+          try {
+            return _commandControllerServer.merge(request, results);
+          } catch (IOException e) {
+            throw new BException("Unknown ioexception", e);
+          }
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to run execute", e);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException("Unknown error while trying to run execute", e);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index e6dae12..136d4a7 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -596,13 +596,18 @@ public class BlurShardServer extends TableAdmin implements Iface {
 
   @Override
   public BlurCommandResponse execute(BlurCommandRequest request) throws BlurException, TException
{
-    List<String> tableList = tableList();
     try {
+      List<String> tableList = tableList();
       return _commandShardServer.execute(new HashSet<String>(tableList), request);
     } catch (IOException e) {
+      LOG.error("Unknown error.", e);
       throw new BException("Unknown error.", e);
     } catch (CommandException e) {
+      LOG.error("Unknown error.", e);
       throw new BException("Unknown error.", e);
+    } catch (Throwable t) {
+      LOG.error("Unknown error.", t);
+      throw new BException("Unknown error.", t);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index cd39485..cd90da5 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -80,6 +80,7 @@ import org.apache.blur.metrics.JSONReporter;
 import org.apache.blur.metrics.ReporterSetup;
 import org.apache.blur.server.ShardServerEventHandler;
 import org.apache.blur.server.TableContext;
+import org.apache.blur.server.platform.CommandShardServer;
 import org.apache.blur.store.BlockCacheDirectoryFactory;
 import org.apache.blur.store.BlockCacheDirectoryFactoryV1;
 import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
@@ -239,7 +240,10 @@ public class ThriftBlurShardServer extends ThriftServer {
         fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay,
facetThreadCount,
         deepPagingCache);
 
+    final CommandShardServer commandShardServer = new CommandShardServer(indexServer);
+    
     final BlurShardServer shardServer = new BlurShardServer();
+    shardServer.setCommandShardServer(commandShardServer);
     shardServer.setIndexServer(indexServer);
     shardServer.setIndexManager(indexManager);
     shardServer.setZookeeper(zooKeeper);
@@ -289,7 +293,7 @@ public class ThriftBlurShardServer extends ThriftServer {
       public void shutdown() {
         ThreadWatcher threadWatcher = ThreadWatcher.instance();
         quietClose(traceStorage, refresher, server, shardServer, indexManager, indexServer,
threadWatcher,
-            clusterStatus, zooKeeper, httpServer);
+            clusterStatus, zooKeeper, httpServer, commandShardServer);
       }
     };
     server.setShutdown(shutdown);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/753ab411/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index 52fd09f..740e805 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -201,8 +201,9 @@ public class BlurUtil {
           } else if (targetException instanceof TException) {
             throw targetException;
           } else {
+            targetException.printStackTrace();
             throw new BException(
-                "Unknown error during call on method [{0}], this means that the method is
handling exceptions correctly.",
+                "Unknown error during call on method [{0}], this means that the method is
handling exceptions incorrectly.",
                 targetException, method.getName());
           }
         }
@@ -293,10 +294,15 @@ public class BlurUtil {
             }
           }
           Histogram histogram = histogramMap.get(name);
-          histogram.update((end - start) / 1000);
+          if (histogram == null) {
+            LOG.warn("Histogram missing for [{0}]", name);
+          } else {
+            histogram.update((end - start) / 1000);
+          }
           if (loggerArgsState != null) {
             loggerArgsState.reset();
           }
+
         }
       }
 


Mime
View raw message