giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1393266 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/comm/messages/ src/main/java/org/apache/giraph/comm/netty/ src/main/java/org/apache/giraph/comm/requests/ src/main/java/org/apache/giraph/...
Date Wed, 03 Oct 2012 03:05:01 GMT
Author: aching
Date: Wed Oct  3 03:05:01 2012
New Revision: 1393266

URL: http://svn.apache.org/viewvc?rev=1393266&view=rev
Log:
GIRAPH-328: Outgoing messages from current superstep should be grouped
at the sender by owning worker, not by partition. (Eli Reisman via
aching)

Added:
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
Removed:
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1393266&r1=1393265&r2=1393266&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Oct  3 03:05:01 2012
@@ -2,6 +2,10 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-328: Outgoing messages from current superstep should be
+  grouped at the sender by owning worker, not by partition. (Eli
+  Reisman via aching)
+
   GIRAPH-293: Should aggregators be checkpointed? (majakabiljo via
   aching)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1393266&r1=1393265&r2=1393266&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java Wed Oct  3 03:05:01 2012
@@ -26,13 +26,14 @@ import java.util.Map;
 
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import com.google.common.collect.Lists;
 
 /**
- * Aggregates the messages to be send to partitions so they can be sent
+ * Aggregates the messages to be send to workers so they can be sent
  * in bulk.
  *
  * @param <I> Vertex id
@@ -44,11 +45,11 @@ public class SendMessageCache<I extends 
   /** Combiner instance, can be null */
   private final VertexCombiner<I, M> combiner;
   /** Internal cache */
-  private Map<Integer, Map<I, Collection<M>>> messageCache =
-      new HashMap<Integer, Map<I, Collection<M>>>();
+  private Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> messageCache =
+      new HashMap<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>();
   /** Number of messages in each partition */
-  private final Map<Integer, Integer> messageCountMap =
-      new HashMap<Integer, Integer>();
+  private final Map<WorkerInfo, Integer> messageCountMap =
+      new HashMap<WorkerInfo, Integer>();
 
   /**
    * Constructor
@@ -66,17 +67,27 @@ public class SendMessageCache<I extends 
   /**
    * Add a message to the cache.
    *
-   * @param partitionId Partition id
-   * @param destVertexId Destination vertex id
-   * @param message Message to be added
+   * @param workerInfo the remote worker destination
+   * @param partitionId the remote Partition this message belongs to
+   * @param destVertexId vertex id that is ultimate destination
+   * @param message Message to be send to remote
+   *                <b>host => partition => vertex</b>
    * @return Number of messages in the partition.
    */
-  public int addMessage(Integer partitionId, I destVertexId, M message) {
+  public int addMessage(WorkerInfo workerInfo,
+    final int partitionId, I destVertexId, M message) {
     // Get the message collection
-    Map<I, Collection<M>> idMessagesMap = messageCache.get(partitionId);
+    Map<Integer, Map<I, Collection<M>>> partitionMap =
+      messageCache.get(workerInfo);
+    if (partitionMap == null) {
+      partitionMap = new HashMap<Integer, Map<I, Collection<M>>>();
+      messageCache.put(workerInfo, partitionMap);
+    }
+    Map<I, Collection<M>> idMessagesMap = partitionMap.get(partitionId);
+
     if (idMessagesMap == null) {
       idMessagesMap = new HashMap<I, Collection<M>>();
-      messageCache.put(partitionId, idMessagesMap);
+      partitionMap.put(partitionId, idMessagesMap);
     }
     Collection<M> messages = idMessagesMap.get(destVertexId);
     if (messages == null) {
@@ -85,7 +96,7 @@ public class SendMessageCache<I extends 
     }
 
     // Add the message
-    int originalMessageCount = messages.size();
+    final int originalMessageCount = messages.size();
     messages.add(message);
     if (combiner != null) {
       try {
@@ -97,27 +108,31 @@ public class SendMessageCache<I extends 
       idMessagesMap.put(destVertexId, messages);
     }
 
-    // Update the number of messages per partition
-    Integer currentPartitionMessageCount = messageCountMap.get(partitionId);
-    if (currentPartitionMessageCount == null) {
-      currentPartitionMessageCount = 0;
-    }
-    Integer updatedPartitionMessageCount =
-        currentPartitionMessageCount + messages.size() - originalMessageCount;
-    messageCountMap.put(partitionId, updatedPartitionMessageCount);
-    return updatedPartitionMessageCount;
+    // Update the number of cached, outgoing messages per worker
+    Integer currentWorkerMessageCount = messageCountMap.get(workerInfo);
+    if (currentWorkerMessageCount == null) {
+      currentWorkerMessageCount = 0;
+    }
+    final int updatedWorkerMessageCount =
+        currentWorkerMessageCount + messages.size() - originalMessageCount;
+    messageCountMap.put(workerInfo, updatedWorkerMessageCount);
+    return updatedWorkerMessageCount;
   }
 
   /**
-   * Gets the messages for a partition and removes it from the cache.
+   * Gets the messages for a worker and removes it from the cache.
    *
-   * @param partitionId Partition id
-   * @return Removed partition messages
+   * @param workerInfo the address of the worker who owns the data
+   *                   partitions that are receiving the messages
+   * @return Map of all messages (keyed by partition ID's) destined
+   *         for vertices hosted by <code>workerInfo</code>
    */
-  public Map<I, Collection<M>> removePartitionMessages(int partitionId) {
-    Map<I, Collection<M>> idMessages = messageCache.remove(partitionId);
-    messageCountMap.put(partitionId, 0);
-    return idMessages;
+  public Map<Integer, Map<I, Collection<M>>> removeWorkerMessages(
+    WorkerInfo workerInfo) {
+    Map<Integer, Map<I, Collection<M>>> workerMessages =
+      messageCache.remove(workerInfo);
+    messageCountMap.put(workerInfo, 0);
+    return workerMessages;
   }
 
   /**
@@ -125,9 +140,12 @@ public class SendMessageCache<I extends 
    *
    * @return All vertex messages for all partitions
    */
-  public Map<Integer, Map<I, Collection<M>>> removeAllPartitionMessages() {
-    Map<Integer, Map<I, Collection<M>>> allMessages = messageCache;
-    messageCache = new HashMap<Integer, Map<I, Collection<M>>>();
+  public Map<WorkerInfo, Map<
+    Integer, Map<I, Collection<M>>>> removeAllMessages() {
+    Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>
+      allMessages = messageCache;
+    messageCache =
+      new HashMap<WorkerInfo, Map<Integer, Map<I, Collection<M>>>>();
     messageCountMap.clear();
     return allMessages;
   }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1393266&r1=1393265&r2=1393266&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Wed Oct  3 03:05:01 2012
@@ -84,7 +84,7 @@ public class SimpleMessageStore<I extend
   @Override
   public void addVertexMessages(I vertexId,
       Collection<M> messages) throws IOException {
-    int partitionId = getPartitonId(vertexId);
+    int partitionId = getPartitionId(vertexId);
     ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
     if (partitionMap == null) {
       ConcurrentMap<I, Collection<M>> tmpMap  =
@@ -96,13 +96,12 @@ public class SimpleMessageStore<I extend
         partitionMap = map.get(partitionId);
       }
     }
-
     Collection<M> currentMessages =
-        CollectionUtils.addConcurrent(vertexId, messages, partitionMap);
+      CollectionUtils.addConcurrent(vertexId, messages, partitionMap);
     if (combiner != null) {
       synchronized (currentMessages) {
         currentMessages =
-            Lists.newArrayList(combiner.combine(vertexId, currentMessages));
+          Lists.newArrayList(combiner.combine(vertexId, currentMessages));
         partitionMap.put(vertexId, currentMessages);
       }
     }
@@ -148,9 +147,9 @@ public class SimpleMessageStore<I extend
   @Override
   public Collection<M> getVertexMessages(I vertexId) throws IOException {
     ConcurrentMap<I, Collection<M>> partitionMap =
-        map.get(getPartitonId(vertexId));
+        map.get(getPartitionId(vertexId));
     return (partitionMap == null) ? Collections.<M>emptyList() :
-        map.get(getPartitonId(vertexId)).get(vertexId);
+        partitionMap.get(vertexId);
   }
 
   @Override
@@ -167,7 +166,7 @@ public class SimpleMessageStore<I extend
   @Override
   public boolean hasMessagesForVertex(I vertexId) {
     ConcurrentMap<I, Collection<M>> partitionMap =
-        map.get(getPartitonId(vertexId));
+        map.get(getPartitionId(vertexId));
     return (partitionMap == null) ? false : partitionMap.containsKey(vertexId);
   }
 
@@ -190,7 +189,7 @@ public class SimpleMessageStore<I extend
   @Override
   public void clearVertexMessages(I vertexId) throws IOException {
     ConcurrentMap<I, Collection<M>> partitionMap =
-        map.get(getPartitonId(vertexId));
+        map.get(getPartitionId(vertexId));
     if (partitionMap != null) {
       partitionMap.remove(vertexId);
     }
@@ -212,7 +211,7 @@ public class SimpleMessageStore<I extend
    * @param vertexId Id of vertex
    * @return Id of partiton
    */
-  private int getPartitonId(I vertexId) {
+  private int getPartitionId(I vertexId) {
     return service.getVertexPartitionOwner(vertexId).getPartitionId();
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1393266&r1=1393265&r2=1393266&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Wed Oct  3 03:05:01 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.comm.netty;
 
 import com.google.common.collect.Sets;
 import java.util.Set;
+import java.util.Iterator;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -29,7 +30,7 @@ import org.apache.giraph.comm.ServerData
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.requests.SendPartitionCurrentMessagesRequest;
-import org.apache.giraph.comm.requests.SendPartitionMessagesRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
 import org.apache.giraph.comm.requests.WorkerRequest;
@@ -70,6 +71,8 @@ public class NettyWorkerClient<I extends
   /** Class logger */
   private static final Logger LOG =
     Logger.getLogger(NettyWorkerClient.class);
+  /** signal for getInetSocketAddress() to use WorkerInfo's address */
+  private static final int NO_PARTITION_ID = Integer.MIN_VALUE;
   /** Hadoop configuration */
   private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Netty client that does that actual I/O */
@@ -89,8 +92,8 @@ public class NettyWorkerClient<I extends
    * Cached map of partitions to vertex indices to mutations
    */
   private final SendMutationsCache<I, V, E, M> sendMutationsCache;
-  /** Maximum number of messages per partition before sending */
-  private final int maxMessagesPerPartition;
+  /** Maximum number of messages per remote worker to cache before sending */
+  private final int maxMessagesPerWorker;
   /** Maximum number of mutations per partition before sending */
   private final int maxMutationsPerPartition;
   /** Maximum number of attempts to resolve an address*/
@@ -116,7 +119,7 @@ public class NettyWorkerClient<I extends
     this.nettyClient = new NettyClient(context, configuration);
     this.conf = configuration;
     this.service = service;
-    maxMessagesPerPartition = conf.getInt(
+    maxMessagesPerWorker = conf.getInt(
         GiraphConfiguration.MSG_SIZE,
         GiraphConfiguration.MSG_SIZE_DEFAULT);
     maxMutationsPerPartition = conf.getInt(
@@ -156,8 +159,8 @@ public class NettyWorkerClient<I extends
       }
 
       // No need to connect to myself
-      if (service.getWorkerInfo().getPartitionId() !=
-          partitionOwner.getWorkerInfo().getPartitionId()) {
+      if (service.getWorkerInfo().getTaskId() !=
+          partitionOwner.getWorkerInfo().getTaskId()) {
         addresses.add(getInetSocketAddress(partitionOwner.getWorkerInfo(),
             partitionOwner.getPartitionId()));
       }
@@ -179,35 +182,47 @@ public class NettyWorkerClient<I extends
    */
   private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
       int partitionId) {
-    InetSocketAddress address =
-        partitionIndexAddressMap.get(partitionId);
+    InetSocketAddress address = partitionIndexAddressMap.get(partitionId);
     if (address == null) {
-      address = workerInfo.getInetSocketAddress();
-      int resolveAttempts = 0;
-      while (address.isUnresolved() &&
-          resolveAttempts < maxResolveAddressAttempts) {
-        address = workerInfo.getInetSocketAddress();
-        ++resolveAttempts;
-        LOG.warn("getInetSocketAddress: Failed to resolve " + address +
-            " on attempt " + resolveAttempts + " of " +
-            maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
-        try {
-          Thread.sleep(5000);
-        } catch (InterruptedException e) {
-          LOG.warn("getInetSocketAddress: Interrupted.", e);
-        }
-      }
-      if (resolveAttempts >= maxResolveAddressAttempts) {
-        throw new IllegalStateException("getInetSocketAddress: Coudldn't " +
-            "resolve " + address + " in " +  resolveAttempts + " tries.");
+      address = resolveAddress(workerInfo.getInetSocketAddress());
+      if (partitionId != NO_PARTITION_ID) {
+        // Only cache valid partition ids
+        partitionIndexAddressMap.put(partitionId, address);
       }
-      partitionIndexAddressMap.put(partitionId, address);
     }
 
     return address;
   }
 
   /**
+   * Utility method for getInetSocketAddress()
+   * @param address the address we are attempting to resolve
+   * @return the successfully resolved address.
+   * @throws IllegalStateException if the address is not resolved
+   *         in <code>maxResolveAddressAttempts</code> tries.
+   */
+  private InetSocketAddress resolveAddress(InetSocketAddress address) {
+    int resolveAttempts = 0;
+    while (address.isUnresolved() &&
+      resolveAttempts < maxResolveAddressAttempts) {
+      ++resolveAttempts;
+      LOG.warn("resolveAddress: Failed to resolve " + address +
+        " on attempt " + resolveAttempts + " of " +
+        maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        LOG.warn("resolveAddress: Interrupted.", e);
+      }
+    }
+    if (resolveAttempts >= maxResolveAddressAttempts) {
+      throw new IllegalStateException("resolveAddress: Couldn't " +
+        "resolve " + address + " in " +  resolveAttempts + " tries.");
+    }
+    return address;
+  }
+
+  /**
    * When doing the request, short circuit if it is local
    *
    * @param workerInfo Worker info
@@ -218,56 +233,55 @@ public class NettyWorkerClient<I extends
                          InetSocketAddress remoteServerAddress,
                          WritableRequest writableRequest) {
     // If this is local, execute locally
-    if (service.getWorkerInfo().getPartitionId() ==
-        workerInfo.getPartitionId()) {
+    if (service.getWorkerInfo().getTaskId() ==
+        workerInfo.getTaskId()) {
       ((WorkerRequest) writableRequest).doRequest(serverData);
     } else {
       nettyClient.sendWritableRequest(
-          workerInfo.getPartitionId(), remoteServerAddress, writableRequest);
+          workerInfo.getTaskId(), remoteServerAddress, writableRequest);
     }
   }
 
   @Override
   public void sendMessageRequest(I destVertexId, M message) {
-    PartitionOwner partitionOwner =
-        service.getVertexPartitionOwner(destVertexId);
-    int partitionId = partitionOwner.getPartitionId();
+    PartitionOwner owner = service.getVertexPartitionOwner(destVertexId);
+    WorkerInfo workerInfo = owner.getWorkerInfo();
+    final int partitionId = owner.getPartitionId();
     if (LOG.isTraceEnabled()) {
       LOG.trace("sendMessageRequest: Send bytes (" + message.toString() +
-          ") to " + destVertexId + " with partition " + partitionId);
+          ") to " + destVertexId + " on worker " + workerInfo);
     }
     ++totalMsgsSentInSuperstep;
 
     // Add the message to the cache
-    int partitionMessageCount =
-        sendMessageCache.addMessage(partitionId, destVertexId, message);
+    int workerMessageCount = sendMessageCache
+      .addMessage(workerInfo, partitionId, destVertexId, message);
 
-    // Send a request if enough messages are there for a partition
-    if (partitionMessageCount >= maxMessagesPerPartition) {
-      InetSocketAddress remoteServerAddress =
-          getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
-      Map<I, Collection<M>> partitionMessages =
-          sendMessageCache.removePartitionMessages(partitionId);
+    // Send a request if the cache of outgoing message to
+    // the remote worker 'workerInfo' is full enough to be flushed
+    if (workerMessageCount >= maxMessagesPerWorker) {
+      Map<Integer, Map<I, Collection<M>>> workerMessages =
+        sendMessageCache.removeWorkerMessages(workerInfo);
+      InetSocketAddress remoteWorkerAddress =
+        getInetSocketAddress(workerInfo, partitionId);
       WritableRequest writableRequest =
-          new SendPartitionMessagesRequest<I, V, E, M>(
-              partitionId, partitionMessages);
-      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
-          writableRequest);
+        new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
+      doRequest(workerInfo, remoteWorkerAddress, writableRequest);
     }
   }
 
   @Override
   public void sendPartitionRequest(WorkerInfo workerInfo,
                                    Partition<I, V, E, M> partition) {
+    final int partitionId = partition.getId();
     InetSocketAddress remoteServerAddress =
-        getInetSocketAddress(workerInfo, partition.getId());
+        getInetSocketAddress(workerInfo, partitionId);
     if (LOG.isTraceEnabled()) {
       LOG.trace("sendPartitionRequest: Sending to " +
           remoteServerAddress +
           " from " + workerInfo + ", with partition " + partition);
     }
 
-    int partitionId = partition.getId();
     WritableRequest vertexRequest =
         new SendVertexRequest<I, V, E, M>(partitionId,
             partition.getVertices());
@@ -288,7 +302,7 @@ public class NettyWorkerClient<I extends
         throw new IllegalStateException(
             "sendPartitionReq: Got IOException ", e);
       }
-      if (messagesInMap > maxMessagesPerPartition) {
+      if (messagesInMap > maxMessagesPerWorker) {
         WritableRequest messagesRequest = new
             SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
         doRequest(workerInfo, remoteServerAddress, messagesRequest);
@@ -407,21 +421,19 @@ public class NettyWorkerClient<I extends
   @Override
   public void flush() throws IOException {
     // Execute the remaining sends messages (if any)
-    Map<Integer, Map<I, Collection<M>>> remainingMessageCache =
-        sendMessageCache.removeAllPartitionMessages();
-    for (Entry<Integer, Map<I, Collection<M>>> entry :
-        remainingMessageCache.entrySet()) {
+    Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> remainingMessageCache =
+        sendMessageCache.removeAllMessages();
+    for (Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
+      remainingMessageCache.entrySet()) {
+      Iterator<Integer> cachedPartitionId =
+        entry.getValue().keySet().iterator();
+      final int partitionId = cachedPartitionId.hasNext() ?
+        cachedPartitionId.next() : NO_PARTITION_ID;
+      InetSocketAddress remoteWorkerAddress =
+        getInetSocketAddress(entry.getKey(), partitionId);
       WritableRequest writableRequest =
-          new SendPartitionMessagesRequest<I, V, E, M>(
-              entry.getKey(), entry.getValue());
-      PartitionOwner partitionOwner =
-          service.getVertexPartitionOwner(
-              entry.getValue().keySet().iterator().next());
-      InetSocketAddress remoteServerAddress =
-          getInetSocketAddress(partitionOwner.getWorkerInfo(),
-              partitionOwner.getPartitionId());
-      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
-          writableRequest);
+        new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
+      doRequest(entry.getKey(), remoteWorkerAddress, writableRequest);
     }
 
     // Execute the remaining sends mutations (if any)

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java?rev=1393266&r1=1393265&r2=1393266&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java Wed Oct  3 03:05:01 2012
@@ -25,7 +25,7 @@ public enum RequestType {
   /** Sending vertices request */
   SEND_VERTEX_REQUEST(SendVertexRequest.class),
   /** Sending a partition of messages for next superstep */
-  SEND_PARTITION_MESSAGES_REQUEST(SendPartitionMessagesRequest.class),
+  SEND_WORKER_MESSAGES_REQUEST(SendWorkerMessagesRequest.class),
   /**
    * Sending a partition of messages for current superstep
    * (used during partition exchange)

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java?rev=1393266&r1=1393265&r2=1393266&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java Wed Oct  3 03:05:01 2012
@@ -18,13 +18,19 @@
 
 package org.apache.giraph.comm.requests;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.apache.giraph.comm.ServerData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * Send a collection of vertex messages for a partition. It adds messages to
@@ -36,8 +42,13 @@ import java.util.Map;
  * @param <M> Message data
  */
 public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    SendPartitionMessagesRequest<I, V, E, M> {
+  V extends Writable, E extends Writable, M extends Writable> extends
+  WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
+  /** the destination partition for these vertices' messages*/
+  private int partitionId;
+  /** map of destination vertex ID's to message lists */
+  private Map<I, Collection<M>> vertexMessageMap;
+
   /** Constructor used for reflection only */
   public SendPartitionCurrentMessagesRequest() { }
 
@@ -48,8 +59,10 @@ public class SendPartitionCurrentMessage
    * @param vertexIdMessages Map of messages to send
    */
   public SendPartitionCurrentMessagesRequest(int partitionId,
-      Map<I, Collection<M>> vertexIdMessages) {
-    super(partitionId, vertexIdMessages);
+    Map<I, Collection<M>> vertexIdMessages) {
+    super();
+    this.partitionId = partitionId;
+    this.vertexMessageMap = vertexIdMessages;
   }
 
   @Override
@@ -58,10 +71,44 @@ public class SendPartitionCurrentMessage
   }
 
   @Override
+  public void readFieldsRequest(DataInput input) throws IOException {
+    partitionId = input.readInt();
+    final int numVertices = input.readInt();
+    vertexMessageMap =
+      Maps.<I, Collection<M>>newHashMapWithExpectedSize(numVertices);
+    for (int i = 0; i < numVertices; ++i) {
+      I nextVertex = getConf().createVertexId();
+      nextVertex.readFields(input);
+      final int numMessages = input.readInt();
+      Collection<M> messagesForVertex =
+        Lists.<M>newArrayListWithExpectedSize(numMessages);
+      vertexMessageMap.put(nextVertex, messagesForVertex);
+      for (int j = 0; j < numMessages; ++j) {
+        M nextMessage = getConf().createMessageValue();
+        nextMessage.readFields(input);
+        messagesForVertex.add(nextMessage);
+      }
+    }
+  }
+
+  @Override
+  public void writeRequest(DataOutput output) throws IOException {
+    output.writeInt(partitionId);
+    output.writeInt(vertexMessageMap.size());
+    for (Entry<I, Collection<M>> entry : vertexMessageMap.entrySet()) {
+      entry.getKey().write(output);
+      output.writeInt(entry.getValue().size());
+      for (M message : entry.getValue()) {
+        message.write(output);
+      }
+    }
+  }
+
+  @Override
   public void doRequest(ServerData<I, V, E, M> serverData) {
     try {
       serverData.getCurrentMessageStore().addPartitionMessages(
-          getVertexIdMessages(), getPartitionId());
+        vertexMessageMap, partitionId);
     } catch (IOException e) {
       throw new RuntimeException("doRequest: Got IOException ", e);
     }

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1393266&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Wed Oct  3 03:05:01 2012
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Send a collection of vertex messages for a partition.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class SendWorkerMessagesRequest<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SendWorkerMessagesRequest.class);
+  /**
+   * All messages for a group of vertices, organized by partition, which
+   * are owned by a single (destination) worker. These messages are all
+   * destined for this worker.
+   * */
+  private Map<Integer, Map<I, Collection<M>>> partitionVertexMessagesMap;
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendWorkerMessagesRequest() { }
+
+  /**
+   * Constructor used to send request.
+   *
+   * @param partVertMsgsMap Map of remote partitions => vertices => messages
+   */
+  public SendWorkerMessagesRequest(
+    Map<Integer, Map<I, Collection<M>>> partVertMsgsMap) {
+    super();
+    this.partitionVertexMessagesMap = partVertMsgsMap;
+  }
+
+  @Override
+  public void readFieldsRequest(DataInput input) throws IOException {
+    int numPartitions = input.readInt();
+    partitionVertexMessagesMap = Maps.<Integer, Map<I, Collection<M>>>
+      newHashMapWithExpectedSize(numPartitions);
+    while (numPartitions-- > 0) {
+      final int partitionId = input.readInt();
+      int numVertices = input.readInt();
+      Map<I, Collection<M>> vertexIdMessages =
+        Maps.<I, Collection<M>>newHashMapWithExpectedSize(numVertices);
+      partitionVertexMessagesMap.put(partitionId, vertexIdMessages);
+      while (numVertices-- > 0) {
+        I vertexId = getConf().createVertexId();
+        vertexId.readFields(input);
+        int messageCount = input.readInt();
+        List<M> messageList =
+          Lists.newArrayListWithExpectedSize(messageCount);
+        while (messageCount-- > 0) {
+          M message = getConf().createMessageValue();
+          message.readFields(input);
+          messageList.add(message);
+        }
+        if (vertexIdMessages.put(vertexId, messageList) != null) {
+          throw new IllegalStateException(
+            "readFields: Already has vertex id " + vertexId);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeRequest(DataOutput output) throws IOException {
+    output.writeInt(partitionVertexMessagesMap.size());
+    for (Entry<Integer, Map<I, Collection<M>>> partitionEntry :
+      partitionVertexMessagesMap.entrySet()) {
+      output.writeInt(partitionEntry.getKey());
+      output.writeInt(partitionEntry.getValue().size());
+      for (Entry<I, Collection<M>> vertexEntry :
+        partitionEntry.getValue().entrySet()) {
+        vertexEntry.getKey().write(output);
+        output.writeInt(vertexEntry.getValue().size());
+        for (M message : vertexEntry.getValue()) {
+          message.write(output);
+        }
+      }
+    }
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_WORKER_MESSAGES_REQUEST;
+  }
+
+  @Override
+  public void doRequest(ServerData<I, V, E, M> serverData) {
+    for (Entry<Integer, Map<I, Collection<M>>> entry :
+      partitionVertexMessagesMap.entrySet()) {
+      try {
+        serverData.getIncomingMessageStore()
+          .addPartitionMessages(entry.getValue(), entry.getKey());
+      } catch (IOException e) {
+        throw new RuntimeException("doRequest: Got IOException ", e);
+      }
+    }
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1393266&r1=1393265&r2=1393266&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Oct  3 03:05:01 2012
@@ -445,10 +445,10 @@ public class BspServiceMaster<I extends 
             partitionLongTailMinPrint) {
           Set<Integer> partitionSet = new TreeSet<Integer>();
           for (WorkerInfo workerInfo : healthyWorkerInfoList) {
-            partitionSet.add(workerInfo.getPartitionId());
+            partitionSet.add(workerInfo.getTaskId());
           }
           for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
-            partitionSet.add(workerInfo.getPartitionId());
+            partitionSet.add(workerInfo.getTaskId());
           }
           for (int i = 1; i <= maxWorkers; ++i) {
             if (partitionSet.contains(Integer.valueOf(i))) {

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1393266&r1=1393265&r2=1393266&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Wed Oct  3 03:05:01 2012
@@ -31,8 +31,8 @@ import org.apache.hadoop.io.Writable;
 public class WorkerInfo implements Writable {
   /** Worker hostname */
   private String hostname;
-  /** Partition id of this worker */
-  private int partitionId = -1;
+  /** Task Partition (Worker) ID of this worker */
+  private int taskId = -1;
   /** Port that the RPC server is using */
   private int port = -1;
   /** Hostname + "_" + id for easier debugging */
@@ -45,25 +45,25 @@ public class WorkerInfo implements Writa
   }
 
   /**
-   * Constructor with paramters.
+   * Constructor with parameters.
    *
    * @param hostname Hostname of this worker.
-   * @param partitionId partition id of this particular object.
+   * @param taskId the task partition for this worker
    * @param port Port of the service.
    */
-  public WorkerInfo(String hostname, int partitionId, int port) {
+  public WorkerInfo(String hostname, int taskId, int port) {
     this.hostname = hostname;
-    this.partitionId = partitionId;
+    this.taskId = taskId;
     this.port = port;
-    this.hostnameId = hostname + "_" + partitionId;
+    this.hostnameId = hostname + "_" + taskId;
   }
 
   public String getHostname() {
     return hostname;
   }
 
-  public int getPartitionId() {
-    return partitionId;
+  public int getTaskId() {
+    return taskId;
   }
 
   public String getHostnameId() {
@@ -88,7 +88,7 @@ public class WorkerInfo implements Writa
     if (other instanceof WorkerInfo) {
       WorkerInfo workerInfo = (WorkerInfo) other;
       if (hostname.equals(workerInfo.getHostname()) &&
-          (partitionId == workerInfo.getPartitionId()) &&
+          (taskId == workerInfo.getTaskId()) &&
           (port == workerInfo.getPort())) {
         return true;
       }
@@ -101,28 +101,28 @@ public class WorkerInfo implements Writa
     int result = 17;
     result = 37 * result + port;
     result = 37 * result + hostname.hashCode();
-    result = 37 * result + partitionId;
+    result = 37 * result + taskId;
     return result;
   }
 
   @Override
   public String toString() {
-    return "Worker(hostname=" + hostname + ", MRpartition=" +
-        partitionId + ", port=" + port + ")";
+    return "Worker(hostname=" + hostname + ", MRtaskID=" +
+        taskId + ", port=" + port + ")";
   }
 
   @Override
   public void readFields(DataInput input) throws IOException {
     hostname = input.readUTF();
-    partitionId = input.readInt();
+    taskId = input.readInt();
     port = input.readInt();
-    hostnameId = hostname + "_" + partitionId;
+    hostnameId = hostname + "_" + taskId;
   }
 
   @Override
   public void write(DataOutput output) throws IOException {
     output.writeUTF(hostname);
-    output.writeInt(partitionId);
+    output.writeInt(taskId);
     output.writeInt(port);
   }
 }

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1393266&r1=1393265&r2=1393266&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Wed Oct  3 03:05:01 2012
@@ -24,7 +24,7 @@ import org.apache.giraph.comm.messages.S
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.netty.NettyServer;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
-import org.apache.giraph.comm.requests.SendPartitionMessagesRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.utils.MockUtils;
@@ -84,9 +84,12 @@ public class RequestFailureTest {
 
   private WritableRequest getRequest() {
     // Data to send
-    int partitionId = 0;
+    final int partitionId = 0;
+    Map<Integer, Map<IntWritable, Collection<IntWritable>>> sendMap =
+        Maps.newHashMap();
     Map<IntWritable, Collection<IntWritable>> vertexIdMessages =
         Maps.newHashMap();
+    sendMap.put(partitionId, vertexIdMessages);
     for (int i = 1; i < 7; ++i) {
       IntWritable vertexId = new IntWritable(i);
       Collection<IntWritable> messages = Lists.newArrayList();
@@ -97,10 +100,10 @@ public class RequestFailureTest {
     }
 
     // Send the request
-    SendPartitionMessagesRequest<IntWritable, IntWritable, IntWritable,
-        IntWritable> request =
-        new SendPartitionMessagesRequest<IntWritable, IntWritable,
-            IntWritable, IntWritable>(partitionId, vertexIdMessages);
+    SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable,
+            IntWritable> request =
+        new SendWorkerMessagesRequest<IntWritable, IntWritable,
+                    IntWritable, IntWritable>(sendMap);
     return request;
   }
 

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1393266&r1=1393265&r2=1393266&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Wed Oct  3 03:05:01 2012
@@ -24,7 +24,7 @@ import org.apache.giraph.comm.messages.S
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.netty.NettyServer;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
-import org.apache.giraph.comm.requests.SendPartitionMessagesRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
 import org.apache.giraph.graph.Edge;
@@ -145,11 +145,14 @@ public class RequestTest {
   }
 
   @Test
-  public void sendPartitionMessagesRequest() throws IOException {
+  public void sendWorkerMessagesRequest() throws IOException {
     // Data to send
+    Map<Integer, Map<IntWritable, Collection<IntWritable>>> sendMap =
+        Maps.newHashMap();
     int partitionId = 0;
     Map<IntWritable, Collection<IntWritable>> vertexIdMessages =
         Maps.newHashMap();
+    sendMap.put(partitionId, vertexIdMessages);
     for (int i = 1; i < 7; ++i) {
       IntWritable vertexId = new IntWritable(i);
       Collection<IntWritable> messages = Lists.newArrayList();
@@ -160,10 +163,10 @@ public class RequestTest {
     }
 
     // Send the request
-    SendPartitionMessagesRequest<IntWritable, IntWritable, IntWritable,
-    IntWritable> request =
-      new SendPartitionMessagesRequest<IntWritable, IntWritable,
-      IntWritable, IntWritable>(partitionId, vertexIdMessages);
+    SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable,
+        IntWritable> request =
+      new SendWorkerMessagesRequest<IntWritable, IntWritable,
+            IntWritable, IntWritable>(sendMap);
     client.sendWritableRequest(-1, server.getMyAddress(), request);
     client.waitAllRequests();
 



Mime
View raw message