incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Fixing issue with BlurClient leaking ZooKeeper clients as well as not following a controller cluster through changes.
Date Thu, 16 Oct 2014 18:52:31 GMT
Fixing issue with BlurClient leaking ZooKeeper clients as well as not following a controller
cluster through changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/b2fe85ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/b2fe85ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/b2fe85ae

Branch: refs/heads/master
Commit: b2fe85ae44a07c831aa5d77a5e72efde57c7fd88
Parents: 63504e5
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Oct 16 14:52:16 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Oct 16 14:52:16 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/blur/thrift/BlurClient.java | 92 +++++++++++++++-----
 .../blur/thrift/util/ListControllers.java       | 40 +++++++++
 2 files changed, 112 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2fe85ae/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
index 8ba12f9..9065137 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
@@ -27,23 +27,29 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.blur.BlurConfiguration;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.commands.BlurCommand;
 import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.zookeeper.WatchChildren;
+import org.apache.blur.zookeeper.WatchChildren.OnChange;
 import org.apache.blur.zookeeper.ZkUtils;
 import org.apache.blur.zookeeper.ZookeeperPathConstants;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 
 public class BlurClient {
 
+  private static final Log LOG = LogFactory.getLog(BlurClient.class);
+
   public static class BlurClientInvocationHandler implements InvocationHandler {
 
     private final List<Connection> _connections;
@@ -94,6 +100,11 @@ public class BlurClient {
     }
   }
 
+  private static volatile BlurConfiguration _blurConfiguration;
+  private static final List<Connection> _connections = new CopyOnWriteArrayList<Connection>();
+  private static volatile ZooKeeper _zooKeeper;
+  private static WatchChildren _watchConntrollers;
+
   public static Iface getClient() {
     try {
       return getClient(getBlurConfiguration());
@@ -102,9 +113,6 @@ public class BlurClient {
     }
   }
 
-  private static volatile BlurConfiguration _blurConfiguration;
-  private static final AtomicReference<List<Connection>> _connections = new AtomicReference<List<Connection>>();
-
   private static synchronized BlurConfiguration getBlurConfiguration() throws IOException
{
     if (_blurConfiguration == null) {
       _blurConfiguration = new BlurConfiguration();
@@ -163,25 +171,69 @@ public class BlurClient {
   }
 
   private static List<Connection> getOnlineControllers(BlurConfiguration conf) {
-setupZooKeeper(conf);
+    setupZooKeeper(conf);
+    return _connections;
+  }
 
+  private static void setupZooKeeper(BlurConfiguration conf) {
+    if (_zooKeeper == null) {
+      String zkConn = conf.getExpected(BLUR_ZOOKEEPER_CONNECTION);
+      int zkSessionTimeout = conf.getInt(BLUR_ZOOKEEPER_TIMEOUT, BLUR_ZOOKEEPER_TIMEOUT_DEFAULT);
+      try {
+        _zooKeeper = ZkUtils.newZooKeeper(zkConn, zkSessionTimeout);
+        setConnections(_zooKeeper.getChildren(ZookeeperPathConstants.getOnlineControllersPath(),
false));
+        _watchConntrollers = new WatchChildren(_zooKeeper, ZookeeperPathConstants.getOnlineControllersPath());
+        _watchConntrollers.watch(new OnChange() {
+          @Override
+          public void action(List<String> children) {
+            setConnections(children);
+          }
+        });
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+          @Override
+          public void run() {
+            closeZooKeeper();
+          }
+        }));
+      } catch (Exception e) {
+        throw new RuntimeException("Unknown error while trying to connect to ZooKeeper and
fetch controllers.", e);
+      }
+    }
+  }
 
+  public static void closeZooKeeper() {
+    if (_watchConntrollers != null) {
+      try {
+        _watchConntrollers.close();
+      } catch (Exception e) {
+        LOG.error("Unknown error while closing Controller Watcher.", e);
+      }
+    }
+    if (_zooKeeper != null) {
+      try {
+        _zooKeeper.close();
+      } catch (Exception e) {
+        LOG.error("Unknown error while closing ZooKeeper client.", e);
+      }
+    }
   }
 
-  private static void setupZooKeeper(BlurConfiguration conf) {
-    String zkConn = conf.getExpected(BLUR_ZOOKEEPER_CONNECTION);
-    int zkSessionTimeout = conf.getInt(BLUR_ZOOKEEPER_TIMEOUT, BLUR_ZOOKEEPER_TIMEOUT_DEFAULT);
-    ZooKeeper zkClient = null;
-    try {
-      zkClient = ZkUtils.newZooKeeper(zkConn, zkSessionTimeout);
-      return zkClient.getChildren(ZookeeperPathConstants.getOnlineControllersPath(), false);
-    } catch (KeeperException e) {
-      throw new RuntimeException("Error communicating with Zookeeper", e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException("Error communicating with Zookeeper", e);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to initialize Zookeeper", e);
+  private static void setConnections(List<String> children) {
+    Set<Connection> goodConnections = new HashSet<Connection>();
+    for (String s : children) {
+      Connection connection = new Connection(s);
+      goodConnections.add(connection);
+      if (!_connections.contains(connection)) {
+        _connections.add(connection);
+      }
+    }
+    Set<Connection> badConnections = new HashSet<Connection>();
+    for (Connection c : _connections) {
+      if (!goodConnections.contains(c)) {
+        badConnections.add(c);
+      }
     }
+    _connections.removeAll(badConnections);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b2fe85ae/blur-thrift/src/main/java/org/apache/blur/thrift/util/ListControllers.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/ListControllers.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/ListControllers.java
new file mode 100644
index 0000000..b8ad705
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/ListControllers.java
@@ -0,0 +1,40 @@
+package org.apache.blur.thrift.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.utils.BlurConstants;
+
+public class ListControllers {
+
+  public static void main(String[] args) throws BlurException, TException, IOException, InterruptedException
{
+    BlurConfiguration blurConfiguration = new BlurConfiguration();
+    blurConfiguration.set(BlurConstants.BLUR_ZOOKEEPER_CONNECTION, "localhost");
+    Iface client = BlurClient.getClient(blurConfiguration);
+
+    while (true) {
+      System.out.println(client.controllerServerList());
+      Thread.sleep(1000);
+    }
+  }
+}


Mime
View raw message