distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [12/29] incubator-distributedlog git commit: LogSegmentMetadataStore should only notify when the list of log segments is updated
Date Wed, 21 Dec 2016 08:00:20 GMT
LogSegmentMetadataStore should only notify when the list of log segments is updated

Currently it notifies the listeners not only when there is a change but also when session
expires. it would break the readahead loop and cause readers have to wait until it is able
to connect to zookeeper again.

With this change, it would only notify when the list of log segments is updated. If it disconnects
to zookeeper, the listener won't be notified and it would keep reading from the log segments
it knows.

RB_ID=842998


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/72a786e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/72a786e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/72a786e7

Branch: refs/heads/merge/DL-98
Commit: 72a786e78540fb5a3f8e27f9d71d3616d06d1548
Parents: f008f75
Author: Sijie Guo <sijieg@twitter.com>
Authored: Mon Jul 11 10:10:55 2016 -0700
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Mon Dec 12 16:52:18 2016 -0800

----------------------------------------------------------------------
 .../impl/ZKLogSegmentMetadataStore.java         | 94 ++++++++++++++++----
 .../impl/TestZKLogSegmentMetadataStore.java     | 22 ++---
 2 files changed, 84 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/72a786e7/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index c0796a1..cb53b23 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog.impl;
 
+import com.google.common.collect.ImmutableList;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.ZooKeeperClient;
@@ -45,10 +46,14 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -64,7 +69,9 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore,
Watch
 
     private static final Logger logger = LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class);
 
-    private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<List<String>>
{
+    private static final List<String> EMPTY_LIST = ImmutableList.of();
+
+    private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<Versioned<List<String>>>
{
 
         private final String logSegmentsPath;
         private final ZKLogSegmentMetadataStore store;
@@ -78,15 +85,16 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore,
Watch
         }
 
         @Override
-        public void onSuccess(final List<String> segments) {
+        public void onSuccess(final Versioned<List<String>> segments) {
             // reset the back off after a successful operation
             currentZKBackOffMs = store.minZKBackoffMs;
-            final Set<LogSegmentNamesListener> listenerSet = store.listeners.get(logSegmentsPath);
+            final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet
=
+                    store.listeners.get(logSegmentsPath);
             if (null != listenerSet) {
                 store.submitTask(logSegmentsPath, new Runnable() {
                     @Override
                     public void run() {
-                        for (LogSegmentNamesListener listener : listenerSet) {
+                        for (VersionedLogSegmentNamesListener listener : listenerSet.values())
{
                             listener.onSegmentsUpdated(segments);
                         }
                     }
@@ -120,6 +128,48 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore,
Watch
         }
     }
 
+    /**
+     * A log segment names listener that keeps tracking the version of list of log segments
that it has been notified.
+     * It only notify the newer log segments.
+     */
+    static class VersionedLogSegmentNamesListener {
+
+        private final LogSegmentNamesListener listener;
+        private Versioned<List<String>> lastNotifiedLogSegments;
+
+        VersionedLogSegmentNamesListener(LogSegmentNamesListener listener) {
+            this.listener = listener;
+            this.lastNotifiedLogSegments = new Versioned<List<String>>(EMPTY_LIST,
Version.NEW);
+        }
+
+        synchronized void onSegmentsUpdated(Versioned<List<String>> logSegments)
{
+            if (lastNotifiedLogSegments.getVersion() == Version.NEW ||
+                    lastNotifiedLogSegments.getVersion().compare(logSegments.getVersion())
== Version.Occurred.BEFORE) {
+                lastNotifiedLogSegments = logSegments;
+                listener.onSegmentsUpdated(logSegments.getValue());
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            return listener.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof VersionedLogSegmentNamesListener)) {
+                return false;
+            }
+            VersionedLogSegmentNamesListener other = (VersionedLogSegmentNamesListener) obj;
+            return listener.equals(other.listener);
+        }
+
+        @Override
+        public String toString() {
+            return listener.toString();
+        }
+    }
+
     final DistributedLogConfiguration conf;
     // settings
     final int minZKBackoffMs;
@@ -128,7 +178,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore,
Watch
 
     final ZooKeeperClient zkc;
     // log segment listeners
-    final ConcurrentMap<String, Set<LogSegmentNamesListener>> listeners;
+    final ConcurrentMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>>
listeners;
     // scheduler
     final OrderedScheduler scheduler;
     final ReentrantReadWriteLock closeLock;
@@ -139,7 +189,8 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore,
Watch
                                      OrderedScheduler scheduler) {
         this.conf = conf;
         this.zkc = zkc;
-        this.listeners = new ConcurrentHashMap<String, Set<LogSegmentNamesListener>>();
+        this.listeners =
+                new ConcurrentHashMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>>();
         this.scheduler = scheduler;
         this.closeLock = new ReentrantReadWriteLock();
         // settings
@@ -275,11 +326,16 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore,
Watch
 
     @Override
     public Future<List<String>> getLogSegmentNames(String logSegmentsPath) {
-        return getLogSegmentNames(logSegmentsPath, null);
+        return getLogSegmentNames(logSegmentsPath, null).map(new AbstractFunction1<Versioned<List<String>>,
List<String>>() {
+            @Override
+            public List<String> apply(Versioned<List<String>> list) {
+                return list.getValue();
+            }
+        });
     }
 
-    Future<List<String>> getLogSegmentNames(String logSegmentsPath, Watcher watcher)
{
-        Promise<List<String>> result = new Promise<List<String>>();
+    Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
Watcher watcher) {
+        Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>();
         try {
             zkc.get().getChildren(logSegmentsPath, watcher, this, result);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
@@ -293,9 +349,11 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore,
Watch
     @Override
     @SuppressWarnings("unchecked")
     public void processResult(int rc, String path, Object ctx, List<String> children,
Stat stat) {
-        Promise<List<String>> result = ((Promise<List<String>>) ctx);
+        Promise<Versioned<List<String>>> result = ((Promise<Versioned<List<String>>>)
ctx);
         if (KeeperException.Code.OK.intValue() == rc) {
-            result.setValue(children);
+            /** cversion: the number of changes to the children of this znode **/
+            ZkVersion zkVersion = new ZkVersion(stat.getCversion());
+            result.setValue(new Versioned(children, zkVersion));
         } else {
             result.setException(KeeperException.create(KeeperException.Code.get(rc)));
         }
@@ -312,10 +370,13 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore,
Watch
             if (closed) {
                 return;
             }
-            Set<LogSegmentNamesListener> listenerSet = listeners.get(logSegmentsPath);
+            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet
=
+                    listeners.get(logSegmentsPath);
             if (null == listenerSet) {
-                Set<LogSegmentNamesListener> newListenerSet = new HashSet<LogSegmentNamesListener>();
-                Set<LogSegmentNamesListener> oldListenerSet = listeners.putIfAbsent(logSegmentsPath,
newListenerSet);
+                Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet
=
+                        new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>();
+                Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet
=
+                        listeners.putIfAbsent(logSegmentsPath, newListenerSet);
                 if (null != oldListenerSet) {
                     listenerSet = oldListenerSet;
                 } else {
@@ -323,7 +384,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore,
Watch
                 }
             }
             synchronized (listenerSet) {
-                listenerSet.add(listener);
+                listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener));
                 if (!listeners.containsKey(logSegmentsPath)) {
                     // listener set has been removed, add it back
                     listeners.put(logSegmentsPath, listenerSet);
@@ -343,7 +404,8 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore,
Watch
             if (closed) {
                 return;
             }
-            Set<LogSegmentNamesListener> listenerSet = listeners.get(logSegmentsPath);
+            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet
=
+                    listeners.get(logSegmentsPath);
             if (null == listenerSet) {
                 return;
             }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/72a786e7/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index e4c774b..f8fd3eb 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -367,7 +367,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase
{
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -429,7 +429,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase
{
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -496,7 +496,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase
{
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -510,16 +510,6 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase
{
         ZooKeeperClientUtils.expireSession(zkc,
                 DLUtils.getZKServersFromDLUri(uri), conf.getZKSessionTimeoutMilliseconds());
 
-        while (numNotifications.get() < 2) {
-            TimeUnit.MILLISECONDS.sleep(10);
-        }
-        assertEquals("Should receive second segment list update",
-                2, numNotifications.get());
-        List<String> secondSegmentList = segmentLists.get(1);
-        Collections.sort(secondSegmentList);
-        assertEquals("List of segments should be same",
-                children, secondSegmentList);
-
         logger.info("Create another {} segments.", numSegments);
 
         // create another log segment, it should trigger segment list updated
@@ -532,12 +522,12 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase
{
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
         Collections.sort(newChildren);
         logger.info("All log segments become {}", newChildren);
-        while (numNotifications.get() < 3) {
+        while (numNotifications.get() < 2) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
         assertEquals("Should receive third segment list update",
-                3, numNotifications.get());
-        List<String> thirdSegmentList = segmentLists.get(2);
+                2, numNotifications.get());
+        List<String> thirdSegmentList = segmentLists.get(1);
         Collections.sort(thirdSegmentList);
         assertEquals("List of segments should be updated",
                 2 * numSegments, thirdSegmentList.size());


Mime
View raw message