giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject svn commit: r1406239 - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/bsp/ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/messages/ giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/mai...
Date Tue, 06 Nov 2012 17:36:37 GMT
Author: maja
Date: Tue Nov  6 17:36:36 2012
New Revision: 1406239

URL: http://svn.apache.org/viewvc?rev=1406239&view=rev
Log:
GIRAPH-404: More SendMessageCache improvements

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairList.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairListWritable.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Nov  6 17:36:36 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-404: More SendMessageCache improvements (majakabiljo)
+
   GIRAPH-412: Checkstyle error from Giraph-403 (majakabiljo) 
 
   GIRAPH-403: GraphMapper.notiftySentMessages need to be thread-safe 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Tue Nov  6 17:36:36 2012
@@ -168,6 +168,13 @@ public interface CentralizedServiceWorke
   PartitionOwner getVertexPartitionOwner(I vertexId);
 
   /**
+   * Get all partition owners.
+   *
+   * @return Iterable through partition owners
+   */
+  Iterable<? extends PartitionOwner> getPartitionOwners();
+
+  /**
    * Look up a vertex on a worker given its vertex index.
    *
    * @param vertexId Vertex index to look for

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java Tue Nov  6 17:36:36 2012
@@ -18,15 +18,20 @@
 
 package org.apache.giraph.comm;
 
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * Aggregates the messages to be send to workers so they can be sent
  * in bulk.  Not thread-safe.
@@ -37,30 +42,45 @@ import org.apache.hadoop.io.WritableComp
 @SuppressWarnings("rawtypes")
 public class SendMessageCache<I extends WritableComparable,
     M extends Writable> {
-  /** Combiner instance, can be null */
-  private final VertexCombiner<I, M> combiner;
   /** Internal cache */
-  private Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
-  messageCache =
-      new HashMap<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>();
+  private final VertexIdMessageCollection<I, M>[] messageCache;
   /** Number of messages in each partition */
-  private final Map<WorkerInfo, Integer> messageCountMap =
-      new HashMap<WorkerInfo, Integer>();
+  private final int[] messageCounts;
+  /** List of partition ids belonging to a worker */
+  private final Map<WorkerInfo, List<Integer>> workerPartitions =
+      Maps.newHashMap();
   /** Giraph configuration */
   private final ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Constructor
    *
-   * @param conf Configuration used for instantiating the combiner.
+   * @param conf Giraph configuration
+   * @param serviceWorker Service worker
    */
-  public SendMessageCache(ImmutableClassesGiraphConfiguration conf) {
+  public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
+      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
     this.conf = conf;
-    if (conf.getVertexCombinerClass() == null) {
-      this.combiner = null;
-    } else {
-      this.combiner = conf.createVertexCombiner();
+
+    int maxPartition = 0;
+    for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
+      List<Integer> workerPartitionIds =
+          workerPartitions.get(partitionOwner.getWorkerInfo());
+      if (workerPartitionIds == null) {
+        workerPartitionIds = Lists.newArrayList();
+        workerPartitions.put(partitionOwner.getWorkerInfo(),
+            workerPartitionIds);
+      }
+      workerPartitionIds.add(partitionOwner.getPartitionId());
+      maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
     }
+    messageCache = new VertexIdMessageCollection[maxPartition + 1];
+
+    int maxWorker = 0;
+    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+      maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
+    }
+    messageCounts = new int[maxWorker + 1];
   }
 
   /**
@@ -76,31 +96,18 @@ public class SendMessageCache<I extends 
   public int addMessage(WorkerInfo workerInfo,
     final int partitionId, I destVertexId, M message) {
     // Get the message collection
-    Map<Integer, VertexIdMessageCollection<I, M>> partitionMap =
-      messageCache.get(workerInfo);
-    if (partitionMap == null) {
-      partitionMap = new HashMap<Integer, VertexIdMessageCollection<I, M>>();
-      messageCache.put(workerInfo, partitionMap);
+    VertexIdMessageCollection<I, M> partitionMessages =
+        messageCache[partitionId];
+    if (partitionMessages == null) {
+      partitionMessages = new VertexIdMessageCollection<I, M>(conf);
+      partitionMessages.initialize();
+      messageCache[partitionId] = partitionMessages;
     }
-    VertexIdMessageCollection<I, M> vertexMessages =
-        partitionMap.get(partitionId);
-
-    if (vertexMessages == null) {
-      vertexMessages = new VertexIdMessageCollection<I, M>(conf);
-      vertexMessages.initialize();
-      partitionMap.put(partitionId, vertexMessages);
-    }
-    vertexMessages.add(destVertexId, message);
+    partitionMessages.add(destVertexId, message);
 
     // Update the number of cached, outgoing messages per worker
-    Integer currentWorkerMessageCount = messageCountMap.get(workerInfo);
-    if (currentWorkerMessageCount == null) {
-      currentWorkerMessageCount = 0;
-    }
-    final int updatedWorkerMessageCount =
-        currentWorkerMessageCount + 1;
-    messageCountMap.put(workerInfo, updatedWorkerMessageCount);
-    return updatedWorkerMessageCount;
+    messageCounts[workerInfo.getTaskId()]++;
+    return messageCounts[workerInfo.getTaskId()];
   }
 
   /**
@@ -108,14 +115,21 @@ public class SendMessageCache<I extends 
    *
    * @param workerInfo the address of the worker who owns the data
    *                   partitions that are receiving the messages
-   * @return Map of all messages (keyed by partition ID's) destined
-   *         for vertices hosted by <code>workerInfo</code>
+   * @return List of pairs (partitionId, VertexIdMessageCollection),
+   *         where all partition ids belong to workerInfo
    */
-  public Map<Integer, VertexIdMessageCollection<I, M>> removeWorkerMessages(
-      WorkerInfo workerInfo) {
-    Map<Integer, VertexIdMessageCollection<I, M>> workerMessages =
-        messageCache.remove(workerInfo);
-    messageCountMap.put(workerInfo, 0);
+  public PairList<Integer, VertexIdMessageCollection<I, M>>
+  removeWorkerMessages(WorkerInfo workerInfo) {
+    PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+        new PairList<Integer, VertexIdMessageCollection<I, M>>();
+    workerMessages.initialize();
+    for (Integer partitionId : workerPartitions.get(workerInfo)) {
+      if (messageCache[partitionId] != null) {
+        workerMessages.add(partitionId, messageCache[partitionId]);
+        messageCache[partitionId] = null;
+      }
+    }
+    messageCounts[workerInfo.getTaskId()] = 0;
     return workerMessages;
   }
 
@@ -124,14 +138,20 @@ public class SendMessageCache<I extends 
    *
    * @return All vertex messages for all partitions
    */
-  public Map<WorkerInfo, Map<
+  public PairList<WorkerInfo, PairList<
       Integer, VertexIdMessageCollection<I, M>>> removeAllMessages() {
-    Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
-        allMessages = messageCache;
-    messageCache =
-        new HashMap<WorkerInfo,
-            Map<Integer, VertexIdMessageCollection<I, M>>>();
-    messageCountMap.clear();
+    PairList<WorkerInfo, PairList<Integer, VertexIdMessageCollection<I, M>>>
+        allMessages = new PairList<WorkerInfo,
+        PairList<Integer, VertexIdMessageCollection<I, M>>>();
+    allMessages.initialize();
+    for (WorkerInfo workerInfo : workerPartitions.keySet()) {
+      PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+          removeWorkerMessages(workerInfo);
+      if (!workerMessages.isEmpty()) {
+        allMessages.add(workerInfo, workerMessages);
+      }
+      messageCounts[workerInfo.getTaskId()] = 0;
+    }
     return allMessages;
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java Tue Nov  6 17:36:36 2012
@@ -19,16 +19,10 @@
 package org.apache.giraph.comm;
 
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.PairListWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.google.common.collect.Lists;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
 /**
  * Holder for pairs of vertex ids and messages. Not thread-safe.
  *
@@ -36,11 +30,7 @@ import java.util.List;
  * @param <M> Message data
  */
 public class VertexIdMessageCollection<I extends WritableComparable,
-    M extends Writable> implements Writable {
-  /** List of ids of vertices */
-  private List<I> vertexIds;
-  /** List of messages */
-  private List<M> messages;
+    M extends Writable> extends PairListWritable<I, M> {
   /** Giraph configuration */
   private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> conf;
 
@@ -57,109 +47,13 @@ public class VertexIdMessageCollection<I
     this.conf = conf;
   }
 
-  /**
-   * Initialize the inner state. Must be called before {@code add()} is
-   * called. If you want to call {@code readFields()} you don't need to call
-   * this method.
-   */
-  public void initialize() {
-    vertexIds = Lists.newArrayList();
-    messages = Lists.newArrayList();
-  }
-
-  /**
-   * Adds message for vertex with selected id.
-   *
-   * @param vertexId Id of vertex
-   * @param message  Message to add
-   */
-  public void add(I vertexId, M message) {
-    vertexIds.add(vertexId);
-    messages.add(message);
-  }
-
   @Override
-  public void write(DataOutput dataOutput) throws IOException {
-    dataOutput.writeInt(vertexIds.size());
-    for (int i = 0; i < vertexIds.size(); i++) {
-      vertexIds.get(i).write(dataOutput);
-      messages.get(i).write(dataOutput);
-    }
+  protected I newFirstInstance() {
+    return conf.createVertexId();
   }
 
   @Override
-  public void readFields(DataInput input) throws IOException {
-    int messageCount = input.readInt();
-    vertexIds = Lists.newArrayListWithCapacity(messageCount);
-    messages = Lists.newArrayListWithCapacity(messageCount);
-    while (messageCount-- > 0) {
-      I vertexId = conf.createVertexId();
-      vertexId.readFields(input);
-      vertexIds.add(vertexId);
-      M message = conf.createMessageValue();
-      message.readFields(input);
-      messages.add(message);
-    }
-  }
-
-  /**
-   * Get iterator through destination vertices and messages.
-   *
-   * @return {@link Iterator} iterator
-   */
-  public Iterator getIterator() {
-    return new Iterator();
-  }
-
-  /**
-   * Special iterator class which we'll use to iterate through elements of
-   * {@link VertexIdMessageCollection}, without having to create new object as
-   * wrapper for destination vertex id and message.
-   *
-   * Protocol is somewhat similar to the protocol of {@link java.util.Iterator}
-   * only here next() doesn't return the next object, it just moves along in
-   * the collection. Values related to current pair of (vertex id, message)
-   * can be retrieved by calling getCurrentVertexId() and getCurrentMessage()
-   * methods.
-   *
-   * Not thread-safe.
-   */
-  public class Iterator {
-    /** Current position of the iterator */
-    private int position = -1;
-
-    /**
-     * Returns true if the iteration has more elements.
-     *
-     * @return True if the iteration has more elements.
-     */
-    public boolean hasNext() {
-      return position < messages.size() - 1;
-    }
-
-    /**
-     * Moves to the next element in the iteration.
-     */
-    public void next() {
-      position++;
-    }
-
-    /**
-     * Get vertex id related to current element of the iteration.
-     *
-     * @return Vertex id related to current element of the iteration.
-     */
-    public I getCurrentVertexId() {
-      return vertexIds.get(position);
-    }
-
-    /**
-     * Get message related to current element of the iteration.
-     *
-     * @return Message related to current element of the iteration.
-     */
-    public M getCurrentMessage() {
-      return messages.get(position);
-    }
+  protected M newSecondInstance() {
+    return conf.createMessageValue();
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java Tue Nov  6 17:36:36 2012
@@ -107,8 +107,8 @@ public class DiskBackedMessageStoreByPar
     VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
-      I vertexId = iterator.getCurrentVertexId();
-      M message = iterator.getCurrentMessage();
+      I vertexId = iterator.getCurrentFirst();
+      M message = iterator.getCurrentSecond();
       Collection<M> currentMessages = map.get(vertexId);
       if (currentMessages == null) {
         currentMessages = Lists.newArrayList(message);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Tue Nov  6 17:36:36 2012
@@ -136,8 +136,8 @@ public class SimpleMessageStore<I extend
     VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
-      I vertexId = iterator.getCurrentVertexId();
-      M message = iterator.getCurrentMessage();
+      I vertexId = iterator.getCurrentFirst();
+      M message = iterator.getCurrentSecond();
       Collection<M> currentMessages = partitionMap.get(vertexId);
       if (currentMessages == null) {
         Collection<M> newMessages = Lists.newArrayList(message);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Tue Nov  6 17:36:36 2012
@@ -42,6 +42,7 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -112,7 +113,7 @@ public class NettyWorkerClientRequestPro
     sendPartitionCache = new SendPartitionCache<I, V, E, M>(context,
         configuration);
     sendMessageCache =
-        new SendMessageCache<I, M>(configuration);
+        new SendMessageCache<I, M>(configuration, serviceWorker);
     maxMessagesPerWorker = configuration.getInt(
         GiraphConfiguration.MSG_SIZE,
         GiraphConfiguration.MSG_SIZE_DEFAULT);
@@ -146,7 +147,7 @@ public class NettyWorkerClientRequestPro
     // Send a request if the cache of outgoing message to
     // the remote worker 'workerInfo' is full enough to be flushed
     if (workerMessageCount >= maxMessagesPerWorker) {
-      Map<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+      PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
           sendMessageCache.removeWorkerMessages(workerInfo);
       WritableRequest writableRequest =
           new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
@@ -321,13 +322,17 @@ public class NettyWorkerClientRequestPro
     sendPartitionCache.clear();
 
     // Execute the remaining sends messages (if any)
-    Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
+    PairList<WorkerInfo, PairList<Integer, VertexIdMessageCollection<I, M>>>
         remainingMessageCache = sendMessageCache.removeAllMessages();
-    for (Map.Entry<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
-        entry : remainingMessageCache.entrySet()) {
+    PairList<WorkerInfo,
+        PairList<Integer, VertexIdMessageCollection<I, M>>>.Iterator
+        iterator = remainingMessageCache.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
       WritableRequest writableRequest =
-          new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
-      doRequest(entry.getKey(), writableRequest);
+          new SendWorkerMessagesRequest<I, V, E, M>(
+              iterator.getCurrentSecond());
+      doRequest(iterator.getCurrentFirst(), writableRequest);
     }
 
     // Execute the remaining sends mutations (if any)

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Tue Nov  6 17:36:36 2012
@@ -20,17 +20,14 @@ package org.apache.giraph.comm.requests;
 
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.VertexIdMessageCollection;
+import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Maps;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
 
 /**
  * Send a collection of vertex messages for a partition.
@@ -52,8 +49,8 @@ public class SendWorkerMessagesRequest<I
    * are owned by a single (destination) worker. These messages are all
    * destined for this worker.
    * */
-  private Map<Integer, VertexIdMessageCollection<I, M>>
-  partitionVertexMessagesMap;
+  private PairList<Integer, VertexIdMessageCollection<I, M>>
+  partitionVertexMessages;
 
   /**
    * Constructor used for reflection only
@@ -63,37 +60,38 @@ public class SendWorkerMessagesRequest<I
   /**
    * Constructor used to send request.
    *
-   * @param partVertMsgsMap Map of remote partitions =>
-   *                        VertexIdMessageCollection
+   * @param partVertMsgs Map of remote partitions => VertexIdMessageCollection
    */
   public SendWorkerMessagesRequest(
-    Map<Integer, VertexIdMessageCollection<I, M>> partVertMsgsMap) {
+    PairList<Integer, VertexIdMessageCollection<I, M>> partVertMsgs) {
     super();
-    this.partitionVertexMessagesMap = partVertMsgsMap;
+    this.partitionVertexMessages = partVertMsgs;
   }
 
   @Override
   public void readFieldsRequest(DataInput input) throws IOException {
     int numPartitions = input.readInt();
-    partitionVertexMessagesMap =
-        Maps.<Integer, VertexIdMessageCollection<I, M>>
-            newHashMapWithExpectedSize(numPartitions);
+    partitionVertexMessages =
+        new PairList<Integer, VertexIdMessageCollection<I, M>>();
+    partitionVertexMessages.initialize(numPartitions);
     while (numPartitions-- > 0) {
       final int partitionId = input.readInt();
       VertexIdMessageCollection<I, M> vertexIdMessages =
           new VertexIdMessageCollection<I, M>(getConf());
       vertexIdMessages.readFields(input);
-      partitionVertexMessagesMap.put(partitionId, vertexIdMessages);
+      partitionVertexMessages.add(partitionId, vertexIdMessages);
     }
   }
 
   @Override
   public void writeRequest(DataOutput output) throws IOException {
-    output.writeInt(partitionVertexMessagesMap.size());
-    for (Entry<Integer, VertexIdMessageCollection<I, M>> partitionEntry :
-      partitionVertexMessagesMap.entrySet()) {
-      output.writeInt(partitionEntry.getKey());
-      partitionEntry.getValue().write(output);
+    output.writeInt(partitionVertexMessages.getSize());
+    PairList<Integer, VertexIdMessageCollection<I, M>>.Iterator iterator =
+        partitionVertexMessages.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      output.writeInt(iterator.getCurrentFirst());
+      iterator.getCurrentSecond().write(output);
     }
   }
 
@@ -104,11 +102,14 @@ public class SendWorkerMessagesRequest<I
 
   @Override
   public void doRequest(ServerData<I, V, E, M> serverData) {
-    for (Entry<Integer, VertexIdMessageCollection<I, M>> entry :
-      partitionVertexMessagesMap.entrySet()) {
+    PairList<Integer, VertexIdMessageCollection<I, M>>.Iterator iterator =
+        partitionVertexMessages.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
       try {
         serverData.getIncomingMessageStore()
-          .addPartitionMessages(entry.getValue(), entry.getKey());
+            .addPartitionMessages(iterator.getCurrentSecond(),
+                iterator.getCurrentFirst());
       } catch (IOException e) {
         throw new RuntimeException("doRequest: Got IOException ", e);
       }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Nov  6 17:36:36 2012
@@ -1316,6 +1316,11 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
+  public Iterable<? extends PartitionOwner> getPartitionOwners() {
+    return workerGraphPartitioner.getPartitionOwners();
+  }
+
+  @Override
   public Partition<I, V, E, M> getPartition(I vertexId) {
     return getPartitionStore().getPartition(getPartitionId(vertexId));
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Nov  6 17:36:36 2012
@@ -112,7 +112,7 @@ public class GraphMapper<I extends Writa
   /** Milliseconds from starting compute to sending first message */
   private Timer timeToFirstMessage;
   /** Timer context used for computer msec from compute to first message */
-  private TimerContext timeToFirstMessageContext;
+  private volatile TimerContext timeToFirstMessageContext;
   /** Time from first sent message till last message flushed. */
   private Timer communicationTimer;
   /** Timer context for communication timer. */

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairList.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairList.java?rev=1406239&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairList.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairList.java Tue Nov  6 17:36:36 2012
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Collection to keep pairs in, without creating a wrapper object around
+ * each pair of objects.
+ *
+ * @param <U> Type of the first element in a pair
+ * @param <V> Type of the second element in a pair
+ */
+public class PairList<U, V> {
+  /** List to keep first elements of pairs in */
+  protected List<U> firstList;
+  /** List to keep second elements of pairs in */
+  protected List<V> secondList;
+
+  /**
+   * Constructor
+   */
+  public PairList() {
+  }
+
+  /**
+   * Initialize the inner state. Must be called before {@code add()} is
+   * called.
+   */
+  public void initialize() {
+    firstList = Lists.newArrayList();
+    secondList = Lists.newArrayList();
+  }
+
+
+  /**
+   * Initialize the inner state, with a known size. Must be called before
+   * {@code add()} is called.
+   *
+   * @param size Number of pairs which will be added to the list
+   */
+  public void initialize(int size) {
+    firstList = Lists.newArrayListWithCapacity(size);
+    secondList = Lists.newArrayListWithCapacity(size);
+  }
+
+  /**
+   * Add a pair to the collection.
+   *
+   * @param first First element of the pair
+   * @param second Second element of the pair
+   */
+  public void add(U first, V second) {
+    firstList.add(first);
+    secondList.add(second);
+  }
+
+  /**
+   * Get number of pairs in this list.
+   *
+   * @return Number of pairs in the list
+   */
+  public int getSize() {
+    return firstList.size();
+  }
+
+  /**
+   * Check if the list is empty.
+   *
+   * @return True iff there are no pairs in the list
+   */
+  public boolean isEmpty() {
+    return getSize() == 0;
+  }
+
+  /**
+   * Get iterator through elements of this object.
+   *
+   * @return {@link Iterator} iterator
+   */
+  public Iterator getIterator() {
+    return new Iterator();
+  }
+
+  /**
+   * Special iterator class which we'll use to iterate through elements of
+   * {@link PairList}, without having to create new object as wrapper for
+   * each pair.
+   *
+   * Protocol is somewhat similar to the protocol of {@link java.util.Iterator}
+   * only here next() doesn't return the next object, it just moves along in
+   * the collection. Values related to current pair can be retrieved by calling
+   * getCurrentFirst() and getCurrentSecond() methods.
+   *
+   * Not thread-safe.
+   */
+  public class Iterator {
+    /** Current position of the iterator */
+    private int position = -1;
+
+    /**
+     * Returns true if the iteration has more elements.
+     *
+     * @return True if the iteration has more elements.
+     */
+    public boolean hasNext() {
+      return position < getSize() - 1;
+    }
+
+    /**
+     * Moves to the next element in the iteration.
+     */
+    public void next() {
+      position++;
+    }
+
+    /**
+     * Get first element of the current pair of the iteration.
+     *
+     * @return First element of the current pair of the iteration
+     */
+    public U getCurrentFirst() {
+      return firstList.get(position);
+    }
+
+    /**
+     * Get second element of the current pair of the iteration.
+     *
+     * @return Second element of the current pair of the iteration
+     */
+    public V getCurrentSecond() {
+      return secondList.get(position);
+    }
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairListWritable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairListWritable.java?rev=1406239&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairListWritable.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairListWritable.java Tue Nov  6 17:36:36 2012
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Writable implementation of {@link PairList}.
+ *
+ * @param <U> Type of the first element in a pair
+ * @param <V> Type of the second element in a pair
+ */
+public abstract class PairListWritable<U extends Writable,
+    V extends Writable> extends PairList<U, V> implements Writable {
+  /**
+   * Create an empty instance of the first element in the pair,
+   * so we could read it from {@link DataInput}.
+   *
+   * @return New instance of the first element in the pair
+   */
+  protected abstract U newFirstInstance();
+
+  /**
+   * Create an empty instance of the second element in the pair,
+   * so we could read it from {@link DataInput}.
+   *
+   * @return New instance of the second element in the pair
+   */
+  protected abstract V newSecondInstance();
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    int size = getSize();
+    output.writeInt(size);
+    for (int i = 0; i < size; i++) {
+      firstList.get(i).write(output);
+      secondList.get(i).write(output);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    int size = input.readInt();
+    firstList = Lists.newArrayListWithCapacity(size);
+    secondList = Lists.newArrayListWithCapacity(size);
+    while (size-- > 0) {
+      U first = newFirstInstance();
+      first.readFields(input);
+      V second = newSecondInstance();
+      second.readFields(input);
+      add(first, second);
+    }
+  }
+}

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Tue Nov  6 17:36:36 2012
@@ -29,6 +29,7 @@ import org.apache.giraph.comm.requests.W
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.utils.MockUtils;
+import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.junit.Before;
@@ -39,11 +40,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Map;
 
 /**
  * Test all the netty failure scenarios
@@ -85,12 +84,14 @@ public class RequestFailureTest {
   private WritableRequest getRequest() {
     // Data to send
     final int partitionId = 0;
-    Map<Integer, VertexIdMessageCollection<IntWritable, IntWritable>> sendMap =
-        Maps.newHashMap();
+    PairList<Integer, VertexIdMessageCollection<IntWritable, IntWritable>>
+        dataToSend = new PairList<Integer,
+        VertexIdMessageCollection<IntWritable, IntWritable>>();
+    dataToSend.initialize();
     VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
         new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
     vertexIdMessages.initialize();
-    sendMap.put(partitionId, vertexIdMessages);
+    dataToSend.add(partitionId, vertexIdMessages);
     for (int i = 1; i < 7; ++i) {
       IntWritable vertexId = new IntWritable(i);
       for (int j = 0; j < i; ++j) {
@@ -102,7 +103,7 @@ public class RequestFailureTest {
     SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable,
             IntWritable> request =
         new SendWorkerMessagesRequest<IntWritable, IntWritable,
-                    IntWritable, IntWritable>(sendMap);
+                    IntWritable, IntWritable>(dataToSend);
     return request;
   }
 

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Nov  6 17:36:36 2012
@@ -34,6 +34,7 @@ import org.apache.giraph.graph.VertexMut
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.PartitionStore;
 import org.apache.giraph.utils.MockUtils;
+import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.junit.Before;
@@ -149,13 +150,15 @@ public class RequestTest {
   @Test
   public void sendWorkerMessagesRequest() throws IOException {
     // Data to send
-    Map<Integer, VertexIdMessageCollection<IntWritable, IntWritable>> sendMap =
-        Maps.newHashMap();
+    PairList<Integer, VertexIdMessageCollection<IntWritable, IntWritable>>
+        dataToSend = new PairList<Integer,
+        VertexIdMessageCollection<IntWritable, IntWritable>>();
+    dataToSend.initialize();
     int partitionId = 0;
     VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
         new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
     vertexIdMessages.initialize();
-    sendMap.put(partitionId, vertexIdMessages);
+    dataToSend.add(partitionId, vertexIdMessages);
     for (int i = 1; i < 7; ++i) {
       IntWritable vertexId = new IntWritable(i);
       for (int j = 0; j < i; ++j) {
@@ -167,7 +170,7 @@ public class RequestTest {
     SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable,
         IntWritable> request =
       new SendWorkerMessagesRequest<IntWritable, IntWritable,
-            IntWritable, IntWritable>(sendMap);
+            IntWritable, IntWritable>(dataToSend);
     client.sendWritableRequest(-1, request);
     client.waitAllRequests();
 



Mime
View raw message