incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Now have reads commands working through controller.
Date Fri, 29 Aug 2014 12:55:59 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 4052727b3 -> 1deca99d2


Now have reads commands working through controller.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/1deca99d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/1deca99d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/1deca99d

Branch: refs/heads/master
Commit: 1deca99d25a68270143d193d4f5274ebaa9c5478
Parents: 4052727
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Aug 29 08:55:54 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Aug 29 08:55:54 2014 -0400

----------------------------------------------------------------------
 .../org/apache/blur/manager/command/Args.java   |   7 +-
 .../manager/command/BaseCommandManager.java     |  10 +-
 .../blur/manager/command/ClusterCommand.java    |   3 +-
 .../blur/manager/command/ClusterContext.java    |  19 ++-
 .../blur/manager/command/CommandUtil.java       |  16 +-
 .../command/ControllerClusterContext.java       | 165 ++++++++++++++++++-
 .../command/ControllerCommandManager.java       |  19 ++-
 .../org/apache/blur/manager/command/Server.java |   5 +
 .../org/apache/blur/manager/command/Shard.java  |  12 +-
 .../blur/manager/command/ShardResultFuture.java |  64 +++++++
 .../command/cmds/DocumentCountAggregator.java   |   2 +-
 .../blur/thrift/BlurControllerServer.java       |   7 +-
 .../org/apache/blur/thrift/BlurShardServer.java |   2 +-
 .../blur/thrift/ThriftBlurControllerServer.java |   4 +
 .../java/org/apache/blur/thrift/ClientPool.java |   9 +
 .../apache/blur/thrift/util/CommandExample.java |   2 +-
 16 files changed, 315 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/Args.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/Args.java b/blur-core/src/main/java/org/apache/blur/manager/command/Args.java
index 96dfee6..d851827 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/Args.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/Args.java
@@ -21,7 +21,7 @@ import java.util.Map;
 
 public class Args {
 
-  private Map<String, Object> _values = new HashMap<String, Object>();
+  private final Map<String, Object> _values = new HashMap<String, Object>();
 
   @SuppressWarnings("unchecked")
   public <T> T get(String name) {
@@ -39,4 +39,9 @@ public class Args {
   public <T> void set(String name, T value) {
     _values.put(name, value);
   }
+
+  public Map<String, Object> getValues() {
+    return _values;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
index 26f20f9..950d074 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/BaseCommandManager.java
@@ -29,9 +29,10 @@ import org.apache.blur.manager.command.cmds.DocumentCountAggregator;
  */
 
 public class BaseCommandManager implements Closeable {
-  
+
   protected final ExecutorService _executorService;
   protected final Map<String, BaseCommand> _command = new ConcurrentHashMap<String, BaseCommand>();
+  protected final Map<Class<? extends BaseCommand>, String> _commandNameLookup = new ConcurrentHashMap<Class<? extends BaseCommand>, String>();
 
   public BaseCommandManager(int threadCount) throws IOException {
     register(DocumentCount.class);
@@ -48,14 +49,19 @@ public class BaseCommandManager implements Closeable {
     try {
       BaseCommand command = commandClass.newInstance();
       _command.put(command.getName(), command);
+      _commandNameLookup.put(commandClass, command.getName());
     } catch (InstantiationException e) {
       throw new IOException(e);
     } catch (IllegalAccessException e) {
       throw new IOException(e);
     }
   }
-  
+
   protected BaseCommand getCommandObject(String commandName) {
     return _command.get(commandName);
   }
+
+  protected String getCommandName(Class<? extends BaseCommand> clazz) {
+    return _commandNameLookup.get(clazz);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
index 28567bf..002fefe 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterCommand.java
@@ -1,5 +1,6 @@
 package org.apache.blur.manager.command;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 /**
@@ -21,6 +22,6 @@ import java.io.Serializable;
 
 public interface ClusterCommand<T> extends Serializable, Cloneable {
 
-  T clusterExecute(ClusterContext context);
+  T clusterExecute(ClusterContext context) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java
index 2a901db..976e3f4 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ClusterContext.java
@@ -1,6 +1,8 @@
 package org.apache.blur.manager.command;
 
+import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.blur.server.TableContext;
 
@@ -27,10 +29,21 @@ public abstract class ClusterContext {
 
   public abstract TableContext getTableContext();
 
-  public abstract <T> Map<Shard, T> readIndexes(Args args, Class<? extends IndexReadCommand<T>> clazz);
+  public abstract <T> Map<Shard, T> readIndexes(Args args, Class<? extends IndexReadCommand<T>> clazz)
+      throws IOException;
 
-  public abstract <T> Map<Server, T> readServers(Args args, Class<? extends IndexReadCombiningCommand<?, T>> clazz);
+  public abstract <T> Map<Shard, Future<T>> readIndexesAsync(Args args, Class<? extends IndexReadCommand<T>> clazz)
+      throws IOException;
 
-  public abstract <T> T writeIndex(Args args, Class<? extends IndexWriteCommand<T>> clazz);
+  public abstract <T> Map<Server, T> readServers(Args args, Class<? extends IndexReadCombiningCommand<?, T>> clazz)
+      throws IOException;
+
+  public abstract <T> Map<Server, Future<T>> readServersAsync(Args args,
+      Class<? extends IndexReadCombiningCommand<?, T>> clazz) throws IOException;
+
+  public abstract <T> T writeIndex(Args args, Class<? extends IndexWriteCommand<T>> clazz) throws IOException;
+
+  public abstract <T> Future<T> writeIndexAsync(Args args, Class<? extends IndexWriteCommand<T>> clazz)
+      throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java b/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
index c35107f..23ee738 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/CommandUtil.java
@@ -67,7 +67,7 @@ public class CommandUtil {
     throw new BException("Object [{0}] not supported.", o);
   }
 
-  public static Args convert(Arguments arguments) {
+  public static Args toArgs(Arguments arguments) {
     if (arguments == null) {
       return null;
     }
@@ -81,9 +81,21 @@ public class CommandUtil {
   }
 
   public static Object toObject(Value value) {
-    if (value.getNullValue()) {
+    if (value.isSetNullValue()) {
       return null;
     }
     return value.getFieldValue();
   }
+
+  public static Arguments toArguments(Args args) throws BlurException {
+    if (args == null) {
+      return null;
+    }
+    Arguments arguments = new Arguments();
+    Set<Entry<String, Object>> entrySet = args.getValues().entrySet();
+    for (Entry<String, Object> e : entrySet) {
+      arguments.putToValues(e.getKey(), toValue(e.getValue()));
+    }
+    return arguments;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
index de17e44..63a706d 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerClusterContext.java
@@ -2,9 +2,28 @@ package org.apache.blur.manager.command;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.command.cmds.BaseCommand;
 import org.apache.blur.server.TableContext;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.apache.blur.thrift.BlurClientManager;
+import org.apache.blur.thrift.ClientPool;
+import org.apache.blur.thrift.Connection;
+import org.apache.blur.thrift.generated.Arguments;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.Response;
+import org.apache.blur.thrift.generated.Value;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -25,12 +44,36 @@ import org.apache.blur.server.TableContext;
 
 public class ControllerClusterContext extends ClusterContext implements Closeable {
 
+  private final static Log LOG = LogFactory.getLog(ControllerClusterContext.class);
+
   private final Args _args;
   private final TableContext _tableContext;
+  private final Map<Server, Client> _clientMap;
+  private final ExecutorService _executorService;
+  private final ControllerCommandManager _manager;
+  private final Map<String, String> _tableLayout;
 
-  public ControllerClusterContext(TableContext tableContext, Args args) {
+  public ControllerClusterContext(TableContext tableContext, Args args, Map<String, String> tableLayout,
+      ExecutorService executorService, ControllerCommandManager manager) throws IOException {
     _tableContext = tableContext;
     _args = args;
+    _clientMap = getBlurClientsForTable(_tableContext.getTable(), tableLayout);
+    _executorService = executorService;
+    _manager = manager;
+    _tableLayout = tableLayout;
+  }
+
+  public Map<Server, Client> getBlurClientsForTable(String table, Map<String, String> tableLayout) throws IOException {
+    Map<Server, Client> clients = new HashMap<Server, Client>();
+    for (String serverStr : tableLayout.values()) {
+      try {
+        Client client = BlurClientManager.getClientPool().getClient(new Connection(serverStr));
+        clients.put(new Server(serverStr), client);
+      } catch (TTransportException e) {
+        throw new IOException(e);
+      }
+    }
+    return clients;
   }
 
   @Override
@@ -44,13 +87,18 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
   }
 
   @Override
-  public <T> Map<Shard, T> readIndexes(Args args, Class<? extends IndexReadCommand<T>> clazz) {
-    throw new RuntimeException("Not Implemented");
+  public <T> Map<Shard, T> readIndexes(Args args, Class<? extends IndexReadCommand<T>> clazz) throws IOException {
+    Map<Shard, Future<T>> futures = readIndexesAsync(args, clazz);
+    Map<Shard, T> result = new HashMap<Shard, T>();
+    return processFutures(clazz, futures, result);
   }
 
   @Override
-  public <T> Map<Server, T> readServers(Args args, Class<? extends IndexReadCombiningCommand<?, T>> clazz) {
-    throw new RuntimeException("Not Implemented");
+  public <T> Map<Server, T> readServers(Args args, Class<? extends IndexReadCombiningCommand<?, T>> clazz)
+      throws IOException {
+    Map<Server, Future<T>> futures = readServersAsync(args, clazz);
+    Map<Server, T> result = new HashMap<Server, T>();
+    return processFutures(clazz, futures, result);
   }
 
   @Override
@@ -60,7 +108,112 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
 
   @Override
   public void close() throws IOException {
-    throw new RuntimeException("Not Implemented");
+    ClientPool clientPool = BlurClientManager.getClientPool();
+    for (Entry<Server, Client> e : _clientMap.entrySet()) {
+      clientPool.returnClient(new Connection(e.getKey().getServer()), e.getValue());
+    }
   }
 
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> Map<Shard, Future<T>> readIndexesAsync(final Args args, Class<? extends IndexReadCommand<T>> clazz) {
+    final String commandName = _manager.getCommandName((Class<? extends BaseCommand>) clazz);
+    Map<Shard, Future<T>> futureMap = new HashMap<Shard, Future<T>>();
+    for (Entry<Server, Client> e : _clientMap.entrySet()) {
+      Server server = e.getKey();
+      final Client client = e.getValue();
+      Future<Map<Shard, T>> future = _executorService.submit(new Callable<Map<Shard, T>>() {
+        @Override
+        public Map<Shard, T> call() throws Exception {
+          Arguments arguments = CommandUtil.toArguments(args);
+          Response response = client.execute(getTable(), commandName, arguments);
+          Map<String, Value> shardToValue = response.getShardToValue();
+          return toShardResponse(shardToValue);
+        }
+
+        private Map<Shard, T> toShardResponse(Map<String, Value> values) {
+          Map<Shard, T> result = new HashMap<Shard, T>();
+          for (Entry<String, Value> e : values.entrySet()) {
+            result.put(new Shard(e.getKey()), (T) CommandUtil.toObject(e.getValue()));
+          }
+          return result;
+        }
+      });
+      Set<Shard> shards = getShardsOnServer(server);
+      for (Shard shard : shards) {
+        futureMap.put(shard, new ShardResultFuture<T>(shard, future));
+      }
+    }
+    return futureMap;
+  }
+
+  private Set<Shard> getShardsOnServer(Server server) {
+    Set<Shard> shards = new HashSet<Shard>();
+    for (Entry<String, String> e : _tableLayout.entrySet()) {
+      String value = e.getValue();
+      if (value.equals(server.getServer())) {
+        shards.add(new Shard(e.getKey()));
+      }
+    }
+    return shards;
+  }
+
+  private String getTable() {
+    return getTableContext().getTable();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> Map<Server, Future<T>> readServersAsync(final Args args,
+      Class<? extends IndexReadCombiningCommand<?, T>> clazz) {
+    final String commandName = _manager.getCommandName((Class<? extends BaseCommand>) clazz);
+    Map<Server, Future<T>> futureMap = new HashMap<Server, Future<T>>();
+    for (Entry<Server, Client> e : _clientMap.entrySet()) {
+      Server server = e.getKey();
+      final Client client = e.getValue();
+      Future<T> future = _executorService.submit(new Callable<T>() {
+        @Override
+        public T call() throws Exception {
+          Arguments arguments = CommandUtil.toArguments(args);
+          Response response = client.execute(getTable(), commandName, arguments);
+          Value value = response.getValue();
+          return (T) CommandUtil.toObject(value);
+        }
+      });
+      futureMap.put(server, future);
+    }
+    return futureMap;
+  }
+
+  @Override
+  public <T> Future<T> writeIndexAsync(Args args, Class<? extends IndexWriteCommand<T>> clazz) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private <K, T> Map<K, T> processFutures(Class<?> clazz, Map<K, Future<T>> futures, Map<K, T> result)
+      throws IOException {
+    Throwable firstError = null;
+    for (Entry<K, Future<T>> e : futures.entrySet()) {
+      K key = e.getKey();
+      Future<T> future = e.getValue();
+      T value;
+      try {
+        value = future.get();
+        result.put(key, value);
+      } catch (InterruptedException ex) {
+        throw new IOException(ex);
+      } catch (ExecutionException ex) {
+        Throwable cause = ex.getCause();
+        if (firstError == null) {
+          firstError = cause;
+        }
+        LOG.error("Unknown call while executing command [{0}] on server or shard [{1}]", clazz, key);
+      }
+    }
+    if (firstError != null) {
+      throw new IOException(firstError);
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
index 29609ca..33a6b45 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ControllerCommandManager.java
@@ -29,8 +29,9 @@ public class ControllerCommandManager extends BaseCommandManager {
     super(threadCount);
   }
 
-  public Response execute(TableContext tableContext, String commandName, Args args) throws IOException {
-    ClusterContext context = createCommandContext(tableContext, args);
+  public Response execute(TableContext tableContext, String commandName, Args args, Map<String, String> tableLayout)
+      throws IOException {
+    ClusterContext context = createCommandContext(tableContext, args, tableLayout);
     BaseCommand command = getCommandObject(commandName);
     if (command == null) {
       throw new IOException("Command with name [" + commandName + "] not found.");
@@ -51,33 +52,35 @@ public class ControllerCommandManager extends BaseCommandManager {
     }
   }
 
-  private Response executeClusterCommand(ClusterContext context, BaseCommand command) {
+  private Response executeClusterCommand(ClusterContext context, BaseCommand command) throws IOException {
     ClusterCommand<Object> clusterCommand = (ClusterCommand<Object>) command;
     Object object = clusterCommand.clusterExecute(context);
     return Response.createNewAggregateResponse(object);
   }
 
-  private Response executeIndexWriteCommand(Args args, ClusterContext context, BaseCommand command) {
+  private Response executeIndexWriteCommand(Args args, ClusterContext context, BaseCommand command) throws IOException {
     Class<? extends IndexWriteCommand<Object>> clazz = (Class<? extends IndexWriteCommand<Object>>) command.getClass();
     Object object = context.writeIndex(args, clazz);
     return Response.createNewAggregateResponse(object);
   }
 
-  private Response executeIndexReadCommand(Args args, ClusterContext context, BaseCommand command) {
+  private Response executeIndexReadCommand(Args args, ClusterContext context, BaseCommand command) throws IOException {
     Class<? extends IndexReadCommand<Object>> clazz = (Class<? extends IndexReadCommand<Object>>) command.getClass();
     Map<Shard, Object> result = context.readIndexes(args, clazz);
     return Response.createNewShardResponse(result);
   }
 
-  private Response executeIndexReadCombiningCommand(Args args, ClusterContext context, BaseCommand command) {
+  private Response executeIndexReadCombiningCommand(Args args, ClusterContext context, BaseCommand command)
+      throws IOException {
     Class<? extends IndexReadCombiningCommand<Object, Object>> clazz = (Class<? extends IndexReadCombiningCommand<Object, Object>>) command
         .getClass();
     Map<Server, Object> result = context.readServers(args, clazz);
     return Response.createNewServerResponse(result);
   }
 
-  private ClusterContext createCommandContext(TableContext tableContext, Args args) {
-    return new ControllerClusterContext(tableContext, args);
+  private ClusterContext createCommandContext(TableContext tableContext, Args args, Map<String, String> tableLayout)
+      throws IOException {
+    return new ControllerClusterContext(tableContext, args, tableLayout, _executorService, this);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/Server.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/Server.java b/blur-core/src/main/java/org/apache/blur/manager/command/Server.java
index 64208af..9454b65 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/Server.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/Server.java
@@ -62,4 +62,9 @@ public class Server implements Comparable<Server> {
     return _server.compareTo(o._server);
   }
 
+  @Override
+  public String toString() {
+    return "Server [server=" + _server + "]";
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/Shard.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/Shard.java b/blur-core/src/main/java/org/apache/blur/manager/command/Shard.java
index 491cd3e..e61d87a 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/Shard.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/Shard.java
@@ -17,8 +17,8 @@ package org.apache.blur.manager.command;
  * the License.
  */
 
-public class Shard  implements Comparable<Shard> {
-  
+public class Shard implements Comparable<Shard> {
+
   private final String _shard;
 
   public Shard(String shard) {
@@ -53,7 +53,7 @@ public class Shard  implements Comparable<Shard> {
   public String getShard() {
     return _shard;
   }
-  
+
   @Override
   public int compareTo(Shard o) {
     if (o == null) {
@@ -61,4 +61,10 @@ public class Shard  implements Comparable<Shard> {
     }
     return _shard.compareTo(o._shard);
   }
+
+  @Override
+  public String toString() {
+    return "Shard [shard=" + _shard + "]";
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/ShardResultFuture.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/ShardResultFuture.java b/blur-core/src/main/java/org/apache/blur/manager/command/ShardResultFuture.java
new file mode 100644
index 0000000..4e3abdb
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/ShardResultFuture.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.command;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class ShardResultFuture<T> implements Future<T> {
+
+  private final Shard _shard;
+  private final Future<Map<Shard, T>> _future;
+
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    return _future.cancel(mayInterruptIfRunning);
+  }
+
+  public boolean isCancelled() {
+    return _future.isCancelled();
+  }
+
+  public boolean isDone() {
+    return _future.isDone();
+  }
+
+  public T get() throws InterruptedException, ExecutionException {
+    Map<Shard, T> map = _future.get();
+    if (map == null) {
+      return null;
+    }
+    return map.get(_shard);
+  }
+
+  public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+      TimeoutException {
+    Map<Shard, T> map = _future.get(timeout, unit);
+    if (map == null) {
+      return null;
+    }
+    return map.get(_shard);
+  }
+
+  public ShardResultFuture(Shard shard, Future<Map<Shard, T>> future) {
+    _shard = shard;
+    _future = future;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/manager/command/cmds/DocumentCountAggregator.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/command/cmds/DocumentCountAggregator.java b/blur-core/src/main/java/org/apache/blur/manager/command/cmds/DocumentCountAggregator.java
index 2a7c17b..ebb06fc 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/command/cmds/DocumentCountAggregator.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/command/cmds/DocumentCountAggregator.java
@@ -53,7 +53,7 @@ public class DocumentCountAggregator extends BaseCommand implements ClusterComma
   }
 
   @Override
-  public Long clusterExecute(ClusterContext context) {
+  public Long clusterExecute(ClusterContext context) throws IOException {
     Map<Server, Long> results = context.readServers(null, DocumentCountAggregator.class);
     long total = 0;
     for (Entry<Server, Long> e : results.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index efb2f20..3c0a924 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -1502,10 +1502,13 @@ public class BlurControllerServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public org.apache.blur.thrift.generated.Response  execute(String table, String commandName, Arguments arguments) throws BlurException, TException {
+  public org.apache.blur.thrift.generated.Response execute(String table, String commandName, Arguments arguments)
+      throws BlurException, TException {
     try {
       TableContext tableContext = getTableContext(table);
-      Response response = _commandManager.execute(tableContext, commandName, CommandUtil.convert(arguments));
+      Map<String, String> tableLayout = getTableLayout(table);
+      Response response = _commandManager.execute(tableContext, commandName, CommandUtil.toArgs(arguments),
+          tableLayout);
       return CommandUtil.convert(response);
     } catch (Exception e) {
       LOG.error("Unknown error while trying to execute command [{0}] for table [{1}]", e, commandName, table);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 2b48c15..bde114a 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -593,7 +593,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
   public org.apache.blur.thrift.generated.Response execute(String table, String commandName, Arguments arguments)
       throws BlurException, TException {
     try {
-      Response response = _commandManager.execute(getTableContext(table), commandName, CommandUtil.convert(arguments));
+      Response response = _commandManager.execute(getTableContext(table), commandName, CommandUtil.toArgs(arguments));
       return CommandUtil.convert(response);
     } catch (Exception e) {
       LOG.error("Unknown error while trying to execute command [{0}] for table [{1}]", e, commandName, table);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index 7814582..a22d372 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -54,6 +54,7 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.BlurQueryChecker;
 import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.command.ControllerCommandManager;
 import org.apache.blur.manager.indexserver.BlurServerShutDown;
 import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
 import org.apache.blur.metrics.ReporterSetup;
@@ -127,8 +128,11 @@ public class ThriftBlurControllerServer extends ThriftServer {
     int timeout = configuration.getInt(BLUR_CONTROLLER_SHARD_CONNECTION_TIMEOUT, 60000);
     BlurControllerServer.BlurClient client = new BlurControllerServer.BlurClientRemote(timeout);
 
+    ControllerCommandManager controllerCommandManager = new ControllerCommandManager(16);
+
     final BlurControllerServer controllerServer = new BlurControllerServer();
     controllerServer.setClient(client);
+    controllerServer.setCommandManager(controllerCommandManager);
     controllerServer.setClusterStatus(clusterStatus);
     controllerServer.setZookeeper(zooKeeper);
     controllerServer.setNodeName(nodeName);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
index 667817a..5c3063e 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
@@ -186,6 +186,15 @@ public class ClientPool {
     return new LinkedBlockingQueue<Client>(_maxConnectionsPerHost);
   }
 
+  /**
+   * Get a client from the pool or creates a new one if the pool is empty. Also
+   * the clients are tested before being returned.
+   * 
+   * @param connection
+   * @return
+   * @throws TTransportException
+   * @throws IOException
+   */
   public Client getClient(Connection connection) throws TTransportException, IOException {
     BlockingQueue<Client> blockingQueue = getQueue(connection);
     if (blockingQueue.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1deca99d/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
index 3bfd1ac..1f434f2 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
@@ -26,7 +26,7 @@ import org.apache.blur.thrift.generated.BlurException;
 public class CommandExample {
 
   public static void main(String[] args) throws BlurException, TException, IOException {
-    Iface client = BlurClient.getClient("localhost:40020");
+    Iface client = BlurClient.getClient("localhost:40010");
     System.out.println(client.execute("test", "docCount", null));
     System.out.println(client.execute("test", "docCountAggregate", null));
   }


Mime
View raw message