helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [4/5] helix git commit: Support configurable data prefetch in ZkClient during watch event callback.
Date Fri, 06 Apr 2018 00:04:07 GMT
Support configurable data prefetch in ZkClient during watch event callback.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1bf6d604
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1bf6d604
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1bf6d604

Branch: refs/heads/master
Commit: 1bf6d60405b81f4d978d84560ec5bdc531121384
Parents: 5839b39
Author: Lei Xia <lxia@linkedin.com>
Authored: Mon Mar 19 10:16:38 2018 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Thu Apr 5 16:41:20 2018 -0700

----------------------------------------------------------------------
 .../helix/manager/zk/CallbackHandler.java       |   8 +-
 .../helix/manager/zk/zookeeper/ZkClient.java    | 142 +++++++++++++++----
 .../helix/tools/ClusterStateVerifier.java       |   2 +
 .../org/apache/helix/tools/ClusterVerifier.java |   2 +
 .../ZkHelixClusterVerifier.java                 |   2 +
 5 files changed, 125 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1bf6d604/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index f86b9e0..f6ec59c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -73,6 +72,7 @@ import org.apache.zookeeper.Watcher.Event.EventType;
 
 import static org.apache.helix.HelixConstants.ChangeType.*;
 
+@PreFetch (enabled = false)
 public class CallbackHandler implements IZkChildListener, IZkDataListener {
   private static Logger logger = LoggerFactory.getLogger(CallbackHandler.class);
 
@@ -407,13 +407,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     NotificationContext.Type type = context.getType();
     if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
       logger.info(
-          _manager.getInstanceName() + " subscribes child-change. path: " + path + ", listener: "
-              + _listener);
+          _manager.getInstanceName() + " subscribes child-change. path: " + path + ", listener: " + _listener);
       _zkClient.subscribeChildChanges(path, this);
     } else if (type == NotificationContext.Type.FINALIZE) {
       logger.info(
-          _manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener: "
-              + _listener);
+          _manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener: " + _listener);
 
       _zkClient.unsubscribeChildChanges(path, this);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/1bf6d604/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 2ae7f63..d6a65ee 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -10,6 +10,7 @@
  */
 package org.apache.helix.manager.zk.zookeeper;
 
+import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
@@ -38,6 +39,7 @@ import org.I0Itec.zkclient.exception.ZkTimeoutException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.manager.zk.BasicZkSerializer;
 import org.apache.helix.manager.zk.PathBasedZkSerializer;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks;
@@ -70,7 +72,7 @@ public class ZkClient implements Watcher {
   protected final long operationRetryTimeoutInMillis;
   private final Map<String, Set<IZkChildListener>> _childListener =
       new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, Set<IZkDataListener>> _dataListener =
+  private final ConcurrentHashMap<String, Set<IZkDataListenerEntry>> _dataListener =
       new ConcurrentHashMap<>();
   private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<>();
   private KeeperState _currentState;
@@ -84,6 +86,49 @@ public class ZkClient implements Watcher {
   private ZkClientMonitor _monitor;
 
 
+  private class IZkDataListenerEntry {
+    final IZkDataListener _dataListener;
+    final boolean _prefetchData;
+
+    public IZkDataListenerEntry(IZkDataListener dataListener, boolean prefetchData) {
+      _dataListener = dataListener;
+      _prefetchData = prefetchData;
+    }
+
+    public IZkDataListenerEntry(IZkDataListener dataListener) {
+      _dataListener = dataListener;
+      _prefetchData = false;
+    }
+
+    public IZkDataListener getDataListener() {
+      return _dataListener;
+    }
+
+    public boolean isPrefetchData() {
+      return _prefetchData;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof IZkDataListenerEntry)) {
+        return false;
+      }
+
+      IZkDataListenerEntry that = (IZkDataListenerEntry) o;
+
+      return _dataListener.equals(that._dataListener);
+    }
+
+    @Override
+    public int hashCode() {
+      return _dataListener.hashCode();
+    }
+  }
+
+
   protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
       String monitorInstanceName, boolean monitorRootPathOnly) {
@@ -132,14 +177,22 @@ public class ZkClient implements Watcher {
   }
 
   public void subscribeDataChanges(String path, IZkDataListener listener) {
-    Set<IZkDataListener> listeners;
+    Set<IZkDataListenerEntry> listenerEntries;
     synchronized (_dataListener) {
-      listeners = _dataListener.get(path);
-      if (listeners == null) {
-        listeners = new CopyOnWriteArraySet<>();
-        _dataListener.put(path, listeners);
+      listenerEntries = _dataListener.get(path);
+      if (listenerEntries == null) {
+        listenerEntries = new CopyOnWriteArraySet<>();
+        _dataListener.put(path, listenerEntries);
+      }
+
+      boolean prefetchEnabled = isPrefetchEnabled(listener);
+      IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(listener, prefetchEnabled);
+      listenerEntries.add(listenerEntry);
+      if (prefetchEnabled) {
+        LOG.debug(
+            "Subscribed data changes for " + path + ", listener: " + listener + ", prefetch data: "
+                + prefetchEnabled);
       }
-      listeners.add(listener);
     }
     watchForData(path);
     if (LOG.isDebugEnabled()) {
@@ -147,11 +200,35 @@ public class ZkClient implements Watcher {
     }
   }
 
+  private boolean isPrefetchEnabled(IZkDataListener dataListener) {
+    PreFetch preFetch = dataListener.getClass().getAnnotation(PreFetch.class);
+    if (preFetch != null) {
+      return preFetch.enabled();
+    }
+
+    Method callbackMethod = IZkDataListener.class.getMethods()[0];
+    try {
+      Method method = dataListener.getClass()
+          .getMethod(callbackMethod.getName(), callbackMethod.getParameterTypes());
+      PreFetch preFetchInMethod = method.getAnnotation(PreFetch.class);
+      if (preFetchInMethod != null) {
+        return preFetchInMethod.enabled();
+      }
+    } catch (NoSuchMethodException e) {
+      LOG.warn(
+          "No method " + callbackMethod.getName() + " defined in listener " + dataListener.getClass()
+              .getCanonicalName());
+    }
+
+    return true;
+  }
+
   public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
     synchronized (_dataListener) {
-      final Set<IZkDataListener> listeners = _dataListener.get(path);
+      final Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
       if (listeners != null) {
-        listeners.remove(dataListener);
+        IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(dataListener);
+        listeners.remove(listenerEntry);
       }
       if (listeners == null || listeners.isEmpty()) {
         _dataListener.remove(path);
@@ -539,6 +616,12 @@ public class ZkClient implements Watcher {
         || event.getType() == Event.EventType.NodeCreated
         || event.getType() == Event.EventType.NodeChildrenChanged;
 
+
+    if (event.getType() == Event.EventType.NodeDeleted) {
+      String path = event.getPath();
+      LOG.debug(path);
+    }
+
     getEventLock().lock();
     try {
 
@@ -590,7 +673,7 @@ public class ZkClient implements Watcher {
     for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) {
       fireChildChangedEvents(entry.getKey(), entry.getValue());
     }
-    for (Entry<String, Set<IZkDataListener>> entry : _dataListener.entrySet()) {
+    for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) {
       fireDataChangedEvents(entry.getKey(), entry.getValue());
     }
   }
@@ -741,7 +824,7 @@ public class ZkClient implements Watcher {
   }
 
   private boolean hasListeners(String path) {
-    Set<IZkDataListener> dataListeners = _dataListener.get(path);
+    Set<IZkDataListenerEntry> dataListeners = _dataListener.get(path);
     if (dataListeners != null && dataListeners.size() > 0) {
       return true;
     }
@@ -810,25 +893,35 @@ public class ZkClient implements Watcher {
 
     if (event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted
         || event.getType() == EventType.NodeCreated) {
-      Set<IZkDataListener> listeners = _dataListener.get(path);
+      Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
       if (listeners != null && !listeners.isEmpty()) {
         fireDataChangedEvents(event.getPath(), listeners);
       }
     }
   }
 
-  private void fireDataChangedEvents(final String path, Set<IZkDataListener> listeners) {
-    for (final IZkDataListener listener : listeners) {
-      _eventThread.send(new ZkEvent("Data of " + path + " changed sent to " + listener) {
+  private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners) {
+    for (final IZkDataListenerEntry listener : listeners) {
+      _eventThread.send(new ZkEvent(
+          "Data of " + path + " changed sent to " + listener.getDataListener() + " prefetch data: "
+              + listener.isPrefetchData()) {
 
         @Override public void run() throws Exception {
           // reinstall watch
-          exists(path, true);
-          try {
-            Object data = readData(path, null, true);
-            listener.handleDataChange(path, data);
-          } catch (ZkNoNodeException e) {
-            listener.handleDataDeleted(path);
+          boolean exist = exists(path, true);
+          if (exist) {
+            try {
+              Object data = null;
+              if (listener.isPrefetchData()) {
+                LOG.debug("Prefetch data for path: " + path);
+                data = readData(path, null, true);
+              }
+              listener.getDataListener().handleDataChange(path, data);
+            } catch (ZkNoNodeException e) {
+              listener.getDataListener().handleDataDeleted(path);
+            }
+          } else {
+            listener.getDataListener().handleDataDeleted(path);
           }
         }
       });
@@ -883,10 +976,6 @@ public class ZkClient implements Watcher {
     }
   }
 
-  protected Set<IZkDataListener> getDataListener(String path) {
-    return _dataListener.get(path);
-  }
-
   public IZkConnection getConnection() {
     return _connection;
   }
@@ -1045,6 +1134,7 @@ public class ZkClient implements Watcher {
       record(path, null, startT, ZkClientMonitor.AccessType.WRITE);
     } catch (Exception e) {
       recordFailure(path, ZkClientMonitor.AccessType.WRITE);
+      LOG.warn("Failed to delete path " + path + "! " + e);
       throw e;
     } finally {
       long endT = System.currentTimeMillis();
@@ -1459,7 +1549,7 @@ public class ZkClient implements Watcher {
     for (Set<IZkChildListener> childListeners : _childListener.values()) {
       listeners += childListeners.size();
     }
-    for (Set<IZkDataListener> dataListeners : _dataListener.values()) {
+    for (Set<IZkDataListenerEntry> dataListeners : _dataListener.values()) {
       listeners += dataListeners.size();
     }
     listeners += _stateListener.size();

http://git-wip-us.apache.org/repos/asf/helix/blob/1bf6d604/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 3052fe5..89a7f28 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -45,6 +45,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.stages.AttributeName;
@@ -107,6 +108,7 @@ public class ClusterStateVerifier {
     }
 
     @Override
+    @PreFetch(enabled = false)
     public void handleDataChange(String dataPath, Object data) throws Exception {
       boolean result = _verifier.verify();
       if (result == true) {

http://git-wip-us.apache.org/repos/asf/helix/blob/1bf6d604/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
index dbf51c3..5697bcf 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
@@ -24,6 +24,7 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
@@ -101,6 +102,7 @@ public abstract class ClusterVerifier implements IZkChildListener, IZkDataListen
   }
 
   @Override
+  @PreFetch(enabled = false)
   public void handleDataChange(String dataPath, Object data) throws Exception {
     boolean success = verify();
     if (success) {

http://git-wip-us.apache.org/repos/asf/helix/blob/1bf6d604/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index b4f9ab4..123ab72 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -27,6 +27,7 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
@@ -258,6 +259,7 @@ public abstract class ZkHelixClusterVerifier
   }
 
   @Override
+  @PreFetch (enabled = false)
   public void handleDataChange(String dataPath, Object data) throws Exception {
     if (!_verifyTaskThreadPool.isShutdown()) {
       _verifyTaskThreadPool.submit(new VerifyStateCallbackTask());


Mime
View raw message