giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject git commit: updated refs/heads/trunk to 1a4756b
Date Mon, 06 Jul 2015 17:59:31 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 3b7c68e54 -> 1a4756b66


[GIRAPH-1019] Optimizing and debugging vertex mutation mechanism

Summary:
The old implementation of the vertex mutation mechanism was single-threaded and had some redundant
computation. The single-threaded behavior causes a huge performance degradation in the out-of-core
case, since all the partitions are read and written sequentially in one thread to apply
mutations. Also, in the case where a vertex is mutated and has messages at the same time, the
current code fails to execute, which does not seem to be the expected behavior.

This diff implements an optimized multi-threaded approach for vertex mutations. With this
diff, vertex mutation happens at the beginning of processing each partition. Also, parts of
the partition migration code are modified to migrate mutations as well.

Test Plan: mvn clean verify

Reviewers: avery.ching, maja.kabiljo, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D40821


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1a4756b6
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1a4756b6
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1a4756b6

Branch: refs/heads/trunk
Commit: 1a4756b66bbd219943dce80a95dd46f44f46cbeb
Parents: 3b7c68e
Author: Hassan Eslami <heslami@fb.com>
Authored: Mon Jul 6 10:39:53 2015 -0700
Committer: Avery Ching <aching@fb.com>
Committed: Mon Jul 6 10:55:24 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   3 +
 .../apache/giraph/comm/SendMutationsCache.java  |   2 +-
 .../java/org/apache/giraph/comm/ServerData.java | 118 +++++++++++++++++--
 .../NettyWorkerClientRequestProcessor.java      |  20 +++-
 .../giraph/comm/netty/NettyWorkerServer.java    |  95 ---------------
 .../requests/SendPartitionMutationsRequest.java |  58 ++++++---
 .../apache/giraph/graph/ComputeCallable.java    |   3 +-
 .../apache/giraph/graph/GraphTaskManager.java   |  11 +-
 .../org/apache/giraph/comm/RequestTest.java     |  28 +++--
 .../examples/SimpleMutateGraphComputation.java  |  24 ++++
 .../java/org/apache/giraph/TestMutateGraph.java |  43 +++++++
 11 files changed, 264 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index c8c80df..79f95b0 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+=======
+  GIRAPH-1019: Optimizing and debugging vertex mutation. (heslami via aching)
+
   GIRAPH-1018: Improving PartitionStore API to better match its expected behaviour
   (heslami via aching)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
index a1f08c0..4e5f8d8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
@@ -28,7 +28,7 @@ import java.util.Map;
 
 /**
  * Aggregates the mutations to be sent to partitions so they can be sent in
- * bulk.
+ * bulk. Not thread-safe.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data

http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 129df59..62e58c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -22,7 +22,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
@@ -34,13 +38,17 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.EdgeStore;
 import org.apache.giraph.edge.EdgeStoreFactory;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.partition.DiskBackedPartitionStore;
+import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.partition.SimplePartitionStore;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
 
 /**
  * Anything that the server stores
@@ -52,6 +60,9 @@ import org.apache.hadoop.mapreduce.Mapper;
 @SuppressWarnings("rawtypes")
 public class ServerData<I extends WritableComparable,
     V extends Writable, E extends Writable> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(ServerData.class);
   /** Configuration */
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Partition store for this worker. */
@@ -72,11 +83,23 @@ public class ServerData<I extends WritableComparable,
    */
   private volatile MessageStore<I, Writable> currentMessageStore;
   /**
-   * Map of partition ids to incoming vertex mutations from other workers.
-   * (Synchronized access to values)
+   * Map of partition ids to vertex mutations from other workers. These are
+   * mutations that should be applied before execution of *current* super step.
+   * (accesses to keys should be thread-safe as multiple threads may resolve
+   * mutations of different partitions at the same time)
+   */
+  private ConcurrentMap<Integer,
+      ConcurrentMap<I, VertexMutations<I, V, E>>>
+      oldPartitionMutations = Maps.newConcurrentMap();
+  /**
+   * Map of partition ids to vertex mutations from other workers. These are
+   * mutations that are coming from other workers as the execution goes on in a
+   * super step. These mutations should be applied in the *next* super step.
+   * (this should be thread-safe)
    */
-  private final ConcurrentHashMap<I, VertexMutations<I, V, E>>
-  vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E>>();
+  private ConcurrentMap<Integer,
+      ConcurrentMap<I, VertexMutations<I, V, E>>>
+      partitionMutations = Maps.newConcurrentMap();
   /**
    * Holds aggregtors which current worker owns from current superstep
    */
@@ -217,9 +240,9 @@ public class ServerData<I extends WritableComparable,
    *
    * @return Vertex mutations
    */
-  public ConcurrentHashMap<I, VertexMutations<I, V, E>>
-  getVertexMutations() {
-    return vertexMutations;
+  public ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
+  getPartitionMutations() {
+    return partitionMutations;
   }
 
   /**
@@ -279,4 +302,83 @@ public class ServerData<I extends WritableComparable,
     return currentWorkerToWorkerMessages;
   }
 
+  /**
+   * Prepare resolving mutation.
+   */
+  public void prepareResolveMutations() {
+    oldPartitionMutations = partitionMutations;
+    partitionMutations = Maps.newConcurrentMap();
+  }
+
+  /**
+   * Resolve mutations specific for a partition. This method is called once
+   * per partition, before the computation for that partition starts.
+   * @param partition The partition to resolve mutations for
+   */
+  public void resolvePartitionMutation(Partition<I, V, E> partition) {
+    Integer partitionId = partition.getId();
+    VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
+    ConcurrentMap<I, VertexMutations<I, V, E>> prevPartitionMutations =
+        oldPartitionMutations.get(partitionId);
+
+    // Resolve mutations that are explicitly sent for this partition
+    if (prevPartitionMutations != null) {
+      for (Map.Entry<I, VertexMutations<I, V, E>> entry : prevPartitionMutations
+          .entrySet()) {
+        I vertexId = entry.getKey();
+        Vertex<I, V, E> originalVertex = partition.getVertex(vertexId);
+        VertexMutations<I, V, E> vertexMutations = entry.getValue();
+        Vertex<I, V, E> vertex = vertexResolver.resolve(vertexId,
+            originalVertex, vertexMutations,
+            getCurrentMessageStore().hasMessagesForVertex(entry.getKey()));
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("resolvePartitionMutations: Resolved vertex index " +
+              vertexId + " in partition index " + partitionId +
+              " with original vertex " + originalVertex +
+              ", returned vertex " + vertex + " on superstep " +
+              serviceWorker.getSuperstep() + " with mutations " +
+              vertexMutations);
+        }
+
+        if (vertex != null) {
+          partition.putVertex(vertex);
+        } else if (originalVertex != null) {
+          partition.removeVertex(vertexId);
+          try {
+            getCurrentMessageStore().clearVertexMessages(vertexId);
+          } catch (IOException e) {
+            throw new IllegalStateException("resolvePartitionMutations: " +
+                "Caught IOException while clearing messages for a deleted " +
+                "vertex due to a mutation");
+          }
+        }
+      }
+    }
+
+    // Keep track of vertices which are not here in the partition, but have
+    // received messages
+    Iterable<I> destinations = getCurrentMessageStore().
+        getPartitionDestinationVertices(partitionId);
+    if (!Iterables.isEmpty(destinations)) {
+      for (I vertexId : destinations) {
+        if (partition.getVertex(vertexId) == null) {
+          Vertex<I, V, E> vertex =
+              vertexResolver.resolve(vertexId, null, null, true);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("resolvePartitionMutations: A non-existing vertex has " +
+                "message(s). Added vertex index " + vertexId +
+                " in partition index " + partitionId +
+                ", vertex = " + vertex + ", on superstep " +
+                serviceWorker.getSuperstep());
+          }
+
+          if (vertex != null) {
+            partition.putVertex(vertex);
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index a64a33e..1cd1bd6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -20,6 +20,7 @@ package org.apache.giraph.comm.netty;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -190,6 +191,12 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     // Messages are stored separately
     if (serviceWorker.getSuperstep() != BspService.INPUT_SUPERSTEP) {
       sendPartitionMessages(workerInfo, partition);
+      ConcurrentMap<I, VertexMutations<I, V, E>> vertexMutationMap =
+          serverData.getPartitionMutations().remove(partition.getId());
+      WritableRequest partitionMutationsRequest =
+          new SendPartitionMutationsRequest<I, V, E>(partition.getId(),
+              vertexMutationMap);
+      doRequest(workerInfo, partitionMutationsRequest);
     }
   }
 
@@ -220,11 +227,11 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         }
       } catch (IOException e) {
         throw new IllegalStateException(
-            "sendVertexRequest: Got IOException ", e);
+            "sendPartitionMessages: Got IOException ", e);
       }
       if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) {
-        WritableRequest messagesRequest = new
-            SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
+        WritableRequest messagesRequest =
+            new SendPartitionCurrentMessagesRequest<I, V, E, Writable>(
             partitionId, vertexIdMessages);
         doRequest(workerInfo, messagesRequest);
         vertexIdMessages =
@@ -240,6 +247,13 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
           partitionId, vertexIdMessages);
       doRequest(workerInfo, messagesRequest);
     }
+    try {
+      messageStore.clearPartition(partitionId);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "sendPartitionMessages: Got IOException while removing messages " +
+              "for partition " + partitionId + " :" + e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 81c892d..37fb246 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -25,23 +25,13 @@ import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexMutations;
-import org.apache.giraph.graph.VertexResolver;
-import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-
 import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Map.Entry;
 
 import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
 
@@ -123,91 +113,6 @@ public class NettyWorkerServer<I extends WritableComparable,
   @Override
   public void prepareSuperstep() {
     serverData.prepareSuperstep(); // updates the current message-store
-    resolveMutations();
-  }
-
-  /**
-   * Resolve mutation requests.
-   */
-  private void resolveMutations() {
-    Multimap<Integer, I> resolveVertexIndices = HashMultimap.create(
-        service.getPartitionStore().getNumPartitions(), 100);
-      // Add any mutated vertex indices to be resolved
-    for (Entry<I, VertexMutations<I, V, E>> e :
-        serverData.getVertexMutations().entrySet()) {
-      I vertexId = e.getKey();
-      Integer partitionId = service.getPartitionId(vertexId);
-      if (!resolveVertexIndices.put(partitionId, vertexId)) {
-        throw new IllegalStateException(
-            "resolveMutations: Already has missing vertex on this " +
-                "worker for " + vertexId);
-      }
-    }
-    // Keep track of the vertices which are not here but have received messages
-    for (Integer partitionId : service.getPartitionStore().getPartitionIds()) {
-      Iterable<I> destinations = serverData.getCurrentMessageStore().
-          getPartitionDestinationVertices(partitionId);
-      if (!Iterables.isEmpty(destinations)) {
-        Partition<I, V, E> partition =
-            service.getPartitionStore().getOrCreatePartition(partitionId);
-        for (I vertexId : destinations) {
-          if (partition.getVertex(vertexId) == null) {
-            if (!resolveVertexIndices.put(partitionId, vertexId)) {
-              throw new IllegalStateException(
-                  "resolveMutations: Already has missing vertex on this " +
-                      "worker for " + vertexId);
-            }
-          }
-        }
-        service.getPartitionStore().putPartition(partition);
-      }
-    }
-    // Resolve all graph mutations
-    VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
-    for (Entry<Integer, Collection<I>> e :
-        resolveVertexIndices.asMap().entrySet()) {
-      Partition<I, V, E> partition =
-          service.getPartitionStore().getOrCreatePartition(e.getKey());
-      for (I vertexIndex : e.getValue()) {
-        Vertex<I, V, E> originalVertex =
-            partition.getVertex(vertexIndex);
-
-        VertexMutations<I, V, E> mutations = null;
-        VertexMutations<I, V, E> vertexMutations =
-            serverData.getVertexMutations().get(vertexIndex);
-        if (vertexMutations != null) {
-          synchronized (vertexMutations) {
-            mutations = vertexMutations.copy();
-          }
-          serverData.getVertexMutations().remove(vertexIndex);
-        }
-        Vertex<I, V, E> vertex = vertexResolver.resolve(
-            vertexIndex, originalVertex, mutations,
-            serverData.getCurrentMessageStore().
-                hasMessagesForVertex(vertexIndex));
-        context.progress();
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("resolveMutations: Resolved vertex index " +
-              vertexIndex + " with original vertex " +
-              originalVertex + ", returned vertex " + vertex +
-              " on superstep " + service.getSuperstep() +
-              " with mutations " +
-              mutations);
-        }
-        if (vertex != null) {
-          partition.putVertex(vertex);
-        } else if (originalVertex != null) {
-          partition.removeVertex(originalVertex.getId());
-        }
-      }
-      service.getPartitionStore().putPartition(partition);
-    }
-    if (!serverData.getVertexMutations().isEmpty()) {
-      throw new IllegalStateException("resolveMutations: Illegally " +
-          "still has " + serverData.getVertexMutations().size() +
-          " mutations left.");
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
index de0d098..b1e8b29 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
@@ -35,9 +35,13 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
- * Send a collection of vertex mutations for a partition.
+ * Send a collection of vertex mutations for a partition. This type of request
+ * is used for two purposes: 1) sending mutation requests generated due to user
+ * compute function in the middle of the execution of a superstep, and
+ * 2) sending mutation requests due to partition migration.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
@@ -77,7 +81,12 @@ public class SendPartitionMutationsRequest<I extends WritableComparable,
   public void readFieldsRequest(DataInput input) throws IOException {
     partitionId = input.readInt();
     int vertexIdMutationsSize = input.readInt();
-    vertexIdMutations = Maps.newHashMapWithExpectedSize(vertexIdMutationsSize);
+    // The request is going to be served by adding/merging it with the current
+    // mutations stored in ServerData. Since the mutations stored in ServerData
+    // are in the form of a ConcurrentMap, the data here is read in the same
+    // form, so it is more efficient to merge/add the mutations in this
+    // request with/to the mutations stored in ServerData.
+    vertexIdMutations = Maps.newConcurrentMap();
     for (int i = 0; i < vertexIdMutationsSize; ++i) {
       I vertexId = getConf().createVertexId();
       vertexId.readFields(input);
@@ -110,25 +119,40 @@ public class SendPartitionMutationsRequest<I extends WritableComparable,
 
   @Override
   public void doRequest(ServerData<I, V, E> serverData) {
-    ConcurrentHashMap<I, VertexMutations<I, V, E>> vertexMutations =
-      serverData.getVertexMutations();
+    ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
+        partitionMutations = serverData.getPartitionMutations();
     Histogram verticesInMutationHist = GiraphMetrics.get().perSuperstep()
         .getUniformHistogram(MetricNames.VERTICES_IN_MUTATION_REQUEST);
-    verticesInMutationHist.update(vertexMutations.size());
-    for (Entry<I, VertexMutations<I, V, E>> entry :
-        vertexIdMutations.entrySet()) {
-      VertexMutations<I, V, E> mutations =
-          vertexMutations.get(entry.getKey());
-      if (mutations == null) {
-        mutations = vertexMutations.putIfAbsent(
-            entry.getKey(), entry.getValue());
-        if (mutations == null) {
-          continue;
+    int mutationSize = 0;
+    for (Map<I, VertexMutations<I, V, E>> map : partitionMutations.values()) {
+      mutationSize += map.size();
+    }
+    verticesInMutationHist.update(mutationSize);
+    // If the request is a result of sending mutations in the middle of the
+    // superstep to local partitions, the request is "short-circuit"ed and
+    // vertexIdMutations is coming from an instance of SendMutationsCache.
+    // Since the vertex mutations are created locally, they are not stored in
+    // a ConcurrentMap. So, we first need to transform the data structure
+    // for a more efficient merge/add process.
+    if (!(vertexIdMutations instanceof ConcurrentMap)) {
+      vertexIdMutations = new ConcurrentHashMap<>(vertexIdMutations);
+    }
+
+    ConcurrentMap<I, VertexMutations<I, V, E>> currentVertexIdMutations =
+        partitionMutations.putIfAbsent(partitionId,
+            (ConcurrentMap<I, VertexMutations<I, V, E>>) vertexIdMutations);
+
+    if (currentVertexIdMutations != null) {
+      for (Entry<I, VertexMutations<I, V, E>> entry : vertexIdMutations
+          .entrySet()) {
+        VertexMutations<I, V, E> mutations = currentVertexIdMutations
+            .putIfAbsent(entry.getKey(), entry.getValue());
+        if (mutations != null) {
+          synchronized (mutations) {
+            mutations.addVertexMutations(entry.getValue());
+          }
         }
       }
-      synchronized (mutations) {
-        mutations.addVertexMutations(entry.getValue());
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 16c798c..e44a794 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -161,11 +161,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
         break;
       }
 
+      long startTime = System.currentTimeMillis();
       Partition<I, V, E> partition =
           serviceWorker.getPartitionStore().getOrCreatePartition(partitionId);
-      long startTime = System.currentTimeMillis();
 
       try {
+        serviceWorker.getServerData().resolvePartitionMutation(partition);
         PartitionStats partitionStats =
             computePartition(computation, partition);
         partitionStatsList.add(partitionStats);

http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index a75f62b..5c80297 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -238,8 +238,8 @@ end[PURE_YARN]*/
         LOG.info("setup: Chosen to run ZooKeeper...");
       }
     }
-    context.setStatus("setup: Connected to Zookeeper service " +
-      serverPortList);
+    context
+        .setStatus("setup: Connected to Zookeeper service " + serverPortList);
     this.graphFunctions = determineGraphFunctions(conf, zkManager);
     // Sometimes it takes a while to get multiple ZooKeeper servers up
     if (conf.getZooKeeperServerCount() > 1) {
@@ -329,6 +329,8 @@ end[PURE_YARN]*/
       } else if (storeCheckpoint(globalStats.getCheckpointStatus())) {
         break;
       }
+      serviceWorker.getServerData().prepareResolveMutations();
+      context.progress();
       prepareForSuperstep(graphState);
       context.progress();
       MessageStore<I, Writable> messageStore =
@@ -760,8 +762,9 @@ end[PURE_YARN]*/
         }
       };
     List<Collection<PartitionStats>> results =
-        ProgressableUtils.getResultsWithNCallables(callableFactory,
-            numThreads, "compute-%d", context);
+        ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
+            "compute-%d", context);
+
     for (Collection<PartitionStats> result : results) {
       partitionStatsList.addAll(result);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index b56bab3..b9ef1e3 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -55,7 +55,7 @@ import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -97,6 +97,7 @@ public class RequestTest {
         new WorkerRequestServerHandler.Factory(serverData), workerInfo,
             context, new MockExceptionHandler());
     server.start();
+
     workerInfo.setInetSocketAddress(server.getMyAddress());
     client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
@@ -272,7 +273,8 @@ public class RequestTest {
     // Send the request
     SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable>
         request = new SendPartitionMutationsRequest<IntWritable, IntWritable,
-        IntWritable>(partitionId, vertexIdMutations);
+        IntWritable>(partitionId,
+        vertexIdMutations);
     GiraphMetrics.init(conf);
     client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
@@ -282,25 +284,27 @@ public class RequestTest {
     server.stop();
 
     // Check the output
-    ConcurrentHashMap<IntWritable, VertexMutations<IntWritable, IntWritable,
-    IntWritable>> inVertexIdMutations =
-        serverData.getVertexMutations();
+    ConcurrentMap<IntWritable,
+        VertexMutations<IntWritable, IntWritable, IntWritable>>
+        inVertexIdMutations =
+        serverData.getPartitionMutations().get(partitionId);
     int keySum = 0;
-    for (Entry<IntWritable, VertexMutations<IntWritable, IntWritable,
-        IntWritable>> entry :
-          inVertexIdMutations.entrySet()) {
+    for (Entry<IntWritable,
+        VertexMutations<IntWritable, IntWritable, IntWritable>> entry :
+        inVertexIdMutations
+        .entrySet()) {
       synchronized (entry.getValue()) {
         keySum += entry.getKey().get();
         int vertexValueSum = 0;
-        for (Vertex<IntWritable, IntWritable, IntWritable>
-        vertex : entry.getValue().getAddedVertexList()) {
+        for (Vertex<IntWritable, IntWritable, IntWritable> vertex : entry
+            .getValue().getAddedVertexList()) {
           vertexValueSum += vertex.getValue().get();
         }
         assertEquals(3, vertexValueSum);
         assertEquals(2, entry.getValue().getRemovedVertexCount());
         int removeEdgeValueSum = 0;
-        for (Edge<IntWritable, IntWritable> edge :
-            entry.getValue().getAddedEdgeList()) {
+        for (Edge<IntWritable, IntWritable> edge : entry.getValue()
+            .getAddedEdgeList()) {
           removeEdgeValueSum += edge.getValue().get();
         }
         assertEquals(20, removeEdgeValueSum);

http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java
index fc3b6ad..0daaf23 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java
@@ -133,6 +133,30 @@ public class SimpleMutateGraphComputation extends BasicComputation<
             " vertices when should have " + vertexCount / 2 +
             " on superstep " + getSuperstep());
       }
+    } else if (getSuperstep() == 9) {
+      // Remove all the vertices created in superstep 1, and send a message to
+      // them at the same time
+      if (vertex.getId().compareTo(
+          new LongWritable(rangeVertexIdStart(1))) >= 0) {
+        // This is an added vertex, so remove it
+        removeVertexRequest(vertex.getId());
+      } else {
+        // This is a vertex since the start of the computation, so send a
+        // message to a vertex added in superstep 1
+        sendMessage(
+            new LongWritable(rangeVertexIdStart(1) + vertex.getId().get()),
+            new DoubleWritable(0.0));
+      }
+    } else if (getSuperstep() == 10) {
+      LOG.debug("Reached superstep " + getSuperstep());
+    } else if (getSuperstep() == 11) {
+      long vertexCount = workerContext.getVertexCount();
+      if (vertexCount / 2 != getTotalNumVertices()) {
+        throw new IllegalStateException(
+            "Impossible to have " + getTotalNumVertices() +
+                " vertices when should have " + vertexCount / 2 +
+                " on superstep " + getSuperstep());
+      }
     } else {
       vertex.voteToHalt();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/1a4756b6/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java b/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java
index 91252f3..8128220 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestMutateGraph.java
@@ -19,10 +19,17 @@
 package org.apache.giraph;
 
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.examples.GeneratedVertexReader;
 import org.apache.giraph.examples.SimpleMutateGraphComputation;
 import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat;
 import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexOutputFormat;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexChanges;
 import org.apache.giraph.job.GiraphJob;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -36,6 +43,37 @@ public class TestMutateGraph extends BspCase {
   public TestMutateGraph() {
       super(TestMutateGraph.class.getName());
   }
+  /**
+   * Custom vertex resolver
+   */
+  public static class TestVertexResolver<I extends WritableComparable, V
+      extends Writable, E extends Writable>
+      extends DefaultVertexResolver {
+    @Override
+    public Vertex resolve(WritableComparable vertexId, Vertex vertex,
+        VertexChanges vertexChanges, boolean hasMessages) {
+      Vertex originalVertex = vertex;
+      // 1. If the vertex exists, first prune the edges
+      removeEdges(vertex, vertexChanges);
+
+      // 2. If vertex removal desired, remove the vertex.
+      vertex = removeVertexIfDesired(vertex, vertexChanges);
+
+      // If vertex removal happens do not add it back even if it has messages.
+      if (originalVertex != null && vertex == null) {
+        hasMessages = false;
+      }
+
+      // 3. If creation of vertex desired, pick first vertex
+      // 4. If vertex doesn't exist, but got messages or added edges, create
+      vertex = addVertexIfDesired(vertexId, vertex, vertexChanges, hasMessages);
+
+      // 5. If edge addition, add the edges
+      addEdges(vertex, vertexChanges);
+
+      return vertex;
+    }
+  }
 
   /**
    * Run a job that tests the various graph mutations that can occur
@@ -53,8 +91,13 @@ public class TestMutateGraph extends BspCase {
     conf.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
     conf.setWorkerContextClass(
         SimpleMutateGraphComputation.SimpleMutateGraphVertexWorkerContext.class);
+    GiraphConstants.USER_PARTITION_COUNT.set(conf, 32);
+    conf.setNumComputeThreads(8);
+    GiraphConstants.VERTEX_RESOLVER_CLASS.set(conf, TestVertexResolver.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf,
         getTempPath(getCallingMethodName()));
+    // Overwrite the number of vertices set in BspCase
+    GeneratedVertexReader.READER_VERTICES.set(conf, 400);
     assertTrue(job.run(true));
   }
 }


Mime
View raw message