hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1432807 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/BSPJobClient.java examples/src/test/java/org/apache/hama/examples/PageRankTest.java graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Date Mon, 14 Jan 2013 05:37:25 GMT
Author: edwardyoon
Date: Mon Jan 14 05:37:25 2013
New Revision: 1432807

URL: http://svn.apache.org/viewvc?rev=1432807&view=rev
Log:
HAMA-712: PartitioningRunner should works for multiple input files

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1432807&r1=1432806&r2=1432807&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Jan 14 05:37:25 2013
@@ -11,6 +11,8 @@ Release 0.7 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-712: PartitioningRunner should works for multiple input files (surajsmenon via edwardyoon)
+
   IMPROVEMENTS
 
    HAMA-531: Reimplementation of partitioner (edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1432807&r1=1432806&r2=1432807&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Jan 14 05:37:25
2013
@@ -412,10 +412,13 @@ public class BSPJobClient extends Config
           fs.delete(partitionDir, true);
         }
 
+        if (numTasks == 0) {
+          numTasks = numSplits;
+        }
+
         HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
 
-        conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT,
-            Integer.parseInt(job.getConfiguration().get("bsp.peers.num")));
+        conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, numTasks);
         if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
           conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
               .get(Constants.RUNTIME_PARTITIONING_DIR));

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1432807&r1=1432806&r2=1432807&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Mon Jan 14
05:37:25 2013
@@ -71,7 +71,7 @@ public class PageRankTest extends TestCa
     try {
       HamaConfiguration conf = new HamaConfiguration(new Configuration());
       conf.set("bsp.local.tasks.maximum", "10");
-      conf.set("bsp.peers.num", "7");
+      conf.setInt("bsp.peers.num", 7);
       conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
       GraphJob pageJob = PageRank.createJob(
           new String[] { INPUT, OUTPUT, "7" }, conf);

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1432807&r1=1432806&r2=1432807&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Mon Jan 14
05:37:25 2013
@@ -65,7 +65,6 @@ public class TestSubmitGraphJob extends 
     configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
     ClusterStatus cluster = jobClient.getClusterStatus(false);
     assertEquals(this.numOfGroom, cluster.getGroomServers());
-    bsp.setNumBspTask(2);
     LOG.info("Client finishes execution job.");
     bsp.setJobName("Pagerank");
     bsp.setVertexClass(PageRank.PageRankVertex.class);
@@ -119,12 +118,13 @@ public class TestSubmitGraphJob extends 
     assertTrue(sum > 0.9d && sum <= 1.1d);
   }
 
+
   private void generateTestData() {
     try {
-      SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(),
-          new Path(INPUT), PageRankVertex.class, NullWritable.class);
+      SequenceFile.Writer writer1 = SequenceFile.createWriter(fs, getConf(),
+          new Path(INPUT+"/part0"), PageRankVertex.class, NullWritable.class);
 
-      for (int i = 0; i < input.length; i++) {
+      for (int i = 0; i < input.length/2; i++) {
         String[] x = input[i].split("\t");
 
         PageRankVertex vertex = new PageRankVertex();
@@ -133,15 +133,33 @@ public class TestSubmitGraphJob extends 
           vertex.addEdge(new Edge<Text, NullWritable>(new Text(x[j]),
               NullWritable.get()));
         }
-        writer.append(vertex, NullWritable.get());
+        writer1.append(vertex, NullWritable.get());
       }
 
-      writer.close();
+      writer1.close();
+      
+      SequenceFile.Writer writer2 = SequenceFile.createWriter(fs, getConf(),
+          new Path(INPUT+"/part1"), PageRankVertex.class, NullWritable.class);
+
+      for (int i = 0; i < input.length/2 + 1; i++) {
+        String[] x = input[i].split("\t");
+
+        PageRankVertex vertex = new PageRankVertex();
+        vertex.setVertexID(new Text(x[0]));
+        for (int j = 1; j < x.length; j++) {
+          vertex.addEdge(new Edge<Text, NullWritable>(new Text(x[j]),
+              NullWritable.get()));
+        }
+        writer2.append(vertex, NullWritable.get());
+      }
+
+      writer2.close();
+      
     } catch (IOException e) {
       e.printStackTrace();
     }
   }
-
+  
   private void deleteTempDirs() {
     try {
       if (fs.exists(new Path(INPUT)))



Mime
View raw message