hama-commits mailing list archives

From edwardy...@apache.org
Subject svn commit: r1556453 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/...
Date Wed, 08 Jan 2014 07:21:58 GMT
Author: edwardyoon
Date: Wed Jan  8 07:21:58 2014
New Revision: 1556453

URL: http://svn.apache.org/r1556453
Log:
HAMA-837: Add sort behaviour to runtime partitioner
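
For context: the flag introduced here (bsp.partition.sort.by.converted.record, see
Constants.java below) makes the runtime partitioner sort records whose converted
keys are WritableComparable before the partition files are written and merged. A
minimal sketch of opting in from a plain BSP job; the job setup is assumed
boilerplate and MyBSP is hypothetical, only the flag itself comes from this commit
(GraphJob now sets it automatically):

    HamaConfiguration conf = new HamaConfiguration();
    conf.setBoolean("bsp.partition.sort.by.converted.record", true);
    // Hypothetical BSP class; any job that uses the runtime partitioner applies.
    BSPJob job = new BSPJob(conf, MyBSP.class);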

Added:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Jan  8 07:21:58 2014
@@ -3,7 +3,8 @@ Hama Change Log
 Release 0.7.0 (unreleased changes)
 
   NEW FEATURES
-
+  
+   HAMA-837: Add sort behaviour to runtime partitioner (edwardyoon)
    HAMA-827: Add NamedVector (edwardyoon)
    HAMA-822: Add feature transformer interface to improve the power and flexibility of existing machine learning model (Yexi Jiang)
    HAMA-774: CompositeInputFormat in Hama (Martin Illecker)

Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Wed Jan  8 07:21:58 2014
@@ -120,6 +120,8 @@ public interface Constants {
   public static final String RUNTIME_PARTITIONING_CLASS = "bsp.input.partitioner.class";
   public static final String RUNTIME_DESIRED_PEERS_COUNT = "desired.num.of.tasks";
   public static final String RUNTIME_PARTITION_RECORDCONVERTER = "bsp.runtime.partition.recordconverter";
+  
+  public static final String PARTITION_SORT_BY_KEY = "bsp.partition.sort.by.converted.record";
  
 
   // /////////////////////////////////////
   // Constants for ZooKeeper

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Wed Jan  8 07:21:58 2014
@@ -20,9 +20,10 @@ package org.apache.hama.bsp;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,10 +31,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.sync.SyncException;
@@ -49,7 +52,6 @@ public class PartitioningRunner extends
   private FileSystem fs = null;
   private Path partitionDir;
   private RecordConverter converter;
-  private Map<Integer, LinkedList<KeyValuePair<Writable, Writable>>> values = new HashMap<Integer, LinkedList<KeyValuePair<Writable, Writable>>>();
   private PipesPartitioner<?, ?> pipesPartitioner = null;
 
   @Override
@@ -72,7 +74,6 @@ public class PartitioningRunner extends
     } else {
       this.partitionDir = new Path(conf.get(Constants.RUNTIME_PARTITIONING_DIR));
     }
-
   }
 
   /**
@@ -97,20 +98,10 @@ public class PartitioningRunner extends
         KeyValuePair<Writable, Writable> inputRecord, Configuration conf);
 
     public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
-        @SuppressWarnings("rawtypes") Partitioner partitioner,
-        Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
-        int numTasks);
-
-    /**
-     * @return a map implementation, so order can be changed in subclasses if
-     *         needed.
-     */
-    public Map<Writable, Writable> newMap();
-
-    /**
-     * @return a list implementation, so order will not be changed in subclasses
-     */
-    public List<KeyValuePair<Writable, Writable>> newList();
+        @SuppressWarnings("rawtypes")
+        Partitioner partitioner, Configuration conf,
+        @SuppressWarnings("rawtypes")
+        BSPPeer peer, int numTasks);
   }
 
   /**
@@ -127,34 +118,32 @@ public class PartitioningRunner extends
     @SuppressWarnings("unchecked")
     @Override
     public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
-        @SuppressWarnings("rawtypes") Partitioner partitioner,
-        Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
-        int numTasks) {
+        @SuppressWarnings("rawtypes")
+        Partitioner partitioner, Configuration conf,
+        @SuppressWarnings("rawtypes")
+        BSPPeer peer, int numTasks) {
       return Math.abs(partitioner.getPartition(outputRecord.getKey(),
           outputRecord.getValue(), numTasks));
     }
 
     @Override
     public void setup(Configuration conf) {
-
-    }
-
-    @Override
-    public Map<Writable, Writable> newMap() {
-      return new HashMap<Writable, Writable>();
     }
 
-    @Override
-    public List<KeyValuePair<Writable, Writable>> newList() {
-      return new LinkedList<KeyValuePair<Writable, Writable>>();
-    }
   }
 
+  public Map<Integer, SequenceFile.Writer> writerCache = new HashMap<Integer, SequenceFile.Writer>();
+
+  @SuppressWarnings("rawtypes")
+  public SortedMap<WritableComparable, KeyValuePair<IntWritable, KeyValuePair>> sortedMap = new TreeMap<WritableComparable, KeyValuePair<IntWritable, KeyValuePair>>();
+
   @Override
   @SuppressWarnings({ "rawtypes", "unchecked" })
   public void bsp(
       BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
       throws IOException, SyncException, InterruptedException {
+
+    int peerNum = peer.getNumPeers();
     Partitioner partitioner = getPartitioner();
     KeyValuePair<Writable, Writable> pair = null;
     KeyValuePair<Writable, Writable> outputPair = null;
@@ -166,7 +155,6 @@ public class PartitioningRunner extends
         keyClass = pair.getKey().getClass();
         valueClass = pair.getValue().getClass();
       }
-
       outputPair = converter.convertRecord(pair, conf);
 
       if (outputPair == null) {
@@ -176,74 +164,153 @@ public class PartitioningRunner extends
       int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
           desiredNum);
 
-      LinkedList<KeyValuePair<Writable, Writable>> list = values.get(index);
-      if (list == null) {
-        list = (LinkedList<KeyValuePair<Writable, Writable>>) converter
-            .newList();
-        values.put(index, list);
+      // if the key is comparable and needs to be sorted by key,
+      if (outputPair.getKey() instanceof WritableComparable
+          && conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
+        sortedMap.put(
+            (WritableComparable) outputPair.getKey(),
+            new KeyValuePair(new IntWritable(index), new KeyValuePair(pair
+                .getKey(), pair.getValue())));
+      } else {
+        if (!writerCache.containsKey(index)) {
+          Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
+              + peer.getPeerIndex());
+          SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+              destFile, keyClass, valueClass, CompressionType.NONE);
+          writerCache.put(index, writer);
+        }
+
+        writerCache.get(index).append(pair.getKey(), pair.getValue());
       }
-      list.add(new KeyValuePair<Writable, Writable>(pair.getKey(), pair
-          .getValue()));
     }
 
-    // The reason of use of Memory is to reduce file opens
-    for (Map.Entry<Integer, LinkedList<KeyValuePair<Writable, Writable>>> e : values
-        .entrySet()) {
-      Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
-          + peer.getPeerIndex());
-      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-          destFile, keyClass, valueClass, CompressionType.NONE);
+    if (sortedMap.size() > 0) {
+      writeSortedFile(peer.getPeerIndex(), keyClass, valueClass);
+    }
 
-      for (KeyValuePair<Writable, Writable> v : e.getValue()) {
-        writer.append(v.getKey(), v.getValue());
-      }
-      writer.close();
+    for (SequenceFile.Writer w : writerCache.values()) {
+      w.close();
     }
 
     peer.sync();
     FileStatus[] status = fs.listStatus(partitionDir);
-    // To avoid race condition, we should store the peer number
-    int peerNum = peer.getNumPeers();
     // Call sync() one more time to avoid concurrent access
     peer.sync();
 
-    // merge files into one.
-    // TODO if we use header info, we might able to merge files without full
-    // scan.
     for (FileStatus stat : status) {
       int partitionID = Integer
           .parseInt(stat.getPath().getName().split("[-]")[1]);
 
-      // TODO set replica factor to 1.
       if (getMergeProcessorID(partitionID, peerNum) == peer.getPeerIndex()) {
-        Path partitionFile = new Path(partitionDir + "/"
+        Path destinationFilePath = new Path(partitionDir + "/"
             + getPartitionName(partitionID));
 
         FileStatus[] files = fs.listStatus(stat.getPath());
+        if (outputPair.getKey() instanceof WritableComparable
+            && conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
+          mergeSortedFiles(files, destinationFilePath, keyClass, valueClass);
+        } else {
+          mergeFiles(files, destinationFilePath, keyClass, valueClass);
+        }
+        fs.delete(stat.getPath(), true);
+      }
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  private void writeSortedFile(int peerIndex, Class keyClass, Class valueClass)
+      throws IOException {
+    for (Entry<WritableComparable, KeyValuePair<IntWritable, KeyValuePair>> e : sortedMap
+        .entrySet()) {
+      int index = ((IntWritable) e.getValue().getKey()).get();
+      KeyValuePair rawRecord = e.getValue().getValue();
+
+      if (!writerCache.containsKey(index)) {
+        Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
+            + peerIndex);
         SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-            partitionFile, keyClass, valueClass, CompressionType.NONE);
+            destFile, keyClass, valueClass, CompressionType.NONE);
+        writerCache.put(index, writer);
+      }
 
-        for (int i = 0; i < files.length; i++) {
-          LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
-              + "/" + getPartitionName(partitionID));
-
-          SequenceFile.Reader reader = new SequenceFile.Reader(fs,
-              files[i].getPath(), conf);
-
-          Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
-          Writable value = (Writable) ReflectionUtils.newInstance(valueClass,
-              conf);
-
-          while (reader.next(key, value)) {
-            writer.append(key, value);
-          }
-          reader.close();
-        }
+      writerCache.get(index).append(rawRecord.getKey(), rawRecord.getValue());
+    }
 
-        writer.close();
-        fs.delete(stat.getPath(), true);
+    sortedMap.clear();
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void mergeSortedFiles(FileStatus[] status, Path destinationFilePath,
+      Class keyClass, Class valueClass) throws IOException {
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+        destinationFilePath, keyClass, valueClass, CompressionType.NONE);
+    KeyValuePair outputPair = null;
+    Writable key;
+    Writable value;
+
+    Map<Integer, SequenceFile.Reader> readers = new HashMap<Integer, SequenceFile.Reader>();
+    for (int i = 0; i < status.length; i++) {
+      readers.put(i, new SequenceFile.Reader(fs, status[i].getPath(), conf));
+    }
+
+    for (int i = 0; i < readers.size(); i++) {
+      key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+      value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+
+      readers.get(i).next(key, value);
+      KeyValuePair record = new KeyValuePair(key, value);
+      outputPair = converter.convertRecord(record, conf);
+      sortedMap.put((WritableComparable) outputPair.getKey(), new KeyValuePair(
+          new IntWritable(i), record));
+    }
+
+    while (readers.size() > 0) {
+      key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+      value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+
+      WritableComparable firstKey = sortedMap.firstKey();
+      KeyValuePair kv = sortedMap.get(firstKey);
+      int readerIndex = ((IntWritable) kv.getKey()).get();
+      KeyValuePair rawRecord = (KeyValuePair) kv.getValue();
+      writer.append(rawRecord.getKey(), rawRecord.getValue());
+
+      sortedMap.remove(firstKey);
+
+      if (readers.get(readerIndex).next(key, value)) {
+        KeyValuePair record = new KeyValuePair(key, value);
+        outputPair = converter.convertRecord(record, conf);
+        sortedMap.put((WritableComparable) outputPair.getKey(),
+            new KeyValuePair(new IntWritable(readerIndex), record));
+      } else {
+        readers.get(readerIndex).close();
+        readers.remove(readerIndex);
+      }
+    }
+
+    sortedMap.clear();
+    writer.close();
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void mergeFiles(FileStatus[] status, Path destinationFilePath,
+      Class keyClass, Class valueClass) throws IOException {
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+        destinationFilePath, keyClass, valueClass, CompressionType.NONE);
+    Writable key;
+    Writable value;
+
+    for (int i = 0; i < status.length; i++) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+          status[i].getPath(), conf);
+      key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+      value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+
+      while (reader.next(key, value)) {
+        writer.append(key, value);
       }
+      reader.close();
     }
+    writer.close();
   }
 
   @Override
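
A note on mergeSortedFiles() above: it performs a streaming k-way merge. The
TreeMap is seeded with the first record from every partition file, the entry with
the smallest converted key is repeatedly appended to the output, and the map is
refilled from the reader that entry came from. A standalone sketch of the same
pattern over plain iterators (java.util only; the names are illustrative, not part
of this commit). Like the committed code, it assumes keys are unique across
sources, since a TreeMap keeps only one entry per key:

    static <T extends Comparable<T>> void kWayMerge(List<Iterator<T>> sources,
        List<T> out) {
      // smallest pending element of each source -> index of that source
      TreeMap<T, Integer> heads = new TreeMap<T, Integer>();
      for (int i = 0; i < sources.size(); i++) {
        if (sources.get(i).hasNext())
          heads.put(sources.get(i).next(), i);
      }
      while (!heads.isEmpty()) {
        Map.Entry<T, Integer> smallest = heads.pollFirstEntry();
        out.add(smallest.getKey());
        Iterator<T> src = sources.get(smallest.getValue());
        if (src.hasNext())
          heads.put(src.next(), smallest.getValue());
      }
    }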

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java Wed Jan  8 07:21:58 2014
@@ -33,10 +33,13 @@ import org.apache.hama.bsp.TextOutputFor
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.GraphJobRunner.GraphJobCounter;
+import org.apache.hama.graph.MapVerticesInfo;
 import org.apache.hama.graph.Vertex;
 import org.apache.hama.graph.VertexInputReader;
 
 /**
+ * NOTE: Graph modification APIs can be used only with {@link MapVerticesInfo}.
+ * 
  * This is an example of how to manipulate Graphs dynamically. The input of this
  * example is a number in each row. We assume that there is a vertex with ID:1
  * which is responsible for creating a sum vertex that will aggregate the values of
@@ -134,9 +137,9 @@ public class DynamicGraph {
   private static GraphJob createJob(String[] args, HamaConfiguration conf)
       throws IOException {
 
-    // NOTE Graph modification APIs can be used only with in-memory vertices storage.
+    // NOTE: Graph modification APIs can be used only with MapVerticesInfo.
     conf.set("hama.graph.vertices.info",
-        "org.apache.hama.graph.ListVerticesInfo");
+        "org.apache.hama.graph.MapVerticesInfo");
 
     GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
     graphJob.setJobName("Dynamic Graph");

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java Wed Jan  8 07:21:58 2014
@@ -31,13 +31,13 @@ import org.apache.hama.HamaConfiguration
 import org.junit.Test;
 
 /**
- * Testcase for {@link org.apache.hama.examples.DynamicGraph}
+ * Unit test for {@link org.apache.hama.examples.DynamicGraph}
  */
 public class DynamicGraphTest extends TestCase {
   private static String OUTPUT = "/tmp/page-out";
   private Configuration conf = new HamaConfiguration();
   private FileSystem fs;
-  
+
   private void deleteTempDirs() {
     try {
       if (fs.exists(new Path(OUTPUT)))
@@ -61,7 +61,7 @@ public class DynamicGraphTest extends Te
       }
     }
   }
-  
+
   @Override
   protected void setUp() throws Exception {
     super.setUp();
@@ -69,9 +69,10 @@ public class DynamicGraphTest extends Te
   }
 
   @Test
-  public void test() throws IOException, InterruptedException, ClassNotFoundException {
+  public void test() throws IOException, InterruptedException,
+      ClassNotFoundException {
     try {
-      DynamicGraph.main(new String[] {"src/test/resources/dg.txt", OUTPUT });
+      DynamicGraph.main(new String[] { "src/test/resources/dg.txt", OUTPUT });
       verifyResult();
     } finally {
       deleteTempDirs();

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Wed Jan  8 07:21:58 2014
@@ -132,5 +132,4 @@ public class MindistSearchTest extends T
       e.printStackTrace();
     }
   }
-
-}
+}
\ No newline at end of file

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java Wed Jan  8 07:21:58 2014
@@ -36,6 +36,14 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.graph.IDSkippingIterator.Strategy;
 
+/**
+ * Stores the sorted vertices in a local file. It allows neither modification
+ * nor random access by vertexID.
+ * 
+ * @param <V>
+ * @param <E>
+ * @param <M>
+ */
 @SuppressWarnings("rawtypes")
 public final class DiskVerticesInfo<V extends WritableComparable, E extends Writable, M extends Writable>
     implements VerticesInfo<V, E, M> {
@@ -122,7 +130,8 @@ public final class DiskVerticesInfo<V ex
 
   @Override
   public void removeVertex(V vertexID) {
-    throw new UnsupportedOperationException("Not yet implemented");
+    throw new UnsupportedOperationException(
+        "DiskVerticesInfo doesn't support this operation. Please use the MapVerticesInfo.");
   }
 
   /**
@@ -176,7 +185,8 @@ public final class DiskVerticesInfo<V ex
 
   @Override
   public void finishRemovals() {
-    throw new UnsupportedOperationException("Not yet implemented");
+    throw new UnsupportedOperationException(
+        "DiskVerticesInfo doesn't support this operation. Please use the MapVerticesInfo.");
   }
 
   private static long[] copy(ArrayList<Long> lst) {
@@ -378,5 +388,4 @@ public final class DiskVerticesInfo<V ex
   private static String getSoftGraphFileName(String root, int step) {
     return root + "soft_" + step + ".graph";
   }
-
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Wed Jan  8 07:21:58 2014
@@ -57,6 +57,7 @@ public class GraphJob extends BSPJob {
   public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
       throws IOException {
     super(conf);
+    this.setBoolean(Constants.PARTITION_SORT_BY_KEY, true);
     this.setBspClass(GraphJobRunner.class);
     this.setJarByClass(exampleClass);
     this.setVertexIDClass(Text.class);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Jan  8 07:21:58 2014
@@ -255,7 +255,7 @@ public final class GraphJobRunner<V exte
      * currentMessage or the first vertex that is active.
      */
     IDSkippingIterator<V, E, M> iterator = vertices.skippingIterator();
-    
+
     // note that can't skip inactive vertices because we have to rewrite the
     // complete vertex file in each iteration
     while (iterator.hasNext(
@@ -268,11 +268,11 @@ public final class GraphJobRunner<V exte
         iterable = iterate(currentMessage, (V) currentMessage.getVertexId(),
             vertex, peer);
       }
-      
+
       if (iterable != null && vertex.isHalted()) {
         vertex.setActive();
       }
-      
+
       if (!vertex.isHalted()) {
         M lastValue = vertex.getValue();
         if (iterable == null) {
@@ -289,7 +289,7 @@ public final class GraphJobRunner<V exte
         getAggregationRunner().aggregateVertex(lastValue, vertex);
         activeVertices++;
       }
-      
+
       // note that we even need to rewrite the vertex if it is halted for
       // consistency reasons
       vertices.finishVertexComputation(vertex);
@@ -356,7 +356,7 @@ public final class GraphJobRunner<V exte
     IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
     while (skippingIterator.hasNext()) {
       Vertex<V, E, M> vertex = skippingIterator.next();
-      
+
       M lastValue = vertex.getValue();
       // Calls setup method.
       vertex.setup(conf);
@@ -403,7 +403,7 @@ public final class GraphJobRunner<V exte
     getAggregationRunner().setupAggregators(peer);
 
     Class<? extends VerticesInfo<V, E, M>> verticesInfoClass = (Class<? extends VerticesInfo<V, E, M>>) conf
-        .getClass("hama.graph.vertices.info", ListVerticesInfo.class,
+        .getClass("hama.graph.vertices.info", DiskVerticesInfo.class,
             VerticesInfo.class);
     vertices = ReflectionUtils.newInstance(verticesInfoClass);
     vertices.init(this, conf, peer.getTaskId());
@@ -453,7 +453,7 @@ public final class GraphJobRunner<V exte
 
     while ((record = peer.readNext()) != null) {
       converted = converter.convertRecord(record, conf);
-      currentVertex = (Vertex<V, E, M>) converted.getKey();
+      currentVertex = (Vertex<V, E, M>) converted.getValue();
 
       if (vertex.getVertexID() == null) {
         vertex = currentVertex;
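
Two behavioural changes in GraphJobRunner are worth noting: the default vertices
storage flips from ListVerticesInfo to DiskVerticesInfo, and loadVertices() now
takes the vertex from the value side of the converted record (the key side now
carries the vertex ID, see VertexInputReader below). Jobs that need runtime graph
modification must select the map-backed storage explicitly, as DynamicGraph above
does:

    // Opt into modifiable, in-memory vertex storage.
    conf.set("hama.graph.vertices.info",
        "org.apache.hama.graph.MapVerticesInfo");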

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java Wed Jan  8 07:21:58 2014
@@ -17,14 +17,16 @@
  */
 package org.apache.hama.graph;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.List;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -32,7 +34,8 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
- * VerticesInfo encapsulates the storage of vertices in a BSP Task.
+ * Stores the serialized vertices in a memory-based list. It allows neither
+ * modification nor random access by vertexID.
  * 
  * @param <V> Vertex ID object type
  * @param <E> Edge cost object type
@@ -43,7 +46,9 @@ public final class ListVerticesInfo<V ex
   private GraphJobRunner<V, E, M> runner;
   Vertex<V, E, M> v;
 
-  private final Map<V, byte[]> verticesMap = new TreeMap<V, byte[]>();
+  private final List<byte[]> verticesList = new ArrayList<byte[]>();
+  private boolean lockedAdditions = false;
+  private int index = 0;
 
   private ByteArrayOutputStream bos = null;
   private DataOutputStream dos = null;
@@ -58,37 +63,32 @@ public final class ListVerticesInfo<V ex
 
   @Override
   public void addVertex(Vertex<V, E, M> vertex) throws IOException {
-    if (verticesMap.containsKey(vertex.getVertexID())) {
-      throw new UnsupportedOperationException("Vertex with ID: "
-          + vertex.getVertexID() + " already exists!");
-    } else {
-      verticesMap.put(vertex.getVertexID(), serialize(vertex));
-    }
+    // vertices must be added in sorted order for this to work correctly
+    checkArgument(!lockedAdditions,
+        "Additions are locked now, nobody is allowed to change the structure anymore.");
+
+    verticesList.add(serialize(vertex));
   }
 
   @Override
   public void removeVertex(V vertexID) throws UnsupportedOperationException {
-    if (verticesMap.containsKey(vertexID)) {
-      verticesMap.remove(vertexID);
-    } else {
-      throw new UnsupportedOperationException("Vertex with ID: " + vertexID
-          + " not found on this peer.");
-    }
+    throw new UnsupportedOperationException(
+        "ListVerticesInfo doesn't support this operation. Please use the MapVerticesInfo.");
   }
 
   public void clear() {
-    verticesMap.clear();
+    verticesList.clear();
   }
 
   @Override
   public int size() {
-    return this.verticesMap.size();
+    return this.verticesList.size();
   }
 
   @Override
   public IDSkippingIterator<V, E, M> skippingIterator() {
     return new IDSkippingIterator<V, E, M>() {
-      Iterator<V> it = verticesMap.keySet().iterator();
+      Iterator<byte[]> it = verticesList.iterator();
 
       @Override
       public boolean hasNext(V msgId,
@@ -96,13 +96,13 @@ public final class ListVerticesInfo<V ex
           throws IOException {
 
         if (it.hasNext()) {
-          V vertexID = it.next();
-          v = deserialize(vertexID, verticesMap.get(vertexID));
+          byte[] serialized = it.next();
+          v = deserialize(serialized);
 
           while (!strat.accept(v, msgId)) {
             if (it.hasNext()) {
-              vertexID = it.next();
-              v = deserialize(vertexID, verticesMap.get(vertexID));
+              serialized = it.next();
+              v = deserialize(serialized);
             } else {
               return false;
             }
@@ -131,42 +131,38 @@ public final class ListVerticesInfo<V ex
   }
 
   public byte[] serialize(Vertex<V, E, M> vertex) throws IOException {
-    v = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
-    v.setEdges(vertex.getEdges());
-    v.setValue(vertex.getValue());
-    if(vertex.isHalted()) {
-      v.voteToHalt();
-    }
     bos = new ByteArrayOutputStream();
     dos = new DataOutputStream(bos);
-    v.write(dos);
+    vertex.write(dos);
     return bos.toByteArray();
   }
 
-  public Vertex<V, E, M> deserialize(V vertexID, byte[] serialized) throws IOException {
+  public Vertex<V, E, M> deserialize(byte[] serialized) throws IOException {
     bis = new ByteArrayInputStream(serialized);
     dis = new DataInputStream(bis);
     v = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
 
     v.readFields(dis);
     v.setRunner(runner);
-    v.setVertexID(vertexID);
     return v;
   }
 
   @Override
   public void finishVertexComputation(Vertex<V, E, M> vertex)
       throws IOException {
-    verticesMap.put(vertex.getVertexID(), serialize(vertex));
+    verticesList.set(index, serialize(vertex));
+    index++;
   }
 
   @Override
   public void finishAdditions() {
-
+    lockedAdditions = true;
   }
 
   @Override
   public void finishRemovals() {
+    throw new UnsupportedOperationException(
+        "ListVerticesInfo doesn't support this operation. Please use the MapVerticesInfo.");
   }
 
   @Override
@@ -182,6 +178,6 @@ public final class ListVerticesInfo<V ex
 
   @Override
   public void startSuperstep() throws IOException {
-
+    index = 0;
   }
 }

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1556453&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Wed Jan  8 07:21:58 2014
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Stores the vertices in a memory-based tree map. This implementation allows
+ * runtime graph modification and random access by vertex ID.
+ * 
+ * Note that it may be inefficient in its memory usage.
+ * 
+ * @param <V> Vertex ID object type
+ * @param <E> Edge cost object type
+ * @param <M> Vertex value object type
+ */
+public final class MapVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
+    implements VerticesInfo<V, E, M> {
+  private GraphJobRunner<V, E, M> runner;
+  Vertex<V, E, M> v;
+
+  private final SortedMap<V, byte[]> verticesMap = new TreeMap<V, byte[]>();
+
+  private ByteArrayOutputStream bos = null;
+  private DataOutputStream dos = null;
+  private ByteArrayInputStream bis = null;
+  private DataInputStream dis = null;
+
+  @Override
+  public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
+      TaskAttemptID attempt) throws IOException {
+    this.runner = runner;
+  }
+
+  @Override
+  public void addVertex(Vertex<V, E, M> vertex) throws IOException {
+    if (verticesMap.containsKey(vertex.getVertexID())) {
+      throw new UnsupportedOperationException("Vertex with ID: "
+          + vertex.getVertexID() + " already exists!");
+    } else {
+      verticesMap.put(vertex.getVertexID(), serialize(vertex));
+    }
+  }
+
+  @Override
+  public void removeVertex(V vertexID) throws UnsupportedOperationException {
+    if (verticesMap.containsKey(vertexID)) {
+      verticesMap.remove(vertexID);
+    } else {
+      throw new UnsupportedOperationException("Vertex with ID: " + vertexID
+          + " not found on this peer.");
+    }
+  }
+
+  public void clear() {
+    verticesMap.clear();
+  }
+
+  @Override
+  public int size() {
+    return this.verticesMap.size();
+  }
+
+  @Override
+  public IDSkippingIterator<V, E, M> skippingIterator() {
+    return new IDSkippingIterator<V, E, M>() {
+      Iterator<V> it = verticesMap.keySet().iterator();
+
+      @Override
+      public boolean hasNext(V msgId,
+          org.apache.hama.graph.IDSkippingIterator.Strategy strat)
+          throws IOException {
+
+        if (it.hasNext()) {
+          V vertexID = it.next();
+          v = deserialize(vertexID, verticesMap.get(vertexID));
+
+          while (!strat.accept(v, msgId)) {
+            if (it.hasNext()) {
+              vertexID = it.next();
+              v = deserialize(vertexID, verticesMap.get(vertexID));
+            } else {
+              return false;
+            }
+          }
+
+          return true;
+        } else {
+          v = null;
+          return false;
+        }
+      }
+
+      @Override
+      public Vertex<V, E, M> next() {
+        if (v == null) {
+          throw new UnsupportedOperationException(
+              "You must invoke hasNext before ask for the next vertex.");
+        }
+
+        Vertex<V, E, M> tmp = v;
+        v = null;
+        return tmp;
+      }
+
+    };
+  }
+
+  public byte[] serialize(Vertex<V, E, M> vertex) throws IOException {
+    bos = new ByteArrayOutputStream();
+    dos = new DataOutputStream(bos);
+    vertex.write(dos);
+    return bos.toByteArray();
+  }
+
+  public Vertex<V, E, M> deserialize(V vertexID, byte[] serialized)
+      throws IOException {
+    bis = new ByteArrayInputStream(serialized);
+    dis = new DataInputStream(bis);
+    v = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+
+    v.readFields(dis);
+    v.setRunner(runner);
+    v.setVertexID(vertexID);
+    return v;
+  }
+
+  @Override
+  public void finishVertexComputation(Vertex<V, E, M> vertex)
+      throws IOException {
+    verticesMap.put(vertex.getVertexID(), serialize(vertex));
+  }
+
+  @Override
+  public void finishAdditions() {
+  }
+
+  @Override
+  public void finishRemovals() {
+  }
+
+  @Override
+  public void finishSuperstep() {
+  }
+
+  @Override
+  public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
+      throws IOException {
+
+  }
+
+  @Override
+  public void startSuperstep() throws IOException {
+
+  }
+}
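
Because MapVerticesInfo keys its TreeMap by the vertex ID, skippingIterator()
walks vertices in ID order, which lines up with the sorted partition files the
runtime partitioner now produces. A small standalone illustration of that
ordering property (to run inside any main(); Text is org.apache.hadoop.io.Text,
the values are placeholders):

    TreeMap<Text, byte[]> vertices = new TreeMap<Text, byte[]>();
    vertices.put(new Text("3"), new byte[0]);
    vertices.put(new Text("1"), new byte[0]);
    vertices.put(new Text("2"), new byte[0]);
    System.out.println(vertices.keySet()); // prints [1, 2, 3]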

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Wed Jan  8 07:21:58 2014
@@ -17,8 +17,12 @@
  */
 package org.apache.hama.graph;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -65,7 +69,7 @@ public abstract class Vertex<V extends W
 
   @Override
   public V getVertexID() {
-    return vertexID;
+    return this.vertexID;
   }
 
   @Override
@@ -299,10 +303,10 @@ public abstract class Vertex<V extends W
   @Override
   public void readFields(DataInput in) throws IOException {
     if (in.readBoolean()) {
-      if (vertexID == null) {
-        vertexID = GraphJobRunner.createVertexIDObject();
+      if (this.vertexID == null) {
+        this.vertexID = GraphJobRunner.createVertexIDObject();
       }
-      vertexID.readFields(in);
+      this.vertexID.readFields(in);
     }
     if (in.readBoolean()) {
       if (this.value == null) {
@@ -402,4 +406,17 @@ public abstract class Vertex<V extends W
   protected GraphJobRunner<V, E, M> getRunner() {
     return runner;
   }
+  
+  public Vertex<V, E, M> deepCopy() throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    this.write(dos);
+    
+    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+    DataInputStream dis = new DataInputStream(bis);
+    
+    Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+    vertex.readFields(dis);
+    return vertex;
+  }
 }
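
The new deepCopy() clones a vertex through a Writable serialization round trip;
SemiClusteringVertex below uses it to snapshot the vertex before embedding it in
an outgoing message. The same pattern works for any Writable; a generic sketch
(assuming java.io.* and org.apache.hadoop.io.Writable imports; copyInto is a
hypothetical helper, not part of this commit):

    // Copies src's state into dst via an in-memory write/readFields round trip.
    static <W extends Writable> void copyInto(W src, W dst) throws IOException {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      src.write(new DataOutputStream(bos));
      dst.readFields(new DataInputStream(
          new ByteArrayInputStream(bos.toByteArray())));
    }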

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java Wed Jan  8 07:21:58 2014
@@ -17,15 +17,9 @@
  */
 package org.apache.hama.graph;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.bsp.BSPPeer;
@@ -85,8 +79,9 @@ public abstract class VertexInputReader<
     if (!vertexCreation) {
       return null;
     }
-    outputRecord.setKey(vertex);
-    outputRecord.setValue(NullWritable.get());
+
+    outputRecord.setKey(vertex.getVertexID());
+    outputRecord.setValue(vertex);
     return outputRecord;
   }
 
@@ -94,20 +89,10 @@ public abstract class VertexInputReader<
   @Override
   public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
       Partitioner partitioner, Configuration conf, BSPPeer peer, int numTasks) {
-    Vertex<V, E, M> vertex = (Vertex<V, E, M>) outputRecord.getKey();
+    Vertex<V, E, M> vertex = (Vertex<V, E, M>) outputRecord.getValue();
+
     return Math.abs(partitioner.getPartition(vertex.getVertexID(),
         vertex.getValue(), numTasks));
   }
 
-  // final because we don't want vertices to change ordering
-  @Override
-  public final Map<Writable, Writable> newMap() {
-    return new TreeMap<Writable, Writable>();
-  }
-
-  @Override
-  public List<KeyValuePair<Writable, Writable>> newList() {
-    return new LinkedList<KeyValuePair<Writable,Writable>>();
-  }
-
 }

Modified: hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java (original)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java Wed Jan  8 07:21:58 2014
@@ -18,15 +18,21 @@
 
 package org.apache.hama.ml.semiclustering;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.Vertex;
 
-import java.io.IOException;
-import java.util.*;
-
 /**
  * SemiClusteringVertex Class defines each vertex in a Graph job and the
  * compute() method is the function which is applied on each Vertex in the graph
@@ -57,6 +63,7 @@ public class SemiClusteringVertex extend
     if (this.getSuperstepCount() == 0) {
       firstSuperStep();
     }
+
     if (this.getSuperstepCount() >= 1) {
       Set<SemiClusterMessage> scListContainThis = new TreeSet<SemiClusterMessage>();
       Set<SemiClusterMessage> scListNotContainThis = new TreeSet<SemiClusterMessage>();
@@ -104,13 +111,14 @@ public class SemiClusteringVertex extend
    * @throws java.io.IOException
    */
   public void firstSuperStep() throws IOException {
-    Vertex<Text, DoubleWritable, SemiClusterMessage> v = this;
+    Vertex<Text, DoubleWritable, SemiClusterMessage> v = this.deepCopy();
     List<Vertex<Text, DoubleWritable, SemiClusterMessage>> lV = new ArrayList<Vertex<Text, DoubleWritable, SemiClusterMessage>>();
     lV.add(v);
     String newClusterName = "C" + createNewSemiClusterName(lV);
     SemiClusterMessage initialClusters = new SemiClusterMessage(newClusterName,
         lV, 1);
     sendMessageToNeighbors(initialClusters);
+
     Set<SemiClusterDetails> scList = new TreeSet<SemiClusterDetails>();
     scList.add(new SemiClusterDetails(newClusterName, 1.0));
     SemiClusterMessage vertexValue = new SemiClusterMessage(scList);
@@ -192,6 +200,7 @@ public class SemiClusteringVertex extend
     while (vertexItrator.hasNext()) {
       vertexId.add(vertexItrator.next().getVertexID().toString());
     }
+
     return vertexId;
   }
 
@@ -202,7 +211,7 @@ public class SemiClusteringVertex extend
   public boolean isVertexInSc(SemiClusterMessage msg) {
     List<String> vertexId = getSemiClusterVerticesIdList(msg.getVertexList());
     return vertexId.contains(this.getVertexID().toString())
-            && vertexId.size() < semiClusterMaximumVertexCount;
+        && vertexId.size() < semiClusterMaximumVertexCount;
   }
 
   /**


