Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E2B1CD1A1 for ; Thu, 1 Nov 2012 04:05:08 +0000 (UTC) Received: (qmail 12407 invoked by uid 500); 1 Nov 2012 04:05:08 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 12328 invoked by uid 500); 1 Nov 2012 04:05:07 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 12296 invoked by uid 99); 1 Nov 2012 04:05:05 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Nov 2012 04:05:05 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Nov 2012 04:05:02 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1845123888E3 for ; Thu, 1 Nov 2012 04:04:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1404460 - /hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Date: Thu, 01 Nov 2012 04:04:18 -0000 To: commits@hama.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121101040418.1845123888E3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: edwardyoon Date: Thu Nov 1 04:04:17 2012 New Revision: 1404460 URL: http://svn.apache.org/viewvc?rev=1404460&view=rev Log: HAMA-662: Fix bugs of uni tests Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1404460&r1=1404459&r2=1404460&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Thu Nov 1 04:04:17 2012 @@ -64,8 +64,7 @@ public class TestBSPTaskFaults extends T public static final String TEST_POINT = "bsp.ft.test.point"; public static final String TEST_GROOM_PORT = "bsp.ft.test.groomport"; private static int TEST_NUMBER = 0; - - + private volatile MinimalGroomServer groom; private volatile BSPPeerProtocol umbilical; private Server workerServer; @@ -75,8 +74,8 @@ public class TestBSPTaskFaults extends T public volatile HamaConfiguration conf; private ScheduledExecutorService testBSPTaskService; - - private static synchronized int incrementTestNumber(){ + + private static synchronized int incrementTestNumber() { return ++TEST_NUMBER; } @@ -104,7 +103,6 @@ public class TestBSPTaskFaults extends T @Override public void close() throws IOException { isShutDown = true; - } @Override @@ -172,7 +170,7 @@ public class TestBSPTaskFaults extends T private class TestBSPTaskThreadRunner extends Thread { BSPJob job; - + TestBSPTaskThreadRunner(BSPJob jobConf) { job = jobConf; } @@ -240,7 +238,8 @@ public class TestBSPTaskFaults extends T } public void destroyProcess() { - + bspTaskProcess.destroy(); + sched.shutdown(); } @Override @@ -265,8 +264,7 @@ public class TestBSPTaskFaults extends T commands.add(TestBSPProcessRunner.class.getName()); - LOG.info("starting process for failure case - " - + testPoint); + LOG.info("starting process for failure case - " + testPoint); commands.add("" + testPoint); commands.add("" + testPort); @@ -331,7 +329,7 @@ public class TestBSPTaskFaults extends T BSPJob job = new BSPJob(hamaConf); job.setInputFormat(NullInputFormat.class); job.setOutputFormat(NullOutputFormat.class); - + final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy( BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, new InetSocketAddress("127.0.0.1", port), hamaConf); @@ -425,7 +423,8 @@ public class TestBSPTaskFaults extends T LocalBSPRunner.LocalSyncClient.class, SyncClient.class); int testNumber = incrementTestNumber(); - InetSocketAddress inetAddress = new InetSocketAddress(BSPNetUtils.getFreePort(34321) + testNumber); + InetSocketAddress inetAddress = new InetSocketAddress( + BSPNetUtils.getFreePort(34321) + testNumber); groom = new MinimalGroomServer(conf); workerServer = RPC.getServer(groom, inetAddress.getHostName(), inetAddress.getPort(), conf); @@ -462,8 +461,10 @@ public class TestBSPTaskFaults extends T CompletionService completionService = new ExecutorCompletionService( this.testBSPTaskService); - Future future = completionService - .submit(new TestBSPProcessRunner(0, workerServer.getListenerAddress().getPort())); + TestBSPProcessRunner runner = new TestBSPProcessRunner(0, workerServer + .getListenerAddress().getPort()); + + Future future = completionService.submit(runner); try { future.get(20000, TimeUnit.MILLISECONDS); @@ -478,7 +479,7 @@ public class TestBSPTaskFaults extends T checkIfPingTestPassed(); groom.setPingCount(0); this.testBSPTaskService.shutdownNow(); - + runner.destroyProcess(); } /* @@ -490,11 +491,11 @@ public class TestBSPTaskFaults extends T LOG.info("Testing ping failure case - 1"); conf.setInt(TEST_POINT, 1); - CompletionService completionService = - new ExecutorCompletionService(this.testBSPTaskService); - Future future = completionService - .submit(new TestBSPProcessRunner(1, - workerServer.getListenerAddress().getPort())); + CompletionService completionService = new ExecutorCompletionService( + this.testBSPTaskService); + TestBSPProcessRunner runner = new TestBSPProcessRunner(1, workerServer + .getListenerAddress().getPort()); + Future future = completionService.submit(runner); try { future.get(20000, TimeUnit.MILLISECONDS); @@ -509,18 +510,18 @@ public class TestBSPTaskFaults extends T checkIfPingTestPassed(); groom.setPingCount(0); this.testBSPTaskService.shutdownNow(); - + runner.destroyProcess(); } public void testPingOnTaskExecFailure() { LOG.info("Testing ping failure case - 2"); conf.setInt(TEST_POINT, 2); - CompletionService completionService = - new ExecutorCompletionService(this.testBSPTaskService); - Future future = completionService - .submit(new TestBSPProcessRunner(2, - workerServer.getListenerAddress().getPort())); + CompletionService completionService = new ExecutorCompletionService( + this.testBSPTaskService); + TestBSPProcessRunner runner = new TestBSPProcessRunner(2, workerServer + .getListenerAddress().getPort()); + Future future = completionService.submit(runner); try { future.get(20000, TimeUnit.MILLISECONDS); @@ -535,7 +536,7 @@ public class TestBSPTaskFaults extends T checkIfPingTestPassed(); groom.setPingCount(0); this.testBSPTaskService.shutdownNow(); - + runner.destroyProcess(); } public void testPingOnTaskCleanupFailure() { @@ -543,12 +544,12 @@ public class TestBSPTaskFaults extends T LOG.info("Testing ping failure case - 3"); conf.setInt(TEST_POINT, 3); - CompletionService completionService = - new ExecutorCompletionService(this.testBSPTaskService); - - Future future = completionService - .submit(new TestBSPProcessRunner(3, - workerServer.getListenerAddress().getPort())); + CompletionService completionService = new ExecutorCompletionService( + this.testBSPTaskService); + TestBSPProcessRunner runner = new TestBSPProcessRunner(3, workerServer + .getListenerAddress().getPort()); + + Future future = completionService.submit(runner); try { future.get(20000, TimeUnit.MILLISECONDS); @@ -563,17 +564,18 @@ public class TestBSPTaskFaults extends T checkIfPingTestPassed(); groom.setPingCount(0); this.testBSPTaskService.shutdownNow(); - + runner.destroyProcess(); } public void testBSPTaskSelfDestroy() { LOG.info("Testing self kill on lost contact."); - CompletionService completionService = - new ExecutorCompletionService(this.testBSPTaskService); - Future future = completionService - .submit(new TestBSPProcessRunner(0, - workerServer.getListenerAddress().getPort())); + CompletionService completionService = new ExecutorCompletionService( + this.testBSPTaskService); + TestBSPProcessRunner runner = new TestBSPProcessRunner(0, workerServer + .getListenerAddress().getPort()); + + Future future = completionService.submit(runner); try { while (groom.pingCount == 0) { @@ -598,12 +600,13 @@ public class TestBSPTaskFaults extends T } assertEquals(69, exitValue.intValue()); + runner.destroyProcess(); } @Override protected void tearDown() throws Exception { - super.tearDown(); + if (groom != null) groom.setPingCount(0); if (umbilical != null) {