hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1383326 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ graph/src/main/java/org/apache/hama/graph/
Date Tue, 11 Sep 2012 09:47:47 GMT
Author: tjungblut
Date: Tue Sep 11 09:47:46 2012
New Revision: 1383326

URL: http://svn.apache.org/viewvc?rev=1383326&view=rev
Log:
[HAMA-596]:Optimize memory usage of graph job

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Sep 11 09:47:46 2012
@@ -13,6 +13,7 @@ Release 0.6 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-596: Optimize memory usage of graph job (tjungblut)
    HAMA-599: Improvement of network-based runtime partitioner (edwardyoon)
   
 Release 0.5 - April 10, 2012 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Sep 11 09:47:46
2012
@@ -344,10 +344,15 @@ public final class BSPPeerImpl<K1, V1, K
   /**
    * @return the size of assigned split
    */
+  @Override
   public long getSplitSize() {
     return splitSize;
   }
   
+  /**
+   * @return the position in the input stream.
+   */
+  @Override
   public long getPos() throws IOException {
     return in.getPos();
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Sep 11 09:47:46
2012
@@ -20,11 +20,10 @@ package org.apache.hama.bsp;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.LinkedList;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -64,9 +63,6 @@ public class LocalBSPRunner implements J
   private static String WORKING_DIR = "/tmp/hama-bsp/";
   private volatile ThreadPoolExecutor threadPool;
 
-  @SuppressWarnings("rawtypes")
-  private static final LinkedList<Future<BSPPeerImpl>> FUTURE_LIST = new LinkedList<Future<BSPPeerImpl>>();
-
   private String jobFile;
   private String jobName;
 
@@ -145,16 +141,19 @@ public class LocalBSPRunner implements J
     }
 
     threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(numBspTask);
+    @SuppressWarnings("rawtypes")
+    ExecutorCompletionService<BSPPeerImpl> completionService = new ExecutorCompletionService<BSPPeerImpl>(
+        threadPool);
 
     peerNames = new String[numBspTask];
     for (int i = 0; i < numBspTask; i++) {
       peerNames[i] = "local:" + i;
-      FUTURE_LIST.add(threadPool.submit(new BSPRunner(new Configuration(conf),
-          job, i, splits)));
+      completionService.submit(new BSPRunner(new Configuration(conf), job, i,
+          splits));
       globalCounters.incrCounter(JobInProgress.JobCounter.LAUNCHED_TASKS, 1L);
     }
 
-    new Thread(new ThreadObserver(currentJobStatus)).start();
+    new Thread(new ThreadObserver(numBspTask, completionService)).start();
     return currentJobStatus;
   }
 
@@ -233,7 +232,6 @@ public class LocalBSPRunner implements J
 
     }
 
-    // deprecated until 0.5.0, then it will be removed.
     @SuppressWarnings("unchecked")
     public void run() throws Exception {
 
@@ -287,29 +285,34 @@ public class LocalBSPRunner implements J
   }
 
   // this thread observes the status of the runners.
+  @SuppressWarnings("rawtypes")
   class ThreadObserver implements Runnable {
 
-    final JobStatus status;
+    private final ExecutorCompletionService<BSPPeerImpl> completionService;
+    private final int numTasks;
+
+    public ThreadObserver(int numTasks,
 
-    public ThreadObserver(JobStatus currentJobStatus) {
-      this.status = currentJobStatus;
+    ExecutorCompletionService<BSPPeerImpl> completionService) {
+      this.numTasks = numTasks;
+      this.completionService = completionService;
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
     public void run() {
       boolean success = true;
-      for (Future<BSPPeerImpl> future : FUTURE_LIST) {
+
+      for (int i = 0; i < numTasks; i++) {
         try {
-          BSPPeerImpl bspPeerImpl = future.get();
-          currentJobStatus.getCounter().incrAllCounters(
-              bspPeerImpl.getCounters());
-        } catch (InterruptedException e) {
-          LOG.error("Exception during BSP execution!", e);
-          success = false;
-        } catch (ExecutionException e) {
+          Future<BSPPeerImpl> take = completionService.take();
+          if (take != null) {
+            currentJobStatus.getCounter().incrAllCounters(
+                take.get().getCounters());
+          }
+        } catch (Exception e) {
           LOG.error("Exception during BSP execution!", e);
           success = false;
+          break;
         }
       }
       if (success) {
@@ -321,7 +324,6 @@ public class LocalBSPRunner implements J
       }
       threadPool.shutdownNow();
     }
-
   }
 
   public static class LocalMessageManager<M extends Writable> extends

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java Tue Sep 11
09:47:46 2012
@@ -17,10 +17,10 @@
  */
 package org.apache.hama.bsp.message;
 
+import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
-import java.util.LinkedList;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -31,7 +31,7 @@ import org.apache.hama.bsp.TaskAttemptID
  */
 public final class MemoryQueue<M extends Writable> implements MessageQueue<M>
{
 
-  private final Deque<M> deque = new LinkedList<M>();
+  private final Deque<M> deque = new ArrayDeque<M>();
   private Configuration conf;
 
   @Override

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Tue Sep 11 09:47:46 2012
@@ -27,7 +27,6 @@ public final class Edge<VERTEX_ID extend
 
   private final VERTEX_ID destinationVertexID;
   private final EDGE_VALUE_TYPE cost;
-  String destinationPeerName;
 
   public Edge(VERTEX_ID sourceVertexID, EDGE_VALUE_TYPE cost) {
     this.destinationVertexID = sourceVertexID;
@@ -38,32 +37,16 @@ public final class Edge<VERTEX_ID extend
     }
   }
 
-  public Edge(VERTEX_ID sourceVertexID, String destinationPeer,
-      EDGE_VALUE_TYPE cost) {
-    this.destinationVertexID = sourceVertexID;
-    destinationPeerName = destinationPeer;
-    if (cost instanceof NullWritable) {
-      this.cost = null;
-    } else {
-      this.cost = cost;
-    }
-  }
-
   public VERTEX_ID getDestinationVertexID() {
     return destinationVertexID;
   }
 
-  public String getDestinationPeerName() {
-    return destinationPeerName;
-  }
-
   public EDGE_VALUE_TYPE getValue() {
     return cost;
   }
 
   @Override
   public String toString() {
-    return this.destinationVertexID + ":" + this.getValue() + " (resides on "
-        + destinationPeerName + ")";
+    return this.destinationVertexID + ":" + this.getValue();
   }
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Tue Sep 11 09:47:46
2012
@@ -107,7 +107,6 @@ public final class GraphJobMessage imple
       out.writeInt(outEdges.size());
       for (Object e : outEdges) {
         Edge<?, ?> edge = (Edge<?, ?>) e;
-        out.writeUTF(edge.getDestinationPeerName());
         edge.getDestinationVertexID().write(out);
         if (edge.getValue() != null) {
           out.writeBoolean(true);
@@ -136,7 +135,7 @@ public final class GraphJobMessage imple
       map = new MapWritable();
       map.readFields(in);
     } else if (isPartitioningMessage()) {
-      Vertex<Writable, Writable, Writable> vertex = GraphJobRunner
+      Vertex<Writable, Writable, Writable> vertex = GraphJobRunnerBase
           .newVertexInstance(VERTEX_CLASS, null);
       Writable vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
       vertexId.readFields(in);
@@ -150,7 +149,6 @@ public final class GraphJobMessage imple
       int size = in.readInt();
       vertex.setEdges(new ArrayList<Edge<Writable, Writable>>(size));
       for (int i = 0; i < size; i++) {
-        String destination = in.readUTF();
         Writable edgeVertexID = ReflectionUtils.newInstance(VERTEX_ID_CLASS,
             null);
         edgeVertexID.readFields(in);
@@ -160,7 +158,7 @@ public final class GraphJobMessage imple
           edgeValue.readFields(in);
         }
         vertex.getEdges().add(
-            new Edge<Writable, Writable>(edgeVertexID, destination, edgeValue));
+            new Edge<Writable, Writable>(edgeVertexID, edgeValue));
       }
       this.vertex = vertex;
     } else if (isVerticesSizeMessage()) {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Sep 11 09:47:46
2012
@@ -48,6 +48,7 @@ public final class GraphJobRunner<V exte
   public final void setup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
+    this.peer = peer;
     this.conf = peer.getConfiguration();
     // Choose one as a master to collect global updates
     this.masterTask = peer.getPeerName(0);
@@ -116,8 +117,9 @@ public final class GraphJobRunner<V exte
         .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
             VertexInputReader.class), conf);
 
-    loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader, this);
-    
+    loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader,
+        this);
+
     for (String peerName : peer.getAllPeerNames()) {
       peer.send(peerName, new GraphJobMessage(new IntWritable(vertices.size())));
     }
@@ -130,7 +132,7 @@ public final class GraphJobRunner<V exte
         numberVertices += msg.getVerticesSize().get();
       }
     }
-    
+
     // TODO refactor this to a single step
     for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
       LinkedList<M> msgIterator = new LinkedList<M>();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java Tue Sep 11
09:47:46 2012
@@ -89,6 +89,8 @@ public abstract class GraphJobRunnerBase
   protected Class<E> edgeValueClass;
   protected Class<Vertex<V, E, M>> vertexClass;
 
+  protected BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
+
   @SuppressWarnings("unchecked")
   protected void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
@@ -107,7 +109,6 @@ public abstract class GraphJobRunnerBase
     LOG.debug("vertex class: " + vertexClass);
     boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
     Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
-    vertex.setPeer(peer);
     vertex.runner = graphJobRunner;
 
     long startPos = peer.getPos();
@@ -127,27 +128,17 @@ public abstract class GraphJobRunnerBase
         vertex.setEdges(new ArrayList<Edge<V, E>>(0));
       }
       if (selfReference) {
-        vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), peer.getPeerName(),
-            null));
+        vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
       }
       if (runtimePartitioning) {
         int partition = partitioner.getPartition(vertex.getVertexID(),
             vertex.getValue(), peer.getNumPeers());
-        // set the destination name for the edge now
-        for (Edge<V, E> edge : vertex.getEdges()) {
-          int edgePartition = partitioner.getPartition(
-              edge.getDestinationVertexID(), (M) edge.getValue(),
-              peer.getNumPeers());
-          edge.destinationPeerName = peer.getPeerName(edgePartition);
-        }
         peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
       } else {
-        // FIXME need to set destination names
         vertex.setup(conf);
         vertices.put(vertex.getVertexID(), vertex);
       }
       vertex = newVertexInstance(vertexClass, conf);
-      vertex.setPeer(peer);
       vertex.runner = graphJobRunner;
 
       if (runtimePartitioning) {
@@ -157,7 +148,6 @@ public abstract class GraphJobRunnerBase
           GraphJobMessage msg = null;
           while ((msg = peer.getCurrentMessage()) != null) {
             Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-            messagedVertex.setPeer(peer);
             messagedVertex.runner = graphJobRunner;
             messagedVertex.setup(conf);
             vertices.put(messagedVertex.getVertexID(), messagedVertex);
@@ -173,7 +163,6 @@ public abstract class GraphJobRunnerBase
       GraphJobMessage msg = null;
       while ((msg = peer.getCurrentMessage()) != null) {
         Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-        messagedVertex.setPeer(peer);
         messagedVertex.runner = graphJobRunner;
         messagedVertex.setup(conf);
         vertices.put(messagedVertex.getVertexID(), messagedVertex);
@@ -238,8 +227,9 @@ public abstract class GraphJobRunnerBase
       int i = 0;
       int syncs = 0;
       for (V v : keys) {
+        Vertex<V, E, M> vertex2 = vertices.get(v);
         for (Edge<V, E> e : vertices.get(v).getEdges()) {
-          peer.send(e.getDestinationPeerName(),
+          peer.send(vertex2.getDestinationPeerName(e),
               new GraphJobMessage(e.getDestinationVertexID()));
         }
 
@@ -251,16 +241,11 @@ public abstract class GraphJobRunnerBase
             V vertexName = (V) msg.getVertexId();
             if (!vertices.containsKey(vertexName)) {
               Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
-              newVertex.setPeer(peer);
               newVertex.setVertexID(vertexName);
               newVertex.runner = graphJobRunner;
               if (selfReference) {
-                int partition = partitioner.getPartition(
-                    newVertex.getVertexID(), newVertex.getValue(),
-                    peer.getNumPeers());
-                String target = peer.getPeerName(partition);
                 newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
-                    newVertex.getVertexID(), target, null)));
+                    newVertex.getVertexID(), null)));
               } else {
                 newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
               }
@@ -278,15 +263,11 @@ public abstract class GraphJobRunnerBase
         V vertexName = (V) msg.getVertexId();
         if (!vertices.containsKey(vertexName)) {
           Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
-          newVertex.setPeer(peer);
           newVertex.setVertexID(vertexName);
           newVertex.runner = graphJobRunner;
           if (selfReference) {
-            int partition = partitioner.getPartition(newVertex.getVertexID(),
-                newVertex.getValue(), peer.getNumPeers());
-            String target = peer.getPeerName(partition);
             newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
-                newVertex.getVertexID(), target, null)));
+                newVertex.getVertexID(), null)));
           } else {
             newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
           }
@@ -477,4 +458,8 @@ public abstract class GraphJobRunnerBase
     return globalAggregatorIncrement[index];
   }
 
+  public BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer()
{
+    return peer;
+  }
+
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Tue Sep 11 09:47:46 2012
@@ -30,16 +30,16 @@ import org.apache.hama.bsp.Partitioner;
 public abstract class Vertex<V extends Writable, E extends Writable, M extends Writable>
     implements VertexInterface<V, E, M> {
 
+  GraphJobRunner<?, ?, ?> runner;
+
   private V vertexID;
   private M value;
-  protected GraphJobRunner<V, E, M> runner;
-  private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
   private List<Edge<V, E>> edges;
 
   private boolean votedToHalt = false;
 
   public Configuration getConf() {
-    return peer.getConfiguration();
+    return runner.getPeer().getConfiguration();
   }
 
   @Override
@@ -53,10 +53,28 @@ public abstract class Vertex<V extends W
 
   @Override
   public void sendMessage(Edge<V, E> e, M msg) throws IOException {
-    peer.send(e.getDestinationPeerName(),
+    runner.getPeer().send(getDestinationPeerName(e),
         new GraphJobMessage(e.getDestinationVertexID(), msg));
   }
 
+  /**
+   * @return the destination peer name of the destination of the given directed
+   *         edge.
+   */
+  public String getDestinationPeerName(Edge<V, E> edge) {
+    return getDestinationPeerName(edge.getDestinationVertexID());
+  }
+
+  /**
+   * @return the destination peer name of the given vertex id, determined by the
+   *         partitioner.
+   */
+  public String getDestinationPeerName(V vertexId) {
+    return runner.getPeer().getPeerName(
+        getPartitioner().getPartition(vertexId, value,
+            runner.getPeer().getNumPeers()));
+  }
+
   @Override
   public void sendMessageToNeighbors(M msg) throws IOException {
     final List<Edge<V, E>> outEdges = this.getEdges();
@@ -68,9 +86,10 @@ public abstract class Vertex<V extends W
   @Override
   public void sendMessage(V destinationVertexID, M msg) throws IOException {
     int partition = getPartitioner().getPartition(destinationVertexID, msg,
-        peer.getNumPeers());
-    String destPeer = peer.getAllPeerNames()[partition];
-    peer.send(destPeer, new GraphJobMessage(destinationVertexID, msg));
+        runner.getPeer().getNumPeers());
+    String destPeer = runner.getPeer().getAllPeerNames()[partition];
+    runner.getPeer().send(destPeer,
+        new GraphJobMessage(destinationVertexID, msg));
   }
 
   @Override
@@ -84,7 +103,7 @@ public abstract class Vertex<V extends W
 
   public void addEdge(Edge<V, E> edge) {
     if (edges == null) {
-      this.edges = new ArrayList<Edge<V, E>>();
+      this.edges = new ArrayList<Edge<V, E>>(1);
     }
     this.edges.add(edge);
   }
@@ -138,23 +157,22 @@ public abstract class Vertex<V extends W
   }
 
   public int getNumPeers() {
-    return peer.getNumPeers();
+    return runner.getPeer().getNumPeers();
   }
 
   /**
    * Gives access to the BSP primitives and additional features by a peer.
    */
   public BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer()
{
-    return peer;
+    return runner.getPeer();
   }
 
+  /**
+   * @return the configured partitioner instance to message vertices.
+   */
+  @SuppressWarnings("unchecked")
   public Partitioner<V, M> getPartitioner() {
-    return runner.getPartitioner();
-  }
-
-  void setPeer(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
-    this.peer = peer;
+    return (Partitioner<V, M>) runner.getPartitioner();
   }
 
   @Override



Mime
View raw message