hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1560570 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ graph/src/main/java/org/apache/hama/graph/
Date Thu, 23 Jan 2014 01:51:44 GMT
Author: edwardyoon
Date: Thu Jan 23 01:51:44 2014
New Revision: 1560570

URL: http://svn.apache.org/r1560570
Log:
HAMA-857: Graph Combiners is wrongly implemented (edwardyoon)

Added:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
  (with props)
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Jan 23 01:51:44 2014
@@ -14,6 +14,7 @@ Release 0.7.0 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-857: Graph Combiners is wrongly implemented (edwardyoon)
    HAMA-845: The size() of Spilling Queue returns always numMessagesWritten (edwardyoon)
    HAMA-834: Fix KMeans example (Martin Illecker)
    HAMA-831: Support for multi records with same vertexID (edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Jan 23 01:51:44 2014
@@ -59,7 +59,7 @@ public final class BSPPeerImpl<K1, V1, K
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, TOTAL_MESSAGES_COMBINED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
   }
 
   private final HamaConfiguration conf;
@@ -459,6 +459,12 @@ public final class BSPPeerImpl<K1, V1, K
   }
 
   public final void close() {
+    long combinedMessages = this.getCounter(PeerCounter.TOTAL_MESSAGES_SENT)
+        .getCounter()
+        - this.getCounter(PeerCounter.TOTAL_MESSAGES_RECEIVED).getCounter();
+    this.getCounter(PeerCounter.TOTAL_MESSAGES_COMBINED).increment(
+        combinedMessages);
+
     // there are many catches, because we want to close always every component
     // even if the one before failed.
     if (in != null) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Thu Jan 23 01:51:44 2014
@@ -139,6 +139,8 @@ public abstract class AbstractMessageMan
    */
   @Override
   public final void clearOutgoingMessages() {
+    outgoingMessageManager.clear();
+
     if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
         && localQueue.size() > 0) {
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan 23 01:51:44 2014
@@ -31,6 +31,7 @@ import org.apache.hama.bsp.HashPartition
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
 import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
 import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
 
@@ -57,6 +58,9 @@ public class GraphJob extends BSPJob {
   public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
       throws IOException {
     super(conf);
+    conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+        OutgoingVertexMessagesManager.class, OutgoingMessageManager.class);
+    
     this.setBoolean(Constants.PARTITION_SORT_BY_KEY, true);
     this.setBspClass(GraphJobRunner.class);
     this.setJarByClass(exampleClass);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Thu Jan 23 01:51:44 2014
@@ -156,7 +156,8 @@ public final class GraphJobMessage imple
     return map;
   }
 
-  public Writable getVertexId() {
+  @SuppressWarnings("rawtypes")
+  public WritableComparable getVertexId() {
     return vertexId;
   }
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1560570&r1=1560569&r2=1560570&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jan 23 01:51:44 2014
@@ -34,7 +34,6 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.PartitioningRunner.DefaultRecordConverter;
@@ -83,7 +82,6 @@ public final class GraphJobRunner<V exte
   public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
 
   private HamaConfiguration conf;
-  private Combiner<M> combiner;
   private Partitioner<V, M> partitioner;
 
   public static Class<?> VERTEX_CLASS;
@@ -234,7 +232,7 @@ public final class GraphJobRunner<V exte
     IDSkippingIterator<V, E, M> iterator = vertices.skippingIterator();
     VertexMessageIterable<V, M> iterable = null;
     Vertex<V, E, M> vertex = null;
-    
+
     // note that can't skip inactive vertices because we have to rewrite the
     // complete vertex file in each iteration
     while (iterator.hasNext(
@@ -255,12 +253,7 @@ public final class GraphJobRunner<V exte
         if (iterable == null) {
           vertex.compute(Collections.<M> emptyList());
         } else {
-          if (combiner != null) {
-            M combined = combiner.combine(iterable);
-            vertex.compute(Collections.singleton(combined));
-          } else {
-            vertex.compute(iterable);
-          }
+          vertex.compute(iterable);
           currentMessage = iterable.getOverflowMessage();
         }
         activeVertices++;
@@ -358,15 +351,6 @@ public final class GraphJobRunner<V exte
             conf.getClass("bsp.input.partitioner.class", HashPartitioner.class),
             conf);
 
-    if (!conf.getClass(MESSAGE_COMBINER_CLASS_KEY, Combiner.class).equals(
-        Combiner.class)) {
-      LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS_KEY));
-
-      combiner = (Combiner<M>) org.apache.hadoop.util.ReflectionUtils
-          .newInstance(conf.getClass("hama.vertex.message.combiner.class",
-              Combiner.class), conf);
-    }
-
     Class<?> outputWriter = conf.getClass(
         GraphJob.VERTEX_OUTPUT_WRITER_CLASS_ATTR, VertexOutputWriter.class);
     vertexOutputWriter = (VertexOutputWriter<Writable, Writable, V, E, M>) ReflectionUtils

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java?rev=1560570&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java Thu Jan 23 01:51:44 2014
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.util.BSPNetUtils;
+
+public class OutgoingVertexMessagesManager<M extends Writable> implements
+    OutgoingMessageManager<GraphJobMessage> {
+  protected static final Log LOG = LogFactory
+      .getLog(OutgoingVertexMessagesManager.class);
+
+  private Combiner<Writable> combiner;
+  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
+  private HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>();
+
+  @SuppressWarnings("rawtypes")
+  private HashMap<InetSocketAddress, Map<WritableComparable, Writable>> vertexMessageMap = new HashMap<InetSocketAddress, Map<WritableComparable, Writable>>();
+  private List<Writable> tmp;
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void init(Configuration conf) {
+    if (!conf.getClass(GraphJobRunner.MESSAGE_COMBINER_CLASS_KEY,
+        Combiner.class).equals(Combiner.class)) {
+      LOG.debug("Combiner class: "
+          + conf.get(GraphJobRunner.MESSAGE_COMBINER_CLASS_KEY));
+
+      combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
+          .newInstance(conf.getClass("hama.vertex.message.combiner.class",
+              Combiner.class), conf);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void addMessage(String peerName, GraphJobMessage msg) {
+    InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
+
+    if (msg.isVertexMessage() && combiner != null) {
+      WritableComparable vertexID = msg.getVertexId();
+      Writable vertexValue = msg.getVertexValue();
+
+      if (!vertexMessageMap.containsKey(targetPeerAddress)) {
+        vertexMessageMap.put(targetPeerAddress,
+            new HashMap<WritableComparable, Writable>());
+      }
+
+      Map<WritableComparable, Writable> combinedMessage = vertexMessageMap
+          .get(targetPeerAddress);
+
+      if (combinedMessage.containsKey(vertexID)) {
+        tmp = new ArrayList<Writable>();
+        tmp.add(combinedMessage.get(vertexID));
+        tmp.add(vertexValue);
+
+        Iterable<Writable> iterable = new Iterable<Writable>() {
+          @Override
+          public Iterator<Writable> iterator() {
+            return tmp.iterator();
+          }
+        };
+
+        combinedMessage.put(vertexID, combiner.combine(iterable));
+      } else {
+        combinedMessage.put(vertexID, vertexValue);
+      }
+
+    } else {
+      outgoingBundles.get(targetPeerAddress).addMessage(msg);
+    }
+  }
+
+  private InetSocketAddress getSocketAddress(String peerName) {
+    InetSocketAddress targetPeerAddress = null;
+    // Get socket for target peer.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+
+    if (!outgoingBundles.containsKey(targetPeerAddress)) {
+      outgoingBundles.put(targetPeerAddress,
+          new BSPMessageBundle<GraphJobMessage>());
+    }
+    return targetPeerAddress;
+  }
+
+  @Override
+  public void clear() {
+    outgoingBundles.clear();
+    vertexMessageMap.clear();
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>> getBundleIterator() {
+    if (combiner != null) {
+      for (Map.Entry<InetSocketAddress, Map<WritableComparable, Writable>> e : vertexMessageMap
+          .entrySet()) {
+        for (Map.Entry<WritableComparable, Writable> v : e.getValue()
+            .entrySet()) {
+          outgoingBundles.get(e.getKey()).addMessage(
+              new GraphJobMessage(v.getKey(), v.getValue()));
+        }
+      }
+    }
+
+    vertexMessageMap.clear();
+    return outgoingBundles.entrySet().iterator();
+  }
+}

Propchange: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message