hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1345327 - in /incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph: GraphJobRunner.java Vertex.java VertexInterface.java
Date Fri, 01 Jun 2012 20:01:41 GMT
Author: tjungblut
Date: Fri Jun  1 20:01:40 2012
New Revision: 1345327

URL: http://svn.apache.org/viewvc?rev=1345327&view=rev
Log:
Enable out of core messaging for bipartite matching in graph module

Modified:
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1345327&r1=1345326&r2=1345327&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Fri Jun  1 20:01:40 2012
@@ -69,6 +69,7 @@ public final class GraphJobRunner<VERTEX
 
   private Configuration conf;
   private Combiner<VERTEX_VALUE> combiner;
+  private Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner;
 
   // multiple aggregator arrays
   private Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>[] aggregators;
@@ -92,11 +93,10 @@ public final class GraphJobRunner<VERTEX
   private int maxIteration = -1;
   private long iteration;
 
-  // aimed to be accessed by vertex writables to serialize stuff
-  Class<VERTEX_ID> vertexIdClass;
-  Class<VERTEX_VALUE> vertexValueClass;
-  Class<EDGE_VALUE_TYPE> edgeValueClass;
-  Class<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> vertexClass;
+  private Class<VERTEX_ID> vertexIdClass;
+  private Class<VERTEX_VALUE> vertexValueClass;
+  private Class<EDGE_VALUE_TYPE> edgeValueClass;
+  private Class<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> vertexClass;
 
   @Override
   @SuppressWarnings("unchecked")
@@ -125,7 +125,7 @@ public final class GraphJobRunner<VERTEX
     boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
     boolean runtimePartitioning = conf.getBoolean(
         GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
-    Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner = (Partitioner<VERTEX_ID, VERTEX_VALUE>) ReflectionUtils
+    partitioner = (Partitioner<VERTEX_ID, VERTEX_VALUE>) ReflectionUtils
         .newInstance(
             conf.getClass("bsp.input.partitioner.class", HashPartitioner.class),
             conf);
@@ -389,7 +389,7 @@ public final class GraphJobRunner<VERTEX
     boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
     Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = newVertexInstance(
         vertexClass, conf);
-    vertex.peer = peer;
+    vertex.setPeer(peer);
     vertex.runner = this;
     while (true) {
       KeyValuePair<Writable, Writable> next = peer.readNext();
@@ -424,7 +424,7 @@ public final class GraphJobRunner<VERTEX
         vertices.put(vertex.getVertexID(), vertex);
       }
       vertex = newVertexInstance(vertexClass, conf);
-      vertex.peer = peer;
+      vertex.setPeer(peer);
       vertex.runner = this;
     }
 
@@ -434,7 +434,7 @@ public final class GraphJobRunner<VERTEX
       while ((msg = peer.getCurrentMessage()) != null) {
         Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> messagedVertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) msg
             .getVertex();
-        messagedVertex.peer = peer;
+        messagedVertex.setPeer(peer);
         messagedVertex.runner = this;
         messagedVertex.setup(conf);
         vertices.put(messagedVertex.getVertexID(), messagedVertex);
@@ -466,7 +466,7 @@ public final class GraphJobRunner<VERTEX
         if (!vertices.containsKey(vertexName)) {
           Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> newVertex = newVertexInstance(
               vertexClass, conf);
-          newVertex.peer = peer;
+          newVertex.setPeer(peer);
           newVertex.setVertexID(vertexName);
           newVertex.runner = this;
           if (selfReference) {
@@ -543,6 +543,10 @@ public final class GraphJobRunner<VERTEX
     return maxIteration;
   }
 
+  public Partitioner<VERTEX_ID, VERTEX_VALUE> getPartitioner() {
+    return partitioner;
+  }
+
   public final Writable getLastAggregatedValue(int index) {
     return globalAggregatorResult[index];
   }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1345327&r1=1345326&r2=1345327&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri Jun  1 20:01:40 2012
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.Partitioner;
 
 public abstract class Vertex<ID_TYPE extends Writable, MSG_TYPE extends Writable, EDGE_VALUE_TYPE extends Writable>
     implements VertexInterface<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> {
@@ -32,7 +33,7 @@ public abstract class Vertex<ID_TYPE ext
   private ID_TYPE vertexID;
   private MSG_TYPE value;
   protected GraphJobRunner<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> runner;
-  protected BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
+  private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
   private List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> edges;
 
   public Configuration getConf() {
@@ -64,6 +65,15 @@ public abstract class Vertex<ID_TYPE ext
   }
 
   @Override
+  public void sendMessage(ID_TYPE destinationVertexID, MSG_TYPE msg)
+      throws IOException {
+    int partition = getPartitioner().getPartition(destinationVertexID, msg,
+        peer.getNumPeers());
+    String destPeer = peer.getAllPeerNames()[partition];
+    peer.send(destPeer, new GraphJobMessage(destinationVertexID, msg));
+  }
+
+  @Override
   public long getSuperstepCount() {
     return runner.getNumberIterations();
   }
@@ -131,6 +141,22 @@ public abstract class Vertex<ID_TYPE ext
     return peer.getNumPeers();
   }
 
+  /**
+   * Gives access to the BSP primitives and additional features by a peer.
+   */
+  public BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
+    return peer;
+  }
+
+  public Partitioner<ID_TYPE, MSG_TYPE> getPartitioner() {
+    return runner.getPartitioner();
+  }
+
+  void setPeer(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+    this.peer = peer;
+  }
+
   @Override
   public long getNumVertices() {
     return runner.getNumberVertices();

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1345327&r1=1345326&r2=1345327&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Fri Jun  1 20:01:40 2012
@@ -40,39 +40,55 @@ public interface VertexInterface<ID_TYPE
    */
   public void setup(Configuration conf);
 
-  /** @return the unique identification for the vertex. */
+  /**
+   * @return the unique identification for the vertex.
+   */
   public ID_TYPE getVertexID();
 
-  /** @return the number of vertices in the input graph. */
+  /**
+   * @return the number of vertices in the input graph.
+   */
   public long getNumVertices();
 
-  /** The user-defined function */
+  /**
+   * The user-defined function
+   */
   public void compute(Iterator<MSG_TYPE> messages) throws IOException;
 
-  /** @return a list of outgoing edges of this vertex in the input graph. */
+  /**
+   * @return a list of outgoing edges of this vertex in the input graph.
+   */
   public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getEdges();
 
-  /** Sends a message to another vertex. */
+  /**
+   * Sends a message to another vertex.
+   */
   public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg)
       throws IOException;
 
-  /** Sends a message to neighbors */
+  /**
+   * Sends a message to neighbors
+   */
   public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException;
 
-  /** @return the superstep number of the current superstep (starting from 0). */
+  /**
+   * Sends a message to the given destination vertex by ID and the message value
+   */
+  public void sendMessage(ID_TYPE destinationVertexID, MSG_TYPE msg)
+      throws IOException;
+
+  /**
+   * @return the superstep number of the current superstep (starting from 0).
+   */
   public long getSuperstepCount();
 
   /**
    * Sets the vertex value
-   * 
-   * @param value
    */
   public void setValue(MSG_TYPE value);
 
   /**
    * Gets the vertex value
-   * 
-   * @return value
    */
   public MSG_TYPE getValue();
 



Mime
View raw message