incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixing an issue with the BlurClient not supporting multiple ZK quorums in a single jvm.
Date Mon, 19 Jan 2015 15:57:34 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master e421f61d4 -> 3f9079a1d


Fixing an issue with the BlurClient not supporting multiple ZK quorums in a single jvm.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/3f9079a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/3f9079a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/3f9079a1

Branch: refs/heads/master
Commit: 3f9079a1d58b708a29dde38886eca3bbfd9e6f0e
Parents: e421f61
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Jan 19 10:57:26 2015 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Jan 19 10:57:26 2015 -0500

----------------------------------------------------------------------
 .../test/java/org/apache/blur/MiniCluster.java  |  55 +++++--
 .../org/apache/blur/thrift/BlurClientTest.java  |  55 +++++++
 .../java/org/apache/blur/thrift/BlurClient.java | 162 +++++++++++++------
 3 files changed, 204 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3f9079a1/blur-core/src/test/java/org/apache/blur/MiniCluster.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/MiniCluster.java b/blur-core/src/test/java/org/apache/blur/MiniCluster.java
index 796b7f4..12688a3 100644
--- a/blur-core/src/test/java/org/apache/blur/MiniCluster.java
+++ b/blur-core/src/test/java/org/apache/blur/MiniCluster.java
@@ -17,7 +17,15 @@ package org.apache.blur;
  * limitations under the License.
  */
 
-import static org.apache.blur.utils.BlurConstants.*;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_DIRECT_MEMORY_ALLOCATION;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCKCACHE_SLAB_COUNT;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_HOSTNAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
+import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
 
 import java.io.BufferedReader;
 import java.io.Closeable;
@@ -82,9 +90,11 @@ public class MiniCluster {
 
   private static Log LOG = LogFactory.getLog(MiniCluster.class);
   private MiniDFSCluster cluster;
+  private final String id = UUID.randomUUID().toString();
   private ZkMiniCluster zkMiniCluster = new ZkMiniCluster();
   private List<MiniClusterServer> controllers = new ArrayList<MiniClusterServer>();
   private List<MiniClusterServer> shards = new ArrayList<MiniClusterServer>();
+  private ThreadGroup group = new ThreadGroup(id);
 
   public static void main(String[] args) throws IOException, InterruptedException, KeeperException, BlurException,
       TException {
@@ -125,26 +135,37 @@ public class MiniCluster {
   public void startBlurCluster(String path, int controllerCount, int shardCount) {
     startBlurCluster(path, controllerCount, shardCount, false, false);
   }
-  
+
   public void startBlurCluster(String path, int controllerCount, int shardCount, boolean randomPort) {
     startBlurCluster(path, controllerCount, shardCount, randomPort, false);
   }
 
-  public void startBlurCluster(String path, int controllerCount, int shardCount, boolean randomPort,
-      boolean externalProcesses) {
-    MemoryReporter.enable();
-    startDfs(path + "/hdfs");
-    startZooKeeper(path + "/zk", randomPort);
-    setupBuffers();
-    startControllers(controllerCount, randomPort, externalProcesses);
-    startShards(shardCount, randomPort, externalProcesses);
+  public void startBlurCluster(final String path, final int controllerCount, final int shardCount,
+      final boolean randomPort, final boolean externalProcesses) {
+    Thread thread = new Thread(group, new Runnable() {
+      @Override
+      public void run() {
+        MemoryReporter.enable();
+        startDfs(path + "/hdfs");
+        startZooKeeper(path + "/zk", randomPort);
+        setupBuffers();
+        startControllers(controllerCount, randomPort, externalProcesses);
+        startShards(shardCount, randomPort, externalProcesses);
+        try {
+          waitForSafeModeToExit();
+        } catch (BlurException e) {
+          throw new RuntimeException(e);
+        } catch (TException e) {
+          throw new RuntimeException(e);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+    thread.start();
     try {
-      waitForSafeModeToExit();
-    } catch (BlurException e) {
-      throw new RuntimeException(e);
-    } catch (TException e) {
-      throw new RuntimeException(e);
-    } catch (IOException e) {
+      thread.join();
+    } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
   }
@@ -681,7 +702,7 @@ public class MiniCluster {
       // This has got to be one of the worst hacks I have ever had to do.
       // This is needed to shutdown 2 thread pools that are not shutdown by
       // themselves.
-      ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+      ThreadGroup threadGroup = group;
       Thread[] threads = new Thread[100];
       int enumerate = threadGroup.enumerate(threads);
       for (int i = 0; i < enumerate; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3f9079a1/blur-core/src/test/java/org/apache/blur/thrift/BlurClientTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClientTest.java b/blur-core/src/test/java/org/apache/blur/thrift/BlurClientTest.java
new file mode 100644
index 0000000..7d1834e
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClientTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.thrift;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.junit.Test;
+
+public class BlurClientTest {
+  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp_BlurClientTest"));
+
+  @Test
+  public void testMultipleQuorums() throws BlurException, TException {
+    File testDirectory = new File(TMPDIR, "testMultipleQuorums").getAbsoluteFile();
+    testDirectory.mkdirs();
+    MiniCluster cluster1 = new MiniCluster();
+    cluster1.startBlurCluster(new File(testDirectory, "cluster1").getAbsolutePath(), 1, 1, true, false);
+
+    MiniCluster cluster2 = new MiniCluster();
+    cluster2.startBlurCluster(new File(testDirectory, "cluster2").getAbsolutePath(), 1, 1, true, false);
+
+    Iface client1 = BlurClient.getClientFromZooKeeperConnectionStr(cluster1.getZkConnectionString());
+    Iface client2 = BlurClient.getClientFromZooKeeperConnectionStr(cluster2.getZkConnectionString());
+
+    List<String> controllerServerList1 = client1.controllerServerList();
+    List<String> controllerServerList2 = client2.controllerServerList();
+
+    cluster1.shutdownBlurCluster();
+    cluster2.shutdownBlurCluster();
+
+    assertFalse(controllerServerList1.equals(controllerServerList2));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3f9079a1/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
index 9065137..ad7b013 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
@@ -21,15 +21,19 @@ import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT_DEFAULT;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.blur.BlurConfiguration;
@@ -42,8 +46,11 @@ import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.zookeeper.WatchChildren;
 import org.apache.blur.zookeeper.WatchChildren.OnChange;
-import org.apache.blur.zookeeper.ZkUtils;
+import org.apache.blur.zookeeper.ZooKeeperClient;
 import org.apache.blur.zookeeper.ZookeeperPathConstants;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 
 public class BlurClient {
@@ -100,10 +107,69 @@ public class BlurClient {
     }
   }
 
-  private static volatile BlurConfiguration _blurConfiguration;
-  private static final List<Connection> _connections = new CopyOnWriteArrayList<Connection>();
-  private static volatile ZooKeeper _zooKeeper;
-  private static WatchChildren _watchConntrollers;
+  private static class ZooKeeperConntrollerWatchInfo implements Closeable {
+
+    final List<Connection> _connections = new CopyOnWriteArrayList<Connection>();
+    final ZooKeeper _zooKeeper;
+    final WatchChildren _watchConntrollers;
+    final String _zooKeeperConnectionStr;
+    final int _zkSessionTimeout;
+
+    ZooKeeperConntrollerWatchInfo(BlurConfiguration conf) throws IOException, KeeperException, InterruptedException {
+      _zooKeeperConnectionStr = conf.getExpected(BLUR_ZOOKEEPER_CONNECTION);
+      _zkSessionTimeout = conf.getInt(BLUR_ZOOKEEPER_TIMEOUT, BLUR_ZOOKEEPER_TIMEOUT_DEFAULT);
+      _zooKeeper = new ZooKeeperClient(_zooKeeperConnectionStr, _zkSessionTimeout, new Watcher() {
+        @Override
+        public void process(WatchedEvent event) {
+
+        }
+      });
+      setConnections(_zooKeeper.getChildren(ZookeeperPathConstants.getOnlineControllersPath(), false));
+      _watchConntrollers = new WatchChildren(_zooKeeper, ZookeeperPathConstants.getOnlineControllersPath());
+      _watchConntrollers.watch(new OnChange() {
+        @Override
+        public void action(List<String> children) {
+          setConnections(children);
+        }
+      });
+    }
+
+    void setConnections(List<String> children) {
+      Set<Connection> goodConnections = new HashSet<Connection>();
+      for (String s : children) {
+        Connection connection = new Connection(s);
+        goodConnections.add(connection);
+        if (!_connections.contains(connection)) {
+          _connections.add(connection);
+        }
+      }
+      Set<Connection> badConnections = new HashSet<Connection>();
+      for (Connection c : _connections) {
+        if (!goodConnections.contains(c)) {
+          badConnections.add(c);
+        }
+      }
+      _connections.removeAll(badConnections);
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeQuietly(_watchConntrollers);
+      closeQuietly(new Closeable() {
+        @Override
+        public void close() throws IOException {
+          try {
+            _zooKeeper.close();
+          } catch (InterruptedException e) {
+            throw new IOException(e);
+          }
+        }
+      });
+    }
+  }
+
+  private static ConcurrentMap<String, ZooKeeperConntrollerWatchInfo> _zkConnectionInfo = new ConcurrentHashMap<String, ZooKeeperConntrollerWatchInfo>();
+  private static BlurConfiguration _defaultBlurConfiguration;
 
   public static Iface getClient() {
     try {
@@ -114,10 +180,10 @@ public class BlurClient {
   }
 
   private static synchronized BlurConfiguration getBlurConfiguration() throws IOException {
-    if (_blurConfiguration == null) {
-      _blurConfiguration = new BlurConfiguration();
+    if (_defaultBlurConfiguration == null) {
+      _defaultBlurConfiguration = new BlurConfiguration();
     }
-    return _blurConfiguration;
+    return _defaultBlurConfiguration;
   }
 
   public static Iface getClient(BlurConfiguration conf) {
@@ -171,28 +237,31 @@ public class BlurClient {
   }
 
   private static List<Connection> getOnlineControllers(BlurConfiguration conf) {
+    String zooKeeperConnectionStr = getZooKeeperConnectionStr(conf);
+    ZooKeeperConntrollerWatchInfo zooKeeperConntrollerWatchInfo = _zkConnectionInfo.get(zooKeeperConnectionStr);
+    if (zooKeeperConntrollerWatchInfo != null) {
+      return zooKeeperConntrollerWatchInfo._connections;
+    }
     setupZooKeeper(conf);
-    return _connections;
+    zooKeeperConntrollerWatchInfo = _zkConnectionInfo.get(zooKeeperConnectionStr);
+    return zooKeeperConntrollerWatchInfo._connections;
+  }
+
+  private static String getZooKeeperConnectionStr(BlurConfiguration conf) {
+    return conf.getExpected(BLUR_ZOOKEEPER_CONNECTION);
   }
 
-  private static void setupZooKeeper(BlurConfiguration conf) {
-    if (_zooKeeper == null) {
-      String zkConn = conf.getExpected(BLUR_ZOOKEEPER_CONNECTION);
-      int zkSessionTimeout = conf.getInt(BLUR_ZOOKEEPER_TIMEOUT, BLUR_ZOOKEEPER_TIMEOUT_DEFAULT);
+  private static synchronized void setupZooKeeper(BlurConfiguration conf) {
+    String zooKeeperConnectionStr = getZooKeeperConnectionStr(conf);
+    ZooKeeperConntrollerWatchInfo zooKeeperConntrollerWatchInfo = _zkConnectionInfo.get(zooKeeperConnectionStr);
+    if (zooKeeperConntrollerWatchInfo == null) {
       try {
-        _zooKeeper = ZkUtils.newZooKeeper(zkConn, zkSessionTimeout);
-        setConnections(_zooKeeper.getChildren(ZookeeperPathConstants.getOnlineControllersPath(), false));
-        _watchConntrollers = new WatchChildren(_zooKeeper, ZookeeperPathConstants.getOnlineControllersPath());
-        _watchConntrollers.watch(new OnChange() {
-          @Override
-          public void action(List<String> children) {
-            setConnections(children);
-          }
-        });
+        final ZooKeeperConntrollerWatchInfo zkcwi = new ZooKeeperConntrollerWatchInfo(conf);
+        _zkConnectionInfo.put(zooKeeperConnectionStr, zkcwi);
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
           @Override
           public void run() {
-            closeZooKeeper();
+            closeQuietly(zkcwi);
           }
         }));
       } catch (Exception e) {
@@ -201,39 +270,30 @@ public class BlurClient {
     }
   }
 
-  public static void closeZooKeeper() {
-    if (_watchConntrollers != null) {
-      try {
-        _watchConntrollers.close();
-      } catch (Exception e) {
-        LOG.error("Unknown error while closing Controller Watcher.", e);
-      }
-    }
-    if (_zooKeeper != null) {
-      try {
-        _zooKeeper.close();
-      } catch (Exception e) {
-        LOG.error("Unknown error while closing ZooKeeper client.", e);
-      }
+  public static void closeQuietly(Closeable closeable) {
+    try {
+      closeable.close();
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to close [{0}]", e);
     }
   }
 
-  private static void setConnections(List<String> children) {
-    Set<Connection> goodConnections = new HashSet<Connection>();
-    for (String s : children) {
-      Connection connection = new Connection(s);
-      goodConnections.add(connection);
-      if (!_connections.contains(connection)) {
-        _connections.add(connection);
-      }
+  public static void closeZooKeeper() {
+    Collection<ZooKeeperConntrollerWatchInfo> values = _zkConnectionInfo.values();
+    for (ZooKeeperConntrollerWatchInfo zooKeeperConntrollerWatchInfo : values) {
+      closeQuietly(zooKeeperConntrollerWatchInfo);
     }
-    Set<Connection> badConnections = new HashSet<Connection>();
-    for (Connection c : _connections) {
-      if (!goodConnections.contains(c)) {
-        badConnections.add(c);
-      }
+  }
+
+  public static Iface getClientFromZooKeeperConnectionStr(String zkConnectionString) {
+    BlurConfiguration blurConfiguration;
+    try {
+      blurConfiguration = new BlurConfiguration();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
-    _connections.removeAll(badConnections);
+    blurConfiguration.set(BLUR_ZOOKEEPER_CONNECTION, zkConnectionString);
+    return getClient(blurConfiguration);
   }
 
 }


Mime
View raw message