Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B8BC910F6B for ; Fri, 25 Oct 2013 01:22:05 +0000 (UTC) Received: (qmail 75800 invoked by uid 500); 25 Oct 2013 01:22:04 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 75755 invoked by uid 500); 25 Oct 2013 01:22:04 -0000 Mailing-List: contact commits-help@helix.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.incubator.apache.org Delivered-To: mailing list commits@helix.incubator.apache.org Received: (qmail 75711 invoked by uid 99); 25 Oct 2013 01:22:04 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Oct 2013 01:22:03 +0000 X-ASF-Spam-Status: No, hits=-2000.4 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 25 Oct 2013 01:21:38 +0000 Received: (qmail 72079 invoked by uid 99); 25 Oct 2013 01:21:15 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Oct 2013 01:21:15 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DADBF81CC0B; Fri, 25 Oct 2013 01:21:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zzhang@apache.org To: commits@helix.incubator.apache.org Date: Fri, 25 Oct 2013 01:21:16 -0000 Message-Id: In-Reply-To: <5a96fe6f6be842149dae9f531a238e1e@git.apache.org> References: <5a96fe6f6be842149dae9f531a238e1e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/10] [HELIX-279] Apply gc handling fixes to main ZKHelixManager class X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java index 2354ebd..457b5fb 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java @@ -87,8 +87,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ String hostDest = "localhost_" + (START_PORT + 1); TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( - factory.getMessageType(), factory); + _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), + factory); String msgId = new UUID(123, 456).toString(); Message msg = new Message(factory.getMessageType(), msgId); @@ -104,7 +104,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ cr.setRecipientInstanceType(InstanceType.PARTICIPANT); cr.setSessionSpecific(false); - int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg); + // int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg); + int nMsgs = _participants[0].getMessagingService().send(cr, msg); AssertJUnit.assertTrue(nMsgs == 1); Thread.sleep(2500); // Thread.currentThread().join(); @@ -116,7 +117,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ cr.setSessionSpecific(false); cr.setDataSource(DataSource.IDEALSTATES); - nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg); + // nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg); + nMsgs = _participants[0].getMessagingService().send(cr, msg); AssertJUnit.assertTrue(nMsgs == 1); Thread.sleep(2500); // Thread.currentThread().join(); @@ -179,11 +181,11 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ String hostDest = "localhost_" + (START_PORT + 1); TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( - factory.getMessageType(), factory); + _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), + factory); - _startCMResultMap.get(hostSrc)._manager.getMessagingService().registerMessageHandlerFactory( - factory.getMessageType(), factory); + _participants[0].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), + factory); String msgId = new UUID(123, 456).toString(); Message msg = new Message(factory.getMessageType(), msgId); @@ -202,7 +204,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ TestAsyncCallback callback = new TestAsyncCallback(60000); - _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000); + _participants[0].getMessagingService().send(cr, msg, callback, 60000); Thread.sleep(2000); // Thread.currentThread().join(); @@ -210,7 +212,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1); TestAsyncCallback callback2 = new TestAsyncCallback(500); - _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500); + _participants[0].getMessagingService().send(cr, msg, callback2, 500); Thread.sleep(3000); // Thread.currentThread().join(); @@ -224,7 +226,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ callback = new TestAsyncCallback(60000); - _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000); + _participants[0].getMessagingService().send(cr, msg, callback, 60000); Thread.sleep(2000); // Thread.currentThread().join(); @@ -232,7 +234,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1); callback2 = new TestAsyncCallback(500); - _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500); + _participants[0].getMessagingService().send(cr, msg, callback2, 500); Thread.sleep(3000); // Thread.currentThread().join(); @@ -246,8 +248,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ String hostDest = "localhost_" + (START_PORT + 1); TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( - factory.getMessageType(), factory); + _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), + factory); String msgId = new UUID(123, 456).toString(); Message msg = new Message(factory.getMessageType(), msgId); @@ -266,8 +268,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AsyncCallback asyncCallback = new MockAsyncCallback(); int messagesSent = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - asyncCallback, 60000); + _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000); AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord() .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage") @@ -275,9 +276,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1); AsyncCallback asyncCallback2 = new MockAsyncCallback(); - messagesSent = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - asyncCallback2, 500); + messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500); AssertJUnit.assertTrue(asyncCallback2.isTimedOut()); } @@ -289,8 +288,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ for (int i = 0; i < NODE_NR; i++) { TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[0].getMessagingService().registerMessageHandlerFactory( factory.getMessageType(), factory); + } String msgId = new UUID(123, 456).toString(); Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId); @@ -308,8 +308,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ cr.setSessionSpecific(false); AsyncCallback callback1 = new MockAsyncCallback(); int messageSent1 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback1, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000); AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord() .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage") @@ -317,37 +316,32 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1); AsyncCallback callback2 = new MockAsyncCallback(); - int messageSent2 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback2, 500); + int messageSent2 = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500); + AssertJUnit.assertTrue(callback2.isTimedOut()); cr.setPartition("TestDB_17"); AsyncCallback callback3 = new MockAsyncCallback(); int messageSent3 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback3, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000); AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1); cr.setPartition("TestDB_15"); AsyncCallback callback4 = new MockAsyncCallback(); int messageSent4 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback4, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000); AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica); cr.setPartitionState("SLAVE"); AsyncCallback callback5 = new MockAsyncCallback(); int messageSent5 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback5, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000); AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1); cr.setDataSource(DataSource.IDEALSTATES); AsyncCallback callback6 = new MockAsyncCallback(); int messageSent6 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback6, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000); AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1); } @@ -358,8 +352,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ for (int i = 0; i < NODE_NR; i++) { TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( factory.getMessageType(), factory); + } String msgId = new UUID(123, 456).toString(); Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId); @@ -378,8 +373,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ cr.setSelfExcluded(false); AsyncCallback callback1 = new MockAsyncCallback(); int messageSent1 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback1, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000); AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR); AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord() @@ -394,8 +388,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ for (int i = 0; i < NODE_NR; i++) { TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( factory.getMessageType(), factory); + } String msgId = new UUID(123, 456).toString(); Message msg = new Message(MessageType.CONTROLLER_MSG, msgId); @@ -414,8 +409,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AsyncCallback callback1 = new MockAsyncCallback(); int messagesSent = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback1, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000); AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord() .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult") @@ -426,9 +420,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ msg.setMsgId(msgId); cr.setPartition("TestDB_17"); AsyncCallback callback2 = new MockAsyncCallback(); - messagesSent = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback2, 10000); + messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000); + AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord() .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult") .indexOf(hostSrc) != -1); @@ -439,9 +432,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ msg.setMsgId(msgId); cr.setPartitionState("SLAVE"); AsyncCallback callback3 = new MockAsyncCallback(); - messagesSent = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback3, 10000); + messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000); AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord() .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult") .indexOf(hostSrc) != -1); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java index 5a6e5e6..734e2b4 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java @@ -22,9 +22,9 @@ package org.apache.helix.integration; import java.util.Date; import org.apache.helix.TestHelper; -import org.apache.helix.controller.HelixControllerMain; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZkClient; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.mock.participant.MockBootstrapModelFactory; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.tools.ClusterSetup; @@ -51,18 +51,19 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase { 1, // replicas "Bootstrap", true); // do rebalance - TestHelper - .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); // start participants - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); // add a state model with non-OFFLINE initial state - StateMachineEngine stateMach = participants[i].getManager().getStateMachineEngine(); + StateMachineEngine stateMach = participants[i].getStateMachineEngine(); MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory(); stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory); @@ -74,16 +75,21 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase { clusterName)); Assert.assertTrue(result); + // clean up + controller.syncStop(); + for (int i = 0; i < 5; i++) { + participants[i].syncStop(); + } + System.out.println("END testNonOfflineInitState at " + new Date(System.currentTimeMillis())); } private static void setupCluster(String clusterName, String ZkAddr, int startPort, String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb, int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception { - ZkClient zkClient = new ZkClient(ZkAddr); - if (zkClient.exists("/" + clusterName)) { + if (_gZkClient.exists("/" + clusterName)) { LOG.warn("Cluster already exists:" + clusterName + ". Deleting it"); - zkClient.deleteRecursive("/" + clusterName); + _gZkClient.deleteRecursive("/" + clusterName); } ClusterSetup setupTool = new ClusterSetup(ZkAddr); @@ -103,7 +109,6 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase { setupTool.rebalanceStorageCluster(clusterName, dbName, replica); } } - zkClient.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java index 496d1a6..0f5cc72 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java @@ -25,8 +25,8 @@ import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; -import org.apache.helix.mock.controller.ClusterController; -import org.apache.helix.mock.participant.MockParticipant; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.IdealState; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; @@ -44,7 +44,7 @@ public class TestNullReplica extends ZkIntegrationTestBase { System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port "localhost", // participant name prefix @@ -61,14 +61,15 @@ public class TestNullReplica extends ZkIntegrationTestBase { idealState.getSimpleFields().remove(IdealState.IdealStateProperty.REPLICAS.toString()); _gZkClient.writeData(idealStatePath, idealState); - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); // start participants for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); } @@ -78,13 +79,11 @@ public class TestNullReplica extends ZkIntegrationTestBase { Assert.assertTrue(result); // clean up + controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); } - Thread.sleep(2000); - controller.syncStop(); - System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java index 8336939..cf59ed0 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java @@ -31,10 +31,13 @@ import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageType; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.apache.log4j.Logger; import org.testng.Assert; import org.testng.annotations.Test; public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase { + private static Logger LOG = Logger.getLogger(TestParticipantErrorMessage.class); + @Test() public void TestParticipantErrorMessageSend() { String participant1 = "localhost_" + START_PORT; @@ -49,7 +52,7 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase { Criteria recipientCriteria = new Criteria(); recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER); recipientCriteria.setSessionSpecific(false); - _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria, + _participants[0].getMessagingService().send(recipientCriteria, errorMessage1); Message errorMessage2 = @@ -63,23 +66,22 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase { Criteria recipientCriteria2 = new Criteria(); recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER); recipientCriteria2.setSessionSpecific(false); - _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2, + _participants[1].getMessagingService().send(recipientCriteria2, errorMessage2); try { Thread.sleep(1500); } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + LOG.error("Interrupted sleep", e); } boolean result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); Assert.assertTrue(result); - Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder(); + Builder kb = _participants[1].getHelixDataAccessor().keyBuilder(); ExternalView externalView = - _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty( + _participants[1].getHelixDataAccessor().getProperty( kb.externalView("TestDB")); for (String partitionName : externalView.getRecord().getMapFields().keySet()) { http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java index 4227688..aa67ac9 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java @@ -22,7 +22,7 @@ package org.apache.helix.integration; import java.util.Date; import org.apache.helix.TestHelper; -import org.apache.helix.TestHelper.StartCMResult; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.log4j.Logger; import org.testng.annotations.Test; @@ -33,20 +33,21 @@ public class TestParticipantNameCollision extends ZkStandAloneCMTestBase { public void testParticiptantNameCollision() throws Exception { logger.info("RUN TestParticipantNameCollision() at " + new Date(System.currentTimeMillis())); - StartCMResult result = null; + MockParticipantManager newParticipant = null; for (int i = 0; i < 1; i++) { String instanceName = "localhost_" + (START_PORT + i); try { // the call fails on getClusterManagerForParticipant() // no threads start - result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName); + newParticipant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + newParticipant.syncStart(); } catch (Exception e) { e.printStackTrace(); } } Thread.sleep(30000); - TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, result._manager); + TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, newParticipant); logger.info("STOP TestParticipantNameCollision() at " + new Date(System.currentTimeMillis())); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java index d900d98..a1c413b 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java @@ -24,12 +24,12 @@ import java.util.Date; import org.apache.helix.HelixDataAccessor; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; -import org.apache.helix.mock.controller.ClusterController; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.model.PauseSignal; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; @@ -47,7 +47,7 @@ public class TestPauseSignal extends ZkIntegrationTestBase { System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port "localhost", // participant name prefix @@ -59,14 +59,15 @@ public class TestPauseSignal extends ZkIntegrationTestBase { "MasterSlave", true); // do rebalance // start controller - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); // start participants for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); } @@ -112,13 +113,11 @@ public class TestPauseSignal extends ZkIntegrationTestBase { Assert.assertTrue(result); // clean up + controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); } - Thread.sleep(2000); - controller.syncStop(); - System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java index ed056ab..5f4377d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java @@ -23,14 +23,15 @@ import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.controller.HelixControllerMain; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.tools.ClusterStateVerifier; @@ -39,6 +40,14 @@ import org.testng.Assert; import org.testng.annotations.Test; public class TestRenamePartition extends ZkIntegrationTestBase { + // map from clusterName to participants + final Map _participantMap = + new ConcurrentHashMap(); + + // map from clusterName to controllers + final Map _controllerMap = + new ConcurrentHashMap(); + @Test() public void testRenamePartitionAutoIS() throws Exception { String clusterName = "CLUSTER_" + getShortClassName() + "_auto"; @@ -57,7 +66,7 @@ public class TestRenamePartition extends ZkIntegrationTestBase { // rename partition name TestDB0_0 tp TestDB0_100 ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient)); + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient)); Builder keyBuilder = accessor.keyBuilder(); IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0")); @@ -71,8 +80,8 @@ public class TestRenamePartition extends ZkIntegrationTestBase { ZK_ADDR, clusterName)); Assert.assertTrue(result); + stop(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); - } @Test() @@ -103,7 +112,7 @@ public class TestRenamePartition extends ZkIntegrationTestBase { idealState.setStateModelDefRef("MasterSlave"); ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient)); + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient)); Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState); @@ -118,23 +127,25 @@ public class TestRenamePartition extends ZkIntegrationTestBase { ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier( ZK_ADDR, clusterName)); Assert.assertTrue(result); + + stop(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } private void startAndVerify(String clusterName) throws Exception { - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; - TestHelper - .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); // start participants for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); - // new Thread(participants[i]).start(); } boolean result = @@ -142,5 +153,21 @@ public class TestRenamePartition extends ZkIntegrationTestBase { ZK_ADDR, clusterName)); Assert.assertTrue(result); + _participantMap.put(clusterName, participants); + _controllerMap.put(clusterName, controller); + } + + private void stop(String clusterName) { + ClusterControllerManager controller = _controllerMap.get(clusterName); + if (controller != null) { + controller.syncStop(); + } + + MockParticipantManager[] participants = _participantMap.get(clusterName); + if (participants != null) { + for (MockParticipantManager participant : participants) { + participant.syncStop(); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java index 2715932..7159a17 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java @@ -25,8 +25,8 @@ import java.util.Map; import java.util.Set; import org.apache.helix.TestHelper; -import org.apache.helix.mock.controller.ClusterController; -import org.apache.helix.mock.participant.MockParticipant; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.mock.participant.ErrTransition; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; @@ -54,7 +54,8 @@ public class TestResetInstance extends ZkIntegrationTestBase { "MasterSlave", true); // do rebalance // start controller - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); Map> errPartitions = new HashMap>() { @@ -65,16 +66,15 @@ public class TestResetInstance extends ZkIntegrationTestBase { }; // start mock participants - MockParticipant[] participants = new MockParticipant[n]; + MockParticipantManager[] participants = new MockParticipantManager[n]; for (int i = 0; i < n; i++) { String instanceName = "localhost_" + (12918 + i); if (i == 0) { - participants[i] = - new MockParticipant(clusterName, instanceName, ZK_ADDR, - new ErrTransition(errPartitions)); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].setTransition(new ErrTransition(errPartitions)); } else { - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); } participants[i].syncStart(); } @@ -102,8 +102,6 @@ public class TestResetInstance extends ZkIntegrationTestBase { Assert.assertTrue(result, "Cluster verification fails"); // clean up - // wait for all zk callbacks done - Thread.sleep(1000); controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java index 09e57c6..c37a5ea 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java @@ -28,10 +28,10 @@ import org.apache.helix.NotificationContext; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.mock.controller.ClusterController; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.mock.participant.ErrTransition; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; @@ -80,7 +80,8 @@ public class TestResetPartitionState extends ZkIntegrationTestBase { "MasterSlave", true); // do rebalance // start controller - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); Map> errPartitions = new HashMap>() { @@ -91,16 +92,16 @@ public class TestResetPartitionState extends ZkIntegrationTestBase { }; // start mock participants - MockParticipant[] participants = new MockParticipant[n]; + MockParticipantManager[] participants = new MockParticipantManager[n]; for (int i = 0; i < n; i++) { String instanceName = "localhost_" + (12918 + i); if (i == 0) { participants[i] = - new MockParticipant(clusterName, instanceName, ZK_ADDR, - new ErrTransition(errPartitions)); + new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].setTransition(new ErrTransition(errPartitions)); } else { - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); } participants[i].syncStart(); } @@ -169,8 +170,6 @@ public class TestResetPartitionState extends ZkIntegrationTestBase { Assert.assertEquals(_errToOfflineInvoked, 2, "Should reset 2 partitions"); // clean up - // wait for all zk callbacks done - Thread.sleep(1000); controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java index de4ad1a..46a05d8 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java @@ -25,8 +25,8 @@ import java.util.Map; import java.util.Set; import org.apache.helix.TestHelper; -import org.apache.helix.mock.controller.ClusterController; -import org.apache.helix.mock.participant.MockParticipant; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.mock.participant.ErrTransition; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; @@ -53,7 +53,8 @@ public class TestResetResource extends ZkIntegrationTestBase { "MasterSlave", true); // do rebalance // start controller - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); Map> errPartitions = new HashMap>() { @@ -64,16 +65,15 @@ public class TestResetResource extends ZkIntegrationTestBase { }; // start mock participants - MockParticipant[] participants = new MockParticipant[n]; + MockParticipantManager[] participants = new MockParticipantManager[n]; for (int i = 0; i < n; i++) { String instanceName = "localhost_" + (12918 + i); if (i == 0) { - participants[i] = - new MockParticipant(clusterName, instanceName, ZK_ADDR, - new ErrTransition(errPartitions)); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].setTransition(new ErrTransition(errPartitions)); } else { - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); } participants[i].syncStart(); } @@ -101,8 +101,6 @@ public class TestResetResource extends ZkIntegrationTestBase { Assert.assertTrue(result, "Cluster verification fails"); // clean up - // wait for all zk callbacks done - Thread.sleep(1000); controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java index 008782c..64043ed 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java @@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.helix.NotificationContext; import org.apache.helix.TestHelper; -import org.apache.helix.controller.HelixControllerMain; -import org.apache.helix.mock.participant.MockParticipant; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.mock.participant.MockTransition; import org.apache.helix.model.Message; import org.apache.helix.tools.ClusterStateVerifier; @@ -35,15 +35,15 @@ import org.testng.annotations.Test; public class TestRestartParticipant extends ZkIntegrationTestBase { public class KillOtherTransition extends MockTransition { - final AtomicReference _other; + final AtomicReference _other; - public KillOtherTransition(MockParticipant other) { - _other = new AtomicReference(other); + public KillOtherTransition(MockParticipantManager other) { + _other = new AtomicReference(other); } @Override public void doTransition(Message message, NotificationContext context) { - MockParticipant other = _other.getAndSet(null); + MockParticipantManager other = _other.getAndSet(null); if (other != null) { System.err.println("Kill " + other.getInstanceName() + ". Interrupted exceptions are IGNORABLE"); @@ -58,7 +58,7 @@ public class TestRestartParticipant extends ZkIntegrationTestBase { System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis())); String clusterName = getShortClassName(); - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port "localhost", // participant name prefix @@ -69,19 +69,19 @@ public class TestRestartParticipant extends ZkIntegrationTestBase { 3, // replicas "MasterSlave", true); // do rebalance - TestHelper - .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); + // start participants for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); if (i == 4) { - participants[i] = - new MockParticipant(clusterName, instanceName, ZK_ADDR, new KillOtherTransition( - participants[0])); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].setTransition(new KillOtherTransition(participants[0])); } else { - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); - // Thread.sleep(100); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); } participants[i].syncStart(); @@ -94,9 +94,9 @@ public class TestRestartParticipant extends ZkIntegrationTestBase { // restart Thread.sleep(500); - MockParticipant participant = - new MockParticipant(participants[0].getClusterName(), participants[0].getInstanceName(), - ZK_ADDR, null); + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, participants[0].getClusterName(), + participants[0].getInstanceName()); System.err.println("Restart " + participant.getInstanceName()); participant.syncStart(); result = @@ -104,6 +104,13 @@ public class TestRestartParticipant extends ZkIntegrationTestBase { clusterName)); Assert.assertTrue(result); + // clean up + controller.syncStop(); + for (int i = 0; i < 5; i++) { + participants[i].syncStop(); + } + participant.syncStop(); + System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis())); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java index 2c174c4..bf851cc 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java @@ -194,10 +194,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ _factory._results.clear(); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { - String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( _factory.getMessageType(), _factory); - manager = _startCMResultMap.get(hostDest)._manager; + + manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; } Message schedulerMessage = @@ -282,10 +282,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ _factory._results.clear(); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { - String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( _factory.getMessageType(), _factory); - manager = _startCMResultMap.get(hostDest)._manager; + + manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; } Message schedulerMessage = @@ -367,11 +367,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ String controllerStatusPath = HelixUtil.getControllerPropertyPath(manager.getClusterName(), PropertyType.STATUSUPDATES_CONTROLLER); - List subPaths = _zkClient.getChildren(controllerStatusPath); + List subPaths = _gZkClient.getChildren(controllerStatusPath); Assert.assertTrue(subPaths.size() > 0); for (String subPath : subPaths) { String nextPath = controllerStatusPath + "/" + subPath; - List subsubPaths = _zkClient.getChildren(nextPath); + List subsubPaths = _gZkClient.getChildren(nextPath); Assert.assertTrue(subsubPaths.size() > 0); } @@ -379,38 +379,38 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT), PropertyType.STATUSUPDATES); - subPaths = _zkClient.getChildren(instanceStatusPath); + subPaths = _gZkClient.getChildren(instanceStatusPath); Assert.assertTrue(subPaths.size() > 0); for (String subPath : subPaths) { String nextPath = instanceStatusPath + "/" + subPath; - List subsubPaths = _zkClient.getChildren(nextPath); + List subsubPaths = _gZkClient.getChildren(nextPath); Assert.assertTrue(subsubPaths.size() > 0); for (String subsubPath : subsubPaths) { String nextnextPath = nextPath + "/" + subsubPath; - Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0); + Assert.assertTrue(_gZkClient.getChildren(nextnextPath).size() > 0); } } Thread.sleep(3000); - ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0); + ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _gZkClient, 0); dumpTask.run(); - subPaths = _zkClient.getChildren(controllerStatusPath); + subPaths = _gZkClient.getChildren(controllerStatusPath); Assert.assertTrue(subPaths.size() > 0); for (String subPath : subPaths) { String nextPath = controllerStatusPath + "/" + subPath; - List subsubPaths = _zkClient.getChildren(nextPath); + List subsubPaths = _gZkClient.getChildren(nextPath); Assert.assertTrue(subsubPaths.size() == 0); } - subPaths = _zkClient.getChildren(instanceStatusPath); + subPaths = _gZkClient.getChildren(instanceStatusPath); Assert.assertTrue(subPaths.size() > 0); for (String subPath : subPaths) { String nextPath = instanceStatusPath + "/" + subPath; - List subsubPaths = _zkClient.getChildren(nextPath); + List subsubPaths = _gZkClient.getChildren(nextPath); Assert.assertTrue(subsubPaths.size() > 0); for (String subsubPath : subsubPaths) { String nextnextPath = nextPath + "/" + subsubPath; - Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0); + Assert.assertTrue(_gZkClient.getChildren(nextnextPath).size() == 0); } } } @@ -420,10 +420,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ _factory._results.clear(); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { - String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( _factory.getMessageType(), _factory); - manager = _startCMResultMap.get(hostDest)._manager; + + manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; } Message schedulerMessage = @@ -511,10 +511,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { - String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( factory.getMessageType(), factory); - manager = _startCMResultMap.get(hostDest)._manager; + + manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; } Message schedulerMessage = @@ -581,13 +581,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ _factory._results.clear(); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { - String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( _factory.getMessageType(), _factory); - // - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + + _participants[i].getMessagingService().registerMessageHandlerFactory( _factory.getMessageType(), _factory); - manager = _startCMResultMap.get(hostDest)._manager; + + manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; } Message schedulerMessage = @@ -702,13 +702,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ _factory._results.clear(); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { - String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( _factory.getMessageType(), _factory); - // - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + + _participants[i].getMessagingService().registerMessageHandlerFactory( _factory.getMessageType(), _factory); - manager = _startCMResultMap.get(hostDest)._manager; + + manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; } Message schedulerMessage = @@ -852,13 +852,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch(); HelixManager manager = null; for (int i = 0; i < NODE_NR; i++) { - String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( factory.getMessageType(), factory); - // - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + + _participants[i].getMessagingService().registerMessageHandlerFactory( factory.getMessageType(), factory); - manager = _startCMResultMap.get(hostDest)._manager; + + manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; } Message schedulerMessage = http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java index 3024f45..a927520 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java @@ -27,10 +27,10 @@ import org.apache.helix.HelixConstants; import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.mock.controller.ClusterController; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.tools.ClusterStateVerifier; @@ -46,7 +46,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase { String clusterName = className + "_" + methodName; int n = 5; - MockParticipant[] participants = new MockParticipant[n]; + MockParticipantManager[] participants = new MockParticipantManager[n]; System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); @@ -70,14 +70,15 @@ public class TestSchemataSM extends ZkIntegrationTestBase { Arrays.asList(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString())); accessor.setProperty(key, idealState); - ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); controller.syncStart(); // start n-1 participants for (int i = 1; i < n; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); } @@ -87,7 +88,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase { Assert.assertTrue(result); // start the remaining 1 participant - participants[0] = new MockParticipant(clusterName, "localhost_12918", ZK_ADDR, null); + participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918"); participants[0].syncStart(); // make sure we have all participants in MASTER state @@ -107,6 +108,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase { } // clean up + controller.syncStop(); for (int i = 0; i < n; i++) { participants[i].syncStop(); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java index f38c6de..965b8ef 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java @@ -25,10 +25,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.TestHelper; -import org.apache.helix.ZkHelixTestManager; import org.apache.helix.ZkTestHelper; -import org.apache.helix.mock.controller.ClusterController; -import org.apache.helix.mock.participant.MockParticipant; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.mock.participant.MockTransition; import org.apache.helix.model.Message; import org.apache.helix.tools.ClusterStateVerifier; @@ -39,13 +38,14 @@ import org.testng.Assert; import org.testng.annotations.Test; public class TestSessionExpiryInTransition extends ZkIntegrationTestBase { + private static Logger LOG = Logger.getLogger(TestSessionExpiryInTransition.class); public class SessionExpiryTransition extends MockTransition { private final AtomicBoolean _done = new AtomicBoolean(); @Override public void doTransition(Message message, NotificationContext context) { - ZkHelixTestManager manager = (ZkHelixTestManager) context.getManager(); + MockParticipantManager manager = (MockParticipantManager) context.getManager(); String instance = message.getTgtName(); String partition = message.getPartitionName(); @@ -55,8 +55,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase { try { ZkTestHelper.expireSession(manager.getZkClient()); } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); + LOG.error("Exception expire zk-session", e); } } } @@ -64,7 +63,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase { @Test public void testSessionExpiryInTransition() throws Exception { - Logger.getRootLogger().setLevel(Level.WARN); + // Logger.getRootLogger().setLevel(Level.WARN); String className = TestHelper.getTestClassName(); String methodName = TestHelper.getTestMethodName(); @@ -72,7 +71,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase { System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port "localhost", // participant name prefix @@ -84,15 +83,15 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase { "MasterSlave", true); // do rebalance // start controller - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); // start participants for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - ZkHelixTestManager manager = - new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR); - participants[i] = new MockParticipant(manager, new SessionExpiryTransition()); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].setTransition(new SessionExpiryTransition()); participants[i].syncStart(); } @@ -102,13 +101,11 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase { Assert.assertTrue(result); // clean up + controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); } - Thread.sleep(2000); - controller.syncStop(); - System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java index 02a34d8..6eb7a8c 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java @@ -21,9 +21,15 @@ package org.apache.helix.integration; import java.util.Date; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; -import org.apache.helix.TestHelper.StartCMResult; -import org.apache.helix.controller.HelixControllerMain; +import org.apache.helix.TestHelper.Verifier; +import org.apache.helix.ZNRecord; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.LiveInstance; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.log4j.Logger; import org.testng.Assert; @@ -35,16 +41,34 @@ public class TestStandAloneCMMain extends ZkStandAloneCMTestBase { @Test() public void testStandAloneCMMain() throws Exception { logger.info("RUN testStandAloneCMMain() at " + new Date(System.currentTimeMillis())); - + ClusterControllerManager newController = null; for (int i = 1; i <= 2; i++) { String controllerName = "controller_" + i; - StartCMResult startResult = - TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR, - HelixControllerMain.STANDALONE); - _startCMResultMap.put(controllerName, startResult); + newController = + new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + newController.syncStart(); } - stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap); + // stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap); + _controller.syncStop(); + + final HelixDataAccessor accessor = + new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(_gZkClient)); + final PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + final String newControllerName = newController.getInstanceName(); + TestHelper.verify(new Verifier() { + + @Override + public boolean verify() throws Exception { + LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader()); + if (leader == null) { + return false; + } + return leader.getInstanceName().equals(newControllerName); + + } + }, 30 * 1000); + boolean result = ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier( ZK_ADDR, CLUSTER_NAME)); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java index 347ff7e..e8adf03 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java @@ -23,9 +23,9 @@ import java.util.Date; import org.apache.helix.InstanceType; import org.apache.helix.TestHelper; -import org.apache.helix.ZkHelixTestManager; import org.apache.helix.ZkTestHelper; -import org.apache.helix.mock.participant.MockParticipant; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.log4j.Logger; @@ -47,18 +47,16 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase { TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, PARTICIPANT_PREFIX, "TestDB", 1, 20, 5, 3, "MasterSlave", true); - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - ZkHelixTestManager manager = - new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR); - participants[i] = new MockParticipant(manager, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); } - ZkHelixTestManager controller = - new ZkHelixTestManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR); - controller.connect(); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); boolean result; result = @@ -67,7 +65,7 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase { Assert.assertTrue(result); // participant session expiry - ZkHelixTestManager participantToExpire = (ZkHelixTestManager) participants[1].getManager(); + MockParticipantManager participantToExpire = participants[1]; System.out.println("Expire participant session"); String oldSessionId = participantToExpire.getSessionId(); @@ -107,8 +105,7 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase { // clean up System.out.println("Clean up ..."); // Logger.getRootLogger().setLevel(Level.DEBUG); - controller.disconnect(); - Thread.sleep(100); + controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java index dce3fd4..d191c18 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java @@ -25,7 +25,7 @@ import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; import org.apache.helix.ZkTestHelper; -import org.apache.helix.mock.controller.ClusterController; +import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -54,10 +54,10 @@ public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestB // rebalance // start controller - ClusterController[] controllers = new ClusterController[4]; + ClusterControllerManager[] controllers = new ClusterControllerManager[4]; for (int i = 0; i < 4; i++) { - controllers[i] = new ClusterController(clusterName, "controller_0", ZK_ADDR); - controllers[i].start(); + controllers[i] = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controllers[i].syncStart(); } Thread.sleep(500); // wait leader election finishes @@ -69,7 +69,6 @@ public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestB // clean up for (int i = 0; i < 4; i++) { controllers[i].syncStop(); - Thread.sleep(1000); // wait for all zk callbacks done } System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java index edc10c6..a297752 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java @@ -28,22 +28,16 @@ import java.util.Set; import org.apache.helix.HelixDataAccessor; import org.apache.helix.NotificationContext; -import org.apache.helix.TestHelper; import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.TestHelper.StartCMResult; -import org.apache.helix.controller.HelixControllerMain; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.messaging.handling.MessageHandler.ErrorCode; -import org.apache.helix.mock.participant.MockJobIntf; import org.apache.helix.mock.participant.MockMSStateModel; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.mock.participant.MockTransition; import org.apache.helix.mock.participant.SleepTransition; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.Message; -import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.participant.statemachine.StateModelInfo; import org.apache.helix.participant.statemachine.StateTransitionError; @@ -59,15 +53,14 @@ import org.testng.annotations.Test; public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase { private static Logger LOG = Logger.getLogger(TestStateTransitionTimeout.class); + @Override @BeforeClass public void beforeClass() throws Exception { System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - _zkClient = new ZkClient(ZK_ADDR); - _zkClient.setZkSerializer(new ZNRecordSerializer()); String namespace = "/" + CLUSTER_NAME; - if (_zkClient.exists(namespace)) { - _zkClient.deleteRecursive(namespace); + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); } _setupTool = new ClusterSetup(ZK_ADDR); @@ -106,12 +99,14 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase { _sleep = sleep; } + @Override @Transition(to = "SLAVE", from = "OFFLINE") public void onBecomeSlaveFromOffline(Message message, NotificationContext context) { LOG.info("Become SLAVE from OFFLINE"); } + @Override @Transition(to = "MASTER", from = "SLAVE") public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException { @@ -121,23 +116,27 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase { } } + @Override @Transition(to = "SLAVE", from = "MASTER") public void onBecomeSlaveFromMaster(Message message, NotificationContext context) { LOG.info("Become SLAVE from MASTER"); } + @Override @Transition(to = "OFFLINE", from = "SLAVE") public void onBecomeOfflineFromSlave(Message message, NotificationContext context) { LOG.info("Become OFFLINE from SLAVE"); } + @Override @Transition(to = "DROPPED", from = "OFFLINE") public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { LOG.info("Become DROPPED from OFFLINE"); } + @Override public void rollbackOnError(Message message, NotificationContext context, StateTransitionError error) { _error = error; @@ -171,7 +170,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase { @Test public void testStateTransitionTimeOut() throws Exception { Map factories = new HashMap(); - MockParticipant[] participants = new MockParticipant[NODE_NR]; + // MockParticipantManager[] participants = new MockParticipantManager[NODE_NR]; IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB); for (int i = 0; i < NODE_NR; i++) { @@ -184,19 +183,20 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase { } } - participants[i] = new MockParticipant(factory, CLUSTER_NAME, instanceName, ZK_ADDR, null); - participants[i].syncStart(); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory); + _participants[i].syncStart(); } String controllerName = CONTROLLER_PREFIX + "_0"; - StartCMResult startResult = - TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR, - HelixControllerMain.STANDALONE); - _startCMResultMap.put(controllerName, startResult); + _controller = + new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + boolean result = ClusterStateVerifier .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); Assert.assertTrue(result); - HelixDataAccessor accessor = participants[0].getManager().getHelixDataAccessor(); + HelixDataAccessor accessor = _participants[0].getHelixDataAccessor(); Builder kb = accessor.keyBuilder(); ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB)); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java index a1f63aa..6cce716 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java @@ -21,9 +21,8 @@ package org.apache.helix.integration; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; -import org.apache.helix.TestHelper.StartCMResult; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.model.IdealState; import org.apache.helix.tools.ClusterStateVerifier; @@ -33,8 +32,7 @@ import org.testng.annotations.Test; public class TestSwapInstance extends ZkStandAloneCMTestBase { @Test public void TestSwap() throws Exception { - String controllerName = CONTROLLER_PREFIX + "_0"; - HelixManager manager = _startCMResultMap.get(controllerName)._manager; + HelixManager manager = _controller; HelixDataAccessor helixAccessor = manager.getHelixDataAccessor(); _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL); _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica); @@ -49,7 +47,7 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase { idealStateOld2.merge(is2.getRecord()); String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0); - ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient); + ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient); _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false); boolean result = @@ -68,8 +66,7 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase { } Assert.assertTrue(exception); - _startCMResultMap.get(instanceName)._manager.disconnect(); - _startCMResultMap.get(instanceName)._thread.interrupt(); + _participants[0].syncStop(); Thread.sleep(1000); exception = false; @@ -80,8 +77,9 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase { exception = true; } Assert.assertFalse(exception); - StartCMResult result2 = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2); - _startCMResultMap.put(instanceName2, result2); + MockParticipantManager newParticipant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName2); + newParticipant.syncStart(); result = ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(