helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-423] Remove election code duplication, fix metrics reset
Date Sun, 06 Apr 2014 05:43:31 GMT
Repository: helix
Updated Branches:
  refs/heads/master fdc8e5063 -> 7937b8a27


[HELIX-423] Remove election code duplication, fix metrics reset


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7937b8a2
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7937b8a2
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7937b8a2

Branch: refs/heads/master
Commit: 7937b8a27b8a209d974209f57d5578b3b9e007f2
Parents: fdc8e50
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Wed Apr 2 13:35:26 2014 -0700
Committer: Kanak Biscuitwala <kanak.b@hotmail.com>
Committed: Sat Apr 5 22:42:19 2014 -0700

----------------------------------------------------------------------
 .../manager/zk/DistributedLeaderElection.java   |   1 +
 .../helix/manager/zk/ZkHelixLeaderElection.java |   1 +
 .../DistClusterControllerElection.java          | 103 ------------------
 .../mbeans/TestResetClusterMetrics.java         | 107 +++++++++++++++++++
 .../helix/participant/MockZKHelixManager.java   |   4 +-
 .../participant/TestDistControllerElection.java |  26 ++++-
 6 files changed, 132 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7937b8a2/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index 86b8d41..fe1ca4b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -95,6 +95,7 @@ public class DistributedLeaderElection implements ControllerChangeListener
{
             + _manager.getClusterName());
         controllerHelper.stopControllerTimerTasks();
         controllerHelper.removeListenersFromController(_controller);
+        _controller.shutdownClusterStatusMonitor(_manager.getClusterName());
 
         /**
          * clear write-through cache

http://git-wip-us.apache.org/repos/asf/helix/blob/7937b8a2/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
index cc99b8e..88ca1b0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -97,6 +97,7 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
         LOG.info(_controllerId + " reqlinquishes leadership of cluster: " + _clusterId);
         _controller.stopTimerTasks();
         _controller.removeListenersFromController(_pipeline);
+        _pipeline.shutdownClusterStatusMonitor(_manager.getClusterName());
 
         /**
          * clear write-through cache

http://git-wip-us.apache.org/repos/asf/helix/blob/7937b8a2/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
deleted file mode 100644
index ee7efcd..0000000
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package org.apache.helix.participant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZkHelixLeaderElection;
-import org.apache.log4j.Logger;
-
-// TODO: merge with GenericHelixController
-public class DistClusterControllerElection implements ControllerChangeListener {
-  private static Logger LOG = Logger.getLogger(DistClusterControllerElection.class);
-  private final String _zkAddr;
-  private final GenericHelixController _controller = new GenericHelixController();
-  private HelixManager _leader;
-
-  public DistClusterControllerElection(String zkAddr) {
-    _zkAddr = zkAddr;
-  }
-
-  /**
-   * may be accessed by multiple threads: zk-client thread and
-   * ZkHelixManager.disconnect()->reset() TODO: Refactor accessing
-   * HelixMaangerMain class statically
-   */
-  @Override
-  public synchronized void onControllerChange(NotificationContext changeContext) {
-    HelixManager manager = changeContext.getManager();
-    if (manager == null) {
-      LOG.error("missing attributes in changeContext. requires HelixManager");
-      return;
-    }
-
-    InstanceType type = manager.getInstanceType();
-    if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT)
{
-      LOG.error("fail to become controller because incorrect instanceType (was " + type.toString()
-          + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
-      return;
-    }
-
-    try {
-      if (changeContext.getType().equals(NotificationContext.Type.INIT)
-          || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
-        // DataAccessor dataAccessor = manager.getDataAccessor();
-        HelixDataAccessor accessor = manager.getHelixDataAccessor();
-        Builder keyBuilder = accessor.keyBuilder();
-
-        while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
-          boolean success = ZkHelixLeaderElection.tryUpdateController(manager);
-          if (success) {
-            ZkHelixLeaderElection.updateHistory(manager);
-            if (type == InstanceType.CONTROLLER) {
-              HelixControllerMain.addListenersToController(manager, _controller);
-              manager.startTimerTasks();
-            } else if (type == InstanceType.CONTROLLER_PARTICIPANT) {
-              String clusterName = manager.getClusterName();
-              String controllerName = manager.getInstanceName();
-              _leader =
-                  HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
-                      InstanceType.CONTROLLER, _zkAddr);
-
-              _leader.connect();
-              _leader.startTimerTasks();
-              HelixControllerMain.addListenersToController(_leader, _controller);
-            }
-
-          }
-        }
-      } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
-        if (_leader != null) {
-          _leader.disconnect();
-        }
-        _controller.shutdownClusterStatusMonitor(manager.getClusterName());
-      }
-    } catch (Exception e) {
-      LOG.error("Exception when trying to become leader", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/7937b8a2/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
new file mode 100644
index 0000000..74f84e8
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
@@ -0,0 +1,107 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestResetClusterMetrics extends ZkUnitTestBase {
+  /**
+   * Ensure cluster status lifecycle is tied to controller leader status
+   */
+  @Test
+  public void testControllerDisconnect() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    // Set up a cluster with one of everything
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "Resource", 1, 1, 1,
1,
+        "OnlineOffline", RebalanceMode.FULL_AUTO, true);
+
+    // Add a participant
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
+    participant.syncStart();
+
+    // Add a controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // Make sure everything gets assigned
+    Thread.sleep(1000);
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Check the metrics
+    Assert.assertTrue(metricsExist(clusterName, participant.getInstanceName()));
+
+    // Stop the controller
+    controller.syncStop();
+
+    // Check the metrics
+    Thread.sleep(1000);
+    Assert.assertFalse(metricsExist(clusterName, participant.getInstanceName()));
+  }
+
+  private boolean metricsExist(String clusterName, String instanceName) throws Exception
{
+    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+    String instanceBeanName =
+        ClusterStatusMonitor.CLUSTER_DN_KEY + "=" + clusterName + ","
+            + ClusterStatusMonitor.INSTANCE_DN_KEY + "=" + instanceName;
+    boolean instanceBeanFound;
+    try {
+      MBeanInfo info = server.getMBeanInfo(objectName(instanceBeanName));
+      instanceBeanFound = info != null;
+    } catch (InstanceNotFoundException e) {
+      instanceBeanFound = false;
+    }
+    String clusterBeanName = ClusterStatusMonitor.CLUSTER_DN_KEY + "=" + clusterName;
+    boolean clusterBeanFound;
+    try {
+      MBeanInfo info = server.getMBeanInfo(objectName(clusterBeanName));
+      clusterBeanFound = info != null;
+    } catch (InstanceNotFoundException e) {
+      clusterBeanFound = false;
+    }
+    return instanceBeanFound && clusterBeanFound;
+  }
+
+  private ObjectName objectName(String beanName) throws Exception {
+    return new ObjectName(ClusterStatusMonitor.CLUSTER_STATUS_KEY + ": " + beanName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7937b8a2/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index d97b22a..7d252c5 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -46,6 +46,7 @@ import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 
@@ -165,8 +166,7 @@ public class MockZKHelixManager implements HelixManager {
 
   @Override
   public ClusterMessagingService getMessagingService() {
-    // TODO Auto-generated method stub
-    return null;
+    return new DefaultMessagingService(this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7937b8a2/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerElection.java
b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerElection.java
index 28068ec..2e175d5 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerElection.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/TestDistControllerElection.java
@@ -19,9 +19,12 @@ package org.apache.helix.participant;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
@@ -30,6 +33,8 @@ import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.manager.zk.DistributedLeaderElection;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.LiveInstance;
@@ -61,8 +66,11 @@ public class TestDistControllerElection extends ZkUnitTestBase {
     final String controllerName = "controller_0";
     HelixManager manager =
         new MockZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, _gZkClient);
+    GenericHelixController controller0 = new GenericHelixController();
 
-    DistClusterControllerElection election = new DistClusterControllerElection(ZK_ADDR);
+    List<HelixTimerTask> timerTasks = Collections.emptyList();
+    DistributedLeaderElection election =
+        new DistributedLeaderElection(manager, controller0, timerTasks);
     NotificationContext context = new NotificationContext(manager);
     context.setType(NotificationContext.Type.INIT);
     election.onControllerChange(context);
@@ -76,7 +84,8 @@ public class TestDistControllerElection extends ZkUnitTestBase {
 
     manager =
         new MockZKHelixManager(clusterName, "controller_1", InstanceType.CONTROLLER, _gZkClient);
-    election = new DistClusterControllerElection(ZK_ADDR);
+    GenericHelixController controller1 = new GenericHelixController();
+    election = new DistributedLeaderElection(manager, controller1, timerTasks);
     context = new NotificationContext(manager);
     context.setType(NotificationContext.Type.INIT);
     election.onControllerChange(context);
@@ -111,8 +120,11 @@ public class TestDistControllerElection extends ZkUnitTestBase {
     HelixManager manager =
         new MockZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT,
             _gZkClient);
+    GenericHelixController controller0 = new GenericHelixController();
+    List<HelixTimerTask> timerTasks = Collections.emptyList();
 
-    DistClusterControllerElection election = new DistClusterControllerElection(ZK_ADDR);
+    DistributedLeaderElection election =
+        new DistributedLeaderElection(manager, controller0, timerTasks);
     NotificationContext context = new NotificationContext(manager);
     context.setType(NotificationContext.Type.CALLBACK);
     election.onControllerChange(context);
@@ -129,7 +141,8 @@ public class TestDistControllerElection extends ZkUnitTestBase {
     manager =
         new MockZKHelixManager(clusterName, "controller_1", InstanceType.CONTROLLER_PARTICIPANT,
             _gZkClient);
-    election = new DistClusterControllerElection(ZK_ADDR);
+    GenericHelixController controller1 = new GenericHelixController();
+    election = new DistributedLeaderElection(manager, controller1, timerTasks);
     context = new NotificationContext(manager);
     context.setType(NotificationContext.Type.CALLBACK);
     election.onControllerChange(context);
@@ -160,8 +173,11 @@ public class TestDistControllerElection extends ZkUnitTestBase {
     final String controllerName = "participant_0";
     HelixManager manager =
         new MockZKHelixManager(clusterName, controllerName, InstanceType.PARTICIPANT, _gZkClient);
+    GenericHelixController participant0 = new GenericHelixController();
+    List<HelixTimerTask> timerTasks = Collections.emptyList();
 
-    DistClusterControllerElection election = new DistClusterControllerElection(ZK_ADDR);
+    DistributedLeaderElection election =
+        new DistributedLeaderElection(manager, participant0, timerTasks);
     NotificationContext context = new NotificationContext(manager);
     context.setType(NotificationContext.Type.INIT);
     election.onControllerChange(context);


Mime
View raw message