hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-15292 Refined ZooKeeperWatcher to prevent ZooKeeper's callback while construction (Hiroshi Ikeda)
Date Thu, 05 May 2016 23:09:18 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 5c3a18e76 -> 6844474a1


HBASE-15292 Refined ZooKeeperWatcher to prevent ZooKeeper's callback while construction (Hiroshi
Ikeda)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6844474a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6844474a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6844474a

Branch: refs/heads/branch-1.3
Commit: 6844474a122739f2cf42553280687bbef9bcd24a
Parents: 5c3a18e
Author: tedyu <yuzhihong@gmail.com>
Authored: Thu May 5 16:09:17 2016 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Thu May 5 16:09:17 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/zookeeper/InstancePending.java | 80 ++++++++++++++++++++
 .../hadoop/hbase/zookeeper/PendingWatcher.java  | 53 +++++++++++++
 .../hbase/zookeeper/ZooKeeperWatcher.java       | 40 ++--------
 .../hbase/zookeeper/TestInstancePending.java    | 49 ++++++++++++
 4 files changed, 187 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6844474a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java
new file mode 100644
index 0000000..7464dbb
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Placeholder of an instance which will be accessed by other threads
+ * but is not yet created. Thread safe.
+ */
+class InstancePending<T> {
+  // Based on a subtle part of the Java Language Specification,
+  // in order to avoid a slight overhead of synchronization for each access.
+
+  private final CountDownLatch pendingLatch = new CountDownLatch(1);
+
+  /** Piggybacking on {@code pendingLatch}. */
+  private InstanceHolder<T> instanceHolder;
+
+  private static class InstanceHolder<T> {
+    // The JLS ensures the visibility of a final field and its contents
+    // unless they are exposed to another thread during construction.
+    final T instance;
+
+    InstanceHolder(T instance) {
+      this.instance = instance;
+    }
+  }
+
+  /**
+   * Returns the instance given by the method {@link #prepare}.
+   * This is an uninterruptible blocking method: if the thread is interrupted while
+   * waiting, the interruption flag will be set again just before returning.
+   */
+  T get() {
+    InstanceHolder<T> instanceHolder;
+    boolean interrupted = false;
+
+    while ((instanceHolder = this.instanceHolder) == null) {
+      try {
+        pendingLatch.await();
+      } catch (InterruptedException e) {
+        interrupted = true;
+      }
+    }
+
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+    return instanceHolder.instance;
+  }
+
+  /**
+   * Associates the given instance for the method {@link #get}.
+   * This method should be called once, and {@code instance} should be non-null.
+   * This method is expected to be called as soon as possible
+   * because the method {@code get} is uninterruptibly blocked until this method is called.
+   */
+  void prepare(T instance) {
+    assert instance != null;
+    instanceHolder = new InstanceHolder<T>(instance);
+    pendingLatch.countDown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6844474a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java
new file mode 100644
index 0000000..11d0e5d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Placeholder of a watcher which might be triggered before the instance is created.
+ * <p>
+ * {@code ZooKeeper} starts its event thread within its constructor (and that is an anti-pattern),
+ * and the watcher passed to the constructor might be called back by the event thread
+ * before you get the instance of {@code ZooKeeper} from the constructor.
+ * If your watcher calls methods of {@code ZooKeeper},
+ * pass this placeholder to the constructor of the {@code ZooKeeper},
+ * create your watcher using the instance of {@code ZooKeeper},
+ * and then call the method {@code PendingWatcher.prepare}.
+ */
+class PendingWatcher implements Watcher {
+  private final InstancePending<Watcher> pending = new InstancePending<Watcher>();
+
+  @Override
+  public void process(WatchedEvent event) {
+    pending.get().process(event);
+  }
+
+  /**
+   * Associates the actual watcher which processes events.
+   * This method should be called once, and {@code watcher} should be non-null.
+   * This method is expected to be called as soon as possible
+   * because the event processing, being invoked by the ZooKeeper event thread,
+   * is uninterruptibly blocked until this method is called.
+   */
+  void prepare(Watcher watcher) {
+    pending.prepare(watcher);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/6844474a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index f17acb8..7670ce7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -75,7 +75,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
   private String quorum;
 
   // zookeeper connection
-  private RecoverableZooKeeper recoverableZooKeeper;
+  private final RecoverableZooKeeper recoverableZooKeeper;
 
   // abortable in case of zk failure
   protected Abortable abortable;
@@ -140,8 +140,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
 
   private final Configuration conf;
 
-  private final Exception constructorCaller;
-
   /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
   private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
 
@@ -172,13 +170,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
       Abortable abortable, boolean canCreateBaseZNode)
   throws IOException, ZooKeeperConnectionException {
     this.conf = conf;
-    // Capture a stack trace now.  Will print it out later if problem so we can
-    // distingush amongst the myriad ZKWs.
-    try {
-      throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
-    } catch (Exception e) {
-      this.constructorCaller = e;
-    }
     this.quorum = ZKConfig.getZKQuorumServersString(conf);
     this.prefix = identifier;
     // Identifier will get the sessionid appended later below down when we
@@ -186,7 +177,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
     this.identifier = identifier + "0x0";
     this.abortable = abortable;
     setNodeNames(conf);
-    this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
+    PendingWatcher pendingWatcher = new PendingWatcher();
+    this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
+    pendingWatcher.prepare(this);
     if (canCreateBaseZNode) {
       createBaseZNodes();
     }
@@ -665,27 +658,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
   private void connectionEvent(WatchedEvent event) {
     switch(event.getState()) {
       case SyncConnected:
-        // Now, this callback can be invoked before the this.zookeeper is set.
-        // Wait a little while.
-        long finished = System.currentTimeMillis() +
-          this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
-        while (System.currentTimeMillis() < finished) {
-          try {
-            Thread.sleep(1);
-          } catch (InterruptedException e) {
-            LOG.warn("Interrupted while sleeping");
-            throw new RuntimeException("Interrupted while waiting for" +
-                " recoverableZooKeeper is set");
-          }
-          if (this.recoverableZooKeeper != null) break;
-        }
-
-        if (this.recoverableZooKeeper == null) {
-          LOG.error("ZK is null on connection event -- see stack trace " +
-            "for the stack trace when constructor was called on this zkw",
-            this.constructorCaller);
-          throw new NullPointerException("ZK is null");
-        }
         this.identifier = this.prefix + "-0x" +
           Long.toHexString(this.recoverableZooKeeper.getSessionId());
         // Update our identifier.  Otherwise ignore.
@@ -774,9 +746,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
   @Override
   public void close() {
     try {
-      if (recoverableZooKeeper != null) {
-        recoverableZooKeeper.close();
-      }
+      recoverableZooKeeper.close();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6844474a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java
new file mode 100644
index 0000000..667fed8
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestInstancePending {
+  @Test(timeout = 1000)
+  public void test() throws Exception {
+    final InstancePending<String> pending = new InstancePending<String>();
+    final AtomicReference<String> getResultRef = new AtomicReference<String>();
+
+    new Thread() {
+      @Override
+      public void run() {
+        getResultRef.set(pending.get());
+      }
+    }.start();
+
+    Thread.sleep(100);
+    Assert.assertNull(getResultRef.get());
+
+    pending.prepare("abc");
+    Thread.sleep(100);
+    Assert.assertEquals("abc", getResultRef.get());
+  }
+}


Mime
View raw message