Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 17E62111E4 for ; Mon, 18 Aug 2014 21:00:40 +0000 (UTC) Received: (qmail 5680 invoked by uid 500); 18 Aug 2014 21:00:40 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 5655 invoked by uid 500); 18 Aug 2014 21:00:40 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 5644 invoked by uid 99); 18 Aug 2014 21:00:39 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Aug 2014 21:00:39 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Aug 2014 21:00:36 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E305D2388A36; Mon, 18 Aug 2014 21:00:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1618732 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ Date: Mon, 18 Aug 2014 21:00:15 -0000 To: commits@zookeeper.apache.org From: ivank@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140818210015.E305D2388A36@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ivank Date: Mon Aug 18 21:00:15 2014 New Revision: 1618732 URL: http://svn.apache.org/r1618732 Log: BOOKKEEPER-704: reconnectable zookeeper client wrapper 
(sijie via ivank) Change-Id: I00c73788f4ed5911713906b4d7622ca6dcec79a5 Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/BoundExponentialBackoffRetryPolicy.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/RetryPolicy.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooWorker.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1618732&r1=1618731&r2=1618732&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/CHANGES.txt (original) +++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Aug 18 21:00:15 2014 @@ -212,6 +212,8 @@ Trunk (unreleased changes) BOOKKEEPER-774: Flaky test org.apache.bookkeeper.test.ReadOnlyBookieTest.testBookieShouldTurnWritableFromReadOnly (sijie) + BOOKKEEPER-704: reconnectable zookeeper client wrapper (sijie via ivank) + bookkeeper-benchmark: BOOKKEEPER-768: fix typo 'seconds' to milliseconds in benchmark output (jialin via sijie) Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/BoundExponentialBackoffRetryPolicy.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/BoundExponentialBackoffRetryPolicy.java?rev=1618732&view=auto 
============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/BoundExponentialBackoffRetryPolicy.java (added) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/BoundExponentialBackoffRetryPolicy.java Mon Aug 18 21:00:15 2014 @@ -0,0 +1,41 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.zookeeper; + +/** + * Retry policy that retries a set number of times with an increasing (up to a + * maximum bound) backoff time between retries. 
+ */ +public class BoundExponentialBackoffRetryPolicy extends ExponentialBackoffRetryPolicy { + + private final long maxBackoffTime; + + public BoundExponentialBackoffRetryPolicy(long baseBackoffTime, long maxBackoffTime, int maxRetries) { + super(baseBackoffTime, maxRetries); + this.maxBackoffTime = maxBackoffTime; + } + + @Override + public long nextRetryWaitTime(int retryCount, long elapsedRetryTime) { + return Math.min(maxBackoffTime, super.nextRetryWaitTime(retryCount, elapsedRetryTime)); + } + +} Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java?rev=1618732&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java (added) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ExponentialBackoffRetryPolicy.java Mon Aug 18 21:00:15 2014 @@ -0,0 +1,47 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.zookeeper; + +import java.util.Random; + +public class ExponentialBackoffRetryPolicy implements RetryPolicy { + + private final Random random; + private final int maxRetries; + private final long baseBackoffTime; + + public ExponentialBackoffRetryPolicy(long baseBackoffTime, int maxRetries) { + this.maxRetries = maxRetries; + this.baseBackoffTime = baseBackoffTime; + this.random = new Random(System.currentTimeMillis()); + } + + @Override + public boolean allowRetry(int retryCount, long elapsedRetryTime) { + return retryCount <= maxRetries; + } + + @Override + public long nextRetryWaitTime(int retryCount, long elapsedRetryTime) { + return baseBackoffTime * Math.max(1, random.nextInt(1 << (retryCount + 1))); + } + +} Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/RetryPolicy.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/RetryPolicy.java?rev=1618732&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/RetryPolicy.java (added) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/RetryPolicy.java Mon Aug 18 21:00:15 2014 @@ -0,0 +1,55 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Policy consulted when a failed operation is being retried: it decides
 * whether another attempt is permitted and how long to wait before making it.
 */
public interface RetryPolicy {

    /**
     * Called when an attempt at an operation has failed. Returns true if
     * another attempt is allowed.
     *
     * @param retryCount
     *          The number of times retried so far. NOTE(review): whether the
     *          first call passes 0 or 1 is decided by the code that drives
     *          the policy (ZooWorker is handed the policy by ZooKeeperClient);
     *          confirm there before relying on either value.
     * @param elapsedRetryTime
     *          The time elapsed since the operation was attempted, in
     *          milliseconds.
     * @return true if another attempt is allowed; otherwise, false.
     */
    boolean allowRetry(int retryCount, long elapsedRetryTime);

    /**
     * Called before making an attempt to retry a failed operation. Returns 0
     * if the attempt should be made immediately.
     *
     * @param retryCount
     *          The number of times retried so far, counted the same way as
     *          for {@link #allowRetry(int, long)}.
     * @param elapsedRetryTime
     *          The time elapsed since the operation was attempted, in
     *          milliseconds.
     * @return the time to wait before retrying, in milliseconds.
     */
    long nextRetryWaitTime(int retryCount, long elapsedRetryTime);

}
+ * + */ +package org.apache.bookkeeper.zookeeper; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.bookkeeper.zookeeper.ZooWorker.ZooCallable; +import org.apache.zookeeper.AsyncCallback.ACLCallback; +import org.apache.zookeeper.AsyncCallback.Children2Callback; +import org.apache.zookeeper.AsyncCallback.ChildrenCallback; +import org.apache.zookeeper.AsyncCallback.DataCallback; +import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.AsyncCallback.StringCallback; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.Transaction; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provide a zookeeper client to handle session expire + */ +public class ZooKeeperClient extends ZooKeeper implements Watcher { + + final static Logger logger = LoggerFactory.getLogger(ZooKeeperClient.class); + + // ZooKeeper client connection variables + private final String connectString; + private final int sessionTimeoutMs; + + // state for the zookeeper client + private final AtomicReference zk = new AtomicReference(); + private final ZooKeeperWatcherBase watcherManager; + + 
private final ScheduledExecutorService retryExecutor; + private final ExecutorService connectExecutor; + + // retry polices + private final RetryPolicy connectRetryPolicy; + private final RetryPolicy operationRetryPolicy; + + private final Callable clientCreator = new Callable() { + + @Override + public ZooKeeper call() throws Exception { + try { + return ZooWorker.syncCallWithRetries(null, new ZooCallable() { + + @Override + public ZooKeeper call() throws KeeperException, InterruptedException { + logger.info("Reconnecting zookeeper {}.", connectString); + ZooKeeper newZk; + try { + newZk = createZooKeeper(); + } catch (IOException ie) { + logger.error("Failed to create zookeeper instance to " + connectString, ie); + throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); + } + // close the previous one + closeZkHandle(); + zk.set(newZk); + if (logger.isDebugEnabled()) { + logger.debug("ZooKeeper session {} is created to {}.", + Long.toHexString(newZk.getSessionId()), connectString); + } + return newZk; + } + + @Override + public String toString() { + return String.format("ZooKeeper Client Creator (%s)", connectString); + } + + }, connectRetryPolicy); + } catch (Exception e) { + logger.error("Gave up reconnecting to ZooKeeper : ", e); + Runtime.getRuntime().exit(-1); + return null; + } + } + + }; + + public static ZooKeeper createConnectedZooKeeper(String connectString, int sessionTimeoutMs) + throws KeeperException, InterruptedException, IOException { + ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(sessionTimeoutMs); + ZooKeeper zk = new ZooKeeper(connectString, sessionTimeoutMs, watcher); + try { + watcher.waitForConnection(); + } catch (KeeperException ke) { + zk.close(); + throw ke; + } catch (InterruptedException ie) { + zk.close(); + throw ie; + } + return zk; + } + + public static ZooKeeperClient createConnectedZooKeeperClient(String connectString, int sessionTimeoutMs) + throws KeeperException, InterruptedException, IOException { + 
ZooKeeperWatcherBase watcherManager = new ZooKeeperWatcherBase(sessionTimeoutMs); + ZooKeeperClient client = new ZooKeeperClient(connectString, sessionTimeoutMs, watcherManager, + new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0)); + try { + watcherManager.waitForConnection(); + } catch (KeeperException ke) { + client.close(); + throw ke; + } catch (InterruptedException ie) { + client.close(); + throw ie; + } + return client; + } + + public static ZooKeeperClient createConnectedZooKeeperClient( + String connectString, int sessionTimeoutMs, RetryPolicy operationRetryPolicy) + throws KeeperException, InterruptedException, IOException { + ZooKeeperWatcherBase watcherManager = new ZooKeeperWatcherBase(sessionTimeoutMs); + ZooKeeperClient client = new ZooKeeperClient(connectString, sessionTimeoutMs, watcherManager, + operationRetryPolicy); + try { + watcherManager.waitForConnection(); + } catch (KeeperException ke) { + client.close(); + throw ke; + } catch (InterruptedException ie) { + client.close(); + throw ie; + } + return client; + } + + public static ZooKeeperClient createConnectedZooKeeperClient( + String connectString, int sessionTimeoutMs, Set childWatchers, + RetryPolicy operationRetryPolicy) + throws KeeperException, InterruptedException, IOException { + ZooKeeperWatcherBase watcherManager = + new ZooKeeperWatcherBase(sessionTimeoutMs, childWatchers); + ZooKeeperClient client = new ZooKeeperClient(connectString, sessionTimeoutMs, watcherManager, + operationRetryPolicy); + try { + watcherManager.waitForConnection(); + } catch (KeeperException ke) { + client.close(); + throw ke; + } catch (InterruptedException ie) { + client.close(); + throw ie; + } + return client; + } + + ZooKeeperClient(String connectString, int sessionTimeoutMs, ZooKeeperWatcherBase watcherManager, + RetryPolicy operationRetryPolicy) throws IOException { + this(connectString, sessionTimeoutMs, watcherManager, + new BoundExponentialBackoffRetryPolicy(6000, 60000, 
Integer.MAX_VALUE), + operationRetryPolicy); + } + + private ZooKeeperClient(String connectString, int sessionTimeoutMs, + ZooKeeperWatcherBase watcherManager, + RetryPolicy connectRetryPolicy, RetryPolicy operationRetryPolicy) throws IOException { + super(connectString, sessionTimeoutMs, watcherManager); + this.connectString = connectString; + this.sessionTimeoutMs = sessionTimeoutMs; + this.watcherManager = watcherManager; + this.connectRetryPolicy = connectRetryPolicy; + this.operationRetryPolicy = operationRetryPolicy; + this.retryExecutor = + Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); + this.connectExecutor = + Executors.newSingleThreadExecutor(); + // added itself to the watcher + watcherManager.addChildWatcher(this); + } + + @Override + public void close() throws InterruptedException { + connectExecutor.shutdown(); + retryExecutor.shutdown(); + closeZkHandle(); + } + + private void closeZkHandle() throws InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + super.close(); + } else { + zkHandle.close(); + } + } + + protected void waitForConnection() throws KeeperException, InterruptedException { + watcherManager.waitForConnection(); + } + + protected ZooKeeper createZooKeeper() throws IOException { + return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager); + } + + @Override + public void process(WatchedEvent event) { + if (event.getType() == EventType.None && + event.getState() == KeeperState.Expired) { + onExpired(); + } + } + + private void onExpired() { + if (logger.isDebugEnabled()) { + logger.debug("ZooKeeper session {} is expired from {}.", + Long.toHexString(getSessionId()), connectString); + } + try { + connectExecutor.submit(clientCreator); + } catch (RejectedExecutionException ree) { + logger.error("ZooKeeper reconnect task is rejected : ", ree); + } + } + + + static abstract class RetryRunnable implements Runnable { + + final ZooWorker worker; + final Runnable that; + + 
RetryRunnable(RetryPolicy retryPolicy) { + worker = new ZooWorker(retryPolicy); + that = this; + } + + } + + // inherits from ZooKeeper client for all operations + + @Override + public long getSessionId() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.getSessionId(); + } + return zkHandle.getSessionId(); + } + + @Override + public byte[] getSessionPasswd() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.getSessionPasswd(); + } + return zkHandle.getSessionPasswd(); + } + + @Override + public int getSessionTimeout() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.getSessionTimeout(); + } + return zkHandle.getSessionTimeout(); + } + + @Override + public void addAuthInfo(String scheme, byte[] auth) { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + super.addAuthInfo(scheme, auth); + return; + } + zkHandle.addAuthInfo(scheme, auth); + } + + @Override + public synchronized void register(Watcher watcher) { + watcherManager.addChildWatcher(watcher); + } + + @Override + public List multi(final Iterable ops) throws InterruptedException, KeeperException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable>() { + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.multi(ops); + } + return zkHandle.multi(ops); + } + + }, operationRetryPolicy); + } + + @Override + @Deprecated + public Transaction transaction() { + // since there is no reference about which client that the transaction could use + // so just use ZooKeeper instance directly. + // you'd better to use {@link #multi}. 
+ ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.transaction(); + } + return zkHandle.transaction(); + } + + @Override + public List getACL(final String path, final Stat stat) throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable>() { + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.getACL(path, stat); + } + return zkHandle.getACL(path, stat); + } + + }, operationRetryPolicy); + } + + @Override + public void getACL(final String path, final Stat stat, final ACLCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final ACLCallback aclCb = new ACLCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, List acl, Stat stat) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, acl, stat); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.getACL(path, stat, aclCb, worker); + } else { + zkHandle.getACL(path, stat, aclCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public Stat setACL(final String path, final List acl, final int version) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable() { + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.setACL(path, acl, version); + } + return zkHandle.setACL(path, acl, version); + } + + }, operationRetryPolicy); + } + + @Override + public void setACL(final String path, final List acl, final int version, + 
final StatCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.setACL(path, acl, version, stCb, worker); + } else { + zkHandle.setACL(path, acl, version, stCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void sync(final String path, final VoidCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final VoidCallback vCb = new VoidCallback() { + + @Override + public void processResult(int rc, String path, Object ctx) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.sync(path, vCb, worker); + } else { + zkHandle.sync(path, vCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public States getState() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.getState(); + } else { + return zkHandle.getState(); + } + } + + @Override + public String toString() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.toString(); + } else { + return zkHandle.toString(); + } + } + + @Override + public String create(final String path, final byte[] data, + final List acl, 
final CreateMode createMode) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable() { + + @Override + public String call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.create(path, data, acl, createMode); + } + return zkHandle.create(path, data, acl, createMode); + } + + }, operationRetryPolicy); + } + + @Override + public void create(final String path, final byte[] data, final List acl, + final CreateMode createMode, final StringCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final StringCallback createCb = new StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, name); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker); + } else { + zkHandle.create(path, data, acl, createMode, createCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void delete(final String path, final int version) throws KeeperException, InterruptedException { + ZooWorker.syncCallWithRetries(this, new ZooCallable() { + + @Override + public Void call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.delete(path, version); + } else { + zkHandle.delete(path, version); + } + return null; + } + + }, operationRetryPolicy); + } + + @Override + public void delete(final String path, final int version, final VoidCallback cb, final Object context) { + final Runnable proc = new 
RetryRunnable(operationRetryPolicy) { + + final VoidCallback deleteCb = new VoidCallback() { + + @Override + public void processResult(int rc, String path, Object ctx) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.delete(path, version, deleteCb, worker); + } else { + zkHandle.delete(path, version, deleteCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public Stat exists(final String path, final Watcher watcher) throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable() { + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.exists(path, watcher); + } + return zkHandle.exists(path, watcher); + } + + }, operationRetryPolicy); + } + + @Override + public Stat exists(final String path, final boolean watch) throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable() { + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.exists(path, watch); + } + return zkHandle.exists(path, watch); + } + + }, operationRetryPolicy); + } + + @Override + public void exists(final String path, final Watcher watcher, final StatCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + 
retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.exists(path, watcher, stCb, worker); + } else { + zkHandle.exists(path, watcher, stCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void exists(final String path, final boolean watch, final StatCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.exists(path, watch, stCb, worker); + } else { + zkHandle.exists(path, watch, stCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public byte[] getData(final String path, final Watcher watcher, final Stat stat) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable() { + + @Override + public byte[] call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.getData(path, watcher, stat); + } + return zkHandle.getData(path, watcher, stat); + } + + }, operationRetryPolicy); + } + + @Override + public byte[] getData(final String path, final boolean watch, final Stat stat) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable() { + + @Override + public byte[] call() 
throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.getData(path, watch, stat); + } + return zkHandle.getData(path, watch, stat); + } + + }, operationRetryPolicy); + } + + @Override + public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final DataCallback dataCb = new DataCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, data, stat); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.getData(path, watcher, dataCb, worker); + } else { + zkHandle.getData(path, watcher, dataCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void getData(final String path, final boolean watch, final DataCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final DataCallback dataCb = new DataCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, data, stat); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.getData(path, watch, dataCb, worker); + } else { + zkHandle.getData(path, watch, dataCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public Stat setData(final 
String path, final byte[] data, final int version) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable() { + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.setData(path, data, version); + } + return zkHandle.setData(path, data, version); + } + + }, operationRetryPolicy); + } + + @Override + public void setData(final String path, final byte[] data, final int version, + final StatCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.setData(path, data, version, stCb, worker); + } else { + zkHandle.setData(path, data, version, stCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public List getChildren(final String path, final Watcher watcher, final Stat stat) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable>() { + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.getChildren(path, watcher, stat); + } + return zkHandle.getChildren(path, watcher, stat); + } + + }, operationRetryPolicy); + } + + @Override + public List getChildren(final String path, final boolean watch, final Stat stat) + throws KeeperException, InterruptedException { + return 
ZooWorker.syncCallWithRetries(this, new ZooCallable>() { + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.getChildren(path, watch, stat); + } + return zkHandle.getChildren(path, watch, stat); + } + + }, operationRetryPolicy); + } + + @Override + public void getChildren(final String path, final Watcher watcher, + final Children2Callback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final Children2Callback childCb = new Children2Callback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List children, Stat stat) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, children, stat); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.getChildren(path, watcher, childCb, worker); + } else { + zkHandle.getChildren(path, watcher, childCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void getChildren(final String path, final boolean watch, final Children2Callback cb, + final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final Children2Callback childCb = new Children2Callback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List children, Stat stat) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, children, stat); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.getChildren(path, watch, childCb, 
worker); + } else { + zkHandle.getChildren(path, watch, childCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + + @Override + public List getChildren(final String path, final Watcher watcher) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable>() { + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.getChildren(path, watcher); + } + return zkHandle.getChildren(path, watcher); + } + + }, operationRetryPolicy); + } + + @Override + public List getChildren(final String path, final boolean watch) + throws KeeperException, InterruptedException { + return ZooWorker.syncCallWithRetries(this, new ZooCallable>() { + + @Override + public List call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return ZooKeeperClient.super.getChildren(path, watch); + } + return zkHandle.getChildren(path, watch); + } + + }, operationRetryPolicy); + } + + @Override + public void getChildren(final String path, final Watcher watcher, + final ChildrenCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final ChildrenCallback childCb = new ChildrenCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List children) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, children); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.getChildren(path, watcher, childCb, worker); + } else { + zkHandle.getChildren(path, watcher, childCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void 
getChildren(final String path, final boolean watch, + final ChildrenCallback cb, final Object context) { + final Runnable proc = new RetryRunnable(operationRetryPolicy) { + + final ChildrenCallback childCb = new ChildrenCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List children) { + ZooWorker worker = (ZooWorker)ctx; + if (worker.allowRetry(rc)) { + retryExecutor.schedule(that, worker.nextRetryWaitTime(), TimeUnit.MILLISECONDS); + } else { + cb.processResult(rc, path, context, children); + } + } + + }; + + @Override + public void run() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + ZooKeeperClient.super.getChildren(path, watch, childCb, worker); + } else { + zkHandle.getChildren(path, watch, childCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + +} Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java?rev=1618732&r1=1618731&r2=1618732&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java Mon Aug 18 21:00:15 2014 @@ -20,6 +20,8 @@ */ package org.apache.bookkeeper.zookeeper; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -39,17 +41,36 @@ public class ZooKeeperWatcherBase implem private final int zkSessionTimeOut; private CountDownLatch clientConnectLatch = new CountDownLatch(1); + private final CopyOnWriteArraySet childWatchers = + new CopyOnWriteArraySet(); public 
ZooKeeperWatcherBase(int zkSessionTimeOut) { this.zkSessionTimeOut = zkSessionTimeOut; } + public ZooKeeperWatcherBase(int zkSessionTimeOut, Set childWatchers) { + this.zkSessionTimeOut = zkSessionTimeOut; + this.childWatchers.addAll(childWatchers); + } + + public ZooKeeperWatcherBase addChildWatcher(Watcher watcher) { + this.childWatchers.add(watcher); + return this; + } + + public ZooKeeperWatcherBase removeChildWatcher(Watcher watcher) { + this.childWatchers.remove(watcher); + return this; + } + @Override public void process(WatchedEvent event) { // If event type is NONE, this is a connection status change if (event.getType() != EventType.None) { LOG.debug("Recieved event: {}, path: {} from ZooKeeper server", event.getType(), event.getPath()); + // notify the child watchers + notifyEvent(event); return; } @@ -60,16 +81,19 @@ public class ZooKeeperWatcherBase implem clientConnectLatch.countDown(); break; case Disconnected: + clientConnectLatch = new CountDownLatch(1); LOG.debug("Ignoring Disconnected event from ZooKeeper server"); break; case Expired: - LOG.error("ZooKeeper client connection to the " - + "ZooKeeper server has expired!"); + clientConnectLatch = new CountDownLatch(1); + LOG.error("ZooKeeper client connection to the ZooKeeper server has expired!"); break; default: // do nothing break; } + // notify the child watchers + notifyEvent(event); } /** @@ -80,8 +104,7 @@ public class ZooKeeperWatcherBase implem * @throws InterruptedException * interrupted while waiting for connection */ - public void waitForConnection() throws KeeperException, - InterruptedException { + public void waitForConnection() throws KeeperException, InterruptedException { if (!clientConnectLatch.await(zkSessionTimeOut, TimeUnit.MILLISECONDS)) { throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); } @@ -94,4 +117,17 @@ public class ZooKeeperWatcherBase implem return zkSessionTimeOut; } + /** + * Notify Event to child watchers. 
+ * + * @param event + * Watched event received from ZooKeeper. + */ + private void notifyEvent(WatchedEvent event) { + // notify child watchers + for (Watcher w : childWatchers) { + w.process(event); + } + } + } Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooWorker.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooWorker.java?rev=1618732&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooWorker.java (added) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooWorker.java Mon Aug 18 21:00:15 2014 @@ -0,0 +1,147 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.zookeeper; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.util.MathUtils; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provide a mechanism to perform an operation on ZooKeeper that is safe on disconnections + * and recoverable errors. + */ +class ZooWorker { + + static final Logger logger = LoggerFactory.getLogger(ZooWorker.class); + + int attempts = 0; + long startTimeMs; + long elapsedTimeMs = 0L; + final RetryPolicy retryPolicy; + + ZooWorker(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + this.startTimeMs = MathUtils.now(); + } + + public boolean allowRetry(int rc) { + if (!ZooWorker.isRecoverableException(rc)) { + return false; + } + ++attempts; + elapsedTimeMs = MathUtils.now() - startTimeMs; + return retryPolicy.allowRetry(attempts, elapsedTimeMs); + } + + public long nextRetryWaitTime() { + return retryPolicy.nextRetryWaitTime(attempts, elapsedTimeMs); + } + + /** + * Check whether the given result code is recoverable by retry. + * + * @param rc result code + * @return true if given result code is recoverable. + */ + public static boolean isRecoverableException(int rc) { + return KeeperException.Code.CONNECTIONLOSS.intValue() == rc || + KeeperException.Code.OPERATIONTIMEOUT.intValue() == rc || + KeeperException.Code.SESSIONMOVED.intValue() == rc || + KeeperException.Code.SESSIONEXPIRED.intValue() == rc; + } + + /** + * Check whether the given exception is recoverable by retry. + * + * @param exception given exception + * @return true if given exception is recoverable. + */ + public static boolean isRecoverableException(KeeperException exception) { + return isRecoverableException(exception.code().intValue()); + } + + static interface ZooCallable { + /** + * Be compatible with ZooKeeper interface. 
+ * + * @return value + * @throws InterruptedException + * @throws KeeperException + */ + public T call() throws InterruptedException, KeeperException; + } + + /** + * Execute a sync zookeeper operation with a given retry policy. + * + * @param client + * ZooKeeper client. + * @param proc + * Synchronous zookeeper operation wrapped in a {@link Callable}. + * @param retryPolicy + * Retry policy to execute the synchronous operation. + * @return result of the zookeeper operation + * @throws KeeperException any non-recoverable exception or recoverable exception exhausted all retries. + * @throws InterruptedException the operation is interrupted. + */ + public static T syncCallWithRetries( + ZooKeeperClient client, ZooCallable proc, RetryPolicy retryPolicy) + throws KeeperException, InterruptedException { + T result = null; + boolean isDone = false; + int attempts = 0; + long startTimeMs = MathUtils.now(); + while (!isDone) { + try { + if (null != client) { + client.waitForConnection(); + } + logger.debug("Execute {} at {} retry attempt.", proc, attempts); + result = proc.call(); + isDone = true; + } catch (KeeperException e) { + ++attempts; + boolean rethrow = true; + long elapsedTime = MathUtils.now() - startTimeMs; + if (((null != client && isRecoverableException(e)) || null == client) && + retryPolicy.allowRetry(attempts, elapsedTime)) { + rethrow = false; + } + if (rethrow) { + logger.debug("Stopped executing {} after {} attempts.", proc, attempts); + throw e; + } + TimeUnit.MILLISECONDS.sleep(retryPolicy.nextRetryWaitTime(attempts, elapsedTime)); + } + } + return result; + } + + static T syncCallWithRetries( + ZooCallable proc, RetryPolicy retryPolicy) throws KeeperException, InterruptedException { + return syncCallWithRetries(null, proc, retryPolicy); + } + +} Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1618732&r1=1618731&r2=1618732&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Mon Aug 18 21:00:15 2014 @@ -80,14 +80,23 @@ public class ZooKeeperUtil { ZkTmpDir.delete(); ZkTmpDir.mkdir(); + // start the server and client. + restartServer(); + + // initialize the zk client with values + zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + public void restartServer() throws Exception { zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, - ZooKeeperServer.DEFAULT_TICK_TIME); + ZooKeeperServer.DEFAULT_TICK_TIME); serverFactory = new NIOServerCnxnFactory(); serverFactory.configure(zkaddr, 100); serverFactory.startup(zks); boolean b = ClientBase.waitForServerUp(getZooKeeperConnectString(), - ClientBase.CONNECTION_TIMEOUT); + ClientBase.CONNECTION_TIMEOUT); LOG.debug("Server up: " + b); // create a zookeeper client @@ -95,10 +104,6 @@ public class ZooKeeperUtil { ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000); zkc = ZkUtils.createConnectedZookeeperClient( getZooKeeperConnectString(), w); - - // initialize the zk client with values - zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } public void sleepServer(final int seconds, final CountDownLatch l) @@ -137,7 +142,7 @@ public class ZooKeeperUtil { zk2.close(); } - public void killServer() throws Exception { + public void stopServer() throws Exception { if (zkc != null) 
{ zkc.close(); } @@ -146,12 +151,16 @@ public class ZooKeeperUtil { if (serverFactory != null) { serverFactory.shutdown(); assertTrue("waiting for server down", - ClientBase.waitForServerDown(getZooKeeperConnectString(), - ClientBase.CONNECTION_TIMEOUT)); + ClientBase.waitForServerDown(getZooKeeperConnectString(), + ClientBase.CONNECTION_TIMEOUT)); } if (zks != null) { zks.getTxnLogFactory().close(); } + } + + public void killServer() throws Exception { + stopServer(); // ServerStats.unregister(); FileUtils.deleteDirectory(ZkTmpDir); }