hama-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Hama Wiki] Update of "WriteHamaGraphFile" by thomasjungblut
Date Sat, 26 May 2012 18:10:12 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.

The "WriteHamaGraphFile" page has been changed by thomasjungblut:
http://wiki.apache.org/hama/WriteHamaGraphFile?action=diff&rev1=2&rev2=3

+ <<TableOfContents(5)>>
+ 
+ = Overview =
+ 
- This article is about how to bring your graph into a format the Graph module of Hama can read and successfully let you run algorithms on it.
+ This article describes how to bring your graph into the graph module of Hama and successfully run algorithms on it.
  
+ == Google Web dataset (local mode, pseudo-distributed cluster) ==
- For this concrete example, the Google web graph from 2002 is used (http://snap.stanford.edu/data/web-Google.html), we will give you a step by step guide from the download to a run of Pagerank on this dataset.
- To run this example in your code, you will need the [[Guava Library|http://code.google.com/p/guava-libraries/]].

  
- # to be extended
+ For this example, the Google web graph from 2002 is used (http://snap.stanford.edu/data/web-Google.html); it contains 875,713 nodes and 5,105,039 edges.
+ 
+ The file itself looks like this:
  {{{
-     Path txtPath = new Path("/tmp/web-Google.txt");
-     Path input = new Path("/tmp/pagerankin.seq");
+ # Directed graph (each unordered pair of nodes is saved once): web-Google.txt 
+ # Webgraph from the Google programming contest, 2002
+ # Nodes: 875713 Edges: 5105039
+ # FromNodeId	ToNodeId
+ 0	11342
+ 0	824020
+ 0	867923
+ 0	891835
+ 11342	0
+ 11342	27469
+ 11342	38716
+ }}}
+ 
+ To read this kind of file you need to provide a vertex reader like this:
+ 
+ {{{
+   public static class PagerankTextReader extends
+       VertexInputReader<LongWritable, Text, Text, DoubleWritable, NullWritable> {
+ 
+     // buffer the adjacency list of the current vertex until a line with a
+     // different vertex id is read; only then the buffered vertex is emitted
+     String lastVertexId = null;
+     List<String> adjacents = new LinkedList<String>();
+ 
+     @Override
+     public boolean parseVertex(LongWritable key, Text value,
+         Vertex<Text, DoubleWritable, NullWritable> vertex) {
+ 
+       String line = value.toString();
+       String[] lineSplit = line.split("\t");
+       if (!line.startsWith("#")) {
+         if (lastVertexId == null) {
+           lastVertexId = lineSplit[0];
+         }
+         if (lastVertexId.equals(lineSplit[0])) {
+           adjacents.add(lineSplit[1]);
+         } else {
+           vertex.setVertexID(new Text(lastVertexId));
+           for (String adjacent : adjacents) {
+             vertex.addEdge(new Edge<Text, NullWritable>(new Text(adjacent),
+                 null));
+           }
+           adjacents.clear();
+           lastVertexId = lineSplit[0];
+           adjacents.add(lineSplit[1]);
+           return true;
+         }
+       }
+       return false;
+     }
+ 
+   }
+ }}}
+ 
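+ Now we can set up the job with the following code (the input text file is assumed to be in /tmp/google-in/):
+ 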
+ {{{
      HamaConfiguration conf = new HamaConfiguration(new Configuration());
+     // if we are in local mode, prevent the file from being split
+     conf.set("bsp.local.tasks.maximum", "1");
-     HashMultimap<Integer, Integer> map = HashMultimap.create();
-     BufferedReader br = new BufferedReader(new InputStreamReader(
-         fileSystem.open(txtPath)));
-     String line = null;
-     while ((line = br.readLine()) != null) {
-       String[] split = line.split("\t");
-       map.put(Integer.parseInt(split[0]), Integer.parseInt(split[1]));
-     }
- 
-     Set<Entry<Integer, Collection<Integer>>> entries = map.asMap().entrySet();
- 
-     Path input = new Path(
-         "/Users/thomas.jungblut/Downloads/web_in/pagerankin.seq");
-     VertexWritable.CONFIGURATION = conf;
-     SequenceFile.Writer writer = new SequenceFile.Writer(fileSystem, conf,
-         input, VertexWritable.class, VertexArrayWritable.class);
-     for (Entry<Integer, Collection<Integer>> entry : entries) {
-       VertexWritable<Text, DoubleWritable> key = new VertexWritable<Text, DoubleWritable>(
-           new DoubleWritable(0.0d), new Text(entry.getKey() + ""), Text.class,
-           DoubleWritable.class);
-       ArrayList<Integer> arrayList = new ArrayList<Integer>(entry.getValue());
-       @SuppressWarnings("unchecked")
-       VertexWritable<Text, NullWritable>[] adjacents = new VertexWritable[entry
-           .getValue().size()];
-       for (int i = 0; i < adjacents.length; i++) {
-         adjacents[i] = new VertexWritable<Text, NullWritable>(
-             NullWritable.get(), new Text(arrayList.get(i) + ""), Text.class,
-             NullWritable.class);
-       }
-       VertexArrayWritable val = new VertexArrayWritable();
-       val.set(adjacents);
-       writer.append(key, val);
-     }
-     writer.close();
- 
- }}}
- 
- 
- Now we can setup the job with the following code
- 
- {{{
-  GraphJob pageJob = new GraphJob(conf, PageRank.class);
+     GraphJob pageJob = new GraphJob(conf, PageRank.class);
      pageJob.setJobName("Pagerank");
-     pageJob.set("hama.graph.repair", "true");
  
      pageJob.setVertexClass(PageRankVertex.class);
-     pageJob.setInputPath(input);
+     pageJob.setInputPath(new Path(
-     pageJob.setOutputPath(new Path("/tmp/pagerank-output/"));
+         "/tmp/google-in/"));
+     pageJob.setOutputPath(new Path(
+         "/tmp/google-out/"));
  
-     // set the defaults
+     // do a maximum of 60 iterations
-     pageJob.setMaxIteration(30);
+     pageJob.setMaxIteration(60);
+     pageJob.set("hama.pagerank.alpha", "0.85");
      // we need to include a vertex in its adjacency list,
      // otherwise the pagerank result has a constant loss
      pageJob.set("hama.graph.self.ref", "true");
+     // stop once the convergence error drops below this threshold
+     pageJob.set("hama.graph.max.convergence.error", "0.001");
+     // let hama repair the graph, i.e. create vertices that are only referenced as edge targets
+     pageJob.set("hama.graph.repair", "true");
      pageJob.setAggregatorClass(AverageAggregator.class);
- 
+     // make sure the input is not split across multiple tasks
+     pageJob.setNumBspTask(1);
      pageJob.setVertexIDClass(Text.class);
      pageJob.setVertexValueClass(DoubleWritable.class);
      pageJob.setEdgeValueClass(NullWritable.class);
  
+     pageJob.setInputKeyClass(LongWritable.class);
+     pageJob.setInputValueClass(Text.class);
-     pageJob.setInputFormat(SequenceFileInputFormat.class);
+     pageJob.setInputFormat(TextInputFormat.class);
+     pageJob.setVertexInputReaderClass(PagerankTextReader.class);
      pageJob.setPartitioner(HashPartitioner.class);
-     pageJob.setOutputFormat(SequenceFileOutputFormat.class);
+     pageJob.setOutputFormat(TextOutputFormat.class);
      pageJob.setOutputKeyClass(Text.class);
      pageJob.setOutputValueClass(DoubleWritable.class);
  
@@ -83, +110 @@

      }
  }}}
  
- You should see the algorithm converge relative fast, in my case it was after nine supersteps.
+ 
+ You should see the algorithm converge quite fast, after nine supersteps in this case. Sadly the text file is not deterministically splittable, so it does not converge as fast as it would when executed in parallel.
+ 
- If you read the results back from the sequencefile output, you will see the following top 10 ranked sites:
+ If you read the results back from the text output, you will see the following top 10 ranked sites:
  
  {{{
  885605 = 0.00149900065779375
@@ -100, +129 @@

  486980 = 5.394792436341423E-4
  }}}
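+ 
+ If you want to sort the output by rank yourself, a small helper like the following can be used. This is only a minimal sketch: it assumes the text output has been copied to the local directory /tmp/google-out/ and that every line contains the vertex id and its rank separated by a tab (adjust the split if your output looks different); the class name TopRankedVertices is made up for illustration.
+ 
+ {{{
+ import java.io.BufferedReader;
+ import java.io.File;
+ import java.io.FileReader;
+ import java.util.AbstractMap.SimpleEntry;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.List;
+ import java.util.Map.Entry;
+ 
+ public class TopRankedVertices {
+ 
+   public static void main(String[] args) throws Exception {
+     List<Entry<String, Double>> ranks = new ArrayList<Entry<String, Double>>();
+     // read all part files of the job output
+     for (File part : new File("/tmp/google-out/").listFiles()) {
+       if (!part.getName().startsWith("part-")) {
+         continue;
+       }
+       BufferedReader reader = new BufferedReader(new FileReader(part));
+       String line;
+       while ((line = reader.readLine()) != null) {
+         // expected line layout: vertexId<TAB>rank
+         String[] split = line.split("\t");
+         ranks.add(new SimpleEntry<String, Double>(split[0], Double
+             .parseDouble(split[1])));
+       }
+       reader.close();
+     }
+     // sort descending by rank and print the top 10
+     Collections.sort(ranks, new Comparator<Entry<String, Double>>() {
+       @Override
+       public int compare(Entry<String, Double> a, Entry<String, Double> b) {
+         return b.getValue().compareTo(a.getValue());
+       }
+     });
+     for (int i = 0; i < 10 && i < ranks.size(); i++) {
+       System.out.println(ranks.get(i).getKey() + " = " + ranks.get(i).getValue());
+     }
+   }
+ }
+ }}}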
  
+ == Wikipedia dataset (for smallish clusters) ==
+ 
+ For this example, the Wikipedia link dataset is used (http://haselgrove.id.au/wikipedia.htm) / (http://users.on.net/~henry/pagerank/links-simple-sorted.zip).
+ 
+ The dataset contains 5,716,808 pages and 130,160,392 links and is about 1 GB large when unzipped. You should use a smallish cluster to crunch this dataset with Hama; based on the HDFS block size, 8-32 task slots are required. Later we show how to fine-tune this to use fewer slots if you don't have that many.
+ 
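+ For example, with a HDFS block size of 64 MB the ~1 GB input yields roughly 1024 / 64 = 16 splits, hence 16 task slots; with 128 MB blocks it is about 8.
+ 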
+ The file is formatted like this:
+ 
+ {{{
+     from1: to11 to12 to13 ...
+     from2: to21 to22 to23 ...
+ }}}
+ 
+ To read this kind of file you need to provide a vertex reader like this:
+ 
+ {{{
+  public static class WikipediaLinkDatasetReader extends
+       VertexInputReader<LongWritable, Text, Text, DoubleWritable, NullWritable> {
+ 
+     @Override
+     public boolean parseVertex(LongWritable key, Text value,
+         Vertex<Text, DoubleWritable, NullWritable> vertex) {
+       String[] vertexAdjacents = value.toString().split(":");
+       vertex.setVertexID(new Text(vertexAdjacents[0].trim()));
+       // guard against pages that have no outgoing links
+       if (vertexAdjacents.length > 1) {
+         String[] split = vertexAdjacents[1].split(" ");
+         for (int i = 0; i < split.length; i++) {
+           if (!split[i].isEmpty()) {
+             vertex.addEdge(new Edge<Text, NullWritable>(new Text(split[i]
+                 .trim()), null));
+           }
+         }
+       }
+       return true;
+     }
+   }
+ }}}
+ 
+ Now we can set up the job with the following code (assuming that your input is in /tmp/wiki-in/):
+ 
+ {{{
+     HamaConfiguration conf = new HamaConfiguration(new Configuration());
+     GraphJob pageJob = new GraphJob(conf, PageRank.class);
+     pageJob.setJobName("Pagerank");
+ 
+     pageJob.setVertexClass(PageRankVertex.class);
+     pageJob.setInputPath(new Path(
+         "/tmp/wiki-in/"));
+     pageJob.setOutputPath(new Path(
+         "/tmp/wiki-out/"));
+ 
+     // do a maximum of 60 iterations
+     pageJob.setMaxIteration(60);
+     pageJob.set("hama.pagerank.alpha", "0.85");
+     // we need to include a vertex in its adjacency list,
+     // otherwise the pagerank result has a constant loss
+     pageJob.set("hama.graph.self.ref", "true");
+     // stop once the convergence error drops below this threshold
+     pageJob.set("hama.graph.max.convergence.error", "0.001");
+     // let hama repair the graph, i.e. create vertices that are only referenced as edge targets
+     pageJob.set("hama.graph.repair", "true");
+     pageJob.setAggregatorClass(AverageAggregator.class);
+ 
+     pageJob.setVertexIDClass(Text.class);
+     pageJob.setVertexValueClass(DoubleWritable.class);
+     pageJob.setEdgeValueClass(NullWritable.class);
+ 
+     pageJob.setInputKeyClass(LongWritable.class);
+     pageJob.setInputValueClass(Text.class);
+     pageJob.setInputFormat(TextInputFormat.class);
+     pageJob.setVertexInputReaderClass(WikipediaLinkDatasetReader.class);
+     pageJob.setPartitioner(HashPartitioner.class);
+     pageJob.setOutputFormat(TextOutputFormat.class);
+     pageJob.setOutputKeyClass(Text.class);
+     pageJob.setOutputValueClass(DoubleWritable.class);
+ 
+     long startTime = System.currentTimeMillis();
+     if (pageJob.waitForCompletion(true)) {
+       System.out.println("Job Finished in "
+           + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+     }
+ }}}
+ 
+ Now you can export a jar with the job submission as its main class and submit it via
+ {{{
+ hama/bin/hama jar YOUR_JAR.jar
+ }}}
+ to your Hama cluster. 
+ 
+ '''Troubleshooting'''
+ 
+ If your job does not execute, your cluster may not have enough resources (task slots). 
+ You can either increase them, or decrease the minimum split size by setting:
+ {{{
+    pageJob.set("bsp.min.split.size", (512 * 1024 * 1024) + "");
+ }}}
+ This will set the split size to 512 MB, resulting in 2 tasks instead of 16 or 32.
+ 
+ If you sort the result descending by pagerank, you can see the following top 10 sites:
+ 
+ {{{
+   1    0.00222  United_States
+   2    0.00141  2007
+   3    0.00136  2008
+   4    0.00126  Geographic_coordinate_system
+   5    0.00101  United_Kingdom
+   6    0.00087  2006
+   7    0.00074  France
+   8    0.00073  Wikimedia_Commons
+   9    0.00066  Wiktionary
+  10    0.00065  Canada
+ }}}
+ 
+ Note that you can decode the indices in the output with the [[http://users.on.net/~henry/pagerank/titles-sorted.zip|page titles]] file.
+ 
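+ A minimal sketch of such a lookup is shown below. It assumes that the vertex ids in the output are 1-based line numbers into the unzipped titles-sorted.txt file and that the file fits into memory (~5.7 million lines); the class name TitleLookup and the local path are made up for illustration.
+ 
+ {{{
+ import java.io.BufferedReader;
+ import java.io.FileReader;
+ import java.util.ArrayList;
+ import java.util.List;
+ 
+ public class TitleLookup {
+ 
+   public static void main(String[] args) throws Exception {
+     // load all titles into memory; line i (1-based) is the title of page i
+     List<String> titles = new ArrayList<String>();
+     BufferedReader reader = new BufferedReader(new FileReader(
+         "/tmp/titles-sorted.txt"));
+     String line;
+     while ((line = reader.readLine()) != null) {
+       titles.add(line);
+     }
+     reader.close();
+ 
+     // decode the page ids passed on the command line, e.g. the top ranked ones
+     for (String arg : args) {
+       int id = Integer.parseInt(arg.trim());
+       System.out.println(id + " -> " + titles.get(id - 1));
+     }
+   }
+ }
+ }}}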
