helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: HELIX-89: User added listeners are not re-init after zk session expiry
Date Thu, 25 Apr 2013 20:09:40 GMT
Updated Branches:
  refs/heads/master ddd47fed0 -> d6ab245d8


HELIX-89: User added listeners are not re-init after zk session expiry


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/d6ab245d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/d6ab245d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/d6ab245d

Branch: refs/heads/master
Commit: d6ab245d824f9c47206426de7aba15ceb1ed9590
Parents: ddd47fe
Author: zzhang <zzhang5@uci.edu>
Authored: Thu Apr 25 13:09:34 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Thu Apr 25 13:09:34 2013 -0700

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java    |   34 +++++++++-----
 .../test/java/org/apache/helix/ZkUnitTestBase.java |    2 +-
 .../helix/integration/ZkIntegrationTestBase.java   |    2 +-
 3 files changed, 24 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d6ab245d/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 78204a8..d0d7c32 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -98,7 +98,7 @@ public class ZKHelixManager implements HelixManager
   private ZKHelixDataAccessor                  _helixAccessor;
   private ConfigAccessor                       _configAccessor;
   protected ZkClient                           _zkClient;
-  protected List<CallbackHandler>         	   _handlers;
+  protected final List<CallbackHandler>        _handlers = new ArrayList<CallbackHandler>();
   private final ZkStateChangeListener          _zkStateChangeListener;
   private final InstanceType                   _instanceType;
   volatile String                              _sessionId;
@@ -250,13 +250,23 @@ public class ZKHelixManager implements HelixManager
     checkConnected();
 
     PropertyType type = propertyKey.getType();
-    CallbackHandler handler =
+    CallbackHandler newHandler =
         createCallBackHandler(propertyKey, listener, eventType, changeType);
 
     synchronized (this)
     {
-      _handlers.add(handler);
-      logger.info("Add listener: " + listener + " for type: " + type + " to path: " + handler.getPath());
+      for (CallbackHandler handler : _handlers)
+      {
+        // compare property-key path and listener reference
+        if (handler.getPath().equals(propertyKey.getPath()) && handler.getListener().equals(listener))
+        {
+          // TODO add log
+          return;
+        }
+      }
+
+      _handlers.add(newHandler);
+      logger.info("Add listener: " + listener + " for type: " + type + " to path: " + newHandler.getPath());
     }
   }
   
@@ -677,7 +687,6 @@ public class ZKHelixManager implements HelixManager
     // reset all handlers so they have a chance to unsubscribe zk changes from zkclient
     // abandon all callback-handlers added in expired session
     resetHandlers();
-    _handlers = new ArrayList<CallbackHandler>();
 
     logger.info("Handling new session, session id:" + _sessionId + ", instance:"
         + _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
@@ -754,16 +763,17 @@ public class ZKHelixManager implements HelixManager
                        .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
                                                       defaultParticipantErrorMessageHandlerFactory);
 
-      // create a new leader-election handler for a new session
       if (_leaderElectionHandler != null) {
     	  _leaderElectionHandler.reset();
+    	  _leaderElectionHandler.init();
+      } else {
+        _leaderElectionHandler =
+              createCallBackHandler(new Builder(_clusterName).controller(),
+                                    new DistClusterControllerElection(_zkConnectString),
+                                    new EventType[] { EventType.NodeChildrenChanged,
+                                        EventType.NodeDeleted, EventType.NodeCreated },
+                                    ChangeType.CONTROLLER);
       }
-      _leaderElectionHandler =
-            createCallBackHandler(new Builder(_clusterName).controller(),
-                                  new DistClusterControllerElection(_zkConnectString),
-                                  new EventType[] { EventType.NodeChildrenChanged,
-                                      EventType.NodeDeleted, EventType.NodeCreated },
-                                  ChangeType.CONTROLLER);
     }
 
     if (_instanceType == InstanceType.PARTICIPANT

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d6ab245d/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index 66fe89d..4703c19 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -86,9 +86,9 @@ public class ZkUnitTestBase
   @AfterSuite(alwaysRun = true)
   public void afterTest()
   {
+    _gZkClient.close();
     TestHelper.stopZkServer(_zkServer);
     _zkServer = null;
-    _gZkClient.close();
 
     // System.out.println("Number of open zkClient after ZkUnitTests: "
     // + ZkClient.getNumberOfConnections());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d6ab245d/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index 9605b76..70d4770 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -79,8 +79,8 @@ public class ZkIntegrationTestBase
   public void afterSuite()
   {
     ZKClientPool.reset();
-    TestHelper.stopZkServer(_zkServer);
     _gZkClient.close();
+    TestHelper.stopZkServer(_zkServer);
   }
 
   protected String getShortClassName()


Mime
View raw message