hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1674779 - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/message/queue/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/ mesos/
Date Mon, 20 Apr 2015 10:18:37 GMT
Author: edwardyoon
Date: Mon Apr 20 10:18:36 2015
New Revision: 1674779

URL: http://svn.apache.org/r1674779
Log:
HAMA-946: Refactor graph package

Removed:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
Modified:
    hama/trunk/core/pom.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java
    hama/trunk/mesos/pom.xml
    hama/trunk/pom.xml

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Mon Apr 20 10:18:36 2015
@@ -64,6 +64,10 @@
       <artifactId>commons-configuration</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-lang</groupId>
       <artifactId>commons-lang</artifactId>
     </dependency>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Mon Apr
20 10:18:36 2015
@@ -17,9 +17,7 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,8 +25,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
-import com.google.common.collect.Lists;
-
 /**
  * LinkedList backed queue structure for bookkeeping messages.
  */

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Mon Apr 20 10:18:36
2015
@@ -17,20 +17,14 @@
  */
 package org.apache.hama.graph;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
 
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -76,20 +70,24 @@ public final class GraphJobMessage imple
     this.map = map;
   }
 
-  public GraphJobMessage(WritableComparable<?> vertexId, Writable vertexValue) {
+  public GraphJobMessage(WritableComparable<?> vertexId, byte[] vertexValue) {
     this.flag = VERTEX_FLAG;
     this.vertexId = vertexId;
 
     add(vertexValue);
   }
 
-  public GraphJobMessage(WritableComparable<?> vertexId, List<Writable> values)
{
-    this.flag = VERTEX_FLAG;
-    this.vertexId = vertexId;
-
-    addAll(values);
+  public GraphJobMessage(IntWritable size) {
+    this.flag = VERTICES_SIZE_FLAG;
+    this.integerMessage = size;
   }
 
+  public GraphJobMessage(byte[] vertex) {
+    this.flag = PARTITION_FLAG;
+    
+    add(vertex);
+  }
+  
   public MapWritable getMap() {
     return map;
   }
@@ -112,49 +110,19 @@ public final class GraphJobMessage imple
     }
   }
 
-  public void add(Writable value) {
+  public void add(byte[] value) {
     try {
-      ByteArrayOutputStream a = new ByteArrayOutputStream();
-      DataOutputStream b = new DataOutputStream(a);
-      value.write(b);
-      
-      byteBuffer.write(a.toByteArray());
+      byteBuffer.write(value);
       numOfValues++;
     } catch (IOException e) {
       e.printStackTrace();
     }
   }
 
-  public void addAll(List<Writable> values) {
-    ByteArrayOutputStream a = new ByteArrayOutputStream();
-    DataOutputStream b = new DataOutputStream(a);
-    try {
-      for (Writable v : values) {
-        v.write(b);
-      }
-
-      byteBuffer.write(a.toByteArray());
-      numOfValues += values.size();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
   public int getNumOfValues() {
     return this.numOfValues;
   }
 
-  public GraphJobMessage(IntWritable size) {
-    this.flag = VERTICES_SIZE_FLAG;
-    this.integerMessage = size;
-  }
-
-  public GraphJobMessage(Vertex<?, ?, ?> vertex) {
-    this.flag = PARTITION_FLAG;
-    
-    add(vertex);
-  }
-
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeByte(this.flag);
@@ -327,40 +295,4 @@ public final class GraphJobMessage imple
     }
   }
 
-  public Iterable<Writable> getIterableMessages() {
-
-    return new Iterable<Writable>() {
-      @Override
-      public Iterator<Writable> iterator() {
-        return new Iterator<Writable>() {
-          ByteArrayInputStream bis = new ByteArrayInputStream(
-              byteBuffer.toByteArray());
-          DataInputStream dis = new DataInputStream(bis);
-          int index = 0;
-
-          @Override
-          public boolean hasNext() {
-            return (index < numOfValues) ? true : false;
-          }
-
-          @Override
-          public Writable next() {
-            Writable v = GraphJobRunner.createVertexValue();
-            try {
-              v.readFields(dis);
-            } catch (IOException e) {
-              e.printStackTrace();
-            }
-            index++;
-            return v;
-          }
-
-          @Override
-          public void remove() {
-          }
-        };
-      }
-    };
-  }
-
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Apr 20 10:18:36
2015
@@ -18,13 +18,17 @@
 package org.apache.hama.graph;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -41,6 +45,7 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.sync.SyncException;
@@ -106,6 +111,7 @@ public final class GraphJobRunner<V exte
 
   private AggregationRunner<V, E, M> aggregationRunner;
   private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
+  private Combiner<Writable> combiner;
 
   private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
 
@@ -160,10 +166,7 @@ public final class GraphJobRunner<V exte
       }
 
       // loop over vertices and do their computation
-      startTime = System.currentTimeMillis();
       doSuperstep(firstVertexMessage, peer);
-      LOG.info("Total time spent for " + peer.getSuperstepCount()
-          + " superstep: " + (System.currentTimeMillis() - startTime) + " ms");
 
       if (isMasterTask(peer)) {
         peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
@@ -234,22 +237,28 @@ public final class GraphJobRunner<V exte
    * iterate over our messages and vertices in sorted order. That means that we
    * need to seek the first vertex that has the same ID as the iterated message.
    */
-  @SuppressWarnings("unchecked")
   private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
+    long startTime = System.currentTimeMillis();
+
     this.changedVertexCnt = 0;
     vertices.startSuperstep();
 
-    ExecutorService executor = Executors.newCachedThreadPool();
+    ExecutorService executor = Executors.newFixedThreadPool((peer
+        .getNumCurrentMessages() / conf.getInt(
+        "hama.graph.threadpool.percentage", 20)) + 1);
 
+    long loopStartTime = System.currentTimeMillis();
     while (currentMessage != null) {
-      Runnable worker = new ComputeRunnable(vertices.get((V) currentMessage
-          .getVertexId()), (Iterable<M>) currentMessage.getIterableMessages());
+      Runnable worker = new ComputeRunnable(currentMessage);
       executor.execute(worker);
 
       currentMessage = peer.getCurrentMessage();
     }
+        LOG.info("Total time spent for superstep-" + peer.getSuperstepCount()
+                        + " looping: " + (System.currentTimeMillis() - loopStartTime)
+                                + " ms");
 
     executor.shutdown();
     while (!executor.isTerminated()) {
@@ -263,11 +272,18 @@ public final class GraphJobRunner<V exte
       }
     }
 
-    vertices.finishSuperstep();
-
     getAggregationRunner().sendAggregatorValues(peer,
         vertices.getComputedVertices().size(), this.changedVertexCnt);
     this.iteration++;
+
+    LOG.info("Total time spent for superstep-" + peer.getSuperstepCount()
+        + " computing vertices: " + (System.currentTimeMillis() - startTime)
+        + " ms");
+
+    startTime = System.currentTimeMillis();
+    finishSuperstep();
+    LOG.info("Total time spent for superstep-" + peer.getSuperstepCount()
+        + " synchronizing: " + (System.currentTimeMillis() - startTime) + " ms");
   }
 
   /**
@@ -280,28 +296,36 @@ public final class GraphJobRunner<V exte
     this.changedVertexCnt = 0;
     vertices.startSuperstep();
 
-    ExecutorService executor = Executors.newCachedThreadPool();
+    ExecutorService executor = Executors
+        .newFixedThreadPool((vertices.size() / conf.getInt(
+            "hama.graph.threadpool.percentage", 20)) + 1);
 
     for (Vertex<V, E, M> v : vertices.getValues()) {
-      Runnable worker = new ComputeRunnable(v, null);
+      Runnable worker = new ComputeRunnable(v);
       executor.execute(worker);
     }
+    
     executor.shutdown();
     while (!executor.isTerminated()) {
     }
 
-    vertices.finishSuperstep();
     getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
+    finishSuperstep();
   }
 
   class ComputeRunnable implements Runnable {
     Vertex<V, E, M> vertex;
     Iterable<M> msgs;
 
-    public ComputeRunnable(Vertex<V, E, M> vertex, Iterable<M> msgs) {
-      this.vertex = vertex;
-      this.msgs = msgs;
+    @SuppressWarnings("unchecked")
+    public ComputeRunnable(GraphJobMessage msg) {
+      this.vertex = vertices.get((V) msg.getVertexId());
+      this.msgs = (Iterable<M>) getIterableMessages(msg.getValuesBytes(), msg.getNumOfValues());
+    }
+
+    public ComputeRunnable(Vertex<V, E, M> v) {
+      this.vertex = v;
     }
 
     @Override
@@ -350,6 +374,16 @@ public final class GraphJobRunner<V exte
             VerticesInfo.class);
     vertices = ReflectionUtils.newInstance(verticesInfoClass);
     vertices.init(this, conf, peer.getTaskId());
+
+    final String combinerName = conf.get(Constants.COMBINER_CLASS);
+    if (combinerName != null) {
+      try {
+        combiner = (Combiner<Writable>) ReflectionUtils
+            .newInstance(combinerName);
+      } catch (ClassNotFoundException e) {
+        e.printStackTrace();
+      }
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -381,9 +415,6 @@ public final class GraphJobRunner<V exte
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
-    // kryo.register(GraphJobRunner
-    // .<V, E, M> newVertexInstance(VERTEX_CLASS).getClass());
-
     VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable,
Writable, V, E, M>) ReflectionUtils
         .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
             VertexInputReader.class));
@@ -409,9 +440,9 @@ public final class GraphJobRunner<V exte
           executor.execute(worker);
         } else {
           if (!messages.containsKey(dstHost)) {
-            messages.put(dstHost, new GraphJobMessage(vertex));
+            messages.put(dstHost, new GraphJobMessage(serialize(vertex)));
           } else {
-            messages.get(dstHost).add(vertex);
+            messages.get(dstHost).add(serialize(vertex));
           }
         }
       }
@@ -424,7 +455,7 @@ public final class GraphJobRunner<V exte
     }
     messages.clear();
     messages = null;
-    
+
     peer.sync();
 
     GraphJobMessage msg;
@@ -596,18 +627,87 @@ public final class GraphJobRunner<V exte
 
   private void finishRemovals() throws IOException {
     vertices.finishRemovals();
-    // finish the "superstep" because we have written a new file here
-    vertices.finishSuperstep();
   }
 
   private void finishAdditions() throws IOException {
     vertices.finishAdditions();
-    // finish the "superstep" because we have written a new file here
+  }
+
+  private final ConcurrentNavigableMap<V, GraphJobMessage> storage = new ConcurrentSkipListMap<V,
GraphJobMessage>();
+
+  public void sendMessage(V vertexID, byte[] msg) throws IOException {
+    if (storage.containsKey(vertexID)) {
+      storage.get(vertexID).add(msg);
+    } else {
+      storage.put(vertexID, new GraphJobMessage(vertexID, msg));
+    }
+  }
+
+  public void finishSuperstep() throws IOException {
     vertices.finishSuperstep();
+
+    for (Map.Entry<V, GraphJobMessage> m : storage.entrySet()) {
+      // Combining messages
+      if (combiner != null) {
+        if (m.getValue().getNumOfValues() > 1) {
+          peer.send(
+              getHostName(m.getKey()),
+              new GraphJobMessage(m.getKey(), serialize(combiner
+                  .combine(getIterableMessages(m.getValue().getValuesBytes(), m
+                      .getValue().getNumOfValues())))));
+        } else {
+          peer.send(getHostName(m.getKey()), m.getValue());
+        }
+      } else {
+        peer.send(getHostName(m.getKey()), m.getValue());
+      }
+    }
+
+    storage.clear();
   }
 
-  public void sendMessage(V vertexID, M msg) throws IOException {
-    peer.send(getHostName(vertexID), new GraphJobMessage(vertexID, msg));
+  public static byte[] serialize(Writable writable) throws IOException {
+    ByteArrayOutputStream a = new ByteArrayOutputStream();
+    DataOutputStream b = new DataOutputStream(a);
+    writable.write(b);
+
+    return a.toByteArray();
+  }
+
+  public Iterable<Writable> getIterableMessages(final byte[] valuesBytes,
+      final int numOfValues) {
+
+    return new Iterable<Writable>() {
+      @Override
+      public Iterator<Writable> iterator() {
+        return new Iterator<Writable>() {
+          ByteArrayInputStream bis = new ByteArrayInputStream(valuesBytes);
+          DataInputStream dis = new DataInputStream(bis);
+          int index = 0;
+
+          @Override
+          public boolean hasNext() {
+            return (index < numOfValues) ? true : false;
+          }
+
+          @Override
+          public Writable next() {
+            Writable v = createVertexValue();
+            try {
+              v.readFields(dis);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+            index++;
+            return v;
+          }
+
+          @Override
+          public void remove() {
+          }
+        };
+      }
+    };
   }
 
   /**

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
Mon Apr 20 10:18:36 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.graph;
 
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.conf.Configuration;
@@ -31,7 +33,8 @@ public class IncomingVertexMessageManage
 
   private Configuration conf;
 
-  private final MessagePerVertex msgPerVertex = new MessagePerVertex();
+  @SuppressWarnings("rawtypes")
+  private final ConcurrentHashMap<WritableComparable, GraphJobMessage> storage = new
ConcurrentHashMap<WritableComparable, GraphJobMessage>();
   private final ConcurrentLinkedQueue<GraphJobMessage> mapMessages = new ConcurrentLinkedQueue<GraphJobMessage>();
 
   @Override
@@ -66,7 +69,11 @@ public class IncomingVertexMessageManage
   @Override
   public void add(GraphJobMessage item) {
     if (item.isVertexMessage()) {
-      msgPerVertex.add(item.getVertexId(), item);
+      if (storage.containsKey(item.getVertexId())) {
+        storage.get(item.getVertexId()).addValuesBytes(item.getValuesBytes(), item.size());
+      } else {
+        storage.put(item.getVertexId(), item);
+      }
     } else {
       mapMessages.add(item);
     }
@@ -75,21 +82,32 @@ public class IncomingVertexMessageManage
   @Override
   public void clear() {
     mapMessages.clear();
-    msgPerVertex.clear();
+    storage.clear();
   }
 
+  Iterator<GraphJobMessage> it;
+  
   @Override
   public GraphJobMessage poll() {
     if (mapMessages.size() > 0) {
       return mapMessages.poll();
     } else {
-      return msgPerVertex.pollFirstEntry();
+      if(it == null) {
+        it = storage.values().iterator();
+      }
+      
+      if(it.hasNext()) {
+        return it.next();
+      } else {
+        storage.clear();
+        return null;
+      }
     }
   }
 
   @Override
   public int size() {
-    return msgPerVertex.size() + mapMessages.size();
+    return storage.size() + mapMessages.size();
   }
 
   // empty, not needed to implement

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Mon Apr 20 10:18:36
2015
@@ -46,8 +46,8 @@ public final class MapVerticesInfo<V ext
     implements VerticesInfo<V, E, M> {
   private final Map<V, Vertex<V, E, M>> vertices = new ConcurrentHashMap<V,
Vertex<V, E, M>>();
 
-  private Set<V> computedVertices; 
-  
+  private Set<V> computedVertices = new HashSet<V>();
+
   @Override
   public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
       TaskAttemptID attempt) throws IOException {
@@ -55,9 +55,9 @@ public final class MapVerticesInfo<V ext
 
   @Override
   public void put(Vertex<V, E, M> vertex) throws IOException {
-    if(vertices.containsKey(vertex.getVertexID())) {
-      for(Edge<V, E> e : vertex.getEdges()) 
-      vertices.get(vertex.getVertexID()).addEdge(e);
+    if (vertices.containsKey(vertex.getVertexID())) {
+      for (Edge<V, E> e : vertex.getEdges())
+        vertices.get(vertex.getVertexID()).addEdge(e);
     } else {
       vertices.put(vertex.getVertexID(), vertex);
     }
@@ -74,9 +74,9 @@ public final class MapVerticesInfo<V ext
 
   @Override
   public Collection<Vertex<V, E, M>> getValues() {
-    return vertices.values();  
+    return vertices.values();
   }
-  
+
   @Override
   public int size() {
     return vertices.size();
@@ -90,8 +90,9 @@ public final class MapVerticesInfo<V ext
   @Override
   public Iterator<Vertex<V, E, M>> iterator() {
 
-    final Iterator<Vertex<V, E, M>> vertexIterator = vertices.values().iterator();
-    
+    final Iterator<Vertex<V, E, M>> vertexIterator = vertices.values()
+        .iterator();
+
     return new Iterator<Vertex<V, E, M>>() {
 
       @Override
@@ -133,18 +134,18 @@ public final class MapVerticesInfo<V ext
 
   @Override
   public void startSuperstep() throws IOException {
-    computedVertices = new HashSet<V>();
   }
 
   @Override
   public void finishSuperstep() throws IOException {
+    computedVertices.clear();
   }
 
   @Override
   public Set<V> getComputedVertices() {
     return this.computedVertices;
   }
-  
+
   public Set<V> getNotComputedVertices() {
     return Sets.difference(vertices.keySet(), computedVertices);
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Mon Apr 20 10:18:36 2015
@@ -18,106 +18,39 @@
 package org.apache.hama.graph;
 
 import java.net.InetSocketAddress;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.message.AbstractOutgoingMessageManager;
-import org.apache.hama.util.ReflectionUtils;
 
 public class OutgoingVertexMessageManager<M extends Writable> extends
     AbstractOutgoingMessageManager<GraphJobMessage> {
   protected static final Log LOG = LogFactory
       .getLog(OutgoingVertexMessageManager.class);
 
-  private Combiner<Writable> combiner;
-  private HashMap<InetSocketAddress, MessagePerVertex> storage = new HashMap<InetSocketAddress,
MessagePerVertex>();
-
-  @SuppressWarnings("unchecked")
   @Override
   public void init(HamaConfiguration conf) {
     this.conf = conf;
-
-    final String combinerName = conf.get(Constants.COMBINER_CLASS);
-    if (combinerName != null) {
-      try {
-        combiner = (Combiner<Writable>) ReflectionUtils
-            .newInstance(combinerName);
-      } catch (ClassNotFoundException e) {
-        e.printStackTrace();
-      }
-    }
   }
 
   @Override
   public void addMessage(String peerName, GraphJobMessage msg) {
     InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
-    if (msg.isVertexMessage()) {
-      WritableComparable<?> vertexID = msg.getVertexId();
-
-      if (!storage.containsKey(targetPeerAddress)) {
-        storage.put(targetPeerAddress, new MessagePerVertex());
-      }
-
-      MessagePerVertex msgPerVertex = storage.get(targetPeerAddress);
-      msgPerVertex.add(vertexID, msg);
-
-      // Combining messages
-      if (combiner != null && msgPerVertex.get(vertexID).getNumOfValues() > 1)
{
-
-        // Overwrite
-        storage.get(targetPeerAddress).put(
-            vertexID,
-            new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
-                vertexID).getIterableMessages())));
-      }
-    } else {
-      outgoingBundles.get(targetPeerAddress).addMessage(msg);
-    }
+    outgoingBundles.get(targetPeerAddress).addMessage(msg);
   }
 
   @Override
   public void clear() {
     outgoingBundles.clear();
-    storage.clear();
   }
 
   @Override
   public Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>>
getBundleIterator() {
-    return new Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>>()
{
-      final Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>>
bundles = outgoingBundles
-          .entrySet().iterator();
-
-      @Override
-      public boolean hasNext() {
-        return bundles.hasNext();
-      }
-
-      @Override
-      public Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> next()
{
-        Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> bundle =
bundles
-            .next();
-
-        MessagePerVertex msgStorage = storage.get(bundle.getKey());
-        if (msgStorage != null) {
-          bundle.getValue().addMessages(msgStorage.getMessages());
-        }
-        storage.remove(bundle.getKey());
-        return bundle;
-      }
-
-      @Override
-      public void remove() {
-      }
-
-    };
+    return outgoingBundles.entrySet().iterator();
   }
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Mon Apr 20 10:18:36 2015
@@ -76,19 +76,20 @@ public abstract class Vertex<V extends W
 
   @Override
   public void sendMessage(Edge<V, E> e, M msg) throws IOException {
-    runner.sendMessage(e.getDestinationVertexID(), msg);
+    runner.sendMessage(e.getDestinationVertexID(), GraphJobRunner.serialize(msg));
   }
 
   @Override
   public void sendMessage(V destinationVertexID, M msg) throws IOException {
-    runner.sendMessage(destinationVertexID, msg);
+    runner.sendMessage(destinationVertexID, GraphJobRunner.serialize(msg));
   }
   
   @Override
   public void sendMessageToNeighbors(M msg) throws IOException {
     final List<Edge<V, E>> outEdges = this.getEdges();
+    byte[] serialized = GraphJobRunner.serialize(msg);
     for (Edge<V, E> e : outEdges) {
-      sendMessage(e, msg);
+      runner.sendMessage(e.getDestinationVertexID(), serialized);
     }
   }
 

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java Mon Apr
20 10:18:36 2015
@@ -17,6 +17,7 @@
  */
 package org.apache.hama.graph;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.PriorityQueue;
 
@@ -32,7 +33,7 @@ import com.google.common.collect.Lists;
 public class TestGraphJobMessage extends TestCase {
 
   @Test
-  public void testPriorityQueue() {
+  public void testPriorityQueue() throws IOException {
     PriorityQueue<GraphJobMessage> prio = new PriorityQueue<GraphJobMessage>();
     prio.addAll(getMessages());
 
@@ -53,14 +54,14 @@ public class TestGraphJobMessage extends
     assertTrue(prio.isEmpty());
   }
 
-  public List<GraphJobMessage> getMessages() {
+  public List<GraphJobMessage> getMessages() throws IOException {
     GraphJobMessage mapMsg = new GraphJobMessage(new MapWritable());
     GraphJobMessage vertexMsg1 = new GraphJobMessage(new Text("1"),
-        new IntWritable());
+        GraphJobRunner.serialize(new IntWritable()));
     GraphJobMessage vertexMsg2 = new GraphJobMessage(new Text("2"),
-        new IntWritable());
+        GraphJobRunner.serialize(new IntWritable()));
     GraphJobMessage vertexMsg3 = new GraphJobMessage(new Text("3"),
-        new IntWritable());
+        GraphJobRunner.serialize(new IntWritable()));
     return Lists.newArrayList(mapMsg, vertexMsg1, vertexMsg2, vertexMsg3);
   }
 

Modified: hama/trunk/mesos/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/mesos/pom.xml?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/mesos/pom.xml (original)
+++ hama/trunk/mesos/pom.xml Mon Apr 20 10:18:36 2015
@@ -96,7 +96,7 @@
                   <outputDirectory>${project.parent.basedir}/lib</outputDirectory>
                 </artifactItem>
               </artifactItems>
-              <excludeTransitive>true</excludeTransitive>
+              <excludeTransitive>false</excludeTransitive>
               <fileMode>755</fileMode>
             </configuration>
           </execution>

Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Mon Apr 20 10:18:36 2015
@@ -92,6 +92,7 @@
     <commons-lang.version>2.6</commons-lang.version>
     <commons-httpclient.version>3.0.1</commons-httpclient.version>
     <commons-io.version>2.4</commons-io.version>
+    <commons-collections.version>3.2.1</commons-collections.version>
     <commons-compress.version>1.9</commons-compress.version>
     <hadoop.version>1.2.0</hadoop.version>
     <protobuf.version>2.5.0</protobuf.version>
@@ -269,6 +270,11 @@
         <version>${commons-httpclient.version}</version>
       </dependency>
       <dependency>
+        <groupId>commons-collections</groupId>
+        <artifactId>commons-collections</artifactId>
+        <version>${commons-collections.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.mortbay.jetty</groupId>
         <artifactId>jetty</artifactId>
         <version>${jetty.version}</version>



Mime
View raw message