hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1676879 - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/util/ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/...
Date Thu, 30 Apr 2015 00:11:24 GMT
Author: edwardyoon
Date: Thu Apr 30 00:11:23 2015
New Revision: 1676879

URL: http://svn.apache.org/r1676879
Log:
HAMA-946: stores vertices in serialized form

Added:
    hama/trunk/core/src/main/java/org/apache/hama/util/WritableUtils.java
Removed:
    hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java
    hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/MaxFlowTest.java
Modified:
    hama/trunk/core/pom.xml
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java
    hama/trunk/pom.xml

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Thu Apr 30 00:11:23 2015
@@ -149,16 +149,6 @@
       <artifactId>zookeeper</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.directmemory</groupId>
-      <artifactId>directmemory-cache</artifactId>
-      <version>0.2</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.directmemory</groupId>
-      <artifactId>directmemory-kryo</artifactId>
-      <version>0.2</version>
-    </dependency>
-    <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
       <version>4.0.21.Final</version>

Added: hama/trunk/core/src/main/java/org/apache/hama/util/WritableUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/WritableUtils.java?rev=1676879&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/WritableUtils.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/WritableUtils.java Thu Apr 30 00:11:23 2015
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class WritableUtils {
+
+  public static byte[] serialize(Writable w) {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataOutput output = new DataOutputStream(out);
+    try {
+      w.write(output);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    return out.toByteArray();
+  }
+
+  public static void deserialize(byte[] bytes, Writable obj) {
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
+    try {
+      obj.readFields(in);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+}

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Apr 30 00:11:23 2015
@@ -18,9 +18,7 @@
 package org.apache.hama.graph;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
@@ -50,6 +48,7 @@ import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.commons.util.KeyValuePair;
 import org.apache.hama.util.ReflectionUtils;
+import org.apache.hama.util.WritableUtils;
 
 /**
  * Fully generic graph job runner.
@@ -239,6 +238,7 @@ public final class GraphJobRunner<V exte
    * iterate over our messages and vertices in sorted order. That means that we
    * need to seek the first vertex that has the same ID as the iterated message.
    */
+  @SuppressWarnings("unchecked")
   private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
@@ -269,7 +269,9 @@ public final class GraphJobRunner<V exte
       LOG.error(e);
     }
 
-    for (Vertex<V, E, M> vertex : vertices.getValues()) {
+    Iterator it = vertices.iterator();
+    while (it.hasNext()) {
+      Vertex<V, E, M> vertex = (Vertex<V, E, M>) it.next();
       if (!vertex.isHalted() && !vertex.isComputed()) {
         vertex.compute(Collections.<M> emptyList());
         vertices.finishVertexComputation(vertex);
@@ -305,7 +307,7 @@ public final class GraphJobRunner<V exte
     executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
     executor.setRejectedExecutionHandler(retryHandler);
 
-    for (Vertex<V, E, M> v : vertices.getValues()) {
+    for (V v : vertices.keySet()) {
       Runnable worker = new ComputeRunnable(v);
       executor.execute(worker);
     }
@@ -328,13 +330,23 @@ public final class GraphJobRunner<V exte
 
     @SuppressWarnings("unchecked")
     public ComputeRunnable(GraphJobMessage msg) {
-      this.vertex = vertices.get((V) msg.getVertexId());
+      try {
+        this.vertex = vertices.get((V) msg.getVertexId());
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
       this.msgs = (Iterable<M>) getIterableMessages(msg.getValuesBytes(),
           msg.getNumOfValues());
     }
 
-    public ComputeRunnable(Vertex<V, E, M> v) {
-      this.vertex = v;
+    public ComputeRunnable(V v) {
+      try {
+        this.vertex = vertices.get(v);
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
     }
 
     @Override
@@ -434,24 +446,26 @@ public final class GraphJobRunner<V exte
     executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
     executor.setRejectedExecutionHandler(retryHandler);
 
-    try {
-      KeyValuePair<Writable, Writable> next = null;
-      while ((next = peer.readNext()) != null) {
-        Vertex<V, E, M> vertex = GraphJobRunner
-            .<V, E, M> newVertexInstance(VERTEX_CLASS);
-
-        boolean vertexFinished = reader.parseVertex(next.getKey(),
-            next.getValue(), vertex);
+    KeyValuePair<Writable, Writable> next = null;
+    while ((next = peer.readNext()) != null) {
+      Vertex<V, E, M> vertex = GraphJobRunner
+          .<V, E, M> newVertexInstance(VERTEX_CLASS);
 
-        if (!vertexFinished) {
-          continue;
-        }
-        Runnable worker = new Parser(vertex);
-        executor.execute(worker);
+      boolean vertexFinished = false;
+      try {
+        vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
+            vertex);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
 
+      if (!vertexFinished) {
+        continue;
       }
-    } catch (Exception e) {
-      e.printStackTrace();
+
+      Runnable worker = new Parser(vertex);
+      executor.execute(worker);
+
     }
 
     executor.shutdown();
@@ -526,7 +540,7 @@ public final class GraphJobRunner<V exte
         if (peer.getPeerIndex() == partition) {
           addVertex(vertex);
         } else {
-          messages.get(partition).add(serialize(vertex));
+          messages.get(partition).add(WritableUtils.serialize(vertex));
         }
       } catch (Exception e) {
         e.printStackTrace();
@@ -544,7 +558,6 @@ public final class GraphJobRunner<V exte
       vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
     }
 
-    vertex.setRunner(this);
     vertices.put(vertex);
   }
 
@@ -690,8 +703,8 @@ public final class GraphJobRunner<V exte
 
       if (combiner != null && e.getValue().getNumOfValues() > 1) {
         GraphJobMessage combined = new GraphJobMessage(e.getKey(),
-            serialize(combiner.combine(getIterableMessages(e.getValue()
-                .getValuesBytes(), e.getValue().getNumOfValues()))));
+            WritableUtils.serialize(combiner.combine(getIterableMessages(e
+                .getValue().getValuesBytes(), e.getValue().getNumOfValues()))));
         combined.setFlag(GraphJobMessage.VERTEX_FLAG);
         peer.send(getHostName(e.getKey()), combined);
       } else {
@@ -707,14 +720,6 @@ public final class GraphJobRunner<V exte
     }
   }
 
-  public static byte[] serialize(Writable writable) throws IOException {
-    ByteArrayOutputStream a = new ByteArrayOutputStream();
-    DataOutputStream b = new DataOutputStream(a);
-    writable.write(b);
-    a.close();
-    return a.toByteArray();
-  }
-
   public Iterable<Writable> getIterableMessages(final byte[] valuesBytes,
       final int numOfValues) {
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Thu Apr 30 00:11:23 2015
@@ -18,7 +18,6 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,6 +26,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.util.WritableUtils;
 
 /**
  * Stores the vertices into a memory-based tree map. This implementation allows
@@ -40,23 +40,29 @@ import org.apache.hama.bsp.TaskAttemptID
  */
 public final class MapVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
     implements VerticesInfo<V, E, M> {
-  private final ConcurrentHashMap<V, Vertex<V, E, M>> vertices = new ConcurrentHashMap<V, Vertex<V, E, M>>();
+  private final ConcurrentHashMap<V, byte[]> vertices = new ConcurrentHashMap<V, byte[]>();
+
+  private GraphJobRunner<V, E, M> runner;
 
   private int activeVertices = 0;
 
   @Override
   public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
       TaskAttemptID attempt) throws IOException {
+    this.runner = runner;
   }
 
   @Override
   public void put(Vertex<V, E, M> vertex) throws IOException {
     if (!vertices.containsKey(vertex.getVertexID())) {
-      vertices.putIfAbsent(vertex.getVertexID(), vertex);
+      vertices.putIfAbsent(vertex.getVertexID(),
+          WritableUtils.serialize(vertex));
     } else {
+      Vertex<V, E, M> v = this.get(vertex.getVertexID());
       for (Edge<V, E> e : vertex.getEdges()) {
-        vertices.get(vertex.getVertexID()).addEdge(e);
+        v.addEdge(e);
       }
+      vertices.put(vertex.getVertexID(), WritableUtils.serialize(v));
     }
   }
 
@@ -70,41 +76,44 @@ public final class MapVerticesInfo<V ext
   }
 
   @Override
-  public Collection<Vertex<V, E, M>> getValues() {
-    return vertices.values();
-  }
-
-  @Override
   public int size() {
     return vertices.size();
   }
 
   @Override
-  public Vertex<V, E, M> get(V vertexID) {
-    return vertices.get(vertexID);
+  public Vertex<V, E, M> get(V vertexID) throws IOException {
+    Vertex<V, E, M> v = GraphJobRunner
+        .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+    WritableUtils.deserialize(vertices.get(vertexID), v);
+    v.setRunner(runner);
+
+    return v;
   }
 
   @Override
   public Iterator<Vertex<V, E, M>> iterator() {
 
-    final Iterator<Vertex<V, E, M>> vertexIterator = vertices.values()
-        .iterator();
+    final Iterator<byte[]> it = vertices.values().iterator();
 
     return new Iterator<Vertex<V, E, M>>() {
 
       @Override
       public boolean hasNext() {
-        return vertexIterator.hasNext();
+        return it.hasNext();
       }
 
       @Override
       public Vertex<V, E, M> next() {
-        return vertexIterator.next();
+        Vertex<V, E, M> v = GraphJobRunner
+            .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+        WritableUtils.deserialize(it.next(), v);
+        v.setRunner(runner);
+        return v;
       }
 
       @Override
       public void remove() {
-        // TODO Auto-generated method stub
+        it.remove();
       }
 
     };
@@ -120,6 +129,7 @@ public final class MapVerticesInfo<V ext
       throws IOException {
     incrementCount();
     vertex.setComputed();
+    vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
   }
 
   public synchronized void incrementCount() {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Apr 30 00:11:23 2015
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.util.WritableUtils;
 
 /**
  * Vertex is a abstract definition of Google Pregel Vertex. For implementing a
@@ -65,6 +66,9 @@ public abstract class Vertex<V extends W
     return runner.getPeer().getConfiguration();
   }
 
+  public Vertex() {
+  }
+
   @Override
   public V getVertexID() {
     return this.vertexID;
@@ -77,18 +81,18 @@ public abstract class Vertex<V extends W
   @Override
   public void sendMessage(Edge<V, E> e, M msg) throws IOException {
     runner.sendMessage(e.getDestinationVertexID(),
-        GraphJobRunner.serialize(msg));
+        WritableUtils.serialize(msg));
   }
 
   @Override
   public void sendMessage(V destinationVertexID, M msg) throws IOException {
-    runner.sendMessage(destinationVertexID, GraphJobRunner.serialize(msg));
+    runner.sendMessage(destinationVertexID, WritableUtils.serialize(msg));
   }
 
   @Override
   public void sendMessageToNeighbors(M msg) throws IOException {
     final List<Edge<V, E>> outEdges = this.getEdges();
-    byte[] serialized = GraphJobRunner.serialize(msg);
+    byte[] serialized = WritableUtils.serialize(msg);
     for (Edge<V, E> e : outEdges) {
       runner.sendMessage(e.getDestinationVertexID(), serialized);
     }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Thu Apr 30 00:11:23 2015
@@ -18,7 +18,6 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
 
@@ -48,7 +47,7 @@ public interface VerticesInfo<V extends
    */
   public void put(Vertex<V, E, M> vertex) throws IOException;
 
-  public Vertex<V, E, M> get(V vertexID);
+  public Vertex<V, E, M> get(V vertexID) throws IOException;
   
   /**
    * Remove a vertex to the underlying structure.
@@ -80,8 +79,6 @@ public interface VerticesInfo<V extends
    */
   public Set<V> keySet();
   
-  public Collection<Vertex<V, E, M>> getValues();
-
   public int getActiveVerticesNum();
   
   /**

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java Thu Apr 30 00:11:23 2015
@@ -26,6 +26,7 @@ import junit.framework.TestCase;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hama.util.WritableUtils;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -57,11 +58,11 @@ public class TestGraphJobMessage extends
   public List<GraphJobMessage> getMessages() throws IOException {
     GraphJobMessage mapMsg = new GraphJobMessage(new MapWritable());
     GraphJobMessage vertexMsg1 = new GraphJobMessage(new Text("1"),
-        GraphJobRunner.serialize(new IntWritable()));
+        WritableUtils.serialize(new IntWritable()));
     GraphJobMessage vertexMsg2 = new GraphJobMessage(new Text("2"),
-        GraphJobRunner.serialize(new IntWritable()));
+        WritableUtils.serialize(new IntWritable()));
     GraphJobMessage vertexMsg3 = new GraphJobMessage(new Text("3"),
-        GraphJobRunner.serialize(new IntWritable()));
+        WritableUtils.serialize(new IntWritable()));
     return Lists.newArrayList(mapMsg, vertexMsg1, vertexMsg2, vertexMsg3);
   }
 

Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Thu Apr 30 00:11:23 2015
@@ -103,7 +103,6 @@
     <log4j.version>1.2.16</log4j.version>
     <zookeeper.version>3.4.5</zookeeper.version>
     <ant.version>1.7.1</ant.version>
-    <kryo.version>2.20</kryo.version>
   </properties>
 
   <repositories>
@@ -224,11 +223,6 @@
         <artifactId>snappy-java</artifactId>
         <version>1.0.5</version>
       </dependency>
-       <dependency>
-         <groupId>com.esotericsoftware.kryo</groupId>
-         <artifactId>kryo</artifactId>
-         <version>${kryo.version}</version>
-      </dependency>
       <dependency>
         <groupId>commons-io</groupId>
         <artifactId>commons-io</artifactId>



Mime
View raw message