Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A7F02100C7 for ; Wed, 20 Nov 2013 21:12:47 +0000 (UTC) Received: (qmail 76215 invoked by uid 500); 20 Nov 2013 21:12:47 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 76176 invoked by uid 500); 20 Nov 2013 21:12:47 -0000 Mailing-List: contact commits-help@helix.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.incubator.apache.org Delivered-To: mailing list commits@helix.incubator.apache.org Received: (qmail 76130 invoked by uid 99); 20 Nov 2013 21:12:47 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Nov 2013 21:12:47 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 20 Nov 2013 21:12:36 +0000 Received: (qmail 74367 invoked by uid 99); 20 Nov 2013 21:12:16 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Nov 2013 21:12:16 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 44148885AFB; Wed, 20 Nov 2013 21:12:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kishoreg@apache.org To: commits@helix.incubator.apache.org Date: Wed, 20 Nov 2013 21:12:19 -0000 Message-Id: In-Reply-To: <4b53611394d042768c47696b1202ee8c@git.apache.org> References: <4b53611394d042768c47696b1202ee8c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/52] [abbrv] [HELIX-279] Apply gc handling fixes to ZKHelixManager X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java index 8cbe55b..0c7c131 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java @@ -29,13 +29,13 @@ import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; -import org.apache.helix.controller.HelixControllerMain; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.model.ClusterConstraints.ConstraintType; import org.apache.helix.model.Message; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.builder.ConstraintItemBuilder; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; @@ -49,8 +49,7 @@ public class TestMessageThrottle extends ZkIntegrationTestBase { // Logger.getRootLogger().setLevel(Level.INFO); String clusterName = getShortClassName(); - MockParticipant[] participants = new MockParticipant[5]; - // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + MockParticipantManager[] participants = new MockParticipantManager[5]; System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); @@ -114,13 +113,15 @@ public class TestMessageThrottle extends ZkIntegrationTestBase { }); } - TestHelper - .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); + // start participants for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); } @@ -137,6 +138,7 @@ public class TestMessageThrottle extends ZkIntegrationTestBase { Assert.assertTrue(success.get()); // clean up + controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java index a947445..a182753 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java @@ -32,7 +32,6 @@ import org.apache.helix.ExternalViewChangeListener; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; import org.apache.helix.IdealStateChangeListener; import org.apache.helix.InstanceConfigChangeListener; import org.apache.helix.InstanceType; @@ -246,8 +245,7 @@ public class TestMessageThrottle2 extends ZkIntegrationTestBase { public void start() throws Exception { helixManager = - HelixManagerFactory.getZKHelixManager(clusterName, instanceName, - InstanceType.PARTICIPANT, ZK_ADDR); + new ZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR); { // hack to set sessionTimeout Field sessionTimeout = ZKHelixManager.class.getDeclaredField("_sessionTimeout"); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java index 9c6b4b7..86f1ce4 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java @@ -89,8 +89,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ String hostDest = "localhost_" + (START_PORT + 1); TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( - factory.getMessageType(), factory); + _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), + factory); MessageId msgId = MessageId.from(new UUID(123, 456).toString()); Message msg = new Message(factory.getMessageType(), msgId); @@ -106,7 +106,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ cr.setRecipientInstanceType(InstanceType.PARTICIPANT); cr.setSessionSpecific(false); - int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg); + // int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg); + int nMsgs = _participants[0].getMessagingService().send(cr, msg); AssertJUnit.assertTrue(nMsgs == 1); Thread.sleep(2500); // Thread.currentThread().join(); @@ -118,7 +119,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ cr.setSessionSpecific(false); cr.setDataSource(DataSource.IDEALSTATES); - nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg); + // nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg); + nMsgs = _participants[0].getMessagingService().send(cr, msg); AssertJUnit.assertTrue(nMsgs == 1); Thread.sleep(2500); // Thread.currentThread().join(); @@ -181,11 +183,11 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ String hostDest = "localhost_" + (START_PORT + 1); TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( - factory.getMessageType(), factory); + _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), + factory); - _startCMResultMap.get(hostSrc)._manager.getMessagingService().registerMessageHandlerFactory( - factory.getMessageType(), factory); + _participants[0].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), + factory); MessageId msgId = MessageId.from(new UUID(123, 456).toString()); Message msg = new Message(factory.getMessageType(), msgId); @@ -204,7 +206,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ TestAsyncCallback callback = new TestAsyncCallback(60000); - _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000); + _participants[0].getMessagingService().send(cr, msg, callback, 60000); Thread.sleep(2000); // Thread.currentThread().join(); @@ -212,7 +214,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1); TestAsyncCallback callback2 = new TestAsyncCallback(500); - _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500); + _participants[0].getMessagingService().send(cr, msg, callback2, 500); Thread.sleep(3000); // Thread.currentThread().join(); @@ -226,7 +228,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ callback = new TestAsyncCallback(60000); - _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000); + _participants[0].getMessagingService().send(cr, msg, callback, 60000); Thread.sleep(2000); // Thread.currentThread().join(); @@ -234,7 +236,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1); callback2 = new TestAsyncCallback(500); - _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500); + _participants[0].getMessagingService().send(cr, msg, callback2, 500); Thread.sleep(3000); // Thread.currentThread().join(); @@ -248,8 +250,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ String hostDest = "localhost_" + (START_PORT + 1); TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( - factory.getMessageType(), factory); + _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(), + factory); MessageId msgId = MessageId.from(new UUID(123, 456).toString()); Message msg = new Message(factory.getMessageType(), msgId); @@ -268,8 +270,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AsyncCallback asyncCallback = new MockAsyncCallback(); int messagesSent = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - asyncCallback, 60000); + _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000); AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord() .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage") @@ -277,9 +278,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1); AsyncCallback asyncCallback2 = new MockAsyncCallback(); - messagesSent = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - asyncCallback2, 500); + messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500); AssertJUnit.assertTrue(asyncCallback2.isTimedOut()); } @@ -291,8 +290,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ for (int i = 0; i < NODE_NR; i++) { TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[0].getMessagingService().registerMessageHandlerFactory( factory.getMessageType(), factory); + } MessageId msgId = MessageId.from(new UUID(123, 456).toString()); Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId); @@ -310,8 +310,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ cr.setSessionSpecific(false); AsyncCallback callback1 = new MockAsyncCallback(); int messageSent1 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback1, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000); AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord() .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage") @@ -319,37 +318,32 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1); AsyncCallback callback2 = new MockAsyncCallback(); - int messageSent2 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback2, 500); + int messageSent2 = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500); + AssertJUnit.assertTrue(callback2.isTimedOut()); cr.setPartition("TestDB_17"); AsyncCallback callback3 = new MockAsyncCallback(); int messageSent3 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback3, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000); AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1); cr.setPartition("TestDB_15"); AsyncCallback callback4 = new MockAsyncCallback(); int messageSent4 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback4, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000); AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica); cr.setPartitionState("SLAVE"); AsyncCallback callback5 = new MockAsyncCallback(); int messageSent5 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback5, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000); AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1); cr.setDataSource(DataSource.IDEALSTATES); AsyncCallback callback6 = new MockAsyncCallback(); int messageSent6 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback6, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000); AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1); } @@ -360,8 +354,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ for (int i = 0; i < NODE_NR; i++) { TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( factory.getMessageType(), factory); + } MessageId msgId = MessageId.from(new UUID(123, 456).toString()); Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId); @@ -380,8 +375,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ cr.setSelfExcluded(false); AsyncCallback callback1 = new MockAsyncCallback(); int messageSent1 = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback1, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000); AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR); AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord() @@ -396,8 +390,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ for (int i = 0; i < NODE_NR; i++) { TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); String hostDest = "localhost_" + (START_PORT + i); - _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory( + _participants[i].getMessagingService().registerMessageHandlerFactory( factory.getMessageType(), factory); + } MessageId msgId = MessageId.from(new UUID(123, 456).toString()); Message msg = new Message(MessageType.CONTROLLER_MSG, msgId); @@ -416,8 +411,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ AsyncCallback callback1 = new MockAsyncCallback(); int messagesSent = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback1, 10000); + _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000); AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord() .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult") @@ -428,9 +422,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ msg.setMessageId(msgId); cr.setPartition("TestDB_17"); AsyncCallback callback2 = new MockAsyncCallback(); - messagesSent = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback2, 10000); + messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000); + AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord() .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult") .indexOf(hostSrc) != -1); @@ -441,9 +434,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ msg.setMessageId(msgId); cr.setPartitionState("SLAVE"); AsyncCallback callback3 = new MockAsyncCallback(); - messagesSent = - _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg, - callback3, 10000); + messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000); AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord() .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult") .indexOf(hostSrc) != -1); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java index c5558ca..aa48c90 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java @@ -22,10 +22,9 @@ package org.apache.helix.integration; import java.util.Date; import org.apache.helix.TestHelper; -import org.apache.helix.controller.HelixControllerMain; -import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.mock.participant.MockBootstrapModelFactory; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; @@ -51,18 +50,19 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase { 1, // replicas "Bootstrap", true); // do rebalance - TestHelper - .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); // start participants - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); // add a state model with non-OFFLINE initial state - StateMachineEngine stateMach = participants[i].getManager().getStateMachineEngine(); + StateMachineEngine stateMach = participants[i].getStateMachineEngine(); MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory(); stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory); @@ -74,16 +74,21 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase { clusterName)); Assert.assertTrue(result); + // clean up + controller.syncStop(); + for (int i = 0; i < 5; i++) { + participants[i].syncStop(); + } + System.out.println("END testNonOfflineInitState at " + new Date(System.currentTimeMillis())); } private static void setupCluster(String clusterName, String ZkAddr, int startPort, String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb, int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception { - ZkClient zkClient = new ZkClient(ZkAddr); - if (zkClient.exists("/" + clusterName)) { + if (_gZkClient.exists("/" + clusterName)) { LOG.warn("Cluster already exists:" + clusterName + ". Deleting it"); - zkClient.deleteRecursive("/" + clusterName); + _gZkClient.deleteRecursive("/" + clusterName); } ClusterSetup setupTool = new ClusterSetup(ZkAddr); @@ -103,7 +108,6 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase { setupTool.rebalanceStorageCluster(clusterName, dbName, replica); } } - zkClient.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java index 496d1a6..0f5cc72 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java @@ -25,8 +25,8 @@ import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; -import org.apache.helix.mock.controller.ClusterController; -import org.apache.helix.mock.participant.MockParticipant; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.IdealState; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; @@ -44,7 +44,7 @@ public class TestNullReplica extends ZkIntegrationTestBase { System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port "localhost", // participant name prefix @@ -61,14 +61,15 @@ public class TestNullReplica extends ZkIntegrationTestBase { idealState.getSimpleFields().remove(IdealState.IdealStateProperty.REPLICAS.toString()); _gZkClient.writeData(idealStatePath, idealState); - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); // start participants for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); } @@ -78,13 +79,11 @@ public class TestNullReplica extends ZkIntegrationTestBase { Assert.assertTrue(result); // clean up + controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); } - Thread.sleep(2000); - controller.syncStop(); - System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java index 07a2fc0..750e2b7 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java @@ -35,10 +35,13 @@ import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageType; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.apache.log4j.Logger; import org.testng.Assert; import org.testng.annotations.Test; public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase { + private static Logger LOG = Logger.getLogger(TestParticipantErrorMessage.class); + @Test() public void TestParticipantErrorMessageSend() { String participant1 = "localhost_" + START_PORT; @@ -54,7 +57,7 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase { Criteria recipientCriteria = new Criteria(); recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER); recipientCriteria.setSessionSpecific(false); - _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria, + _participants[0].getMessagingService().send(recipientCriteria, errorMessage1); Message errorMessage2 = @@ -69,23 +72,22 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase { Criteria recipientCriteria2 = new Criteria(); recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER); recipientCriteria2.setSessionSpecific(false); - _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2, + _participants[1].getMessagingService().send(recipientCriteria2, errorMessage2); try { Thread.sleep(1500); } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + LOG.error("Interrupted sleep", e); } boolean result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); Assert.assertTrue(result); - Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder(); + Builder kb = _participants[1].getHelixDataAccessor().keyBuilder(); ExternalView externalView = - _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty( + _participants[1].getHelixDataAccessor().getProperty( kb.externalView("TestDB")); for (String partitionName : externalView.getRecord().getMapFields().keySet()) { http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java index 4227688..aa67ac9 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java @@ -22,7 +22,7 @@ package org.apache.helix.integration; import java.util.Date; import org.apache.helix.TestHelper; -import org.apache.helix.TestHelper.StartCMResult; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.log4j.Logger; import org.testng.annotations.Test; @@ -33,20 +33,21 @@ public class TestParticipantNameCollision extends ZkStandAloneCMTestBase { public void testParticiptantNameCollision() throws Exception { logger.info("RUN TestParticipantNameCollision() at " + new Date(System.currentTimeMillis())); - StartCMResult result = null; + MockParticipantManager newParticipant = null; for (int i = 0; i < 1; i++) { String instanceName = "localhost_" + (START_PORT + i); try { // the call fails on getClusterManagerForParticipant() // no threads start - result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName); + newParticipant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + newParticipant.syncStart(); } catch (Exception e) { e.printStackTrace(); } } Thread.sleep(30000); - TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, result._manager); + TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, newParticipant); logger.info("STOP TestParticipantNameCollision() at " + new Date(System.currentTimeMillis())); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java index d900d98..a1c413b 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java @@ -24,12 +24,12 @@ import java.util.Date; import org.apache.helix.HelixDataAccessor; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; -import org.apache.helix.mock.controller.ClusterController; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.model.PauseSignal; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; @@ -47,7 +47,7 @@ public class TestPauseSignal extends ZkIntegrationTestBase { System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port "localhost", // participant name prefix @@ -59,14 +59,15 @@ public class TestPauseSignal extends ZkIntegrationTestBase { "MasterSlave", true); // do rebalance // start controller - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); // start participants for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); } @@ -112,13 +113,11 @@ public class TestPauseSignal extends ZkIntegrationTestBase { Assert.assertTrue(result); // clean up + controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); } - Thread.sleep(2000); - controller.syncStop(); - System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java b/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java index be2de65..f2af156 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java @@ -9,10 +9,10 @@ import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.mock.controller.ClusterController; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.StateModelDefinition; @@ -49,15 +49,16 @@ public class TestRedefineStateModelDef extends ZkUnitTestBase { autoRebalance(clusterName); // start controller - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); // start participants - MockParticipant[] participants = new MockParticipant[n]; + MockParticipantManager[] participants = new MockParticipantManager[n]; for (int i = 0; i < n; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); } @@ -70,7 +71,7 @@ public class TestRedefineStateModelDef extends ZkUnitTestBase { // stop controller, redefine state model definition, and re-start controller controller.syncStop(); redefineStateModelDef(clusterName); - controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); result = http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java index e04cc79..d3a370d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java @@ -23,16 +23,17 @@ import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.api.id.StateModelDefId; -import org.apache.helix.controller.HelixControllerMain; import org.apache.helix.controller.strategy.DefaultTwoStateStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.tools.ClusterStateVerifier; @@ -40,6 +41,14 @@ import org.testng.Assert; import org.testng.annotations.Test; public class TestRenamePartition extends ZkIntegrationTestBase { + // map from clusterName to participants + final Map _participantMap = + new ConcurrentHashMap(); + + // map from clusterName to controllers + final Map _controllerMap = + new ConcurrentHashMap(); + @Test() public void testRenamePartitionAutoIS() throws Exception { String clusterName = "CLUSTER_" + getShortClassName() + "_auto"; @@ -72,8 +81,8 @@ public class TestRenamePartition extends ZkIntegrationTestBase { ZK_ADDR, clusterName)); Assert.assertTrue(result); + stop(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); - } @Test() @@ -104,7 +113,7 @@ public class TestRenamePartition extends ZkIntegrationTestBase { idealState.setStateModelDefId(StateModelDefId.from("MasterSlave")); ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient)); + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient)); Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState); @@ -119,23 +128,25 @@ public class TestRenamePartition extends ZkIntegrationTestBase { ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier( ZK_ADDR, clusterName)); Assert.assertTrue(result); + + stop(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } private void startAndVerify(String clusterName) throws Exception { - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; - TestHelper - .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); // start participants for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); participants[i].syncStart(); - // new Thread(participants[i]).start(); } boolean result = @@ -143,5 +154,21 @@ public class TestRenamePartition extends ZkIntegrationTestBase { ZK_ADDR, clusterName)); Assert.assertTrue(result); + _participantMap.put(clusterName, participants); + _controllerMap.put(clusterName, controller); + } + + private void stop(String clusterName) { + ClusterControllerManager controller = _controllerMap.get(clusterName); + if (controller != null) { + controller.syncStop(); + } + + MockParticipantManager[] participants = _participantMap.get(clusterName); + if (participants != null) { + for (MockParticipantManager participant : participants) { + participant.syncStop(); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java index 72920a8..7159a17 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java @@ -25,9 +25,9 @@ import java.util.Map; import java.util.Set; import org.apache.helix.TestHelper; -import org.apache.helix.mock.controller.ClusterController; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.mock.participant.ErrTransition; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; import org.testng.Assert; @@ -54,7 +54,8 @@ public class TestResetInstance extends ZkIntegrationTestBase { "MasterSlave", true); // do rebalance // start controller - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); Map> errPartitions = new HashMap>() { @@ -65,16 +66,15 @@ public class TestResetInstance extends ZkIntegrationTestBase { }; // start mock participants - MockParticipant[] participants = new MockParticipant[n]; + MockParticipantManager[] participants = new MockParticipantManager[n]; for (int i = 0; i < n; i++) { String instanceName = "localhost_" + (12918 + i); if (i == 0) { - participants[i] = - new MockParticipant(clusterName, instanceName, ZK_ADDR, - new ErrTransition(errPartitions)); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].setTransition(new ErrTransition(errPartitions)); } else { - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); } participants[i].syncStart(); } @@ -102,8 +102,6 @@ public class TestResetInstance extends ZkIntegrationTestBase { Assert.assertTrue(result, "Cluster verification fails"); // clean up - // wait for all zk callbacks done - Thread.sleep(1000); controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java index 9479cff..af1ef13 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java @@ -31,9 +31,9 @@ import org.apache.helix.ZNRecord; import org.apache.helix.api.State; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.mock.controller.ClusterController; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.mock.participant.ErrTransition; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.tools.ClusterSetup; @@ -81,7 +81,8 @@ public class TestResetPartitionState extends ZkIntegrationTestBase { "MasterSlave", true); // do rebalance // start controller - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); Map> errPartitions = new HashMap>() { @@ -92,16 +93,16 @@ public class TestResetPartitionState extends ZkIntegrationTestBase { }; // start mock participants - MockParticipant[] participants = new MockParticipant[n]; + MockParticipantManager[] participants = new MockParticipantManager[n]; for (int i = 0; i < n; i++) { String instanceName = "localhost_" + (12918 + i); if (i == 0) { participants[i] = - new MockParticipant(clusterName, instanceName, ZK_ADDR, - new ErrTransition(errPartitions)); + new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].setTransition(new ErrTransition(errPartitions)); } else { - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); } participants[i].syncStart(); } @@ -170,8 +171,6 @@ public class TestResetPartitionState extends ZkIntegrationTestBase { Assert.assertEquals(_errToOfflineInvoked, 2, "Should reset 2 partitions"); // clean up - // wait for all zk callbacks done - Thread.sleep(1000); controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java index f8b4dc9..46a05d8 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java @@ -25,9 +25,9 @@ import java.util.Map; import java.util.Set; import org.apache.helix.TestHelper; -import org.apache.helix.mock.controller.ClusterController; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.mock.participant.ErrTransition; -import org.apache.helix.mock.participant.MockParticipant; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; import org.testng.Assert; @@ -53,7 +53,8 @@ public class TestResetResource extends ZkIntegrationTestBase { "MasterSlave", true); // do rebalance // start controller - ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); controller.syncStart(); Map> errPartitions = new HashMap>() { @@ -64,16 +65,15 @@ public class TestResetResource extends ZkIntegrationTestBase { }; // start mock participants - MockParticipant[] participants = new MockParticipant[n]; + MockParticipantManager[] participants = new MockParticipantManager[n]; for (int i = 0; i < n; i++) { String instanceName = "localhost_" + (12918 + i); if (i == 0) { - participants[i] = - new MockParticipant(clusterName, instanceName, ZK_ADDR, - new ErrTransition(errPartitions)); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].setTransition(new ErrTransition(errPartitions)); } else { - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); } participants[i].syncStart(); } @@ -101,8 +101,6 @@ public class TestResetResource extends ZkIntegrationTestBase { Assert.assertTrue(result, "Cluster verification fails"); // clean up - // wait for all zk callbacks done - Thread.sleep(1000); controller.syncStop(); for (int i = 0; i < 5; i++) { participants[i].syncStop(); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java index 008782c..64043ed 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java @@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.helix.NotificationContext; import org.apache.helix.TestHelper; -import org.apache.helix.controller.HelixControllerMain; -import org.apache.helix.mock.participant.MockParticipant; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.mock.participant.MockTransition; import org.apache.helix.model.Message; import org.apache.helix.tools.ClusterStateVerifier; @@ -35,15 +35,15 @@ import org.testng.annotations.Test; public class TestRestartParticipant extends ZkIntegrationTestBase { public class KillOtherTransition extends MockTransition { - final AtomicReference _other; + final AtomicReference _other; - public KillOtherTransition(MockParticipant other) { - _other = new AtomicReference(other); + public KillOtherTransition(MockParticipantManager other) { + _other = new AtomicReference(other); } @Override public void doTransition(Message message, NotificationContext context) { - MockParticipant other = _other.getAndSet(null); + MockParticipantManager other = _other.getAndSet(null); if (other != null) { System.err.println("Kill " + other.getInstanceName() + ". Interrupted exceptions are IGNORABLE"); @@ -58,7 +58,7 @@ public class TestRestartParticipant extends ZkIntegrationTestBase { System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis())); String clusterName = getShortClassName(); - MockParticipant[] participants = new MockParticipant[5]; + MockParticipantManager[] participants = new MockParticipantManager[5]; TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port "localhost", // participant name prefix @@ -69,19 +69,19 @@ public class TestRestartParticipant extends ZkIntegrationTestBase { 3, // replicas "MasterSlave", true); // do rebalance - TestHelper - .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); + // start participants for (int i = 0; i < 5; i++) { String instanceName = "localhost_" + (12918 + i); if (i == 4) { - participants[i] = - new MockParticipant(clusterName, instanceName, ZK_ADDR, new KillOtherTransition( - participants[0])); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].setTransition(new KillOtherTransition(participants[0])); } else { - participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); - // Thread.sleep(100); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); } participants[i].syncStart(); @@ -94,9 +94,9 @@ public class TestRestartParticipant extends ZkIntegrationTestBase { // restart Thread.sleep(500); - MockParticipant participant = - new MockParticipant(participants[0].getClusterName(), participants[0].getInstanceName(), - ZK_ADDR, null); + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, participants[0].getClusterName(), + participants[0].getInstanceName()); System.err.println("Restart " + participant.getInstanceName()); participant.syncStart(); result = @@ -104,6 +104,13 @@ public class TestRestartParticipant extends ZkIntegrationTestBase { clusterName)); Assert.assertTrue(result); + // clean up + controller.syncStop(); + for (int i = 0; i < 5; i++) { + participants[i].syncStop(); + } + participant.syncStop(); + System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis())); }