Return-Path: X-Original-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D882E733D for ; Mon, 1 Aug 2011 14:13:59 +0000 (UTC) Received: (qmail 74671 invoked by uid 500); 1 Aug 2011 14:13:59 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 74642 invoked by uid 500); 1 Aug 2011 14:13:59 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@incubator.apache.org Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 74634 invoked by uid 99); 1 Aug 2011 14:13:59 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Aug 2011 14:13:59 +0000 X-ASF-Spam-Status: No, hits=-1998.0 required=5.0 tests=ALL_TRUSTED,FB_GET_MEDS X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Aug 2011 14:13:47 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D994E2388A66; Mon, 1 Aug 2011 14:13:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1152788 [7/9] - in /incubator/hama/trunk: ./ bin/ conf/ core/ core/bin/ core/conf/ core/src/ core/src/main/ core/src/main/java/ core/src/main/java/org/ core/src/main/java/org/apache/ core/src/main/java/org/apache/hama/ core/src/main/java/o... Date: Mon, 01 Aug 2011 14:12:56 -0000 To: hama-commits@incubator.apache.org From: tommaso@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110801141300.D994E2388A66@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/hama/trunk/core/src/main/webapp/bspmaster/index.html URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/webapp/bspmaster/index.html?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/webapp/bspmaster/index.html (added) +++ incubator/hama/trunk/core/src/main/webapp/bspmaster/index.html Mon Aug 1 14:12:46 2011 @@ -0,0 +1,36 @@ + + + + + +Hama Administration + + + + +

Hama Administration

+ + + + + + Added: incubator/hama/trunk/core/src/main/webapp/bspmaster/machines.jsp URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/webapp/bspmaster/machines.jsp?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/webapp/bspmaster/machines.jsp (added) +++ incubator/hama/trunk/core/src/main/webapp/bspmaster/machines.jsp Mon Aug 1 14:12:46 2011 @@ -0,0 +1,61 @@ + +<%@ page contentType="text/html; charset=UTF-8" import="javax.servlet.*" + import="javax.servlet.http.*" import="java.io.*" import="java.util.*" + import="java.text.DecimalFormat" import="org.apache.hama.bsp.*" + import="org.apache.hama.util.*"%> +<%!private static final long serialVersionUID = 1L;%> +<% + BSPMaster tracker = (BSPMaster) application + .getAttribute("bsp.master"); + ClusterStatus status = tracker.getClusterStatus(true); + String trackerName = tracker.getBSPMasterName(); + String type = request.getParameter("type"); +%> +<%!public void generateGroomsTable(JspWriter out, String type, + ClusterStatus status, BSPMaster master) throws IOException { + out.print("
\n"); + out.print("\n"); + out.print("\n"); + out.print("" + "" + + "\n"); + for (Map.Entry entry : status.getActiveGroomNames() + .entrySet()) { + out.print("" + "\n"); + } + out.print("
Groom Servers
NameHost# running tasks
"); + out.print(entry.getValue() + ""); + out.print(entry.getValue() + "" + 1 + "
\n"); + out.print("
\n"); + }%> + + + +<%=trackerName%> Hama Machine List + + +

<%=trackerName%> Hama Machine List

+ +

Grooms

+<% + generateGroomsTable(out, type, status, tracker); +%> + +<% + out.println(BSPServletUtil.htmlFooter()); +%> \ No newline at end of file Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,47 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Forming up the miniDfs and miniZooKeeper + */ +public abstract class HamaCluster extends HamaClusterTestCase { + public static final Log LOG = LogFactory.getLog(HamaCluster.class); + protected final static HamaConfiguration conf = new HamaConfiguration(); + + public HamaCluster(){ + super(); + } + + public HamaCluster(boolean startDfs) { + super(startDfs); + } + + protected void setUp() throws Exception { + super.setUp(); + } + + protected static HamaConfiguration getConf() { + return conf; + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import java.io.File; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; + +public abstract class HamaClusterTestCase extends HamaTestCase { + public static final Log LOG = LogFactory.getLog(HamaClusterTestCase.class); + protected MiniDFSCluster dfsCluster; + protected MiniBSPCluster bspCluster; + protected MiniZooKeeperCluster zooKeeperCluster; + protected boolean startDfs; + protected int numOfGroom = 2; + + /** default constructor */ + public HamaClusterTestCase() { + this(false); + } + + public HamaClusterTestCase(boolean startDfs) { + super(); + this.startDfs = startDfs; + } + + /** + * Actually start the MiniBSP instance. + */ + protected void hamaClusterSetup() throws Exception { + File testDir = new File(getUnitTestdir(getName()).toString()); + + // Note that this is done before we create the MiniHamaCluster because we + // need to edit the config to add the ZooKeeper servers. + this.zooKeeperCluster = new MiniZooKeeperCluster(); + int clientPort = this.zooKeeperCluster.startup(testDir); + conf.set("hama.zookeeper.property.clientPort", Integer.toString(clientPort)); + bspCluster = new MiniBSPCluster(this.conf, numOfGroom); + bspCluster.startBSPCluster(); + } + + @Override + protected void setUp() throws Exception { + try { + if (this.startDfs) { + // This spews a bunch of warnings about missing scheme. TODO: fix. + this.dfsCluster = new MiniDFSCluster(0, this.conf, 2, true, true, true, + null, null, null, null); + + // mangle the conf so that the fs parameter points to the minidfs we + // just started up + FileSystem filesystem = dfsCluster.getFileSystem(); + conf.set("fs.defaultFS", filesystem.getUri().toString()); + Path parentdir = filesystem.getHomeDirectory(); + + filesystem.mkdirs(parentdir); + } + + // do the super setup now. if we had done it first, then we would have + // gotten our conf all mangled and a local fs started up. + super.setUp(); + + // start the instance + hamaClusterSetup(); + } catch (Exception e) { + if (zooKeeperCluster != null) { + zooKeeperCluster.shutdown(); + } + if (dfsCluster != null) { + shutdownDfs(dfsCluster); + } + throw e; + } + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + try { + if (startDfs) { + shutdownDfs(dfsCluster); + } + bspCluster.shutdown(); + } catch (Exception e) { + LOG.error(e); + } + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import java.io.File; +import java.io.IOException; + +import junit.framework.AssertionFailedError; +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hama.util.Bytes; + +public abstract class HamaTestCase extends TestCase { + private static Log LOG = LogFactory.getLog(HamaTestCase.class); + + /** configuration parameter name for test directory */ + public static final String TEST_DIRECTORY_KEY = "test.build.data"; + + private boolean localfs = false; + protected Path testDir = null; + protected FileSystem fs = null; + + static { + initialize(); + } + + public volatile HamaConfiguration conf; + + /** constructor */ + public HamaTestCase() { + super(); + init(); + } + + /** + * @param name + */ + public HamaTestCase(String name) { + super(name); + init(); + } + + private void init() { + conf = new HamaConfiguration(); + conf.setStrings("bsp.local.dir", "/tmp/hama-test"); + conf.set("bsp.master.address", "localhost"); + conf.set("bsp.groom.report.address", "127.0.0.1:0"); + } + + /** + * Note that this method must be called after the mini hdfs cluster has + * started or we end up with a local file system. + */ + @Override + protected void setUp() throws Exception { + super.setUp(); + localfs = + (conf.get("fs.defaultFS", "file:///").compareTo("file:///") == 0); + + if (fs == null) { + this.fs = FileSystem.get(conf); + } + try { + if (localfs) { + this.testDir = getUnitTestdir(getName()); + if (fs.exists(testDir)) { + fs.delete(testDir, true); + } + } else { + this.testDir = + this.fs.makeQualified(new Path("/tmp/hama-test")); + } + } catch (Exception e) { + LOG.fatal("error during setup", e); + throw e; + } + } + + @Override + protected void tearDown() throws Exception { + try { + if (localfs) { + if (this.fs.exists(testDir)) { + this.fs.delete(testDir, true); + } + } + } catch (Exception e) { + LOG.fatal("error during tear down", e); + } + super.tearDown(); + } + + protected Path getUnitTestdir(String testName) { + return new Path( + conf.get(TEST_DIRECTORY_KEY, "/tmp/hama-test/build/data"), testName); + } + + /** + * Initializes parameters used in the test environment: + * + * Sets the configuration parameter TEST_DIRECTORY_KEY if not already set. + * Sets the boolean debugging if "DEBUGGING" is set in the environment. + * If debugging is enabled, reconfigures logging so that the root log level is + * set to WARN and the logging level for the package is set to DEBUG. + */ + public static void initialize() { + if (System.getProperty(TEST_DIRECTORY_KEY) == null) { + System.setProperty(TEST_DIRECTORY_KEY, new File( + "build/hama/test").getAbsolutePath()); + } + } + + /** + * Common method to close down a MiniDFSCluster and the associated file system + * + * @param cluster + */ + public static void shutdownDfs(MiniDFSCluster cluster) { + if (cluster != null) { + LOG.info("Shutting down Mini DFS "); + try { + cluster.shutdown(); + } catch (Exception e) { + /// Can get a java.lang.reflect.UndeclaredThrowableException thrown + // here because of an InterruptedException. Don't let exceptions in + // here be cause of test failure. + } + try { + FileSystem fs = cluster.getFileSystem(); + if (fs != null) { + LOG.info("Shutting down FileSystem"); + fs.close(); + } + FileSystem.closeAll(); + } catch (IOException e) { + LOG.error("error closing file system", e); + } + } + } + + public void assertByteEquals(byte[] expected, + byte[] actual) { + if (Bytes.compareTo(expected, actual) != 0) { + throw new AssertionFailedError("expected:<" + + Bytes.toString(expected) + "> but was:<" + + Bytes.toString(actual) + ">"); + } + } + + public static void assertEquals(byte[] expected, + byte[] actual) { + if (Bytes.compareTo(expected, actual) != 0) { + throw new AssertionFailedError("expected:<" + + Bytes.toStringBinary(expected) + "> but was:<" + + Bytes.toStringBinary(actual) + ">"); + } + } + +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import java.io.IOException; + +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import static java.util.concurrent.TimeUnit.*; + +import static junit.framework.Assert.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hama.bsp.BSPMaster; +import org.apache.hama.bsp.GroomServer; +import org.apache.hama.HamaConfiguration; + + +public class MiniBSPCluster { + + public static final Log LOG = LogFactory.getLog(MiniBSPCluster.class); + + private ScheduledExecutorService scheduler; + + private HamaConfiguration configuration; + private BSPMasterRunner master; + private List groomServerList = + new CopyOnWriteArrayList(); + private int grooms; + + public class BSPMasterRunner implements Runnable{ + BSPMaster bspm; + HamaConfiguration conf; + + public BSPMasterRunner(HamaConfiguration conf){ + this.conf = conf; + if(null == this.conf) + throw new NullPointerException("No Configuration for BSPMaster."); + } + + public void run(){ + try{ + LOG.info("Starting BSP Master."); + this.bspm = BSPMaster.startMaster(this.conf); + this.bspm.offerService(); + }catch(IOException ioe){ + LOG.error("Fail to startup BSP Master.", ioe); + }catch(InterruptedException ie){ + LOG.error("BSP Master fails in offerService().", ie); + Thread.currentThread().interrupt(); + } + } + + public void shutdown(){ + if(null != this.bspm) this.bspm.shutdown(); + } + + public boolean isRunning(){ + if(null == this.bspm) return false; + + if(this.bspm.currentState().equals(BSPMaster.State.RUNNING)){ + return true; + } + return false; + } + + public BSPMaster getMaster(){ + return this.bspm; + } + } + + public class GroomServerRunner implements Runnable{ + GroomServer gs; + HamaConfiguration conf; + + public GroomServerRunner(HamaConfiguration conf){ + this.conf = conf; + } + + public void run(){ + try{ + this.gs = GroomServer.constructGroomServer(GroomServer.class, conf); + GroomServer.startGroomServer(this.gs).join(); + }catch(InterruptedException ie){ + LOG.error("Fail to start GroomServer. ", ie); + Thread.currentThread().interrupt(); + } + } + + public void shutdown(){ + try{ + if(null != this.gs) this.gs.shutdown(); + }catch(IOException ioe){ + LOG.info("Fail to shutdown GroomServer.", ioe); + } + } + + public boolean isRunning(){ + if(null == this.gs) return false; + return this.gs.isRunning(); + } + + public GroomServer getGroomServer(){ + return this.gs; + } + } + + public MiniBSPCluster(HamaConfiguration conf, int groomServers) { + this.configuration = conf; + this.grooms = groomServers; + if(1 > this.grooms) { + this.grooms = 2; + } + LOG.info("Groom server number "+this.grooms); + int threadpool = conf.getInt("bsp.test.threadpool", 10); + LOG.info("Thread pool value "+threadpool); + scheduler = Executors.newScheduledThreadPool(threadpool); + } + + public void startBSPCluster(){ + startMaster(); + startGroomServers(); + } + + public void shutdownBSPCluster(){ + if(null != this.master && this.master.isRunning()) + this.master.shutdown(); + if(0 < groomServerList.size()){ + for(GroomServerRunner groom: groomServerList){ + if(groom.isRunning()) groom.shutdown(); + } + } + } + + + public void startMaster(){ + if(null == this.scheduler) + throw new NullPointerException("No ScheduledExecutorService exists."); + this.master = new BSPMasterRunner(this.configuration); + scheduler.schedule(this.master, 0, SECONDS); + } + + public void startGroomServers(){ + if(null == this.scheduler) + throw new NullPointerException("No ScheduledExecutorService exists."); + if(null == this.master) + throw new NullPointerException("No BSPMaster exists."); + int cnt=0; + while(!this.master.isRunning()){ + LOG.info("Waiting BSPMaster up."); + try{ + Thread.sleep(1000); + cnt++; + if(100 < cnt){ + fail("Fail to launch BSPMaster."); + } + }catch(InterruptedException ie){ + LOG.error("Fail to check BSP Master's state.", ie); + Thread.currentThread().interrupt(); + } + } + for(int i=0; i < this.grooms; i++){ + HamaConfiguration c = new HamaConfiguration(this.configuration); + randomPort(c); + GroomServerRunner gsr = new GroomServerRunner(c); + groomServerList.add(gsr); + scheduler.schedule(gsr, 0, SECONDS); + cnt = 0; + while(!gsr.isRunning()){ + LOG.info("Waitin for GroomServer up."); + try{ + Thread.sleep(1000); + cnt++; + if(10 < cnt){ + fail("Fail to launch groom server."); + } + }catch(InterruptedException ie){ + LOG.error("Fail to check Groom Server's state.", ie); + Thread.currentThread().interrupt(); + } + } + } + + } + + private void randomPort(HamaConfiguration conf){ + try{ + ServerSocket skt = new ServerSocket(0); + int p = skt.getLocalPort(); + skt.close(); + conf.set(Constants.PEER_PORT, new Integer(p).toString()); + conf.setInt(Constants.GROOM_RPC_PORT, p+100); + }catch(IOException ioe){ + LOG.error("Can not find a free port for BSPPeer.", ioe); + } + } + + public void shutdown() { + shutdownBSPCluster(); + scheduler.shutdown(); + } + + public List getGroomServerThreads() { + List list = new ArrayList(); + for(GroomServerRunner gsr: groomServerList){ + list.add(new Thread(gsr)); + } + return list; + } + + public Thread getMaster() { + return new Thread(this.master); + } + + public List getGroomServers(){ + List list = new ArrayList(); + for(GroomServerRunner gsr: groomServerList){ + list.add(gsr.getGroomServer()); + } + return list; + } + + public BSPMaster getBSPMaster(){ + if(null != this.master) + return this.master.getMaster(); + return null; + } + + public ScheduledExecutorService getScheduler(){ + return this.scheduler; + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,94 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hama.Constants; +import org.apache.hama.HamaCluster; +import org.apache.hama.HamaConfiguration; + +public class TestBSPMasterGroomServer extends HamaCluster { + + private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class); + private static String TMP_OUTPUT = "/tmp/test-example/"; + private HamaConfiguration configuration; + private String TEST_JOB = "src/test/java/testjar/testjob.jar"; + + public TestBSPMasterGroomServer() { + configuration = new HamaConfiguration(); + configuration.set("bsp.master.address", "localhost"); + assertEquals("Make sure master addr is set to localhost:", "localhost", + configuration.get("bsp.master.address")); + configuration.setStrings("bsp.local.dir", "/tmp/hama-test"); + configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost"); + configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810); + } + + public void setUp() throws Exception { + super.setUp(); + } + + public void testSubmitJob() throws Exception { + BSPJob bsp = new BSPJob(configuration); + bsp.setJobName("Test Serialize Printing"); + bsp.setBspClass(testjar.ClassSerializePrinting.HelloBSP.class); + bsp.setJar(System.getProperty("user.dir")+"/"+TEST_JOB); + + // Set the task size as a number of GroomServer + BSPJobClient jobClient = new BSPJobClient(configuration); + configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600); + ClusterStatus cluster = jobClient.getClusterStatus(false); + assertEquals(this.numOfGroom, cluster.getMaxTasks()); + + // TODO test the multi-tasks + bsp.setNumBspTask(1); + + FileSystem fileSys = FileSystem.get(conf); + + if (bsp.waitForCompletion(true)) { + checkOutput(fileSys, cluster, conf); + } + LOG.info("Client finishes execution job."); + } + + private static void checkOutput(FileSystem fileSys, ClusterStatus cluster, + HamaConfiguration conf) throws Exception { + for (int i = 0; i < 1; i++) { // TODO test the multi-tasks + SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path( + TMP_OUTPUT + i), conf); + LongWritable timestamp = new LongWritable(); + Text message = new Text(); + reader.next(timestamp, message); + assertTrue("Check if `Hello BSP' gets printed.", message.toString() + .indexOf("Hello BSP from") >= 0); + reader.close(); + } + } + + public void tearDown() throws Exception { + super.tearDown(); + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,81 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import junit.framework.TestCase; + +public class TestBSPMessageBundle extends TestCase { + + public void testEmpty() throws IOException { + BSPMessageBundle bundle = new BSPMessageBundle(); + // Serialize it. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + bundle.write(new DataOutputStream(baos)); + baos.close(); + // Deserialize it. + BSPMessageBundle readBundle = new BSPMessageBundle(); + readBundle.readFields(new DataInputStream(new ByteArrayInputStream(baos + .toByteArray()))); + assertEquals(0, readBundle.getMessages().size()); + } + + public void testSerializationDeserialization() throws IOException { + BSPMessageBundle bundle = new BSPMessageBundle(); + ByteMessage[] testMessages = new ByteMessage[16]; + for (int i = 0; i < testMessages.length; ++i) { + // Create a one byte tag containing the number of the message. + byte[] tag = new byte[1]; + tag[0] = (byte) i; + // Create a four bytes data part containing serialized number of the + // message. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + baos.write(i); + baos.close(); + byte[] data = baos.toByteArray(); + testMessages[i] = new ByteMessage(tag, data); + bundle.addMessage(testMessages[i]); + } + // Serialize it. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + bundle.write(new DataOutputStream(baos)); + baos.close(); + // Deserialize it. + BSPMessageBundle readBundle = new BSPMessageBundle(); + readBundle.readFields(new DataInputStream(new ByteArrayInputStream(baos + .toByteArray()))); + // Check contents. + int messageNumber = 0; + for (BSPMessage message : readBundle.getMessages()) { + ByteMessage byteMessage = (ByteMessage) message; + assertTrue(Arrays.equals(testMessages[messageNumber].getTag(), + byteMessage.getTag())); + assertTrue(Arrays.equals(testMessages[messageNumber].getData(), + byteMessage.getData())); + ++messageNumber; + } + assertEquals(testMessages.length, messageNumber); + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hama.bsp.BSPMaster; +import org.apache.hama.bsp.ClusterStatus; + +public class TestClusterStatus extends TestCase { + Random rnd = new Random(); + + protected void setUp() throws Exception { + super.setUp(); + } + + public final void testWriteAndReadFields() throws IOException { + DataOutputBuffer out = new DataOutputBuffer(); + DataInputBuffer in = new DataInputBuffer(); + + ClusterStatus status1; + Map grooms = new HashMap(); + + for (int i = 0; i < 10; i++) { + int num = rnd.nextInt(); + String groomName = "groom_" + num; + String peerName = "peerhost:" + num; + grooms.put(groomName, peerName); + } + + int tasks = rnd.nextInt(100); + int maxTasks = rnd.nextInt(100); + BSPMaster.State state = BSPMaster.State.RUNNING; + + status1 = new ClusterStatus(grooms, tasks, maxTasks, state); + status1.write(out); + + in.reset(out.getData(), out.getLength()); + + ClusterStatus status2 = new ClusterStatus(); + status2.readFields(in); + + Map grooms_s = new HashMap(status1 + .getActiveGroomNames()); + Map grooms_o = new HashMap(status2 + .getActiveGroomNames()); + + assertEquals(status1.getGroomServers(), status2.getGroomServers()); + + assertTrue(grooms_s.entrySet().containsAll(grooms_o.entrySet())); + assertTrue(grooms_o.entrySet().containsAll(grooms_s.entrySet())); + + assertEquals(status1.getTasks(), status2.getTasks()); + assertEquals(status1.getMaxTasks(), status2.getMaxTasks()); + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestMessages.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestMessages.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestMessages.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestMessages.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import junit.framework.TestCase; + +import org.apache.hama.util.Bytes; + +public class TestMessages extends TestCase { + + public void testByteMessage() { + int dataSize = (int) (Runtime.getRuntime().maxMemory() * 0.60); + ByteMessage msg = new ByteMessage(Bytes.toBytes("tag"), new byte[dataSize]); + assertEquals(msg.getData().length, dataSize); + msg = null; + + byte[] dummyData = new byte[1024]; + ByteMessage msg2 = new ByteMessage(Bytes.tail(dummyData, 128), dummyData); + assertEquals( + Bytes.compareTo(msg2.getTag(), 0, 128, msg2.getData(), + msg2.getData().length - 128, 128), 0); + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.StringUtils; + +public class TestIPC extends TestCase { + public static final Log LOG = LogFactory.getLog(TestIPC.class); + + final private static Configuration conf = new Configuration(); + final static private int PING_INTERVAL = 1000; + + static { + Client.setPingInterval(conf, PING_INTERVAL); + } + + public TestIPC(String name) { + super(name); + } + + private static final Random RANDOM = new Random(); + + private static final String ADDRESS = "0.0.0.0"; + + private static class TestServer extends Server { + private boolean sleep; + + public TestServer(int handlerCount, boolean sleep) throws IOException { + super(ADDRESS, 0, LongWritable.class, handlerCount, conf); + this.sleep = sleep; + } + + @Override + public Writable call(Class protocol, Writable param, long receiveTime) + throws IOException { + if (sleep) { + try { + Thread.sleep(RANDOM.nextInt(2 * PING_INTERVAL)); // sleep a bit + } catch (InterruptedException e) { + } + } + return param; // echo param as result + } + } + + private static class SerialCaller extends Thread { + private Client client; + private InetSocketAddress server; + private int count; + private boolean failed; + + public SerialCaller(Client client, InetSocketAddress server, int count) { + this.client = client; + this.server = server; + this.count = count; + } + + public void run() { + for (int i = 0; i < count; i++) { + try { + LongWritable param = new LongWritable(RANDOM.nextLong()); + LongWritable value = (LongWritable) client.call(param, server, null, + null); + if (!param.equals(value)) { + LOG.fatal("Call failed!"); + failed = true; + break; + } + } catch (Exception e) { + LOG.fatal("Caught: " + StringUtils.stringifyException(e)); + failed = true; + } + } + } + } + + private static class ParallelCaller extends Thread { + private Client client; + private int count; + private InetSocketAddress[] addresses; + private boolean failed; + + public ParallelCaller(Client client, InetSocketAddress[] addresses, + int count) { + this.client = client; + this.addresses = addresses; + this.count = count; + } + + public void run() { + for (int i = 0; i < count; i++) { + try { + Writable[] params = new Writable[addresses.length]; + for (int j = 0; j < addresses.length; j++) + params[j] = new LongWritable(RANDOM.nextLong()); + Writable[] values = client.call(params, addresses, null, null); + for (int j = 0; j < addresses.length; j++) { + if (!params[j].equals(values[j])) { + LOG.fatal("Call failed!"); + failed = true; + break; + } + } + } catch (Exception e) { + LOG.fatal("Caught: " + StringUtils.stringifyException(e)); + failed = true; + } + } + } + } + + public void testSerial() throws Exception { + testSerial(3, false, 2, 5, 100); + } + + public void testSerial(int handlerCount, boolean handlerSleep, + int clientCount, int callerCount, int callCount) throws Exception { + Server server = new TestServer(handlerCount, handlerSleep); + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + + Client[] clients = new Client[clientCount]; + for (int i = 0; i < clientCount; i++) { + clients[i] = new Client(LongWritable.class, conf); + } + + SerialCaller[] callers = new SerialCaller[callerCount]; + for (int i = 0; i < callerCount; i++) { + callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount); + callers[i].start(); + } + for (int i = 0; i < callerCount; i++) { + callers[i].join(); + assertFalse(callers[i].failed); + } + for (int i = 0; i < clientCount; i++) { + clients[i].stop(); + } + server.stop(); + } + + public void testParallel() throws Exception { + testParallel(10, false, 2, 4, 2, 4, 100); + } + + public void testParallel(int handlerCount, boolean handlerSleep, + int serverCount, int addressCount, int clientCount, int callerCount, + int callCount) throws Exception { + Server[] servers = new Server[serverCount]; + for (int i = 0; i < serverCount; i++) { + servers[i] = new TestServer(handlerCount, handlerSleep); + servers[i].start(); + } + + InetSocketAddress[] addresses = new InetSocketAddress[addressCount]; + for (int i = 0; i < addressCount; i++) { + addresses[i] = NetUtils.getConnectAddress(servers[i % serverCount]); + } + + Client[] clients = new Client[clientCount]; + for (int i = 0; i < clientCount; i++) { + clients[i] = new Client(LongWritable.class, conf); + } + + ParallelCaller[] callers = new ParallelCaller[callerCount]; + for (int i = 0; i < callerCount; i++) { + callers[i] = new ParallelCaller(clients[i % clientCount], addresses, + callCount); + callers[i].start(); + } + for (int i = 0; i < callerCount; i++) { + callers[i].join(); + assertFalse(callers[i].failed); + } + for (int i = 0; i < clientCount; i++) { + clients[i].stop(); + } + for (int i = 0; i < serverCount; i++) { + servers[i].stop(); + } + } + + public void testStandAloneClient() throws Exception { + testParallel(10, false, 2, 4, 2, 4, 100); + Client client = new Client(LongWritable.class, conf); + InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10); + try { + client.call(new LongWritable(RANDOM.nextLong()), address, null, null); + fail("Expected an exception to have been thrown"); + } catch (IOException e) { + String message = e.getMessage(); + String addressText = address.toString(); + assertTrue("Did not find " + addressText + " in " + message, message + .contains(addressText)); + Throwable cause = e.getCause(); + assertNotNull("No nested exception in " + e, cause); + String causeText = cause.getMessage(); + assertTrue("Did not find " + causeText + " in " + message, message + .contains(causeText)); + } + } + +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ipc; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.VersionedProtocol; + +import junit.framework.TestCase; + +public class TestRPC extends TestCase { + private static final int PORT = 1234; + private static final String ADDRESS = "0.0.0.0"; + + public static final Log LOG = LogFactory + .getLog("org.apache.hadoop.ipc.TestRPC"); + + private static Configuration conf = new Configuration(); + + public TestRPC(String name) { + super(name); + } + + public interface TestProtocol extends VersionedProtocol { + public static final long versionID = 1L; + + void ping() throws IOException; + + String echo(String value) throws IOException; + + String[] echo(String[] value) throws IOException; + + Writable echo(Writable value) throws IOException; + + int add(int v1, int v2) throws IOException; + + int add(int[] values) throws IOException; + + int error() throws IOException; + + void testServerGet() throws IOException; + } + + public class TestImpl implements TestProtocol { + + public long getProtocolVersion(String protocol, long clientVersion) { + return TestProtocol.versionID; + } + + public void ping() { + } + + public String echo(String value) throws IOException { + return value; + } + + public String[] echo(String[] values) throws IOException { + return values; + } + + public Writable echo(Writable writable) { + return writable; + } + + public int add(int v1, int v2) { + return v1 + v2; + } + + public int add(int[] values) { + int sum = 0; + for (int i = 0; i < values.length; i++) { + sum += values[i]; + } + return sum; + } + + public int error() throws IOException { + throw new IOException("bobo"); + } + + public void testServerGet() throws IOException { + if (!(Server.get() instanceof RPC.Server)) { + throw new IOException("Server.get() failed"); + } + } + + } + + public void testCalls() throws Exception { + Server server = RPC.getServer(new TestImpl(), ADDRESS, PORT, conf); + server.start(); + + InetSocketAddress addr = new InetSocketAddress(PORT); + TestProtocol proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, addr, conf); + + proxy.ping(); + + String stringResult = proxy.echo("foo"); + assertEquals(stringResult, "foo"); + + stringResult = proxy.echo((String) null); + assertEquals(stringResult, null); + + String[] stringResults = proxy.echo(new String[] { "foo", "bar" }); + assertTrue(Arrays.equals(stringResults, new String[] { "foo", "bar" })); + + stringResults = proxy.echo((String[]) null); + assertTrue(Arrays.equals(stringResults, null)); + + int intResult = proxy.add(1, 2); + assertEquals(intResult, 3); + + intResult = proxy.add(new int[] { 1, 2 }); + assertEquals(intResult, 3); + + boolean caught = false; + try { + proxy.error(); + } catch (IOException e) { + LOG.debug("Caught " + e); + caught = true; + } + assertTrue(caught); + + proxy.testServerGet(); + + // try some multi-calls + Method echo = TestProtocol.class.getMethod("echo", + new Class[] { String.class }); + String[] strings = (String[]) RPC.call(echo, new String[][] { { "a" }, + { "b" } }, new InetSocketAddress[] { addr, addr }, null, conf); + assertTrue(Arrays.equals(strings, new String[] { "a", "b" })); + + Method ping = TestProtocol.class.getMethod("ping", new Class[] {}); + Object[] voids = (Object[]) RPC.call(ping, new Object[][] { {}, {} }, + new InetSocketAddress[] { addr, addr }, null, conf); + assertEquals(voids, null); + + server.stop(); + } + + public static void main(String[] args) throws Exception { + + new TestRPC("test").testCalls(); + + } + +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import java.io.IOException; +import java.util.Arrays; + +import junit.framework.TestCase; + +public class TestBytes extends TestCase { + public void testNullHashCode() { + byte [] b = null; + Exception ee = null; + try { + Bytes.hashCode(b); + } catch (Exception e) { + ee = e; + } + assertNotNull(ee); + } + + public void testSplit() throws Exception { + byte [] lowest = Bytes.toBytes("AAA"); + byte [] middle = Bytes.toBytes("CCC"); + byte [] highest = Bytes.toBytes("EEE"); + byte [][] parts = Bytes.split(lowest, highest, 1); + for (int i = 0; i < parts.length; i++) { + System.out.println(Bytes.toString(parts[i])); + } + assertEquals(3, parts.length); + assertTrue(Bytes.equals(parts[1], middle)); + // Now divide into three parts. Change highest so split is even. + highest = Bytes.toBytes("DDD"); + parts = Bytes.split(lowest, highest, 2); + for (int i = 0; i < parts.length; i++) { + System.out.println(Bytes.toString(parts[i])); + } + assertEquals(4, parts.length); + // Assert that 3rd part is 'CCC'. + assertTrue(Bytes.equals(parts[2], middle)); + } + + public void testSplit2() throws Exception { + // More split tests. + byte [] lowest = Bytes.toBytes("http://A"); + byte [] highest = Bytes.toBytes("http://z"); + byte [] middle = Bytes.toBytes("http://]"); + byte [][] parts = Bytes.split(lowest, highest, 1); + for (int i = 0; i < parts.length; i++) { + System.out.println(Bytes.toString(parts[i])); + } + assertEquals(3, parts.length); + assertTrue(Bytes.equals(parts[1], middle)); + } + + public void testToLong() throws Exception { + long [] longs = {-1l, 123l, 122232323232l}; + for (int i = 0; i < longs.length; i++) { + byte [] b = Bytes.toBytes(longs[i]); + assertEquals(longs[i], Bytes.toLong(b)); + } + } + + public void testToFloat() throws Exception { + float [] floats = {-1f, 123.123f, Float.MAX_VALUE}; + for (int i = 0; i < floats.length; i++) { + byte [] b = Bytes.toBytes(floats[i]); + assertEquals(floats[i], Bytes.toFloat(b)); + } + } + + public void testToDouble() throws Exception { + double [] doubles = {Double.MIN_VALUE, Double.MAX_VALUE}; + for (int i = 0; i < doubles.length; i++) { + byte [] b = Bytes.toBytes(doubles[i]); + assertEquals(doubles[i], Bytes.toDouble(b)); + } + } + + public void testBinarySearch() throws Exception { + byte [][] arr = { + {1}, + {3}, + {5}, + {7}, + {9}, + {11}, + {13}, + {15}, + }; + byte [] key1 = {3,1}; + byte [] key2 = {4,9}; + byte [] key2_2 = {4}; + byte [] key3 = {5,11}; + + assertEquals(1, Bytes.binarySearch(arr, key1, 0, 1, + Bytes.BYTES_RAWCOMPARATOR)); + assertEquals(0, Bytes.binarySearch(arr, key1, 1, 1, + Bytes.BYTES_RAWCOMPARATOR)); + assertEquals(-(2+1), Arrays.binarySearch(arr, key2_2, + Bytes.BYTES_COMPARATOR)); + assertEquals(-(2+1), Bytes.binarySearch(arr, key2, 0, 1, + Bytes.BYTES_RAWCOMPARATOR)); + assertEquals(4, Bytes.binarySearch(arr, key2, 1, 1, + Bytes.BYTES_RAWCOMPARATOR)); + assertEquals(2, Bytes.binarySearch(arr, key3, 0, 1, + Bytes.BYTES_RAWCOMPARATOR)); + assertEquals(5, Bytes.binarySearch(arr, key3, 1, 1, + Bytes.BYTES_RAWCOMPARATOR)); + } + + public void testIncrementBytes() throws IOException { + + assertTrue(checkTestIncrementBytes(10, 1)); + assertTrue(checkTestIncrementBytes(12, 123435445)); + assertTrue(checkTestIncrementBytes(124634654, 1)); + assertTrue(checkTestIncrementBytes(10005460, 5005645)); + assertTrue(checkTestIncrementBytes(1, -1)); + assertTrue(checkTestIncrementBytes(10, -1)); + assertTrue(checkTestIncrementBytes(10, -5)); + assertTrue(checkTestIncrementBytes(1005435000, -5)); + assertTrue(checkTestIncrementBytes(10, -43657655)); + assertTrue(checkTestIncrementBytes(-1, 1)); + assertTrue(checkTestIncrementBytes(-26, 5034520)); + assertTrue(checkTestIncrementBytes(-10657200, 5)); + assertTrue(checkTestIncrementBytes(-12343250, 45376475)); + assertTrue(checkTestIncrementBytes(-10, -5)); + assertTrue(checkTestIncrementBytes(-12343250, -5)); + assertTrue(checkTestIncrementBytes(-12, -34565445)); + assertTrue(checkTestIncrementBytes(-1546543452, -34565445)); + } + + private static boolean checkTestIncrementBytes(long val, long amount) + throws IOException { + byte[] value = Bytes.toBytes(val); + byte [] testValue = {-1, -1, -1, -1, -1, -1, -1, -1}; + if (value[0] > 0) { + testValue = new byte[Bytes.SIZEOF_LONG]; + } + System.arraycopy(value, 0, testValue, testValue.length - value.length, + value.length); + + long incrementResult = Bytes.toLong(Bytes.incrementBytes(value, amount)); + + return (Bytes.toLong(testValue) + amount) == incrementResult; + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestNumeric.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestNumeric.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestNumeric.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestNumeric.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,34 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import junit.framework.TestCase; + +public class TestNumeric extends TestCase { + final static int TEST_INT = 3; + final static double TEST_DOUBLE = 0.4; + + /** + * Double conversion test + */ + public void testDouble() { + assertEquals(Bytes.toDouble(Bytes.toBytes(TEST_DOUBLE)), TEST_DOUBLE); + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,74 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import org.apache.log4j.Logger; + +import junit.framework.TestCase; + +/** + * Random variable generation test + */ +public class TestRandomVariable extends TestCase { + static final Logger LOG = Logger.getLogger(TestRandomVariable.class); + final static int COUNT = 50; + + /** + * Random object test + * + * @throws Exception + */ + public void testRand() throws Exception { + for (int i = 0; i < COUNT; i++) { + double result = RandomVariable.rand(); + assertTrue(result >= 0.0d && result <= 1.0); + } + } + + /** + * Random integer test + * + * @throws Exception + */ + public void testRandInt() throws Exception { + final int min = 122; + final int max = 561; + + for (int i = 0; i < COUNT; i++) { + int result = RandomVariable.randInt(min, max); + assertTrue(result >= min && result <= max); + } + } + + /** + * Uniform test + * + * @throws Exception + */ + public void testUniform() throws Exception { + final double min = 1.0d; + final double max = 3.0d; + + for (int i = 0; i < COUNT; i++) { + double result = RandomVariable.uniform(min, max); + assertTrue(result >= min && result <= max); + } + } +} Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java (added) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.zookeeper; + +import org.apache.hama.Constants; +import org.apache.hama.HamaConfiguration; + +import junit.framework.TestCase; + +public class TestZKTools extends TestCase { + + public void testZKProps() { + HamaConfiguration conf = new HamaConfiguration(); + conf.set(Constants.ZOOKEEPER_QUORUM, "test.com:123"); + conf.set(Constants.ZOOKEEPER_CLIENT_PORT, "2222"); + + assertEquals("test.com:2222", QuorumPeer.getZKQuorumServersString(conf)); + } +} Added: incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java (added) +++ incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package testjar; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Text; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPPeer; +import org.apache.zookeeper.KeeperException; + +public class ClassSerializePrinting { + private static String TMP_OUTPUT = "/tmp/test-example/"; + + public static class HelloBSP extends BSP { + public static final Log LOG = LogFactory.getLog(HelloBSP.class); + private Configuration conf; + private final static int PRINT_INTERVAL = 1000; + private FileSystem fileSys; + private int num; + + public void bsp(BSPPeer bspPeer) throws IOException, + KeeperException, InterruptedException { + + int i = 0; + for (String otherPeer : bspPeer.getAllPeerNames()) { + String peerName = bspPeer.getPeerName(); + if (peerName.equals(otherPeer)) { + writeLogToFile(peerName, i); + } + + Thread.sleep(PRINT_INTERVAL); + bspPeer.sync(); + i++; + } + } + + private void writeLogToFile(String string, int i) throws IOException { + SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, + new Path(TMP_OUTPUT + i), LongWritable.class, Text.class, + CompressionType.NONE); + writer.append(new LongWritable(System.currentTimeMillis()), new Text( + "Hello BSP from " + (i + 1) + " of " + num + ": " + string)); + writer.close(); + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + num = Integer.parseInt(conf.get("bsp.peers.num")); + try { + fileSys = FileSystem.get(conf); + } catch (IOException e) { + e.printStackTrace(); + } + } + + } + +} Added: incubator/hama/trunk/core/src/test/java/testjar/testjob.jar URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/testjar/testjob.jar?rev=1152788&view=auto ============================================================================== Binary file - no diff available. Propchange: incubator/hama/trunk/core/src/test/java/testjar/testjob.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Propchange: incubator/hama/trunk/examples/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Mon Aug 1 14:12:46 2011 @@ -0,0 +1,3 @@ +*.iml + +target Added: incubator/hama/trunk/examples/pom.xml URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/pom.xml?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/examples/pom.xml (added) +++ incubator/hama/trunk/examples/pom.xml Mon Aug 1 14:12:46 2011 @@ -0,0 +1,76 @@ + + + + + + org.apache.hama + hama-parent + 0.4.0-incubating-SNAPSHOT + + + 4.0.0 + org.apache.hama + hama-examples + Apache Hama Examples + 0.4.0-incubating-SNAPSHOT + jar + + + + org.apache.hama + hama-core + ${project.version} + + + + hama-examples-${project.version} + + + + maven-surefire-plugin + + true + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 1.4 + + + package + + shade + + + + + org.apache.hama.examples.ExampleDriver + + + + + + + + + Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (added) +++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,42 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples; + +import org.apache.hadoop.util.ProgramDriver; +import org.apache.hama.examples.graph.PageRank; +import org.apache.hama.examples.graph.ShortestPaths; + +public class ExampleDriver { + + public static void main(String[] args) { + ProgramDriver pgd = new ProgramDriver(); + try { + pgd.addClass("pi", PiEstimator.class, "Pi Estimator"); + pgd.addClass("bench", RandBench.class, "Random Communication Benchmark"); + pgd.addClass("test", SerializePrinting.class, "Serialize Printing Test"); + pgd.addClass("sssp", ShortestPaths.class, "Single Source Shortest Path"); + pgd.addClass("pagerank", PageRank.class, "PageRank"); + + pgd.driver(args); + } catch (Throwable e) { + e.printStackTrace(); + } + } +} Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (added) +++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPJobClient; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.ClusterStatus; +import org.apache.hama.bsp.DoubleMessage; +import org.apache.zookeeper.KeeperException; + +public class PiEstimator { + private static String MASTER_TASK = "master.task."; + private static Path TMP_OUTPUT = new Path("/tmp/pi-example/output"); + + public static class MyEstimator extends BSP { + public static final Log LOG = LogFactory.getLog(MyEstimator.class); + private Configuration conf; + private String masterTask; + private static final int iterations = 10000; + + public void bsp(BSPPeer bspPeer) throws IOException, + KeeperException, InterruptedException { + int in = 0, out = 0; + for (int i = 0; i < iterations; i++) { + double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0; + if ((Math.sqrt(x * x + y * y) < 1.0)) { + in++; + } else { + out++; + } + } + + double data = 4.0 * (double) in / (double) iterations; + DoubleMessage estimate = new DoubleMessage(bspPeer.getPeerName(), data); + + bspPeer.send(masterTask, estimate); + bspPeer.sync(); + + if (bspPeer.getPeerName().equals(masterTask)) { + double pi = 0.0; + int numPeers = bspPeer.getNumCurrentMessages(); + DoubleMessage received; + while ((received = (DoubleMessage) bspPeer.getCurrentMessage()) != null) { + pi += received.getData(); + } + + pi = pi / numPeers; + writeResult(pi); + } + } + + private void writeResult(double pi) throws IOException { + FileSystem fileSys = FileSystem.get(conf); + + SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, + TMP_OUTPUT, DoubleWritable.class, DoubleWritable.class, + CompressionType.NONE); + writer.append(new DoubleWritable(pi), new DoubleWritable(0)); + writer.close(); + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + this.masterTask = conf.get(MASTER_TASK); + } + + } + + private static void initTempDir(FileSystem fileSys) throws IOException { + if (fileSys.exists(TMP_OUTPUT)) { + fileSys.delete(TMP_OUTPUT, true); + } + } + + private static void printOutput(FileSystem fileSys, HamaConfiguration conf) + throws IOException { + SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, TMP_OUTPUT, + conf); + DoubleWritable output = new DoubleWritable(); + DoubleWritable zero = new DoubleWritable(); + reader.next(output, zero); + reader.close(); + + System.out.println("Estimated value of PI is " + output); + } + + public static void main(String[] args) throws InterruptedException, + IOException, ClassNotFoundException { + // BSP job configuration + HamaConfiguration conf = new HamaConfiguration(); + + BSPJob bsp = new BSPJob(conf, PiEstimator.class); + // Set the job name + bsp.setJobName("Pi Estimation Example"); + bsp.setBspClass(MyEstimator.class); + + BSPJobClient jobClient = new BSPJobClient(conf); + ClusterStatus cluster = jobClient.getClusterStatus(true); + + if (args.length > 0) { + bsp.setNumBspTask(Integer.parseInt(args[0])); + } else { + // Set to maximum + bsp.setNumBspTask(cluster.getMaxTasks()); + } + + // Choose one as a master + for (String peerName : cluster.getActiveGroomNames().values()) { + conf.set(MASTER_TASK, peerName); + break; + } + + FileSystem fileSys = FileSystem.get(conf); + initTempDir(fileSys); + + long startTime = System.currentTimeMillis(); + + if (bsp.waitForCompletion(true)) { + printOutput(fileSys, conf); + + System.out.println("Job Finished in " + + (double) (System.currentTimeMillis() - startTime) / 1000.0 + + " seconds"); + } + } +} Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (added) +++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples; + +import java.io.IOException; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPJobClient; +import org.apache.hama.bsp.BSPMessage; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.ByteMessage; +import org.apache.hama.bsp.ClusterStatus; +import org.apache.hama.util.Bytes; +import org.apache.zookeeper.KeeperException; + +public class RandBench { + private static final String SIZEOFMSG = "msg.size"; + private static final String N_COMMUNICATIONS = "communications.num"; + private static final String N_SUPERSTEPS = "supersteps.num"; + + public static class RandBSP extends BSP { + public static final Log LOG = LogFactory.getLog(RandBSP.class); + private Configuration conf; + private Random r = new Random(); + private int sizeOfMsg; + private int nCommunications; + private int nSupersteps; + + @Override + public void bsp(BSPPeer bspPeer) throws IOException, + KeeperException, InterruptedException { + byte[] dummyData = new byte[sizeOfMsg]; + BSPMessage msg = null; + String[] peers = bspPeer.getAllPeerNames(); + String peerName = bspPeer.getPeerName(); + + for (int i = 0; i < nSupersteps; i++) { + + for (int j = 0; j < nCommunications; j++) { + String tPeer = peers[r.nextInt(peers.length)]; + String tag = peerName + " to " + tPeer; + msg = new ByteMessage(Bytes.toBytes(tag), dummyData); + bspPeer.send(tPeer, msg); + } + + bspPeer.sync(); + + ByteMessage received; + while ((received = (ByteMessage) bspPeer.getCurrentMessage()) != null) { + LOG.info(Bytes.toString(received.getTag()) + " : " + + received.getData().length); + } + + } + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1); + this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1); + this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1); + } + + @Override + public Configuration getConf() { + return conf; + } + + } + + public static void main(String[] args) throws Exception { + if (args.length < 3) { + System.out.println("Usage: "); + System.exit(-1); + } + + // BSP job configuration + HamaConfiguration conf = new HamaConfiguration(); + + conf.setInt(SIZEOFMSG, Integer.parseInt(args[0])); + conf.setInt(N_COMMUNICATIONS, Integer.parseInt(args[1])); + conf.setInt(N_SUPERSTEPS, Integer.parseInt(args[2])); + + BSPJob bsp = new BSPJob(conf, RandBench.class); + // Set the job name + bsp.setJobName("Random Communication Benchmark"); + bsp.setBspClass(RandBSP.class); + + // Set the task size as a number of GroomServer + BSPJobClient jobClient = new BSPJobClient(conf); + ClusterStatus cluster = jobClient.getClusterStatus(false); + bsp.setNumBspTask(cluster.getMaxTasks()); + + long startTime = System.currentTimeMillis(); + bsp.waitForCompletion(true); + System.out.println("Job Finished in " + + (double) (System.currentTimeMillis() - startTime) / 1000.0 + + " seconds"); + } +} Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java?rev=1152788&view=auto ============================================================================== --- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java (added) +++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java Mon Aug 1 14:12:46 2011 @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples; + +import java.io.IOException; +import java.util.Date; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Text; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPJobClient; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.ClusterStatus; +import org.apache.zookeeper.KeeperException; + +public class SerializePrinting { + private static String TMP_OUTPUT = "/tmp/test-example/"; + + public static class HelloBSP extends BSP { + public static final Log LOG = LogFactory.getLog(HelloBSP.class); + private Configuration conf; + private final static int PRINT_INTERVAL = 1000; + private FileSystem fileSys; + private int num; + + public void bsp(BSPPeer bspPeer) throws IOException, + KeeperException, InterruptedException { + + int i = 0; + for (String otherPeer : bspPeer.getAllPeerNames()) { + String peerName = bspPeer.getPeerName(); + if (peerName.equals(otherPeer)) { + writeLogToFile(peerName, i); + } + + Thread.sleep(PRINT_INTERVAL); + bspPeer.sync(); + i++; + } + } + + private void writeLogToFile(String string, int i) throws IOException { + SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, + new Path(TMP_OUTPUT + i), LongWritable.class, Text.class, + CompressionType.NONE); + writer.append(new LongWritable(System.currentTimeMillis()), new Text( + "Hello BSP from " + (i + 1) + " of " + num + ": " + string)); + writer.close(); + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + num = Integer.parseInt(conf.get("bsp.peers.num")); + try { + fileSys = FileSystem.get(conf); + } catch (IOException e) { + e.printStackTrace(); + } + } + + } + + private static void printOutput(FileSystem fileSys, ClusterStatus cluster, + HamaConfiguration conf) throws IOException { + System.out.println("Each task printed the \"Hello World\" as below:"); + for (int i = 0; i < cluster.getGroomServers(); i++) { + SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path( + TMP_OUTPUT + i), conf); + LongWritable timestamp = new LongWritable(); + Text message = new Text(); + reader.next(timestamp, message); + System.out.println(new Date(timestamp.get()) + ": " + message); + reader.close(); + } + } + + private static void initTempDir(FileSystem fileSys) throws IOException { + if (fileSys.exists(new Path(TMP_OUTPUT))) { + fileSys.delete(new Path(TMP_OUTPUT), true); + } + } + + public static void main(String[] args) throws InterruptedException, + IOException, ClassNotFoundException { + // BSP job configuration + HamaConfiguration conf = new HamaConfiguration(); + + BSPJob bsp = new BSPJob(conf, SerializePrinting.class); + // Set the job name + bsp.setJobName("Serialize Printing"); + bsp.setBspClass(HelloBSP.class); + + // Set the task size as a number of GroomServer + BSPJobClient jobClient = new BSPJobClient(conf); + ClusterStatus cluster = jobClient.getClusterStatus(false); + bsp.setNumBspTask(cluster.getGroomServers()); + + FileSystem fileSys = FileSystem.get(conf); + initTempDir(fileSys); + + if (bsp.waitForCompletion(true)) { + printOutput(fileSys, cluster, conf); + } + } + +}