incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Blur now has a faster safemode startup, and the dependency on the time on all the servers to be insync has been removed.
Date Tue, 30 Apr 2013 02:48:42 GMT
Updated Branches:
  refs/heads/0.1.5 c4ff7dafc -> 53cf9b125


Blur now has a faster safemode startup, and the dependency on the time on all the servers
to be insync has been removed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4b5a579a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4b5a579a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4b5a579a

Branch: refs/heads/0.1.5
Commit: 4b5a579a84f66cba3b747c29febe15a30fa02328
Parents: c4ff7da
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Apr 29 22:46:08 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Apr 29 22:46:08 2013 -0400

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java        |   24 +--
 .../apache/blur/manager/indexserver/SafeMode.java  |  190 +++++++++++++++
 .../blur/thrift/ThriftBlurControllerServer.java    |    2 -
 .../apache/blur/thrift/ThriftBlurShardServer.java  |   11 -
 .../apache/blur/thrift/ZookeeperSystemTime.java    |   51 ----
 .../blur/manager/indexserver/SafeModeTest.java     |  155 ++++++++++++
 6 files changed, 353 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4b5a579a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index a1421fd..71ab719 100644
--- a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -157,14 +157,13 @@ public class DistributedIndexServer extends AbstractIndexServer {
     _closer = new IndexInputCloser();
     _closer.init();
     setupFlushCacheTimer();
-    String lockPath = BlurUtil.lockForSafeMode(_zookeeper, getNodeName(), _cluster);
-    try {
-      registerMyself();
-      setupSafeMode();
-    } finally {
-      BlurUtil.unlockForSafeMode(_zookeeper, lockPath);
-    }
-    waitInSafeModeIfNeeded();
+    
+    registerMyselfAsMemberOfCluster();
+    String onlineShardsPath = ZookeeperPathConstants.getOnlineShardsPath(_cluster);
+    String safemodePath = ZookeeperPathConstants.getSafemodePath(_cluster);
+    SafeMode safeMode = new SafeMode(_zookeeper, safemodePath, onlineShardsPath, TimeUnit.SECONDS,
5, TimeUnit.SECONDS, 60);
+    safeMode.registerNode(getNodeName(), BlurUtil.getVersion().getBytes());
+
     _running.set(true);
     setupTableWarmer();
     watchForShardServerChanges();
@@ -308,20 +307,13 @@ public class DistributedIndexServer extends AbstractIndexServer {
     }, _delay, _delay);
   }
 
-  private void registerMyself() {
+  private void registerMyselfAsMemberOfCluster() {
     String nodeName = getNodeName();
     String registeredShardsPath = ZookeeperPathConstants.getRegisteredShardsPath(_cluster)
+ "/" + nodeName;
-    String onlineShardsPath = ZookeeperPathConstants.getOnlineShardsPath(_cluster) + "/"
+ nodeName;
     try {
       if (_zookeeper.exists(registeredShardsPath, false) == null) {
         _zookeeper.create(registeredShardsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
       }
-      while (_zookeeper.exists(onlineShardsPath, false) != null) {
-        LOG.info("Node [{0}] already registered, waiting for path [{1}] to be released",
nodeName, onlineShardsPath);
-        Thread.sleep(3000);
-      }
-      String version = BlurUtil.getVersion();
-      _zookeeper.create(onlineShardsPath, version.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
     } catch (KeeperException e) {
       throw new RuntimeException(e);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4b5a579a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/SafeMode.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/SafeMode.java
b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/SafeMode.java
new file mode 100644
index 0000000..f5392ce
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/manager/indexserver/SafeMode.java
@@ -0,0 +1,190 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * This class controls the startup of the cluster. Basically the first node
+ * online waits until there is no more nodes that have started. The period that
+ * is required to have no activity is the waittime passed in through the
+ * constructor. If a new node comes online while the leader is waiting, the wait
+ * starts over. Once the wait period has been exhausted the cluster is to be
+ * settled and can now come online.
+ * 
+ */
+public class SafeMode {
+
+  private static final Log LOG = LogFactory.getLog(SafeMode.class);
+  private static final String STARTUP = "STARTUP";
+  private static final String SETUP = "SETUP";
+
+  private final ZooKeeper zooKeeper;
+  private final String lockPath;
+  private final long waitTime;
+  private final Object lock = new Object();
+  private final Watcher watcher = new Watcher() {
+    @Override
+    public void process(WatchedEvent event) {
+      synchronized (lock) {
+        lock.notify();
+      }
+    }
+  };
+  private final Map<String, String> lockMap = new HashMap<String, String>();
+  private final String nodePath;
+  private final long duplicateNodeTimeout;
+
+  public SafeMode(ZooKeeper zooKeeper, String lockPath, String nodePath, TimeUnit waitTimeUnit,
long waitTime,
+      TimeUnit duplicateNodeTimeoutTimeUnit, long duplicateNodeTimeout) {
+    this.zooKeeper = zooKeeper;
+    this.lockPath = lockPath;
+    this.waitTime = waitTimeUnit.toMillis(waitTime);
+    this.duplicateNodeTimeout = duplicateNodeTimeoutTimeUnit.toNanos(duplicateNodeTimeout);
+    this.nodePath = nodePath;
+  }
+
+  public void registerNode(String node, byte[] data) throws KeeperException, InterruptedException
{
+    lock(SETUP);
+    register(node, data);
+    if (isLeader(node)) {
+      // Create barrier for cluster
+      lock(STARTUP);
+
+      // Allow other nodes to register
+      unlock(SETUP);
+      waitForClusterToSettle();
+      unlock(STARTUP);
+    } else {
+      // Allow other nodes to register
+      unlock(SETUP);
+
+      // Block waiting on cluster to settle
+      lock(STARTUP);
+      unlock(STARTUP);
+    }
+  }
+
+  private void waitForClusterToSettle() throws InterruptedException, KeeperException {
+    long startingWaitTime = System.currentTimeMillis();
+    List<String> prev = null;
+    while (true) {
+      synchronized (lock) {
+        List<String> children = new ArrayList<String>(zooKeeper.getChildren(nodePath,
watcher));
+        Collections.sort(children);
+        if (children.equals(prev)) {
+          LOG.info("Clustered has settled.");
+          return;
+        } else {
+          prev = children;
+          LOG.info("Waiting for cluster to settle, current size [" + children.size() + "]
total time waited so far ["
+              + (System.currentTimeMillis() - startingWaitTime) + " ms] waiting another ["
+ waitTime + " ms].");
+          lock.wait(waitTime);
+        }
+      }
+    }
+  }
+
+  private boolean isLeader(String node) throws KeeperException, InterruptedException {
+    List<String> children = zooKeeper.getChildren(nodePath, false);
+    if (children.size() == 1) {
+      String n = children.get(0);
+      if (!n.equals(node)) {
+        throw new RuntimeException("We got a problem here!  Only one node register [" + n
+ "] and I'm not it [" + node
+            + "]");
+      }
+      return true;
+    }
+    return false;
+  }
+
+  private void unlock(String name) throws InterruptedException, KeeperException {
+    if (!lockMap.containsKey(name)) {
+      throw new RuntimeException("Lock [" + name + "] has not be created.");
+    }
+    String lockPath = lockMap.get(name);
+    LOG.debug("Unlocking on path [" + lockPath + "] with name [" + name + "]");
+    zooKeeper.delete(lockPath, -1);
+  }
+
+  private void register(String node, byte[] data) throws KeeperException, InterruptedException
{
+    String p = nodePath + "/" + node;
+    long start = System.nanoTime();
+    while (zooKeeper.exists(p, false) != null) {
+      if (start + duplicateNodeTimeout < System.nanoTime()) {
+        throw new RuntimeException("Node [" + node + "] cannot be registered, check to make
sure a "
+            + "process has not already been started or that server" + " names have not been
duplicated.");
+      }
+      LOG.info("Node [{0}] already registered, waiting for path [{1}] to be released", node,
p);
+      String tmpPath = p + "_" + UUID.randomUUID();
+      zooKeeper.create(tmpPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+      Thread.sleep(1000);
+      zooKeeper.delete(tmpPath, -1);
+    }
+    zooKeeper.create(p, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+  }
+
+  private void lock(String name) throws KeeperException, InterruptedException {
+    if (lockMap.containsKey(name)) {
+      throw new RuntimeException("Lock [" + name + "] already created.");
+    }
+    String newPath = zooKeeper.create(lockPath + "/" + name + "_", null, Ids.OPEN_ACL_UNSAFE,
+        CreateMode.EPHEMERAL_SEQUENTIAL);
+    lockMap.put(name, newPath);
+    while (true) {
+      synchronized (lock) {
+        List<String> children = getOnlyThisLocksChildren(name, zooKeeper.getChildren(lockPath,
watcher));
+        Collections.sort(children);
+        String firstElement = children.get(0);
+        if ((lockPath + "/" + firstElement).equals(newPath)) {
+          // yay!, we got the lock
+          LOG.debug("Lock on path [" + lockPath + "] with name [" + name + "]");
+          return;
+        } else {
+          LOG.debug("Waiting for lock on path [" + lockPath + "] with name [" + name + "]");
+          lock.wait();
+        }
+      }
+    }
+  }
+
+  private List<String> getOnlyThisLocksChildren(String name, List<String> children)
{
+    List<String> result = new ArrayList<String>();
+    for (String c : children) {
+      if (c.startsWith(name + "_")) {
+        result.add(c);
+      }
+    }
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4b5a579a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index 7e944b9..ee22f3e 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -37,7 +37,6 @@ import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
-import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE;
 import static org.apache.blur.utils.BlurUtil.quietClose;
 
 import java.util.concurrent.TimeUnit;
@@ -86,7 +85,6 @@ public class ThriftBlurControllerServer extends ThriftServer {
     BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);
 
     final ZooKeeper zooKeeper = ZkUtils.newZooKeeper(zkConnectionStr);
-    ZookeeperSystemTime.checkSystemTime(zooKeeper, configuration.getLong(BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE,
3000));
 
     BlurUtil.setupZookeeper(zooKeeper);
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4b5a579a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index cc4c923..1948bbf 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -34,7 +34,6 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_OPENER_THREAD_COUNT
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SAFEMODEDELAY;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
-import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE;
 import static org.apache.blur.utils.BlurUtil.quietClose;
 
 import java.lang.management.ManagementFactory;
@@ -72,8 +71,6 @@ import org.apache.blur.zookeeper.ZkUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.server.TServlet;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooKeeper;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.jetty.webapp.WebAppContext;
@@ -161,14 +158,6 @@ public class ThriftBlurShardServer extends ThriftServer {
     BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);
 
     final ZooKeeper zooKeeper = ZkUtils.newZooKeeper(zkConnectionStr);
-    try {
-      ZookeeperSystemTime.checkSystemTime(zooKeeper, configuration.getLong(BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE,
3000));
-    } catch (KeeperException e) {
-      if (e.code() == Code.CONNECTIONLOSS) {
-        System.err.println("Cannot connect zookeeper to [" + zkConnectionStr + "]");
-        System.exit(1);
-      }
-    }
 
     BlurUtil.setupZookeeper(zooKeeper, configuration.get(BLUR_CLUSTER_NAME));
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4b5a579a/src/blur-core/src/main/java/org/apache/blur/thrift/ZookeeperSystemTime.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ZookeeperSystemTime.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ZookeeperSystemTime.java
deleted file mode 100644
index 98de816..0000000
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ZookeeperSystemTime.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.blur.thrift;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.zookeeper.ZkUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-
-public class ZookeeperSystemTime {
-  public static void main(String[] args) throws InterruptedException, KeeperException, IOException,
BlurException {
-    final ZooKeeper zooKeeper = ZkUtils.newZooKeeper("localhost");
-    long tolerance = 3000;
-    checkSystemTime(zooKeeper, tolerance);
-  }
-
-  public static void checkSystemTime(ZooKeeper zooKeeper, long tolerance) throws KeeperException,
InterruptedException, BlurException {
-    String path = zooKeeper.create("/" + UUID.randomUUID().toString(), null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
-    long now = System.currentTimeMillis();
-    Stat stat = zooKeeper.exists(path, false);
-    zooKeeper.delete(path, -1);
-    long ctime = stat.getCtime();
-
-    long low = now - tolerance;
-    long high = now + tolerance;
-    if (!(low <= ctime && ctime <= high)) {
-      throw new BlurException("The system time is too far out of sync with Zookeeper, check
your system time and try again.", null);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4b5a579a/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
b/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
new file mode 100644
index 0000000..612d2c5
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/manager/indexserver/SafeModeTest.java
@@ -0,0 +1,155 @@
+package org.apache.blur.manager.indexserver;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.blur.MiniCluster;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SafeModeTest {
+
+  private static String path = "./target/test-zk";
+  private static ZooKeeper zk;
+
+  @BeforeClass
+  public static void startZooKeeper() throws IOException {
+    new File(path).mkdirs();
+    MiniCluster.startZooKeeper(path);
+    zk = new ZooKeeper("localhost", 20000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+
+      }
+    });
+  }
+
+  @AfterClass
+  public static void stopZooKeeper() throws InterruptedException {
+    zk.close();
+    MiniCluster.shutdownZooKeeper();
+  }
+
+  @Test
+  public void testBasicStartup() throws IOException, InterruptedException {
+    List<AtomicReference<Throwable>> errors = new ArrayList<AtomicReference<Throwable>>();
+    List<AtomicLong> timeRegisteredLst = new ArrayList<AtomicLong>();
+    List<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < 32; i++) {
+      AtomicReference<Throwable> ref = new AtomicReference<Throwable>();
+      AtomicLong timeRegistered = new AtomicLong();
+      errors.add(ref);
+      timeRegisteredLst.add(timeRegistered);
+      threads.add(startThread(zk, "node" + i, ref, timeRegistered));
+      Thread.sleep(100);
+    }
+
+    for (Thread t : threads) {
+      t.join();
+    }
+
+    for (AtomicReference<Throwable> t : errors) {
+      assertNull(t.get());
+    }
+
+    long oldest = -1;
+    long newest = -1;
+    for (AtomicLong time : timeRegisteredLst) {
+      long l = time.get();
+      if (oldest == -1 || l < oldest) {
+        oldest = l;
+      }
+      if (newest == -1 || l > newest) {
+        newest = l;
+      }
+    }
+    assertTrue((newest - oldest) < TimeUnit.SECONDS.toMillis(1));
+  }
+
+  @Test
+  public void testExtraNodeStartup() throws IOException, InterruptedException, KeeperException
{
+    ZooKeeper zk = new ZooKeeper("localhost", 20000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+
+      }
+    });
+
+    SafeMode safeMode = new SafeMode(zk, "/testing/safemode", "/testing/nodepath", TimeUnit.SECONDS,
5,
+        TimeUnit.SECONDS, 60);
+    long s = System.nanoTime();
+    safeMode.registerNode("node101", null);
+    long e = System.nanoTime();
+
+    assertTrue((e - s) < TimeUnit.SECONDS.toNanos(1));
+    zk.close();
+  }
+
+  @Test
+  public void testSecondNodeStartup() throws IOException, InterruptedException, KeeperException
{
+    ZooKeeper zk = new ZooKeeper("localhost", 20000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+
+      }
+    });
+
+    SafeMode safeMode = new SafeMode(zk, "/testing/safemode", "/testing/nodepath", TimeUnit.SECONDS,
5,
+        TimeUnit.SECONDS, 15);
+    try {
+      safeMode.registerNode("node10", null);
+      fail("should throw exception.");
+    } catch (Exception e) {
+    }
+    zk.close();
+  }
+
+  private Thread startThread(final ZooKeeper zk, final String node, final AtomicReference<Throwable>
errorRef,
+      final AtomicLong timeRegistered) {
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          SafeMode safeMode = new SafeMode(zk, "/testing/safemode", "/testing/nodepath",
TimeUnit.SECONDS, 5,
+              TimeUnit.SECONDS, 60);
+          safeMode.registerNode(node, null);
+          timeRegistered.set(System.currentTimeMillis());
+        } catch (Throwable t) {
+          errorRef.set(t);
+        }
+      }
+    };
+    Thread thread = new Thread(runnable);
+    thread.start();
+    return thread;
+  }
+
+}


Mime
View raw message