helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject helix git commit: [HELIX-575] Should not send FINALIZED callback when a bucketized resource is removed, rb=32032
Date Fri, 13 Mar 2015 07:48:19 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 3a4ff21b4 -> 86b2b25ac


[HELIX-575] Should not send FINALIZED callback when a bucketized resource is removed, rb=32032


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/86b2b25a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/86b2b25a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/86b2b25a

Branch: refs/heads/helix-0.6.x
Commit: 86b2b25acbe26286a52881c11539d343130bc4fa
Parents: 3a4ff21
Author: zzhang <zzhang5@uci.edu>
Authored: Fri Mar 13 00:48:11 2015 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Fri Mar 13 00:48:11 2015 -0700

----------------------------------------------------------------------
 .../helix/manager/zk/CallbackHandler.java       |  13 +-
 .../integration/TestBucketizedResource.java     | 193 +++++++++++++------
 .../integration/ZkIntegrationTestBase.java      |   3 +
 .../manager/ClusterControllerManager.java       |   4 +
 4 files changed, 149 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/86b2b25a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 65fe2f9..fd59ecc 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -378,17 +378,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
       if (parentPath != null && parentPath.startsWith(_path)) {
         NotificationContext changeContext = new NotificationContext(_manager);
 
-        if (currentChilds == null) {
-          // parentPath has been removed
-          if (parentPath.equals(_path)) {
-            // _path has been removed, remove this listener
-            _manager.removeListener(_propertyKey, _listener);
-          }
-          changeContext.setType(NotificationContext.Type.FINALIZE);
+        if (currentChilds == null && parentPath.equals(_path)) {
+          // _path has been removed, remove this listener
+          // removeListener will call handler.reset(), which in turn call invoke() on FINALIZE
type
+          _manager.removeListener(_propertyKey, _listener);
         } else {
           changeContext.setType(NotificationContext.Type.CALLBACK);
+          invoke(changeContext);
         }
-        invoke(changeContext);
       }
     } catch (Exception e) {
       String msg =

http://git-wip-us.apache.org/repos/asf/helix/blob/86b2b25a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
index 9b72e41..a29ca0d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
@@ -23,16 +23,17 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
+import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -43,48 +44,58 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestBucketizedResource extends ZkIntegrationTestBase {
+
+  private void setupCluster(String clusterName, List<String> instanceNames, String
dbName,
+      int replica, int partitions, int bucketSize) {
+    _gSetupTool.addCluster(clusterName, true);
+    _gSetupTool.addInstancesToCluster(clusterName,
+        instanceNames.toArray(new String[instanceNames.size()]));
+
+    // add a bucketized resource
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ZNRecord idealStateRec =
+        DefaultIdealStateCalculator.calculateIdealState(instanceNames, partitions, replica
- 1,
+            dbName,
+            "MASTER", "SLAVE");
+    IdealState idealState = new IdealState(idealStateRec);
+    idealState.setBucketSize(bucketSize);
+    idealState.setStateModelDefRef("MasterSlave");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    idealState.setReplicas(Integer.toString(replica));
+    accessor.setProperty(keyBuilder.idealStates(dbName), idealState);
+
+  }
+
   @Test()
-  public void testBucketizedResource() throws Exception {
+  public void testBucketizedResource() {
     // Logger.getRootLogger().setLevel(Level.INFO);
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
 
+    List<String> instanceNames =
+        Arrays.asList("localhost_12918", "localhost_12919", "localhost_12920", "localhost_12921",
"localhost_12922");
+    int n = instanceNames.size();
+    String dbName = "TestDB0";
+
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     MockParticipantManager[] participants = new MockParticipantManager[5];
-    // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
-
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-        "localhost", // participant name prefix
-        "TestDB", // resource name prefix
-        1, // resources
-        10, // partitions per resource
-        5, // number of nodes
-        3, // replicas
-        "MasterSlave", true); // do rebalance
-
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
-    // String idealStatePath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName,
-    // "TestDB0");
-    Builder keyBuilder = accessor.keyBuilder();
-    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
-    idealState.setBucketSize(1);
-    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
-
-    ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+
+    setupCluster(clusterName, instanceNames, dbName, 3, 10, 1);
+
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+
+    ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName);
     controller.syncStart();
 
     // start participants
-    for (int i = 0; i < 5; i++) {
-      String instanceName = "localhost_" + (12918 + i);
-
-      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+    for (int i = 0; i < n; i++) {
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(i));
       participants[i].syncStart();
     }
-    PropertyKey evKey = accessor.keyBuilder().externalView("TestDB0");
+    PropertyKey evKey = accessor.keyBuilder().externalView(dbName);
 
     boolean result =
         ClusterStateVerifier
@@ -114,7 +125,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
 
     // clean up
     controller.syncStop();
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }
 
@@ -122,44 +133,28 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
   }
 
   @Test
-  public void testBounceDisableAndDrop() throws Exception {
+  public void testBounceDisableAndDrop() {
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
     String dbName = "TestDB0";
-    int n = 5;
-    int r = 3;
     List<String> instanceNames =
         Arrays.asList("localhost_0", "localhost_1", "localhost_2", "localhost_3", "localhost_4");
+    int n = instanceNames.size();
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    // create cluster and add nodes to cluster
-    MockParticipantManager[] participants = new MockParticipantManager[n];
-    _gSetupTool.addCluster(clusterName, true);
-    _gSetupTool.addInstancesToCluster(clusterName,
-        instanceNames.toArray(new String[instanceNames.size()]));
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
-    // add a bucketized resource
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
-    Builder keyBuilder = accessor.keyBuilder();
-    ZNRecord idealStateRec =
-        DefaultIdealStateCalculator.calculateIdealState(instanceNames, 10, r - 1, dbName,
"MASTER",
-            "SLAVE");
-    IdealState idealState = new IdealState(idealStateRec);
-    idealState.setBucketSize(2);
-    idealState.setStateModelDefRef("MasterSlave");
-    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
-    idealState.setReplicas(Integer.toString(r));
-    accessor.setProperty(keyBuilder.idealStates(dbName), idealState);
+    setupCluster(clusterName, instanceNames, dbName, 3, 10, 2);
 
     // start controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName);
     controller.syncStart();
 
     // start participants
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(i));
       participants[i].syncStart();
@@ -184,7 +179,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
     String path =
         keyBuilder.currentState(instanceNames.get(0), participants[0].getSessionId(), dbName)
             .getPath();
-    ZNRecord record = baseAccessor.get(path, null, 0);
+    ZNRecord record = _baseAccessor.get(path, null, 0);
     Assert.assertTrue(record.getMapFields().size() == 0);
 
     // disable the bucketize resource
@@ -204,7 +199,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
 
     // make sure external-view is cleaned up
     path = keyBuilder.externalView(dbName).getPath();
-    result = baseAccessor.exists(path, 0);
+    result = _baseAccessor.exists(path, 0);
     Assert.assertFalse(result);
 
     // clean up
@@ -213,6 +208,92 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
       participant.syncStop();
     }
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  class TestExternalViewListener implements ExternalViewChangeListener {
+    int cbCnt = 0;
+
+    @Override
+    public void onExternalViewChange(List<ExternalView> externalViewList,
+        NotificationContext changeContext) {
+      if (changeContext.getType() == Type.CALLBACK) {
+        cbCnt++;
+      }
+    }
+
+  }
+
+  @Test
+  public void testListenerOnBucketizedResource() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    String dbName = "TestDB0";
+    List<String> instanceNames =
+        Arrays.asList("localhost_0", "localhost_1", "localhost_2", "localhost_3", "localhost_4");
+    int n = instanceNames.size();
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    setupCluster(clusterName, instanceNames, dbName, 3, 10, 2);
+
+    // start controller
+    ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName);
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(i));
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // add an external view listener
+    TestExternalViewListener listener = new TestExternalViewListener();
+    controller.addExternalViewChangeListener(listener);
+
+    // remove "TestDB0"
+    _gSetupTool.dropResourceFromCluster(clusterName, dbName);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+    // wait callback to finish
+    Thread.sleep(100);
+    listener.cbCnt = 0;
+
+    // add a new db
+    String newDbName = "TestDB1";
+    int r = 3;
+    ZNRecord idealStateRec =
+        DefaultIdealStateCalculator.calculateIdealState(instanceNames, 10, r - 1, newDbName,
+            "MASTER", "SLAVE");
+    IdealState idealState = new IdealState(idealStateRec);
+    idealState.setBucketSize(2);
+    idealState.setStateModelDefRef("MasterSlave");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    idealState.setReplicas(Integer.toString(r));
+    accessor.setProperty(keyBuilder.idealStates(newDbName), idealState);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+    Assert.assertTrue(listener.cbCnt > 0);
+
+    // clean up
+    controller.syncStop();
+    for (MockParticipantManager participant : participants) {
+      participant.syncStop();
+    }
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/86b2b25a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index 8cb697b..22696c3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -25,6 +25,7 @@ import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
@@ -46,6 +47,7 @@ public class ZkIntegrationTestBase {
   protected static ZkServer _zkServer;
   protected static ZkClient _gZkClient;
   protected static ClusterSetup _gSetupTool;
+  protected static BaseDataAccessor<ZNRecord> _baseAccessor;
 
   public static final String ZK_ADDR = "localhost:2183";
   protected static final String CLUSTER_PREFIX = "CLUSTER";
@@ -67,6 +69,7 @@ public class ZkIntegrationTestBase {
     _gZkClient = new ZkClient(ZK_ADDR);
     _gZkClient.setZkSerializer(new ZNRecordSerializer());
     _gSetupTool = new ClusterSetup(ZK_ADDR);
+    _baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
   }
 
   @AfterSuite

http://git-wip-us.apache.org/repos/asf/helix/blob/86b2b25a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index b8f0f2b..9e10771 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -36,6 +36,10 @@ public class ClusterControllerManager extends ZKHelixManager implements
Runnable
   private final CountDownLatch _stopCountDown = new CountDownLatch(1);
   private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
 
+  public ClusterControllerManager(String zkAddr, String clusterName) {
+    this(zkAddr, clusterName, "controller");
+  }
+
   public ClusterControllerManager(String zkAddr, String clusterName, String controllerName)
{
     super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
   }


Mime
View raw message