helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [08/51] [partial] [HELIX-198] Unify helix code style, rb=13710
Date Wed, 21 Aug 2013 20:43:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
index 61af263..4b92670 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
@@ -32,15 +32,12 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.testng.Assert;
 
-
-public class TestStatusUpdate extends ZkStandAloneCMTestBase
-{
+public class TestStatusUpdate extends ZkStandAloneCMTestBase {
   // For now write participant StatusUpdates to log4j.
   // TODO: Need to investigate another data channel to report to controller and re-enable
   // this test
   // @Test
-  public void testParticipantStatusUpdates() throws Exception
-  {
+  public void testParticipantStatusUpdates() throws Exception {
     ZkClient zkClient = new ZkClient(ZkIntegrationTestBase.ZK_ADDR);
     zkClient.setZkSerializer(new ZNRecordSerializer());
     ZKHelixDataAccessor accessor =
@@ -50,54 +47,34 @@ public class TestStatusUpdate extends ZkStandAloneCMTestBase
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
     Assert.assertNotNull(extViews);
 
-    for (ExternalView extView : extViews)
-    {
+    for (ExternalView extView : extViews) {
       String resourceName = extView.getResourceName();
       Set<String> partitionSet = extView.getPartitionSet();
-      for (String partition : partitionSet)
-      {
+      for (String partition : partitionSet) {
         Map<String, String> stateMap = extView.getStateMap(partition);
-        for (String instance : stateMap.keySet())
-        {
+        for (String instance : stateMap.keySet()) {
           String state = stateMap.get(instance);
           StatusUpdateUtil.StatusUpdateContents statusUpdates =
-              StatusUpdateUtil.StatusUpdateContents.getStatusUpdateContents(accessor,
-                                                                            instance,
-                                                                            resourceName,
-                                                                            partition);
+              StatusUpdateUtil.StatusUpdateContents.getStatusUpdateContents(accessor, instance,
+                  resourceName, partition);
 
-          Map<String, StatusUpdateUtil.TaskStatus> taskMessages =
-              statusUpdates.getTaskMessages();
+          Map<String, StatusUpdateUtil.TaskStatus> taskMessages = statusUpdates.getTaskMessages();
           List<StatusUpdateUtil.Transition> transitions = statusUpdates.getTransitions();
-          if (state.equals("MASTER"))
-          {
-            Assert.assertEquals(transitions.size() >= 2,
-                                true,
-                                "Invalid number of transitions");
-            StatusUpdateUtil.Transition lastTransition =
-                transitions.get(transitions.size() - 1);
-            StatusUpdateUtil.Transition prevTransition =
-                transitions.get(transitions.size() - 2);
+          if (state.equals("MASTER")) {
+            Assert.assertEquals(transitions.size() >= 2, true, "Invalid number of transitions");
+            StatusUpdateUtil.Transition lastTransition = transitions.get(transitions.size() - 1);
+            StatusUpdateUtil.Transition prevTransition = transitions.get(transitions.size() - 2);
             Assert.assertEquals(taskMessages.get(lastTransition.getMsgID()),
-                                StatusUpdateUtil.TaskStatus.COMPLETED,
-                                "Incomplete transition");
+                StatusUpdateUtil.TaskStatus.COMPLETED, "Incomplete transition");
             Assert.assertEquals(taskMessages.get(prevTransition.getMsgID()),
-                                StatusUpdateUtil.TaskStatus.COMPLETED,
-                                "Incomplete transition");
+                StatusUpdateUtil.TaskStatus.COMPLETED, "Incomplete transition");
             Assert.assertEquals(lastTransition.getFromState(), "SLAVE", "Invalid State");
             Assert.assertEquals(lastTransition.getToState(), "MASTER", "Invalid State");
-          }
-          else if (state.equals("SLAVE"))
-          {
-            Assert.assertEquals(transitions.size() >= 1,
-                                true,
-                                "Invalid number of transitions");
-            StatusUpdateUtil.Transition lastTransition =
-                transitions.get(transitions.size() - 1);
+          } else if (state.equals("SLAVE")) {
+            Assert.assertEquals(transitions.size() >= 1, true, "Invalid number of transitions");
+            StatusUpdateUtil.Transition lastTransition = transitions.get(transitions.size() - 1);
             Assert.assertEquals(lastTransition.getFromState().equals("MASTER")
-                                    || lastTransition.getFromState().equals("OFFLINE"),
-                                true,
-                                "Invalid transition");
+                || lastTransition.getFromState().equals("OFFLINE"), true, "Invalid transition");
             Assert.assertEquals(lastTransition.getToState(), "SLAVE", "Invalid State");
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
index 19073d8..a1f63aa 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
@@ -30,115 +30,93 @@ import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-
-public class TestSwapInstance extends ZkStandAloneCMTestBase
-{
+public class TestSwapInstance extends ZkStandAloneCMTestBase {
   @Test
-  public void TestSwap() throws Exception
-  {
+  public void TestSwap() throws Exception {
     String controllerName = CONTROLLER_PREFIX + "_0";
     HelixManager manager = _startCMResultMap.get(controllerName)._manager;
     HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
     _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL);
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica);
-    
-    
+
     ZNRecord idealStateOld1 = new ZNRecord("TestDB");
     ZNRecord idealStateOld2 = new ZNRecord("MyDB");
-    
+
     IdealState is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
     idealStateOld1.merge(is1.getRecord());
-    
 
     IdealState is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
     idealStateOld2.merge(is2.getRecord());
-    
+
     String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
     ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
     _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
 
-    boolean result = ClusterStateVerifier.verifyByPolling(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    boolean result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
-    
+
     String instanceName2 = PARTICIPANT_PREFIX + "_" + (START_PORT + 444);
     _setupTool.addInstanceToCluster(CLUSTER_NAME, instanceName2);
-    
+
     boolean exception = false;
-    try
-    {
+    try {
       _setupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
-    }
-    catch(Exception e)
-    {
+    } catch (Exception e) {
       exception = true;
     }
     Assert.assertTrue(exception);
-    
+
     _startCMResultMap.get(instanceName)._manager.disconnect();
     _startCMResultMap.get(instanceName)._thread.interrupt();
     Thread.sleep(1000);
-    
+
     exception = false;
-    try
-    {
+    try {
       _setupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
-    }
-    catch(Exception e)
-    {
+    } catch (Exception e) {
       e.printStackTrace();
       exception = true;
     }
     Assert.assertFalse(exception);
-    StartCMResult result2 =
-        TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2);
+    StartCMResult result2 = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2);
     _startCMResultMap.put(instanceName2, result2);
-    
-    result = ClusterStateVerifier.verifyByPolling(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
-    
+
     is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
-    
+
     is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
-    
-    for(String key : idealStateOld1.getMapFields().keySet())
-    {
-      for(String host : idealStateOld1.getMapField(key).keySet())
-      {
-        if(host.equals(instanceName))
-        {
-          Assert.assertTrue(
-          idealStateOld1.getMapField(key).get(instanceName).equals(
-          is1.getRecord().getMapField(key).get(instanceName2)));
-        }
-        else
-        {
-          Assert.assertTrue(
-              idealStateOld1.getMapField(key).get(host).equals(
-              is1.getRecord().getMapField(key).get(host)));
+
+    for (String key : idealStateOld1.getMapFields().keySet()) {
+      for (String host : idealStateOld1.getMapField(key).keySet()) {
+        if (host.equals(instanceName)) {
+          Assert.assertTrue(idealStateOld1.getMapField(key).get(instanceName)
+              .equals(is1.getRecord().getMapField(key).get(instanceName2)));
+        } else {
+          Assert.assertTrue(idealStateOld1.getMapField(key).get(host)
+              .equals(is1.getRecord().getMapField(key).get(host)));
         }
       }
     }
-    
-    for(String key : idealStateOld1.getListFields().keySet())
-    {
-      Assert.assertEquals(idealStateOld1.getListField(key).size() , is1.getRecord().getListField(key).size());
-      for(int i = 0; i < idealStateOld1.getListField(key).size(); i++)
-      {
+
+    for (String key : idealStateOld1.getListFields().keySet()) {
+      Assert.assertEquals(idealStateOld1.getListField(key).size(), is1.getRecord()
+          .getListField(key).size());
+      for (int i = 0; i < idealStateOld1.getListField(key).size(); i++) {
         String host = idealStateOld1.getListField(key).get(i);
         String newHost = is1.getRecord().getListField(key).get(i);
-        if(host.equals(instanceName))
-        {
-          Assert.assertTrue(
-              newHost.equals(instanceName2));
-        }
-        else
-        {
-          //System.out.println(key + " " + i+ " " + host + " "+newHost);
-          //System.out.println(idealStateOld1.getListField(key));
-          //System.out.println(is1.getRecord().getListField(key));
-          
+        if (host.equals(instanceName)) {
+          Assert.assertTrue(newHost.equals(instanceName2));
+        } else {
+          // System.out.println(key + " " + i+ " " + host + " "+newHost);
+          // System.out.println(idealStateOld1.getListField(key));
+          // System.out.println(is1.getRecord().getListField(key));
+
           Assert.assertTrue(host.equals(newHost));
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 03cf4f8..347fcb6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -37,444 +37,484 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
-	
-	@Test
-	public void testCbHandlerLeakOnParticipantSessionExpiry() throws Exception
-	{
-	    // Logger.getRootLogger().setLevel(Level.INFO);
-	    String className = TestHelper.getTestClassName();
-	    String methodName = TestHelper.getTestMethodName();
-	    String clusterName = className + "_" + methodName;
-	    final int n = 2;
-
-	    System.out.println("START " + clusterName + " at "
-	        + new Date(System.currentTimeMillis()));
-
-	    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-	                            "localhost", // participant name prefix
-	                            "TestDB", // resource name prefix
-	                            1, // resources
-	                            32, // partitions per resource
-	                            n, // number of nodes
-	                            2, // replicas
-	                            "MasterSlave",
-	                            true); // do rebalance
-	    
-	    
-	    ClusterController controller =
-	        new ClusterController(clusterName, "controller_0", ZK_ADDR);
-	    controller.syncStart();
-
-	    // start participants
-	    MockParticipant[] participants = new MockParticipant[n];
-	    for (int i = 0; i < n; i++)
-	    {
-	      String instanceName = "localhost_" + (12918 + i);
-
-	      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
-	      participants[i].syncStart();
-	    }
-
-	    boolean result =
-	        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-	                                                                                 clusterName));
-	    Assert.assertTrue(result);
-	    final ZkHelixTestManager controllerManager = controller.getManager();
-	    final ZkHelixTestManager participantManagerToExpire = (ZkHelixTestManager)participants[1].getManager();
-
-	    // check controller zk-watchers
-	    result = TestHelper.verify(new TestHelper.Verifier() {
-        
-        @Override
-        public boolean verify() throws Exception {
-          Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
-          Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
-          // System.out.println("controller watch paths: " + watchPaths);
-          
-          // controller should have 5 + 2n + m + (m+2)n zk-watchers
-          // where n is number of nodes and m is number of resources
-          return watchPaths.size() == (6 + 5 * n);
-        }
-      }, 500);
-	    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers.");
-	    
-	    // check participant zk-watchers
-      result = TestHelper.verify(new TestHelper.Verifier() {
-        
-        @Override
-        public boolean verify() throws Exception {
-          Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
-          Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
-          // System.out.println("participant watch paths: " + watchPaths);
-          
-          // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
-          return watchPaths.size() == 2;
-        }
-      }, 500);
-      Assert.assertTrue(result, "Participant should have 2 zk-watchers.");
-
-	    
-	    // check HelixManager#_handlers
-	    // printHandlers(controllerManager);
-	    // printHandlers(participantManagerToExpire);
-	    int controllerHandlerNb = controllerManager.getHandlers().size();
-	    int particHandlerNb = participantManagerToExpire.getHandlers().size();
-	    Assert.assertEquals(controllerHandlerNb, 9, "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
-	    Assert.assertEquals(particHandlerNb, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers");
-	    
-	    // expire the session of participant
-	    System.out.println("Expiring participant session...");
-	    String oldSessionId = participantManagerToExpire.getSessionId();
-	    
-	    ZkTestHelper.expireSession(participantManagerToExpire.getZkClient());
-	    String newSessionId = participantManagerToExpire.getSessionId();
-	    System.out.println("Expried participant session. oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
-
-	    result =
-	        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-	                                                                                                   clusterName));
-	    Assert.assertTrue(result);
-	    
-	    // check controller zk-watchers
-     result = TestHelper.verify(new TestHelper.Verifier() {
-        
-        @Override
-        public boolean verify() throws Exception {
-          Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
-          Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
-          // System.out.println("controller watch paths after session expiry: " + watchPaths);
-          
-          // controller should have 5 + 2n + m + (m+2)n zk-watchers
-          // where n is number of nodes and m is number of resources
-          return watchPaths.size() == (6 + 5 * n);
-        }
-      }, 500);
-      Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
-
-      // check participant zk-watchers
-      result = TestHelper.verify(new TestHelper.Verifier() {
-        
-        @Override
-        public boolean verify() throws Exception {
-          Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
-          Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
-          // System.out.println("participant watch paths after session expiry: " + watchPaths);
-          
-          // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
-          return watchPaths.size() == 2;
-        }
-      }, 500);
-      Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
-
-
-	    // check handlers
-	    // printHandlers(controllerManager);
-	    // printHandlers(participantManagerToExpire);
-	    int handlerNb = controllerManager.getHandlers().size();
-	    Assert.assertEquals(handlerNb, controllerHandlerNb, "controller callback handlers should not increase after participant session expiry");
-	    handlerNb = participantManagerToExpire.getHandlers().size();
-	    Assert.assertEquals(handlerNb, particHandlerNb, "participant callback handlers should not increase after participant session expiry");
-
-	    System.out.println("END " + clusterName + " at "
-	            + new Date(System.currentTimeMillis()));
-	}
-	
-	@Test
-	public void testCbHandlerLeakOnControllerSessionExpiry() throws Exception
-	{
-	    // Logger.getRootLogger().setLevel(Level.INFO);
-	    String className = TestHelper.getTestClassName();
-	    String methodName = TestHelper.getTestMethodName();
-	    String clusterName = className + "_" + methodName;
-	    final int n = 2;
-
-	    System.out.println("START " + clusterName + " at "
-	        + new Date(System.currentTimeMillis()));
-
-	    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-	                            "localhost", // participant name prefix
-	                            "TestDB", // resource name prefix
-	                            1, // resources
-	                            32, // partitions per resource
-	                            n, // number of nodes
-	                            2, // replicas
-	                            "MasterSlave",
-	                            true); // do rebalance
-	    
-	    
-	    ClusterController controller =
-	        new ClusterController(clusterName, "controller_0", ZK_ADDR);
-	    controller.syncStart();
-
-	    // start participants
-	    MockParticipant[] participants = new MockParticipant[n];
-	    for (int i = 0; i < n; i++)
-	    {
-	      String instanceName = "localhost_" + (12918 + i);
-
-	      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
-	      participants[i].syncStart();
-	    }
-
-	    boolean result =
-	        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-	                                                                                 clusterName));
-	    Assert.assertTrue(result);
-	    final ZkHelixTestManager controllerManager = controller.getManager();
-	    final ZkHelixTestManager participantManager = participants[0].getManager();
-
-	    // wait until we get all the listeners registered
-      result = TestHelper.verify(new TestHelper.Verifier() {
-        
-        @Override
-        public boolean verify() throws Exception {
-          int controllerHandlerNb = controllerManager.getHandlers().size();
-          int particHandlerNb = participantManager.getHandlers().size();
-          if (controllerHandlerNb == 9 && particHandlerNb == 2)
-            return true;
-          else
-            return false;
-        }
-      }, 1000);
-	    
-	    int controllerHandlerNb = controllerManager.getHandlers().size();
-	    int particHandlerNb = participantManager.getHandlers().size();
-    	Assert.assertEquals(controllerHandlerNb, 9, "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was " 
-    	    		+ controllerHandlerNb + ", "
-    	    		+ printHandlers(controllerManager));
-    	Assert.assertEquals(particHandlerNb, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was " 
-    	    		+ particHandlerNb + ", "
-    	    		+ printHandlers(participantManager));
-
-	    // expire controller
-	    System.out.println("Expiring controller session...");
-	    String oldSessionId = controllerManager.getSessionId();
-	    
-	    ZkTestHelper.expireSession(controllerManager.getZkClient());
-	    String newSessionId = controllerManager.getSessionId();
-	    System.out.println("Expired controller session. oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
-	    
-	    result =
-	        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-	                                                                                                   clusterName));
-	    Assert.assertTrue(result);
-	    
-	    // check controller zk-watchers
-     result = TestHelper.verify(new TestHelper.Verifier() {
-        
-        @Override
-        public boolean verify() throws Exception {
-          Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
-          Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
-          // System.out.println("controller watch paths after session expiry: " + watchPaths);
-          
-          // controller should have 5 + 2n + m + (m+2)n zk-watchers
-          // where n is number of nodes and m is number of resources
-          return watchPaths.size() == (6 + 5 * n);
-        }
-      }, 500);
-      Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
-
-      // check participant zk-watchers
-      result = TestHelper.verify(new TestHelper.Verifier() {
-        
-        @Override
-        public boolean verify() throws Exception {
-          Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
-          Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId());
-          // System.out.println("participant watch paths after session expiry: " + watchPaths);
-          
-          // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
-          return watchPaths.size() == 2;
-        }
-      }, 500);
-      Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
-	    
-	    // check HelixManager#_handlers
-	    // printHandlers(controllerManager);
-	    int handlerNb = controllerManager.getHandlers().size();
-	    Assert.assertEquals(handlerNb, controllerHandlerNb, "controller callback handlers should not increase after participant session expiry, but was "
-	    		+ printHandlers(controllerManager));
-	    handlerNb = participantManager.getHandlers().size();
-	    Assert.assertEquals(handlerNb, particHandlerNb, "participant callback handlers should not increase after participant session expiry, but was " 
-	    		+ printHandlers(participantManager));
-
-
-	    System.out.println("END " + clusterName + " at "
-	            + new Date(System.currentTimeMillis()));
-	}
-
-
-    @Test
-    public void testRemoveUserCbHandlerOnPathRemoval() throws  Exception {
-        String className = TestHelper.getTestClassName();
-        String methodName = TestHelper.getTestMethodName();
-        String clusterName = className + "_" + methodName;
-        final int n = 3;
-        final String zkAddr = ZK_ADDR;
-        System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-        TestHelper.setupCluster(clusterName, zkAddr, 12918,
-                "localhost",
-                "TestDB",
-                1,  // resource
-                32, // partitions
-                n,  // nodes
-                2,  // replicas
-                "MasterSlave",
-                true);
-
-        ClusterController controller = new ClusterController(clusterName, "controller_0", zkAddr);
-        controller.syncStart();
-
-        MockParticipant[] participants = new MockParticipant[n];
-        for (int i = 0; i < n; i++) {
-            String instanceName = "localhost_" + (12918 + i);
-            participants[i] = new MockParticipant(clusterName, instanceName, zkAddr, null);
-            participants[i].syncStart();
-
-            // register a controller listener on participant_0
-            if (i == 0) {
-                ZkHelixTestManager manager = participants[0].getManager();
-                manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
-                    @Override
-                    public void onStateChange(String instanceName, List<CurrentState> statesInfo, NotificationContext changeContext) {
-                        //To change body of implemented methods use File | Settings | File Templates.
-                        // System.out.println(instanceName + " on current-state change, type: " + changeContext.getType());
-                    }
-                }, manager.getInstanceName(), manager.getSessionId());
-            }
-        }
-
-        Boolean result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
-                clusterName));
-        Assert.assertTrue(result);
-
-        ZkHelixTestManager participantToExpire = participants[0].getManager();
-        String oldSessionId = participantToExpire.getSessionId();
-        PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
-
-
-        // check manager#hanlders
-        Assert.assertEquals(participantToExpire.getHandlers().size(), 3, "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
-
-        // check zkclient#listeners
-        Map<String, Set<IZkDataListener>> dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
-        Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
-        // printZkListeners(participantToExpire.getZkClient());
-        Assert.assertEquals(dataListeners.size(), 1, "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners");
-        String path = keyBuilder.currentState(participantToExpire.getInstanceName(), oldSessionId, "TestDB0").getPath();
-        Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: " + path);
-        Assert.assertEquals(childListeners.size(), 3, "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
-        path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
-        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
-        path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
-        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
-        path = keyBuilder.controller().getPath();
-        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
-
-        // check zookeeper#watches on client side
-        Map<String, List<String>> watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
-        // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
-        Assert.assertEquals(watchPaths.get("dataWatches").size(), 4, "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
-        Assert.assertEquals(watchPaths.get("childWatches").size(), 3, "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
-
-
-        // expire localhost_12918
-        System.out.println("Expire participant: " + participantToExpire.getInstanceName() + ", session: " + participantToExpire.getSessionId());
-        ZkTestHelper.expireSession(participantToExpire.getZkClient());
-        String newSessionId = participantToExpire.getSessionId();
-        System.out.println(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
-        result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
-                clusterName));
-        Assert.assertTrue(result);
 
-        // check manager#hanlders
-        Assert.assertEquals(participantToExpire.getHandlers().size(), 2, "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
+  @Test
+  public void testCbHandlerLeakOnParticipantSessionExpiry() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
 
-        // check zkclient#listeners
-        dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
-        childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
-        // printZkListeners(participantToExpire.getZkClient());
-        Assert.assertTrue(dataListeners.isEmpty(), "Should have no data-listeners");
-        Assert.assertEquals(childListeners.size(), 3, "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
-                + "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
-        path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
-        Assert.assertEquals(childListeners.get(path).size(), 0, "Should have no child-listener on path: " + path);
-        path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
-        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
-        path = keyBuilder.controller().getPath();
-        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
-
-        // check zookeeper#watches on client side
-        watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
-        // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
-        Assert.assertEquals(watchPaths.get("dataWatches").size(), 2, "Should have 2 data-watches: CONTROLLER and MESSAGES");
-        Assert.assertEquals(watchPaths.get("childWatches").size(), 2, "Should have 2 child-watches: CONTROLLER and MESSAGES");
-        Assert.assertEquals(watchPaths.get("existWatches").size(), 2, "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
-
-        // another session expiry on localhost_12918 should clear the two exist-watches on CURRENTSTATE/{oldSessionId}
-        System.out.println("Expire participant: " + participantToExpire.getInstanceName() + ", session: " + participantToExpire.getSessionId());
-        ZkTestHelper.expireSession(participantToExpire.getZkClient());
-        result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
                 clusterName));
-        Assert.assertTrue(result);
-
-        // check zookeeper#watches on client side
-        watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
-        // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
-        Assert.assertEquals(watchPaths.get("dataWatches").size(), 2, "Should have 2 data-watches: CONTROLLER and MESSAGES");
-        Assert.assertEquals(watchPaths.get("childWatches").size(), 2, "Should have 2 child-watches: CONTROLLER and MESSAGES");
-        Assert.assertEquals(watchPaths.get("existWatches").size(), 0, "Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
-
-        // Thread.sleep(1000);
-        System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    Assert.assertTrue(result);
+    final ZkHelixTestManager controllerManager = controller.getManager();
+    final ZkHelixTestManager participantManagerToExpire =
+        (ZkHelixTestManager) participants[1].getManager();
+
+    // check controller zk-watchers
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
+        Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+        // System.out.println("controller watch paths: " + watchPaths);
+
+        // controller should have 5 + 2n + m + (m+2)n zk-watchers
+        // where n is number of nodes and m is number of resources
+        return watchPaths.size() == (6 + 5 * n);
+      }
+    }, 500);
+    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers.");
+
+    // check participant zk-watchers
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
+        Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
+        // System.out.println("participant watch paths: " + watchPaths);
+
+        // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
+        return watchPaths.size() == 2;
+      }
+    }, 500);
+    Assert.assertTrue(result, "Participant should have 2 zk-watchers.");
+
+    // check HelixManager#_handlers
+    // printHandlers(controllerManager);
+    // printHandlers(participantManagerToExpire);
+    int controllerHandlerNb = controllerManager.getHandlers().size();
+    int particHandlerNb = participantManagerToExpire.getHandlers().size();
+    Assert.assertEquals(controllerHandlerNb, 9,
+        "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
+    Assert.assertEquals(particHandlerNb, 2,
+        "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+
+    // expire the session of participant
+    System.out.println("Expiring participant session...");
+    String oldSessionId = participantManagerToExpire.getSessionId();
+
+    ZkTestHelper.expireSession(participantManagerToExpire.getZkClient());
+    String newSessionId = participantManagerToExpire.getSessionId();
+    System.out.println("Expried participant session. oldSessionId: " + oldSessionId
+        + ", newSessionId: " + newSessionId);
+
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // check controller zk-watchers
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
+        Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+        // System.out.println("controller watch paths after session expiry: " + watchPaths);
+
+        // controller should have 5 + 2n + m + (m+2)n zk-watchers
+        // where n is number of nodes and m is number of resources
+        return watchPaths.size() == (6 + 5 * n);
+      }
+    }, 500);
+    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
+
+    // check participant zk-watchers
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
+        Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
+        // System.out.println("participant watch paths after session expiry: " + watchPaths);
+
+        // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
+        return watchPaths.size() == 2;
+      }
+    }, 500);
+    Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+
+    // check handlers
+    // printHandlers(controllerManager);
+    // printHandlers(participantManagerToExpire);
+    int handlerNb = controllerManager.getHandlers().size();
+    Assert.assertEquals(handlerNb, controllerHandlerNb,
+        "controller callback handlers should not increase after participant session expiry");
+    handlerNb = participantManagerToExpire.getHandlers().size();
+    Assert.assertEquals(handlerNb, particHandlerNb,
+        "participant callback handlers should not increase after participant session expiry");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCbHandlerLeakOnControllerSessionExpiry() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
     }
 
-	// debug code
-	static String printHandlers(ZkHelixTestManager manager) 
-	{
-		StringBuilder sb = new StringBuilder();
-	    List<CallbackHandler> handlers = manager.getHandlers();
-    	sb.append(manager.getInstanceName() + " has " + handlers.size() + " cb-handlers. [");
-    	
-	    for (int i = 0; i < handlers.size(); i++) {
-	    	CallbackHandler handler = handlers.get(i);
-	    	String path = handler.getPath();
-	    	sb.append(path.substring(manager.getClusterName().length() + 1) + ": " + handler.getListener());
-	    	if (i < (handlers.size() - 1) ) {
-	    		sb.append(", ");
-	    	}
-	    }
-	    sb.append("]");
-	    
-	    return sb.toString();
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+    final ZkHelixTestManager controllerManager = controller.getManager();
+    final ZkHelixTestManager participantManager = participants[0].getManager();
+
+    // wait until we get all the listeners registered
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        int controllerHandlerNb = controllerManager.getHandlers().size();
+        int particHandlerNb = participantManager.getHandlers().size();
+        if (controllerHandlerNb == 9 && particHandlerNb == 2)
+          return true;
+        else
+          return false;
+      }
+    }, 1000);
+
+    int controllerHandlerNb = controllerManager.getHandlers().size();
+    int particHandlerNb = participantManager.getHandlers().size();
+    Assert.assertEquals(controllerHandlerNb, 9,
+        "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
+            + controllerHandlerNb + ", " + printHandlers(controllerManager));
+    Assert.assertEquals(particHandlerNb, 2,
+        "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was "
+            + particHandlerNb + ", " + printHandlers(participantManager));
+
+    // expire controller
+    System.out.println("Expiring controller session...");
+    String oldSessionId = controllerManager.getSessionId();
+
+    ZkTestHelper.expireSession(controllerManager.getZkClient());
+    String newSessionId = controllerManager.getSessionId();
+    System.out.println("Expired controller session. oldSessionId: " + oldSessionId
+        + ", newSessionId: " + newSessionId);
+
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // check controller zk-watchers
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
+        Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+        // System.out.println("controller watch paths after session expiry: " + watchPaths);
+
+        // controller should have 5 + 2n + m + (m+2)n zk-watchers
+        // where n is number of nodes and m is number of resources
+        return watchPaths.size() == (6 + 5 * n);
+      }
+    }, 500);
+    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry.");
+
+    // check participant zk-watchers
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
+        Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId());
+        // System.out.println("participant watch paths after session expiry: " + watchPaths);
+
+        // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
+        return watchPaths.size() == 2;
+      }
+    }, 500);
+    Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+
+    // check HelixManager#_handlers
+    // printHandlers(controllerManager);
+    int handlerNb = controllerManager.getHandlers().size();
+    Assert.assertEquals(handlerNb, controllerHandlerNb,
+        "controller callback handlers should not increase after participant session expiry, but was "
+            + printHandlers(controllerManager));
+    handlerNb = participantManager.getHandlers().size();
+    Assert.assertEquals(handlerNb, particHandlerNb,
+        "participant callback handlers should not increase after participant session expiry, but was "
+            + printHandlers(participantManager));
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testRemoveUserCbHandlerOnPathRemoval() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 3;
+    final String zkAddr = ZK_ADDR;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, zkAddr, 12918, "localhost", "TestDB", 1, // resource
+        32, // partitions
+        n, // nodes
+        2, // replicas
+        "MasterSlave", true);
+
+    ClusterController controller = new ClusterController(clusterName, "controller_0", zkAddr);
+    controller.syncStart();
+
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipant(clusterName, instanceName, zkAddr, null);
+      participants[i].syncStart();
+
+      // register a controller listener on participant_0
+      if (i == 0) {
+        ZkHelixTestManager manager = participants[0].getManager();
+        manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
+          @Override
+          public void onStateChange(String instanceName, List<CurrentState> statesInfo,
+              NotificationContext changeContext) {
+            // To change body of implemented methods use File | Settings | File Templates.
+            // System.out.println(instanceName + " on current-state change, type: " +
+            // changeContext.getType());
+          }
+        }, manager.getInstanceName(), manager.getSessionId());
+      }
     }
 
-    void printZkListeners(ZkClient client) throws  Exception{
-        Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
-        Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
-
-        System.out.println("dataListeners {");
-        for (String path : datalisteners.keySet()) {
-            System.out.println("\t" + path + ": ");
-            Set<IZkDataListener> set = datalisteners.get(path);
-            for (IZkDataListener listener : set) {
-                CallbackHandler handler = (CallbackHandler)listener;
-                System.out.println("\t\t" + handler.getListener());
-            }
-        }
-        System.out.println("}");
-
-        System.out.println("childListeners {");
-        for (String path : childListeners.keySet()) {
-            System.out.println("\t" + path + ": ");
-            Set<IZkChildListener> set = childListeners.get(path);
-            for (IZkChildListener listener : set) {
-                CallbackHandler handler = (CallbackHandler)listener;
-                System.out.println("\t\t" + handler.getListener());
-            }
-        }
-        System.out.println("}");
+    Boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    ZkHelixTestManager participantToExpire = participants[0].getManager();
+    String oldSessionId = participantToExpire.getSessionId();
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+    // check manager#hanlders
+    Assert.assertEquals(participantToExpire.getHandlers().size(), 3,
+        "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
+
+    // check zkclient#listeners
+    Map<String, Set<IZkDataListener>> dataListeners =
+        ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
+    Map<String, Set<IZkChildListener>> childListeners =
+        ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
+    // printZkListeners(participantToExpire.getZkClient());
+    Assert.assertEquals(dataListeners.size(), 1,
+        "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners");
+    String path =
+        keyBuilder.currentState(participantToExpire.getInstanceName(), oldSessionId, "TestDB0")
+            .getPath();
+    Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: "
+        + path);
+    Assert
+        .assertEquals(
+            childListeners.size(),
+            3,
+            "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
+    path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
+    Assert.assertEquals(childListeners.get(path).size(), 1,
+        "Should have 1 child-listener on path: " + path);
+    path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
+    Assert.assertEquals(childListeners.get(path).size(), 1,
+        "Should have 1 child-listener on path: " + path);
+    path = keyBuilder.controller().getPath();
+    Assert.assertEquals(childListeners.get(path).size(), 1,
+        "Should have 1 child-listener on path: " + path);
+
+    // check zookeeper#watches on client side
+    Map<String, List<String>> watchPaths =
+        ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+    // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
+    Assert
+        .assertEquals(
+            watchPaths.get("dataWatches").size(),
+            4,
+            "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
+    Assert.assertEquals(watchPaths.get("childWatches").size(), 3,
+        "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
+
+    // expire localhost_12918
+    System.out.println("Expire participant: " + participantToExpire.getInstanceName()
+        + ", session: " + participantToExpire.getSessionId());
+    ZkTestHelper.expireSession(participantToExpire.getZkClient());
+    String newSessionId = participantToExpire.getSessionId();
+    System.out.println(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId
+        + ", newSessionId: " + newSessionId);
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // check manager#hanlders
+    Assert
+        .assertEquals(
+            participantToExpire.getHandlers().size(),
+            2,
+            "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
+
+    // check zkclient#listeners
+    dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
+    childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
+    // printZkListeners(participantToExpire.getZkClient());
+    Assert.assertTrue(dataListeners.isEmpty(), "Should have no data-listeners");
+    Assert
+        .assertEquals(
+            childListeners.size(),
+            3,
+            "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
+                + "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
+    path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
+    Assert.assertEquals(childListeners.get(path).size(), 0,
+        "Should have no child-listener on path: " + path);
+    path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
+    Assert.assertEquals(childListeners.get(path).size(), 1,
+        "Should have 1 child-listener on path: " + path);
+    path = keyBuilder.controller().getPath();
+    Assert.assertEquals(childListeners.get(path).size(), 1,
+        "Should have 1 child-listener on path: " + path);
+
+    // check zookeeper#watches on client side
+    watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+    // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
+    Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
+        "Should have 2 data-watches: CONTROLLER and MESSAGES");
+    Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
+        "Should have 2 child-watches: CONTROLLER and MESSAGES");
+    Assert
+        .assertEquals(watchPaths.get("existWatches").size(), 2,
+            "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
+
+    // another session expiry on localhost_12918 should clear the two exist-watches on
+    // CURRENTSTATE/{oldSessionId}
+    System.out.println("Expire participant: " + participantToExpire.getInstanceName()
+        + ", session: " + participantToExpire.getSessionId());
+    ZkTestHelper.expireSession(participantToExpire.getZkClient());
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // check zookeeper#watches on client side
+    watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+    // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
+    Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
+        "Should have 2 data-watches: CONTROLLER and MESSAGES");
+    Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
+        "Should have 2 child-watches: CONTROLLER and MESSAGES");
+    Assert
+        .assertEquals(
+            watchPaths.get("existWatches").size(),
+            0,
+            "Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
+
+    // Thread.sleep(1000);
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  // debug code
+  static String printHandlers(ZkHelixTestManager manager) {
+    StringBuilder sb = new StringBuilder();
+    List<CallbackHandler> handlers = manager.getHandlers();
+    sb.append(manager.getInstanceName() + " has " + handlers.size() + " cb-handlers. [");
+
+    for (int i = 0; i < handlers.size(); i++) {
+      CallbackHandler handler = handlers.get(i);
+      String path = handler.getPath();
+      sb.append(path.substring(manager.getClusterName().length() + 1) + ": "
+          + handler.getListener());
+      if (i < (handlers.size() - 1)) {
+        sb.append(", ");
+      }
+    }
+    sb.append("]");
+
+    return sb.toString();
+  }
+
+  void printZkListeners(ZkClient client) throws Exception {
+    Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
+    Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
+
+    System.out.println("dataListeners {");
+    for (String path : datalisteners.keySet()) {
+      System.out.println("\t" + path + ": ");
+      Set<IZkDataListener> set = datalisteners.get(path);
+      for (IZkDataListener listener : set) {
+        CallbackHandler handler = (CallbackHandler) listener;
+        System.out.println("\t\t" + handler.getListener());
+      }
+    }
+    System.out.println("}");
+
+    System.out.println("childListeners {");
+    for (String path : childListeners.keySet()) {
+      System.out.println("\t" + path + ": ");
+      Set<IZkChildListener> set = childListeners.get(path);
+      for (IZkChildListener listener : set) {
+        CallbackHandler handler = (CallbackHandler) listener;
+        System.out.println("\t\t" + handler.getListener());
+      }
     }
+    System.out.println("}");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
index 1edfa30..cc6c0b5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
@@ -52,7 +52,7 @@ public class TestZkReconnect {
     final String zkAddr = String.format("localhost:%d", zkPort);
     ZkServer zkServer = TestHelper.startZkServer(zkAddr);
     zkServerRef.set(zkServer);
-    
+
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
@@ -64,7 +64,8 @@ public class TestZkReconnect {
 
     // Registers and starts controller
     LOG.info("Starts controller");
-    HelixManager controller = HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr);
+    HelixManager controller =
+        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr);
     controller.connect();
 
     // Registers and starts participant
@@ -72,25 +73,25 @@ public class TestZkReconnect {
     String hostname = "localhost";
     String instanceId = String.format("%s_%d", hostname, 1);
     clusterSetup.addInstanceToCluster(clusterName, instanceId);
-    HelixManager participant = HelixManagerFactory.getZKHelixManager(clusterName, 
-        instanceId, InstanceType.PARTICIPANT, zkAddr);
+    HelixManager participant =
+        HelixManagerFactory.getZKHelixManager(clusterName, instanceId, InstanceType.PARTICIPANT,
+            zkAddr);
     participant.connect();
 
     LOG.info("Register state machine");
     final CountDownLatch latch = new CountDownLatch(1);
     participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
-            new StateModelFactory<StateModel>() {
-                @Override
-                public StateModel createNewStateModel(String stateUnitKey) {
-                    return new SimpleStateModel(latch);
-                }
-            }, "test"
-    );
+        new StateModelFactory<StateModel>() {
+          @Override
+          public StateModel createNewStateModel(String stateUnitKey) {
+            return new SimpleStateModel(latch);
+          }
+        }, "test");
 
     String resourceName = "test-resource";
     LOG.info("Ideal state assignment");
     HelixAdmin helixAdmin = participant.getClusterManagmentTool();
-    helixAdmin.addResource(clusterName, resourceName, 1, "OnlineOffline", 
+    helixAdmin.addResource(clusterName, resourceName, 1, "OnlineOffline",
         IdealState.RebalanceMode.CUSTOMIZED.toString());
 
     IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resourceName);
@@ -102,37 +103,37 @@ public class TestZkReconnect {
     TestHelper.stopZkServer(zkServerRef.get());
     Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
 
-        @Override
-        public void run() {
-            try {
-                LOG.info("Restart ZK server");
-                // zkServer.set(TestUtils.startZookeeper(zkDir, zkPort));
-                zkServerRef.set(TestHelper.startZkServer(zkAddr, null, false));
-            } catch (Exception e) {
-                LOG.error(e.getMessage(), e);
-            }
+      @Override
+      public void run() {
+        try {
+          LOG.info("Restart ZK server");
+          // zkServer.set(TestUtils.startZookeeper(zkDir, zkPort));
+          zkServerRef.set(TestHelper.startZkServer(zkAddr, null, false));
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
         }
+      }
     }, 2L, TimeUnit.SECONDS);
 
     // future.get();
-    
+
     LOG.info("Before update ideal state");
     helixAdmin.setResourceIdealState(clusterName, resourceName, idealState);
     LOG.info("After update ideal state");
 
     LOG.info("Wait for OFFLINE->ONLINE state transition");
     try {
-        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
-        
-        // wait until stable state
-        boolean result =
-            ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkAddr,
-                                                                                     clusterName));
-        Assert.assertTrue(result);
+      Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+      // wait until stable state
+      boolean result =
+          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkAddr,
+              clusterName));
+      Assert.assertTrue(result);
 
     } finally {
-        participant.disconnect();
-        zkServerRef.get().shutdown();
+      participant.disconnect();
+      zkServerRef.get().shutdown();
     }
   }
 
@@ -141,11 +142,11 @@ public class TestZkReconnect {
     private final CountDownLatch latch;
 
     public SimpleStateModel(CountDownLatch latch) {
-        this.latch = latch;
+      this.latch = latch;
     }
 
     public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
-        // LOG.info(HelixUtils.toString(message));
+      // LOG.info(HelixUtils.toString(message));
       LOG.info("message: " + message);
       latch.countDown();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index 21790b1..2ab0aaf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -42,30 +42,26 @@ import org.testng.AssertJUnit;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 
+public class ZkIntegrationTestBase {
+  private static Logger LOG = Logger.getLogger(ZkIntegrationTestBase.class);
 
-public class ZkIntegrationTestBase
-{
-  private static Logger         LOG                       =
-                                                              Logger.getLogger(ZkIntegrationTestBase.class);
-
-  protected static ZkServer     _zkServer;
-  protected static ZkClient     _gZkClient;
+  protected static ZkServer _zkServer;
+  protected static ZkClient _gZkClient;
   protected static ClusterSetup _gSetupTool;
 
-  public static final String    ZK_ADDR                   = "localhost:2183";
-  protected static final String CLUSTER_PREFIX            = "CLUSTER";
+  public static final String ZK_ADDR = "localhost:2183";
+  protected static final String CLUSTER_PREFIX = "CLUSTER";
   protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
 
-  protected final String        CONTROLLER_PREFIX         = "controller";
-  protected final String        PARTICIPANT_PREFIX        = "localhost";
+  protected final String CONTROLLER_PREFIX = "controller";
+  protected final String PARTICIPANT_PREFIX = "localhost";
 
   @BeforeSuite
-  public void beforeSuite() throws Exception
-  {
+  public void beforeSuite() throws Exception {
     // TODO: use logging.properties file to config java.util.logging.Logger levels
     java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
     topJavaLogger.setLevel(Level.WARNING);
-    
+
     _zkServer = TestHelper.startZkServer(ZK_ADDR);
     AssertJUnit.assertTrue(_zkServer != null);
     ZKClientPool.reset();
@@ -76,28 +72,24 @@ public class ZkIntegrationTestBase
   }
 
   @AfterSuite
-  public void afterSuite()
-  {
+  public void afterSuite() {
     ZKClientPool.reset();
     _gZkClient.close();
     TestHelper.stopZkServer(_zkServer);
   }
 
-  protected String getShortClassName()
-  {
+  protected String getShortClassName() {
     String className = this.getClass().getName();
     return className.substring(className.lastIndexOf('.') + 1);
   }
 
-  protected String getCurrentLeader(ZkClient zkClient, String clusterName)
-  {
+  protected String getCurrentLeader(ZkClient zkClient, String clusterName) {
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
-    if (leader == null)
-    {
+    if (leader == null) {
       return null;
     }
     return leader.getInstanceName();
@@ -105,16 +97,13 @@ public class ZkIntegrationTestBase
 
   /**
    * Stop current leader and returns the new leader
-   * 
    * @param zkClient
    * @param clusterName
    * @param startCMResultMap
    * @return
    */
-  protected String stopCurrentLeader(ZkClient zkClient,
-                                     String clusterName,
-                                     Map<String, StartCMResult> startCMResultMap)
-  {
+  protected String stopCurrentLeader(ZkClient zkClient, String clusterName,
+      Map<String, StartCMResult> startCMResultMap) {
     String leader = getCurrentLeader(zkClient, clusterName);
     Assert.assertTrue(leader != null);
     System.out.println("stop leader: " + leader + " in " + clusterName);
@@ -129,34 +118,27 @@ public class ZkIntegrationTestBase
 
     boolean isNewLeaderElected = false;
     String newLeader = null;
-    try
-    {
-      for (int i = 0; i < 5; i++)
-      {
+    try {
+      for (int i = 0; i < 5; i++) {
         Thread.sleep(1000);
         newLeader = getCurrentLeader(zkClient, clusterName);
-        if (!newLeader.equals(leader))
-        {
+        if (!newLeader.equals(leader)) {
           isNewLeaderElected = true;
           System.out.println("new leader elected: " + newLeader + " in " + clusterName);
           break;
         }
       }
-    }
-    catch (InterruptedException e)
-    {
+    } catch (InterruptedException e) {
       e.printStackTrace();
     }
-    if (isNewLeaderElected == false)
-    {
+    if (isNewLeaderElected == false) {
       System.out.println("fail to elect a new leader in " + clusterName);
     }
     AssertJUnit.assertTrue(isNewLeaderElected);
     return newLeader;
   }
 
-  protected void enableHealthCheck(String clusterName)
-  {
+  protected void enableHealthCheck(String clusterName) {
     ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName).build();
     new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index 13d966b..e759fc7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -39,47 +39,38 @@ import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
-
 /**
- * 
  * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
  * start 5 dummy participants verify the current states at end
  */
 
-public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
-{
-  private static Logger                LOG               =
-                                                             Logger.getLogger(ZkStandAloneCMTestBase.class);
-
-  protected static final int           NODE_NR           = 5;
-  protected static final int           START_PORT        = 12918;
-  protected static final String        STATE_MODEL       = "MasterSlave";
-  protected static final String        TEST_DB           = "TestDB";
-  protected static final int           _PARTITIONS       = 20;
-
-  protected ClusterSetup               _setupTool        = null;
-  protected final String               CLASS_NAME        = getShortClassName();
-  protected final String               CLUSTER_NAME      = CLUSTER_PREFIX + "_"
-                                                             + CLASS_NAME;
-
-  protected Map<String, StartCMResult> _startCMResultMap =
-                                                             new HashMap<String, StartCMResult>();
-  protected ZkClient                   _zkClient;
-  
+public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
+  private static Logger LOG = Logger.getLogger(ZkStandAloneCMTestBase.class);
+
+  protected static final int NODE_NR = 5;
+  protected static final int START_PORT = 12918;
+  protected static final String STATE_MODEL = "MasterSlave";
+  protected static final String TEST_DB = "TestDB";
+  protected static final int _PARTITIONS = 20;
+
+  protected ClusterSetup _setupTool = null;
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
+  protected ZkClient _zkClient;
+
   int _replica = 3;
 
   @BeforeClass
-  public void beforeClass() throws Exception
-  {
-//    Logger.getRootLogger().setLevel(Level.INFO);
-    System.out.println("START " + CLASS_NAME + " at "
-        + new Date(System.currentTimeMillis()));
+  public void beforeClass() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
     _zkClient = new ZkClient(ZK_ADDR);
     _zkClient.setZkSerializer(new ZNRecordSerializer());
     String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace))
-    {
+    if (_zkClient.exists(namespace)) {
       _zkClient.deleteRecursive(namespace);
     }
     _setupTool = new ClusterSetup(ZK_ADDR);
@@ -87,26 +78,20 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
     // setup storage cluster
     _setupTool.addCluster(CLUSTER_NAME, true);
     _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
-    for (int i = 0; i < NODE_NR; i++)
-    {
+    for (int i = 0; i < NODE_NR; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     }
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
 
     // start dummy participants
-    for (int i = 0; i < NODE_NR; i++)
-    {
+    for (int i = 0; i < NODE_NR; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null)
-      {
+      if (_startCMResultMap.get(instanceName) != null) {
         LOG.error("fail to start particpant:" + instanceName
             + "(participant with same name already exists)");
-      }
-      else
-      {
-        StartCMResult result =
-            TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+      } else {
+        StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
         _startCMResultMap.put(instanceName, result);
       }
     }
@@ -114,36 +99,31 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
     StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME,
-                                   controllerName,
-                                   ZK_ADDR,
-                                   HelixControllerMain.STANDALONE);
+        TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
+            HelixControllerMain.STANDALONE);
     _startCMResultMap.put(controllerName, startResult);
-    
+
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
-                                                                              CLUSTER_NAME));
+        ClusterStateVerifier
+            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
 
     result =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-                                                                                 CLUSTER_NAME));
+            CLUSTER_NAME));
     Assert.assertTrue(result);
   }
 
   @AfterClass
-  public void afterClass() throws Exception
-  {
+  public void afterClass() throws Exception {
     /**
      * shutdown order: 1) disconnect the controller 2) disconnect participants
      */
-   
+
     StartCMResult result;
     Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext())
-    {
+    while (it.hasNext()) {
       String instanceName = it.next().getKey();
-      if (instanceName.startsWith(CONTROLLER_PREFIX))
-      {
+      if (instanceName.startsWith(CONTROLLER_PREFIX)) {
         result = _startCMResultMap.get(instanceName);
         result._manager.disconnect();
         result._thread.interrupt();
@@ -153,8 +133,7 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
 
     Thread.sleep(100);
     it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext())
-    {
+    while (it.hasNext()) {
       String instanceName = it.next().getKey();
       result = _startCMResultMap.get(instanceName);
       result._manager.disconnect();
@@ -164,7 +143,6 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
 
     _zkClient.close();
     // logger.info("END at " + new Date(System.currentTimeMillis()));
-    System.out.println("END " + CLASS_NAME + " at "
-        + new Date(System.currentTimeMillis()));
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
index bca59bf..f19e5dd 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
@@ -31,59 +31,51 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
 /**
- * 
  * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
  * start 5 dummy participants verify the current states at end
  */
 
-public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase
-{
+public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase {
   @BeforeClass
-  public void beforeClass() throws Exception
-  {
+  public void beforeClass() throws Exception {
     ZKPropertyTransferServer.PERIOD = 500;
     ZkPropertyTransferClient.SEND_PERIOD = 500;
     ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
     super.beforeClass();
-    
+
     Thread.sleep(1000);
-    for (int i = 0; i < NODE_NR; i++)
-    {
+    for (int i = 0; i < NODE_NR; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null)
-      {
-        HelixDataAccessor accessor = _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
+      if (_startCMResultMap.get(instanceName) != null) {
+        HelixDataAccessor accessor =
+            _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
         Builder kb = accessor.keyBuilder();
-        List<StatusUpdate> statusUpdates = accessor.getChildValues(
-            kb.stateTransitionStatus(instanceName, _startCMResultMap.get(instanceName)._manager.getSessionId(),
-                TEST_DB));
-        for(int j = 0;j < 10; j++)
-        {
-          statusUpdates = accessor.getChildValues(
-            kb.stateTransitionStatus(instanceName, _startCMResultMap.get(instanceName)._manager.getSessionId(),
-                TEST_DB));
-          if(statusUpdates.size() == 0)
-          {
+        List<StatusUpdate> statusUpdates =
+            accessor.getChildValues(kb.stateTransitionStatus(instanceName,
+                _startCMResultMap.get(instanceName)._manager.getSessionId(), TEST_DB));
+        for (int j = 0; j < 10; j++) {
+          statusUpdates =
+              accessor.getChildValues(kb.stateTransitionStatus(instanceName,
+                  _startCMResultMap.get(instanceName)._manager.getSessionId(), TEST_DB));
+          if (statusUpdates.size() == 0) {
             Thread.sleep(500);
-          }
-          else
-          {
+          } else {
             break;
           }
         }
         Assert.assertTrue(statusUpdates.size() > 0);
-        for(StatusUpdate update : statusUpdates)
-        {
-          Assert.assertTrue(update.getRecord().getSimpleField(ZkPropertyTransferClient.USE_PROPERTYTRANSFER).equals("true"));
-          Assert.assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
+        for (StatusUpdate update : statusUpdates) {
+          Assert.assertTrue(update.getRecord()
+              .getSimpleField(ZkPropertyTransferClient.USE_PROPERTYTRANSFER).equals("true"));
+          Assert
+              .assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
         }
       }
     }
   }
 
   @AfterClass
-  public void afterClass() throws Exception
-  {
+  public void afterClass() throws Exception {
     super.afterClass();
     ZKPropertyTransferServer.getInstance().shutdown();
     ZKPropertyTransferServer.getInstance().reset();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index ef2d013..e0da9fb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -27,63 +27,47 @@ import org.apache.helix.manager.zk.ControllerManager;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.log4j.Logger;
 
-public class ClusterControllerManager extends ControllerManager implements Runnable, ZkTestManager
-{
+public class ClusterControllerManager extends ControllerManager implements Runnable, ZkTestManager {
   private static Logger LOG = Logger.getLogger(ClusterControllerManager.class);
 
   private final CountDownLatch _startCountDown = new CountDownLatch(1);
   private final CountDownLatch _stopCountDown = new CountDownLatch(1);
   private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
 
-  public ClusterControllerManager(String zkAddr, String clusterName, String controllerName)
-  {
+  public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) {
     super(zkAddr, clusterName, controllerName);
   }
 
-  public void syncStop()
-  {
+  public void syncStop() {
     _stopCountDown.countDown();
-    try
-    {
+    try {
       _waitStopFinishCountDown.await();
-    }
-    catch (InterruptedException e)
-    {
+    } catch (InterruptedException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
     }
   }
 
-  public void syncStart()
-  {
+  public void syncStart() {
     // TODO: prevent start multiple times
     new Thread(this).start();
-    try
-    {
+    try {
       _startCountDown.await();
-    }
-    catch (InterruptedException e)
-    {
+    } catch (InterruptedException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
     }
   }
 
   @Override
-  public void run()
-  {
-    try
-    {
+  public void run() {
+    try {
       connect();
       _startCountDown.countDown();
       _stopCountDown.await();
-    }
-    catch (Exception e)
-    {
+    } catch (Exception e) {
       LOG.error("exception running controller-manager", e);
-    }
-    finally
-    {
+    } finally {
       _startCountDown.countDown();
       disconnect();
       _waitStopFinishCountDown.countDown();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f414aad4/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 56d3aea..751c3cb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -19,7 +19,6 @@ package org.apache.helix.integration.manager;
  * under the License.
  */
 
-
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
@@ -28,63 +27,48 @@ import org.apache.helix.manager.zk.DistributedControllerManager;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.log4j.Logger;
 
-public class ClusterDistributedController extends DistributedControllerManager implements Runnable, ZkTestManager
-{
+public class ClusterDistributedController extends DistributedControllerManager implements Runnable,
+    ZkTestManager {
   private static Logger LOG = Logger.getLogger(ClusterDistributedController.class);
 
   private final CountDownLatch _startCountDown = new CountDownLatch(1);
   private final CountDownLatch _stopCountDown = new CountDownLatch(1);
   private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
 
-  public ClusterDistributedController(String zkAddr, String clusterName, String controllerName)
-  {
+  public ClusterDistributedController(String zkAddr, String clusterName, String controllerName) {
     super(zkAddr, clusterName, controllerName);
   }
 
-  public void syncStop()
-  {
+  public void syncStop() {
     _stopCountDown.countDown();
-    try
-    {
+    try {
       _waitStopFinishCountDown.await();
-    }
-    catch (InterruptedException e)
-    {
+    } catch (InterruptedException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
     }
   }
 
-  public void syncStart()
-  {
+  public void syncStart() {
     // TODO: prevent start multiple times
     new Thread(this).start();
-    try
-    {
+    try {
       _startCountDown.await();
-    }
-    catch (InterruptedException e)
-    {
+    } catch (InterruptedException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
     }
   }
 
   @Override
-  public void run()
-  {
-    try
-    {
+  public void run() {
+    try {
       connect();
       _startCountDown.countDown();
       _stopCountDown.await();
-    }
-    catch (Exception e)
-    {
+    } catch (Exception e) {
       LOG.error("exception running controller-manager", e);
-    }
-    finally
-    {
+    } finally {
       _startCountDown.countDown();
       disconnect();
       _waitStopFinishCountDown.countDown();


Mime
View raw message