Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 568E89292 for ; Fri, 15 Jun 2012 17:29:22 +0000 (UTC) Received: (qmail 82653 invoked by uid 500); 15 Jun 2012 17:29:22 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 82608 invoked by uid 500); 15 Jun 2012 17:29:22 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 82598 invoked by uid 99); 15 Jun 2012 17:29:22 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Jun 2012 17:29:22 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Jun 2012 17:29:16 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 5C32C2388C6A for ; Fri, 15 Jun 2012 17:28:54 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1350713 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/util/ graph/src/test/java/org/apache/hama/graph/ Date: Fri, 15 Jun 2012 17:28:53 -0000 To: commits@hama.apache.org From: tjungblut@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120615172854.5C32C2388C6A@eris.apache.org> Author: tjungblut Date: Fri Jun 15 17:28:52 2012 New Revision: 1350713 URL: http://svn.apache.org/viewvc?rev=1350713&view=rev Log: making testcases more robust against state and parallelism Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1350713&r1=1350712&r2=1350713&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Fri Jun 15 17:28:52 2012 @@ -497,7 +497,7 @@ public class BSPMaster implements JobSub LOG.error(e); } } else { - this.clearZKNodes(); + this.clearZKNodes(zk); } } } @@ -505,11 +505,15 @@ public class BSPMaster implements JobSub /** * Clears all sub-children of node bspRoot */ - public void clearZKNodes() { + public void clearZKNodes(ZooKeeper zk) { + clearZKNodes(zk, bspRoot); + } + + public static void clearZKNodes(ZooKeeper zk, String path) { try { - Stat s = zk.exists(bspRoot, false); + Stat s = zk.exists(path, false); if (s != null) { - clearZKNodes(bspRoot); + clearZKNodesInternal(zk, path); } } catch (Exception e) { @@ -519,13 +523,9 @@ public class BSPMaster implements JobSub /** * Clears all sub-children of node rooted at path. - * - * @param path - * @throws InterruptedException - * @throws KeeperException */ - private void clearZKNodes(String path) throws KeeperException, - InterruptedException { + private static void clearZKNodesInternal(ZooKeeper zk, String path) + throws KeeperException, InterruptedException { ArrayList list = (ArrayList) zk.getChildren(path, false); if (list.size() == 0) { @@ -533,7 +533,7 @@ public class BSPMaster implements JobSub } else { for (String node : list) { - clearZKNodes(path + "/" + node); + clearZKNodes(zk, path + "/" + node); zk.delete(path + "/" + node, -1); // delete any version of this node. } } Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1350713&r1=1350712&r2=1350713&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Fri Jun 15 17:28:52 2012 @@ -99,7 +99,7 @@ public class GroomServer implements Runn }; private HttpServer server; - private static ZooKeeper zk = null; + private ZooKeeper zk = null; // Running States and its related things volatile boolean initialized = false; @@ -621,7 +621,7 @@ public class GroomServer implements Runn synchronized (rjob) { if (!rjob.localized) { - + FileSystem dfs = FileSystem.get(conf); FileSystem localFs = FileSystem.getLocal(conf); Path jobDir = localJobFile.getParent(); if (localFs.exists(jobDir)) { @@ -634,7 +634,7 @@ public class GroomServer implements Runn Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/" + task.getTaskID() + "/" + "job.jar"); - systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile); + dfs.copyToLocalFile(new Path(task.getJobFile()), localJobFile); HamaConfiguration conf = new HamaConfiguration(); conf.addResource(localJobFile); @@ -650,7 +650,7 @@ public class GroomServer implements Runn jobConf.setJar(localJarFile.toString()); if (jarFile != null) { - systemFS.copyToLocalFile(jarFile, localJarFile); + dfs.copyToLocalFile(jarFile, localJarFile); // also unjar the job.jar files in workdir File workDir = new File( @@ -908,7 +908,7 @@ public class GroomServer implements Runn + task.getTaskID() + "/job.jar"); String jobFile = task.getJobFile(); - systemFS.copyToLocalFile(new Path(jobFile), localJobFile); + FileSystem.get(conf).copyToLocalFile(new Path(jobFile), localJobFile); task.setJobFile(localJobFile.toString()); localJobConf = new BSPJob(task.getJobID(), localJobFile.toString()); @@ -916,7 +916,7 @@ public class GroomServer implements Runn String jarFile = localJobConf.getJar(); if (jarFile != null) { - systemFS.copyToLocalFile(new Path(jarFile), localJarFile); + FileSystem.get(conf).copyToLocalFile(new Path(jarFile), localJarFile); localJobConf.setJar(localJarFile.toString()); } Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1350713&r1=1350712&r2=1350713&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Fri Jun 15 17:28:52 2012 @@ -242,8 +242,8 @@ class JobInProgress { this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(), 0L, 0L, JobStatus.RUNNING, counters); - // delete all nodes before start - master.clearZKNodes(); + // delete all nodes belonging to that job before start + BSPMaster.clearZKNodes(master.zk, this.getJobID().toString()); master.createJobRoot(this.getJobID().toString()); tasksInited = true; Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1350713&r1=1350712&r2=1350713&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Fri Jun 15 17:28:52 2012 @@ -526,7 +526,7 @@ public class LocalBSPRunner implements J @Override public void close() throws InterruptedException { - + barrier = null; } } Modified: hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java?rev=1350713&r1=1350712&r2=1350713&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java Fri Jun 15 17:28:52 2012 @@ -27,7 +27,7 @@ import org.apache.commons.logging.LogFac */ public abstract class HamaCluster extends HamaClusterTestCase { public static final Log LOG = LogFactory.getLog(HamaCluster.class); - protected final static HamaConfiguration conf = new HamaConfiguration(); + private final static HamaConfiguration conf = new HamaConfiguration(); public HamaCluster(){ super(); Modified: hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java?rev=1350713&r1=1350712&r2=1350713&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java Fri Jun 15 17:28:52 2012 @@ -35,7 +35,6 @@ import org.apache.hama.bsp.BSPMaster; import org.apache.hama.bsp.GroomServer; import org.apache.hama.HamaConfiguration; - public class MiniBSPCluster { public static final Log LOG = LogFactory.getLog(MiniBSPCluster.class); @@ -44,85 +43,94 @@ public class MiniBSPCluster { private HamaConfiguration configuration; private BSPMasterRunner master; - private List groomServerList = - new CopyOnWriteArrayList(); + private List groomServerList = new CopyOnWriteArrayList(); private int grooms; - public class BSPMasterRunner implements Runnable{ + public class BSPMasterRunner implements Runnable { BSPMaster bspm; HamaConfiguration conf; - public BSPMasterRunner(HamaConfiguration conf){ + public BSPMasterRunner(HamaConfiguration conf) { this.conf = conf; - if(null == this.conf) + if (null == this.conf) throw new NullPointerException("No Configuration for BSPMaster."); - } + } @Override - public void run(){ - try{ + public void run() { + try { LOG.info("Starting BSP Master."); - this.bspm = BSPMaster.startMaster(this.conf); + this.bspm = BSPMaster.startMaster(this.conf); this.bspm.offerService(); - }catch(IOException ioe){ + } catch (IOException ioe) { LOG.error("Fail to startup BSP Master.", ioe); - }catch(InterruptedException ie){ + } catch (InterruptedException ie) { LOG.error("BSP Master fails in offerService().", ie); Thread.currentThread().interrupt(); } } - public void shutdown(){ - if(null != this.bspm) this.bspm.shutdown(); + public void shutdown() { + if (null != this.bspm) + this.bspm.shutdown(); } - public boolean isRunning(){ - if(null == this.bspm) return false; + public boolean isRunning() { + if (null == this.bspm) + return false; - if(this.bspm.currentState().equals(BSPMaster.State.RUNNING)){ + if (this.bspm.currentState().equals(BSPMaster.State.RUNNING)) { return true; - } + } return false; } - public BSPMaster getMaster(){ + public BSPMaster getMaster() { return this.bspm; } } - public class GroomServerRunner implements Runnable{ + public class GroomServerRunner implements Runnable { GroomServer gs; HamaConfiguration conf; - public GroomServerRunner(HamaConfiguration conf){ + public GroomServerRunner(HamaConfiguration conf) { this.conf = conf; } - + @Override - public void run(){ - try{ + public void run() { + try { this.gs = GroomServer.constructGroomServer(GroomServer.class, conf); GroomServer.startGroomServer(this.gs).join(); - }catch(InterruptedException ie){ + } catch (InterruptedException ie) { LOG.error("Fail to start GroomServer. ", ie); Thread.currentThread().interrupt(); + } finally { + try { + gs.close(); + } catch (IOException e) { + e.printStackTrace(); + } } } - public void shutdown(){ - try{ - if(null != this.gs) this.gs.shutdown(); - }catch(IOException ioe){ + public void shutdown() { + try { + if (null != this.gs) + this.gs.shutdown(); + } catch (IOException ioe) { LOG.info("Fail to shutdown GroomServer.", ioe); } } - - public boolean isRunning(){ - if(null == this.gs) return false; - return this.gs.isRunning(); + + public boolean isRunning() { + if (null == this.gs) + return false; + return this.gs.isRunning(); } - public GroomServer getGroomServer(){ + public GroomServer getGroomServer() { return this.gs; } } @@ -130,73 +138,73 @@ public class MiniBSPCluster { public MiniBSPCluster(HamaConfiguration conf, int groomServers) { this.configuration = conf; this.grooms = groomServers; - if(1 > this.grooms) { - this.grooms = 2; + if (1 > this.grooms) { + this.grooms = 2; } - LOG.info("Groom server number "+this.grooms); + LOG.info("Groom server number " + this.grooms); int threadpool = conf.getInt("bsp.test.threadpool", 10); - LOG.info("Thread pool value "+threadpool); + LOG.info("Thread pool value " + threadpool); scheduler = Executors.newScheduledThreadPool(threadpool); } - public void startBSPCluster(){ + public void startBSPCluster() { startMaster(); startGroomServers(); } - public void shutdownBSPCluster(){ - if(null != this.master && this.master.isRunning()) + public void shutdownBSPCluster() { + if (null != this.master && this.master.isRunning()) this.master.shutdown(); - if(0 < groomServerList.size()){ - for(GroomServerRunner groom: groomServerList){ - if(groom.isRunning()) groom.shutdown(); + if (0 < groomServerList.size()) { + for (GroomServerRunner groom : groomServerList) { + if (groom.isRunning()) + groom.shutdown(); } } } - - public void startMaster(){ - if(null == this.scheduler) + public void startMaster() { + if (null == this.scheduler) throw new NullPointerException("No ScheduledExecutorService exists."); this.master = new BSPMasterRunner(this.configuration); scheduler.schedule(this.master, 0, SECONDS); } - public void startGroomServers(){ - if(null == this.scheduler) + public void startGroomServers() { + if (null == this.scheduler) throw new NullPointerException("No ScheduledExecutorService exists."); - if(null == this.master) + if (null == this.master) throw new NullPointerException("No BSPMaster exists."); - int cnt=0; - while(!this.master.isRunning()){ + int cnt = 0; + while (!this.master.isRunning()) { LOG.info("Waiting BSPMaster up."); - try{ + try { Thread.sleep(1000); cnt++; - if(100 < cnt){ + if (100 < cnt) { fail("Fail to launch BSPMaster."); } - }catch(InterruptedException ie){ + } catch (InterruptedException ie) { LOG.error("Fail to check BSP Master's state.", ie); Thread.currentThread().interrupt(); } } - for(int i=0; i < this.grooms; i++){ + for (int i = 0; i < this.grooms; i++) { HamaConfiguration c = new HamaConfiguration(this.configuration); randomPort(c); GroomServerRunner gsr = new GroomServerRunner(c); groomServerList.add(gsr); scheduler.schedule(gsr, 0, SECONDS); cnt = 0; - while(!gsr.isRunning()){ + while (!gsr.isRunning()) { LOG.info("Waitin for GroomServer up."); - try{ + try { Thread.sleep(1000); cnt++; - if(10 < cnt){ + if (10 < cnt) { fail("Fail to launch groom server."); } - }catch(InterruptedException ie){ + } catch (InterruptedException ie) { LOG.error("Fail to check Groom Server's state.", ie); Thread.currentThread().interrupt(); } @@ -205,14 +213,14 @@ public class MiniBSPCluster { } - private static void randomPort(HamaConfiguration conf){ - try{ + private static void randomPort(HamaConfiguration conf) { + try { ServerSocket skt = new ServerSocket(0); - int p = skt.getLocalPort(); + int p = skt.getLocalPort(); skt.close(); conf.set(Constants.PEER_PORT, new Integer(p).toString()); - conf.setInt(Constants.GROOM_RPC_PORT, p+100); - }catch(IOException ioe){ + conf.setInt(Constants.GROOM_RPC_PORT, p + 100); + } catch (IOException ioe) { LOG.error("Can not find a free port for BSPPeer.", ioe); } } @@ -224,7 +232,7 @@ public class MiniBSPCluster { public List getGroomServerThreads() { List list = new ArrayList(); - for(GroomServerRunner gsr: groomServerList){ + for (GroomServerRunner gsr : groomServerList) { list.add(new Thread(gsr)); } return list; @@ -234,21 +242,21 @@ public class MiniBSPCluster { return new Thread(this.master); } - public List getGroomServers(){ + public List getGroomServers() { List list = new ArrayList(); - for(GroomServerRunner gsr: groomServerList){ + for (GroomServerRunner gsr : groomServerList) { list.add(gsr.getGroomServer()); } return list; } - public BSPMaster getBSPMaster(){ - if(null != this.master) + public BSPMaster getBSPMaster() { + if (null != this.master) return this.master.getMaster(); return null; } - public ScheduledExecutorService getScheduler(){ + public ScheduledExecutorService getScheduler() { return this.scheduler; } } Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1350713&r1=1350712&r2=1350713&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Fri Jun 15 17:28:52 2012 @@ -19,9 +19,6 @@ */ package org.apache.hama.bsp; -import java.io.IOException; -import java.util.ArrayList; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,14 +33,6 @@ import org.apache.hama.HamaCluster; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.message.DiskQueue; import org.apache.hama.examples.ClassSerializePrinting; -import org.apache.hama.zookeeper.QuorumPeer; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; public class TestBSPMasterGroomServer extends HamaCluster { @@ -54,6 +43,9 @@ public class TestBSPMasterGroomServer ex protected HamaConfiguration configuration; + // these variables are preventing from rebooting the whole stuff again since + // setup and teardown are called per method. + public TestBSPMasterGroomServer() { configuration = new HamaConfiguration(); configuration.set("bsp.master.address", "localhost"); @@ -61,7 +53,7 @@ public class TestBSPMasterGroomServer ex assertEquals("Make sure master addr is set to localhost:", "localhost", configuration.get("bsp.master.address")); configuration.set("bsp.local.dir", "/tmp/hama-test"); - conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH); + configuration.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH); configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost"); configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810); configuration.set("hama.sync.client.class", @@ -99,10 +91,10 @@ public class TestBSPMasterGroomServer ex assertEquals(this.numOfGroom, cluster.getGroomServers()); bsp.setNumBspTask(2); - FileSystem fileSys = FileSystem.get(conf); + FileSystem fileSys = FileSystem.get(configuration); if (bsp.waitForCompletion(true)) { - checkOutput(fileSys, conf, 2); + checkOutput(fileSys, configuration, 2); } else { fail(); } @@ -115,8 +107,8 @@ public class TestBSPMasterGroomServer ex assertEquals(listStatus.length, tasks); for (FileStatus status : listStatus) { if (!status.isDir()) { - SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, status - .getPath(), conf); + SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, + status.getPath(), conf); int superStep = 0; int taskstep = 0; IntWritable key = new IntWritable(); @@ -144,53 +136,4 @@ public class TestBSPMasterGroomServer ex * END: Job submission tests. */ - /* - * BEGIN: ZooKeeper tests. - */ - public void testClearZKNodes() throws IOException, KeeperException, - InterruptedException { - // Clear any existing znode with the same path as bspRoot. - bspCluster.getBSPMaster().clearZKNodes(); - int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, - 6000); - String connectStr = QuorumPeer.getZKQuorumServersString(configuration); - String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT, - Constants.DEFAULT_ZOOKEEPER_ROOT); // Establishing a zk session. - ZooKeeper zk = new ZooKeeper(connectStr, timeout, new Watcher() { - - @Override - public void process(WatchedEvent arg0) { - // do nothing. - } - - }); - // Creating dummy bspRoot if it doesn't already exist. - - Stat s = zk.exists(bspRoot, false); - if (s == null) { - zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - // Creating dummy child nodes at depth 1. - String node1 = bspRoot + "/task1"; - String node2 = bspRoot + "/task2"; - zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - // Creating dummy child node at depth 2. - String node11 = node1 + "/superstep1"; - zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - ArrayList list = (ArrayList) zk.getChildren(bspRoot, false); - assertEquals(2, list.size()); - System.out.println(list.size()); - bspCluster.getBSPMaster().clearZKNodes(); - list = (ArrayList) zk.getChildren(bspRoot, false); - System.out.println(list.size()); - assertEquals(0, list.size()); - try { - zk.getData(node11, false, null); - fail(); - } catch (KeeperException.NoNodeException e) { - System.out.println("Node has been removed correctly!"); - } - } } Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1350713&r1=1350712&r2=1350713&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Fri Jun 15 17:28:52 2012 @@ -47,8 +47,10 @@ public class TestCheckpoint extends Test @SuppressWarnings({ "unchecked", "rawtypes" }) public void testCheckpoint() throws Exception { - Configuration config = new HamaConfiguration(); - config.set(SyncServiceFactory.SYNC_CLIENT_CLASS, LocalBSPRunner.LocalSyncClient.class.getName()); + Configuration config = new Configuration(); + config.set(SyncServiceFactory.SYNC_CLIENT_CLASS, + LocalBSPRunner.LocalSyncClient.class.getName()); + config.set("bsp.output.dir", "/tmp/hama-test_out"); FileSystem dfs = FileSystem.get(config); BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs); @@ -85,7 +87,8 @@ public class TestCheckpoint extends Test public void testCheckpointInterval() throws Exception { - HamaConfiguration conf = new HamaConfiguration(); + Configuration conf = new Configuration(); + conf.set("bsp.output.dir", "/tmp/hama-test_out"); conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS, LocalBSPRunner.LocalSyncClient.class, SyncClient.class); @@ -100,16 +103,18 @@ public class TestCheckpoint extends Test LOG.info("Started RPC server"); conf.setInt("bsp.groom.rpc.port", inetAddress.getPort()); + conf.setInt("bsp.peers.num", 1); BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy( - BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress, conf); + BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress, + conf); LOG.info("Started the proxy connections"); TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID( "job_201110102255", 1), 1), 1); try { - BSPJob job = new BSPJob(conf); + BSPJob job = new BSPJob(new HamaConfiguration(conf)); job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH); job.setOutputFormat(TextOutputFormat.class); final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy( @@ -154,7 +159,7 @@ public class TestCheckpoint extends Test bspPeer.sync(); LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), true); + assertEquals(bspPeer.isReadyToCheckpoint(), false); } catch (Exception e) { LOG.error("Error testing BSPPeer.", e); Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1350713&r1=1350712&r2=1350713&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Fri Jun 15 17:28:52 2012 @@ -22,19 +22,20 @@ package org.apache.hama.bsp; import java.io.IOException; import java.util.ArrayList; +import junit.framework.TestCase; + import org.apache.hama.Constants; -import org.apache.hama.HamaCluster; import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl; +import org.apache.hama.util.BSPNetUtils; import org.apache.hama.zookeeper.QuorumPeer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; -public class TestZooKeeper extends HamaCluster { +public class TestZooKeeper extends TestCase { private HamaConfiguration configuration; @@ -44,75 +45,80 @@ public class TestZooKeeper extends HamaC assertEquals("Make sure master addr is set to localhost:", "localhost", configuration.get("bsp.master.address")); configuration.set("bsp.local.dir", "/tmp/hama-test"); + configuration.set("bsp.output.dir", "/tmp/hama-test_out"); configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost"); - configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810); + configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, + BSPNetUtils.getFreePort(20000)); configuration.set("hama.sync.client.class", org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class .getCanonicalName()); } - @Override - public void setUp() throws Exception { - super.setUp(); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - } - public void testClearZKNodes() throws IOException, KeeperException, InterruptedException { + final ZooKeeperSyncServerImpl server = new ZooKeeperSyncServerImpl(); + try { + server.init(configuration); + new Thread(new Runnable() { - // Clear any existing znode with the same path as bspRoot. - bspCluster.getBSPMaster().clearZKNodes(); - - int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, - 6000); - String connectStr = QuorumPeer.getZKQuorumServersString(configuration); - String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT, - Constants.DEFAULT_ZOOKEEPER_ROOT); - - // Establishing a zk session. - ZooKeeper zk = new ZooKeeper(connectStr, timeout, new Watcher() { - @Override - public void process(WatchedEvent event) { - // Do nothing.(Dummy Watcher) + @Override + public void run() { + try { + server.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }).start(); + + int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, + 6000); + String connectStr = QuorumPeer.getZKQuorumServersString(configuration); + String bspRoot = "/"; + // Establishing a zk session. + ZooKeeper zk = new ZooKeeper(connectStr, timeout, null); + + // Creating dummy bspRoot if it doesn't already exist. + Stat s = zk.exists(bspRoot, false); + if (s == null) { + zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); } - }); - - // Creating dummy bspRoot if it doesn't already exist. - Stat s = zk.exists(bspRoot, false); - if (s == null) { - zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - // Creating dummy child nodes at depth 1. - String node1 = bspRoot + "/task1"; - String node2 = bspRoot + "/task2"; - zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - // Creating dummy child node at depth 2. - String node11 = node1 + "/superstep1"; - zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - ArrayList list = (ArrayList) zk.getChildren(bspRoot, false); - assertEquals(2, list.size()); - System.out.println(list.size()); - - bspCluster.getBSPMaster().clearZKNodes(); - - list = (ArrayList) zk.getChildren(bspRoot, false); - System.out.println(list.size()); - assertEquals(0, list.size()); - - try { - zk.getData(node11, false, null); - fail(); - } catch (KeeperException.NoNodeException e) { - System.out.println("Node has been removed correctly!"); + // Creating dummy child nodes at depth 1. + String node1 = bspRoot + "task1"; + String node2 = bspRoot + "task2"; + zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // Creating dummy child node at depth 2. + String node11 = node1 + "superstep1"; + zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + ArrayList list = (ArrayList) zk.getChildren(bspRoot, + false); + assertEquals(2, list.size()); + System.out.println(list.size()); + + // clear it + BSPMaster.clearZKNodes(zk, "/"); + + list = (ArrayList) zk.getChildren(bspRoot, false); + System.out.println(list.size()); + assertEquals(0, list.size()); + + try { + zk.getData(node11, false, null); + fail(); + } catch (KeeperException.NoNodeException e) { + System.out.println("Node has been removed correctly!"); + } finally { + zk.close(); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + server.stopServer(); } } Modified: hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java?rev=1350713&r1=1350712&r2=1350713&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/util/TestZKUtil.java Fri Jun 15 17:28:52 2012 @@ -39,23 +39,24 @@ public class TestZKUtil extends TestCase class MockZK extends ZooKeeper { - public MockZK(String connectString, int timeout, Watcher watcher) - throws IOException { + public MockZK(String connectString, int timeout, Watcher watcher) + throws IOException { super(connectString, timeout, watcher); } - - // create is called in for loop + + // create is called in for loop @Override - public String create(String path, byte[] data, List acl, - CreateMode createMode) throws KeeperException, InterruptedException { - parts[pos] = path; + public String create(String path, byte[] data, List acl, + CreateMode createMode) throws KeeperException, InterruptedException { + parts[pos] = path; pos++; - sb.append(ZKUtil.ZK_SEPARATOR+path); + sb.append(ZKUtil.ZK_SEPARATOR + path); StringBuilder builder = new StringBuilder(); - for(int i=0;i