giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [1/2] GIRAPH-494: Make Edge an interface (nitay)
Date Thu, 31 Jan 2013 16:17:35 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
b/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
index 8e6c92b..b08e7ad 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -17,31 +17,40 @@
  */
 package org.apache.giraph.io;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
 import org.apache.giraph.bsp.BspUtils;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.graph.*;
+import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.io.formats.AdjacencyListTextVertexInputFormat;
 import org.apache.giraph.io.formats.TextDoubleDoubleAdjacencyListVertexInputFormat;
 import org.apache.giraph.vertex.EdgeListVertex;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -164,9 +173,9 @@ public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends
TextDoub
         vr.getCurrentVertex();
     setGraphState(vertex, graphState);
     assertValidVertex(conf, graphState, vertex, new Text("Hi"), new DoubleWritable(0),
-        new Edge<Text, DoubleWritable>(new Text("Ciao"), new DoubleWritable(1.123d)),
-        new Edge<Text, DoubleWritable>(new Text("Bomdia"), new DoubleWritable(2.234d)),
-        new Edge<Text, DoubleWritable>(new Text("Ola"), new DoubleWritable(3.345d)));
+        new DefaultEdge<Text, DoubleWritable>(new Text("Ciao"), new DoubleWritable(1.123d)),
+        new DefaultEdge<Text, DoubleWritable>(new Text("Bomdia"), new DoubleWritable(2.234d)),
+        new DefaultEdge<Text, DoubleWritable>(new Text("Ola"), new DoubleWritable(3.345d)));
     assertEquals(vertex.getNumEdges(), 3);
   }
 
@@ -192,9 +201,9 @@ public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends
TextDoub
     setGraphState(vertex, graphState);
     assertValidVertex(conf, graphState, vertex,
         new Text("BYE"), new DoubleWritable(0.01d),
-        new Edge<Text, DoubleWritable>(new Text("CIAO"), new DoubleWritable(1.001d)),
-        new Edge<Text, DoubleWritable>(new Text("TCHAU"), new DoubleWritable(2.0001d)),
-        new Edge<Text, DoubleWritable>(new Text("ADIOS"), new DoubleWritable(3.00001d)));
+        new DefaultEdge<Text, DoubleWritable>(new Text("CIAO"), new DoubleWritable(1.001d)),
+        new DefaultEdge<Text, DoubleWritable>(new Text("TCHAU"), new DoubleWritable(2.0001d)),
+        new DefaultEdge<Text, DoubleWritable>(new Text("ADIOS"), new DoubleWritable(3.00001d)));
 
     assertEquals(vertex.getNumEdges(), 3);
   }
@@ -213,7 +222,7 @@ public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends
TextDoub
         vr.getCurrentVertex();
     setGraphState(vertex, graphState);
     assertValidVertex(conf, graphState, vertex, new Text("alpha"), new DoubleWritable(42d),
-        new Edge<Text, DoubleWritable>(new Text("beta"), new DoubleWritable(99d)));
+        new DefaultEdge<Text, DoubleWritable>(new Text("beta"), new DoubleWritable(99d)));
     assertEquals(vertex.getNumEdges(), 1);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
b/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
index 61e8863..185ba50 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestGiraphTransferRegulator.java
@@ -17,11 +17,11 @@
  */
 package org.apache.giraph.partition;
 
+import org.apache.giraph.graph.DefaultEdge;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.vertex.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.GiraphTransferRegulator;
+import org.apache.giraph.vertex.EdgeListVertex;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -29,16 +29,16 @@ import org.apache.hadoop.io.LongWritable;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import com.google.common.collect.Lists;
 
 import java.io.IOException;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Test the GiraphTransferRegulator.
  */
@@ -75,11 +75,11 @@ public class TestGiraphTransferRegulator {
     job.getConfiguration()
         .setInt(GiraphTransferRegulator.MAX_EDGES_PER_TRANSFER, 3);
     List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList();
-    edges.add(new Edge<IntWritable, DoubleWritable>(new IntWritable(2),
+    edges.add(new DefaultEdge<IntWritable, DoubleWritable>(new IntWritable(2),
         new DoubleWritable(22)));
-    edges.add(new Edge<IntWritable, DoubleWritable>(new IntWritable(3),
+    edges.add(new DefaultEdge<IntWritable, DoubleWritable>(new IntWritable(3),
         new DoubleWritable(33)));
-    edges.add(new Edge<IntWritable, DoubleWritable>(new IntWritable(4),
+    edges.add(new DefaultEdge<IntWritable, DoubleWritable>(new IntWritable(4),
         new DoubleWritable(44)));
     vertex.initialize(null, null, edges);
     GiraphTransferRegulator gtr =

http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-core/src/test/java/org/apache/giraph/vertex/TestIntIntNullIntVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertex/TestIntIntNullIntVertex.java
b/giraph-core/src/test/java/org/apache/giraph/vertex/TestIntIntNullIntVertex.java
index 73bd135..155861c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestIntIntNullIntVertex.java
+++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestIntIntNullIntVertex.java
@@ -19,12 +19,11 @@
 package org.apache.giraph.vertex;
 
 import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeNoValue;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 import com.google.common.collect.Lists;
 
 import java.io.ByteArrayInputStream;
@@ -36,6 +35,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests {@link org.apache.giraph.vertex.IntIntNullIntVertex}.
  */
@@ -54,10 +55,8 @@ public class TestIntIntNullIntVertex {
     IntIntNullIntVertex vertex = new MyIntIntNullVertex();
 
     List<Edge<IntWritable, NullWritable>> edges = Lists.newLinkedList();
-    edges.add(new Edge<IntWritable, NullWritable>(new IntWritable(3),
-        NullWritable.get()));
-    edges.add(new Edge<IntWritable, NullWritable>(new IntWritable(47),
-        NullWritable.get()));
+    edges.add(new EdgeNoValue<IntWritable>(new IntWritable(3)));
+    edges.add(new EdgeNoValue<IntWritable>(new IntWritable(47)));
 
     vertex.initialize(new IntWritable(23), new IntWritable(7), edges);
     vertex.voteToHalt();

http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java
b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java
index 8822d2e..a5a3545 100644
--- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java
+++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java
@@ -21,7 +21,7 @@ package org.apache.giraph.vertex;
 import com.google.common.collect.Lists;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.DefaultEdge;
 import org.apache.hadoop.io.IntWritable;
 import org.junit.Before;
 import org.junit.Test;
@@ -77,11 +77,11 @@ public class TestMultiGraphVertex {
     // in order to catch corner cases:
 
     // Edge list of form: [A, B, A]
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+    vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1),
         new IntWritable(1)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2),
+    vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(2),
         new IntWritable(2)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+    vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1),
         new IntWritable(10)));
     assertEquals(vertex.getNumEdges(), 3);
     assertEquals(vertex.removeEdges(new IntWritable(1)), 2);
@@ -89,11 +89,11 @@ public class TestMultiGraphVertex {
 
     // Edge list of form: [A, B, B]
     vertex = instantiateVertex(vertexClass);
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2),
+    vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(2),
         new IntWritable(2)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+    vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1),
         new IntWritable(1)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+    vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1),
         new IntWritable(10)));
     assertEquals(vertex.getNumEdges(), 3);
     assertEquals(vertex.removeEdges(new IntWritable(1)), 2);
@@ -101,11 +101,11 @@ public class TestMultiGraphVertex {
 
     // Edge list of form: [A, A, B]
     vertex = instantiateVertex(vertexClass);
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+    vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1),
         new IntWritable(1)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(1),
+    vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(1),
         new IntWritable(10)));
-    vertex.addEdge(new Edge<IntWritable, IntWritable>(new IntWritable(2),
+    vertex.addEdge(new DefaultEdge<IntWritable, IntWritable>(new IntWritable(2),
         new IntWritable(2)));
     assertEquals(vertex.getNumEdges(), 3);
     assertEquals(vertex.removeEdges(new IntWritable(1)), 2);

http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java
index 9ed2d81..ca4ba1a 100644
--- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java
+++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java
@@ -17,14 +17,18 @@
  */
 package org.apache.giraph.vertex;
 
-import com.google.common.collect.Lists;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.DefaultEdge;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
-import org.apache.giraph.utils.*;
+import org.apache.giraph.utils.DynamicChannelBufferInputStream;
+import org.apache.giraph.utils.DynamicChannelBufferOutputStream;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -32,11 +36,16 @@ import org.apache.hadoop.io.LongWritable;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test all the mutable vertices (except multigraph versions)
@@ -138,7 +147,7 @@ public class TestMutableVertex {
 
     List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList();
     for (int i = 1000; i > 0; --i) {
-      edges.add(new Edge<IntWritable, DoubleWritable>(
+      edges.add(new DefaultEdge<IntWritable, DoubleWritable>(
           new IntWritable(i), new DoubleWritable(i * 2.0)));
     }
 
@@ -173,7 +182,7 @@ public class TestMutableVertex {
 
     List<Edge<IntWritable, DoubleWritable>> edges = Lists.newLinkedList();
     for (int i = 1000; i > 0; --i) {
-      edges.add(new Edge<IntWritable, DoubleWritable>(
+      edges.add(new DefaultEdge<IntWritable, DoubleWritable>(
           new IntWritable(i), new DoubleWritable(i * 3.0)));
     }
 
@@ -211,19 +220,19 @@ public class TestMutableVertex {
 
     vertex.initialize(new IntWritable(0), new FloatWritable(0.0f));
     assertEquals(vertex.getNumEdges(), 0);
-    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
+    assertTrue(vertex.addEdge(new DefaultEdge<IntWritable, DoubleWritable>(
         new IntWritable(2),
         new DoubleWritable(2.0))));
     assertEquals(vertex.getNumEdges(), 1);
     assertEquals(vertex.getEdgeValue(new IntWritable(2)),
         new DoubleWritable(2.0));
-    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
+    assertTrue(vertex.addEdge(new DefaultEdge<IntWritable, DoubleWritable>(
         new IntWritable(4),
         new DoubleWritable(4.0))));
-    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
+    assertTrue(vertex.addEdge(new DefaultEdge<IntWritable, DoubleWritable>(
         new IntWritable(3),
         new DoubleWritable(3.0))));
-    assertTrue(vertex.addEdge(new Edge<IntWritable, DoubleWritable>(
+    assertTrue(vertex.addEdge(new DefaultEdge<IntWritable, DoubleWritable>(
         new IntWritable(1),
         new DoubleWritable(1.0))));
     assertEquals(vertex.getNumEdges(), 4);
@@ -271,7 +280,7 @@ public class TestMutableVertex {
     List<Edge<IntWritable, DoubleWritable>> edges =
         Lists.newArrayListWithCapacity(edgesCount);
     for (int i = edgesCount; i > 0; --i) {
-      edges.add(new Edge<IntWritable, DoubleWritable>(
+      edges.add(new DefaultEdge<IntWritable, DoubleWritable>(
           new IntWritable(i), new DoubleWritable(i * 2.0)));
     }
     vertex.initialize(new IntWritable(2), new FloatWritable(3.0f), edges);

http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
index 18fee52..a823971 100644
--- a/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
+++ b/giraph-hbase/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
@@ -17,10 +17,11 @@
  */
 package org.apache.giraph.io.hbase.edgemarker;
 
+import org.apache.giraph.graph.DefaultEdge;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.hbase.HBaseVertexInputFormat;
+import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
@@ -86,7 +87,7 @@ public class TableEdgeInputFormat extends
             String edge = Bytes.toString(row.getValue(CF, CHILDREN));
             Text vertexValue = new Text();
             Text edgeId = new Text(edge);
-            edges.add(new Edge<Text, Text>(edgeId, uselessEdgeValue));
+            edges.add(new DefaultEdge<Text, Text>(edgeId, uselessEdgeValue));
             vertex.initialize(vertexId, vertexValue, edges);
 
             return vertex;

http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
index 018972e..c92cc34 100644
--- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
@@ -18,10 +18,12 @@
 
 package org.apache.giraph.io.hcatalog;
 
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.graph.EdgeNoValue;
+import org.apache.giraph.graph.EdgeWithSource;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -55,14 +57,36 @@ public abstract class HCatalogEdgeInputFormat<
   }
 
   /**
+   * Get underlying HCatalog input format. Used for creating readers.
+   *
+   * @return GiraphHCatInputFormat stored.
+   */
+  protected GiraphHCatInputFormat getHCatInputFormat() {
+    return hCatInputFormat;
+  }
+
+  /**
    * {@link EdgeReader} for {@link HCatalogEdgeInputFormat}.
    */
-  protected abstract class HCatalogEdgeReader implements EdgeReader<I, E> {
+  protected abstract static class HCatalogEdgeReader<
+      I extends WritableComparable, E extends Writable>
+      implements EdgeReader<I, E> {
+    /** HCatalog input format to use */
+    private final GiraphHCatInputFormat hCatInputFormat;
     /** Internal {@link RecordReader}. */
     private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
     /** Context passed to initialize. */
     private TaskAttemptContext context;
 
+    /**
+     * Constructor taking hcat input format to use.
+     *
+     * @param hCatInputFormat HCatalog input format
+     */
+    public HCatalogEdgeReader(GiraphHCatInputFormat hCatInputFormat) {
+      this.hCatInputFormat = hCatInputFormat;
+    }
+
     @Override
     public final void initialize(InputSplit inputSplit,
                                  TaskAttemptContext context)
@@ -110,10 +134,10 @@ public abstract class HCatalogEdgeInputFormat<
 
   /**
    * Create {@link EdgeReader}.
-
+   *
    * @return {@link HCatalogEdgeReader} instance.
    */
-  protected abstract HCatalogEdgeReader createEdgeReader();
+  protected abstract HCatalogEdgeReader<I, E> createEdgeReader();
 
   @Override
   public EdgeReader<I, E>
@@ -133,8 +157,17 @@ public abstract class HCatalogEdgeInputFormat<
    * {@link HCatalogEdgeReader} for tables holding a complete edge
    * in each row.
    */
-  protected abstract class SingleRowHCatalogEdgeReader
-      extends HCatalogEdgeReader {
+  protected abstract static class SingleRowHCatalogEdgeReader<
+      I extends WritableComparable, E extends Writable>
+      extends HCatalogEdgeReader<I, E> {
+    /**
+     * Constructor
+     * @param hCatInputFormat giraph input format to use
+     */
+    public SingleRowHCatalogEdgeReader(GiraphHCatInputFormat hCatInputFormat) {
+      super(hCatInputFormat);
+    }
+
     /**
      * Get source vertex id from a record.
      *
@@ -165,7 +198,50 @@ public abstract class HCatalogEdgeInputFormat<
       HCatRecord record = getRecordReader().getCurrentValue();
       return new EdgeWithSource<I, E>(
           getSourceVertexId(record),
-          new Edge<I, E>(getTargetVertexId(record), getEdgeValue(record)));
+          new DefaultEdge<I, E>(getTargetVertexId(record),
+              getEdgeValue(record)));
+    }
+  }
+
+  /**
+   * {@link HCatalogEdgeReader} for tables holding a complete edge
+   * in each row where the edges contain no data other than IDs they point to.
+   */
+  protected abstract static class SingleRowHCatalogEdgeNoValueReader<
+      I extends WritableComparable>
+      extends HCatalogEdgeReader<I, NullWritable> {
+    /**
+     * Constructor
+     * @param hCatInputFormat giraph input format to use
+     */
+    public SingleRowHCatalogEdgeNoValueReader(
+        GiraphHCatInputFormat hCatInputFormat) {
+      super(hCatInputFormat);
+    }
+
+    /**
+     * Get source vertex id from a record.
+     *
+     * @param record Input record
+     * @return I Source vertex id
+     */
+    protected abstract I getSourceVertexId(HCatRecord record);
+
+    /**
+     * Get target vertex id from a record.
+     *
+     * @param record Input record
+     * @return I Target vertex id
+     */
+    protected abstract I getTargetVertexId(HCatRecord record);
+
+    @Override
+    public EdgeWithSource<I, NullWritable> getCurrentEdge() throws IOException,
+        InterruptedException {
+      HCatRecord record = getRecordReader().getCurrentValue();
+      return new EdgeWithSource<I, NullWritable>(
+          getSourceVertexId(record),
+          new EdgeNoValue<I>(getTargetVertexId(record)));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/10ad3c02/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
index 52b9ae3..319242d 100644
--- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
@@ -18,13 +18,13 @@
 
 package org.apache.giraph.io.hcatalog;
 
-import com.google.common.collect.Lists;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.DefaultEdge;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.utils.TimedLogger;
+import org.apache.giraph.vertex.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -34,6 +34,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -353,7 +355,7 @@ public abstract class HCatalogVertexInputFormat<
           currentVertexId = getVertexId(record);
         }
         if (currentVertexId.equals(getVertexId(record))) {
-          currentEdges.add(new Edge<I, E>(
+          currentEdges.add(new DefaultEdge<I, E>(
                   getTargetVertexId(record),
                   getEdgeValue(record)));
           recordsForVertex.add(record);


Mime
View raw message