Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 25990E0B5 for ; Sun, 13 Jan 2013 20:46:09 +0000 (UTC) Received: (qmail 19909 invoked by uid 500); 13 Jan 2013 20:46:09 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 19855 invoked by uid 500); 13 Jan 2013 20:46:08 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 19843 invoked by uid 99); 13 Jan 2013 20:46:08 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 13 Jan 2013 20:46:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 13 Jan 2013 20:45:59 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 52EC8238896F; Sun, 13 Jan 2013 20:45:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1432733 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ examples/src/main/java/org/apache/hama/examples/util/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/jav... Date: Sun, 13 Jan 2013 20:45:36 -0000 To: commits@hama.apache.org From: surajsmenon@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130113204537.52EC8238896F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: surajsmenon Date: Sun Jan 13 20:45:35 2013 New Revision: 1432733 URL: http://svn.apache.org/viewvc?rev=1432733&view=rev Log: [HAMA-700] and fixed the unit tests. Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Modified: hama/trunk/CHANGES.txt hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Modified: hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/CHANGES.txt (original) +++ hama/trunk/CHANGES.txt Sun Jan 13 20:45:35 2013 @@ -4,6 +4,7 @@ Release 0.7 (unreleased changes) NEW FEATURES + HAMA-700: BSPPartitioner should be configurable to be optional and allow input format conversion (surajsmenon) HAMA-524: Add SpMV example (Mikalai Parafeniuk via edwardyoon) HAMA-658: Add random symmetric sparse matrix generator (edwardyoon) Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Sun Jan 13 20:45:35 2013 @@ -22,7 +22,6 @@ import java.net.URL; import java.net.URLDecoder; import java.util.Enumeration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Sun Jan 13 20:45:35 2013 @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; @@ -288,7 +289,7 @@ public class BSPJobClient extends Config * @throws IOException */ public RunningJob submitJob(BSPJob job) throws FileNotFoundException, - IOException { + IOException { return submitJobInternal(job, jobSubmitClient.getNewJobId()); } @@ -368,12 +369,14 @@ public class BSPJobClient extends Config return launchJob(jobId, job, submitJobFile, fs); } - protected BSPJob partition(BSPJob job, int maxTasks) throws IOException { - - if(job.get("bsp.partitioning.runner.job") != null){return job;}//Early exit for the partitioner job. - - InputSplit[] splits = job.getInputFormat().getSplits(job, + + if (job.get("bsp.partitioning.runner.job") != null) { + return job; + }// Early exit for the partitioner job. + + InputSplit[] splits = job.getInputFormat().getSplits( + job, (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask() : maxTasks); @@ -386,6 +389,17 @@ public class BSPJobClient extends Config if (inputPath != null) { int numSplits = splits.length; int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0); + if (LOG.isDebugEnabled()) { + LOG.debug(" numTasks = " + + numTasks + + " numSplits = " + + numSplits + + " enable = " + + (job.getConfiguration().getBoolean( + Constants.ENABLE_RUNTIME_PARTITIONING, false) + + " class = " + job.getConfiguration().get( + Constants.RUNTIME_PARTITIONING_CLASS))); + } if ((numTasks > 0 && numTasks != numSplits) || (job.getConfiguration().getBoolean( @@ -398,14 +412,18 @@ public class BSPJobClient extends Config fs.delete(partitionDir, true); } - HamaConfiguration conf = new HamaConfiguration(); + HamaConfiguration conf = new HamaConfiguration(job.getConfiguration()); + conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, Integer.parseInt(job.getConfiguration().get("bsp.peers.num"))); if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) { conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration() .get(Constants.RUNTIME_PARTITIONING_DIR)); } - conf.set(Constants.RUNTIME_PARTITIONING_CLASS, job.get(Constants.RUNTIME_PARTITIONING_CLASS)); + if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null) { + conf.set(Constants.RUNTIME_PARTITIONING_CLASS, + job.get(Constants.RUNTIME_PARTITIONING_CLASS)); + } BSPJob partitioningJob = new BSPJob(conf); partitioningJob.setInputPath(new Path(job.getConfiguration().get( Constants.JOB_INPUT_DIR))); @@ -413,8 +431,12 @@ public class BSPJobClient extends Config partitioningJob.setInputKeyClass(job.getInputKeyClass()); partitioningJob.setInputValueClass(job.getInputValueClass()); partitioningJob.setOutputFormat(NullOutputFormat.class); + partitioningJob.setOutputKeyClass(NullWritable.class); + partitioningJob.setOutputValueClass(NullWritable.class); partitioningJob.setBspClass(PartitioningRunner.class); partitioningJob.set("bsp.partitioning.runner.job", "true"); + partitioningJob.getConfiguration().setBoolean( + Constants.ENABLE_RUNTIME_PARTITIONING, false); boolean isPartitioned = false; try { @@ -431,6 +453,7 @@ public class BSPJobClient extends Config } else { job.setInputPath(new Path(inputDir + "/partitions")); } + job.setInputFormat(SequenceFileInputFormat.class); } else { LOG.error("Error partitioning the input path."); throw new IOException("Runtime partition failed for the job."); @@ -440,7 +463,6 @@ public class BSPJobClient extends Config return job; } - protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException { int maxTasks; ClusterStatus clusterStatus = getClusterStatus(true); @@ -628,9 +650,9 @@ public class BSPJobClient extends Config if (job.isSuccessful()) { LOG.info("The total number of supersteps: " + info.getSuperstepCount()); info.getStatus() - .getCounter() - .incrCounter(JobInProgress.JobCounter.SUPERSTEPS, - info.getSuperstepCount()); + .getCounter() + .incrCounter(JobInProgress.JobCounter.SUPERSTEPS, + info.getSuperstepCount()); info.getStatus().getCounter().log(LOG); } else { LOG.info("Job failed."); @@ -692,7 +714,7 @@ public class BSPJobClient extends Config } public static void runJob(BSPJob job) throws FileNotFoundException, - IOException { + IOException { BSPJobClient jc = new BSPJobClient(job.getConfiguration()); if (job.getNumBspTask() == 0 Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Sun Jan 13 20:45:35 2013 @@ -36,29 +36,87 @@ import org.apache.hama.util.KeyValuePair public class PartitioningRunner extends BSP { + private Configuration conf; private int desiredNum; private FileSystem fs = null; private Path partitionDir; + private RecordConverter converter; private Map> values = new HashMap>(); @Override public final void setup( BSPPeer peer) throws IOException, SyncException, InterruptedException { + this.conf = peer.getConfiguration(); - this.desiredNum = conf.getInt("desired.num.of.tasks", 1); + this.desiredNum = conf.getInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, 1); + this.fs = FileSystem.get(conf); - Path inputDir = new Path(conf.get("bsp.input.dir")); + Path inputDir = new Path(conf.get(Constants.JOB_INPUT_DIR)); if (fs.isFile(inputDir)) { inputDir = inputDir.getParent(); } - if(conf.get("bsp.partitioning.dir") != null) { - this.partitionDir = new Path(conf.get("bsp.partitioning.dir")); - } else { + converter = ReflectionUtils.newInstance(conf.getClass( + Constants.RUNTIME_PARTITION_RECORDCONVERTER, + DefaultRecordConverter.class, RecordConverter.class), conf); + + if (conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) { this.partitionDir = new Path(inputDir + "/partitions"); + } else { + this.partitionDir = new Path(conf.get(Constants.RUNTIME_PARTITIONING_DIR)); + } + + } + + /** + * This record converter could be used to convert the records from the input + * format type to the sequential record types the BSP Job uses for + * computation. + * + */ + public static interface RecordConverter { + + /** + * Should return the Key-Value pair constructed from the input format. + * + * @param inputRecord The input key-value pair. + * @param conf Configuration of the job. + * @return the Key-Value pair instance of the expected sequential format. + * Should return null if the conversion was not successful. + */ + public KeyValuePair convertRecord( + KeyValuePair inputRecord, Configuration conf); + + public int getPartitionId(KeyValuePair inputRecord, + @SuppressWarnings("rawtypes") + Partitioner partitioner, Configuration conf, + @SuppressWarnings("rawtypes") + BSPPeer peer, int numTasks); + } + + /** + * The default converter does no conversion. + */ + public static class DefaultRecordConverter implements RecordConverter { + + @Override + public KeyValuePair convertRecord( + KeyValuePair inputRecord, Configuration conf) { + return inputRecord; + } + + @SuppressWarnings("unchecked") + @Override + public int getPartitionId(KeyValuePair outputRecord, + @SuppressWarnings("rawtypes") + Partitioner partitioner, Configuration conf, + @SuppressWarnings("rawtypes") + BSPPeer peer, int numTasks) { + return Math.abs(partitioner.getPartition(outputRecord.getKey(), + outputRecord.getValue(), numTasks)); } } @@ -69,22 +127,36 @@ public class PartitioningRunner extends throws IOException, SyncException, InterruptedException { Partitioner partitioner = getPartitioner(); KeyValuePair pair = null; + KeyValuePair outputPair = null; Class keyClass = null; Class valueClass = null; + Class outputKeyClass = null; + Class outputValueClass = null; while ((pair = peer.readNext()) != null) { if (keyClass == null && valueClass == null) { keyClass = pair.getKey().getClass(); valueClass = pair.getValue().getClass(); } - int index = Math.abs(partitioner.getPartition(pair.getKey(), - pair.getValue(), desiredNum)); + outputPair = converter.convertRecord(pair, conf); + + if (outputPair == null) { + continue; + } + + if (outputKeyClass == null && outputValueClass == null) { + outputKeyClass = outputPair.getKey().getClass(); + outputValueClass = outputPair.getValue().getClass(); + } + + int index = converter.getPartitionId(outputPair, partitioner, conf, peer, + desiredNum); if (!values.containsKey(index)) { values.put(index, new HashMap()); } - values.get(index).put(pair.getKey(), pair.getValue()); + values.get(index).put(outputPair.getKey(), outputPair.getValue()); } // The reason of use of Memory is to reduce file opens @@ -92,7 +164,8 @@ public class PartitioningRunner extends Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-" + peer.getPeerIndex()); SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, - destFile, keyClass, valueClass, CompressionType.NONE); + destFile, outputKeyClass, outputValueClass, CompressionType.NONE); + for (Map.Entry v : e.getValue().entrySet()) { writer.append(v.getKey(), v.getValue()); } @@ -102,28 +175,31 @@ public class PartitioningRunner extends peer.sync(); // merge files into one. - // TODO if we use header info, we might able to merge files without full scan. + // TODO if we use header info, we might able to merge files without full + // scan. FileStatus[] status = fs.listStatus(partitionDir); for (int j = 0; j < status.length; j++) { int idx = Integer.parseInt(status[j].getPath().getName().split("[-]")[1]); int assignedID = idx / (desiredNum / peer.getNumPeers()); if (assignedID == peer.getNumPeers()) assignedID = assignedID - 1; - - // TODO set replica factor to 1. + + // TODO set replica factor to 1. // TODO and check whether we can write to specific DataNode. if (assignedID == peer.getPeerIndex()) { FileStatus[] files = fs.listStatus(status[j].getPath()); SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, - new Path(partitionDir + "/" + getPartitionName(j)), keyClass, - valueClass, CompressionType.NONE); + new Path(partitionDir + "/" + getPartitionName(j)), outputKeyClass, + outputValueClass, CompressionType.NONE); for (int i = 0; i < files.length; i++) { SequenceFile.Reader reader = new SequenceFile.Reader(fs, files[i].getPath(), conf); - Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf); - Writable value = (Writable) ReflectionUtils.newInstance(valueClass, conf); + Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass, + conf); + Writable value = (Writable) ReflectionUtils.newInstance( + outputValueClass, conf); while (reader.next(key, value)) { writer.append(key, value); @@ -139,9 +215,9 @@ public class PartitioningRunner extends @SuppressWarnings("rawtypes") public Partitioner getPartitioner() { - return ReflectionUtils.newInstance(conf - .getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class, - Partitioner.class), conf); + return ReflectionUtils.newInstance(conf.getClass( + Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class, + Partitioner.class), conf); } private static String getPartitionName(int i) { Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java (original) +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java Sun Jan 13 20:45:35 2013 @@ -17,6 +17,8 @@ */ package org.apache.hama.examples; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -159,6 +161,41 @@ public final class BipartiteMatching { return !getValue().getFirst().equals(UNMATCHED); } + @Override + public void readState(DataInput in) throws IOException { + if (in.readBoolean()) { + reusableMessage = new TextPair(); + reusableMessage.readFields(in); + } + + } + + @Override + public void writeState(DataOutput out) throws IOException { + if (reusableMessage == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + reusableMessage.write(out); + } + + } + + @Override + public Text createVertexIDObject() { + return new Text(); + } + + @Override + public NullWritable createEdgeCostObject() { + return NullWritable.get(); + } + + @Override + public TextPair createVertexValue() { + return new TextPair(); + } + } /** @@ -199,16 +236,9 @@ public final class BipartiteMatching { System.exit(-1); } - public static void main(String... args) throws IOException, - InterruptedException, ClassNotFoundException { - - if (args.length < 2) { - printUsage(); - } - - HamaConfiguration conf = new HamaConfiguration(new Configuration()); + public static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException{ GraphJob job = new GraphJob(conf, BipartiteMatching.class); - + // set the defaults job.setMaxIteration(30); job.setNumBspTask(2); @@ -230,14 +260,26 @@ public final class BipartiteMatching { job.setVertexValueClass(TextPair.class); job.setEdgeValueClass(NullWritable.class); - job.setInputKeyClass(LongWritable.class); - job.setInputValueClass(Text.class); job.setInputFormat(TextInputFormat.class); job.setVertexInputReaderClass(BipartiteMatchingVertexReader.class); job.setPartitioner(HashPartitioner.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TextPair.class); + return job; + } + + + public static void main(String... args) throws IOException, + InterruptedException, ClassNotFoundException { + + if (args.length < 2) { + printUsage(); + } + + HamaConfiguration conf = new HamaConfiguration(new Configuration()); + + GraphJob job = createJob(args, conf); long startTime = System.currentTimeMillis(); if (job.waitForCompletion(true)) { Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original) +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Sun Jan 13 20:45:35 2013 @@ -17,6 +17,8 @@ */ package org.apache.hama.examples; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; @@ -94,4 +96,27 @@ public class InlinkCount extends Vertex< + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); } } + + @Override + public void readState(DataInput in) throws IOException {} + + @Override + public void writeState(DataOutput out) throws IOException {} + + @Override + public Text createVertexIDObject() { + return new Text(); + } + + @Override + public NullWritable createEdgeCostObject() { + return NullWritable.get(); + } + + @Override + public IntWritable createVertexValue() { + return new IntWritable(); + } + + } Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original) +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Sun Jan 13 20:45:35 2013 @@ -17,6 +17,8 @@ */ package org.apache.hama.examples; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; @@ -79,6 +81,29 @@ public class MindistSearch { } } } + + @Override + public void readState(DataInput in) throws IOException {} + + @Override + public void writeState(DataOutput out) throws IOException {} + + @Override + public Text createVertexIDObject() { + return new Text(); + } + + @Override + public NullWritable createEdgeCostObject() { + return NullWritable.get(); + } + + @Override + public Text createVertexValue() { + return new Text(); + } + + } public static class MinTextCombiner extends Combiner { Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original) +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Sun Jan 13 20:45:35 2013 @@ -17,29 +17,61 @@ */ package org.apache.hama.examples; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.SequenceFileInputFormat; -import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.bsp.TextOutputFormat; import org.apache.hama.graph.AbstractAggregator; import org.apache.hama.graph.AverageAggregator; +import org.apache.hama.graph.Edge; import org.apache.hama.graph.GraphJob; import org.apache.hama.graph.Vertex; +import org.apache.hama.graph.VertexInputReader; /** * Real pagerank with dangling node contribution. */ public class PageRank { + public static class PagerankTextReader extends + VertexInputReader { + + /** + * The text file essentially should look like:
+ * VERTEX_ID\t(n-tab separated VERTEX_IDs)
+ * E.G:
+ * 1\t2\t3\t4
+ * 2\t3\t1
+ * etc. + */ + @Override + public boolean parseVertex(LongWritable key, Text value, + Vertex vertex) throws Exception { + String[] split = value.toString().split("\t"); + for (int i = 0; i < split.length; i++) { + if (i == 0) { + vertex.setVertexID(new Text(split[i])); + } else { + vertex + .addEdge(new Edge(new Text(split[i]), null)); + } + } + return true; + } + + } + public static class PageRankVertex extends Vertex { @@ -95,6 +127,30 @@ public class PageRank { sendMessageToNeighbors(new DoubleWritable(this.getValue().get() / numEdges)); } + + @Override + public void readState(DataInput in) throws IOException { + } + + @Override + public void writeState(DataOutput out) throws IOException { + } + + @Override + public Text createVertexIDObject() { + return new Text(); + } + + @Override + public NullWritable createEdgeCostObject() { + return NullWritable.get(); + } + + @Override + public DoubleWritable createVertexValue() { + return new DoubleWritable(); + } + } private static void printUsage() { @@ -109,6 +165,7 @@ public class PageRank { HamaConfiguration conf = new HamaConfiguration(new Configuration()); GraphJob pageJob = createJob(args, conf); + pageJob.setVertexInputReaderClass(PagerankTextReader.class); long startTime = System.currentTimeMillis(); if (pageJob.waitForCompletion(true)) { @@ -145,8 +202,6 @@ public class PageRank { pageJob.setEdgeValueClass(NullWritable.class); pageJob.setInputFormat(SequenceFileInputFormat.class); - pageJob.setInputKeyClass(Text.class); - pageJob.setInputValueClass(TextArrayWritable.class); pageJob.setPartitioner(HashPartitioner.class); pageJob.setOutputFormat(TextOutputFormat.class); Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original) +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Sun Jan 13 20:45:35 2013 @@ -17,6 +17,8 @@ */ package org.apache.hama.examples; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; @@ -70,6 +72,29 @@ public class SSSP { voteToHalt(); } } + + @Override + public void readState(DataInput in) throws IOException {} + + @Override + public void writeState(DataOutput out) throws IOException {} + + @Override + public Text createVertexIDObject() { + return new Text(); + } + + @Override + public IntWritable createEdgeCostObject() { + return new IntWritable(); + } + + @Override + public IntWritable createVertexValue() { + return new IntWritable(); + } + + } public static class MinIntCombiner extends Combiner { Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java (original) +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java Sun Jan 13 20:45:35 2013 @@ -151,6 +151,8 @@ public class SpMV { * Output is pairs of integer and double */ bsp.setInputFormat(SequenceFileInputFormat.class); + bsp.setInputKeyClass(IntWritable.class); + bsp.setInputValueClass(SparseVectorWritable.class); bsp.setOutputKeyClass(IntWritable.class); bsp.setOutputValueClass(DoubleWritable.class); bsp.setOutputFormat(SequenceFileOutputFormat.class); Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java (original) +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java Sun Jan 13 20:45:35 2013 @@ -28,36 +28,34 @@ import com.google.common.base.Objects; /** * TextPair class for use in BipartiteMatching algorithm. - * + * */ -public final class TextPair implements Writable{ - +public final class TextPair implements Writable { + Text first; Text second; - + String nameFirst = "First"; String nameSecond = "Second"; - - public TextPair(){ - first = new Text(); - second = new Text(); - } - - public TextPair(Text first, Text second){ - this.first = first; + + public TextPair() { + first = new Text(); + second = new Text(); + } + + public TextPair(Text first, Text second) { + this.first = first; this.second = second; } - + /** - * Sets the names of the attributes + * Sets the names of the attributes */ - public TextPair setNames(String nameFirst, String nameSecond){ + public TextPair setNames(String nameFirst, String nameSecond) { this.nameFirst = nameFirst; this.nameSecond = nameSecond; return this; } - - public Text getFirst() { return first; @@ -77,23 +75,29 @@ public final class TextPair implements W @Override public void write(DataOutput out) throws IOException { + (new Text(nameFirst)).write(out); + (new Text(nameSecond)).write(out); first.write(out); second.write(out); } @Override public void readFields(DataInput in) throws IOException { + + Text t1 = new Text(); + Text t2 = new Text(); + t1.readFields(in); + t2.readFields(in); + nameFirst = t1.toString(); + nameSecond = t2.toString(); first.readFields(in); second.readFields(in); } - + @Override - public String toString(){ - return Objects.toStringHelper(this) - .add(nameFirst, getFirst()) - .add(nameSecond, getSecond()) - .toString(); + public String toString() { + return Objects.toStringHelper(this).add(nameFirst, getFirst()) + .add(nameSecond, getSecond()).toString(); } - } Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java?rev=1432733&view=auto ============================================================================== --- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java (added) +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java Sun Jan 13 20:45:35 2013 @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples.util; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.FileOutputFormat; +import org.apache.hama.bsp.NullInputFormat; +import org.apache.hama.bsp.SequenceFileOutputFormat; +import org.apache.hama.bsp.TextArrayWritable; +import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.examples.CombineExample; +import org.apache.hama.examples.PageRank.PageRankVertex; +import org.apache.hama.graph.Edge; +import org.apache.hama.graph.Vertex; +import org.apache.hama.util.ReflectionUtils; + +public class VertexInputGen { + + public static final String SIZE_OF_MATRIX = "size.of.matrix"; + public static final String DENSITY = "density.of.matrix"; + + public static interface VertexCreator { + @SuppressWarnings("rawtypes") + Vertex createVertex(Text id, Text[] edges, Text value); + } + + public static class PageRankVertexCreatorImpl implements VertexCreator { + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public Vertex createVertex(Text id, Text[] edges, Text value) { + Vertex v = new PageRankVertex(); + v.setVertexID(id); + for (Text t : edges) { + v.addEdge(new Edge(t, null)); + } + return v; + } + + } + + public static int getVertexCaseId(Class classObj) { + if (classObj.getCanonicalName().equals( + PageRankVertexCreatorImpl.class.getCanonicalName())) { + return 1; + } + + return -1; + } + + @SuppressWarnings("rawtypes") + public static class VertexInputGenBSP extends + BSP { + + private Configuration conf; + private int sizeN; + private int density; + private Map> list = new HashMap>(); + private VertexCreator vertexCreator; + + @Override + public void setup( + BSPPeer peer) { + this.conf = peer.getConfiguration(); + sizeN = conf.getInt(SIZE_OF_MATRIX, 10); + density = conf.getInt(DENSITY, 1); + + int vertexCase = conf.getInt("hama.test.vertexcreatorid", -1); + if (vertexCase == 1) { + vertexCreator = new PageRankVertexCreatorImpl(); + } else { + throw new RuntimeException("No vertex creator specified"); + } + + } + + @Override + public void bsp( + BSPPeer peer) + throws IOException, SyncException, InterruptedException { + int interval = sizeN / peer.getNumPeers(); + int startID = peer.getPeerIndex() * interval; + int endID; + if (peer.getPeerIndex() == peer.getNumPeers() - 1) + endID = sizeN; + else + endID = startID + interval; + + // Generate N*(N+1) elements for lower triangular + for (int i = startID; i < endID; i++) { + HashSet edges = new HashSet(); + for (int j = 0; j <= i; j++) { + boolean nonZero = new Random().nextInt(density) == 0; + if (nonZero && !edges.contains(j) && i != j) { + edges.add(j); + + // TODO please refactor this. + int peerIndex = j / interval; + if (peerIndex == peer.getNumPeers()) + peerIndex = peerIndex - 1; + + peer.send(peer.getPeerName(j / interval), new Text(j + "," + i)); + } + } + + list.put(i, edges); + } + + // Synchronize the upper and lower + peer.sync(); + Text received; + while ((received = peer.getCurrentMessage()) != null) { + String[] kv = received.toString().split(","); + HashSet nList = list.get(Integer.parseInt(kv[0])); + nList.add(Integer.parseInt(kv[1])); + list.put(Integer.parseInt(kv[0]), nList); + } + } + + @Override + public void cleanup( + BSPPeer peer) + throws IOException { + for (Map.Entry> e : list.entrySet()) { + Text[] values = new Text[e.getValue().size()]; + if (values.length > 0) { + int i = 0; + for (Integer v : e.getValue()) { + values[i] = new Text(String.valueOf(v)); + i++; + } + peer.write( + (Vertex)this.vertexCreator.createVertex( + new Text(String.valueOf(e.getKey())), values, new Text()), + NullWritable.get()); + } + } + } + } + + public static void runJob(HamaConfiguration conf, int numTasks, String output, Class cls) + throws IOException, InterruptedException, ClassNotFoundException { + BSPJob bsp = new BSPJob(conf, VertexInputGen.class); + // Set the job name + bsp.setJobName("Random Vertex Input Generator"); + bsp.setBspClass(VertexInputGenBSP.class); + bsp.setInputFormat(NullInputFormat.class); + bsp.setOutputKeyClass(cls); + bsp.setOutputValueClass(NullWritable.class); + bsp.setOutputFormat(SequenceFileOutputFormat.class); + FileOutputFormat.setOutputPath(bsp, new Path(output)); + bsp.setNumBspTask(numTasks); + + long startTime = System.currentTimeMillis(); + if (bsp.waitForCompletion(true)) { + System.out.println("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); + } + } + + public static void main(String[] args) throws InterruptedException, + IOException, ClassNotFoundException { + if (args.length < 4) { + System.out + .println("Usage: <1/x density> "); + System.exit(1); + } + + // BSP job configuration + HamaConfiguration conf = new HamaConfiguration(); + + conf.setInt(SIZE_OF_MATRIX, Integer.parseInt(args[0])); + conf.setInt(DENSITY, Integer.parseInt(args[1])); + runJob(conf, Integer.parseInt(args[3]), args[2], Vertex.class); + + } + +} Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java (original) +++ hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java Sun Jan 13 20:45:35 2013 @@ -26,6 +26,8 @@ import java.io.PrintWriter; import java.util.HashMap; import java.util.Map; +import junit.framework.TestCase; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -33,30 +35,36 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.Partitioner; +import org.apache.hama.examples.util.TextPair; +import org.apache.hama.graph.GraphJob; import org.junit.Test; -import junit.framework.TestCase; +public class BipartiteMatchingTest extends TestCase { -public class BipartiteMatchingTest extends TestCase{ - - private String[] input = { - "A L:B D", - "B R:A C", - "C L:B D", - "D R:A C" - }; + private String[] input = { "A L:B D", "B R:A C", "C L:B D", "D R:A C" }; private final static String DELIMETER = "\t"; @SuppressWarnings("serial") - private Map output1 = new HashMap() - {{ - put("C", "TextPair{MatchVertex=D, Component=L}"); - put("A", "TextPair{MatchVertex=B, Component=L}"); - put("D", "TextPair{MatchVertex=C, Component=R}"); - put("B", "TextPair{MatchVertex=A, Component=R}"); - }}; + private Map output1 = new HashMap() { + { + put("C", "TextPair{MatchVertex=D, Component=L}"); + put("A", "TextPair{MatchVertex=B, Component=L}"); + put("D", "TextPair{MatchVertex=C, Component=R}"); + put("B", "TextPair{MatchVertex=A, Component=R}"); + } + }; + public static class CustomTextPartitioner implements + Partitioner { + + @Override + public int getPartition(Text key, TextPair value, int numTasks) { + return Character.getNumericValue(key.toString().charAt(0)) % numTasks; + } + + } private static String INPUT = "/tmp/graph.txt"; private static String OUTPUT = "/tmp/graph-bipartite"; @@ -70,57 +78,63 @@ public class BipartiteMatchingTest exten fs = FileSystem.get(conf); } - private void generateTestData(){ + private void generateTestData() { FileWriter fout = null; BufferedWriter bout = null; PrintWriter pout = null; - try{ + try { fout = new FileWriter(INPUT); bout = new BufferedWriter(fout); pout = new PrintWriter(bout); - for(String line:input){ + for (String line : input) { pout.println(line); } - } - catch(IOException e){ + } catch (IOException e) { e.printStackTrace(); - } - finally{ + } finally { try { - if(pout!=null){pout.close();} - if(bout!=null){bout.close();} - if(fout!=null){fout.close();} + if (pout != null) { + pout.close(); + } + if (bout != null) { + bout.close(); + } + if (fout != null) { + fout.close(); + } } catch (IOException e) { e.printStackTrace(); - } + } } } - - private void verifyResult()throws IOException{ + private void verifyResult() throws IOException { FileStatus[] files = fs.globStatus(new Path(OUTPUT + "/part-*")); + assertTrue(files.length == 2); + Text key = new Text(); Text value = new Text(); - for(FileStatus file:files){ - if(file.getLen() > 0){ - FSDataInputStream in = fs.open(file.getPath()); - BufferedReader bin = new BufferedReader( - new InputStreamReader(in)); + + for (FileStatus file : files) { + if (file.getLen() > 0) { + FSDataInputStream in = fs.open(file.getPath()); + BufferedReader bin = new BufferedReader(new InputStreamReader(in)); String s = bin.readLine(); - while(s!=null){ + while (s != null) { next(key, value, s); String expValue = output1.get(key.toString()); + System.out.println(key + " " + value + " expvalue = " + expValue); assertEquals(expValue, value.toString()); - System.out.println(key + " "+value); + s = bin.readLine(); - } + } in.close(); } } } - private static void next(Text key, Text value, String line){ + private static void next(Text key, Text value, String line) { String[] lineA = line.split(DELIMETER); key.set(lineA[0]); value.set(lineA[1]); @@ -139,17 +153,24 @@ public class BipartiteMatchingTest exten @Test public void testBipartiteMatching() throws IOException, InterruptedException, - ClassNotFoundException{ + ClassNotFoundException { generateTestData(); try { String seed = "2"; - BipartiteMatching.main(new String[] { INPUT, OUTPUT, "30", "2", - seed}); + HamaConfiguration conf = new HamaConfiguration(); + GraphJob job = BipartiteMatching.createJob(new String[] { INPUT, OUTPUT, + "30", "2", seed }, conf); + job.setPartitioner(CustomTextPartitioner.class); + + long startTime = System.currentTimeMillis(); + if (job.waitForCompletion(true)) { + System.out.println("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); + } + verifyResult(); } finally { deleteTempDirs(); } } - - } Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original) +++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Sun Jan 13 20:45:35 2013 @@ -29,12 +29,13 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.examples.MindistSearch.MinTextCombiner; +import org.apache.hama.examples.MindistSearch.MindistSearchVertex; +import org.apache.hama.graph.Edge; public class MindistSearchTest extends TestCase { @@ -98,18 +99,18 @@ public class MindistSearchTest extends T private void generateTestData() { try { SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, - new Path(INPUT), Text.class, TextArrayWritable.class); + new Path(INPUT), MindistSearchVertex.class, NullWritable.class); for (int i = 0; i < input.length; i++) { String[] x = input[i].split("\t"); Text key = new Text(x[0]); - Writable[] values = new Writable[x.length - 1]; + MindistSearchVertex vertex = new MindistSearchVertex(); + vertex.setVertexID(key); for (int j = 1; j < x.length; j++) { - values[j - 1] = new Text(x[j]); + vertex.addEdge(new Edge(new Text(x[j]), + NullWritable.get())); } - TextArrayWritable value = new TextArrayWritable(); - value.set(values); - writer.append(key, value); + writer.append(vertex, NullWritable.get()); } writer.close(); Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original) +++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Sun Jan 13 20:45:35 2013 @@ -28,14 +28,17 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hama.HamaConfiguration; -import org.apache.hama.examples.util.SymmetricMatrixGen; +import org.apache.hama.examples.PageRank.PageRankVertex; +import org.apache.hama.examples.util.VertexInputGen; import org.apache.hama.graph.GraphJob; import org.apache.hama.graph.GraphJobRunner; public class PageRankTest extends TestCase { + private static String INPUT = "/tmp/pagerank/pagerank-tmp.seq"; private static String TEXT_INPUT = "/tmp/pagerank/pagerank.txt"; private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq"; + private static String OUTPUT = "/tmp/pagerank/pagerank-out"; private Configuration conf = new HamaConfiguration(); private FileSystem fs; @@ -70,8 +73,8 @@ public class PageRankTest extends TestCa conf.set("bsp.local.tasks.maximum", "10"); conf.set("bsp.peers.num", "7"); conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true); - GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT, "7" }, - conf); + GraphJob pageJob = PageRank.createJob( + new String[] { INPUT, OUTPUT, "7" }, conf); if (!pageJob.waitForCompletion(true)) { fail("Job did not complete normally!"); @@ -84,7 +87,11 @@ public class PageRankTest extends TestCa private void generateTestData() throws ClassNotFoundException, InterruptedException, IOException { - SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "3" }); + HamaConfiguration conf = new HamaConfiguration(); + conf.setInt(VertexInputGen.SIZE_OF_MATRIX, 40); + conf.setInt(VertexInputGen.DENSITY, 10); + conf.setInt("hama.test.vertexcreatorid", 1); + VertexInputGen.runJob(conf, 3, INPUT, PageRankVertex.class); } private void deleteTempDirs() { Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Sun Jan 13 20:45:35 2013 @@ -24,7 +24,6 @@ import org.apache.hadoop.io.Writable; * The edge class */ public final class Edge { - private final VERTEX_ID destinationVertexID; private final EDGE_VALUE_TYPE cost; Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Sun Jan 13 20:45:35 2013 @@ -20,13 +20,16 @@ package org.apache.hama.graph; import java.io.IOException; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSPJob; import org.apache.hama.bsp.Combiner; import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.Partitioner; +import org.apache.hama.bsp.PartitioningRunner.RecordConverter; import com.google.common.base.Preconditions; @@ -39,9 +42,6 @@ public class GraphJob extends BSPJob { public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class"; public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class"; - public final static String VERTEX_GRAPH_RUNTIME_PARTIONING = "hama.graph.runtime.partitioning"; - public final static String VERTEX_GRAPH_INPUT_READER = "hama.graph.input.reader.class"; - /** * Creates a new Graph Job with the given configuration and an exampleClass. * The exampleClass is used to determine the user's jar to distribute in the @@ -67,6 +67,8 @@ public class GraphJob extends BSPJob { Class> cls) throws IllegalStateException { conf.setClass(VERTEX_CLASS_ATTR, cls, Vertex.class); + setInputKeyClass(cls); + setInputValueClass(NullWritable.class); } /** @@ -119,7 +121,9 @@ public class GraphJob extends BSPJob { public void setVertexInputReaderClass( Class> cls) { ensureState(JobState.DEFINE); - conf.setClass(VERTEX_GRAPH_INPUT_READER, cls, VertexInputReader.class); + conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls, + RecordConverter.class); + conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true); } @SuppressWarnings("unchecked") @@ -132,7 +136,7 @@ public class GraphJob extends BSPJob { public void setPartitioner(@SuppressWarnings("rawtypes") Class theClass) { super.setPartitioner(theClass); - conf.setBoolean(VERTEX_GRAPH_RUNTIME_PARTIONING, true); + conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true); } @Override Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Sun Jan 13 20:45:35 2013 @@ -28,7 +28,6 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -66,13 +65,13 @@ public final class GraphJobRunner combiner; private Partitioner partitioner; - private List> vertices = new ArrayList>(); - + private VerticesInfo vertices; private boolean updated = true; private int globalUpdateCounts = 0; @@ -264,10 +263,12 @@ public final class GraphJobRunner(); aggregationRunner.setupAggregators(peer); + + vertices = new VerticesInfo(); } /** - * Loads vertices into memory of each peer. TODO this needs to be simplified. + * Loads vertices into memory of each peer. */ @SuppressWarnings("unchecked") private void loadVertices( @@ -277,41 +278,22 @@ public final class GraphJobRunner vertex = newVertexInstance(vertexClass, conf); - vertex.runner = this; + if (LOG.isDebugEnabled()) + LOG.debug("Vertex class: " + vertexClass); KeyValuePair next = null; while ((next = peer.readNext()) != null) { - V key = (V) next.getKey(); - Writable[] edges = ((ArrayWritable) next.getValue()).get(); - vertex.setVertexID(key); - List> edgeList = new ArrayList>(); - for (Writable edge : edges) { - edgeList.add(new Edge((V) edge, null)); - } - vertex.setEdges(edgeList); - - if (vertex.getEdges() == null) { - if (selfReference) { - vertex.setEdges(Collections.singletonList(new Edge(vertex - .getVertexID(), null))); - } else { - vertex.setEdges(Collections.EMPTY_LIST); - } - } - + Vertex vertex = (Vertex) next.getKey(); + vertex.runner = this; + vertex.setup(conf); + vertices.addVertex(vertex); if (selfReference) { vertex.addEdge(new Edge(vertex.getVertexID(), null)); } - - vertex.setup(conf); - vertices.add(vertex); - vertex = newVertexInstance(vertexClass, conf); - vertex.runner = this; } - LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps."); + if (LOG.isDebugEnabled()) + LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps."); /* * If the user want to repair the graph, it should traverse through that @@ -321,18 +303,20 @@ public final class GraphJobRunner peer, - boolean selfReference) throws IOException, - SyncException, InterruptedException { + boolean selfReference) throws IOException, SyncException, + InterruptedException { Map> tmp = new HashMap>(); @@ -368,7 +352,9 @@ public final class GraphJobRunner v : tmp.values()) { + vertices.addVertex(v); + } tmp.clear(); } @@ -533,4 +519,3 @@ public final class GraphJobRunner Vertex ID object type + * @param Edge cost object type + * @param Vertex value object type + */ public abstract class Vertex - implements VertexInterface { + implements VertexInterface, Writable { GraphJobRunner runner; @@ -74,7 +92,7 @@ public abstract class Vertex> outEdges = this.getEdges(); @@ -103,7 +121,7 @@ public abstract class Vertex edge) { if (edges == null) { - this.edges = new ArrayList>(1); + this.edges = new LinkedList>(); } this.edges.add(edge); } @@ -195,10 +213,7 @@ public abstract class Vertex>(); + if (in.readBoolean()) { + int num = in.readInt(); + if (num > 0) { + for (int i = 0; i < num; ++i) { + V vertex = createVertexIDObject(); + vertex.readFields(in); + E edgeCost = null; + if (in.readBoolean()) { + edgeCost = this.createEdgeCostObject(); + edgeCost.readFields(in); + } + Edge edge = new Edge(vertex, edgeCost); + this.edges.add(edge); + } + + } + } + votedToHalt = in.readBoolean(); + readState(in); + } + + @Override + public void write(DataOutput out) throws IOException { + if (vertexID == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + vertexID.write(out); + } + if (value == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + value.write(out); + } + if (this.edges == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeInt(this.edges.size()); + for (Edge edge : this.edges) { + edge.getDestinationVertexID().write(out); + if (edge.getValue() == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + edge.getValue().write(out); + } + } + } + out.writeBoolean(votedToHalt); + writeState(out); + + } + + /** + * Create the vertex id object. This function is used by the framework to + * construct the vertex id object. + * + * @return instance of V + */ + public abstract V createVertexIDObject(); + + /** + * Create the Edge cost object. This function is used by the framework to + * construct the edge cost object + * + * @return instance of E + */ + public abstract E createEdgeCostObject(); + + /** + * Create the vertex value object. This function is used by the framework to + * construct the vertex value object. + * + * @return + */ + public abstract M createVertexValue(); + + /** + * Read the state of the vertex from the input stream. The framework would + * have already constructed and loaded the vertex-id, edges and voteToHalt + * state. This function is essential if there is any more properties of vertex + * to be read from. + * + * @param in + * @throws IOException + */ + public abstract void readState(DataInput in) throws IOException; + + /** + * Writes the state of vertex to the output stream. The framework writes the + * vertex and edge information to the output stream. This function could be + * used to save the state variable of the vertex added in the implementation + * of object. + * + * @param out + * @throws IOException + */ + public abstract void writeState(DataOutput out) throws IOException; + } Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (original) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java Sun Jan 13 20:45:35 2013 @@ -17,12 +17,25 @@ */ package org.apache.hama.graph; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.Partitioner; +import org.apache.hama.bsp.PartitioningRunner.RecordConverter; +import org.apache.hama.util.KeyValuePair; /** * A reader to read Hama's input files and parses a vertex out of it. */ -public abstract class VertexInputReader { +public abstract class VertexInputReader + implements RecordConverter { + + private static final Log LOG = LogFactory.getLog(VertexInputReader.class); + + private KeyValuePair outputRecord = new KeyValuePair(); /** * Parses a given key and value into the given vertex. If returned true, the @@ -32,4 +45,40 @@ public abstract class VertexInputReader< public abstract boolean parseVertex(KEYIN key, VALUEIN value, Vertex vertex) throws Exception; + @SuppressWarnings("unchecked") + @Override + public KeyValuePair convertRecord( + KeyValuePair inputRecord, Configuration conf) { + Class> vertexClass = (Class>) conf + .getClass(GraphJob.VERTEX_CLASS_ATTR, Vertex.class); + boolean vertexCreation = true; + Vertex vertex = GraphJobRunner + .newVertexInstance(vertexClass, conf); + try { + vertexCreation = parseVertex((KEYIN) inputRecord.getKey(), + (VALUEIN) inputRecord.getValue(), vertex); + } catch (Exception e) { + LOG.error("Error parsing vertex.", e); + vertexCreation = false; + } + if (!vertexCreation) { + return null; + } + outputRecord.setKey(vertex); + outputRecord.setValue(NullWritable.get()); + return outputRecord; + } + + @SuppressWarnings("unchecked") + @Override + public int getPartitionId(KeyValuePair inputRecord, + @SuppressWarnings("rawtypes") + Partitioner partitioner, Configuration conf, + @SuppressWarnings("rawtypes") + BSPPeer peer, int numTasks) { + Vertex vertex = (Vertex) outputRecord.getKey(); + return Math.abs(partitioner.getPartition(vertex.getVertexID(), + vertex.getValue(), numTasks)); + } + } Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1432733&view=auto ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (added) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Sun Jan 13 20:45:35 2013 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.io.Writable; + +/** + * VerticesInfo encapsulates the storage of vertices in a BSP Task. + * + * @param Vertex ID object type + * @param Edge cost object type + * @param Vertex value object type + */ +public class VerticesInfo + implements Iterable> { + + private List> vertices = new ArrayList>(100); + + public void addVertex(Vertex vertex) { + int i = 0; + for (Vertex check : this) { + if (check.getVertexID().equals(vertex.getVertexID())) { + this.vertices.set(i, vertex); + return; + } + ++i; + } + vertices.add(vertex); + } + + public Vertex getVertex(V vertexId) { + for (Vertex vertex : this) { + if (vertex.getVertexID().equals(vertexId)) { + return vertex; + } + } + return null; + } + + public boolean containsVertex(V vertexId) { + for (Vertex vertex : this) { + if (vertex.getVertexID().equals(vertexId)) { + return true; + } + } + return false; + } + + public void clear() { + vertices.clear(); + } + + public int size() { + return this.vertices.size(); + } + + @Override + public Iterator> iterator() { + return vertices.iterator(); + } + + public void recoverState(DataInput in) { + + } + + public void saveState(DataOutput out) { + + } +} Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original) +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Sun Jan 13 20:45:35 2013 @@ -25,7 +25,6 @@ import org.apache.hadoop.io.DoubleWritab import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hama.Constants; import org.apache.hama.bsp.BSPJobClient; import org.apache.hama.bsp.ClusterStatus; @@ -33,8 +32,8 @@ import org.apache.hama.bsp.HashPartition import org.apache.hama.bsp.SequenceFileInputFormat; import org.apache.hama.bsp.SequenceFileOutputFormat; import org.apache.hama.bsp.TestBSPMasterGroomServer; -import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.graph.example.PageRank; +import org.apache.hama.graph.example.PageRank.PageRankVertex; public class TestSubmitGraphJob extends TestBSPMasterGroomServer { @@ -57,8 +56,10 @@ public class TestSubmitGraphJob extends // Set multi-step partitioning interval to 30 bytes configuration.setInt("hama.graph.multi.step.partitioning.interval", 30); + configuration.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false); + GraphJob bsp = new GraphJob(configuration, PageRank.class); - bsp.setInputPath(new Path("/tmp/pagerank")); + bsp.setInputPath(new Path("/tmp/pagerank/real-tmp.seq")); bsp.setOutputPath(new Path(OUTPUT)); BSPJobClient jobClient = new BSPJobClient(configuration); configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000); @@ -75,10 +76,7 @@ public class TestSubmitGraphJob extends bsp.setAggregatorClass(AverageAggregator.class, PageRank.DanglingNodeAggregator.class); - bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class); bsp.setInputFormat(SequenceFileInputFormat.class); - bsp.setInputKeyClass(Text.class); - bsp.setInputValueClass(TextArrayWritable.class); bsp.setVertexIDClass(Text.class); bsp.setVertexValueClass(DoubleWritable.class); @@ -124,18 +122,18 @@ public class TestSubmitGraphJob extends private void generateTestData() { try { SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), - new Path(INPUT), Text.class, TextArrayWritable.class); + new Path(INPUT), PageRankVertex.class, NullWritable.class); for (int i = 0; i < input.length; i++) { String[] x = input[i].split("\t"); - Text key = new Text(x[0]); - Writable[] values = new Writable[x.length - 1]; + + PageRankVertex vertex = new PageRankVertex(); + vertex.setVertexID(new Text(x[0])); for (int j = 1; j < x.length; j++) { - values[j - 1] = new Text(x[j]); + vertex.addEdge(new Edge(new Text(x[j]), + NullWritable.get())); } - TextArrayWritable value = new TextArrayWritable(); - value.set(values); - writer.append(key, value); + writer.append(vertex, NullWritable.get()); } writer.close(); Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1432733&r1=1432732&r2=1432733&view=diff ============================================================================== --- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original) +++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Sun Jan 13 20:45:35 2013 @@ -17,6 +17,8 @@ */ package org.apache.hama.graph.example; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; @@ -25,6 +27,7 @@ import org.apache.hadoop.io.DoubleWritab import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import org.apache.hama.bsp.PartitioningRunner.RecordConverter; import org.apache.hama.graph.AbstractAggregator; import org.apache.hama.graph.Edge; import org.apache.hama.graph.Vertex; @@ -87,10 +90,33 @@ public class PageRank { sendMessageToNeighbors(new DoubleWritable(this.getValue().get() / numEdges)); } + + @Override + public Text createVertexIDObject() { + return new Text(); + } + + @Override + public NullWritable createEdgeCostObject() { + return NullWritable.get(); + } + + @Override + public DoubleWritable createVertexValue() { + return new DoubleWritable(); + } + + @Override + public void readState(DataInput in) throws IOException {} + + @Override + public void writeState(DataOutput out) throws IOException {} + } public static class PagerankTextReader extends - VertexInputReader { + VertexInputReader + implements RecordConverter { /** * The text file essentially should look like: