distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [13/29] incubator-distributedlog git commit: Make the zookeeper client used by bookkeeper client retry on session expires
Date Wed, 21 Dec 2016 08:00:21 GMT
Make the zookeeper client used by bookkeeper client retry on session expires

* the zookeeper client used by bookkeeper client is purely for metadata accesses, so we should
retry on session expires.
* remove the unnessary zookeeper session handling in bk log handler. as we don't necessary
to fail bookkeeper client or log handler when session expires as it would be handled and retried
by the zookeeper client.
* Make the retry infinitely if the retry settings for bkc zookeeper client is set to 0 or
negative.

RB_ID=843057


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f18fe172
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f18fe172
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f18fe172

Branch: refs/heads/merge/DL-98
Commit: f18fe172fb4559bcabb7173d1cd0b9d27fc23c93
Parents: 72a786e
Author: Sijie Guo <sijieg@twitter.com>
Authored: Mon Jul 11 10:21:44 2016 -0700
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Mon Dec 12 16:58:20 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReaderDLSN.java    | 15 +-----
 .../BKDistributedLogNamespace.java              |  5 +-
 .../twitter/distributedlog/BKLogHandler.java    | 17 -------
 .../distributedlog/BookKeeperClient.java        | 48 ++++----------------
 .../DistributedLogConfiguration.java            |  8 +++-
 5 files changed, 17 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index 7d3d53d..b1a9273 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -49,7 +49,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function1;
@@ -73,7 +72,7 @@ import scala.runtime.AbstractFunction1;
  * <li> `async_reader`/idle_reader_error: counter. the number idle reader errors.
  * </ul>
  */
-class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNotifier, AsyncLogReader,
Runnable, AsyncNotification {
+class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotification {
     static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReaderDLSN.class);
 
     private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN>
READ_NEXT_MAP_FUNCTION =
@@ -86,7 +85,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
 
     protected final BKDistributedLogManager bkDistributedLogManager;
     protected final BKLogReadHandler bkLedgerManager;
-    private Watcher sessionExpireWatcher = null;
     private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
     private final ScheduledExecutorService executorService;
     private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
@@ -218,7 +216,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
         this.executorService = executorService;
         this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId,
                 lockStateExecutor, this, deserializeRecordSet, true);
-        sessionExpireWatcher = this.bkLedgerManager.registerExpirationHandler(this);
         LOG.debug("Starting async reader at {}", startDLSN);
         this.startDLSN = startDLSN;
         this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
@@ -255,14 +252,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
         this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
     }
 
-    @Override
-    public void notifySessionExpired() {
-        // ZK Session notification is an indication to check if this has resulted in a fatal
error
-        // of the underlying reader, in itself this reader doesnt error out unless the underlying
-        // reader has hit an error
-        scheduleBackgroundRead();
-    }
-
     private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
         if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
             // Dont run the task more than once every seconds (for sanity)
@@ -494,8 +483,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
 
         cancelAllPendingReads(exception);
 
-        bkLedgerManager.unregister(sessionExpireWatcher);
-
         FutureUtils.ignore(bkLedgerManager.asyncClose()).proxyTo(closePromise);
         return closePromise;
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 0b522d0..7a4fd7f 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -622,13 +622,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
                                                                   DistributedLogConfiguration
conf,
                                                                   String zkServers,
                                                                   StatsLogger statsLogger)
{
-        RetryPolicy retryPolicy = null;
-        if (conf.getZKNumRetries() > 0) {
-            retryPolicy = new BoundExponentialBackoffRetryPolicy(
+        RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
                     conf.getBKClientZKRetryBackoffStartMillis(),
                     conf.getBKClientZKRetryBackoffMaxMillis(),
                     conf.getBKClientZKNumRetries());
-        }
         ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
                 .name(zkcName)
                 .sessionTimeoutMs(conf.getBKClientZKSessionTimeoutMilliSeconds())

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index a6ec318..a84261a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -1293,21 +1293,4 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable,
AsyncAbor
         }
     }
 
-    // ZooKeeper Watchers
-
-    Watcher registerExpirationHandler(final ZooKeeperClient.ZooKeeperSessionExpireNotifier
onExpired) {
-        if (conf.getZKNumRetries() > 0) {
-            return new Watcher() {
-                @Override
-                public void process(WatchedEvent event) {
-                    // nop
-                }
-            };
-        }
-        return zooKeeperClient.registerExpirationHandler(onExpired);
-    }
-
-    boolean unregister(Watcher watcher) {
-        return zooKeeperClient.unregister(watcher);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
index fd22b8f..c39ae4c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog;
 
+import com.google.common.base.Optional;
 import com.twitter.distributedlog.ZooKeeperClient.Credentials;
 import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials;
 import com.twitter.distributedlog.exceptions.AlreadyClosedException;
@@ -41,16 +42,12 @@ import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.RetryPolicy;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.base.Optional;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -62,7 +59,7 @@ import static com.google.common.base.Charsets.UTF_8;
  * <li> bookkeeper operation stats are exposed under current scope by {@link BookKeeper}
  * </ul>
  */
-public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireNotifier {
+public class BookKeeperClient {
     static final Logger LOG = LoggerFactory.getLogger(BookKeeperClient.class);
 
     // Parameters to build bookkeeper client
@@ -83,14 +80,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
     // feature provider
     private final Optional<FeatureProvider> featureProvider;
 
-    private Watcher sessionExpireWatcher = null;
-    private AtomicBoolean zkSessionExpired = new AtomicBoolean(false);
-
     @SuppressWarnings("deprecation")
     private synchronized void commonInitialization(
             DistributedLogConfiguration conf, String ledgersPath,
-            ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer
requestTimer,
-            boolean registerExpirationHandler)
+            ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer
requestTimer)
         throws IOException, InterruptedException, KeeperException {
         ClientConfiguration bkConfig = new ClientConfiguration();
         bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
@@ -124,10 +117,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
             .requestTimer(requestTimer)
             .featureProvider(featureProvider.orNull())
             .build();
-
-        if (registerExpirationHandler) {
-            sessionExpireWatcher = this.zkc.registerExpirationHandler(this);
-        }
     }
 
     BookKeeperClient(DistributedLogConfiguration conf,
@@ -159,16 +148,11 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
         if (null != this.bkc) {
             return;
         }
-        boolean registerExpirationHandler;
         if (null == this.zkc) {
             int zkSessionTimeout = conf.getBKClientZKSessionTimeoutMilliSeconds();
-            RetryPolicy retryPolicy = null;
-            if (conf.getBKClientZKNumRetries() > 0) {
-                retryPolicy = new BoundExponentialBackoffRetryPolicy(
+            RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
                         conf.getBKClientZKRetryBackoffStartMillis(),
                         conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries());
-            }
-
             Credentials credentials = Credentials.NONE;
             if (conf.getZkAclId() != null) {
                 credentials = new DigestCredentials(conf.getZkAclId(), conf.getZkAclId());
@@ -178,10 +162,9 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
                                            retryPolicy, statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(),
                                            conf.getBKClientZKRequestRateLimit(), credentials);
         }
-        registerExpirationHandler = conf.getBKClientZKNumRetries() <= 0;
 
         try {
-            commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer,
registerExpirationHandler);
+            commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer);
         } catch (InterruptedException e) {
             throw new DLInterruptedException("Interrupted on creating bookkeeper client "
+ name + " : ", e);
         } catch (KeeperException e) {
@@ -190,18 +173,18 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
 
         if (ownZK) {
             LOG.info("BookKeeper Client created {} with its own ZK Client : ledgersPath =
{}, numRetries = {}, " +
-                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {},
registerExpirationHandler = {}",
+                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}",
                     new Object[] { name, ledgersPath,
                     conf.getBKClientZKNumRetries(), conf.getBKClientZKSessionTimeoutMilliSeconds(),
                     conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(),
-                    conf.getBkDNSResolverOverrides(), registerExpirationHandler });
+                    conf.getBkDNSResolverOverrides() });
         } else {
             LOG.info("BookKeeper Client created {} with shared zookeeper client : ledgersPath
= {}, numRetries = {}, " +
-                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {},
registerExpirationHandler = {}",
+                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}",
                     new Object[] { name, ledgersPath,
                     conf.getZKNumRetries(), conf.getZKSessionTimeoutMilliseconds(),
                     conf.getZKRetryBackoffStartMillis(), conf.getZKRetryBackoffMaxMillis(),
-                    conf.getBkDNSResolverOverrides(), registerExpirationHandler });
+                    conf.getBkDNSResolverOverrides() });
         }
     }
 
@@ -284,9 +267,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
             }
         }
         if (null != zkc) {
-            if (null != sessionExpireWatcher) {
-                zkc.unregister(sessionExpireWatcher);
-            }
             if (ownZK) {
                 zkc.close();
             }
@@ -294,20 +274,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
         closed = true;
     }
 
-    @Override
-    public void notifySessionExpired() {
-        zkSessionExpired.set(true);
-    }
-
     public synchronized void checkClosedOrInError() throws AlreadyClosedException {
         if (closed) {
             LOG.error("BookKeeper Client {} is already closed", name);
             throw new AlreadyClosedException("BookKeeper Client " + name + " is already closed");
         }
-
-        if (zkSessionExpired.get()) {
-            LOG.error("BookKeeper Client {}'s Zookeeper session has expired", name);
-            throw new AlreadyClosedException("BookKeeper Client " + name + "'s Zookeeper
session has expired");
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index c2057df..5d0e59a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -809,12 +809,16 @@ public class DistributedLogConfiguration extends CompositeConfiguration
{
      * Get num of retries for zookeeper client that used by bookkeeper client.
      * <p>Retries only happen on retryable failures like session expired,
      * session moved. for permanent failures, the request will fail immediately.
-     * The default value is 3.
+     * The default value is 3. Setting it to zero or negative will retry infinitely.
      *
      * @return num of retries of zookeeper client used by bookkeeper client.
      */
     public int getBKClientZKNumRetries() {
-        return this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+        int zkNumRetries = this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+        if (zkNumRetries <= 0) {
+            return Integer.MAX_VALUE;
+        }
+        return zkNumRetries;
     }
 
     /**


Mime
View raw message