hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1514667 - in /hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ examples/src/test/resources/ graph/src/main/java/org/apache/hama/graph/
Date Fri, 16 Aug 2013 12:03:59 GMT
Author: edwardyoon
Date: Fri Aug 16 12:03:59 2013
New Revision: 1514667

URL: http://svn.apache.org/r1514667
Log:
HAMA-767: Add vertex addition/removal APIs (Anastasis Andronidis via edwardyoon)

Added:
    hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
    hama/trunk/examples/src/test/resources/dg.txt
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Aug 16 12:03:59 2013
@@ -4,6 +4,7 @@ Release 0.6.3 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-767: Add vertex addition/removal APIs (Anastasis Andronidis via edwardyoon)
    HAMA-594: Semi-Clustering Algorithm Implementation (Renil Jeseph via edwardyoon)
 
   BUG FIXES

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java?rev=1514667&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java Fri Aug 16
12:03:59 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+import org.apache.hama.graph.GraphJobRunner.GraphJobCounter;
+
+/**
+ * This is an example of how to manipulate Graphs dynamically.
+ * The input of this example is a number in each row. We assume
+ * that the is a vertex with ID:1 which is responsible to create
+ * a sum vertex that will aggregate the values of the other 
+ * vertices. During the aggregation, sum vertex will delete all 
+ * other vertices. 
+ * 
+ * Input example:
+ * 1
+ * 2
+ * 3
+ * 4
+ * 
+ * Output example:
+ * sum  10
+ */
+public class DynamicGraph {
+
+  public static class GraphTextReader extends 
+      VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {
+
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+            Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {
+
+        vertex.setVertexID(value);
+        vertex.setValue(new IntWritable(Integer.parseInt(value.toString())));
+
+        return true;
+    }
+  }
+
+  public static class GraphVertex extends 
+      Vertex<Text, NullWritable, IntWritable> {
+    
+    private void createSumVertex() throws IOException {
+      if (this.getVertexID().toString().equals("1")) {
+        Text new_id = new Text("sum");
+        this.addVertex(new_id, new ArrayList<Edge<Text, NullWritable>>(), new
IntWritable(0));
+      }
+    }
+
+    private void sendAllValuesToSumAndRemove() throws IOException {
+      if (!this.getVertexID().toString().equals("sum")) {
+        this.sendMessage(new Text("sum"), this.getValue());
+        this.remove();
+      }
+    }
+
+    // this must run only on "sum" vertex
+    private void calculateSum(Iterable<IntWritable> msgs) throws IOException {
+      if (this.getVertexID().toString().equals("sum")) {
+        int s = 0;
+        for (IntWritable i : msgs) {
+          s += i.get();
+        }
+        s += this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter();
+        this.setValue(new IntWritable(this.getValue().get() +s));
+      } else {
+        throw new UnsupportedOperationException("We have more vertecies than we expected:
" + this.getVertexID() + " " + this.getValue()); 
+      }
+    }
+
+    @Override
+    public void compute(Iterable<IntWritable> msgs) throws IOException {
+      if (this.getSuperstepCount() == 0) {
+        createSumVertex();
+      } else if (this.getSuperstepCount() == 1) {
+        sendAllValuesToSumAndRemove();
+      } else if (this.getSuperstepCount() == 2) {
+        calculateSum(msgs);
+      } else if (this.getSuperstepCount() == 3) {
+        this.voteToHalt();
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException, 
+        InterruptedException, ClassNotFoundException {
+    if (args.length != 2) {
+      printUsage();
+    }
+    HamaConfiguration conf = new HamaConfiguration(new Configuration());
+    GraphJob graphJob = createJob(args, conf);
+    long startTime = System.currentTimeMillis();
+    if (graphJob.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+  }
+
+  private static void printUsage() {
+    System.out.println("Usage: <input> <output>");
+    System.exit(-1);
+  }
+
+  private static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException
{
+    GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
+    graphJob.setJobName("Dynamic Graph");
+    graphJob.setVertexClass(GraphVertex.class);
+
+    graphJob.setInputPath(new Path(args[0]));
+    graphJob.setOutputPath(new Path(args[1]));
+
+    graphJob.setVertexIDClass(Text.class);
+    graphJob.setVertexValueClass(IntWritable.class);
+    graphJob.setEdgeValueClass(NullWritable.class);
+
+    graphJob.setInputFormat(TextInputFormat.class);
+
+    graphJob.setVertexInputReaderClass(GraphTextReader.class);
+    graphJob.setPartitioner(HashPartitioner.class);
+
+    graphJob.setOutputFormat(TextOutputFormat.class);
+    graphJob.setOutputKeyClass(Text.class);
+    graphJob.setOutputValueClass(IntWritable.class);
+
+    return graphJob;
+  }  
+
+}

Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java?rev=1514667&view=auto
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java (added)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java Fri Aug
16 12:03:59 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+import org.junit.Test;
+
+/**
+ * Testcase for {@link org.apache.hama.examples.DynamicGraph}
+ */
+public class DynamicGraphTest extends TestCase {
+  private static String OUTPUT = "/tmp/page-out";
+  private Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+  
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(OUTPUT)))
+        fs.delete(new Path(OUTPUT), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void verifyResult() throws IOException {
+    FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    for (FileStatus fts : globStatus) {
+      BufferedReader reader = new BufferedReader(new InputStreamReader(
+          fs.open(fts.getPath())));
+      String line = null;
+      while ((line = reader.readLine()) != null) {
+        String[] split = line.split("\t");
+        assertTrue(split[0].equals("sum"));
+        assertTrue(split[1].equals("11"));
+        System.out.println(split[0] + " : " + split[1]);
+      }
+    }
+  }
+  
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException, ClassNotFoundException {
+    try {
+      DynamicGraph.main(new String[] {"src/test/resources/dg.txt", OUTPUT });
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+}

Added: hama/trunk/examples/src/test/resources/dg.txt
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/resources/dg.txt?rev=1514667&view=auto
==============================================================================
--- hama/trunk/examples/src/test/resources/dg.txt (added)
+++ hama/trunk/examples/src/test/resources/dg.txt Fri Aug 16 12:03:59 2013
@@ -0,0 +1,4 @@
+1
+2
+3
+4
\ No newline at end of file

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Fri Aug 16
12:03:59 2013
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -90,14 +91,18 @@ public final class AggregationRunner<V e
 
   /**
    * Runs the aggregators by sending their values to the master task.
+   * @param changedVertexCnt 
    */
   public void sendAggregatorValues(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      int activeVertices) throws IOException {
+      int activeVertices, int changedVertexCnt) throws IOException {
     // send msgCounts to the master task
     MapWritable updatedCnt = new MapWritable();
     updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable(
         activeVertices));
+    // send total number of vertices changes
+    updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable(
+        changedVertexCnt));
     // also send aggregated values to the master
     if (aggregators != null) {
       for (int i = 0; i < this.aggregators.length; i++) {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java Fri Aug 16
12:03:59 2013
@@ -120,6 +120,11 @@ public final class DiskVerticesInfo<V ex
     size++;
   }
 
+  @Override
+  public void removeVertex(V vertexID) {
+    throw new UnsupportedOperationException ("Not yet implemented");
+  }
+
   /**
    * Serializes the vertex's soft parts to its file. If the vertex does not have
    * an index yet (e.G. at startup) you can provide -1 and it will be added to
@@ -169,6 +174,11 @@ public final class DiskVerticesInfo<V ex
     lockedAdditions = true;
   }
 
+  @Override
+  public void finishRemovals() {
+    throw new UnsupportedOperationException ("Not yet implemented");
+  }
+
   private static long[] copy(ArrayList<Long> lst) {
     long[] arr = new long[lst.size()];
     for (int i = 0; i < arr.length; i++) {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Fri Aug 16 12:03:59
2013
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -63,7 +64,15 @@ public final class GraphJobRunner<V exte
   public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
   public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
   public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
+  public static final String S_FLAG_VERTEX_INCREASE = "hama.3";
+  public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
+  public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
+  public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
   public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
+  public static final Text FLAG_VERTEX_INCREASE = new Text(S_FLAG_VERTEX_INCREASE);
+  public static final Text FLAG_VERTEX_DECREASE = new Text(S_FLAG_VERTEX_DECREASE);
+  public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text(S_FLAG_VERTEX_ALTER_COUNTER);
+  public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(S_FLAG_VERTEX_TOTAL_VERTICES);
 
   public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class";
   public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
@@ -81,6 +90,7 @@ public final class GraphJobRunner<V exte
   private VerticesInfo<V, E, M> vertices;
   private boolean updated = true;
   private int globalUpdateCounts = 0;
+  private int changedVertexCnt = 0;
 
   private long numberVertices = 0;
   // -1 is deactivated
@@ -165,9 +175,20 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
 
+    if (isMasterTask(peer) && iteration == 1) {
+      MapWritable updatedCnt = new MapWritable();
+      updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter())));
+      // send the updates from the master tasks back to the slaves
+      for (String peerName : peer.getAllPeerNames()) {
+        peer.send(peerName, new GraphJobMessage(updatedCnt));
+      }
+    }
+    
     // this is only done in every second iteration
     if (isMasterTask(peer) && iteration > 1) {
       MapWritable updatedCnt = new MapWritable();
+      // send total number of vertices.      
+      updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter())));
       // exit if there's no update made
       if (globalUpdateCounts == 0) {
         updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE));
@@ -210,6 +231,7 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     int activeVertices = 0;
+    this.changedVertexCnt = 0;
     vertices.startSuperstep();
     /*
      * We iterate over our messages and vertices in sorted order. That means
@@ -255,7 +277,7 @@ public final class GraphJobRunner<V exte
     }
     vertices.finishSuperstep();
 
-    aggregationRunner.sendAggregatorValues(peer, activeVertices);
+    aggregationRunner.sendAggregatorValues(peer, activeVertices, this.changedVertexCnt);
     iteration++;
   }
 
@@ -310,6 +332,7 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     vertices.startSuperstep();
+    this.changedVertexCnt = 0;
     IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
     while (skippingIterator.hasNext()) {
       Vertex<V, E, M> vertex = skippingIterator.next();
@@ -319,7 +342,7 @@ public final class GraphJobRunner<V exte
       vertices.finishVertexComputation(vertex);
     }
     vertices.finishSuperstep();
-    aggregationRunner.sendAggregatorValues(peer, 1);
+    aggregationRunner.sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
   }
 
@@ -426,6 +449,51 @@ public final class GraphJobRunner<V exte
   }
 
   /**
+   * Add new vertex into memory of each peer.
+   * @throws IOException 
+   */
+  private void addVertex(Vertex<V, E, M> vertex) throws IOException {
+    vertex.runner = this;
+    vertex.setup(conf);
+          
+    if (conf.getBoolean("hama.graph.self.ref", false)) {
+      vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
+    }
+    
+    LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer " + peer.getPeerName());
+    vertices.addVertex(vertex);
+  }
+
+  /**
+   * Remove vertex from this peer.
+   * @throws IOException 
+   */
+  private void removeVertex(V vertexID) {
+    vertices.removeVertex(vertexID);
+    LOG.debug("Removed VertexID: " + vertexID + " in peer " + peer.getPeerName());
+  }
+
+  /**
+   * After all inserts are done, we must finalize the VertexInfo data structure.
+   * @throws IOException 
+   */
+  private void finishAdditions() throws IOException {
+    vertices.finishAdditions();
+    // finish the "superstep" because we have written a new file here
+    vertices.finishSuperstep();
+  }
+
+  /**
+   * After all inserts are done, we must finalize the VertexInfo data structure.
+   * @throws IOException 
+   */
+  private void finishRemovals() throws IOException {
+    vertices.finishRemovals();
+    // finish the "superstep" because we have written a new file here
+    vertices.finishSuperstep();
+  }
+
+  /**
    * Counts vertices globally by sending the count of vertices in the map to the
    * other peers.
    */
@@ -461,6 +529,9 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
     GraphJobMessage msg = null;
+    boolean dynamicAdditions = false;
+    boolean dynamicRemovals  = false;
+    
     while ((msg = peer.getCurrentMessage()) != null) {
       // either this is a vertex message or a directive that must be read
       // as map
@@ -486,6 +557,20 @@ public final class GraphJobRunner<V exte
               && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
             aggregationRunner.masterReadAggregatedIncrementalValue(vertexID,
                 (M) e.getValue());
+          } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
+            dynamicAdditions = true;
+            addVertex((Vertex<V, E, M>) e.getValue());
+          } else if (FLAG_VERTEX_DECREASE.equals(vertexID)) {
+            dynamicRemovals = true;
+            removeVertex((V) e.getValue());
+          } else if (FLAG_VERTEX_TOTAL_VERTICES.equals(vertexID)) {
+            this.numberVertices = ((LongWritable) e.getValue()).get();
+          } else if (FLAG_VERTEX_ALTER_COUNTER.equals(vertexID)) {
+            if (isMasterTask(peer)) {
+              peer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(((LongWritable) e.getValue()).get());
+            } else {
+              throw new UnsupportedOperationException("A message to increase vertex count
is in a wrong place: " + peer);
+            }
           }
         }
 
@@ -494,6 +579,15 @@ public final class GraphJobRunner<V exte
       }
 
     }
+
+    // If we applied any changes to vertices, we need to call finishAdditions and finishRemovals
in the end.
+    if (dynamicAdditions) {
+      finishAdditions();
+    }
+    if (dynamicRemovals) {
+      finishRemovals();
+    }
+    
     return msg;
   }
 
@@ -605,4 +699,12 @@ public final class GraphJobRunner<V exte
     return (X) ReflectionUtils.newInstance(EDGE_VALUE_CLASS);
   }
 
+  public int getChangedVertexCnt() {
+    return changedVertexCnt;
+  }
+
+  public void setChangedVertexCnt(int changedVertexCnt) {
+    this.changedVertexCnt = changedVertexCnt;
+  }
+
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java Fri Aug 16
12:03:59 2013
@@ -18,9 +18,9 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -37,12 +37,24 @@ import org.apache.hama.bsp.TaskAttemptID
 public final class ListVerticesInfo<V extends WritableComparable<V>, E extends Writable,
M extends Writable>
     implements VerticesInfo<V, E, M> {
 
-  private final List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V,
E, M>>(
-      100);
+  private final SortedSet<Vertex<V, E, M>> vertices = new TreeSet<Vertex<V,
E, M>>();
+  // We will use this variable to make vertex removals, so we don't invoke GC too many times.

+  private final Vertex<V, E, M> vertexTemplate = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
 
   @Override
   public void addVertex(Vertex<V, E, M> vertex) {
-    vertices.add(vertex);
+    if (!vertices.add(vertex)) {
+      throw new UnsupportedOperationException("Vertex with ID: " + vertex.getVertexID() +
" already exists!");
+    }
+  }
+
+  @Override
+  public void removeVertex(V vertexID) throws UnsupportedOperationException {
+    vertexTemplate.setVertexID(vertexID);    
+    
+    if (!vertices.remove(vertexTemplate)) {
+      throw new UnsupportedOperationException("Vertex with ID: " + vertexID + " not found
on this peer.");
+    }
   }
 
   public void clear() {
@@ -57,26 +69,40 @@ public final class ListVerticesInfo<V ex
   @Override
   public IDSkippingIterator<V, E, M> skippingIterator() {
     return new IDSkippingIterator<V, E, M>() {
-      int currentIndex = 0;
+      Iterator<Vertex<V, E, M>> it = vertices.iterator();
+      Vertex<V, E, M> v;
 
       @Override
-      public boolean hasNext(V e,
+      public boolean hasNext(V msgId,
           org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
-        if (currentIndex < vertices.size()) {
 
-          while (!strat.accept(vertices.get(currentIndex), e)) {
-            currentIndex++;
+        if (it.hasNext()) {
+          v = it.next();
+
+          while (!strat.accept(v, msgId)) {
+            if (it.hasNext()) {
+              v = it.next();
+            } else {
+              return false;
+            }
           }
 
           return true;
         } else {
+          v = null;
           return false;
         }
       }
 
       @Override
       public Vertex<V, E, M> next() {
-        return vertices.get(currentIndex++);
+        if (v == null) {
+          throw new UnsupportedOperationException("You must invoke hasNext before ask for
the next vertex.");
+        }
+
+        Vertex<V, E, M> tmp = v;
+        v = null;
+        return tmp;
       }
 
     };
@@ -89,7 +115,11 @@ public final class ListVerticesInfo<V ex
 
   @Override
   public void finishAdditions() {
-    Collections.sort(vertices);
+
+  }
+
+  @Override
+  public void finishRemovals() {
   }
 
   @Override

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri Aug 16 12:03:59 2013
@@ -25,6 +25,8 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.bsp.BSPPeer;
@@ -112,6 +114,42 @@ public abstract class Vertex<V extends W
         new GraphJobMessage(destinationVertexID, msg));
   }
 
+  private void alterVertexCounter(int i) throws IOException {
+    this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i);
+  }
+  
+  @Override
+  public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws IOException
{
+    MapWritable msg = new MapWritable();
+    // Create the new vertex.
+    Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+    vertex.setEdges(edges);
+    vertex.setValue(value);
+    vertex.setVertexID(vertexID);
+    
+    msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex);
+    // Find the proper partition to host the new vertex.
+    int partition = getPartitioner().getPartition(vertexID, value, 
+        runner.getPeer().getNumPeers());
+    String destPeer = runner.getPeer().getAllPeerNames()[partition];
+    
+    runner.getPeer().send(destPeer, new GraphJobMessage(msg));
+    
+    alterVertexCounter(1);
+  }
+
+  @Override
+  public void remove() throws IOException {
+    MapWritable msg = new MapWritable();
+    msg.put(GraphJobRunner.FLAG_VERTEX_DECREASE, this.vertexID);
+    
+    // Get master task peer.
+    String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
+    runner.getPeer().send(destPeer, new GraphJobMessage(msg));
+    
+    alterVertexCounter(-1);
+  }
+
   @Override
   public long getSuperstepCount() {
     return runner.getNumberIterations();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Fri Aug 16 12:03:59
2013
@@ -78,6 +78,16 @@ public interface VertexInterface<V exten
   public void sendMessage(V destinationVertexID, M msg) throws IOException;
 
   /**
+   * Sends a message to add a new vertex through the partitioner to the appropriate BSP peer

+   */
+  public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws IOException;
+
+  /**
+   * Removes current Vertex from local peer. 
+   */
+  public void remove() throws IOException;
+
+  /**
    * @return the superstep number of the current superstep (starting from 0).
    */
   public long getSuperstepCount();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Fri Aug 16 12:03:59
2013
@@ -52,12 +52,23 @@ public interface VerticesInfo<V extends 
   public void addVertex(Vertex<V, E, M> vertex) throws IOException;
 
   /**
+   * Remove a vertex to the underlying structure.
+   */
+  public void removeVertex(V vertexID) throws UnsupportedOperationException;
+
+  /**
    * Finish the additions, from this point on the implementations should close
    * the adds and throw exceptions in case something is added after this call.
    */
   public void finishAdditions();
 
   /**
+   * Finish the removals, from this point on the implementations should close
+   * the removes and throw exceptions in case something is removed after this call.
+   */
+  public void finishRemovals();
+
+  /**
    * Called once a superstep starts.
    */
   public void startSuperstep() throws IOException;



Mime
View raw message