Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BF9502009F3 for ; Fri, 6 May 2016 01:09:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BE48D160A05; Thu, 5 May 2016 23:09:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BC8EF160A04 for ; Fri, 6 May 2016 01:09:19 +0200 (CEST) Received: (qmail 26265 invoked by uid 500); 5 May 2016 23:09:19 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 26255 invoked by uid 99); 5 May 2016 23:09:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 May 2016 23:09:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D0C1EDFAAB; Thu, 5 May 2016 23:09:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tedyu@apache.org To: commits@hbase.apache.org Message-Id: <22a421f211c043cf84faa58590a82ae1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-15292 Refined ZooKeeperWatcher to prevent ZooKeeper's callback while construction (Hiroshi Ikeda) Date: Thu, 5 May 2016 23:09:18 +0000 (UTC) archived-at: Thu, 05 May 2016 23:09:20 -0000 Repository: hbase Updated Branches: refs/heads/branch-1.3 5c3a18e76 -> 6844474a1 HBASE-15292 Refined ZooKeeperWatcher to prevent ZooKeeper's callback while construction (Hiroshi Ikeda) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6844474a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6844474a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6844474a Branch: refs/heads/branch-1.3 Commit: 6844474a122739f2cf42553280687bbef9bcd24a Parents: 5c3a18e Author: tedyu Authored: Thu May 5 16:09:17 2016 -0700 Committer: tedyu Committed: Thu May 5 16:09:17 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/zookeeper/InstancePending.java | 80 ++++++++++++++++++++ .../hadoop/hbase/zookeeper/PendingWatcher.java | 53 +++++++++++++ .../hbase/zookeeper/ZooKeeperWatcher.java | 40 ++-------- .../hbase/zookeeper/TestInstancePending.java | 49 ++++++++++++ 4 files changed, 187 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6844474a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java new file mode 100644 index 0000000..7464dbb --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/InstancePending.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.util.concurrent.CountDownLatch; + +/** + * Placeholder of an instance which will be accessed by other threads + * but is not yet created. Thread safe. + */ +class InstancePending { + // Based on a subtle part of the Java Language Specification, + // in order to avoid a slight overhead of synchronization for each access. + + private final CountDownLatch pendingLatch = new CountDownLatch(1); + + /** Piggybacking on {@code pendingLatch}. */ + private InstanceHolder instanceHolder; + + private static class InstanceHolder { + // The JLS ensures the visibility of a final field and its contents + // unless they are exposed to another thread while the construction. + final T instance; + + InstanceHolder(T instance) { + this.instance = instance; + } + } + + /** + * Returns the instance given by the method {@link #prepare}. + * This is an interruptible blocking method + * and the interruption flag will be set just before returning if any. + */ + T get() { + InstanceHolder instanceHolder; + boolean interrupted = false; + + while ((instanceHolder = this.instanceHolder) == null) { + try { + pendingLatch.await(); + } catch (InterruptedException e) { + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + return instanceHolder.instance; + } + + /** + * Associates the given instance for the method {@link #get}. + * This method should be called once, and {@code instance} should be non-null. + * This method is expected to call as soon as possible + * because the method {@code get} is uninterruptibly blocked until this method is called. + */ + void prepare(T instance) { + assert instance != null; + instanceHolder = new InstanceHolder(instance); + pendingLatch.countDown(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6844474a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java new file mode 100644 index 0000000..11d0e5d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +/** + * Placeholder of a watcher which might be triggered before the instance is not yet created. + *

+ * {@code ZooKeeper} starts its event thread within its constructor (and that is an anti-pattern), + * and the watcher passed to the constructor might be called back by the event thread + * before you get the instance of {@code ZooKeeper} from the constructor. + * If your watcher calls methods of {@code ZooKeeper}, + * pass this placeholder to the constructor of the {@code ZooKeeper}, + * create your watcher using the instance of {@code ZooKeeper}, + * and then call the method {@code PendingWatcher.prepare}. + */ +class PendingWatcher implements Watcher { + private final InstancePending pending = new InstancePending(); + + @Override + public void process(WatchedEvent event) { + pending.get().process(event); + } + + /** + * Associates the substantial watcher of processing events. + * This method should be called once, and {@code watcher} should be non-null. + * This method is expected to call as soon as possible + * because the event processing, being invoked by the ZooKeeper event thread, + * is uninterruptibly blocked until this method is called. + */ + void prepare(Watcher watcher) { + pending.prepare(watcher); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6844474a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index f17acb8..7670ce7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -75,7 +75,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { private String quorum; // zookeeper connection - private RecoverableZooKeeper recoverableZooKeeper; + private final RecoverableZooKeeper recoverableZooKeeper; // abortable in case of zk failure protected Abortable abortable; @@ -140,8 +140,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { private final Configuration conf; - private final Exception constructorCaller; - /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); @@ -172,13 +170,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { Abortable abortable, boolean canCreateBaseZNode) throws IOException, ZooKeeperConnectionException { this.conf = conf; - // Capture a stack trace now. Will print it out later if problem so we can - // distingush amongst the myriad ZKWs. - try { - throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING"); - } catch (Exception e) { - this.constructorCaller = e; - } this.quorum = ZKConfig.getZKQuorumServersString(conf); this.prefix = identifier; // Identifier will get the sessionid appended later below down when we @@ -186,7 +177,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { this.identifier = identifier + "0x0"; this.abortable = abortable; setNodeNames(conf); - this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier); + PendingWatcher pendingWatcher = new PendingWatcher(); + this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier); + pendingWatcher.prepare(this); if (canCreateBaseZNode) { createBaseZNodes(); } @@ -665,27 +658,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { private void connectionEvent(WatchedEvent event) { switch(event.getState()) { case SyncConnected: - // Now, this callback can be invoked before the this.zookeeper is set. - // Wait a little while. - long finished = System.currentTimeMillis() + - this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000); - while (System.currentTimeMillis() < finished) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - LOG.warn("Interrupted while sleeping"); - throw new RuntimeException("Interrupted while waiting for" + - " recoverableZooKeeper is set"); - } - if (this.recoverableZooKeeper != null) break; - } - - if (this.recoverableZooKeeper == null) { - LOG.error("ZK is null on connection event -- see stack trace " + - "for the stack trace when constructor was called on this zkw", - this.constructorCaller); - throw new NullPointerException("ZK is null"); - } this.identifier = this.prefix + "-0x" + Long.toHexString(this.recoverableZooKeeper.getSessionId()); // Update our identifier. Otherwise ignore. @@ -774,9 +746,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { @Override public void close() { try { - if (recoverableZooKeeper != null) { - recoverableZooKeeper.close(); - } + recoverableZooKeeper.close(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6844474a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java new file mode 100644 index 0000000..667fed8 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestInstancePending { + @Test(timeout = 1000) + public void test() throws Exception { + final InstancePending pending = new InstancePending(); + final AtomicReference getResultRef = new AtomicReference(); + + new Thread() { + @Override + public void run() { + getResultRef.set(pending.get()); + } + }.start(); + + Thread.sleep(100); + Assert.assertNull(getResultRef.get()); + + pending.prepare("abc"); + Thread.sleep(100); + Assert.assertEquals("abc", getResultRef.get()); + } +}