hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1555020 - in /hama/trunk: examples/src/main/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/ ml/src/main/java/org/apache/hama/ml/semiclustering/
Date Fri, 03 Jan 2014 07:03:59 GMT
Author: edwardyoon
Date: Fri Jan  3 07:03:58 2014
New Revision: 1555020

URL: http://svn.apache.org/r1555020
Log:
HAMA-783: Efficient InMemory Storage for Vertices

Modified:
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Fri Jan  3 07:03:58 2014
@@ -69,7 +69,6 @@ public class SSSP {
       }
       voteToHalt();
     }
-
   }
 
   public static class MinIntCombiner extends Combiner<IntWritable> {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java Fri Jan  3 07:03:58 2014
@@ -17,12 +17,13 @@
  */
 package org.apache.hama.graph;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,11 +32,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.graph.IDSkippingIterator.Strategy;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 @SuppressWarnings("rawtypes")
 public final class DiskVerticesInfo<V extends WritableComparable, E extends Writable,
M extends Writable>
     implements VerticesInfo<V, E, M> {
@@ -67,12 +67,12 @@ public final class DiskVerticesInfo<V ex
   private Vertex<V, E, M> cachedVertexInstance;
   private int currentStep = 0;
   private int index = 0;
-  private Configuration conf;
+  private HamaConfiguration conf;
   private GraphJobRunner<V, E, M> runner;
   private String staticFile;
 
   @Override
-  public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
+  public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
       TaskAttemptID attempt) throws IOException {
     this.runner = runner;
     this.conf = conf;
@@ -92,7 +92,7 @@ public final class DiskVerticesInfo<V ex
   }
 
   @Override
-  public void cleanup(Configuration conf, TaskAttemptID attempt)
+  public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
       throws IOException {
     IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsNextIterationDos,
         staticGraphPartsDis, softGraphPartsDis);
@@ -122,7 +122,7 @@ public final class DiskVerticesInfo<V ex
 
   @Override
   public void removeVertex(V vertexID) {
-    throw new UnsupportedOperationException ("Not yet implemented");
+    throw new UnsupportedOperationException("Not yet implemented");
   }
 
   /**
@@ -176,7 +176,7 @@ public final class DiskVerticesInfo<V ex
 
   @Override
   public void finishRemovals() {
-    throw new UnsupportedOperationException ("Not yet implemented");
+    throw new UnsupportedOperationException("Not yet implemented");
   }
 
   private static long[] copy(ArrayList<Long> lst) {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Fri Jan  3 07:03:58 2014
@@ -248,12 +248,14 @@ public final class GraphJobRunner<V exte
     int activeVertices = 0;
     this.changedVertexCnt = 0;
     vertices.startSuperstep();
+
     /*
      * We iterate over our messages and vertices in sorted order. That means
      * that we need to seek the first vertex that has the same ID as the
      * currentMessage or the first vertex that is active.
      */
     IDSkippingIterator<V, E, M> iterator = vertices.skippingIterator();
+    
     // note that can't skip inactive vertices because we have to rewrite the
     // complete vertex file in each iteration
     while (iterator.hasNext(
@@ -266,9 +268,11 @@ public final class GraphJobRunner<V exte
         iterable = iterate(currentMessage, (V) currentMessage.getVertexId(),
             vertex, peer);
       }
+      
       if (iterable != null && vertex.isHalted()) {
         vertex.setActive();
       }
+      
       if (!vertex.isHalted()) {
         M lastValue = vertex.getValue();
         if (iterable == null) {
@@ -285,7 +289,7 @@ public final class GraphJobRunner<V exte
         getAggregationRunner().aggregateVertex(lastValue, vertex);
         activeVertices++;
       }
-
+      
       // note that we even need to rewrite the vertex if it is halted for
       // consistency reasons
       vertices.finishVertexComputation(vertex);
@@ -352,7 +356,10 @@ public final class GraphJobRunner<V exte
     IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
     while (skippingIterator.hasNext()) {
       Vertex<V, E, M> vertex = skippingIterator.next();
+      
       M lastValue = vertex.getValue();
+      // Calls setup method.
+      vertex.setup(conf);
       vertex.compute(Collections.singleton(vertex.getValue()));
       getAggregationRunner().aggregateVertex(lastValue, vertex);
       vertices.finishVertexComputation(vertex);
@@ -456,9 +463,6 @@ public final class GraphJobRunner<V exte
             vertex.addEdge(edge);
           }
         } else {
-          vertex.setRunner(this);
-          vertex.setup(conf);
-
           if (selfReference) {
             vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
           }
@@ -469,8 +473,6 @@ public final class GraphJobRunner<V exte
       }
     }
     // add last vertex.
-    vertex.setRunner(this);
-    vertex.setup(conf);
     if (selfReference) {
       vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
     }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java Fri Jan  3 07:03:58 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.graph;
 
+import java.io.IOException;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -54,8 +56,9 @@ public abstract class IDSkippingIterator
    * Skips nothing, accepts everything.
    * 
    * @return true if the strategy found a new item, false if not.
+   * @throws IOException 
    */
-  public boolean hasNext() {
+  public boolean hasNext() throws IOException {
     return hasNext(null, Strategy.ALL);
   }
 
@@ -63,8 +66,9 @@ public abstract class IDSkippingIterator
    * Skips until the given strategy is satisfied.
    * 
    * @return true if the strategy found a new item, false if not.
+   * @throws IOException 
    */
-  public abstract boolean hasNext(V e, Strategy strat);
+  public abstract boolean hasNext(V e, Strategy strat) throws IOException;
 
   /**
    * @return a found vertex that can be read safely.

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java Fri Jan  3 07:03:58 2014
@@ -17,14 +17,18 @@
  */
 package org.apache.hama.graph;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.Map;
+import java.util.TreeMap;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
@@ -36,52 +40,69 @@ import org.apache.hama.bsp.TaskAttemptID
  */
 public final class ListVerticesInfo<V extends WritableComparable<V>, E extends Writable,
M extends Writable>
     implements VerticesInfo<V, E, M> {
+  private GraphJobRunner<V, E, M> runner;
+  Vertex<V, E, M> v;
 
-  private final SortedSet<Vertex<V, E, M>> vertices = new TreeSet<Vertex<V,
E, M>>();
-  // We will use this variable to make vertex removals, so we don't invoke GC too many times.

-  private final Vertex<V, E, M> vertexTemplate = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+  private final Map<V, byte[]> verticesMap = new TreeMap<V, byte[]>();
+
+  private ByteArrayOutputStream bos = null;
+  private DataOutputStream dos = null;
+  private ByteArrayInputStream bis = null;
+  private DataInputStream dis = null;
+
+  @Override
+  public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
+      TaskAttemptID attempt) throws IOException {
+    this.runner = runner;
+  }
 
   @Override
-  public void addVertex(Vertex<V, E, M> vertex) {
-    if (!vertices.add(vertex)) {
-      throw new UnsupportedOperationException("Vertex with ID: " + vertex.getVertexID() +
" already exists!");
+  public void addVertex(Vertex<V, E, M> vertex) throws IOException {
+    if (verticesMap.containsKey(vertex.getVertexID())) {
+      throw new UnsupportedOperationException("Vertex with ID: "
+          + vertex.getVertexID() + " already exists!");
+    } else {
+      verticesMap.put(vertex.getVertexID(), serialize(vertex));
     }
   }
 
   @Override
   public void removeVertex(V vertexID) throws UnsupportedOperationException {
-    vertexTemplate.setVertexID(vertexID);    
-    
-    if (!vertices.remove(vertexTemplate)) {
-      throw new UnsupportedOperationException("Vertex with ID: " + vertexID + " not found
on this peer.");
+    if (verticesMap.containsKey(vertexID)) {
+      verticesMap.remove(vertexID);
+    } else {
+      throw new UnsupportedOperationException("Vertex with ID: " + vertexID
+          + " not found on this peer.");
     }
   }
 
   public void clear() {
-    vertices.clear();
+    verticesMap.clear();
   }
 
   @Override
   public int size() {
-    return this.vertices.size();
+    return this.verticesMap.size();
   }
 
   @Override
   public IDSkippingIterator<V, E, M> skippingIterator() {
     return new IDSkippingIterator<V, E, M>() {
-      Iterator<Vertex<V, E, M>> it = vertices.iterator();
-      Vertex<V, E, M> v;
+      Iterator<V> it = verticesMap.keySet().iterator();
 
       @Override
       public boolean hasNext(V msgId,
-          org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
+          org.apache.hama.graph.IDSkippingIterator.Strategy strat)
+          throws IOException {
 
         if (it.hasNext()) {
-          v = it.next();
+          V vertexKey = it.next();
+          v = deserialize(verticesMap.get(vertexKey));
 
           while (!strat.accept(v, msgId)) {
             if (it.hasNext()) {
-              v = it.next();
+              vertexKey = it.next();
+              v = deserialize(verticesMap.get(vertexKey));
             } else {
               return false;
             }
@@ -97,7 +118,8 @@ public final class ListVerticesInfo<V ex
       @Override
       public Vertex<V, E, M> next() {
         if (v == null) {
-          throw new UnsupportedOperationException("You must invoke hasNext before ask for
the next vertex.");
+          throw new UnsupportedOperationException(
+              "You must invoke hasNext before ask for the next vertex.");
         }
 
         Vertex<V, E, M> tmp = v;
@@ -108,9 +130,27 @@ public final class ListVerticesInfo<V ex
     };
   }
 
-  @Override
-  public void finishVertexComputation(Vertex<V, E, M> vertex) {
+  public byte[] serialize(Vertex<V, E, M> vertex) throws IOException {
+    bos = new ByteArrayOutputStream();
+    dos = new DataOutputStream(bos);
+    vertex.write(dos);
+    return bos.toByteArray();
+  }
+
+  public Vertex<V, E, M> deserialize(byte[] serialized) throws IOException {
+    bis = new ByteArrayInputStream(serialized);
+    dis = new DataInputStream(bis);
+    v = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
 
+    v.readFields(dis);
+    v.setRunner(runner);
+    return v;
+  }
+
+  @Override
+  public void finishVertexComputation(Vertex<V, E, M> vertex)
+      throws IOException {
+    verticesMap.put(vertex.getVertexID(), serialize(vertex));
   }
 
   @Override
@@ -122,13 +162,13 @@ public final class ListVerticesInfo<V ex
   public void finishRemovals() {
   }
 
-   @Override
+  @Override
   public void finishSuperstep() {
 
   }
 
   @Override
-  public void cleanup(Configuration conf, TaskAttemptID attempt)
+  public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
       throws IOException {
 
   }
@@ -137,11 +177,4 @@ public final class ListVerticesInfo<V ex
   public void startSuperstep() throws IOException {
 
   }
-
-  @Override
-  public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
-      TaskAttemptID attempt) throws IOException {
-
-  }
-
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Fri Jan  3 07:03:58 2014
@@ -27,9 +27,9 @@ import org.apache.directmemory.memory.Po
 import org.apache.directmemory.serialization.Serializer;
 import org.apache.directmemory.serialization.kryo.KryoSerializer;
 import org.apache.directmemory.utils.CacheValuesIterable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.util.ReflectionUtils;
 
@@ -37,123 +37,128 @@ import org.apache.hama.util.ReflectionUt
  * An off heap version of a {@link org.apache.hama.graph.Vertex} storage.
  */
 public class OffHeapVerticesInfo<V extends WritableComparable<?>, E extends Writable,
M extends Writable>
-        implements VerticesInfo<V, E, M> {
+    implements VerticesInfo<V, E, M> {
 
-    public static final String DM_STRICT_ITERATOR = "dm.iterator.strict";
-    public static final String DM_BUFFERS = "dm.buffers";
-    public static final String DM_SIZE = "dm.size";
-    public static final String DM_CAPACITY = "dm.capacity";
-    public static final String DM_CONCURRENCY = "dm.concurrency";
-    public static final String DM_DISPOSAL_TIME = "dm.disposal.time";
-    public static final String DM_SERIALIZER = "dm.serializer";
-    public static final String DM_SORTED = "dm.sorted";
-
-    private CacheService<V, Vertex<V, E, M>> vertices;
-
-    private boolean strict;
-    private GraphJobRunner<V, E, M> runner;
-
-    @Override
-    public void init(GraphJobRunner<V, E, M> runner, Configuration conf, TaskAttemptID
attempt) throws IOException {
-        this.runner = runner;
-        this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true);
-        DirectMemory<V, Vertex<V, E, M>> dm = new DirectMemory<V, Vertex<V,
E, M>>()
-                .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 100))
-                .setSize(conf.getInt(DM_SIZE, 102400))
-                .setSerializer(ReflectionUtils.newInstance(conf.getClass(DM_SERIALIZER, KryoSerializer.class,
Serializer.class)))
-                .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 3600000));
-        if (conf.getBoolean(DM_SORTED, true)) {
-            dm.setMap(new ConcurrentSkipListMap<V, Pointer<Vertex<V, E, M>>>());
+  public static final String DM_STRICT_ITERATOR = "dm.iterator.strict";
+  public static final String DM_BUFFERS = "dm.buffers";
+  public static final String DM_SIZE = "dm.size";
+  public static final String DM_CAPACITY = "dm.capacity";
+  public static final String DM_CONCURRENCY = "dm.concurrency";
+  public static final String DM_DISPOSAL_TIME = "dm.disposal.time";
+  public static final String DM_SERIALIZER = "dm.serializer";
+  public static final String DM_SORTED = "dm.sorted";
+
+  private CacheService<V, Vertex<V, E, M>> vertices;
+
+  private boolean strict;
+  private GraphJobRunner<V, E, M> runner;
+
+  @Override
+  public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
+      TaskAttemptID attempt) throws IOException {
+    this.runner = runner;
+    this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true);
+    DirectMemory<V, Vertex<V, E, M>> dm = new DirectMemory<V, Vertex<V,
E, M>>()
+        .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 100))
+        .setSize(conf.getInt(DM_SIZE, 102400))
+        .setSerializer(
+            ReflectionUtils.newInstance(conf.getClass(DM_SERIALIZER,
+                KryoSerializer.class, Serializer.class)))
+        .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 3600000));
+    if (conf.getBoolean(DM_SORTED, true)) {
+      dm.setMap(new ConcurrentSkipListMap<V, Pointer<Vertex<V, E, M>>>());
+    } else {
+      dm.setInitialCapacity(conf.getInt(DM_CAPACITY, 1000))
+          .setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10));
+    }
+
+    this.vertices = dm.newCacheService();
+
+  }
+
+  @Override
+  public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
+      throws IOException {
+    vertices.dump();
+  }
+
+  public void addVertex(Vertex<V, E, M> vertex) {
+    vertices.put(vertex.getVertexID(), vertex);
+  }
+
+  @Override
+  public void finishAdditions() {
+  }
+
+  @Override
+  public void startSuperstep() throws IOException {
+  }
+
+  @Override
+  public void finishSuperstep() throws IOException {
+  }
+
+  @Override
+  public void finishVertexComputation(Vertex<V, E, M> vertex)
+      throws IOException {
+    vertices.put(vertex.getVertexID(), vertex);
+  }
+
+  public void clear() {
+    vertices.clear();
+  }
+
+  public int size() {
+    return (int) this.vertices.entries();
+  }
+
+  @Override
+  public IDSkippingIterator<V, E, M> skippingIterator() {
+    final Iterator<Vertex<V, E, M>> vertexIterator = new CacheValuesIterable<V,
Vertex<V, E, M>>(
+        vertices, strict).iterator();
+
+    return new IDSkippingIterator<V, E, M>() {
+      int currentIndex = 0;
+
+      Vertex<V, E, M> currentVertex = null;
+
+      @Override
+      public boolean hasNext(V e,
+          org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
+        if (currentIndex < vertices.entries()) {
+
+          Vertex<V, E, M> next = vertexIterator.next();
+          while (!strat.accept(next, e)) {
+            currentIndex++;
+          }
+          currentVertex = next;
+          return true;
         } else {
-            dm.setInitialCapacity(conf.getInt(DM_CAPACITY, 1000))
-                    .setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10));
+          return false;
         }
+      }
 
-        this.vertices = dm.newCacheService();
-
-    }
-
-    @Override
-    public void cleanup(Configuration conf, TaskAttemptID attempt) throws IOException {
-        vertices.dump();
-    }
-
-    public void addVertex(Vertex<V, E, M> vertex) {
-        vertices.put(vertex.getVertexID(), vertex);
-    }
-
-    @Override
-    public void finishAdditions() {
-    }
-
-    @Override
-    public void startSuperstep() throws IOException {
-    }
-
-    @Override
-    public void finishSuperstep() throws IOException {
-    }
-
-    @Override
-    public void finishVertexComputation(Vertex<V, E, M> vertex) throws IOException
{
-        vertices.put(vertex.getVertexID(), vertex);
-    }
-
-    public void clear() {
-        vertices.clear();
-    }
-
-    public int size() {
-        return (int) this.vertices.entries();
-    }
-
-    @Override
-    public IDSkippingIterator<V, E, M> skippingIterator() {
-        final Iterator<Vertex<V, E, M>> vertexIterator =
-                new CacheValuesIterable<V, Vertex<V, E, M>>(vertices, strict).iterator();
-
-        return new IDSkippingIterator<V, E, M>() {
-            int currentIndex = 0;
-
-            Vertex<V, E, M> currentVertex = null;
-
-            @Override
-            public boolean hasNext(V e,
-                                   org.apache.hama.graph.IDSkippingIterator.Strategy strat)
{
-                if (currentIndex < vertices.entries()) {
-
-                    Vertex<V, E, M> next = vertexIterator.next();
-                    while (!strat.accept(next, e)) {
-                        currentIndex++;
-                    }
-                    currentVertex = next;
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-
-            @Override
-            public Vertex<V, E, M> next() {
-                currentIndex++;
-                if (currentVertex.getRunner() == null) {
-                  currentVertex.setRunner(runner);
-                }
-                return currentVertex;
-            }
-
-        };
+      @Override
+      public Vertex<V, E, M> next() {
+        currentIndex++;
+        if (currentVertex.getRunner() == null) {
+          currentVertex.setRunner(runner);
+        }
+        return currentVertex;
+      }
 
-    }
+    };
 
-    @Override
-    public void removeVertex(V vertexID) {
-      throw new UnsupportedOperationException ("Not yet implemented");
-    }
+  }
 
-    @Override
-    public void finishRemovals() {
-      throw new UnsupportedOperationException ("Not yet implemented");      
-    }
+  @Override
+  public void removeVertex(V vertexID) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  @Override
+  public void finishRemovals() {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
 
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri Jan  3 07:03:58 2014
@@ -71,7 +71,7 @@ public abstract class Vertex<V extends W
   @Override
   public void setup(HamaConfiguration conf) {
   }
-
+  
   @Override
   public void sendMessage(Edge<V, E> e, M msg) throws IOException {
     runner.getPeer().send(getDestinationPeerName(e),
@@ -172,7 +172,7 @@ public abstract class Vertex<V extends W
 
   @Override
   public M getValue() {
-    return value;
+    return this.value;
   }
 
   @Override
@@ -306,10 +306,11 @@ public abstract class Vertex<V extends W
     }
     if (in.readBoolean()) {
       if (this.value == null) {
-        value = GraphJobRunner.createVertexValue();
+        this.value = GraphJobRunner.createVertexValue();
       }
-      value.readFields(in);
+      this.value.readFields(in);
     }
+    
     this.edges = new ArrayList<Edge<V, E>>();
     if (in.readBoolean()) {
       int num = in.readInt();
@@ -340,6 +341,7 @@ public abstract class Vertex<V extends W
       out.writeBoolean(true);
       vertexID.write(out);
     }
+    
     if (value == null) {
       out.writeBoolean(false);
     } else {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Fri Jan  3 07:03:58 2014
@@ -38,7 +38,10 @@ public interface VertexInterface<V exten
     extends WritableComparable<VertexInterface<V, E, M>> {
 
   /**
-   * Used to setup a vertex.
+   * This method is called once before the Vertex computation begins. Since the
+   * Vertex object is serializable, variables in your Vertex program always
+   * should be declared as static.
+   * 
    */
   public void setup(HamaConfiguration conf);
 
@@ -78,12 +81,14 @@ public interface VertexInterface<V exten
   public void sendMessage(V destinationVertexID, M msg) throws IOException;
 
   /**
-   * Sends a message to add a new vertex through the partitioner to the appropriate BSP peer

+   * Sends a message to add a new vertex through the partitioner to the
+   * appropriate BSP peer
    */
-  public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws IOException;
+  public void addVertex(V vertexID, List<Edge<V, E>> edges, M value)
+      throws IOException;
 
   /**
-   * Removes current Vertex from local peer. 
+   * Removes current Vertex from local peer.
    */
   public void remove() throws IOException;
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Fri Jan  3 07:03:58 2014
@@ -19,9 +19,9 @@ package org.apache.hama.graph;
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
@@ -37,13 +37,13 @@ public interface VerticesInfo<V extends 
   /**
    * Initialization of internal structures.
    */
-  public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
+  public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
       TaskAttemptID attempt) throws IOException;
 
   /**
    * Cleanup of internal structures.
    */
-  public void cleanup(Configuration conf, TaskAttemptID attempt)
+  public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
       throws IOException;
 
   /**
@@ -92,5 +92,4 @@ public interface VerticesInfo<V extends 
   public int size();
 
   public IDSkippingIterator<V, E, M> skippingIterator();
-
 }

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java Fri Jan  3 07:03:58 2014
@@ -22,10 +22,10 @@ import java.util.List;
 
 import junit.framework.TestCase;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.graph.example.PageRank;
 import org.apache.hama.graph.example.PageRank.PageRankVertex;
@@ -36,7 +36,7 @@ public class TestDiskVerticesInfo extend
   @Test
   public void testDiskVerticesInfoLifeCycle() throws Exception {
     DiskVerticesInfo<Text, NullWritable, DoubleWritable> info = new DiskVerticesInfo<Text,
NullWritable, DoubleWritable>();
-    Configuration conf = new Configuration();
+    HamaConfiguration conf = new HamaConfiguration();
     conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName());
     conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR,
         NullWritable.class.getName());

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java Fri Jan  3 07:03:58 2014
@@ -17,28 +17,28 @@
  */
 package org.apache.hama.graph;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.graph.example.PageRank.PageRankVertex;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
 public class TestOffHeapVerticesInfo {
 
   @Test
   public void testOffHeapVerticesInfoLifeCycle() throws Exception {
     OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> info = new OffHeapVerticesInfo<Text,
NullWritable, DoubleWritable>();
-    Configuration conf = new Configuration();
+    HamaConfiguration conf = new HamaConfiguration();
     conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName());
     conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR,
         NullWritable.class.getName());
@@ -121,7 +121,7 @@ public class TestOffHeapVerticesInfo {
   public void testAdditionWithDefaults() throws Exception {
     OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> verticesInfo =
             new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
-    Configuration conf = new Configuration();
+    HamaConfiguration conf = new HamaConfiguration();
     verticesInfo.init(null, conf, null);
     Vertex<Text, NullWritable, DoubleWritable> vertex = new PageRankVertex();
     vertex.setVertexID(new Text("some-id"));
@@ -133,7 +133,7 @@ public class TestOffHeapVerticesInfo {
   public void testMassiveAdditionWithDefaults() throws Exception {
     OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> verticesInfo =
             new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
-    Configuration conf = new Configuration();
+    HamaConfiguration conf = new HamaConfiguration();
     verticesInfo.init(null, conf, null);
     assertEquals("vertices info size should be 0 at startup", 0, verticesInfo.size());
     Random r = new Random();

Modified: hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java (original)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java Fri Jan  3 07:03:58 2014
@@ -35,9 +35,9 @@ import java.util.*;
  */
 public class SemiClusteringVertex extends
     Vertex<Text, DoubleWritable, SemiClusterMessage> {
-  private int semiClusterMaximumVertexCount;
-  private int graphJobMessageSentCount;
-  private int graphJobVertexMaxClusterCount;
+  private static int semiClusterMaximumVertexCount;
+  private static int graphJobMessageSentCount;
+  private static int graphJobVertexMaxClusterCount;
 
   @Override
   public void setup(HamaConfiguration conf) {



Mime
View raw message