hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kennethx...@apache.org
Subject svn commit: r1492955 - in /hama/trunk: CHANGES.txt graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
Date Fri, 14 Jun 2013 05:07:49 GMT
Author: kennethxian
Date: Fri Jun 14 05:07:49 2013
New Revision: 1492955

URL: http://svn.apache.org/r1492955
Log:
HAMA-761: Improvement for GraphJobMessage comparation operation

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1492955&r1=1492954&r2=1492955&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Jun 14 05:07:49 2013
@@ -15,6 +15,7 @@ Release 0.7 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-761: Improvement for GraphJobMessage comparation operation (MaoYuan Xian)
    HAMA-760: Add new features to existing Multi Layer Perceptron (Yexi Jiang via edwardyoon)
    HAMA-758: Send message to non-exist vertex makes the job fail (MaoYuan Xian via edwardyoon)
    HAMA-757: The partitioning job output should be un-splitable (MaoYuan Xian via edwardyoon)

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1492955&r1=1492954&r2=1492955&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Fri Jun 14 05:07:49 2013
@@ -21,10 +21,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -46,6 +48,15 @@ public final class GraphJobMessage imple
   private WritableComparable vertexId;
   private Writable vertexValue;
   private IntWritable verticesSize;
+  private static GraphJobMessageComparator comparator;
+
+  static {
+    if (comparator == null) {
+      comparator = new GraphJobMessageComparator();
+    }
+
+    WritableComparator.define(GraphJobMessage.class, comparator);
+  }
 
   public GraphJobMessage() {
   }
@@ -84,6 +95,28 @@ public final class GraphJobMessage imple
 
   }
 
+  public void fastReadFields(DataInput in) throws IOException {
+    flag = in.readByte();
+    if (isVertexMessage()) {
+      vertexId = GraphJobRunner.createVertexIDObject();
+      vertexId.readFields(in);
+      /*
+       * vertexValue = GraphJobRunner.createVertexValue();
+       * vertexValue.readFields(in);
+       */
+    } else if (isMapMessage()) {
+      map = new MapWritable();
+      map.readFields(in);
+    } else if (isVerticesSizeMessage()) {
+      verticesSize = new IntWritable();
+      verticesSize.readFields(in);
+    } else {
+      vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
+          null);
+      vertexId.readFields(in);
+    }
+  }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     flag = in.readByte();
@@ -162,4 +195,41 @@ public final class GraphJobMessage imple
     }
   }
 
+  public static class GraphJobMessageComparator extends WritableComparator {
+    private final DataInputBuffer buffer;
+    private final GraphJobMessage key1;
+    private final GraphJobMessage key2;
+
+    public GraphJobMessageComparator() {
+      this(GraphJobMessage.class);
+    }
+
+    protected GraphJobMessageComparator(
+        Class<? extends WritableComparable<?>> keyClass) {
+      this(keyClass, false);
+    }
+
+    protected GraphJobMessageComparator(
+        Class<? extends WritableComparable<?>> keyClass, boolean createInstances) {
+      super(keyClass, createInstances);
+      key1 = new GraphJobMessage();
+      key2 = new GraphJobMessage();
+      buffer = new DataInputBuffer();
+    }
+
+    public synchronized int compare(byte[] b1, int s1, int l1, byte[] b2,
+        int s2, int l2) {
+      try {
+        buffer.reset(b1, s1, l1); // parse key1
+        key1.fastReadFields(buffer);
+
+        buffer.reset(b2, s2, l2); // parse key2
+        key2.fastReadFields(buffer);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      return compare(key1, key2); // compare them
+    }
+  }
 }



Mime
View raw message