helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-12] helix controller sends multiple state transition messages
Date Wed, 28 Nov 2012 02:52:43 GMT
Updated Branches:
  refs/heads/master 02afc50b2 -> 310431629


[HELIX-12] helix controller sends multiple state transition messages


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/31043162
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/31043162
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/31043162

Branch: refs/heads/master
Commit: 310431629d7212eac8f0680264599c98e62f655c
Parents: 02afc50
Author: zzhang <zzhang@apache.org>
Authored: Tue Nov 27 18:52:33 2012 -0800
Committer: zzhang <zzhang@apache.org>
Committed: Tue Nov 27 18:52:33 2012 -0800

----------------------------------------------------------------------
 .../participant/DistClusterControllerElection.java |  277 +++++++--------
 .../integration/TestAutoRebalanceDuringStart.java  |   68 ----
 .../TestStartMultipleControllersWithSameName.java  |   60 +++
 3 files changed, 181 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/31043162/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
index 39a7e3f..4a7c8b4 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java
@@ -27,8 +27,8 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyType;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyType;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
@@ -36,168 +36,133 @@ import org.apache.helix.model.LeaderHistory;
 import org.apache.helix.model.LiveInstance;
 import org.apache.log4j.Logger;
 
-
 // TODO: merge with GenericHelixController
-public class DistClusterControllerElection implements ControllerChangeListener
-{
-  private static Logger                LOG         =
-                                                       Logger.getLogger(DistClusterControllerElection.class);
-  private final String                 _zkAddr;
-  private final GenericHelixController _controller = new GenericHelixController();
-  private HelixManager                 _leader;
-
-  public DistClusterControllerElection(String zkAddr)
-  {
-    _zkAddr = zkAddr;
-  }
-
-  /**
-   * may be accessed by multiple threads: zk-client thread and
-   * ZkHelixManager.disconnect()->reset() TODO: Refactor accessing HelixMaangerMain class
-   * statically
-   */
-  @Override
-  public synchronized void onControllerChange(NotificationContext changeContext)
-  {
-    HelixManager manager = changeContext.getManager();
-    if (manager == null)
-    {
-      LOG.error("missing attributes in changeContext. requires HelixManager");
-      return;
-    }
-
-    InstanceType type = manager.getInstanceType();
-    if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT)
-    {
-      LOG.error("fail to become controller because incorrect instanceType (was "
-          + type.toString() + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
-      return;
+public class DistClusterControllerElection implements ControllerChangeListener {
+    private static Logger LOG = Logger.getLogger(DistClusterControllerElection.class);
+    private final String _zkAddr;
+    private final GenericHelixController _controller = new GenericHelixController();
+    private HelixManager _leader;
+
+    public DistClusterControllerElection(String zkAddr) {
+	_zkAddr = zkAddr;
     }
 
-    try
-    {
-      if (changeContext.getType().equals(NotificationContext.Type.INIT)
-          || changeContext.getType().equals(NotificationContext.Type.CALLBACK))
-      {
-        // DataAccessor dataAccessor = manager.getDataAccessor();
-        HelixDataAccessor accessor = manager.getHelixDataAccessor();
-        Builder keyBuilder = accessor.keyBuilder();
-
-        while (accessor.getProperty(keyBuilder.controllerLeader()) == null)
-        {
-          boolean success = tryUpdateController(manager);
-          if (success)
-          {
-            updateHistory(manager);
-            if (type == InstanceType.CONTROLLER)
-            {
-              HelixControllerMain.addListenersToController(manager, _controller);
-              manager.startTimerTasks();
-            }
-            else if (type == InstanceType.CONTROLLER_PARTICIPANT)
-            {
-              String clusterName = manager.getClusterName();
-              String controllerName = manager.getInstanceName();
-              _leader =
-                  HelixManagerFactory.getZKHelixManager(clusterName,
-                                                        controllerName,
-                                                        InstanceType.CONTROLLER,
-                                                        _zkAddr);
-
-              _leader.connect();
-              _leader.startTimerTasks();
-              HelixControllerMain.addListenersToController(_leader, _controller);
-            }
-
-          }
-        }
-      }
-      else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE))
-      {
-
-        if (_leader != null)
-        {
-          _leader.disconnect();
-        }
-      }
-
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception when trying to become leader", e);
-    }
-  }
-
-  private boolean tryUpdateController(HelixManager manager)
-  {
-    // DataAccessor dataAccessor = manager.getDataAccessor();
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    LiveInstance leader = new LiveInstance(manager.getInstanceName());
-    try
-    {
-      leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
-      // TODO: this session id is not the leader's session id in distributed mode
-      leader.setSessionId(manager.getSessionId());
-      leader.setHelixVersion(manager.getVersion());
-      if(ZKPropertyTransferServer.getInstance() != null)
-      {
-        String zkPropertyTransferServiceUrl = ZKPropertyTransferServer.getInstance().getWebserviceUrl();
-        if(zkPropertyTransferServiceUrl != null)
-        {
-          leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
-        }
-      }
-      else
-      {
-        LOG.warn("ZKPropertyTransferServer instnace is null");
-      }
-      boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
-      if (success)
-      {
-        return true;
-      }
-      else
-      {
-        LOG.info("Unable to become leader probably because some other controller becames the leader");
-      }
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception when trying to updating leader record in cluster:"
-                    + manager.getClusterName()
-                    + ". Need to check again whether leader node has been created or not",
-                e);
+    /**
+     * may be accessed by multiple threads: zk-client thread and
+     * ZkHelixManager.disconnect()->reset() TODO: Refactor accessing
+     * HelixMaangerMain class statically
+     */
+    @Override
+    public synchronized void onControllerChange(NotificationContext changeContext) {
+	HelixManager manager = changeContext.getManager();
+	if (manager == null) {
+	    LOG.error("missing attributes in changeContext. requires HelixManager");
+	    return;
+	}
+
+	InstanceType type = manager.getInstanceType();
+	if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT) {
+	    LOG.error("fail to become controller because incorrect instanceType (was "
+		    + type.toString() + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
+	    return;
+	}
+
+	try {
+	    if (changeContext.getType().equals(NotificationContext.Type.INIT)
+		    || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
+		// DataAccessor dataAccessor = manager.getDataAccessor();
+		HelixDataAccessor accessor = manager.getHelixDataAccessor();
+		Builder keyBuilder = accessor.keyBuilder();
+
+		while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
+		    boolean success = tryUpdateController(manager);
+		    if (success) {
+			updateHistory(manager);
+			if (type == InstanceType.CONTROLLER) {
+			    HelixControllerMain.addListenersToController(manager, _controller);
+			    manager.startTimerTasks();
+			} else if (type == InstanceType.CONTROLLER_PARTICIPANT) {
+			    String clusterName = manager.getClusterName();
+			    String controllerName = manager.getInstanceName();
+			    _leader = HelixManagerFactory.getZKHelixManager(clusterName,
+				    controllerName, InstanceType.CONTROLLER, _zkAddr);
+
+			    _leader.connect();
+			    _leader.startTimerTasks();
+			    HelixControllerMain.addListenersToController(_leader, _controller);
+			}
+
+		    }
+		}
+	    } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
+
+		if (_leader != null) {
+		    _leader.disconnect();
+		}
+	    }
+
+	} catch (Exception e) {
+	    LOG.error("Exception when trying to become leader", e);
+	}
     }
 
-    leader = accessor.getProperty(keyBuilder.controllerLeader());
-    if (leader != null)
-    {
-      String leaderName = leader.getInstanceName(); // leader.getLeader();
-      LOG.info("Leader exists for cluster:" + manager.getClusterName()
-          + ", currentLeader:" + leaderName);
-
-      if (leaderName != null && leaderName.equals(manager.getInstanceName()))
-      {
-        return true;
-      }
+    private boolean tryUpdateController(HelixManager manager) {
+	// DataAccessor dataAccessor = manager.getDataAccessor();
+	HelixDataAccessor accessor = manager.getHelixDataAccessor();
+	Builder keyBuilder = accessor.keyBuilder();
+
+	LiveInstance leader = new LiveInstance(manager.getInstanceName());
+	try {
+	    leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+	    // TODO: this session id is not the leader's session id in
+	    // distributed mode
+	    leader.setSessionId(manager.getSessionId());
+	    leader.setHelixVersion(manager.getVersion());
+	    if (ZKPropertyTransferServer.getInstance() != null) {
+		String zkPropertyTransferServiceUrl = ZKPropertyTransferServer.getInstance()
+		        .getWebserviceUrl();
+		if (zkPropertyTransferServiceUrl != null) {
+		    leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
+		}
+	    } else {
+		LOG.warn("ZKPropertyTransferServer instnace is null");
+	    }
+	    boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
+	    if (success) {
+		return true;
+	    } else {
+		LOG.info("Unable to become leader probably because some other controller becames the leader");
+	    }
+	} catch (Exception e) {
+	    LOG.error(
+		    "Exception when trying to updating leader record in cluster:"
+		            + manager.getClusterName()
+		            + ". Need to check again whether leader node has been created or not",
+		    e);
+	}
+
+	leader = accessor.getProperty(keyBuilder.controllerLeader());
+	if (leader != null) {
+	    String leaderSessionId = leader.getSessionId();
+	    LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
+		    + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
+
+	    if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
+		return true;
+	    }
+	}
+	return false;
     }
 
-    return false;
-  }
-
-  private void updateHistory(HelixManager manager)
-  {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
+    private void updateHistory(HelixManager manager) {
+	HelixDataAccessor accessor = manager.getHelixDataAccessor();
+	Builder keyBuilder = accessor.keyBuilder();
 
-    LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
-    if (history == null)
-    {
-      history = new LeaderHistory(PropertyType.HISTORY.toString());
+	LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
+	if (history == null) {
+	    history = new LeaderHistory(PropertyType.HISTORY.toString());
+	}
+	history.updateHistory(manager.getClusterName(), manager.getInstanceName());
+	accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
     }
-    history.updateHistory(manager.getClusterName(), manager.getInstanceName());
-    accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/31043162/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceDuringStart.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceDuringStart.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceDuringStart.java
deleted file mode 100644
index 2baf550..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceDuringStart.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.helix.integration;
-
-import java.util.Date;
-
-import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
-import org.apache.helix.model.IdealState.IdealStateModeProperty;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-// test auto-rebalance ideal state mode
-// change ideal state when state transitions are in progress
-public class TestAutoRebalanceDuringStart extends ZkIntegrationTestBase {
-    @Test
-    public void test() throws Exception {
-	// Logger.getRootLogger().setLevel(Level.INFO);
-	String className = TestHelper.getTestClassName();
-	String methodName = TestHelper.getTestMethodName();
-	String clusterName = className + "_" + methodName;
-	final int n = 3;
-
-	System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-	MockParticipant[] participants = new MockParticipant[n];
-
-	TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-	        "localhost", // participant name prefix
-	        "TestDB", // resource name prefix
-	        1, // resources
-	        4, // partitions per resource
-	        n, // number of nodes
-	        3, // replicas
-	        "MasterSlave", 
-	        IdealStateModeProperty.AUTO_REBALANCE, 
-	        true); // do rebalance
-
-	// start controller
-	ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
-	controller.syncStart();
-
-	// start participants
-	for (int i = 0; i < n; i++) {
-	    String instanceName = "localhost_" + (12918 + i);
-
-	    participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR,
-		    new MockParticipant.SleepTransition(1000));
-	    participants[i].syncStart();
-	    Thread.sleep(300);
-	}
-	boolean result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(
-	        ZK_ADDR, clusterName));
-	Assert.assertTrue(result);
-
-	// clean up
-	controller.syncStop();
-	Thread.sleep(1000);  // wait for all zk callbacks done
-	for (int i = 0; i < n; i++) {
-	    participants[i].syncStop();
-	}
-
-	System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/31043162/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
new file mode 100644
index 0000000..3c7a9d1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
@@ -0,0 +1,60 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestBase {
+    @Test
+    public void test() throws Exception {
+	Logger.getRootLogger().setLevel(Level.WARN);
+	String className = TestHelper.getTestClassName();
+	String methodName = TestHelper.getTestMethodName();
+	String clusterName = className + "_" + methodName;
+	final int n = 3;
+
+	System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+	TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+	        "localhost", // participant name prefix
+	        "TestDB", // resource name prefix
+	        1, // resources
+	        10, // partitions per resource
+	        n, // number of nodes
+	        1, // replicas
+	        "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE, true); // do
+									       // rebalance
+
+	// start controller
+	ClusterController[] controllers = new ClusterController[4];
+	for (int i = 0; i < 4; i++) {
+	    controllers[i] = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+	    controllers[i].start();
+	}
+
+	Thread.sleep(500); // wait leader election finishes
+	String liPath = PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES, clusterName);
+	int listenerNb = TestHelper.numberOfListeners(ZK_ADDR, liPath);
+	// System.out.println("listenerNb: " + listenerNb);
+	Assert.assertEquals(listenerNb, 1, "Only one controller should succeed in becoming leader");
+	
+
+	// clean up
+	for (int i = 0; i < 4; i++) {
+	    controllers[i].syncStop();
+	    Thread.sleep(1000); // wait for all zk callbacks done
+	}
+
+	System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    }
+
+}


Mime
View raw message