incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1238308 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ examples/src/main/java/org/apache/hama/examples/util/ examples/src/test/java/org/apache/hama/examples/ examples...
Date Tue, 31 Jan 2012 09:19:54 GMT
Author: tjungblut
Date: Tue Jan 31 09:19:53 2012
New Revision: 1238308

URL: http://svn.apache.org/viewvc?rev=1238308&view=rev
Log:
[HAMA-493] Provide text to seq-file utils for graph examples


Added:
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java   (with props)
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java   (with props)
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java   (with props)
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java   (with props)
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java   (with props)
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Jan 31 09:19:53 2012
@@ -3,7 +3,8 @@ Hama Change Log
 Release 0.4 - Unreleased
 
   NEW FEATURES
-   
+
+   HAMA-493: Provide text to seq-file utils for graph examples (tjungblut)
    HAMA-491: Add dist module to generate release tarball (edwardyoon)
    HAMA-451: Show Supersteps in waitForCompletion in YARN (tjungblut) 
    HAMA-479: Add Counters to Hama Jobs (edwardyoon) 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Tue Jan 31 09:19:53 2012
@@ -178,9 +178,24 @@ public abstract class FileInputFormat<K,
     FileStatus[] files = listStatus(job);
 
     long totalSize = 0; // compute total size
-    for (FileStatus file : files) { // check we have valid files
+    for (int i = 0; i < files.length; i++) { // check we have valid files
+      FileStatus file = files[i];
       if (file.isDir()) {
-        throw new IOException("Not a file: " + file.getPath());
+        final Path path = file.getPath();
+        if (path.getName().equals("hama-partitions")
+            || (job.get("bsp.partitioning.dir") != null && path.getName()
+                .equals(job.get("bsp.partitioning.dir")))) {
+          // if we find the partitioning dir, just remove it.
+          LOG.warn("Removing already existing partitioning directory " + path);
+          FileSystem fileSystem = path.getFileSystem(job.getConf());
+          if (!fileSystem.delete(path, true)) {
+            LOG.error("Remove failed.");
+          }
+          // remove this file from our initial list
+          files[i] = null;
+        } else {
+          throw new IOException("Not a file (dir): " + path);
+        }
       }
       totalSize += file.getLen();
     }
@@ -189,8 +204,10 @@ public abstract class FileInputFormat<K,
     // take the short circuit path if we have already partitioned
     if (numSplits == files.length) {
       for (FileStatus file : files) {
-        splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
-            new String[0]));
+        if (file != null) {
+          splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
+              new String[0]));
+        }
       }
       return splits.toArray(new FileSplit[splits.size()]);
     }
@@ -202,36 +219,40 @@ public abstract class FileInputFormat<K,
     // generate splits
     NetworkTopology clusterMap = new NetworkTopology();
     for (FileStatus file : files) {
-      Path path = file.getPath();
-      FileSystem fs = path.getFileSystem(job.getConf());
-      long length = file.getLen();
-      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-      if ((length != 0) && isSplitable(fs, path)) {
-        long blockSize = file.getBlockSize();
-        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-        LOG.debug("computeSplitSize: " + splitSize + " (" + goalSize + ", "
-            + minSize + ", " + blockSize + ")");
-
-        long bytesRemaining = length;
-        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-          String[] splitHosts = getSplitHosts(blkLocations, length
-              - bytesRemaining, splitSize, clusterMap);
-          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
-              splitHosts));
-          bytesRemaining -= splitSize;
-        }
+      if (file != null) {
+        Path path = file.getPath();
+        FileSystem fs = path.getFileSystem(job.getConf());
+        long length = file.getLen();
+        BlockLocation[] blkLocations = fs
+            .getFileBlockLocations(file, 0, length);
+        if ((length != 0) && isSplitable(fs, path)) {
+          long blockSize = file.getBlockSize();
+          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+          LOG.debug("computeSplitSize: " + splitSize + " (" + goalSize + ", "
+              + minSize + ", " + blockSize + ")");
+
+          long bytesRemaining = length;
+          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+            String[] splitHosts = getSplitHosts(blkLocations, length
+                - bytesRemaining, splitSize, clusterMap);
+            splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+                splitHosts));
+            bytesRemaining -= splitSize;
+          }
 
-        if (bytesRemaining != 0) {
-          splits
-              .add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
-                  blkLocations[blkLocations.length - 1].getHosts()));
+          if (bytesRemaining != 0) {
+            splits.add(new FileSplit(path, length - bytesRemaining,
+                bytesRemaining, blkLocations[blkLocations.length - 1]
+                    .getHosts()));
+          }
+        } else if (length != 0) {
+          String[] splitHosts = getSplitHosts(blkLocations, 0, length,
+              clusterMap);
+          splits.add(new FileSplit(path, 0, length, splitHosts));
+        } else {
+          // Create empty hosts array for zero length files
+          splits.add(new FileSplit(path, 0, length, new String[0]));
         }
-      } else if (length != 0) {
-        String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
-        splits.add(new FileSplit(path, 0, length, splitHosts));
-      } else {
-        // Create empty hosts array for zero length files
-        splits.add(new FileSplit(path, 0, length, new String[0]));
       }
     }
     LOG.info("Total # of splits: " + splits.size());
@@ -266,8 +287,8 @@ public abstract class FileInputFormat<K,
    *          inputs for the map-reduce job.
    */
   public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) {
-    setInputPaths(conf, StringUtils
-        .stringToPath(getPathStrings(commaSeparatedPaths)));
+    setInputPaths(conf,
+        StringUtils.stringToPath(getPathStrings(commaSeparatedPaths)));
   }
 
   /**

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Jan 31 09:19:53 2012
@@ -62,8 +62,7 @@ public class LocalBSPRunner implements J
 
   private static final String IDENTIFIER = "localrunner";
   private static String WORKING_DIR = "/tmp/hama-bsp/";
-  protected static volatile ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors
-      .newCachedThreadPool();
+  private volatile ThreadPoolExecutor threadPool;
 
   @SuppressWarnings("rawtypes")
   protected static final LinkedList<Future<BSP>> futureList = new LinkedList<Future<BSP>>();
@@ -143,6 +142,8 @@ public class LocalBSPRunner implements J
         splitFile.close();
       }
     }
+    
+    threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(numBspTask);
 
     peerNames = new String[numBspTask];
     for (int i = 0; i < numBspTask; i++) {

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Tue Jan 31 09:19:53 2012
@@ -20,6 +20,8 @@
 package org.apache.hama.examples;
 
 import org.apache.hadoop.util.ProgramDriver;
+import org.apache.hama.examples.util.PagerankTextToSeq;
+import org.apache.hama.examples.util.SSSPTextToSeq;
 
 public class ExampleDriver {
 
@@ -27,9 +29,11 @@ public class ExampleDriver {
     ProgramDriver pgd = new ProgramDriver();
     try {
       pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
+      pgd.addClass("sssp-text2seq", SSSPTextToSeq.class, "Generates SSSP input from textfile");
       pgd.addClass("sssp", ShortestPaths.class, "Single Shortest Path");
       pgd.addClass("cmb", CombineExample.class, "Combine");
       pgd.addClass("bench", RandBench.class, "Random Benchmark");
+      pgd.addClass("pagerank-text2seq", PagerankTextToSeq.class, "Generates Pagerank input from textfile");
       pgd.addClass("pagerank", PageRank.class, "PageRank");
       pgd.driver(args);
     } catch (Throwable e) {

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java?rev=1238308&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java Tue Jan 31 09:19:53 2012
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * This utility turns a Pagerank-formatted text file into a sequence file that
+ * can be used as input for the Pagerank example. <br/>
+ * 
+ * <pre>
+ * Usage: &lt;input path&gt; &lt;output path&gt; &lt;optional: text separator. Default is "\t"&gt;
+ * 
+ * </pre>
+ * 
+ * So you may start this with: <br/>
+ * 
+ * <pre>
+ *    bin/hama -jar examples.jar pagerank-text2seq /tmp/in /tmp/out ";"
+ * </pre>
+ */
+public class PagerankTextToSeq extends TextToSequenceFile {
+
+  public PagerankTextToSeq(Path inPath, Path outPath, String delimiter)
+      throws IOException {
+    super(inPath, outPath, delimiter);
+  }
+
+  @Override
+  protected KeyValuePair<VertexWritable, VertexArrayWritable> processLine(
+      String line) throws IOException {
+    String[] split = line.split(delimiter);
+    VertexWritable key = new VertexWritable(split[0]);
+    VertexWritable[] v = new VertexWritable[split.length - 1];
+    for (int i = 1; i < split.length; i++) {
+      v[i - 1] = new VertexWritable(split[i]);
+    }
+    VertexArrayWritable value = new VertexArrayWritable();
+    value.set(v);
+    return new KeyValuePair<VertexWritable, VertexArrayWritable>(key, value);
+  }
+
+  @Override
+  protected Writer getWriter(Path outPath) throws IOException {
+    return new Writer(destFs, conf, outPath, VertexWritable.class,
+        VertexArrayWritable.class);
+  }
+
+  private static void printUsage() {
+    LOG.info("<input path> <output path> <optional: text separator. Default is \"\t\">");
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length != 2 && args.length != 3) {
+      printUsage();
+      System.exit(-1);
+    }
+
+    String inPath = args[0];
+    String outPath = args[1];
+    String delimiter = "\t";
+    if (args.length > 2) {
+      delimiter = args[2];
+    }
+    PagerankTextToSeq o = new PagerankTextToSeq(new Path(inPath), new Path(
+        outPath), delimiter);
+    o.run();
+  }
+}

Propchange: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java?rev=1238308&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java Tue Jan 31 09:19:53 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hama.examples.ShortestPathVertex;
+import org.apache.hama.examples.ShortestPathVertexArrayWritable;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * This utility turns a SSSP (Single Source Shortest Paths)-formatted text file
+ * into a sequence file that can be used as input for the SSSP example. <br/>
+ * 
+ * <pre>
+ * Usage: &lt;input path&gt; &lt;output path&gt; &lt;optional: text separator. Default is "\t"&gt;
+ * &lt;optional: edge weight separator. Default is ":"&gt;
+ * 
+ * </pre>
+ * 
+ * So you may start this with: <br/>
+ * 
+ * <pre>
+ *    bin/hama -jar examples.jar sssp-text2seq /tmp/in /tmp/out ";" ":"
+ * </pre>
+ */
+public class SSSPTextToSeq extends TextToSequenceFile {
+
+  private final String edgeDelimiter;
+
+  public SSSPTextToSeq(Path inPath, Path outPath, String delimiter,
+      String edgeDelimiter) throws IOException {
+    super(inPath, outPath, delimiter);
+    this.edgeDelimiter = edgeDelimiter;
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Override
+  protected KeyValuePair<VertexWritable, VertexArrayWritable> processLine(
+      String line) throws IOException {
+    String[] split = line.split(delimiter);
+    ShortestPathVertex key = new ShortestPathVertex(0, split[0]);
+    ShortestPathVertex[] v = new ShortestPathVertex[split.length - 1];
+    for (int i = 1; i < split.length; i++) {
+      String[] weightSplit = split[i].split(edgeDelimiter);
+      if (weightSplit.length != 2) {
+        LOG.error("Adjacent vertices must contain a \"" + edgeDelimiter
+            + "\" between the vertex name and the edge weight! Line was: "
+            + line);
+      }
+      v[i - 1] = new ShortestPathVertex(Integer.parseInt(weightSplit[1]),
+          weightSplit[0]);
+    }
+    ShortestPathVertexArrayWritable value = new ShortestPathVertexArrayWritable();
+    value.set(v);
+    return new KeyValuePair(key, value);
+  }
+
+  @Override
+  protected Writer getWriter(Path outPath) throws IOException {
+    return new Writer(destFs, conf, outPath, ShortestPathVertex.class,
+        ShortestPathVertexArrayWritable.class);
+  }
+
+  private static void printUsage() {
+    LOG.info("<input path> <output path> <optional: text separator. Default is \"\t\"> <optional: edge weight separator. Default is \":\">");
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length != 2 && args.length != 3 && args.length != 4) {
+      printUsage();
+      System.exit(-1);
+    }
+
+    String inPath = args[0];
+    String outPath = args[1];
+    String delimiter = "\t";
+    if (args.length > 2) {
+      delimiter = args[2];
+    }
+    String edgeDelimiter = ":";
+    if (args.length > 3) {
+      edgeDelimiter = args[3];
+    }
+    SSSPTextToSeq o = new SSSPTextToSeq(new Path(inPath), new Path(outPath),
+        delimiter, edgeDelimiter);
+    o.run();
+  }
+}

Propchange: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java?rev=1238308&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java Tue Jan 31 09:19:53 2012
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * Abstract base class for turning a text graph into a sequence file. It handles
+ * multiple input files when the input path is a glob; directories are skipped.
+ */
+public abstract class TextToSequenceFile {
+
+  protected static final Log LOG = LogFactory.getLog(TextToSequenceFile.class);
+
+  protected final Path inPath;
+  protected final Path outPath;
+  protected final String delimiter;
+  protected final Configuration conf;
+  protected final FileSystem sourceFs;
+  protected final FileSystem destFs;
+
+  public TextToSequenceFile(Path inPath, Path outPath, String delimiter)
+      throws IOException {
+    super();
+    this.inPath = inPath;
+    this.outPath = outPath;
+    this.delimiter = delimiter;
+
+    this.conf = new Configuration();
+    this.sourceFs = inPath.getFileSystem(conf);
+    this.destFs = outPath.getFileSystem(conf);
+  }
+
+  public final void run() throws IOException {
+    final FileStatus[] stati = sourceFs.globStatus(inPath);
+    if (stati == null || stati.length == 0) {
+      throw new FileNotFoundException("Cannot access " + inPath
+          + ": No such file or directory.");
+    }
+
+    for (int i = 0; i < stati.length; i++) {
+      final Path p = stati[i].getPath();
+      if (!sourceFs.getFileStatus(p).isDir()) {
+        Writer writer = null;
+        try {
+          LOG.info("Processing file : " + p);
+          Path out = new Path(outPath, p.getName() + ".seq");
+          writer = getWriter(out);
+          processFile(p, writer);
+          LOG.info("Written " + writer.getLength() + " bytes to " + out);
+        } finally {
+          if (writer != null)
+            writer.close();
+        }
+      } else {
+        LOG.warn("Skipping dir : " + p);
+      }
+    }
+
+  }
+
+  private final void processFile(Path p, SequenceFile.Writer writer)
+      throws IOException {
+    final FileStatus fileStatus = sourceFs.getFileStatus(p);
+    if (sourceFs.exists(p) && !fileStatus.isDir()) {
+      BufferedReader br = null;
+      try {
+        br = new BufferedReader(new InputStreamReader(sourceFs.open(p)));
+        String line;
+        while ((line = br.readLine()) != null) {
+          final KeyValuePair<VertexWritable, VertexArrayWritable> kv = processLine(line);
+          writer.append(kv.getKey(), kv.getValue());
+        }
+      } finally {
+        if (br != null)
+          br.close();
+      }
+    } else {
+      LOG.error(p + " is a directory or does not exist!");
+    }
+  }
+
+  protected abstract SequenceFile.Writer getWriter(Path outPath)
+      throws IOException;
+
+  protected abstract KeyValuePair<VertexWritable, VertexArrayWritable> processLine(
+      String line) throws IOException;
+
+}

Propchange: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Tue Jan 31 09:19:53 2012
@@ -1,5 +1,7 @@
 package org.apache.hama.examples;
 
+import java.io.BufferedWriter;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -13,22 +15,64 @@ import org.apache.hadoop.io.DoubleWritab
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.util.PagerankTextToSeq;
 import org.apache.hama.graph.VertexArrayWritable;
 import org.apache.hama.graph.VertexWritable;
 
 public class PageRankTest extends TestCase {
+  /**
+   * The graph looks like this (adjacency list, [] contains outlinks):<br/>
+   * stackoverflow.com [yahoo.com] <br/>
+   * google.com []<br/>
+   * facebook.com [twitter.com, google.com, nasa.gov]<br/>
+   * yahoo.com [nasa.gov, stackoverflow.com]<br/>
+   * twitter.com [google.com, facebook.com]<br/>
+   * nasa.gov [yahoo.com, stackoverflow.com]<br/>
+   * youtube.com [google.com, yahoo.com]<br/>
+   */
+  private static final Map<VertexWritable, VertexArrayWritable> tmp = new HashMap<VertexWritable, VertexArrayWritable>();
+  static {
+    // our first entry is null, because the indices in the hama 3.0 pre-calculated
+    // example start at 1.
+    // FIXME This is really ugly.
+    String[] pages = new String[] { null, "twitter.com", "google.com",
+        "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
+        "youtube.com" };
+    String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
+        "5;4;6", "6;4", "7;2;4" };
+
+    for (int i = 0; i < lineArray.length; i++) {
 
+      String[] adjacencyStringArray = lineArray[i].split(";");
+      int vertexId = Integer.parseInt(adjacencyStringArray[0]);
+      String name = pages[vertexId];
+      VertexWritable[] arr = new VertexWritable[adjacencyStringArray.length - 1];
+      for (int j = 1; j < adjacencyStringArray.length; j++) {
+        arr[j - 1] = new VertexWritable(
+            pages[Integer.parseInt(adjacencyStringArray[j])]);
+      }
+      VertexArrayWritable wr = new VertexArrayWritable();
+      wr.set(arr);
+      tmp.put(new VertexWritable(name), wr);
+    }
+  }
   private static String INPUT = "/tmp/pagerank-tmp.seq";
+  private static String TEXT_INPUT = "/tmp/pagerank.txt";
+  private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
   private static String OUTPUT = "/tmp/pagerank-out";
-  private Configuration conf;
+  private Configuration conf = new HamaConfiguration();
   private FileSystem fs;
 
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
   public void testPageRank() throws IOException, InterruptedException,
       ClassNotFoundException, InstantiationException, IllegalAccessException {
-    conf = new HamaConfiguration();
-    fs = FileSystem.get(conf);
 
-    generateTestData();
+    generateSeqTestData();
     try {
       PageRank.main(new String[] { INPUT, OUTPUT, "0.85", "0.000001" });
       verifyResult();
@@ -37,6 +81,20 @@ public class PageRankTest extends TestCa
     }
   }
 
+  public void testPageRankUtil() throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
+    generateTestTextData();
+    // <input path> <output path>
+    PagerankTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
+    try {
+      PageRank.main(new String[] { TEXT_OUTPUT, OUTPUT, "0.85", "0.000001" });
+
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
   private void verifyResult() throws IOException {
     Map<String, Double> rs = new HashMap<String, Double>();
     // our desired results
@@ -62,43 +120,7 @@ public class PageRankTest extends TestCa
     assertEquals(sum, 1.0d);
   }
 
-  /**
-   * The graph looks like this (adjacency list, [] contains outlinks):<br/>
-   * stackoverflow.com [yahoo.com] <br/>
-   * google.com []<br/>
-   * facebook.com [twitter.com, google.com, nasa.gov]<br/>
-   * yahoo.com [nasa.gov, stackoverflow.com]<br/>
-   * twitter.com [google.com, facebook.com]<br/>
-   * nasa.gov [yahoo.com, stackoverflow.com]<br/>
-   * youtube.com [google.com, yahoo.com]<br/>
-   */
-  private void generateTestData() throws IOException {
-    Map<VertexWritable, VertexArrayWritable> tmp = new HashMap<VertexWritable, VertexArrayWritable>();
-
-    // our first entry is null, because our indices in hama 3.0 pre calculated
-    // example starts at 1.
-    // FIXME This is really ugly.
-    String[] pages = new String[] { null, "twitter.com", "google.com",
-        "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
-        "youtube.com" };
-    String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
-        "5;4;6", "6;4", "7;2;4" };
-
-    for (int i = 0; i < lineArray.length; i++) {
-
-      String[] adjacencyStringArray = lineArray[i].split(";");
-      int vertexId = Integer.parseInt(adjacencyStringArray[0]);
-      String name = pages[vertexId];
-      VertexWritable[] arr = new VertexWritable[adjacencyStringArray.length - 1];
-      for (int j = 1; j < adjacencyStringArray.length; j++) {
-        arr[j - 1] = new VertexWritable(
-            pages[Integer.parseInt(adjacencyStringArray[j])]);
-      }
-      VertexArrayWritable wr = new VertexArrayWritable();
-      wr.set(arr);
-      tmp.put(new VertexWritable(name), wr);
-    }
-
+  private void generateSeqTestData() throws IOException {
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
         INPUT), VertexWritable.class, VertexArrayWritable.class);
     for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
@@ -107,12 +129,29 @@ public class PageRankTest extends TestCa
     writer.close();
   }
 
+  private void generateTestTextData() throws IOException {
+    BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
+    for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+      writer.write(e.getKey() + "\t");
+      for (int i = 0; i < e.getValue().get().length; i++) {
+        VertexWritable writable = (VertexWritable) e.getValue().get()[i];
+        writer.write(writable.getName() + "\t");
+      }
+      writer.write("\n");
+    }
+    writer.close();
+  }
+
   private void deleteTempDirs() {
     try {
       if (fs.exists(new Path(INPUT)))
         fs.delete(new Path(INPUT), true);
       if (fs.exists(new Path(OUTPUT)))
         fs.delete(new Path(OUTPUT), true);
+      if (fs.exists(new Path(TEXT_INPUT)))
+        fs.delete(new Path(TEXT_INPUT), true);
+      if (fs.exists(new Path(TEXT_OUTPUT)))
+        fs.delete(new Path(TEXT_OUTPUT), true);
     } catch (IOException e) {
       e.printStackTrace();
     }

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java Tue Jan 31 09:19:53 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.examples;
 
+import java.io.BufferedWriter;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,56 +32,17 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.util.SSSPTextToSeq;
 
 /**
  * Testcase for {@link ShortestPaths}
  */
 
 public class ShortestPathsTest extends TestCase {
-  private static String INPUT = "/tmp/sssp-tmp.seq";
-  private static String OUTPUT = "/tmp/sssp-out";
-  private Configuration conf;
-  private FileSystem fs;
-
-  public void testShortestPaths() throws IOException, InterruptedException,
-      ClassNotFoundException, InstantiationException, IllegalAccessException {
-    conf = new HamaConfiguration();
-    fs = FileSystem.get(conf);
-
-    generateTestData();
-    try {
-      ShortestPaths.main(new String[] { "Frankfurt", INPUT, OUTPUT });
-
-      verifyResult();
-    } finally {
-      deleteTempDirs();
-    }
-  }
-
-  private void verifyResult() throws IOException {
-    Map<String, Integer> rs = new HashMap<String, Integer>();
-    rs.put("Erfurt", 403);
-    rs.put("Mannheim", 85);
-    rs.put("Stuttgart", 503);
-    rs.put("Kassel", 173);
-    rs.put("Nuernberg", 320);
-    rs.put("Augsburg", 415);
-    rs.put("Frankfurt", 0);
-    rs.put("Muenchen", 487);
-    rs.put("Wuerzburg", 217);
-    rs.put("Karlsruhe", 165);
 
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(OUTPUT
-        + "/part-00000"), conf);
-    Text key = new Text();
-    IntWritable value = new IntWritable();
-    while (reader.next(key, value)) {
-      assertEquals(value.get(), (int) rs.get(key.toString()));
-    }
-  }
+  private static final Map<ShortestPathVertex, ShortestPathVertexArrayWritable> testData = new HashMap<ShortestPathVertex, ShortestPathVertexArrayWritable>();
 
-  private void generateTestData() throws IOException {
-    Map<ShortestPathVertex, ShortestPathVertexArrayWritable> tmp = new HashMap<ShortestPathVertex, ShortestPathVertexArrayWritable>();
+  static {
     String[] cities = new String[] { "Frankfurt", "Mannheim", "Wuerzburg",
         "Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg",
         "Muenchen" };
@@ -92,26 +55,26 @@ public class ShortestPathsTest extends T
         textArr[2] = new ShortestPathVertex(217, "Wuerzburg");
         ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
         arr.set(textArr);
-        tmp.put(new ShortestPathVertex(0, city), arr);
+        testData.put(new ShortestPathVertex(0, city), arr);
       } else if (city.equals("Stuttgart")) {
         ShortestPathVertex[] textArr = new ShortestPathVertex[1];
         textArr[0] = new ShortestPathVertex(183, "Nuernberg");
         ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
         arr.set(textArr);
-        tmp.put(new ShortestPathVertex(0, city), arr);
+        testData.put(new ShortestPathVertex(0, city), arr);
       } else if (city.equals("Kassel")) {
         ShortestPathVertex[] textArr = new ShortestPathVertex[2];
         textArr[0] = new ShortestPathVertex(502, "Muenchen");
         textArr[1] = new ShortestPathVertex(173, "Frankfurt");
         ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
         arr.set(textArr);
-        tmp.put(new ShortestPathVertex(0, city), arr);
+        testData.put(new ShortestPathVertex(0, city), arr);
       } else if (city.equals("Erfurt")) {
         ShortestPathVertex[] textArr = new ShortestPathVertex[1];
         textArr[0] = new ShortestPathVertex(186, "Wuerzburg");
         ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
         arr.set(textArr);
-        tmp.put(new ShortestPathVertex(0, city), arr);
+        testData.put(new ShortestPathVertex(0, city), arr);
       } else if (city.equals("Wuerzburg")) {
         ShortestPathVertex[] textArr = new ShortestPathVertex[3];
         textArr[0] = new ShortestPathVertex(217, "Frankfurt");
@@ -119,28 +82,28 @@ public class ShortestPathsTest extends T
         textArr[2] = new ShortestPathVertex(103, "Nuernberg");
         ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
         arr.set(textArr);
-        tmp.put(new ShortestPathVertex(0, city), arr);
+        testData.put(new ShortestPathVertex(0, city), arr);
       } else if (city.equals("Mannheim")) {
         ShortestPathVertex[] textArr = new ShortestPathVertex[2];
         textArr[0] = new ShortestPathVertex(80, "Karlsruhe");
         textArr[1] = new ShortestPathVertex(85, "Frankfurt");
         ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
         arr.set(textArr);
-        tmp.put(new ShortestPathVertex(0, city), arr);
+        testData.put(new ShortestPathVertex(0, city), arr);
       } else if (city.equals("Karlsruhe")) {
         ShortestPathVertex[] textArr = new ShortestPathVertex[2];
         textArr[0] = new ShortestPathVertex(250, "Augsburg");
         textArr[1] = new ShortestPathVertex(80, "Mannheim");
         ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
         arr.set(textArr);
-        tmp.put(new ShortestPathVertex(0, city), arr);
+        testData.put(new ShortestPathVertex(0, city), arr);
       } else if (city.equals("Augsburg")) {
         ShortestPathVertex[] textArr = new ShortestPathVertex[2];
         textArr[0] = new ShortestPathVertex(250, "Karlsruhe");
         textArr[1] = new ShortestPathVertex(84, "Muenchen");
         ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
         arr.set(textArr);
-        tmp.put(new ShortestPathVertex(0, city), arr);
+        testData.put(new ShortestPathVertex(0, city), arr);
       } else if (city.equals("Nuernberg")) {
         ShortestPathVertex[] textArr = new ShortestPathVertex[3];
         textArr[0] = new ShortestPathVertex(183, "Stuttgart");
@@ -148,7 +111,7 @@ public class ShortestPathsTest extends T
         textArr[2] = new ShortestPathVertex(103, "Wuerzburg");
         ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
         arr.set(textArr);
-        tmp.put(new ShortestPathVertex(0, city), arr);
+        testData.put(new ShortestPathVertex(0, city), arr);
       } else if (city.equals("Muenchen")) {
         ShortestPathVertex[] textArr = new ShortestPathVertex[3];
         textArr[0] = new ShortestPathVertex(167, "Nuernberg");
@@ -156,26 +119,138 @@ public class ShortestPathsTest extends T
         textArr[2] = new ShortestPathVertex(84, "Augsburg");
         ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
         arr.set(textArr);
-        tmp.put(new ShortestPathVertex(0, city), arr);
+        testData.put(new ShortestPathVertex(0, city), arr);
       }
     }
+  }
+
+  private static String INPUT = "/tmp/sssp-tmp.seq";
+  private static String TEXT_INPUT = "/tmp/sssp.txt";
+  private static String TEXT_OUTPUT = INPUT + "sssp.txt.seq";
+  private static String OUTPUT = "/tmp/sssp-out";
+  private Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * Runs {@link ShortestPaths} from "Frankfurt" on the generated sequence file
+   * and checks the computed distances.
+   */
+  public void testShortestPaths() throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
 
+    // generate inside the try block so temp files are removed on any failure
+    try {
+      generateTestSequenceFileData();
+      ShortestPaths.main(new String[] { "Frankfurt", INPUT, OUTPUT });
+
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+  /**
+   * Converts the generated text file with {@link SSSPTextToSeq}, runs
+   * {@link ShortestPaths} on the converted output and checks the distances.
+   */
+  public void testShortestPathsUtil() throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
+    // all setup happens inside the try block so cleanup always runs
+    try {
+      generateTestTextData();
+      // <input path> <output path>
+      SSSPTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
+      ShortestPaths.main(new String[] { "Frankfurt", TEXT_OUTPUT, OUTPUT });
+
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+  /**
+   * Compares every record of OUTPUT/part-00000 with the expected distance of
+   * its vertex.
+   */
+  private void verifyResult() throws IOException {
+    Map<String, Integer> rs = new HashMap<String, Integer>();
+    rs.put("Erfurt", 403);
+    rs.put("Mannheim", 85);
+    rs.put("Stuttgart", 503);
+    rs.put("Kassel", 173);
+    rs.put("Nuernberg", 320);
+    rs.put("Augsburg", 415);
+    rs.put("Frankfurt", 0);
+    rs.put("Muenchen", 487);
+    rs.put("Wuerzburg", 217);
+    rs.put("Karlsruhe", 165);
+
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(OUTPUT
+        + "/part-00000"), conf);
+    try {
+      Text key = new Text();
+      IntWritable value = new IntWritable();
+      while (reader.next(key, value)) {
+        Integer expected = rs.get(key.toString());
+        // fail with a message instead of a NullPointerException on unboxing
+        assertNotNull("unexpected vertex " + key, expected);
+        assertEquals(expected.intValue(), value.get());
+      }
+    } finally {
+      // the reader was never closed before; release it even on failure
+      reader.close();
+    }
+  }
+
+  private void generateTestSequenceFileData() throws IOException {
     SequenceFile.Writer writer = SequenceFile
         .createWriter(fs, conf, new Path(INPUT), ShortestPathVertex.class,
             ShortestPathVertexArrayWritable.class);
-    for (Map.Entry<ShortestPathVertex, ShortestPathVertexArrayWritable> e : tmp
+    for (Map.Entry<ShortestPathVertex, ShortestPathVertexArrayWritable> e : testData
         .entrySet()) {
       writer.append(e.getKey(), e.getValue());
     }
     writer.close();
   }
-  
+
+  /**
+   * Writes one line per vertex to TEXT_INPUT: the vertex name followed by
+   * tab-separated name:weight entries of its adjacent vertices.
+   */
+  private void generateTestTextData() throws IOException {
+    BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
+    try {
+      for (Map.Entry<ShortestPathVertex, ShortestPathVertexArrayWritable> e : testData
+          .entrySet()) {
+        writer.write(e.getKey().getName() + "\t");
+        for (int i = 0; i < e.getValue().get().length; i++) {
+          ShortestPathVertex adjacent = (ShortestPathVertex) e.getValue().get()[i];
+          writer.write(adjacent.getName() + ":" + adjacent.getWeight() + "\t");
+        }
+        writer.write("\n");
+      }
+    } finally {
+      // close even if a write fails so the file handle is not leaked
+      writer.close();
+    }
+  }
+
   private void deleteTempDirs() {
     try {
       if (fs.exists(new Path(INPUT)))
         fs.delete(new Path(INPUT), true);
       if (fs.exists(new Path(OUTPUT)))
         fs.delete(new Path(OUTPUT), true);
+      if (fs.exists(new Path(TEXT_INPUT)))
+        fs.delete(new Path(TEXT_INPUT), true);
+      if (fs.exists(new Path(TEXT_OUTPUT)))
+        fs.delete(new Path(TEXT_OUTPUT), true);
     } catch (IOException e) {
       e.printStackTrace();
     }

Added: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java?rev=1238308&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java (added)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java Tue Jan 31 09:19:53 2012
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+
+/**
+ * Tests {@link PagerankTextToSeq} by converting a small delimited text file and
+ * checking the vertices stored in the resulting sequence file.
+ */
+public class PagerankTextToSeqTest extends TestCase {
+
+  private static final String DELIMITER = ";";
+  private static final String TXT_INPUT_DIR = "/tmp/pageranktext/";
+  private static final String TXT_INPUT = TXT_INPUT_DIR + "in.txt";
+  private static final String SEQ_OUTPUT = "/tmp/pageranktext/";
+  private static final String SEQ_INPUT = SEQ_OUTPUT + "in.txt.seq";
+
+  private Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+
+  /** Removes leftovers of earlier runs and creates the input directory. */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+    deleteTempDirs();
+    File dir = new File(TXT_INPUT_DIR);
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+  }
+
+  /** Writes ten lines of the form {@code 0;1;2;3;4;} to the text input file. */
+  private void writeTextFile() throws IOException {
+    BufferedWriter writer = new BufferedWriter(new FileWriter(TXT_INPUT));
+    try {
+      for (int lines = 0; lines < 10; lines++) {
+        for (int cols = 0; cols < 5; cols++) {
+          writer.append(cols + DELIMITER);
+        }
+        writer.append("\n");
+      }
+    } finally {
+      // close even if a write fails so the file handle is not leaked
+      writer.close();
+    }
+  }
+
+  /**
+   * Checks that every record of the converted file is vertex "0" with the
+   * adjacent vertices "1" to "4", and that the file is not empty.
+   */
+  private void verifyOutput() throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+        new Path(SEQ_INPUT), conf);
+    try {
+      VertexWritable vertex = new VertexWritable();
+      VertexArrayWritable vertexArray = new VertexArrayWritable();
+
+      int records = 0;
+      while (reader.next(vertex, vertexArray)) {
+        records++;
+        assertEquals("0", vertex.getName());
+        Writable[] writables = vertexArray.get();
+        assertEquals(4, writables.length);
+        for (int i = 0; i < 4; i++) {
+          assertEquals(String.valueOf(i + 1),
+              ((VertexWritable) writables[i]).getName());
+        }
+      }
+      // an empty sequence file must not pass silently
+      assertTrue("no records found in " + SEQ_INPUT, records > 0);
+    } finally {
+      // release the reader even when an assertion fails
+      reader.close();
+    }
+  }
+
+  /** Runs the converter with input file, output location and delimiter. */
+  public void testArgs() throws Exception {
+    writeTextFile();
+    PagerankTextToSeq.main(new String[] { TXT_INPUT, SEQ_OUTPUT, DELIMITER });
+    verifyOutput();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    deleteTempDirs();
+  }
+
+  /** Best-effort removal of the temporary files; failures are only printed. */
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(TXT_INPUT_DIR)))
+        fs.delete(new Path(TXT_INPUT_DIR), true);
+      if (fs.exists(new Path(TXT_INPUT)))
+        fs.delete(new Path(TXT_INPUT), true);
+      if (fs.exists(new Path(SEQ_OUTPUT)))
+        fs.delete(new Path(SEQ_OUTPUT), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+}

Propchange: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java?rev=1238308&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java (added)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java Tue Jan 31 09:19:53 2012
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.ShortestPathVertex;
+import org.apache.hama.examples.ShortestPathVertexArrayWritable;
+
+/**
+ * Tests {@link SSSPTextToSeq} by converting a small delimited text file and
+ * checking the weighted vertices stored in the resulting sequence file.
+ */
+public class SSSPTextToSeqTest extends TestCase {
+
+  private static final String DELIMITER = ";";
+  private static final String EDGE_DELIMITER = ":";
+  private static final String TXT_INPUT_DIR = "/tmp/sssptest/";
+  private static final String TXT_INPUT = TXT_INPUT_DIR + "in.txt";
+  private static final String SEQ_OUTPUT = "/tmp/sssptest/";
+  private static final String SEQ_INPUT = SEQ_OUTPUT + "in.txt.seq";
+
+  private Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+
+  /** Removes leftovers of earlier runs and creates the input directory. */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+    deleteTempDirs();
+    File dir = new File(TXT_INPUT_DIR);
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+  }
+
+  /**
+   * Writes ten lines to the text input file; line {@code n} looks like
+   * {@code n;0:n;1:n;2:n;3:n;4:n;}.
+   */
+  private void writeTextFile() throws IOException {
+    BufferedWriter writer = new BufferedWriter(new FileWriter(TXT_INPUT));
+    try {
+      for (int lines = 0; lines < 10; lines++) {
+        writer.append(lines + DELIMITER);
+        for (int cols = 0; cols < 5; cols++) {
+          writer.append(cols + EDGE_DELIMITER + lines + "" + DELIMITER);
+        }
+        writer.append("\n");
+      }
+    } finally {
+      // close even if a write fails so the file handle is not leaked
+      writer.close();
+    }
+  }
+
+  /**
+   * Checks that the n-th record is vertex {@code n} with weight 0 whose five
+   * adjacent vertices are named "0" to "4" and weighted {@code n}, and that the
+   * file is not empty.
+   */
+  private void verifyOutput() throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+        new Path(SEQ_INPUT), conf);
+    try {
+      ShortestPathVertex vertex = new ShortestPathVertex();
+      ShortestPathVertexArrayWritable vertexArray = new ShortestPathVertexArrayWritable();
+
+      int lines = 0;
+      while (reader.next(vertex, vertexArray)) {
+        assertEquals(String.valueOf(lines), vertex.getName());
+        assertEquals(0, vertex.getWeight());
+        Writable[] writables = vertexArray.get();
+        assertEquals(5, writables.length);
+        for (int i = 0; i < 5; i++) {
+          ShortestPathVertex adjacent = (ShortestPathVertex) writables[i];
+          assertEquals(String.valueOf(i), adjacent.getName());
+          assertEquals(lines, adjacent.getWeight());
+        }
+        lines++;
+      }
+      // an empty sequence file must not pass silently
+      assertTrue("no records found in " + SEQ_INPUT, lines > 0);
+    } finally {
+      // release the reader even when an assertion fails
+      reader.close();
+    }
+  }
+
+  /** Runs the converter with input file, output location and both delimiters. */
+  public void testArgs() throws Exception {
+    writeTextFile();
+    SSSPTextToSeq.main(new String[] { TXT_INPUT, SEQ_OUTPUT, DELIMITER,
+        EDGE_DELIMITER });
+    verifyOutput();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    deleteTempDirs();
+  }
+
+  /** Best-effort removal of the temporary files; failures are only printed. */
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(TXT_INPUT_DIR)))
+        fs.delete(new Path(TXT_INPUT_DIR), true);
+      if (fs.exists(new Path(TXT_INPUT)))
+        fs.delete(new Path(TXT_INPUT), true);
+      if (fs.exists(new Path(SEQ_OUTPUT)))
+        fs.delete(new Path(SEQ_OUTPUT), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+}

Propchange: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message