giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clau...@apache.org
Subject [1/2] git commit: updated refs/heads/trunk to 67e0f11
Date Thu, 11 Apr 2013 20:05:16 GMT
Updated Branches:
  refs/heads/trunk 96fd05385 -> 67e0f11d2


GIRAPH-613


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6cf79dbe
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6cf79dbe
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6cf79dbe

Branch: refs/heads/trunk
Commit: 6cf79dbe2c4397732820c435df723fe50e9f3daf
Parents: 96fd053
Author: Claudio Martella <claudio@apache.org>
Authored: Thu Apr 11 18:09:25 2013 +0200
Committer: Claudio Martella <claudio@apache.org>
Committed: Thu Apr 11 18:09:25 2013 +0200

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../java/org/apache/giraph/conf/GiraphClasses.java |    2 +-
 .../main/java/org/apache/giraph/graph/Vertex.java  |   25 +--
 .../org/apache/giraph/graph/VertexMutations.java   |    6 +-
 .../giraph/partition/ByteArrayPartition.java       |   31 ++--
 .../giraph/partition/DiskBackedPartitionStore.java |    9 +-
 .../apache/giraph/partition/SimplePartition.java   |    9 +-
 .../org/apache/giraph/utils/WritableUtils.java     |  182 +++++++++++++++
 .../apache/giraph/graph/TestVertexAndEdges.java    |   24 ++-
 9 files changed, 231 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index babbb88..4a1e7ca 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-613: Remove Writable from the interfaces implemented by Vertex (claudio)
+
   GIRAPH-543: Fix PageRankBenchmark and make WeightedPageRankBenchmark (majakabiljo)
 
   GIRAPH-615: Add support for multithreaded output (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 64f8bb1..95499bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -134,7 +134,7 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
-   * Contructor that reads classes from a Configuration object.
+   * Constructor that reads classes from a Configuration object.
    *
    * @param conf Configuration object to read from.
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
index fda6023..a1b1a87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
@@ -35,13 +35,13 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
 /**
  * Basic abstract class for writing a BSP application for computation.
+ * Giraph will checkpoint Vertex value and edges, hence all user data should
+ * be stored as part of the vertex value.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
@@ -51,7 +51,7 @@ import java.util.Iterator;
 public abstract class Vertex<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
-    implements WorkerAggregatorUsage, Writable {
+    implements WorkerAggregatorUsage {
   /** Vertex id. */
   private I id;
   /** Vertex value. */
@@ -507,25 +507,6 @@ public abstract class Vertex<I extends WritableComparable,
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
-    id = getConf().createVertexId();
-    id.readFields(in);
-    value = getConf().createVertexValue();
-    value.readFields(in);
-    edges = getConf().createVertexEdges();
-    edges.readFields(in);
-    halt = in.readBoolean();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    id.write(out);
-    value.write(out);
-    edges.write(out);
-    out.writeBoolean(halt);
-  }
-
-  @Override
   public String toString() {
     return "Vertex(id=" + getId() + ",value=" + getValue() +
         ",#edges=" + getNumEdges() + ")";

http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
index ea50f25..75c0aef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
@@ -87,8 +87,8 @@ public class VertexMutations<I extends WritableComparable,
 
     int addedVertexListSize = input.readInt();
     for (int i = 0; i < addedVertexListSize; ++i) {
-      Vertex<I, V, E, M> vertex = conf.createVertex();
-      vertex.readFields(input);
+      Vertex<I, V, E, M> vertex =
+          WritableUtils.readVertexFromDataInput(input, getConf());
       addedVertexList.add(vertex);
     }
     removedVertexCount = input.readInt();
@@ -110,7 +110,7 @@ public class VertexMutations<I extends WritableComparable,
   public void write(DataOutput output) throws IOException {
     output.writeInt(addedVertexList.size());
     for (Vertex<I, V, E, M> vertex : addedVertexList) {
-      vertex.write(output);
+      WritableUtils.writeVertexToDataOutput(output, vertex, getConf());
     }
     output.writeInt(removedVertexCount);
     output.writeInt(addedEdgeList.size());

http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index dd8c974..d2e7599 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -79,21 +79,22 @@ public class ByteArrayPartition<I extends WritableComparable,
     if (vertexData == null) {
       return null;
     }
-    WritableUtils.readFieldsFromByteArrayWithSize(
-        vertexData, representativeVertex, useUnsafeSerialization);
+    WritableUtils.reinitializeVertexFromByteArray(
+        vertexData, representativeVertex, useUnsafeSerialization, getConf());
     return representativeVertex;
   }
 
   @Override
   public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
     byte[] vertexData =
-        WritableUtils.writeToByteArrayWithSize(vertex, useUnsafeSerialization);
+        WritableUtils.writeVertexToByteArray(
+            vertex, useUnsafeSerialization, getConf());
     byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData);
     if (oldVertexBytes == null) {
       return null;
     } else {
-      WritableUtils.readFieldsFromByteArrayWithSize(
-          oldVertexBytes, representativeVertex, useUnsafeSerialization);
+      WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
+          representativeVertex, useUnsafeSerialization, getConf());
       return representativeVertex;
     }
   }
@@ -104,8 +105,8 @@ public class ByteArrayPartition<I extends WritableComparable,
     if (vertexBytes == null) {
       return null;
     }
-    WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
-        representativeVertex, useUnsafeSerialization);
+    WritableUtils.reinitializeVertexFromByteArray(vertexBytes,
+        representativeVertex, useUnsafeSerialization, getConf());
     return representativeVertex;
   }
 
@@ -134,8 +135,8 @@ public class ByteArrayPartition<I extends WritableComparable,
   public long getEdgeCount() {
     long edges = 0;
     for (byte[] vertexBytes : vertexMap.values()) {
-      WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
-          representativeVertex, useUnsafeSerialization);
+      WritableUtils.reinitializeVertexFromByteArray(vertexBytes,
+          representativeVertex, useUnsafeSerialization, getConf());
       edges += representativeVertex.getNumEdges();
     }
     return edges;
@@ -147,12 +148,12 @@ public class ByteArrayPartition<I extends WritableComparable,
     byte[] oldVertexData = vertexMap.get(vertex.getId());
     if (oldVertexData != null) {
       vertexMap.put(vertex.getId(),
-          WritableUtils.writeToByteArrayWithSize(
-              vertex, oldVertexData, useUnsafeSerialization));
+          WritableUtils.writeVertexToByteArray(
+              vertex, oldVertexData, useUnsafeSerialization, getConf()));
     } else {
       vertexMap.put(vertex.getId(),
-          WritableUtils.writeToByteArrayWithSize(
-              vertex, useUnsafeSerialization));
+          WritableUtils.writeVertexToByteArray(
+              vertex, useUnsafeSerialization, getConf()));
     }
   }
 
@@ -223,9 +224,9 @@ public class ByteArrayPartition<I extends WritableComparable,
 
     @Override
     public Vertex<I, V, E, M> next() {
-      WritableUtils.readFieldsFromByteArrayWithSize(
+      WritableUtils.reinitializeVertexFromByteArray(
           vertexDataIterator.next(), representativeVertex,
-          useUnsafeSerialization);
+          useUnsafeSerialization, getConf());
       return representativeVertex;
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 3525302..11e0a90 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -20,6 +20,7 @@ package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -370,8 +371,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     DataInputStream inputStream = new DataInputStream(
         new BufferedInputStream(new FileInputStream(file)));
     for (int i = 0; i < numVertices; ++i) {
-      Vertex<I, V, E, M> vertex = conf.createVertex();
-      vertex.readFields(inputStream);
+      Vertex<I, V , E, M> vertex =
+          WritableUtils.readVertexFromDataInput(inputStream, conf);
       partition.putVertex(vertex);
     }
     inputStream.close();
@@ -397,7 +398,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     DataOutputStream outputStream = new DataOutputStream(
         new BufferedOutputStream(new FileOutputStream(file)));
     for (Vertex<I, V, E, M> vertex : partition) {
-      vertex.write(outputStream);
+      WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf);
     }
     outputStream.close();
   }
@@ -418,7 +419,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     DataOutputStream outputStream = new DataOutputStream(
         new BufferedOutputStream(new FileOutputStream(file, true)));
     for (Vertex<I, V, E, M> vertex : partition) {
-      vertex.write(outputStream);
+      WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf);
     }
     outputStream.close();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index 23e0f05..d6a46bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.partition;
 
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
@@ -121,9 +122,9 @@ public class SimplePartition<I extends WritableComparable,
     }
     int vertices = input.readInt();
     for (int i = 0; i < vertices; ++i) {
-      Vertex<I, V, E, M> vertex = getConf().createVertex();
       progress();
-      vertex.readFields(input);
+      Vertex<I, V, E, M> vertex =
+          WritableUtils.readVertexFromDataInput(input, getConf());
       if (vertexMap.put(vertex.getId(), vertex) != null) {
         throw new IllegalStateException(
             "readFields: " + this +
@@ -136,9 +137,9 @@ public class SimplePartition<I extends WritableComparable,
   public void write(DataOutput output) throws IOException {
     super.write(output);
     output.writeInt(vertexMap.size());
-    for (Vertex vertex : vertexMap.values()) {
+    for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
       progress();
-      vertex.write(output);
+      WritableUtils.writeVertexToDataOutput(output, vertex, getConf());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 6e7b87a..e3d79f7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -18,7 +18,10 @@
 
 package org.apache.giraph.utils;
 
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.VertexEdges;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.giraph.zk.ZooKeeperExt.PathStat;
 import org.apache.hadoop.conf.Configuration;
@@ -328,6 +331,103 @@ public class WritableUtils {
   }
 
   /**
+   * Write vertex data to byte array with the first 4 bytes as the size of the
+   * entire buffer (including the size).
+   *
+   * @param vertex Vertex to write from.
+   * @param buffer Use this buffer instead
+   * @param unsafe Use unsafe serialization?
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message value
+   * @return Byte array with serialized object.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable> byte[] writeVertexToByteArray(
+      Vertex<I, V, E, M> vertex,
+      byte[] buffer,
+      boolean unsafe,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+    ExtendedDataOutput extendedDataOutput;
+    if (unsafe) {
+      extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
+    } else {
+      extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
+    }
+    try {
+      extendedDataOutput.writeInt(-1);
+      writeVertexToDataOutput(extendedDataOutput, vertex, conf);
+      extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
+    } catch (IOException e) {
+      throw new IllegalStateException("writeVertexToByteArray: " +
+          "IOException", e);
+    }
+
+    return extendedDataOutput.getByteArray();
+  }
+
+  /**
+   * Write vertex data to byte array with the first 4 bytes as the size of the
+   * entire buffer (including the size).
+   *
+   * @param vertex Vertex to write from.
+   * @param unsafe Use unsafe serialization?
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message value
+   * @return Byte array with serialized object.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable> byte[] writeVertexToByteArray(
+      Vertex<I, V, E, M> vertex,
+      boolean unsafe,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+    return writeVertexToByteArray(vertex, null, unsafe, conf);
+  }
+
+  /**
+  * Read vertex data from byteArray to a Writeable object, skipping the size.
+  * Serialization method is choosable. Assumes the vertex has already been
+  * initialized and contains values for Id, value, and edges.
+  *
+  * @param byteArray Byte array to find the fields in.
+  * @param vertex Vertex to fill in the fields.
+  * @param unsafe Use unsafe deserialization
+  * @param <I> Vertex id
+  * @param <V> Vertex value
+  * @param <E> Edge value
+  * @param <M> Message value
+  * @param conf Configuration
+  * @return The vertex
+  */
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable> Vertex<I, V, E, M>
+  reinitializeVertexFromByteArray(
+      byte[] byteArray,
+      Vertex<I, V, E, M> vertex,
+      boolean unsafe,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+    ExtendedDataInput extendedDataInput;
+    if (unsafe) {
+      extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
+    } else {
+      extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
+    }
+    try {
+      extendedDataInput.readInt();
+      reinitializeVertexFromDataInput(extendedDataInput, vertex, conf);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "readFieldsFromByteArrayWithSize: IOException", e);
+    }
+    return vertex;
+  }
+
+  /**
    * Write an edge to an output stream.
    *
    * @param out Data output
@@ -356,4 +456,86 @@ public class WritableUtils {
     edge.getTargetVertexId().readFields(in);
     edge.getValue().readFields(in);
   }
+
+  /**
+   * Reads data from input stream to inizialize Vertex. Assumes the vertex has
+   * already been initialized and contains values for Id, value, and edges.
+   *
+   * @param input The input stream
+   * @param vertex The vertex to initialize
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message value
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable> void reinitializeVertexFromDataInput(
+      DataInput input,
+      Vertex<I, V, E, M> vertex,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+    throws IOException {
+    vertex.getId().readFields(input);
+    vertex.getValue().readFields(input);
+    ((VertexEdges<I, E>) vertex.getEdges()).readFields(input);
+    if (input.readBoolean()) {
+      vertex.voteToHalt();
+    } else {
+      vertex.wakeUp();
+    }
+  }
+
+  /**
+   * Reads data from input stream to inizialize Vertex.
+   *
+   * @param input The input stream
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message value
+   * @return The vertex
+   * @throws IOException
+   */
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable> Vertex<I, V, E, M>
+  readVertexFromDataInput(
+      DataInput input,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+    throws IOException {
+    Vertex<I, V, E, M> vertex = conf.createVertex();
+    I id = conf.createVertexId();
+    V value = conf.createVertexValue();
+    VertexEdges<I, E> edges = conf.createVertexEdges();
+    vertex.initialize(id, value, edges);
+    reinitializeVertexFromDataInput(input, vertex, conf);
+    return vertex;
+  }
+
+  /**
+   * Writes Vertex data to output stream.
+   *
+   * @param output the output stream
+   * @param vertex The vertex to serialize
+   * @param conf Configuration
+   * @param <I> Vertex id
+   * @param <V> Vertex value
+   * @param <E> Edge value
+   * @param <M> Message value
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable> void writeVertexToDataOutput(
+      DataOutput output,
+      Vertex<I, V, E, M> vertex,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+    throws IOException {
+    vertex.getId().write(output);
+    vertex.getValue().write(output);
+    ((VertexEdges<I, E>) vertex.getEdges()).write(output);
+    output.writeBoolean(vertex.isHalted());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
index fb5b685..8a048fd 100644
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
@@ -370,7 +370,8 @@ public class TestVertexAndEdges {
     byte[] byteArray = null;
     for (int i = 0; i < REPS; ++i) {
       serializeNanosStart = SystemTime.get().getNanoseconds();
-      byteArray = WritableUtils.writeToByteArray(vertex);
+      byteArray = WritableUtils.writeVertexToByteArray(
+          vertex, false, vertex.getConf());
       serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
           serializeNanosStart);
     }
@@ -381,13 +382,14 @@ public class TestVertexAndEdges {
         " bytes / sec for " + edgesClass.getName());
 
     Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
-        readVertex = instantiateVertex(edgesClass);
-
+        readVertex = buildVertex(edgesClass);
+    
     long deserializeNanosStart;
     long deserializeNanos = 0;
     for (int i = 0; i < REPS; ++i) {
       deserializeNanosStart = SystemTime.get().getNanoseconds();
-      WritableUtils.readFieldsFromByteArray(byteArray, readVertex);
+      WritableUtils.reinitializeVertexFromByteArray(byteArray, readVertex, false, 
+          readVertex.getConf());
       deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
           deserializeNanosStart);
     }
@@ -416,7 +418,7 @@ public class TestVertexAndEdges {
       serializeNanosStart = SystemTime.get().getNanoseconds();
       outputStream =
           new DynamicChannelBufferOutputStream(32);
-      vertex.write(outputStream);
+      WritableUtils.writeVertexToDataOutput(outputStream, vertex, vertex.getConf());
       serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
           serializeNanosStart);
     }
@@ -429,7 +431,7 @@ public class TestVertexAndEdges {
         " bytes / sec for " + edgesClass.getName());
 
     Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
-        readVertex = instantiateVertex(edgesClass);
+        readVertex = buildVertex(edgesClass);
 
     long deserializeNanosStart;
     long deserializeNanos = 0;
@@ -438,7 +440,8 @@ public class TestVertexAndEdges {
       DynamicChannelBufferInputStream inputStream = new
           DynamicChannelBufferInputStream(
           outputStream.getDynamicChannelBuffer());
-      readVertex.readFields(inputStream);
+      WritableUtils.reinitializeVertexFromDataInput(
+          inputStream, readVertex, readVertex.getConf());
       deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
           deserializeNanosStart);
       outputStream.getDynamicChannelBuffer().readerIndex(0);
@@ -470,7 +473,7 @@ public class TestVertexAndEdges {
       serializeNanosStart = SystemTime.get().getNanoseconds();
       outputStream =
           new UnsafeByteArrayOutputStream(32);
-      vertex.write(outputStream);
+      WritableUtils.writeVertexToDataOutput(outputStream, vertex, vertex.getConf());
       serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
           serializeNanosStart);
     }
@@ -485,7 +488,7 @@ public class TestVertexAndEdges {
         " bytes / sec for " + edgesClass.getName());
 
     Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
-        readVertex = instantiateVertex(edgesClass);
+        readVertex = buildVertex(edgesClass);
 
     long deserializeNanosStart;
     long deserializeNanos = 0;
@@ -494,7 +497,8 @@ public class TestVertexAndEdges {
       UnsafeByteArrayInputStream inputStream = new
           UnsafeByteArrayInputStream(
           outputStream.getByteArray(), 0, outputStream.getPos());
-      readVertex.readFields(inputStream);
+      WritableUtils.reinitializeVertexFromDataInput(
+          inputStream, readVertex, readVertex.getConf());
       deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
           deserializeNanosStart);
     }


Mime
View raw message