hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1675359 - in /hama/trunk: examples/src/main/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/
Date Wed, 22 Apr 2015 13:32:55 GMT
Author: edwardyoon
Date: Wed Apr 22 13:32:55 2015
New Revision: 1675359

URL: http://svn.apache.org/r1675359
Log:
Fix race conditions about ConcurrentHashMap

Modified:
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1675359&r1=1675358&r2=1675359&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Wed Apr 22 13:32:55 2015
@@ -56,14 +56,14 @@ public class PageRank {
       if (val != null) {
         MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
       }
+      
+      // initialize this vertex to 1 / count of global vertices in this graph
+      setValue(new DoubleWritable(1.0 / getTotalNumVertices()));
     }
 
     @Override
     public void compute(Iterable<DoubleWritable> messages) throws IOException {
-      // initialize this vertex to 1 / count of global vertices in this graph
-      if (this.getSuperstepCount() == 0) {
-        setValue(new DoubleWritable(1.0 / getTotalNumVertices()));
-      } else if (this.getSuperstepCount() >= 1) {
+      if (this.getSuperstepCount() >= 1) {
         double sum = 0;
         for (DoubleWritable msg : messages) {
           sum += msg.get();
@@ -110,14 +110,13 @@ public class PageRank {
 
   public static class PagerankJsonReader extends
       VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
-    JSONParser parser = new JSONParser();
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean parseVertex(LongWritable key, Text value,
         Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
-
-      String strValue = value.toString();
-      JSONArray jsonArray = (JSONArray) parser.parse(strValue);
+      JSONArray jsonArray = (JSONArray) new JSONParser()
+          .parse(value.toString());
 
       vertex.setVertexID(new Text(jsonArray.get(0).toString()));
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1675359&r1=1675358&r2=1675359&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Wed Apr 22 13:32:55 2015
@@ -74,7 +74,8 @@ public final class GraphJobMessage imple
     this.flag = VERTEX_FLAG;
     this.vertexId = vertexId;
 
-    add(vertexValue);
+    if (vertexValue != null)
+      add(vertexValue);
   }
 
   public GraphJobMessage(IntWritable size) {
@@ -299,4 +300,8 @@ public final class GraphJobMessage imple
     }
   }
 
+  public void setFlag(int partitionFlag) {
+    this.flag = partitionFlag;
+  }
+
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1675359&r1=1675358&r2=1675359&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Apr 22 13:32:55 2015
@@ -23,9 +23,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -319,7 +317,7 @@ public final class GraphJobRunner<V exte
     } catch (InterruptedException e) {
       LOG.error(e);
     }
-    
+
     getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
     finishSuperstep();
@@ -414,6 +412,9 @@ public final class GraphJobRunner<V exte
     EDGE_VALUE_CLASS = edgeValueClass;
   }
 
+  private final ConcurrentHashMap<String, GraphJobMessage> messages = new ConcurrentHashMap<String, GraphJobMessage>();
+  private VertexInputReader<Writable, Writable, V, E, M> reader;
+
   /**
    * Loads vertices into memory of each peer.
    */
@@ -421,9 +422,8 @@ public final class GraphJobRunner<V exte
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
-    final Map<String, GraphJobMessage> messages = new HashMap<String, GraphJobMessage>();
-    
-    VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
+
+    reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
         .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
             VertexInputReader.class));
 
@@ -435,37 +435,37 @@ public final class GraphJobRunner<V exte
     try {
       KeyValuePair<Writable, Writable> next = null;
       while ((next = peer.readNext()) != null) {
-
         Vertex<V, E, M> vertex = GraphJobRunner
             .<V, E, M> newVertexInstance(VERTEX_CLASS);
 
         boolean vertexFinished = reader.parseVertex(next.getKey(),
             next.getValue(), vertex);
+
         if (!vertexFinished) {
           continue;
         }
 
-        String dstHost = getHostName(vertex.getVertexID());
-        if (peer.getPeerName().equals(dstHost)) {
-          Runnable worker = new LoadWorker(vertex);
-          executor.execute(worker);
-        } else {
-          if (!messages.containsKey(dstHost)) {
-            messages.put(dstHost, new GraphJobMessage(serialize(vertex)));
-          } else {
-            messages.get(dstHost).add(serialize(vertex));
-          }
-        }
+        Runnable worker = new LoadWorker(vertex);
+        executor.execute(worker);
+
       }
     } catch (Exception e) {
       e.printStackTrace();
     }
 
-    for (Entry<String, GraphJobMessage> e : messages.entrySet()) {
-      peer.send(e.getKey(), e.getValue());
+    executor.shutdown();
+    executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+
+    Iterator<Entry<String, GraphJobMessage>> it;
+    it = messages.entrySet().iterator();
+    while (it.hasNext()) {
+      Entry<String, GraphJobMessage> e = it.next();
+      it.remove();
+      GraphJobMessage msg = e.getValue();
+      msg.setFlag(GraphJobMessage.PARTITION_FLAG);
+      peer.send(e.getKey(), msg);
     }
-    messages.clear();
-    
+
     peer.sync();
 
     GraphJobMessage msg;
@@ -477,12 +477,9 @@ public final class GraphJobRunner<V exte
         Vertex<V, E, M> vertex = newVertexInstance(VERTEX_CLASS);
         vertex.readFields(dis);
 
-        Runnable worker = new LoadWorker(vertex);
-        executor.execute(worker);
+        addVertex(vertex);
       }
     }
-    executor.shutdown();
-    executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
 
     LOG.info(vertices.size() + " vertices are loaded into "
         + peer.getPeerName());
@@ -498,7 +495,16 @@ public final class GraphJobRunner<V exte
     @Override
     public void run() {
       try {
-        addVertex(vertex);
+        String dstHost = getHostName(vertex.getVertexID());
+        if (peer.getPeerName().equals(dstHost)) {
+          addVertex(vertex);
+        } else {
+          if (!messages.containsKey(dstHost)) {
+            messages.putIfAbsent(dstHost, new GraphJobMessage());
+          }
+          messages.get(dstHost).add(serialize(vertex));
+        }
+
       } catch (IOException e) {
         e.printStackTrace();
       }
@@ -644,12 +650,11 @@ public final class GraphJobRunner<V exte
   private final ConcurrentHashMap<V, GraphJobMessage> storage = new ConcurrentHashMap<V, GraphJobMessage>();
 
   public void sendMessage(V vertexID, byte[] msg) throws IOException {
-    if (storage.containsKey(vertexID)) {
-      storage.get(vertexID).add(msg);
-    } else {
+    if (!storage.containsKey(vertexID)) {
       // To save bit memory we don't set vertexID twice
-      storage.put(vertexID, new GraphJobMessage(null, msg));
+      storage.putIfAbsent(vertexID, new GraphJobMessage());
     }
+    storage.get(vertexID).add(msg);
   }
 
   public void finishSuperstep() throws IOException {
@@ -659,21 +664,21 @@ public final class GraphJobRunner<V exte
     while (it.hasNext()) {
       Entry<V, GraphJobMessage> e = it.next();
       it.remove();
-      
+
       if (combiner != null && e.getValue().getNumOfValues() > 1) {
-        peer.send(
-            getHostName(e.getKey()),
-            new GraphJobMessage(e.getKey(), serialize(combiner
-                .combine(getIterableMessages(e.getValue().getValuesBytes(), e
-                    .getValue().getNumOfValues())))));
+        GraphJobMessage combined = new GraphJobMessage(e.getKey(),
+            serialize(combiner.combine(getIterableMessages(e.getValue()
+                .getValuesBytes(), e.getValue().getNumOfValues()))));
+        combined.setFlag(GraphJobMessage.VERTEX_FLAG);
+        peer.send(getHostName(e.getKey()), combined);
       } else {
         // set vertexID
         e.getValue().setVertexId(e.getKey());
+        e.getValue().setFlag(GraphJobMessage.VERTEX_FLAG);
         peer.send(getHostName(e.getKey()), e.getValue());
       }
     }
-    storage.clear();
-    
+
     if (isMasterTask(peer)) {
       peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
     }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1675359&r1=1675358&r2=1675359&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Wed Apr 22 13:32:55 2015
@@ -69,11 +69,11 @@ public class IncomingVertexMessageManage
   @Override
   public void add(GraphJobMessage item) {
     if (item.isVertexMessage()) {
-      if (storage.containsKey(item.getVertexId())) {
+      if (!storage.containsKey(item.getVertexId())) {
+        storage.putIfAbsent(item.getVertexId(), item);
+      } else {
         storage.get(item.getVertexId()).addValuesBytes(item.getValuesBytes(),
             item.size());
-      } else {
-        storage.put(item.getVertexId(), item);
       }
     } else {
       mapMessages.add(item);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1675359&r1=1675358&r2=1675359&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Wed Apr 22 13:32:55 2015
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -44,7 +43,7 @@ import com.google.common.collect.Sets;
  */
 public final class MapVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
     implements VerticesInfo<V, E, M> {
-  private final Map<V, Vertex<V, E, M>> vertices = new ConcurrentHashMap<V, Vertex<V, E, M>>();
+  private final ConcurrentHashMap<V, Vertex<V, E, M>> vertices = new ConcurrentHashMap<V, Vertex<V, E, M>>();
 
   private Set<V> computedVertices = new HashSet<V>();
 
@@ -55,11 +54,12 @@ public final class MapVerticesInfo<V ext
 
   @Override
   public void put(Vertex<V, E, M> vertex) throws IOException {
-    if (vertices.containsKey(vertex.getVertexID())) {
-      for (Edge<V, E> e : vertex.getEdges())
-        vertices.get(vertex.getVertexID()).addEdge(e);
+    if (!vertices.containsKey(vertex.getVertexID())) {
+      vertices.putIfAbsent(vertex.getVertexID(), vertex);
     } else {
-      vertices.put(vertex.getVertexID(), vertex);
+      for (Edge<V, E> e : vertex.getEdges()) {
+        vertices.get(vertex.getVertexID()).addEdge(e);
+      }
     }
   }
 



Mime
View raw message