incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Making some adjustments to make client calls easier.
Date Tue, 23 Sep 2014 19:13:02 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 20c163205 -> 697156e57


Making some adjustments to make client calls easier.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/697156e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/697156e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/697156e5

Branch: refs/heads/master
Commit: 697156e57e9863c9ac44ba587d1c829ea9720957
Parents: 20c1632
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Sep 23 15:12:51 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Sep 23 15:12:51 2014 -0400

----------------------------------------------------------------------
 ...UsingDocumentCountDefaultClusterCombine.java | 44 +++++++++
 .../java/org/apache/blur/command/Command.java   | 99 ++++++++++++++++++++
 .../org/apache/blur/command/CommandUtil.java    | 29 +++++-
 .../blur/command/ControllerClusterContext.java  |  2 +-
 .../java/org/apache/blur/thrift/BlurClient.java | 57 ++++++-----
 .../apache/blur/thrift/BlurClientManager.java   |  2 +-
 .../apache/blur/thrift/util/CommandExample.java | 10 +-
 7 files changed, 206 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/697156e5/blur-command/src/main/java/org/apache/blur/command/example/UsingDocumentCountDefaultClusterCombine.java
----------------------------------------------------------------------
diff --git a/blur-command/src/main/java/org/apache/blur/command/example/UsingDocumentCountDefaultClusterCombine.java b/blur-command/src/main/java/org/apache/blur/command/example/UsingDocumentCountDefaultClusterCombine.java
new file mode 100644
index 0000000..ab4ad6f
--- /dev/null
+++ b/blur-command/src/main/java/org/apache/blur/command/example/UsingDocumentCountDefaultClusterCombine.java
@@ -0,0 +1,44 @@
+package org.apache.blur.command.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.command.Args;
+import org.apache.blur.command.Command;
+import org.apache.blur.command.DocumentCountDefaultClusterCombine;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+
+public class UsingDocumentCountDefaultClusterCombine {
+
+  public static void main(String[] args) throws BlurException, TException, IOException {
+
+    Iface client = BlurClient.getClient("localhost:40020");
+
+    DocumentCountDefaultClusterCombine command = new DocumentCountDefaultClusterCombine();
+
+    Args arguments = new Args();
+    arguments.set("table", "test");
+
+    Long count = Command.run(command, arguments, client);
+
+    System.out.println(count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/697156e5/blur-core/src/main/java/org/apache/blur/command/Command.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/Command.java b/blur-core/src/main/java/org/apache/blur/command/Command.java
index fadcd28..30b22b0 100644
--- a/blur-core/src/main/java/org/apache/blur/command/Command.java
+++ b/blur-core/src/main/java/org/apache/blur/command/Command.java
@@ -16,19 +16,118 @@
  */
 package org.apache.blur.command;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.blur.command.annotation.Argument;
 import org.apache.blur.command.annotation.OptionalArguments;
 import org.apache.blur.command.annotation.RequiredArguments;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.apache.blur.thrift.BlurClient.BlurClientInvocationHandler;
+import org.apache.blur.thrift.BlurClientManager;
+import org.apache.blur.thrift.ClientPool;
+import org.apache.blur.thrift.Connection;
+import org.apache.blur.thrift.generated.Blur;
+import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.ErrorType;
+import org.apache.blur.thrift.generated.Response;
+import org.apache.blur.thrift.generated.TimeoutException;
 
 @SuppressWarnings("serial")
 @RequiredArguments({ @Argument(name = "table", value = "The name of the table to execute the document count command.", type = String.class) })
 @OptionalArguments({ @Argument(name = "shard", value = "The shard id to execute the document count command.", type = String.class) })
 public abstract class Command implements Serializable, Cloneable {
 
+  private static Connection[] getConnection(Iface client) {
+    if (client instanceof Proxy) {
+      InvocationHandler invocationHandler = Proxy.getInvocationHandler(client);
+      if (invocationHandler instanceof BlurClientInvocationHandler) {
+        BlurClientInvocationHandler handler = (BlurClientInvocationHandler) invocationHandler;
+        return handler.getConnections().toArray(new Connection[] {});
+      }
+    }
+    if (client == null) {
+      throw new RuntimeException("Client cannot be null.");
+    }
+    throw new RuntimeException("Unknown client class [" + client.getClass() + "]");
+  }
+
   public abstract String getName();
 
+  public static <T> Map<Shard, T> run(IndexReadCommand<T> command, Args arguments, Blur.Iface client)
+      throws IOException, BlurException, TException {
+    return run(command, arguments, getConnection(client));
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> Map<Shard, T> run(IndexReadCommand<T> command, Args arguments, Connection... connection)
+      throws IOException, BlurException, TException {
+    return (Map<Shard, T>) runInternal((Command) command, arguments, connection);
+  }
+
+  public static <T> Map<Server, T> run(IndexReadCombiningCommand<?, T> command, Args arguments, Blur.Iface client)
+      throws IOException, BlurException, TException {
+    return run(command, arguments, getConnection(client));
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> Map<Server, T> run(IndexReadCombiningCommand<?, T> command, Args arguments,
+      Connection... connection) throws IOException, BlurException, TException {
+    return (Map<Server, T>) runInternal((Command) command, arguments, connection);
+  }
+
+  public static <T> T run(ClusterReadCombiningCommand<?, T> command, Args arguments, Blur.Iface client)
+      throws IOException, BlurException, TException {
+    return run(command, arguments, getConnection(client));
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> T run(ClusterReadCombiningCommand<?, T> command, Args arguments, Connection... connection)
+      throws IOException, BlurException, TException {
+    return (T) runInternal((Command) command, arguments, connection);
+  }
+
+  public static <T> T run(ClusterCommand<T> command, Args arguments, Blur.Iface client) throws IOException,
+      BlurException, TException {
+    return run(command, arguments, getConnection(client));
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> T run(ClusterCommand<T> command, Args arguments, Connection... connection) throws IOException,
+      BlurException, TException {
+    return (T) runInternal((Command) command, arguments, connection);
+  }
+
+  private static Object runInternal(Command command, Args arguments, Connection... connectionsArray)
+      throws TTransportException, IOException, BlurException, TimeoutException, TException {
+    List<Connection> connections = new ArrayList<Connection>(Arrays.asList(connectionsArray));
+    Collections.shuffle(connections);
+    for (Connection connection : connections) {
+      if (BlurClientManager.isBadConnection(connection)) {
+        continue;
+      }
+      ClientPool clientPool = BlurClientManager.getClientPool();
+      Client client = clientPool.getClient(connection);
+      try {
+        Response response = client.execute(command.getName(), CommandUtil.toArguments(arguments));
+        return CommandUtil.fromThriftResponseToObject(response);
+      } finally {
+        clientPool.returnClient(connection, client);
+      }
+    }
+    throw new BlurException("All connections bad. [" + connections + "]", null, ErrorType.UNKNOWN);
+  }
+
   @Override
   public Command clone() {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/697156e5/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java b/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
index 5e61661..28f4f80 100644
--- a/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/command/CommandUtil.java
@@ -145,7 +145,7 @@ public class CommandUtil {
   }
 
   @SuppressWarnings("unchecked")
-  public static <T> Map<Shard, T> fromThriftToObject(
+  public static <T> Map<Shard, T> fromThriftToObjectShard(
       Map<org.apache.blur.thrift.generated.Shard, ValueObject> shardToValue) {
     Map<Shard, T> result = new HashMap<Shard, T>();
     for (Entry<org.apache.blur.thrift.generated.Shard, ValueObject> e : shardToValue.entrySet()) {
@@ -156,6 +156,17 @@ public class CommandUtil {
   }
 
   @SuppressWarnings("unchecked")
+  public static <T> Map<Server, T> fromThriftToObjectServer(
+      Map<org.apache.blur.thrift.generated.Server, ValueObject> serverToValue) {
+    Map<Server, T> result = new HashMap<Server, T>();
+    for (Entry<org.apache.blur.thrift.generated.Server, ValueObject> e : serverToValue.entrySet()) {
+      org.apache.blur.thrift.generated.Server server = e.getKey();
+      result.put(new Server(server.getServer()), (T) CommandUtil.toObject(e.getValue()));
+    }
+    return result;
+  }
+
+  @SuppressWarnings("unchecked")
   public static <T> T toObject(ValueObject valueObject) {
     _Fields field = valueObject.getSetField();
     switch (field) {
@@ -167,4 +178,20 @@ public class CommandUtil {
       throw new RuntimeException("Type unknown.");
     }
   }
+
+  public static Object fromThriftResponseToObject(org.apache.blur.thrift.generated.Response response) {
+    org.apache.blur.thrift.generated.Response._Fields setField = response.getSetField();
+    switch (setField) {
+    case SERVER_TO_VALUE:
+      Map<org.apache.blur.thrift.generated.Server, ValueObject> serverToValue = response.getServerToValue();
+      return fromThriftToObjectServer(serverToValue);
+    case SHARD_TO_VALUE:
+      Map<org.apache.blur.thrift.generated.Shard, ValueObject> shardToValue = response.getShardToValue();
+      return fromThriftToObjectShard(shardToValue);
+    case VALUE:
+      return toObject(response.getValue());
+    default:
+      throw new RuntimeException("Not supported.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/697156e5/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
index 30bbbed..007126e 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
@@ -130,7 +130,7 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
         public Map<Shard, T> call() throws Exception {
           Arguments arguments = CommandUtil.toArguments(args);
           Response response = waitForResponse(client, commandName, arguments);
-          Map<Shard, Object> shardToValue = CommandUtil.fromThriftToObject(response.getShardToValue());
+          Map<Shard, Object> shardToValue = CommandUtil.fromThriftToObjectShard(response.getShardToValue());
           return (Map<Shard, T>) shardToValue;
         }
       });

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/697156e5/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
index ad4813a..407b039 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
@@ -41,11 +41,9 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 
-
-
 public class BlurClient {
 
-  static class BlurClientInvocationHandler implements InvocationHandler {
+  public static class BlurClientInvocationHandler implements InvocationHandler {
 
     private final List<Connection> _connections;
     private final int _maxRetries;
@@ -65,6 +63,10 @@ public class BlurClient {
           BlurClientManager.MAX_BACK_OFF_TIME);
     }
 
+    public List<Connection> getConnections() {
+      return _connections;
+    }
+
     @Override
     public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
       return BlurClientManager.execute(_connections, new BlurCommand<Object>() {
@@ -89,21 +91,19 @@ public class BlurClient {
         }
       }, _maxRetries, _backOffTime, _maxBackOffTime);
     }
-
   }
-  
+
   public static Iface getClient() {
-	try {
-	  return getClient(new BlurConfiguration());
-	} catch (IOException e) {
-		throw new RuntimeException("Unable to load configurations.", e);
-	}
+    try {
+      return getClient(new BlurConfiguration());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to load configurations.", e);
+    }
   }
-  
+
   public static Iface getClient(BlurConfiguration conf) {
-	List<String> onlineControllers = getOnlineControllers(conf);
-	  
-	return getClient(StringUtils.join(onlineControllers, ","));
+    List<String> onlineControllers = getOnlineControllers(conf);
+    return getClient(StringUtils.join(onlineControllers, ","));
   }
 
   /**
@@ -151,22 +151,21 @@ public class BlurClient {
     return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class },
         new BlurClientInvocationHandler(connections, maxRetries, backOffTime, maxBackOffTime));
   }
-  
+
   private static List<String> getOnlineControllers(BlurConfiguration conf) {
-	  String zkConn = conf.getExpected(BLUR_ZOOKEEPER_CONNECTION);
-	  int zkSessionTimeout = conf.getInt(BLUR_ZOOKEEPER_TIMEOUT, BLUR_ZOOKEEPER_TIMEOUT_DEFAULT);
-	  
-	  ZooKeeper zkClient = null;
-	  try {
-		  zkClient = ZkUtils.newZooKeeper(zkConn, zkSessionTimeout);
-		  return zkClient.getChildren(ZookeeperPathConstants.getOnlineControllersPath(), false);
-	  } catch (KeeperException e) {
-		  throw new RuntimeException("Error communicating with Zookeeper", e);
-	  } catch (InterruptedException e) {
-		  throw new RuntimeException("Error communicating with Zookeeper", e);
-	  } catch (IOException e) {
-		  throw new RuntimeException("Unable to initialize Zookeeper", e);
-	  }
+    String zkConn = conf.getExpected(BLUR_ZOOKEEPER_CONNECTION);
+    int zkSessionTimeout = conf.getInt(BLUR_ZOOKEEPER_TIMEOUT, BLUR_ZOOKEEPER_TIMEOUT_DEFAULT);
+    ZooKeeper zkClient = null;
+    try {
+      zkClient = ZkUtils.newZooKeeper(zkConn, zkSessionTimeout);
+      return zkClient.getChildren(ZookeeperPathConstants.getOnlineControllersPath(), false);
+    } catch (KeeperException e) {
+      throw new RuntimeException("Error communicating with Zookeeper", e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Error communicating with Zookeeper", e);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to initialize Zookeeper", e);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/697156e5/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
index 967bd60..99c90a5 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
@@ -270,7 +270,7 @@ public class BlurClientManager {
     _badConnections.put(connection, NULL);
   }
 
-  private static boolean isBadConnection(Connection connection) {
+  public static boolean isBadConnection(Connection connection) {
     return _badConnections.containsKey(connection);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/697156e5/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
index b6ce76d..ec54ff0 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/CommandExample.java
@@ -30,14 +30,14 @@ import org.apache.blur.thrift.generated.ValueObject;
 public class CommandExample {
 
   public static void main(String[] args) throws BlurException, TException, IOException {
-    Client client = BlurClientManager.getClientPool().getClient(new Connection("localhost:40010"));
+    Client client = BlurClientManager.getClientPool().getClient(new Connection("localhost:40020"));
 
     Arguments arguments = new Arguments();
     arguments.putToValues("table", new ValueObject(ValueObject._Fields.VALUE, new Value(Value._Fields.STRING_VALUE,
-        "test2")));
-    arguments.putToValues("shard", new ValueObject(ValueObject._Fields.VALUE, new Value(Value._Fields.STRING_VALUE,
-        "shard-00000000")));
+        "test")));
+//    arguments.putToValues("shard", new ValueObject(ValueObject._Fields.VALUE, new Value(Value._Fields.STRING_VALUE,
+//        "shard-00000000")));
 
-    System.out.println(client.execute("docCount", arguments));
+    System.out.println(client.execute("docCountClusterCombine", arguments));
   }
 }


Mime
View raw message