hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1420530 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/
Date Wed, 12 Dec 2012 05:32:55 GMT
Author: edwardyoon
Date: Wed Dec 12 05:32:53 2012
New Revision: 1420530

URL: http://svn.apache.org/viewvc?rev=1420530&view=rev
Log:
Reimplementation of partitioner

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Dec 12 05:32:53 2012
@@ -10,6 +10,8 @@ Release 0.7 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-531: Reimplementation of partitioner (edwardyoon)
+
 Release 0.6 - November 28, 2012
 
   NEW FEATURES

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Wed Dec 12 05:32:53 2012
@@ -22,6 +22,7 @@ import java.net.URL;
 import java.net.URLDecoder;
 import java.util.Enumeration;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -219,8 +220,41 @@ public class BSPJob extends BSPJobContex
     state = JobState.RUNNING;
   }
 
+  boolean isPartitioned = false;
+
   public boolean waitForCompletion(boolean verbose) throws IOException,
       InterruptedException, ClassNotFoundException {
+    if (this.getConfiguration().get("bsp.input.partitioner.class") != null
+        && !isPartitioned) {
+      FileSystem fs = FileSystem.get(conf);
+      Path inputDir = new Path(conf.get("bsp.input.dir"));
+      if (fs.isFile(inputDir)) {
+        inputDir = inputDir.getParent();
+      }
+      Path partitionDir = new Path(inputDir + "/partitions");
+
+      if (fs.exists(partitionDir)) {
+        fs.delete(partitionDir, true);
+      }
+
+      HamaConfiguration conf = new HamaConfiguration();
+      conf.setInt("desired.num.of.tasks",
+          Integer.parseInt(this.getConfiguration().get("bsp.peers.num")));
+      BSPJob partitioningJob = new BSPJob(conf);
+      partitioningJob.setInputPath(new Path(this.getConfiguration().get(
+          "bsp.input.dir")));
+      partitioningJob.setInputFormat(this.getInputFormat().getClass());
+      partitioningJob.setInputKeyClass(this.getInputKeyClass());
+      partitioningJob.setInputValueClass(getInputValueClass());
+      partitioningJob.setOutputFormat(NullOutputFormat.class);
+      partitioningJob.setBspClass(PartitioningRunner.class);
+
+      isPartitioned = partitioningJob.waitForCompletion(true);
+      if (isPartitioned) {
+        this.setInputPath(new Path(inputDir + "/partitions"));
+      }
+    }
+
     if (state == JobState.DEFINE) {
       submit();
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Wed Dec 12 05:32:53 2012
@@ -30,9 +30,7 @@ import java.io.OutputStreamWriter;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLConnection;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
@@ -55,7 +53,6 @@ import org.apache.hadoop.io.WritableUtil
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -301,9 +298,10 @@ public class BSPJobClient extends Config
       throws IOException {
     BSPJob job = pJob;
     job.setJobID(jobId);
-    int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB, 0);
+    int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+        0);
     int maxTasks = checkTaskLimits(job, limitTasks);
-    
+
     Path submitJobDir = new Path(getSystemDir(), "submit_"
         + Integer.toString(Math.abs(r.nextInt()), 36));
     Path submitSplitFile = new Path(submitJobDir, "job.split");
@@ -325,12 +323,6 @@ public class BSPJobClient extends Config
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-      if (job.getConfiguration().get("bsp.input.partitioner.class") != null
-          && !job.getConfiguration()
-              .getBoolean("hama.graph.runtime.partitioning", false)) {
-        job = partition(job, maxTasks);
-        maxTasks = job.getInt("hama.partition.count", maxTasks);
-      }
       job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
@@ -375,15 +367,16 @@ public class BSPJobClient extends Config
   protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
     int maxTasks;
     ClusterStatus clusterStatus = getClusterStatus(true);
-    
-    if(limitTasks > 0) {
+
+    if (limitTasks > 0) {
       maxTasks = limitTasks;
     } else {
       maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
     }
-    
+
     if (maxTasks < job.getNumBspTask()) {
-      throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
+      throw new IOException(
+          "Job failed! The number of tasks has exceeded the maximum allowed.");
     }
     return maxTasks;
   }
@@ -402,97 +395,10 @@ public class BSPJobClient extends Config
     }
   }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
-    InputSplit[] splits = job.getInputFormat().getSplits(
-        job,
-        (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
-            : maxTasks);
-
-    String input = job.getConfiguration().get("bsp.input.dir");
-
-    if (input != null) {
-      InputFormat<?, ?> inputFormat = job.getInputFormat();
-
-      Path partitionedPath = new Path(input, "hama-partitions");
-      Path inputPath = new Path(input);
-      if (fs.isFile(inputPath)) {
-        partitionedPath = new Path(inputPath.getParent(), "hama-partitions");
-      }
-
-      String alternatePart = job.get("bsp.partitioning.dir");
-      if (alternatePart != null) {
-        partitionedPath = new Path(alternatePart, job.getJobID().toString());
-      }
-
-      if (fs.exists(partitionedPath)) {
-        fs.delete(partitionedPath, true);
-      } else {
-        fs.mkdirs(partitionedPath);
-      }
-      // FIXME this is soo unsafe
-      RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
-
-      List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
-          splits.length);
-
-      CompressionType compressionType = getOutputCompressionType(job);
-      Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
-          job, null);
-      CompressionCodec codec = null;
-      if (outputCompressorClass != null) {
-        codec = ReflectionUtils.newInstance(outputCompressorClass,
-            job.getConfiguration());
-      }
-
-      try {
-        for (int i = 0; i < splits.length; i++) {
-          Path p = new Path(partitionedPath, getPartitionName(i));
-          if (codec == null) {
-            writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
-                sampleReader.createKey().getClass(), sampleReader.createValue()
-                    .getClass(), CompressionType.NONE));
-          } else {
-            writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
-                sampleReader.createKey().getClass(), sampleReader.createValue()
-                    .getClass(), compressionType, codec));
-          }
-        }
-
-        Partitioner partitioner = job.getPartitioner();
-        for (int i = 0; i < splits.length; i++) {
-          InputSplit split = splits[i];
-          RecordReader recordReader = inputFormat.getRecordReader(split, job);
-          Object key = recordReader.createKey();
-          Object value = recordReader.createValue();
-          while (recordReader.next(key, value)) {
-            int index = Math.abs(partitioner.getPartition(key, value,
-                splits.length));
-            writers.get(index).append(key, value);
-          }
-          LOG.debug("Done with split " + i);
-        }
-      } finally {
-        for (SequenceFile.Writer wr : writers) {
-          wr.close();
-        }
-      }
-      job.set("hama.partition.count", writers.size() + "");
-      job.setInputFormat(SequenceFileInputFormat.class);
-      job.setInputPath(partitionedPath);
-    }
-
-    return job;
-  }
-
   private static boolean isProperSize(int numBspTask, int maxTasks) {
     return (numBspTask > 1 && numBspTask < maxTasks);
   }
 
-  private static String getPartitionName(int i) {
-    return "part-" + String.valueOf(100000 + i).substring(1, 6);
-  }
-
   /**
    * Get the {@link CompressionType} for the output {@link SequenceFile}.
    * 

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1420530&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Wed Dec 12 05:32:53 2012
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+public class PartitioningRunner extends
+    BSP<Writable, Writable, Writable, Writable, NullWritable> {
+  private Configuration conf;
+  private int desiredNum;
+  private FileSystem fs = null;
+  private Path partitionDir;
+  private Map<Integer, Map<Writable, Writable>> values = new HashMap<Integer, Map<Writable, Writable>>();
+
+  @Override
+  public final void setup(
+      BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+      throws IOException, SyncException, InterruptedException {
+    this.conf = peer.getConfiguration();
+    this.desiredNum = conf.getInt("desired.num.of.tasks", 1);
+    this.fs = FileSystem.get(conf);
+
+    Path inputDir = new Path(conf.get("bsp.input.dir"));
+    if (fs.isFile(inputDir)) {
+      inputDir = inputDir.getParent();
+    }
+
+    this.partitionDir = new Path(inputDir + "/partitions");
+  }
+
+  @Override
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void bsp(
+      BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+      throws IOException, SyncException, InterruptedException {
+    Partitioner partitioner = getPartitioner();
+    KeyValuePair<Writable, Writable> pair = null;
+
+    Class keyClass = null;
+    Class valueClass = null;
+    while ((pair = peer.readNext()) != null) {
+      if (keyClass == null && valueClass == null) {
+        keyClass = pair.getKey().getClass();
+        valueClass = pair.getValue().getClass();
+      }
+
+      int index = Math.abs(partitioner.getPartition(pair.getKey(),
+          pair.getValue(), desiredNum));
+
+      if (!values.containsKey(index)) {
+        values.put(index, new HashMap<Writable, Writable>());
+      }
+      values.get(index).put(pair.getKey(), pair.getValue());
+    }
+
+    for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
+      Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
+          + peer.getPeerIndex());
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+          destFile, keyClass, valueClass, CompressionType.NONE);
+      for (Map.Entry<Writable, Writable> v : e.getValue().entrySet()) {
+        writer.append(v.getKey(), v.getValue());
+      }
+      writer.close();
+    }
+
+    peer.sync();
+
+    // merge files into one.
+    FileStatus[] status = fs.listStatus(partitionDir);
+    for (int j = 0; j < status.length; j++) {
+      int idx = Integer.parseInt(status[j].getPath().getName().split("[-]")[1]);
+      int assignedID = idx / (desiredNum / peer.getNumPeers());
+      if (assignedID == peer.getNumPeers())
+        assignedID = assignedID - 1;
+
+      if (assignedID == peer.getPeerIndex()) {
+        FileStatus[] files = fs.listStatus(status[j].getPath());
+        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+            new Path(partitionDir + "/" + getPartitionName(j)), keyClass,
+            valueClass, CompressionType.NONE);
+
+        for (int i = 0; i < files.length; i++) {
+          SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+              files[i].getPath(), conf);
+
+          Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+          Writable value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+
+          while (reader.next(key, value)) {
+            writer.append(key, value);
+          }
+          reader.close();
+        }
+
+        writer.close();
+        fs.delete(status[j].getPath(), true);
+      }
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Partitioner getPartitioner() {
+    return ReflectionUtils.newInstance(conf
+        .getClass("bsp.input.partitioner.class", HashPartitioner.class,
+            Partitioner.class), conf);
+  }
+
+  private static String getPartitionName(int i) {
+    return "part-" + String.valueOf(100000 + i).substring(1, 6);
+  }
+
+}

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Wed Dec 12 05:32:53 2012
@@ -22,17 +22,15 @@ import java.util.Iterator;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
-import org.apache.hama.bsp.TextInputFormat;
-import org.apache.hama.graph.Edge;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
 
 public class InlinkCount extends Vertex<Text, NullWritable, IntWritable> {
 
@@ -51,34 +49,6 @@ public class InlinkCount extends Vertex<
     }
   }
 
-  public static class InlinkCountTextReader extends
-      VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {
-
-    /**
-     * The text file essentially should look like: <br/>
-     * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
-     * E.G:<br/>
-     * 1\t2\t3\t4<br/>
-     * 2\t3\t1<br/>
-     * etc.
-     */
-    @Override
-    public boolean parseVertex(LongWritable key, Text value,
-        Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {
-      String[] split = value.toString().split("\t");
-      for (int i = 0; i < split.length; i++) {
-        if (i == 0) {
-          vertex.setVertexID(new Text(split[i]));
-        } else {
-          vertex
-              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
-        }
-      }
-      return true;
-    }
-
-  }
-
   private static void printUsage() {
     System.out.println("Usage: <input> <output> [tasks]");
     System.exit(-1);
@@ -104,14 +74,14 @@ public class InlinkCount extends Vertex<
     }
 
     inlinkJob.setVertexClass(InlinkCount.class);
-    inlinkJob.setInputFormat(TextInputFormat.class);
-    inlinkJob.setInputKeyClass(LongWritable.class);
-    inlinkJob.setInputValueClass(Text.class);
+
+    inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+    inlinkJob.setInputKeyClass(Text.class);
+    inlinkJob.setInputValueClass(TextArrayWritable.class);
 
     inlinkJob.setVertexIDClass(Text.class);
     inlinkJob.setVertexValueClass(IntWritable.class);
     inlinkJob.setEdgeValueClass(NullWritable.class);
-    inlinkJob.setVertexInputReaderClass(InlinkCountTextReader.class);
 
     inlinkJob.setPartitioner(HashPartitioner.class);
     inlinkJob.setOutputFormat(SequenceFileOutputFormat.class);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Wed Dec 12 05:32:53 2012
@@ -22,18 +22,17 @@ import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
 
 /**
  * Finding the mindist vertex in a connected component.
@@ -97,34 +96,6 @@ public class MindistSearch {
 
   }
 
-  public static class MindistSearchCountReader extends
-      VertexInputReader<LongWritable, Text, Text, NullWritable, Text> {
-
-    /**
-     * The text file essentially should look like: <br/>
-     * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
-     * E.G:<br/>
-     * 1\t2\t3\t4<br/>
-     * 2\t3\t1<br/>
-     * etc.
-     */
-    @Override
-    public boolean parseVertex(LongWritable key, Text value,
-        Vertex<Text, NullWritable, Text> vertex) throws Exception {
-      String[] split = value.toString().split("\t");
-      for (int i = 0; i < split.length; i++) {
-        if (i == 0) {
-          vertex.setVertexID(new Text(split[i]));
-        } else {
-          vertex
-              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
-        }
-      }
-      return true;
-    }
-
-  }
-
   private static void printUsage() {
     System.out
         .println("Usage: <input> <output> [maximum iterations (default 30)] [tasks]");
@@ -157,10 +128,10 @@ public class MindistSearch {
     job.setVertexValueClass(Text.class);
     job.setEdgeValueClass(NullWritable.class);
 
-    job.setInputKeyClass(LongWritable.class);
-    job.setInputValueClass(Text.class);
-    job.setInputFormat(TextInputFormat.class);
-    job.setVertexInputReaderClass(MindistSearchCountReader.class);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(Text.class);
+    job.setInputValueClass(TextArrayWritable.class);
+
     job.setPartitioner(HashPartitioner.class);
     job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Wed Dec 12 05:32:53 2012
@@ -23,19 +23,17 @@ import java.util.Iterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.hama.graph.AbstractAggregator;
 import org.apache.hama.graph.AverageAggregator;
-import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
 
 /**
  * Real pagerank with dangling node contribution.
@@ -99,37 +97,8 @@ public class PageRank {
     }
   }
 
-  public static class PagerankTextReader extends
-      VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
-
-    /**
-     * The text file essentially should look like: <br/>
-     * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
-     * E.G:<br/>
-     * 1\t2\t3\t4<br/>
-     * 2\t3\t1<br/>
-     * etc.
-     */
-    @Override
-    public boolean parseVertex(LongWritable key, Text value,
-        Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
-      String[] split = value.toString().split("\t");
-      for (int i = 0; i < split.length; i++) {
-        if (i == 0) {
-          vertex.setVertexID(new Text(split[i]));
-        } else {
-          vertex
-              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
-        }
-      }
-      return true;
-    }
-
-  }
-
   private static void printUsage() {
-    System.out
-        .println("Usage: <input> <output> [damping factor (default 0.85)] [Epsilon (convergence error, default 0.001)] [Max iterations (default 30)] [tasks]");
+    System.out.println("Usage: <input> <output> [tasks]");
     System.exit(-1);
   }
 
@@ -161,15 +130,11 @@ public class PageRank {
     // set the defaults
     pageJob.setMaxIteration(30);
     pageJob.set("hama.pagerank.alpha", "0.85");
+    pageJob.set("hama.graph.max.convergence.error", "0.001");
 
-    if (args.length == 6)
-      pageJob.setNumBspTask(Integer.parseInt(args[5]));
-    if (args.length >= 5)
-      pageJob.setMaxIteration(Integer.parseInt(args[4]));
-    if (args.length >= 4)
-      pageJob.set("hama.graph.max.convergence.error", args[3]);
-    if (args.length >= 3)
-      pageJob.set("hama.pagerank.alpha", args[2]);
+    if (args.length == 3) {
+      pageJob.setNumBspTask(Integer.parseInt(args[2]));
+    }
 
     // error, dangling node probability sum
     pageJob.setAggregatorClass(AverageAggregator.class,
@@ -179,10 +144,10 @@ public class PageRank {
     pageJob.setVertexValueClass(DoubleWritable.class);
     pageJob.setEdgeValueClass(NullWritable.class);
 
-    pageJob.setInputKeyClass(LongWritable.class);
-    pageJob.setInputValueClass(Text.class);
-    pageJob.setInputFormat(TextInputFormat.class);
-    pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+    pageJob.setInputFormat(SequenceFileInputFormat.class);
+    pageJob.setInputKeyClass(Text.class);
+    pageJob.setInputValueClass(TextArrayWritable.class);
+
     pageJob.setPartitioner(HashPartitioner.class);
     pageJob.setOutputFormat(TextOutputFormat.class);
     pageJob.setOutputKeyClass(Text.class);

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Wed Dec 12 05:32:53 2012
@@ -18,8 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Arrays;
@@ -31,8 +29,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.examples.MindistSearch.MinTextCombiner;
 
 public class MindistSearchTest extends TestCase {
@@ -95,22 +96,25 @@ public class MindistSearchTest extends T
   }
 
   private void generateTestData() {
-    BufferedWriter bw = null;
     try {
-      bw = new BufferedWriter(new FileWriter(INPUT));
-      for (String s : input) {
-        bw.write(s + "\n");
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    } finally {
-      if (bw != null) {
-        try {
-          bw.close();
-        } catch (IOException e) {
-          e.printStackTrace();
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+          new Path(INPUT), Text.class, TextArrayWritable.class);
+
+      for (int i = 0; i < input.length; i++) {
+        String[] x = input[i].split("\t");
+        Text key = new Text(x[0]);
+        Writable[] values = new Writable[x.length - 1];
+        for (int j = 1; j < x.length; j++) {
+          values[j - 1] = new Text(x[j]);
         }
+        TextArrayWritable value = new TextArrayWritable();
+        value.set(values);
+        writer.append(key, value);
       }
+
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
     }
   }
 

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Wed Dec 12 05:32:53 2012
@@ -18,8 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
@@ -30,33 +28,15 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.util.SymmetricMatrixGen;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.GraphJobRunner;
 
 public class PageRankTest extends TestCase {
-  /**
-   * The graph looks like this (adjacency list, [] contains outlinks):<br/>
-   * stackoverflow.com [yahoo.com] <br/>
-   * google.com []<br/>
-   * facebook.com [twitter.com, google.com, nasa.gov]<br/>
-   * yahoo.com [nasa.gov, stackoverflow.com]<br/>
-   * twitter.com [google.com, facebook.com]<br/>
-   * nasa.gov [yahoo.com, stackoverflow.com]<br/>
-   * youtube.com [google.com, yahoo.com]<br/>
-   * Note that google is removed in this part mainly to test the repair
-   * functionality.
-   */
-  String[] input = new String[] { "stackoverflow.com\tyahoo.com",
-      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
-      "yahoo.com\tnasa.gov\tstackoverflow.com",
-      "twitter.com\tgoogle.com\tfacebook.com",
-      "nasa.gov\tyahoo.com\tstackoverflow.com",
-      "youtube.com\tgoogle.com\tyahoo.com" };
-
-  private static String INPUT = "/tmp/pagerank-tmp.seq";
-  private static String TEXT_INPUT = "/tmp/pagerank.txt";
+  private static String INPUT = "/tmp/pagerank/pagerank-tmp.seq";
+  private static String TEXT_INPUT = "/tmp/pagerank/pagerank.txt";
   private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
-  private static String OUTPUT = "/tmp/pagerank-out";
+  private static String OUTPUT = "/tmp/pagerank/pagerank-out";
   private Configuration conf = new HamaConfiguration();
   private FileSystem fs;
 
@@ -87,9 +67,10 @@ public class PageRankTest extends TestCa
     generateTestData();
     try {
       HamaConfiguration conf = new HamaConfiguration(new Configuration());
-      conf.set("bsp.local.tasks.maximum", "1");
+      conf.set("bsp.local.tasks.maximum", "10");
+      conf.set("bsp.peers.num", "7");
       conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
-      GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT },
+      GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT, "7" },
           conf);
 
       if (!pageJob.waitForCompletion(true)) {
@@ -101,24 +82,9 @@ public class PageRankTest extends TestCa
     }
   }
 
-  private void generateTestData() {
-    BufferedWriter bw = null;
-    try {
-      bw = new BufferedWriter(new FileWriter(INPUT));
-      for (String s : input) {
-        bw.write(s + "\n");
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    } finally {
-      if (bw != null) {
-        try {
-          bw.close();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    }
+  private void generateTestData() throws ClassNotFoundException,
+      InterruptedException, IOException {
+    SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "3" });
   }
 
   private void deleteTempDirs() {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Wed Dec 12 05:32:53 2012
@@ -165,9 +165,6 @@ public class GraphJob extends BSPJob {
         .checkArgument(this.getConfiguration()
             .get(VERTEX_EDGE_VALUE_CLASS_ATTR) != null,
             "Please provide an edge value class, if you don't need one, use NullWritable!");
-    Preconditions.checkArgument(
-        this.getConfiguration().get(VERTEX_GRAPH_INPUT_READER) != null,
-        "Please provide a vertex input reader!");
     super.submit();
   }
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Dec 12 05:32:53 2012
@@ -28,8 +28,8 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -273,53 +273,24 @@ public final class GraphJobRunner<V exte
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
-
-    /*
-     * Several partitioning constants begin
-     */
-
-    final VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
-        .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
-            VertexInputReader.class), conf);
-
     final boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
-    final boolean runtimePartitioning = conf.getBoolean(
-        GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
-
-    final long splitSize = peer.getSplitSize();
-    final int partitioningSteps = partitionMultiSteps(peer, splitSize);
-    final long interval = splitSize / partitioningSteps;
 
     final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
 
-    /*
-     * Several partitioning constants end
-     */
-
     LOG.debug("vertex class: " + vertexClass);
     Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
     vertex.runner = this;
 
-    long startPos = peer.getPos();
-    if (startPos == 0)
-      startPos = 1L;
-
     KeyValuePair<Writable, Writable> next = null;
-    int steps = 1;
     while ((next = peer.readNext()) != null) {
-      boolean vertexFinished = false;
-      try {
-        vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
-            vertex);
-      } catch (Exception e) {
-        // LOG.error("exception occured during parsing vertex!" + e.toString());
-        throw new IOException("exception occured during parsing vertex!"
-            + e.toString());
-      }
-
-      if (!vertexFinished) {
-        continue;
+      V key = (V) next.getKey();
+      Writable[] edges = ((ArrayWritable) next.getValue()).get();
+      vertex.setVertexID(key);
+      List<Edge<V, E>> edgeList = new ArrayList<Edge<V, E>>();
+      for (Writable edge : edges) {
+        edgeList.add(new Edge<V, E>((V) edge, null));
       }
+      vertex.setEdges(edgeList);
 
       if (vertex.getEdges() == null) {
         if (selfReference) {
@@ -334,44 +305,12 @@ public final class GraphJobRunner<V exte
         vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
       }
 
-      if (runtimePartitioning) {
-        int partition = partitioner.getPartition(vertex.getVertexID(),
-            vertex.getValue(), peer.getNumPeers());
-        peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
-      } else {
-        vertex.setup(conf);
-        vertices.add(vertex);
-      }
+      vertex.setup(conf);
+      vertices.add(vertex);
       vertex = newVertexInstance(vertexClass, conf);
       vertex.runner = this;
-
-      if (runtimePartitioning) {
-        if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval) {
-          peer.sync();
-          steps++;
-          GraphJobMessage msg = null;
-          while ((msg = peer.getCurrentMessage()) != null) {
-            Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-            messagedVertex.runner = this;
-            messagedVertex.setup(conf);
-            vertices.add(messagedVertex);
-          }
-          startPos = peer.getPos();
-        }
-      }
     }
 
-    if (runtimePartitioning) {
-      peer.sync();
-
-      GraphJobMessage msg = null;
-      while ((msg = peer.getCurrentMessage()) != null) {
-        Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-        messagedVertex.runner = this;
-        messagedVertex.setup(conf);
-        vertices.add(messagedVertex);
-      }
-    }
     LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
 
     /*
@@ -383,7 +322,7 @@ public final class GraphJobRunner<V exte
      */
     if (repairNeeded) {
       LOG.debug("Starting repair of this graph!");
-      repair(peer, partitioningSteps, selfReference);
+      repair(peer, selfReference);
     }
 
     LOG.debug("Starting Vertex processing!");
@@ -392,83 +331,16 @@ public final class GraphJobRunner<V exte
   @SuppressWarnings("unchecked")
   private void repair(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      int partitioningSteps, boolean selfReference) throws IOException,
+      boolean selfReference) throws IOException,
       SyncException, InterruptedException {
 
-    int multiSteps = 0;
-    MapWritable ssize = new MapWritable();
-    ssize.put(new IntWritable(peer.getPeerIndex()),
-        new IntWritable(vertices.size()));
-    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
-    ssize = null;
-    peer.sync();
-
-    if (isMasterTask(peer)) {
-      int minVerticesSize = Integer.MAX_VALUE;
-      GraphJobMessage received = null;
-      while ((received = peer.getCurrentMessage()) != null) {
-        MapWritable x = received.getMap();
-        for (Entry<Writable, Writable> e : x.entrySet()) {
-          int curr = ((IntWritable) e.getValue()).get();
-          if (minVerticesSize > curr) {
-            minVerticesSize = curr;
-          }
-        }
-      }
-
-      if (minVerticesSize < (partitioningSteps * 2)) {
-        multiSteps = minVerticesSize;
-      } else {
-        multiSteps = (partitioningSteps * 2);
-      }
-
-      for (String peerName : peer.getAllPeerNames()) {
-        MapWritable temp = new MapWritable();
-        temp.put(new Text("steps"), new IntWritable(multiSteps));
-        peer.send(peerName, new GraphJobMessage(temp));
-      }
-    }
-    peer.sync();
-
-    GraphJobMessage received = peer.getCurrentMessage();
-    MapWritable x = received.getMap();
-    for (Entry<Writable, Writable> e : x.entrySet()) {
-      multiSteps = ((IntWritable) e.getValue()).get();
-    }
-
     Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
 
-    int i = 0;
-    int syncs = 0;
-
     for (Vertex<V, E, M> v : vertices) {
       for (Edge<V, E> e : v.getEdges()) {
         peer.send(v.getDestinationPeerName(e),
             new GraphJobMessage(e.getDestinationVertexID()));
       }
-
-      if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
-        peer.sync();
-        syncs++;
-        GraphJobMessage msg = null;
-        while ((msg = peer.getCurrentMessage()) != null) {
-          V vertexName = (V) msg.getVertexId();
-
-          Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
-          newVertex.setVertexID(vertexName);
-          newVertex.runner = this;
-          if (selfReference) {
-            newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
-                newVertex.getVertexID(), null)));
-          } else {
-            newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
-          }
-          newVertex.setup(conf);
-          tmp.put(vertexName, newVertex);
-
-        }
-      }
-      i++;
     }
 
     peer.sync();
@@ -488,7 +360,6 @@ public final class GraphJobRunner<V exte
       newVertex.setup(conf);
       tmp.put(vertexName, newVertex);
       newVertex = null;
-
     }
 
     for (Vertex<V, E, M> e : vertices) {
@@ -502,59 +373,6 @@ public final class GraphJobRunner<V exte
   }
 
   /**
-   * Partitions our vertices through multiple supersteps to save memory.
-   */
-  private int partitionMultiSteps(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      long splitSize) throws IOException, SyncException, InterruptedException {
-    int multiSteps = 1;
-
-    MapWritable ssize = new MapWritable();
-    ssize
-        .put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
-    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
-    ssize = null;
-    peer.sync();
-
-    if (isMasterTask(peer)) {
-      long maxSplitSize = 0L;
-      GraphJobMessage received = null;
-      while ((received = peer.getCurrentMessage()) != null) {
-        MapWritable x = received.getMap();
-        for (Entry<Writable, Writable> e : x.entrySet()) {
-          long curr = ((LongWritable) e.getValue()).get();
-          if (maxSplitSize < curr) {
-            maxSplitSize = curr;
-          }
-        }
-      }
-
-      int steps = (int) (maxSplitSize / conf.getLong( // 20 mb
-          "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
-
-      for (String peerName : peer.getAllPeerNames()) {
-        MapWritable temp = new MapWritable();
-        temp.put(new Text("max"), new IntWritable(steps));
-        peer.send(peerName, new GraphJobMessage(temp));
-      }
-    }
-    peer.sync();
-
-    GraphJobMessage received = peer.getCurrentMessage();
-    MapWritable x = received.getMap();
-    for (Entry<Writable, Writable> e : x.entrySet()) {
-      multiSteps = ((IntWritable) e.getValue()).get();
-    }
-
-    if (isMasterTask(peer)) {
-      peer.getCounter(GraphJobCounter.MULTISTEP_PARTITIONING).increment(
-          multiSteps);
-    }
-
-    return multiSteps;
-  }
-
-  /**
    * Counts vertices globally by sending the count of vertices in the map to the
    * other peers.
    */

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Wed Dec 12 05:32:53 2012
@@ -17,38 +17,36 @@
  */
 package org.apache.hama.graph;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.TestBSPMasterGroomServer;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.graph.example.PageRank;
 
 public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
 
   String[] input = new String[] { "stackoverflow.com\tyahoo.com",
-      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov]",
-      "yahoo.com\tnasa.gov\tstackoverflow.com]",
-      "twitter.com\tgoogle.com\tfacebook.com]",
-      "nasa.gov\tyahoo.com\tstackoverflow.com]",
-      "youtube.com\tgoogle.com\tyahoo.com]" };
+      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+      "yahoo.com\tnasa.gov\tstackoverflow.com",
+      "twitter.com\tgoogle.com\tfacebook.com",
+      "nasa.gov\tyahoo.com\tstackoverflow.com",
+      "youtube.com\tgoogle.com\tyahoo.com" };
 
-  private static String INPUT = "/tmp/pagerank-real-tmp.seq";
-  private static String OUTPUT = "/tmp/pagerank-real-out";
+  private static String INPUT = "/tmp/pagerank/real-tmp.seq";
+  private static String OUTPUT = "/tmp/pagerank/real-out";
 
   @SuppressWarnings("unchecked")
   @Override
@@ -60,7 +58,7 @@ public class TestSubmitGraphJob extends 
     configuration.setInt("hama.graph.multi.step.partitioning.interval", 30);
 
     GraphJob bsp = new GraphJob(configuration, PageRank.class);
-    bsp.setInputPath(new Path(INPUT));
+    bsp.setInputPath(new Path("/tmp/pagerank"));
     bsp.setOutputPath(new Path(OUTPUT));
     BSPJobClient jobClient = new BSPJobClient(configuration);
     configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
@@ -77,14 +75,15 @@ public class TestSubmitGraphJob extends 
     bsp.setAggregatorClass(AverageAggregator.class,
         PageRank.DanglingNodeAggregator.class);
 
+    bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
+    bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setInputKeyClass(Text.class);
+    bsp.setInputValueClass(TextArrayWritable.class);
+    
     bsp.setVertexIDClass(Text.class);
     bsp.setVertexValueClass(DoubleWritable.class);
     bsp.setEdgeValueClass(NullWritable.class);
 
-    bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
-    bsp.setInputFormat(TextInputFormat.class);
-    bsp.setInputKeyClass(LongWritable.class);
-    bsp.setInputValueClass(Text.class);
     bsp.setPartitioner(HashPartitioner.class);
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
     bsp.setOutputKeyClass(Text.class);
@@ -123,26 +122,25 @@ public class TestSubmitGraphJob extends 
   }
 
   private void generateTestData() {
-    BufferedWriter bw = null;
     try {
-      bw = new BufferedWriter(new FileWriter(INPUT));
-      for (String s : input) {
-        bw.write(s + "\n");
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    } finally {
-      if (bw != null) {
-        try {
-          bw.close();
-
-          File file = new File(INPUT);
-          LOG.info("Temp file length: " + file.length());
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(),
+          new Path(INPUT), Text.class, TextArrayWritable.class);
 
-        } catch (IOException e) {
-          e.printStackTrace();
+      for (int i = 0; i < input.length; i++) {
+        String[] x = input[i].split("\t");
+        Text key = new Text(x[0]);
+        Writable[] values = new Writable[x.length - 1];
+        for (int j = 1; j < x.length; j++) {
+          values[j - 1] = new Text(x[j]);
         }
+        TextArrayWritable value = new TextArrayWritable();
+        value.set(values);
+        writer.append(key, value);
       }
+
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
     }
   }
 



Mime
View raw message