Subject: svn commit: r1420530 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apach...
Date: Wed, 12 Dec 2012 05:32:55 -0000
To: commits@hama.apache.org
From: edwardyoon@apache.org

Author: edwardyoon
Date: Wed Dec 12 05:32:53 2012
New Revision: 1420530

URL: http://svn.apache.org/viewvc?rev=1420530&view=rev
Log:
Reimplementation of partitioner

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Dec 12 05:32:53 2012
@@ -10,6 +10,8 @@ Release 0.7 (unreleased changes)

   IMPROVEMENTS

+    HAMA-531: Reimplementation of partitioner (edwardyoon)
+
 Release 0.6 - November 28, 2012

   NEW FEATURES
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Wed Dec 12 05:32:53 2012
@@ -22,6 +22,7 @@ import java.net.URL;
 import java.net.URLDecoder;
 import java.util.Enumeration;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -219,8 +220,41 @@ public class BSPJob extends BSPJobContex
     state = JobState.RUNNING;
   }
 
+  boolean isPartitioned = false;
+
   public boolean waitForCompletion(boolean verbose) throws IOException,
       InterruptedException, ClassNotFoundException {
+    if (this.getConfiguration().get("bsp.input.partitioner.class") != null
+        && !isPartitioned) {
+      FileSystem fs = FileSystem.get(conf);
+      Path inputDir = new Path(conf.get("bsp.input.dir"));
+      if (fs.isFile(inputDir)) {
+        inputDir = inputDir.getParent();
+      }
+      Path partitionDir = new Path(inputDir + "/partitions");
+
+      if (fs.exists(partitionDir)) {
+        fs.delete(partitionDir, true);
+      }
+
+      HamaConfiguration conf = new HamaConfiguration();
+      conf.setInt("desired.num.of.tasks",
+          Integer.parseInt(this.getConfiguration().get("bsp.peers.num")));
+      BSPJob partitioningJob = new BSPJob(conf);
+      partitioningJob.setInputPath(new Path(this.getConfiguration().get(
+          "bsp.input.dir")));
+      partitioningJob.setInputFormat(this.getInputFormat().getClass());
+      partitioningJob.setInputKeyClass(this.getInputKeyClass());
+      partitioningJob.setInputValueClass(getInputValueClass());
+      partitioningJob.setOutputFormat(NullOutputFormat.class);
+      partitioningJob.setBspClass(PartitioningRunner.class);
+
+      isPartitioned = partitioningJob.waitForCompletion(true);
+      if (isPartitioned) {
+        this.setInputPath(new Path(inputDir + "/partitions"));
+      }
+    }
+
     if (state == JobState.DEFINE) {
       submit();
    }
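For orientation: with the block above, any job that sets "bsp.input.partitioner.class" now runs a separate PartitioningRunner pre-job before submission. A minimal sketch of what a client does to trigger it; the input path and task count are illustrative, and setPartitioner()/setNumBspTask() are assumed to back the "bsp.input.partitioner.class" and "bsp.peers.num" keys read above:

import org.apache.hadoop.fs.Path;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.HashPartitioner;

public class PartitionedJobSketch {
  public static void main(String[] args) throws Exception {
    HamaConfiguration conf = new HamaConfiguration();
    BSPJob job = new BSPJob(conf);
    job.setInputPath(new Path("/tmp/input.seq")); // illustrative input
    job.setNumBspTask(4);                         // consumed as desired.num.of.tasks
    job.setPartitioner(HashPartitioner.class);    // enables the pre-partitioning pass
    // ... setBspClass(), input/output formats, output path, etc. as usual ...

    // waitForCompletion() first runs PartitioningRunner over the input, which
    // writes <input dir>/partitions; the input path is then re-pointed there
    // and the real job is submitted.
    job.waitForCompletion(true);
  }
}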
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Wed Dec 12 05:32:53 2012
@@ -30,9 +30,7 @@ import java.io.OutputStreamWriter;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLConnection;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
@@ -55,7 +53,6 @@ import org.apache.hadoop.io.WritableUtil
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -301,9 +298,10 @@ public class BSPJobClient extends Config
       throws IOException {
     BSPJob job = pJob;
     job.setJobID(jobId);
-    int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB, 0);
+    int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+        0);
     int maxTasks = checkTaskLimits(job, limitTasks);
-    
+
     Path submitJobDir = new Path(getSystemDir(), "submit_"
         + Integer.toString(Math.abs(r.nextInt()), 36));
     Path submitSplitFile = new Path(submitJobDir, "job.split");
@@ -325,12 +323,6 @@ public class BSPJobClient extends Config
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-      if (job.getConfiguration().get("bsp.input.partitioner.class") != null
-          && !job.getConfiguration()
-              .getBoolean("hama.graph.runtime.partitioning", false)) {
-        job = partition(job, maxTasks);
-        maxTasks = job.getInt("hama.partition.count", maxTasks);
-      }
       job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
@@ -375,15 +367,16 @@ public class BSPJobClient extends Config
   protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
     int maxTasks;
     ClusterStatus clusterStatus = getClusterStatus(true);
-    
-    if(limitTasks > 0) {
+
+    if (limitTasks > 0) {
       maxTasks = limitTasks;
     } else {
       maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
     }
-    
+
     if (maxTasks < job.getNumBspTask()) {
-      throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
+      throw new IOException(
+          "Job failed! The number of tasks has exceeded the maximum allowed.");
     }
     return maxTasks;
   }
@@ -402,97 +395,10 @@ public class BSPJobClient extends Config
     }
   }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
-    InputSplit[] splits = job.getInputFormat().getSplits(
-        job,
-        (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
-            : maxTasks);
-
-    String input = job.getConfiguration().get("bsp.input.dir");
-
-    if (input != null) {
-      InputFormat inputFormat = job.getInputFormat();
-
-      Path partitionedPath = new Path(input, "hama-partitions");
-      Path inputPath = new Path(input);
-      if (fs.isFile(inputPath)) {
-        partitionedPath = new Path(inputPath.getParent(), "hama-partitions");
-      }
-
-      String alternatePart = job.get("bsp.partitioning.dir");
-      if (alternatePart != null) {
-        partitionedPath = new Path(alternatePart, job.getJobID().toString());
-      }
-
-      if (fs.exists(partitionedPath)) {
-        fs.delete(partitionedPath, true);
-      } else {
-        fs.mkdirs(partitionedPath);
-      }
-      // FIXME this is soo unsafe
-      RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
-
-      List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
-          splits.length);
-
-      CompressionType compressionType = getOutputCompressionType(job);
-      Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
-          job, null);
-      CompressionCodec codec = null;
-      if (outputCompressorClass != null) {
-        codec = ReflectionUtils.newInstance(outputCompressorClass,
-            job.getConfiguration());
-      }
-
-      try {
-        for (int i = 0; i < splits.length; i++) {
-          Path p = new Path(partitionedPath, getPartitionName(i));
-          if (codec == null) {
-            writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
-                sampleReader.createKey().getClass(), sampleReader.createValue()
-                    .getClass(), CompressionType.NONE));
-          } else {
-            writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
-                sampleReader.createKey().getClass(), sampleReader.createValue()
-                    .getClass(), compressionType, codec));
-          }
-        }
-
-        Partitioner partitioner = job.getPartitioner();
-        for (int i = 0; i < splits.length; i++) {
-          InputSplit split = splits[i];
-          RecordReader recordReader = inputFormat.getRecordReader(split, job);
-          Object key = recordReader.createKey();
-          Object value = recordReader.createValue();
-          while (recordReader.next(key, value)) {
-            int index = Math.abs(partitioner.getPartition(key, value,
-                splits.length));
-            writers.get(index).append(key, value);
-          }
-          LOG.debug("Done with split " + i);
-        }
-      } finally {
-        for (SequenceFile.Writer wr : writers) {
-          wr.close();
-        }
-      }
-      job.set("hama.partition.count", writers.size() + "");
-      job.setInputFormat(SequenceFileInputFormat.class);
-      job.setInputPath(partitionedPath);
-    }
-
-    return job;
-  }
-
   private static boolean isProperSize(int numBspTask, int maxTasks) {
     return (numBspTask > 1 && numBspTask < maxTasks);
   }
 
-  private static String getPartitionName(int i) {
-    return "part-" + String.valueOf(100000 + i).substring(1, 6);
-  }
-
   /**
    * Get the {@link CompressionType} for the output {@link SequenceFile}.
    *
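Both the removed client-side partition() above and the new PartitioningRunner below route records through the same contract: Partitioner.getPartition(key, value, numTasks) returns a partition index. A minimal custom implementation might look like the following sketch; the interface shape is inferred from the call sites in this commit, and FirstLetterPartitioner is a hypothetical name, not part of the commit:

import org.apache.hadoop.io.Text;
import org.apache.hama.bsp.Partitioner;

public class FirstLetterPartitioner implements Partitioner<Text, Text> {
  @Override
  public int getPartition(Text key, Text value, int numTasks) {
    if (key == null || key.getLength() == 0) {
      return 0;
    }
    // Math.abs mirrors the guard used by the callers in this commit,
    // which wrap getPartition() results to avoid negative indices.
    return Math.abs(key.charAt(0)) % numTasks;
  }
}

A job would select it through the "bsp.input.partitioner.class" key, which is what PartitioningRunner.getPartitioner() resolves below (falling back to HashPartitioner).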
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1420530&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Wed Dec 12 05:32:53 2012
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+public class PartitioningRunner extends
+    BSP<Writable, Writable, Writable, Writable, NullWritable> {
+  private Configuration conf;
+  private int desiredNum;
+  private FileSystem fs = null;
+  private Path partitionDir;
+  private Map<Integer, Map<Writable, Writable>> values = new HashMap<Integer, Map<Writable, Writable>>();
+
+  @Override
+  public final void setup(
+      BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+      throws IOException, SyncException, InterruptedException {
+    this.conf = peer.getConfiguration();
+    this.desiredNum = conf.getInt("desired.num.of.tasks", 1);
+    this.fs = FileSystem.get(conf);
+
+    Path inputDir = new Path(conf.get("bsp.input.dir"));
+    if (fs.isFile(inputDir)) {
+      inputDir = inputDir.getParent();
+    }
+
+    this.partitionDir = new Path(inputDir + "/partitions");
+  }
+
+  @Override
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void bsp(
+      BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+      throws IOException, SyncException, InterruptedException {
+    Partitioner partitioner = getPartitioner();
+    KeyValuePair<Writable, Writable> pair = null;
+
+    Class keyClass = null;
+    Class valueClass = null;
+    while ((pair = peer.readNext()) != null) {
+      if (keyClass == null && valueClass == null) {
+        keyClass = pair.getKey().getClass();
+        valueClass = pair.getValue().getClass();
+      }
+
+      int index = Math.abs(partitioner.getPartition(pair.getKey(),
+          pair.getValue(), desiredNum));
+
+      if (!values.containsKey(index)) {
+        values.put(index, new HashMap<Writable, Writable>());
+      }
+      values.get(index).put(pair.getKey(), pair.getValue());
+    }
+
+    for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
+      Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
+          + peer.getPeerIndex());
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+          destFile, keyClass, valueClass, CompressionType.NONE);
+      for (Map.Entry<Writable, Writable> v : e.getValue().entrySet()) {
+        writer.append(v.getKey(), v.getValue());
+      }
+      writer.close();
+    }
+
+    peer.sync();
+
+    // merge files into one.
+    FileStatus[] status = fs.listStatus(partitionDir);
+    for (int j = 0; j < status.length; j++) {
+      int idx = Integer.parseInt(status[j].getPath().getName().split("[-]")[1]);
+      int assignedID = idx / (desiredNum / peer.getNumPeers());
+      if (assignedID == peer.getNumPeers())
+        assignedID = assignedID - 1;
+
+      if (assignedID == peer.getPeerIndex()) {
+        FileStatus[] files = fs.listStatus(status[j].getPath());
+        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+            new Path(partitionDir + "/" + getPartitionName(j)), keyClass,
+            valueClass, CompressionType.NONE);
+
+        for (int i = 0; i < files.length; i++) {
+          SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+              files[i].getPath(), conf);
+
+          Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+          Writable value = (Writable) ReflectionUtils.newInstance(valueClass,
+              conf);
+
+          while (reader.next(key, value)) {
+            writer.append(key, value);
+          }
+          reader.close();
+        }
+
+        writer.close();
+        fs.delete(status[j].getPath(), true);
+      }
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Partitioner getPartitioner() {
+    return ReflectionUtils.newInstance(conf
+        .getClass("bsp.input.partitioner.class", HashPartitioner.class,
+            Partitioner.class), conf);
+  }
+
+  private static String getPartitionName(int i) {
+    return "part-" + String.valueOf(100000 + i).substring(1, 6);
+  }
+
+}
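Two details of the runner above are easy to misread, so here is a small self-contained sketch of them: getPartitionName() zero-pads the merged file names (part-00000, part-00001, ...), and the merge step assigns each part-<index> directory to a peer by integer division. The numbers below are illustrative; note that the arithmetic assumes desired.num.of.tasks is at least the number of peers, since a smaller value would make the divisor zero:

public class PartitionAssignmentSketch {
  // Same scheme as PartitioningRunner.getPartitionName(): five-digit zero padding.
  static String getPartitionName(int i) {
    return "part-" + String.valueOf(100000 + i).substring(1, 6);
  }

  public static void main(String[] args) {
    int desiredNum = 7; // desired.num.of.tasks
    int numPeers = 3;   // peer.getNumPeers()
    for (int idx = 0; idx < desiredNum; idx++) {
      // Mirrors the merge loop: contiguous runs of partition indices map to
      // one peer; the run that would fall past the last peer is clamped back.
      int assignedID = idx / (desiredNum / numPeers);
      if (assignedID == numPeers) {
        assignedID = assignedID - 1;
      }
      System.out.println(getPartitionName(idx) + " merged by peer " + assignedID);
    }
  }
}

(In the runner itself the merged output is named by the listing position j rather than the parsed partition index; the sketch uses one index for brevity.)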
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Wed Dec 12 05:32:53 2012
@@ -22,17 +22,15 @@ import java.util.Iterator;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
-import org.apache.hama.bsp.TextInputFormat;
-import org.apache.hama.graph.Edge;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
 
 public class InlinkCount extends Vertex<Text, NullWritable, IntWritable> {
 
@@ -51,34 +49,6 @@ public class InlinkCount extends Vertex<
     }
   }
 
-  public static class InlinkCountTextReader extends
-      VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {
-
-    /**
-     * The text file essentially should look like:<br/>
-     * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
-     * E.G:<br/>
-     * 1\t2\t3\t4<br/>
-     * 2\t3\t1<br/>
-     * etc.
-     */
-    @Override
-    public boolean parseVertex(LongWritable key, Text value,
-        Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {
-      String[] split = value.toString().split("\t");
-      for (int i = 0; i < split.length; i++) {
-        if (i == 0) {
-          vertex.setVertexID(new Text(split[i]));
-        } else {
-          vertex
-              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
-        }
-      }
-      return true;
-    }
-
-  }
-
   private static void printUsage() {
     System.out.println("Usage: <input> <output> [tasks]");
     System.exit(-1);
@@ -104,14 +74,14 @@ public class InlinkCount extends Vertex<
     }
 
     inlinkJob.setVertexClass(InlinkCount.class);
-    inlinkJob.setInputFormat(TextInputFormat.class);
-    inlinkJob.setInputKeyClass(LongWritable.class);
-    inlinkJob.setInputValueClass(Text.class);
+
+    inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+    inlinkJob.setInputKeyClass(Text.class);
+    inlinkJob.setInputValueClass(TextArrayWritable.class);
 
     inlinkJob.setVertexIDClass(Text.class);
     inlinkJob.setVertexValueClass(IntWritable.class);
     inlinkJob.setEdgeValueClass(NullWritable.class);
 
-    inlinkJob.setVertexInputReaderClass(InlinkCountTextReader.class);
     inlinkJob.setPartitioner(HashPartitioner.class);
     inlinkJob.setOutputFormat(SequenceFileOutputFormat.class);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Wed Dec 12 05:32:53 2012
@@ -22,18 +22,17 @@ import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
 
 /**
  * Finding the mindist vertex in a connected component.
@@ -97,34 +96,6 @@ public class MindistSearch {
 
   }
 
-  public static class MindistSearchCountReader extends
-      VertexInputReader<LongWritable, Text, Text, NullWritable, Text> {
-
-    /**
-     * The text file essentially should look like:<br/>
-     * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
-     * E.G:<br/>
-     * 1\t2\t3\t4<br/>
-     * 2\t3\t1<br/>
-     * etc.
-     */
-    @Override
-    public boolean parseVertex(LongWritable key, Text value,
-        Vertex<Text, NullWritable, Text> vertex) throws Exception {
-      String[] split = value.toString().split("\t");
-      for (int i = 0; i < split.length; i++) {
-        if (i == 0) {
-          vertex.setVertexID(new Text(split[i]));
-        } else {
-          vertex
-              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
-        }
-      }
-      return true;
-    }
-
-  }
-
   private static void printUsage() {
     System.out
        .println("Usage: <input> <output> [maximum iterations (default 30)] [tasks]");
     System.exit(-1);
@@ -157,10 +128,10 @@ public class MindistSearch {
     job.setVertexValueClass(Text.class);
     job.setEdgeValueClass(NullWritable.class);
 
-    job.setInputKeyClass(LongWritable.class);
-    job.setInputValueClass(Text.class);
-    job.setInputFormat(TextInputFormat.class);
-    job.setVertexInputReaderClass(MindistSearchCountReader.class);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(Text.class);
+    job.setInputValueClass(TextArrayWritable.class);
+
     job.setPartitioner(HashPartitioner.class);
     job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Wed Dec 12 05:32:53 2012
@@ -23,19 +23,17 @@ import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.hama.graph.AbstractAggregator;
 import org.apache.hama.graph.AverageAggregator;
-import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
 
 /**
  * Real pagerank with dangling node contribution.
@@ -99,37 +97,8 @@ public class PageRank {
     }
   }
 
-  public static class PagerankTextReader extends
-      VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
-
-    /**
-     * The text file essentially should look like:<br/>
-     * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
-     * E.G:<br/>
-     * 1\t2\t3\t4<br/>
-     * 2\t3\t1<br/>
-     * etc.
-     */
-    @Override
-    public boolean parseVertex(LongWritable key, Text value,
-        Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
-      String[] split = value.toString().split("\t");
-      for (int i = 0; i < split.length; i++) {
-        if (i == 0) {
-          vertex.setVertexID(new Text(split[i]));
-        } else {
-          vertex
-              .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
-        }
-      }
-      return true;
-    }
-
-  }
-
   private static void printUsage() {
-    System.out
-        .println("Usage: <input> <output> [damping factor (default 0.85)] [Epsilon (convergence error, default 0.001)] [Max iterations (default 30)] [tasks]");
+    System.out.println("Usage: <input> <output> [tasks]");
     System.exit(-1);
   }
 
@@ -161,15 +130,11 @@ public class PageRank {
     // set the defaults
     pageJob.setMaxIteration(30);
     pageJob.set("hama.pagerank.alpha", "0.85");
+    pageJob.set("hama.graph.max.convergence.error", "0.001");
 
-    if (args.length == 6)
-      pageJob.setNumBspTask(Integer.parseInt(args[5]));
-    if (args.length >= 5)
-      pageJob.setMaxIteration(Integer.parseInt(args[4]));
-    if (args.length >= 4)
-      pageJob.set("hama.graph.max.convergence.error", args[3]);
-    if (args.length >= 3)
-      pageJob.set("hama.pagerank.alpha", args[2]);
+    if (args.length == 3) {
+      pageJob.setNumBspTask(Integer.parseInt(args[2]));
+    }
 
     // error, dangling node probability sum
     pageJob.setAggregatorClass(AverageAggregator.class,
@@ -179,10 +144,10 @@ public class PageRank {
     pageJob.setVertexValueClass(DoubleWritable.class);
     pageJob.setEdgeValueClass(NullWritable.class);
 
-    pageJob.setInputKeyClass(LongWritable.class);
-    pageJob.setInputValueClass(Text.class);
-    pageJob.setInputFormat(TextInputFormat.class);
-    pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+    pageJob.setInputFormat(SequenceFileInputFormat.class);
+    pageJob.setInputKeyClass(Text.class);
+    pageJob.setInputValueClass(TextArrayWritable.class);
+
     pageJob.setPartitioner(HashPartitioner.class);
     pageJob.setOutputFormat(TextOutputFormat.class);
     pageJob.setOutputKeyClass(Text.class);
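With the argument handling above, the PageRank example now takes just <input> <output> [tasks]; alpha, convergence error, and max iterations are fixed defaults. A short programmatic launch mirroring the new contract, placed in the examples package because the test below calls createJob() from there (paths and task count are illustrative):

package org.apache.hama.examples;

import org.apache.hama.HamaConfiguration;
import org.apache.hama.graph.GraphJob;

public class PageRankLaunchSketch {
  public static void main(String[] args) throws Exception {
    HamaConfiguration conf = new HamaConfiguration();
    // Arguments: <input> <output> [tasks]; "7" requests seven BSP tasks.
    GraphJob pageJob = PageRank.createJob(new String[] {
        "/tmp/pagerank-in.seq", "/tmp/pagerank-out", "7" }, conf);
    if (!pageJob.waitForCompletion(true)) {
      System.err.println("Job failed!");
    }
  }
}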
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Wed Dec 12 05:32:53 2012
@@ -18,8 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Arrays;
@@ -31,8 +29,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.examples.MindistSearch.MinTextCombiner;
 
 public class MindistSearchTest extends TestCase {
@@ -95,22 +96,25 @@ public class MindistSearchTest extends T
   }
 
   private void generateTestData() {
-    BufferedWriter bw = null;
     try {
-      bw = new BufferedWriter(new FileWriter(INPUT));
-      for (String s : input) {
-        bw.write(s + "\n");
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    } finally {
-      if (bw != null) {
-        try {
-          bw.close();
-        } catch (IOException e) {
-          e.printStackTrace();
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+          new Path(INPUT), Text.class, TextArrayWritable.class);
+
+      for (int i = 0; i < input.length; i++) {
+        String[] x = input[i].split("\t");
+        Text key = new Text(x[0]);
+        Writable[] values = new Writable[x.length - 1];
+        for (int j = 1; j < x.length; j++) {
+          values[j - 1] = new Text(x[j]);
         }
+        TextArrayWritable value = new TextArrayWritable();
+        value.set(values);
+        writer.append(key, value);
       }
+
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
     }
   }

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Wed Dec 12 05:32:53 2012
@@ -18,8 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
@@ -30,33 +28,15 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.util.SymmetricMatrixGen;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.GraphJobRunner;
 
 public class PageRankTest extends TestCase {
-  /**
-   * The graph looks like this (adjacency list, [] contains outlinks):<br/>
-   * stackoverflow.com [yahoo.com]<br/>
-   * google.com []<br/>
-   * facebook.com [twitter.com, google.com, nasa.gov]<br/>
-   * yahoo.com [nasa.gov, stackoverflow.com]<br/>
-   * twitter.com [google.com, facebook.com]<br/>
-   * nasa.gov [yahoo.com, stackoverflow.com]<br/>
-   * youtube.com [google.com, yahoo.com]<br/>
-   * Note that google is removed in this part mainly to test the repair
-   * functionality.
-   */
-  String[] input = new String[] { "stackoverflow.com\tyahoo.com",
-      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
-      "yahoo.com\tnasa.gov\tstackoverflow.com",
-      "twitter.com\tgoogle.com\tfacebook.com",
-      "nasa.gov\tyahoo.com\tstackoverflow.com",
-      "youtube.com\tgoogle.com\tyahoo.com" };
-
-  private static String INPUT = "/tmp/pagerank-tmp.seq";
-  private static String TEXT_INPUT = "/tmp/pagerank.txt";
+  private static String INPUT = "/tmp/pagerank/pagerank-tmp.seq";
+  private static String TEXT_INPUT = "/tmp/pagerank/pagerank.txt";
   private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
-  private static String OUTPUT = "/tmp/pagerank-out";
+  private static String OUTPUT = "/tmp/pagerank/pagerank-out";
 
   private Configuration conf = new HamaConfiguration();
   private FileSystem fs;
@@ -87,9 +67,10 @@ public class PageRankTest extends TestCa
     generateTestData();
     try {
       HamaConfiguration conf = new HamaConfiguration(new Configuration());
-      conf.set("bsp.local.tasks.maximum", "1");
+      conf.set("bsp.local.tasks.maximum", "10");
+      conf.set("bsp.peers.num", "7");
       conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
-      GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT },
+      GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT, "7" },
           conf);
 
       if (!pageJob.waitForCompletion(true)) {
@@ -101,24 +82,9 @@ public class PageRankTest extends TestCa
     }
   }
 
-  private void generateTestData() {
-    BufferedWriter bw = null;
-    try {
-      bw = new BufferedWriter(new FileWriter(INPUT));
-      for (String s : input) {
-        bw.write(s + "\n");
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    } finally {
-      if (bw != null) {
-        try {
-          bw.close();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    }
+  private void generateTestData() throws ClassNotFoundException,
+      InterruptedException, IOException {
+    SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "3" });
   }
 
   private void deleteTempDirs() {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Wed Dec 12 05:32:53 2012
@@ -165,9 +165,6 @@ public class GraphJob extends BSPJob {
         .checkArgument(this.getConfiguration()
             .get(VERTEX_EDGE_VALUE_CLASS_ATTR) != null,
             "Please provide an edge value class, if you don't need one, use NullWritable!");
-    Preconditions.checkArgument(
-        this.getConfiguration().get(VERTEX_GRAPH_INPUT_READER) != null,
-        "Please provide a vertex input reader!");
 
     super.submit();
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Dec 12 05:32:53 2012
@@ -28,8 +28,8 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -273,53 +273,24 @@ public final class GraphJobRunner<V exte
 
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
-
-    /*
-     * Several partitioning constants begin
-     */
-
-    final VertexInputReader reader = (VertexInputReader) ReflectionUtils
-        .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
-            VertexInputReader.class), conf);
-
     final boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
-    final boolean runtimePartitioning = conf.getBoolean(
-        GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
-
-    final long splitSize = peer.getSplitSize();
-    final int partitioningSteps = partitionMultiSteps(peer, splitSize);
-    final long interval = splitSize / partitioningSteps;
-
     final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
 
-    /*
-     * Several partitioning constants end
-     */
-
     LOG.debug("vertex class: " + vertexClass);
     Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
     vertex.runner = this;
 
-    long startPos = peer.getPos();
-    if (startPos == 0)
-      startPos = 1L;
-
     KeyValuePair<Writable, Writable> next = null;
-    int steps = 1;
     while ((next = peer.readNext()) != null) {
-      boolean vertexFinished = false;
-      try {
-        vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
-            vertex);
-      } catch (Exception e) {
-        // LOG.error("exception occured during parsing vertex!" + e.toString());
-        throw new IOException("exception occured during parsing vertex!"
-            + e.toString());
-      }
-
-      if (!vertexFinished) {
-        continue;
+      V key = (V) next.getKey();
+      Writable[] edges = ((ArrayWritable) next.getValue()).get();
+      vertex.setVertexID(key);
+
+      List<Edge<V, E>> edgeList = new ArrayList<Edge<V, E>>();
+      for (Writable edge : edges) {
+        edgeList.add(new Edge<V, E>((V) edge, null));
       }
+      vertex.setEdges(edgeList);
 
       if (vertex.getEdges() == null) {
         if (selfReference) {
@@ -334,44 +305,12 @@ public final class GraphJobRunner<V exte
             new Edge<V, E>(vertex.getVertexID(), null));
       }
 
-      if (runtimePartitioning) {
-        int partition = partitioner.getPartition(vertex.getVertexID(),
-            vertex.getValue(), peer.getNumPeers());
-        peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
-      } else {
-        vertex.setup(conf);
-        vertices.add(vertex);
-      }
+      vertex.setup(conf);
+      vertices.add(vertex);
       vertex = newVertexInstance(vertexClass, conf);
       vertex.runner = this;
-
-      if (runtimePartitioning) {
-        if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval) {
-          peer.sync();
-          steps++;
-          GraphJobMessage msg = null;
-          while ((msg = peer.getCurrentMessage()) != null) {
-            Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-            messagedVertex.runner = this;
-            messagedVertex.setup(conf);
-            vertices.add(messagedVertex);
-          }
-          startPos = peer.getPos();
-        }
-      }
     }
 
-    if (runtimePartitioning) {
-      peer.sync();
-
-      GraphJobMessage msg = null;
-      while ((msg = peer.getCurrentMessage()) != null) {
-        Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-        messagedVertex.runner = this;
-        messagedVertex.setup(conf);
-        vertices.add(messagedVertex);
-      }
-    }
     LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
 
     /*
@@ -383,7 +322,7 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      int partitioningSteps, boolean selfReference) throws IOException,
+      boolean selfReference) throws IOException,
       SyncException, InterruptedException {
-    int multiSteps = 0;
-    MapWritable ssize = new MapWritable();
-    ssize.put(new IntWritable(peer.getPeerIndex()),
-        new IntWritable(vertices.size()));
-    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
-    ssize = null;
-    peer.sync();
-
-    if (isMasterTask(peer)) {
-      int minVerticesSize = Integer.MAX_VALUE;
-      GraphJobMessage received = null;
-      while ((received = peer.getCurrentMessage()) != null) {
-        MapWritable x = received.getMap();
-        for (Entry<Writable, Writable> e : x.entrySet()) {
-          int curr = ((IntWritable) e.getValue()).get();
-          if (minVerticesSize > curr) {
-            minVerticesSize = curr;
-          }
-        }
-      }
-
-      if (minVerticesSize < (partitioningSteps * 2)) {
-        multiSteps = minVerticesSize;
-      } else {
-        multiSteps = (partitioningSteps * 2);
-      }
-
-      for (String peerName : peer.getAllPeerNames()) {
-        MapWritable temp = new MapWritable();
-        temp.put(new Text("steps"), new IntWritable(multiSteps));
-        peer.send(peerName, new GraphJobMessage(temp));
-      }
-    }
-    peer.sync();
-
-    GraphJobMessage received = peer.getCurrentMessage();
-    MapWritable x = received.getMap();
-    for (Entry<Writable, Writable> e : x.entrySet()) {
-      multiSteps = ((IntWritable) e.getValue()).get();
-    }
-
-    Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
-    int i = 0;
-    int syncs = 0;
-
     for (Vertex<V, E, M> v : vertices) {
       for (Edge<V, E> e : v.getEdges()) {
         peer.send(v.getDestinationPeerName(e),
             new GraphJobMessage(e.getDestinationVertexID()));
       }
-
-      if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
-        peer.sync();
-        syncs++;
-        GraphJobMessage msg = null;
-        while ((msg = peer.getCurrentMessage()) != null) {
-          V vertexName = (V) msg.getVertexId();
-
-          Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
-          newVertex.setVertexID(vertexName);
-          newVertex.runner = this;
-          if (selfReference) {
-            newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
-                newVertex.getVertexID(), null)));
-          } else {
-            newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
-          }
-          newVertex.setup(conf);
-          tmp.put(vertexName, newVertex);
-
-        }
-      }
-      i++;
     }
 
     peer.sync();
@@ -488,7 +360,6 @@ public final class GraphJobRunner<V exte
-
     for (Vertex<V, E, M> e : vertices) {
@@ -502,59 +373,6 @@ public final class GraphJobRunner<V exte
   }
 
-  private int partitionMultiSteps(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      long splitSize) throws IOException, SyncException, InterruptedException {
-    int multiSteps = 1;
-
-    MapWritable ssize = new MapWritable();
-    ssize
-        .put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
-    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
-    ssize = null;
-    peer.sync();
-
-    if (isMasterTask(peer)) {
-      long maxSplitSize = 0L;
-      GraphJobMessage received = null;
-      while ((received = peer.getCurrentMessage()) != null) {
-        MapWritable x = received.getMap();
-        for (Entry<Writable, Writable> e : x.entrySet()) {
-          long curr = ((LongWritable) e.getValue()).get();
-          if (maxSplitSize < curr) {
-            maxSplitSize = curr;
-          }
-        }
-      }
-
-      int steps = (int) (maxSplitSize / conf.getLong( // 20 mb
-          "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
-
-      for (String peerName : peer.getAllPeerNames()) {
-        MapWritable temp = new MapWritable();
-        temp.put(new Text("max"), new IntWritable(steps));
-        peer.send(peerName, new GraphJobMessage(temp));
-      }
-    }
-    peer.sync();
-
-    GraphJobMessage received = peer.getCurrentMessage();
-    MapWritable x = received.getMap();
-    for (Entry<Writable, Writable> e : x.entrySet()) {
-      multiSteps = ((IntWritable) e.getValue()).get();
-    }
-
-    if (isMasterTask(peer)) {
-      peer.getCounter(GraphJobCounter.MULTISTEP_PARTITIONING).increment(
-          multiSteps);
-    }
-
-    return multiSteps;
-  }
-
   /**
    * Counts vertices globally by sending the count of vertices in the map to the
   * other peers.
   */

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Wed Dec 12 05:32:53 2012
@@ -17,38 +17,36 @@
  */
 package org.apache.hama.graph;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.TestBSPMasterGroomServer;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.graph.example.PageRank;
 
 public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
 
   String[] input = new String[] { "stackoverflow.com\tyahoo.com",
-      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov]",
-      "yahoo.com\tnasa.gov\tstackoverflow.com]",
-      "twitter.com\tgoogle.com\tfacebook.com]",
-      "nasa.gov\tyahoo.com\tstackoverflow.com]",
-      "youtube.com\tgoogle.com\tyahoo.com]" };
+      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+      "yahoo.com\tnasa.gov\tstackoverflow.com",
+      "twitter.com\tgoogle.com\tfacebook.com",
+      "nasa.gov\tyahoo.com\tstackoverflow.com",
+      "youtube.com\tgoogle.com\tyahoo.com" };
 
-  private static String INPUT = "/tmp/pagerank-real-tmp.seq";
-  private static String OUTPUT = "/tmp/pagerank-real-out";
+  private static String INPUT = "/tmp/pagerank/real-tmp.seq";
+  private static String OUTPUT = "/tmp/pagerank/real-out";
 
   @SuppressWarnings("unchecked")
   @Override
@@ -60,7 +58,7 @@ public class TestSubmitGraphJob extends
     configuration.setInt("hama.graph.multi.step.partitioning.interval", 30);
 
     GraphJob bsp = new GraphJob(configuration, PageRank.class);
-    bsp.setInputPath(new Path(INPUT));
+    bsp.setInputPath(new Path("/tmp/pagerank"));
     bsp.setOutputPath(new Path(OUTPUT));
     BSPJobClient jobClient = new BSPJobClient(configuration);
     configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
@@ -77,14 +75,15 @@ public class TestSubmitGraphJob extends
     bsp.setAggregatorClass(AverageAggregator.class,
         PageRank.DanglingNodeAggregator.class);
 
+    bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
+    bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setInputKeyClass(Text.class);
+    bsp.setInputValueClass(TextArrayWritable.class);
+
     bsp.setVertexIDClass(Text.class);
     bsp.setVertexValueClass(DoubleWritable.class);
     bsp.setEdgeValueClass(NullWritable.class);
 
-    bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
-    bsp.setInputFormat(TextInputFormat.class);
-    bsp.setInputKeyClass(LongWritable.class);
-    bsp.setInputValueClass(Text.class);
     bsp.setPartitioner(HashPartitioner.class);
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
     bsp.setOutputKeyClass(Text.class);
@@ -123,26 +122,25 @@ public class TestSubmitGraphJob extends
   }
 
   private void generateTestData() {
-    BufferedWriter bw = null;
     try {
-      bw = new BufferedWriter(new FileWriter(INPUT));
-      for (String s : input) {
-        bw.write(s + "\n");
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    } finally {
-      if (bw != null) {
-        try {
-          bw.close();
-
-          File file = new File(INPUT);
-          LOG.info("Temp file length: " + file.length());
-
-        } catch (IOException e) {
-          e.printStackTrace();
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(),
+          new Path(INPUT), Text.class, TextArrayWritable.class);
+
+      for (int i = 0; i < input.length; i++) {
+        String[] x = input[i].split("\t");
+        Text key = new Text(x[0]);
+        Writable[] values = new Writable[x.length - 1];
+        for (int j = 1; j < x.length; j++) {
+          values[j - 1] = new Text(x[j]);
         }
+        TextArrayWritable value = new TextArrayWritable();
+        value.set(values);
+        writer.append(key, value);
       }
+
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
     }
   }
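Since the examples now read SequenceFile input keyed by Text with TextArrayWritable adjacency lists, a standalone input generator is the natural companion to the tests above. This sketch mirrors the generateTestData() bodies in this commit; the output path and the adjacency data are illustrative:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TextArrayWritable;

public class AdjacencyListWriterSketch {
  public static void main(String[] args) throws Exception {
    HamaConfiguration conf = new HamaConfiguration();
    FileSystem fs = FileSystem.get(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
        new Path("/tmp/graph.seq"), Text.class, TextArrayWritable.class);

    // One record per vertex: key = vertex ID, value = its outlink IDs.
    String[] lines = { "a\tb\tc", "b\tc", "c\ta" };
    for (String line : lines) {
      String[] split = line.split("\t");
      Writable[] outlinks = new Writable[split.length - 1];
      for (int i = 1; i < split.length; i++) {
        outlinks[i - 1] = new Text(split[i]);
      }
      TextArrayWritable value = new TextArrayWritable();
      value.set(outlinks);
      writer.append(new Text(split[0]), value);
    }
    writer.close();
  }
}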