incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/3] git commit: Added some auto reconnect logic.
Date Sun, 28 Apr 2013 23:49:39 GMT
Updated Branches:
  refs/heads/0.1.5 7dd0ea550 -> c4ff7dafc


Added some auto reconnect logic.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/f8f33ae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/f8f33ae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/f8f33ae1

Branch: refs/heads/0.1.5
Commit: f8f33ae1a30a013e27f42b937da40c30041c571b
Parents: 7dd0ea5
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Apr 27 17:19:56 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Apr 27 17:19:56 2013 -0400

----------------------------------------------------------------------
 .../java/org/apache/blur/zookeeper/ZkUtils.java    |   13 +-
 .../org/apache/blur/zookeeper/ZooKeeperClient.java |  227 +++++++++++++++
 2 files changed, 239 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f8f33ae1/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZkUtils.java
----------------------------------------------------------------------
diff --git a/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZkUtils.java b/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZkUtils.java
index 7184e26..6998814 100644
--- a/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZkUtils.java
+++ b/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZkUtils.java
@@ -18,6 +18,7 @@ package org.apache.blur.zookeeper;
  */
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -62,12 +63,22 @@ public class ZkUtils {
 
   }
 
+  public static void pause(Object o) {
+    synchronized (o) {
+      try {
+        o.wait(TimeUnit.SECONDS.toMillis(1));
+      } catch (InterruptedException e) {
+        return;
+      }
+    }
+  }
+  
   public static ZooKeeper newZooKeeper(String zkConnectionString) throws IOException {
     int sessionTimeout = DEFAULT_ZK_SESSION_TIMEOUT;
     ConnectionWatcher watcher = new ConnectionWatcher();
     watcher.setSessionTimeout(sessionTimeout);
     watcher.setZkConnectionString(zkConnectionString);
-    return new ZooKeeper(zkConnectionString, sessionTimeout, watcher);
+    return new ZooKeeperClient(zkConnectionString, sessionTimeout, watcher);
   }
 
   public static void mkNodesStr(ZooKeeper zk, String path) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f8f33ae1/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZooKeeperClient.java b/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZooKeeperClient.java
new file mode 100644
index 0000000..9996eb7
--- /dev/null
+++ b/src/blur-util/src/main/java/org/apache/blur/zookeeper/ZooKeeperClient.java
@@ -0,0 +1,227 @@
+package org.apache.blur.zookeeper;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+public class ZooKeeperClient extends ZooKeeper {
+
+  private static final Log LOG = LogFactory.getLog(ZooKeeperClient.class);
+
+  public ZooKeeperClient(String connectString, int sessionTimeout, Watcher watcher) throws
IOException {
+    super(connectString, sessionTimeout, watcher);
+  }
+
+  public ZooKeeperClient(String connectString, int sessionTimeout, Watcher watcher, boolean
canBeReadOnly)
+      throws IOException {
+    super(connectString, sessionTimeout, watcher, canBeReadOnly);
+  }
+
+  public ZooKeeperClient(String connectString, int sessionTimeout, Watcher watcher, long
sessionId,
+      byte[] sessionPasswd, boolean canBeReadOnly) throws IOException {
+    super(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, canBeReadOnly);
+  }
+
+  public ZooKeeperClient(String connectString, int sessionTimeout, Watcher watcher, long
sessionId, byte[] sessionPasswd)
+      throws IOException {
+    super(connectString, sessionTimeout, watcher, sessionId, sessionPasswd);
+  }
+
+  static abstract class ZKExecutor<T> {
+    abstract T execute() throws KeeperException, InterruptedException;
+  }
+
+  public <T> T execute(ZKExecutor<T> executor) throws KeeperException, InterruptedException
{
+    final long timestmap = System.currentTimeMillis();
+    int sessionTimeout = getSessionTimeout();
+    while (true) {
+      try {
+        return executor.execute();
+      } catch (KeeperException e) {
+        if (e.code() == Code.CONNECTIONLOSS && timestmap + sessionTimeout >= System.currentTimeMillis())
{
+          LOG.warn("Connection loss");
+          ZkUtils.pause(this);
+          continue;
+        }
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public String create(final String path, final byte[] data, final List<ACL> acl, final
CreateMode createMode)
+      throws KeeperException, InterruptedException {
+    return execute(new ZKExecutor<String>() {
+      @Override
+      String execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.create(path, data, acl, createMode);
+      }
+    });
+  }
+
+  @Override
+  public void delete(final String path, final int version) throws InterruptedException, KeeperException
{
+    execute(new ZKExecutor<Void>() {
+      @Override
+      Void execute() throws KeeperException, InterruptedException {
+        ZooKeeperClient.super.delete(path, version);
+        return null;
+      }
+    });
+  }
+
+  @Override
+  public List<OpResult> multi(final Iterable<Op> ops) throws InterruptedException,
KeeperException {
+    return execute(new ZKExecutor<List<OpResult>>() {
+      @Override
+      List<OpResult> execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.multi(ops);
+      }
+    });
+  }
+
+  @Override
+  public Stat exists(final String path, final Watcher watcher) throws KeeperException, InterruptedException
{
+    return execute(new ZKExecutor<Stat>() {
+      @Override
+      Stat execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.exists(path, watcher);
+      }
+    });
+  }
+
+  @Override
+  public Stat exists(final String path, final boolean watch) throws KeeperException, InterruptedException
{
+    return execute(new ZKExecutor<Stat>() {
+      @Override
+      Stat execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.exists(path, watch);
+      }
+    });
+  }
+
+  @Override
+  public byte[] getData(final String path, final Watcher watcher, final Stat stat) throws
KeeperException,
+      InterruptedException {
+    return execute(new ZKExecutor<byte[]>() {
+      @Override
+      byte[] execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.getData(path, watcher, stat);
+      }
+    });
+  }
+
+  @Override
+  public byte[] getData(final String path, final boolean watch, final Stat stat) throws KeeperException,
+      InterruptedException {
+    return execute(new ZKExecutor<byte[]>() {
+      @Override
+      byte[] execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.getData(path, watch, stat);
+      }
+    });
+  }
+
+  @Override
+  public Stat setData(final String path, final byte[] data, final int version) throws KeeperException,
+      InterruptedException {
+    return execute(new ZKExecutor<Stat>() {
+      @Override
+      Stat execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.setData(path, data, version);
+      }
+    });
+  }
+
+  @Override
+  public List<ACL> getACL(final String path, final Stat stat) throws KeeperException,
InterruptedException {
+    return execute(new ZKExecutor<List<ACL>>() {
+      @Override
+      List<ACL> execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.getACL(path, stat);
+      }
+    });
+  }
+
+  @Override
+  public Stat setACL(final String path, final List<ACL> acl, final int version) throws
KeeperException,
+      InterruptedException {
+    return execute(new ZKExecutor<Stat>() {
+      @Override
+      Stat execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.setACL(path, acl, version);
+      }
+    });
+  }
+
+  @Override
+  public List<String> getChildren(final String path, final Watcher watcher) throws
KeeperException,
+      InterruptedException {
+    return execute(new ZKExecutor<List<String>>() {
+      @Override
+      List<String> execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.getChildren(path, watcher);
+      }
+    });
+  }
+
+  @Override
+  public List<String> getChildren(final String path, final boolean watch) throws KeeperException,
InterruptedException {
+    return execute(new ZKExecutor<List<String>>() {
+      @Override
+      List<String> execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.getChildren(path, watch);
+      }
+    });
+  }
+
+  @Override
+  public List<String> getChildren(final String path, final Watcher watcher, final Stat
stat) throws KeeperException,
+      InterruptedException {
+    return execute(new ZKExecutor<List<String>>() {
+      @Override
+      List<String> execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.getChildren(path, watcher, stat);
+      }
+    });
+  }
+
+  @Override
+  public List<String> getChildren(final String path, final boolean watch, final Stat
stat) throws KeeperException,
+      InterruptedException {
+    return execute(new ZKExecutor<List<String>>() {
+      @Override
+      List<String> execute() throws KeeperException, InterruptedException {
+        return ZooKeeperClient.super.getChildren(path, watch, stat);
+      }
+    });
+  }
+
+}


Mime
View raw message