giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clau...@apache.org
Subject [2/2] git commit: updated refs/heads/trunk to 9ded6c3
Date Mon, 18 Nov 2013 14:52:08 GMT
GIRAPH-758


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9ded6c37
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9ded6c37
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9ded6c37

Branch: refs/heads/trunk
Commit: 9ded6c372b3bcbceb28f6186efea7140c1f62dcd
Parents: adc87ae
Author: Claudio Martella <claudio.martella@gmail.com>
Authored: Mon Nov 18 15:51:18 2013 +0100
Committer: Claudio Martella <claudio.martella@gmail.com>
Committed: Mon Nov 18 15:51:18 2013 +0100

----------------------------------------------------------------------
 giraph-gora/conf/edge.avsc                      |  12 +
 giraph-gora/conf/gora-cassandra-mapping.xml     |  42 ++
 giraph-gora/conf/gora-hbase-mapping.xml         |  63 +++
 giraph-gora/conf/gora.properties                |  29 ++
 giraph-gora/conf/vertex.json                    |  18 +
 giraph-gora/conf/zoo.cfg                        |   4 +
 giraph-gora/pom.xml                             | 125 ++++++
 giraph-gora/src/main/assembly/compile.xml       |  39 ++
 .../io/gora/GoraGVertexVertexInputFormat.java   |  98 +++++
 .../io/gora/GoraGVertexVertexOutputFormat.java  |  83 ++++
 .../giraph/io/gora/GoraVertexInputFormat.java   | 414 +++++++++++++++++++
 .../giraph/io/gora/GoraVertexOutputFormat.java  | 297 +++++++++++++
 .../io/gora/constants/GiraphGoraConstants.java  |  80 ++++
 .../giraph/io/gora/constants/package-info.java  |  21 +
 .../giraph/io/gora/generated/GVertex.java       | 280 +++++++++++++
 .../giraph/io/gora/generated/GVertexResult.java | 280 +++++++++++++
 .../giraph/io/gora/generated/package-info.java  |  21 +
 .../org/apache/giraph/io/gora/package-info.java |  21 +
 .../giraph/io/gora/utils/DefaultKeyFactory.java |  45 ++
 .../io/gora/utils/ExtraGoraInputFormat.java     | 172 ++++++++
 .../apache/giraph/io/gora/utils/GoraUtils.java  | 194 +++++++++
 .../apache/giraph/io/gora/utils/KeyFactory.java |  54 +++
 .../giraph/io/gora/utils/package-info.java      |  21 +
 .../io/gora/GoraTestVertexInputFormat.java      | 128 ++++++
 .../io/gora/GoraTestVertexOutputFormat.java     | 114 +++++
 .../io/gora/TestGoraVertexInputFormat.java      | 121 ++++++
 .../io/gora/TestGoraVertexOutputFormat.java     |  74 ++++
 27 files changed, 2850 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/edge.avsc
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/edge.avsc b/giraph-gora/conf/edge.avsc
new file mode 100644
index 0000000..c5caeb1
--- /dev/null
+++ b/giraph-gora/conf/edge.avsc
@@ -0,0 +1,12 @@
+{
+  "type": "record",
+  "name": "GEdge",
+  "namespace": "org.apache.giraph.gora.generated",
+  "fields" : [
+    {"name": "edgeId", "type": "string"},
+    {"name": "edgeWeight", "type": "float"},
+    {"name": "vertexInId", "type": "string"},
+    {"name": "vertexOutId", "type": "string"},
+    {"name": "label", "type": "string"}
+  ]
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/gora-cassandra-mapping.xml
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/gora-cassandra-mapping.xml b/giraph-gora/conf/gora-cassandra-mapping.xml
new file mode 100644
index 0000000..85079dd
--- /dev/null
+++ b/giraph-gora/conf/gora-cassandra-mapping.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<gora-orm>
+
+  <keyspace name="graphGiraph" host="localhost" cluster="Test Cluster">
+    <family name="vertices"/>
+  </keyspace>
+
+  <class name="org.apache.giraph.io.gora.generated.GVertex" keyClass="java.lang.String" keyspace="graphGiraph">
+    <field name="vertexId" family="vertices" qualifier="vertexId"/>
+    <field name="value" family="vertices" qualifier="value"/>
+    <field name="edges" family="vertices" qualifier="edges"/>
+  </class>
+
+  <keyspace name="graphGiraphResults" host="localhost" cluster="Test Cluster">
+    <family name="vertices"/>
+  </keyspace>
+
+  <class name="org.apache.giraph.io.gora.generated.GVertexResult" keyClass="java.lang.String" keyspace="graphGiraphResults">
+    <field name="vertexId" family="vertices" qualifier="vertexId"/>
+    <field name="value" family="vertices" qualifier="value"/>
+    <field name="edges" family="vertices" qualifier="edges"/>
+  </class>
+
+</gora-orm>

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/gora-hbase-mapping.xml
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/gora-hbase-mapping.xml b/giraph-gora/conf/gora-hbase-mapping.xml
new file mode 100644
index 0000000..fd40e30
--- /dev/null
+++ b/giraph-gora/conf/gora-hbase-mapping.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<gora-orm>
+
+  <table name="graphGiraph">
+    <family name="vertices"/>
+  </table>
+
+  <class name="org.apache.giraph.io.gora.generated.GVertex" keyClass="java.lang.String" table="graphGiraph">
+    <field name="vertexId" family="vertices" qualifier="vertexId"/>
+    <field name="value" family="vertices" qualifier="value"/>
+    <field name="edges" family="vertices" qualifier="edges"/>
+  </class>
+
+  <table name="graphGiraphResults">
+    <family name="vertices"/>
+  </table>
+
+  <class name="org.apache.giraph.io.gora.generated.GVertexResult" keyClass="java.lang.String" table="graphGiraphResults">
+    <field name="vertexId" family="vertices" qualifier="vertexId"/>
+    <field name="value" family="vertices" qualifier="value"/>
+    <field name="edges" family="vertices" qualifier="edges"/>
+  </class>
+
+  <table name="graphGiraphEdges">
+    <family name="edges"/>
+  </table>
+  <class name="org.apache.giraph.io.gora.generated.GEdge" keyClass="java.lang.String" table="graphGiraphEdges">
+    <field name="edgeId" family="edges" qualifier="edgeId"/>
+    <field name="edgeWeight" family="edges" qualifier="edgeWeight"/>
+    <field name="label" family="edges" qualifier="label"/>
+    <field name="vertexInId" family="edges" qualifier="vertexInId"/>
+    <field name="vertexOutId" family="edges" qualifier="vertexOutId"/>
+  </class>
+
+  <table name="graphGiraphResultEdges">
+    <family name="edges"/>
+  </table>
+  <class name="org.apache.giraph.io.gora.generated.GEdgeResult" keyClass="java.lang.String" table="graphGiraphResultEdges">
+    <field name="edgeId" family="edges" qualifier="edgeId"/>
+    <field name="edgeWeight" family="edges" qualifier="edgeWeight"/>
+    <field name="label" family="edges" qualifier="label"/>
+    <field name="vertexInId" family="edges" qualifier="vertexInId"/>
+    <field name="vertexOutId" family="edges" qualifier="vertexOutId"/>
+  </class>
+</gora-orm>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/gora.properties
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/gora.properties b/giraph-gora/conf/gora.properties
new file mode 100644
index 0000000..d59faad
--- /dev/null
+++ b/giraph-gora/conf/gora.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# FOR HBASE DATASTORE
+gora.datastore.default=org.apache.gora.hbase.store.HBaseStore
+
+# FOR CASSANDRA DATASTORE
+#gora.datastore.default=org.apache.gora.cassandra.store.CassandraStore
+
+# FOR DYNAMO DATASTORE
+#gora.datastore.default=org.apache.gora.dynamodb.store.DynamoDBStore
+#gora.datastore.autocreateschema=true
+
+#preferred.schema.name=person
+#gora.dynamodb.client=sync
+#gora.dynamodb.consistent.reads=true
+#gora.dynamodb.endpoint=http://dynamodb.us-east-1.amazonaws.com/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/vertex.json
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/vertex.json b/giraph-gora/conf/vertex.json
new file mode 100644
index 0000000..9f435fa
--- /dev/null
+++ b/giraph-gora/conf/vertex.json
@@ -0,0 +1,18 @@
+{
+  "type": "record",
+  "name": "Vertex",
+  "namespace": "org.apache.giraph.gora.generated",
+  "fields" : [
+    {"name": "vertexId", "type": "long"},
+    {"name": "value", "type": "float"},
+    {"name": "edges", "type": {"type":"array", "items": {
+      "name": "Edge",
+      "type": "record",
+      "namespace": "org.apache.giraph.gora.generated",
+      "fields": [
+        {"name": "vertexId", "type": "long"},
+        {"name": "edgeValue", "type": "float"}
+      ]
+    }}}
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/conf/zoo.cfg
----------------------------------------------------------------------
diff --git a/giraph-gora/conf/zoo.cfg b/giraph-gora/conf/zoo.cfg
new file mode 100644
index 0000000..f4cec32
--- /dev/null
+++ b/giraph-gora/conf/zoo.cfg
@@ -0,0 +1,4 @@
+tickTime=20000
+dataDir=/var/zookeeper
+clientPort=2181
+maxClientCnxns=300
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-gora/pom.xml b/giraph-gora/pom.xml
new file mode 100644
index 0000000..4b9ca6f
--- /dev/null
+++ b/giraph-gora/pom.xml
@@ -0,0 +1,125 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.giraph</groupId>
+    <artifactId>giraph-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>giraph-gora</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Giraph Gora I/O</name>
+  <url>http://gora.apache.org/giraph-gora/</url>
+  <description>Giraph Gora input/output classes</description>
+
+  <properties>
+    <top.dir>${project.basedir}/..</top.dir>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <configuration>
+          <siteDirectory>${project.basedir}/src/site</siteDirectory>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.6</version>
+        <configuration>
+          <systemProperties>
+            <property>
+              <name>prop.jarLocation</name>
+              <value>${top.dir}/giraph-core/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- compile dependencies. sorted lexicographically. -->
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.giraph</groupId>
+      <artifactId>giraph-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.giraph</groupId>
+      <artifactId>giraph-core</artifactId>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gora</groupId>
+      <artifactId>gora-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>xerces</groupId>
+      <artifactId>xercesImpl</artifactId>
+      <version>2.9.1</version>
+  </dependency>
+  <dependency>
+      <groupId>xalan</groupId>
+      <artifactId>xalan</artifactId>
+      <version>2.7.1</version>
+  </dependency>
+    <!-- test dependencies. sorted lexicographically. -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/assembly/compile.xml
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/assembly/compile.xml b/giraph-gora/src/main/assembly/compile.xml
new file mode 100644
index 0000000..6acf679
--- /dev/null
+++ b/giraph-gora/src/main/assembly/compile.xml
@@ -0,0 +1,39 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <id>jar-with-dependencies</id>
+   <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>true</useProjectArtifact>
+      <outputDirectory>/</outputDirectory>
+      <unpackOptions>
+          <excludes>
+              <exclude>META-INF/LICENSE</exclude>
+          </excludes>
+      </unpackOptions>
+      <unpack>true</unpack>
+      <scope>runtime</scope>
+    </dependencySet>
+  </dependencySets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexInputFormat.java
new file mode 100644
index 0000000..cb0f005
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexInputFormat.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.avro.util.Utf8;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.gora.generated.GVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Example vertex input format that reads {@link GVertex} generated beans.
+ */
+public class GoraGVertexVertexInputFormat
+  extends GoraVertexInputFormat<LongWritable, DoubleWritable,
+          FloatWritable> {
+
+  /**
+   * Default constructor.
+   */
+  public GoraGVertexVertexInputFormat() {
+  }
+
+  /**
+   * Creates the vertex reader; split and context are not used here.
+   * @param split split to be read.
+   * @param context task attempt context.
+   * @return GoraVertexReader a new {@link GoraGVertexVertexReader}.
+   */
+  @Override
+  public GoraVertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new GoraGVertexVertexReader();
+  }
+
+  /**
+   * Vertex reader that converts GVertex beans into Giraph vertices.
+   */
+  protected class GoraGVertexVertexReader extends GoraVertexReader {
+
+    /**
+     * Casts the Gora object to GVertex and builds a Giraph vertex from it.
+     * @param goraObject Object from Gora; cast to GVertex without a check.
+     * @return Vertex with parsed id and value, plus edges from the edges map.
+     */
+    @Override
+    protected Vertex<LongWritable, DoubleWritable, FloatWritable>
+    transformVertex(Object goraObject) {
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex;
+      /* create the vertex through the configuration */
+      vertex = getConf().createVertex();
+      GVertex tmpGVertex = (GVertex) goraObject;
+
+      LongWritable vrtxId = new LongWritable(
+          Long.parseLong(tmpGVertex.getVertexId().toString()));
+      DoubleWritable vrtxValue = new DoubleWritable(tmpGVertex.getValue());
+      vertex.initialize(vrtxId, vrtxValue);
+      if (tmpGVertex.getEdges() != null && !tmpGVertex.getEdges().isEmpty()) {
+        Set<Utf8> keyIt = tmpGVertex.getEdges().keySet();
+        for (Utf8 key : keyIt) {
+          String keyVal = key.toString();
+          String valVal = tmpGVertex.getEdges().get(key).toString();
+          Edge<LongWritable, FloatWritable> edge;
+          if (!keyVal.contains("vertexId") && !keyVal.contains("value")) {
+            edge = EdgeFactory.create(
+                new LongWritable(Long.parseLong(keyVal)),
+                new FloatWritable(Float.parseFloat(valVal)));
+            vertex.addEdge(edge);
+          }
+        }
+      }
+      return vertex;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexOutputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexOutputFormat.java
new file mode 100644
index 0000000..893e083
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGVertexVertexOutputFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.util.Utf8;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.io.gora.generated.GVertexResult;
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Example vertex output format that writes GVertexResult generated beans.
+ */
+public class GoraGVertexVertexOutputFormat
+  extends GoraVertexOutputFormat<LongWritable, DoubleWritable,
+  FloatWritable> {
+
+  /**
+   * Default constructor.
+   */
+  public GoraGVertexVertexOutputFormat() {
+  }
+
+  @Override
+  public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
+  createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new GoraGVertexVertexWriter();
+  }
+
+  /**
+   * Vertex writer that converts Giraph vertices into GVertexResult beans.
+   */
+  protected class GoraGVertexVertexWriter extends GoraVertexWriter {
+
+    @Override
+    protected Persistent getGoraVertex(
+        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) {
+      GVertexResult tmpGVertex = new GVertexResult();
+      tmpGVertex.setVertexId(new Utf8(vertex.getId().toString()));
+      tmpGVertex.setValue(Float.parseFloat(vertex.getValue().toString()));
+      Iterator<Edge<LongWritable, FloatWritable>> it =
+          vertex.getEdges().iterator();
+      while (it.hasNext()) { // store each edge as (target id, value) text
+        Edge<LongWritable, FloatWritable> edge = it.next();
+        tmpGVertex.putToEdges(
+            new Utf8(edge.getTargetVertexId().toString()),
+            new Utf8(edge.getValue().toString()));
+      }
+      return tmpGVertex;
+    }
+
+    @Override
+    protected Object getGoraKey(
+        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) {
+      String goraKey = String.valueOf(vertex.getId()); // id as a String key
+      return goraKey;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexInputFormat.java
new file mode 100644
index 0000000..9a6ad8c
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexInputFormat.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.gora.utils.KeyFactory;
+import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat;
+import org.apache.giraph.io.gora.utils.GoraUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ *  Class which wraps the GoraInputFormat. It's designed
+ *  as an extension point to VertexInputFormat subclasses who wish
+ *  to read from Gora data sources.
+ *
+ *  Works with
+ *  {@link GoraVertexOutputFormat}
+ *
+ * @param <I> vertex id type
+ * @param <V>  vertex value type
+ * @param <E>  edge type
+ */
+public abstract class GoraVertexInputFormat<
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable>
+        extends VertexInputFormat<I, V, E> {
+
+  /** Start key for querying Gora data store. */
+  private static Object START_KEY;
+
+  /** End key for querying Gora data store. */
+  private static Object END_KEY;
+
+  /** Logger for Gora's vertex input format. */
+  private static final Logger LOG =
+          Logger.getLogger(GoraVertexInputFormat.class);
+
+  /** KeyClass used for getting data. */
+  private static Class<?> KEY_CLASS;
+
+  /** The vertex itself will be used as a value inside Gora. */
+  private static Class<? extends Persistent> PERSISTENT_CLASS;
+
+  /** Data store class to be used as backend. */
+  private static Class<? extends DataStore> DATASTORE_CLASS;
+
+  /** Class used to transform strings into Keys */
+  private static Class<?> KEY_FACTORY_CLASS;
+
+  /** Data store used for querying data. */
+  private static DataStore DATA_STORE;
+
+  /** Counter for input records. */
+  private static int RECORD_COUNTER = 0;
+
+  /** Delegate Gora input format */
+  private static ExtraGoraInputFormat GORA_INPUT_FORMAT =
+         new ExtraGoraInputFormat();
+
+  /** @param conf configuration parameters */
+  public void checkInputSpecs(Configuration conf) {
+    String sDataStoreType =
+        GIRAPH_GORA_DATASTORE_CLASS.get(getConf());
+    String sKeyType =
+        GIRAPH_GORA_KEY_CLASS.get(getConf());
+    String sPersistentType =
+        GIRAPH_GORA_PERSISTENT_CLASS.get(getConf());
+    String sKeyFactoryClass =
+        GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf());
+    try {
+      Class<?> keyClass = Class.forName(sKeyType);
+      Class<?> persistentClass = Class.forName(sPersistentType);
+      Class<?> dataStoreClass = Class.forName(sDataStoreType);
+      Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass);
+      setKeyClass(keyClass);
+      setPersistentClass((Class<? extends Persistent>) persistentClass);
+      setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
+      setKeyFactoryClass(keyFactoryClass);
+      setDataStore(createDataStore());
+      GORA_INPUT_FORMAT.setDataStore(getDataStore());
+    } catch (ClassNotFoundException e) {
+      LOG.error("Error while reading Gora Input parameters");
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Create a vertex reader for a given split. Guaranteed to have been
+   * configured with setConf() prior to use. The framework will also call
+   * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param split the split to be read
+   * @param context the information about the task
+   * @return a new record reader
+   * @throws IOException
+   */
+  public abstract GoraVertexReader createVertexReader(InputSplit split,
+    TaskAttemptContext context) throws IOException;
+
+  /**
+   * Gets the splits for a data store.
+   * @param context JobContext
+   * @param minSplitCountHint Hint for a minimum split count
+   * @return List<InputSplit> A list of splits
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+    throws IOException, InterruptedException {
+    KeyFactory kFact = null;
+    try {
+      kFact = (KeyFactory) getKeyFactoryClass().newInstance();
+      kFact.setDataStore(getDataStore());
+    } catch (InstantiationException e) {
+      LOG.error("Key factory was not instantiated. Please verify.");
+      LOG.error(e.getMessage());
+      e.printStackTrace();
+    } catch (IllegalAccessException e) {
+      LOG.error("Key factory was not instantiated. Please verify.");
+      LOG.error(e.getMessage());
+      e.printStackTrace();
+    }
+    String sKey = GIRAPH_GORA_START_KEY.get(getConf());
+    String eKey = GIRAPH_GORA_END_KEY.get(getConf());
+    if (sKey == null || sKey.isEmpty()) {
+      LOG.warn("No start key has been defined.");
+      LOG.warn("Querying all the data store.");
+      sKey = null;
+      eKey = null;
+    } else {
+      setStartKey(kFact.buildKey(sKey));
+      setEndKey(kFact.buildKey(eKey));
+    }
+    Query tmpQuery = GoraUtils.getQuery(
+        getDataStore(), getStartKey(), getEndKey());
+    GORA_INPUT_FORMAT.setQuery(tmpQuery);
+    List<InputSplit> splits = GORA_INPUT_FORMAT.getSplits(context);
+    return splits;
+  }
+
+  /**
+   * Gets the data store object initialized.
+   * @return DataStore created
+   */
+  public DataStore createDataStore() {
+    DataStore dsCreated = null;
+    try {
+      dsCreated = GoraUtils.createSpecificDataStore(getDatastoreClass(),
+          getKeyClass(), getPersistentClass());
+    } catch (GoraException e) {
+      LOG.error("Error creating data store.");
+      e.printStackTrace();
+    }
+    return dsCreated;
+  }
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * vertex input. Easiest to ignore the key value separator and only use
+   * key instead.
+   */
+  protected abstract class GoraVertexReader extends VertexReader<I, V, E> {
+    /** Current vertex */
+    private Vertex<I, V, E> vertex;
+    /** Results gotten from Gora data store. */
+    private Result readResults;
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      getResults();
+      RECORD_COUNTER = 0;
+    }
+
+    /**
+     * Gets the next vertex from Gora data store.
+     * @return true/false depending on the existence of vertices.
+     * @throws IOException exceptions passed along.
+     * @throws InterruptedException exceptions passed along.
+     */
+    @Override
+    // CHECKSTYLE: stop IllegalCatch
+    public boolean nextVertex() throws IOException, InterruptedException {
+      boolean flg = false;
+      try {
+        flg = this.getReadResults().next();
+        this.vertex = transformVertex(this.getReadResults().get());
+        RECORD_COUNTER++;
+      } catch (Exception e) {
+        LOG.error("Error transforming vertices.");
+        LOG.error(e.getMessage());
+        flg = false;
+      }
+      LOG.debug(RECORD_COUNTER + " were transformed.");
+      return flg;
+    }
+    // CHECKSTYLE: resume IllegalCatch
+
+    /**
+     * Gets the progress of reading results from Gora.
+     * @return the progress of reading results from Gora.
+     */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      float progress = 0.0f;
+      if (getReadResults() != null) {
+        progress = getReadResults().getProgress();
+      }
+      return progress;
+    }
+
+    /**
+     * Gets current vertex.
+     *
+     * @return  The vertex object represented by a Gora object
+     */
+    @Override
+    public Vertex<I, V, E> getCurrentVertex()
+      throws IOException, InterruptedException {
+      return this.vertex;
+    }
+
+    /**
+     * Parser for a single Gora object
+     *
+     * @param   goraObject vertex represented as a GoraObject
+     * @return  The vertex object represented by a Gora object
+     */
+    protected abstract Vertex<I, V, E> transformVertex(Object goraObject);
+
+    /**
+     * Performs a range query to a Gora data store.
+     */
+    protected void getResults() {
+      setReadResults(GoraUtils.getRequest(getDataStore(),
+          getStartKey(), getEndKey()));
+    }
+
+    /**
+     * Finishes the reading process.
+     * @throws IOException.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Gets the results read.
+     * @return results read.
+     */
+    Result getReadResults() {
+      return readResults;
+    }
+
+    /**
+     * Sets the results read.
+     * @param readResults results read.
+     */
+    void setReadResults(Result readResults) {
+      this.readResults = readResults;
+    }
+  }
+
+  /**
+   * Gets the persistent Class
+   * @return persistentClass used
+   */
+  static Class<? extends Persistent> getPersistentClass() {
+    return PERSISTENT_CLASS;
+  }
+
+  /**
+   * Sets the persistent Class
+   * @param persistentClassUsed to be set
+   */
+  static void setPersistentClass
+  (Class<? extends Persistent> persistentClassUsed) {
+    PERSISTENT_CLASS = persistentClassUsed;
+  }
+
+  /**
+   * Gets the key class used.
+   * @return the key class used.
+   */
+  static Class<?> getKeyClass() {
+    return KEY_CLASS;
+  }
+
+  /**
+   * Sets the key class used.
+   * @param keyClassUsed key class used.
+   */
+  static void setKeyClass(Class<?> keyClassUsed) {
+    KEY_CLASS = keyClassUsed;
+  }
+
+  /**
+   * @return Class the DATASTORE_CLASS
+   */
+  public static Class<? extends DataStore> getDatastoreClass() {
+    return DATASTORE_CLASS;
+  }
+
+  /**
+   * @param dataStoreClass the dataStore class to set
+   */
+  public static void setDatastoreClass(
+      Class<? extends DataStore> dataStoreClass) {
+    DATASTORE_CLASS = dataStoreClass;
+  }
+
+  /**
+   * Gets the start key for querying.
+   * @return the start key.
+   */
+  public Object getStartKey() {
+    return START_KEY;
+  }
+
+  /**
+   * Sets the start key for querying.
+   * @param startKey start key.
+   */
+  public static void setStartKey(Object startKey) {
+    START_KEY = startKey;
+  }
+
+  /**
+   * Gets the end key for querying.
+   * @return the end key.
+   */
+  static Object getEndKey() {
+    return END_KEY;
+  }
+
+  /**
+   * Sets the end key for querying.
+   * @param pEndKey end key.
+   */
+  static void setEndKey(Object pEndKey) {
+    END_KEY = pEndKey;
+  }
+
+  /**
+   * Gets the key factory class.
+   * @return the key factory class.
+   */
+  static Class<?> getKeyFactoryClass() {
+    return KEY_FACTORY_CLASS;
+  }
+
+  /**
+   * Sets the key factory class.
+   * @param keyFactoryClass the keyFactoryClass to set.
+   */
+  static void setKeyFactoryClass(Class<?> keyFactoryClass) {
+    KEY_FACTORY_CLASS = keyFactoryClass;
+  }
+
+  /**
+   * Gets the data store.
+   * @return DataStore
+   */
+  public static DataStore getDataStore() {
+    return DATA_STORE;
+  }
+
+  /**
+   * Sets the data store
+   * @param dStore the data store to set
+   */
+  public static void setDataStore(DataStore dStore) {
+    DATA_STORE = dStore;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexOutputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexOutputFormat.java
new file mode 100644
index 0000000..5fcc684
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraVertexOutputFormat.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.io.gora.utils.GoraUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+/**
+ *
+ *  Class which wraps the GoraOutputFormat. It's designed
+ *  as an extension point to VertexOutputFormat subclasses who wish
+ *  to write vertices back to a Gora data store.
+ *
+ *  Works with
+ *  {@link GoraVertexInputFormat}
+ *
+ *
+ * @param <I> vertex id type
+ * @param <V>  vertex value type
+ * @param <E>  edge type
+ */
+public abstract class GoraVertexOutputFormat<
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable>
+        extends VertexOutputFormat<I, V, E> {
+
+  /** Logger for Gora's vertex output format. */
+  private static final Logger LOG =
+        Logger.getLogger(GoraVertexOutputFormat.class);
+
+  /** KeyClass used for getting data. */
+  private static Class<?> KEY_CLASS;
+
+  /** The vertex itself will be used as a value inside Gora. */
+  private static Class<? extends Persistent> PERSISTENT_CLASS;
+
+  /** Data store class to be used as backend. */
+  private static Class<? extends DataStore> DATASTORE_CLASS;
+
+  /** Data store used for writing data. */
+  private static DataStore DATA_STORE;
+
+  /**
+   * checkOutputSpecs
+   *
+   * @param context information about the job
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+  }
+
+  /**
+   * Gets the data store object initialized.
+   * @return DataStore created
+   */
+  public DataStore createDataStore() {
+    DataStore dsCreated = null;
+    try {
+      dsCreated = GoraUtils.createSpecificDataStore(getDatastoreClass(),
+          getKeyClass(), getPersistentClass());
+    } catch (GoraException e) {
+      getLogger().error("Error creating data store.");
+      e.printStackTrace();
+    }
+    return dsCreated;
+  }
+
+  /**
+   * getOutputCommitter
+   *
+   * @param context the task context
+   * @return OutputCommitter
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new NullOutputCommitter();
+  }
+
+  /**
+   * Empty output committer for Hadoop.
+   */
+  private static class NullOutputCommitter extends OutputCommitter {
+    @Override
+    public void abortTask(TaskAttemptContext arg0) throws IOException {    }
+
+    @Override
+    public void commitTask(TaskAttemptContext arg0) throws IOException {    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+      return false;
+    }
+
+    @Override
+    public void setupJob(JobContext arg0) throws IOException {    }
+
+    @Override
+    public void setupTask(TaskAttemptContext arg0) throws IOException {    }
+  }
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * vertex/edges output. Easiest to ignore the key value separator and only
+   * use key instead.
+   */
+  protected abstract class GoraVertexWriter
+    extends VertexWriter<I, V, E>
+    implements Watcher {
+    /** lock for management of the barrier */
+    private final Object lock = new Object();
+
+    @Override
+    public void initialize(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      String sDataStoreType =
+        GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf());
+      String sKeyType =
+        GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf());
+      String sPersistentType =
+        GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf());
+      try {
+        Class<?> keyClass = Class.forName(sKeyType);
+        Class<?> persistentClass = Class.forName(sPersistentType);
+        Class<?> dataStoreClass = Class.forName(sDataStoreType);
+        setKeyClass(keyClass);
+        setPersistentClass((Class<? extends Persistent>) persistentClass);
+        setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
+        setDataStore(createDataStore());
+        if (getDataStore() != null) {
+          getLogger().info("The output data store has been created.");
+        }
+      } catch (ClassNotFoundException e) {
+        getLogger().error("Error while reading Gora Output parameters");
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      getDataStore().flush();
+      getDataStore().close();
+    }
+
+    @Override
+    public void writeVertex(Vertex<I, V, E> vertex)
+      throws IOException, InterruptedException {
+      Persistent goraVertex = null;
+      Object goraKey = getGoraKey(vertex);
+      goraVertex = getGoraVertex(vertex);
+      getDataStore().put(goraKey, goraVertex);
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      EventType type = event.getType();
+      if (type == EventType.NodeChildrenChanged) {
+        if (getLogger().isDebugEnabled()) {
+          getLogger().debug("signal: number of children changed.");
+        }
+        synchronized (lock) {
+          lock.notify();
+        }
+      }
+    }
+
+    /**
+     * Each vertex needs to be transformed into a Gora object to be sent to
+     * a specific data store.
+     *
+     * @param  vertex   vertex to be transformed into a Gora object
+     * @return          Gora representation of the vertex
+     */
+    protected abstract Persistent getGoraVertex(Vertex<I, V, E> vertex);
+
+    /**
+     * Gets the correct key from a computed vertex.
+     * @param vertex  vertex to extract the key from.
+     * @return        The key representing such vertex
+     */
+    protected abstract Object getGoraKey(Vertex<I, V, E> vertex);
+
+  }
+
+  /**
+   * Gets the data store.
+   * @return DataStore
+   */
+  public static DataStore getDataStore() {
+    return DATA_STORE;
+  }
+
+  /**
+   * Sets the data store
+   * @param dStore the data store to set
+   */
+  public static void setDataStore(DataStore dStore) {
+    DATA_STORE = dStore;
+  }
+
+  /**
+   * Gets the persistent Class
+   * @return persistentClass used
+   */
+  static Class<? extends Persistent> getPersistentClass() {
+    return PERSISTENT_CLASS;
+  }
+
+  /**
+   * Sets the persistent Class
+   * @param persistentClassUsed to be set
+   */
+  static void setPersistentClass
+  (Class<? extends Persistent> persistentClassUsed) {
+    PERSISTENT_CLASS = persistentClassUsed;
+  }
+
+  /**
+   * Gets the key class used.
+   * @return the key class used.
+   */
+  static Class<?> getKeyClass() {
+    return KEY_CLASS;
+  }
+
+  /**
+   * Sets the key class used.
+   * @param keyClassUsed key class used.
+   */
+  static void setKeyClass(Class<?> keyClassUsed) {
+    KEY_CLASS = keyClassUsed;
+  }
+
+  /**
+   * @return Class the DATASTORE_CLASS
+   */
+  public static Class<? extends DataStore> getDatastoreClass() {
+    return DATASTORE_CLASS;
+  }
+
+  /**
+   * @param dataStoreClass the dataStore class to set
+   */
+  public static void setDatastoreClass(
+      Class<? extends DataStore> dataStoreClass) {
+    DATASTORE_CLASS = dataStoreClass;
+  }
+
+  /**
+   * Returns a logger.
+   * @return the log for the output format.
+   */
+  public static Logger getLogger() {
+    return LOG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/GiraphGoraConstants.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/GiraphGoraConstants.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/GiraphGoraConstants.java
new file mode 100644
index 0000000..187bf59
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/GiraphGoraConstants.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora.constants;
+
+import org.apache.giraph.conf.StrConfOption;
+
+/**
+ * Constants used all over Giraph for configuration specific for Gora
+ */
+// CHECKSTYLE: stop InterfaceIsTypeCheck
+public interface GiraphGoraConstants {
+  /** Gora data store class which provides data access. */
+  StrConfOption GIRAPH_GORA_DATASTORE_CLASS =
+    new StrConfOption("giraph.gora.datastore.class", null,
+                      "Gora DataStore class to access to data from. " +
+                      "- required");
+
+  /** Gora key class to query the data store. */
+  StrConfOption GIRAPH_GORA_KEY_CLASS =
+    new StrConfOption("giraph.gora.key.class", null,
+                      "Gora Key class to query the datastore. " +
+                      "- required");
+
+  /** Gora persistent class to query the data store. */
+  StrConfOption GIRAPH_GORA_PERSISTENT_CLASS =
+    new StrConfOption("giraph.gora.persistent.class", null,
+                      "Gora Persistent class to read objects from Gora. " +
+                      "- required");
+
+  /** Gora start key to query the datastore. */
+  StrConfOption GIRAPH_GORA_START_KEY =
+    new StrConfOption("giraph.gora.start.key", null,
+                      "Gora start key to query the datastore. ");
+
+  /** Gora end key to query the datastore. */
+  StrConfOption GIRAPH_GORA_END_KEY =
+    new StrConfOption("giraph.gora.end.key", null,
+                      "Gora end key to query the datastore. ");
+
+  /** Key factory class used to convert strings into Gora keys. */
+  StrConfOption GIRAPH_GORA_KEYS_FACTORY_CLASS =
+    new StrConfOption("giraph.gora.keys.factory.class", null,
+                      "Keys factory to convert strings into desired keys" +
+                      "- required");
+
+  // OUTPUT
+  /** Gora data store class which provides data access. */
+  StrConfOption GIRAPH_GORA_OUTPUT_DATASTORE_CLASS =
+    new StrConfOption("giraph.gora.output.datastore.class", null,
+                      "Gora DataStore class to write data to. " +
+                      "- required");
+
+  /** Gora key class used to write to the data store. */
+  StrConfOption GIRAPH_GORA_OUTPUT_KEY_CLASS =
+    new StrConfOption("giraph.gora.output.key.class", null,
+                      "Gora Key class to write to datastore. " +
+                      "- required");
+
+  /** Gora persistent class used to write to the data store. */
+  StrConfOption GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS =
+    new StrConfOption("giraph.gora.output.persistent.class", null,
+                      "Gora Persistent class to write to Gora. " +
+                      "- required");
+}
+// CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/package-info.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/package-info.java
new file mode 100644
index 0000000..3a8b96f
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/constants/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Constants for Gora Input/Output formats
+ */
+package org.apache.giraph.io.gora.constants;

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertex.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertex.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertex.java
new file mode 100644
index 0000000..efd7b95
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertex.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.gora.generated;
+
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+
+/**
+ * Example Gora persistent class for defining a Giraph-Vertex.
+ */
+@SuppressWarnings("all")
+public class GVertex extends PersistentBase {
+  /**
+   * Schema used for the class.
+   */
+  public static final Schema OBJ_SCHEMA = Schema.parse(
+      "{\"type\":\"record\",\"name\":\"Vertex\"," +
+      "\"namespace\":\"org.apache.giraph.gora.generated\"," +
+      "\"fields\":[{\"name\":\"vertexId\",\"type\":\"string\"}," +
+      "{\"name\":\"value\",\"type\":\"float\"},{\"name\":\"edges\"," +
+      "\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}");
+
+  /**
+   * Fields of the record, each with its index and its name.
+   */
+  public static enum Field {
+    /**
+     * VertexId
+     */
+    VERTEX_ID(0, "vertexId"),
+
+    /**
+     * Field value
+     */
+    VALUE(1, "value"),
+
+    /**
+     * Edges
+     */
+    EDGES(2, "edges");
+
+    /**
+     * Field index
+     */
+    private final int index;
+
+    /**
+     * Field name
+     */
+    private final String name;
+
+    /**
+     * Field constructor
+     * @param index of attribute
+     * @param name of attribute
+     */
+    Field(int index, String name) {
+      this.index = index;
+      this.name = name;
+    }
+
+    /**
+     * Gets index
+     * @return int of attribute.
+     */
+    public int getIndex() {
+      return index;
+    }
+
+    /**
+     * Gets name
+     * @return String of name.
+     */
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * Gets name
+     * @return String of name.
+     */
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
+
+  /**
+   * Array containing all fields.
+   */
+  private static final String[] ALL_FIELDS = {"vertexId", "value", "edges"};
+
+  static {
+    PersistentBase.registerFields(GVertex.class, ALL_FIELDS);
+  }
+
+  /**
+   * Vertex Id
+   */
+  private Utf8 vertexId;
+
+  /**
+   * Value
+   */
+  private float value;
+
+  /**
+   * Edges
+   */
+  private Map<Utf8, Utf8> edges;
+
+  /**
+   * Default constructor
+   */
+  public GVertex() {
+    this(new StateManagerImpl());
+  }
+
+  /**
+   * Constructor
+   * @param stateManager from which the object will be created.
+   */
+  public GVertex(StateManager stateManager) {
+    super(stateManager);
+    edges = new StatefulHashMap<Utf8, Utf8>();
+  }
+
+  /**
+   * Creates a new instance
+   * @param stateManager from which the object will be created.
+   * @return GVertex created
+   */
+  public GVertex newInstance(StateManager stateManager) {
+    return new GVertex(stateManager);
+  }
+
+  /**
+   * Gets the object schema
+   * @return Schema of the object.
+   */
+  public Schema getSchema() {
+    return OBJ_SCHEMA;
+  }
+
+  /**
+   * Gets field
+   * @param fieldIndex 0 (vertexId), 1 (value) or 2 (edges).
+   * @return Object from an index.
+   */
+  public Object get(int fieldIndex) {
+    switch (fieldIndex) {
+    case 0:
+      return vertexId;
+    case 1:
+      return value;
+    case 2:
+      return edges;
+    default:
+      throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Puts a value into a field and marks it dirty, unless it is unchanged.
+   * @param fieldIndex 0 (vertexId), 1 (value) or 2 (edges).
+   * @param fieldValue value of field used.
+   */
+  @SuppressWarnings(value = "unchecked")
+  public void put(int fieldIndex, Object fieldValue) {
+    if (isFieldEqual(fieldIndex, fieldValue)) {
+      return;
+    }
+    getStateManager().setDirty(this, fieldIndex);
+    switch (fieldIndex) {
+    case 0:
+      vertexId = (Utf8) fieldValue; break;
+    case 1:
+      value = (Float) fieldValue; break;
+    case 2:
+      edges = (Map<Utf8, Utf8>) fieldValue; break;
+    default:
+      throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets vertexId
+   * @return Utf8 vertexId
+   */
+  public Utf8 getVertexId() {
+    return (Utf8) get(0);
+  }
+
+  /**
+   * Sets vertexId
+   * @param value vertexId
+   */
+  public void setVertexId(Utf8 value) {
+    put(0, value);
+  }
+
+  /**
+   * Gets value
+   * @return float value of the vertex.
+   */
+  public float getValue() {
+    return (Float) get(1);
+  }
+
+  /**
+   * Sets value
+   * @param value new value of the vertex.
+   */
+  public void setValue(float value) {
+    put(1, value);
+  }
+
+  /**
+   * Get edges.
+   * @return Map of edges.
+   */
+  public Map<Utf8, Utf8> getEdges() {
+    return (Map<Utf8, Utf8>) get(2);
+  }
+
+  /**
+   * Gets value from edge.
+   * @param key Edge key.
+   * @return Utf8 containing the value of edge, null if there is no edge map.
+   */
+  public Utf8 getFromEdges(Utf8 key) {
+    if (edges == null) { return null; }
+    return edges.get(key);
+  }
+
+  /**
+   * Puts a new edge, creating the edge map first if it was set to null.
+   * @param key of new edge.
+   * @param value of new edge.
+   */
+  public void putToEdges(Utf8 key, Utf8 value) {
+    if (edges == null) { edges = new StatefulHashMap<Utf8, Utf8>(); }
+    getStateManager().setDirty(this, 2);
+    edges.put(key, value);
+  }
+
+  /**
+   * Remove from edges
+   * @param key of edge to be deleted.
+   * @return Utf8 containing value of deleted key.
+   */
+  public Utf8 removeFromEdges(Utf8 key) {
+    if (edges == null) { return null; }
+    getStateManager().setDirty(this, 2);
+    return edges.remove(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertexResult.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertexResult.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertexResult.java
new file mode 100644
index 0000000..2c1952d
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GVertexResult.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.gora.generated;
+
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+
+/**
+ * Example Gora persistent class for a vertex result (id, value, edges).
+ */
+@SuppressWarnings("all")
+public class GVertexResult extends PersistentBase {
+  /**
+   * Schema used for the class.
+   */
+  public static final Schema OBJ_SCHEMA = Schema.parse(
+      "{\"type\":\"record\",\"name\":\"Vertex\"," +
+      "\"namespace\":\"org.apache.giraph.gora.generated\"," +
+      "\"fields\":[{\"name\":\"vertexId\",\"type\":\"string\"}," +
+      "{\"name\":\"value\",\"type\":\"float\"},{\"name\":\"edges\"," +
+      "\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}");
+
+  /**
+   * Fields of the record, each with its index and its name.
+   */
+  public static enum Field {
+    /**
+     * VertexId
+     */
+    VERTEX_ID(0, "vertexId"),
+
+    /**
+     * Field value
+     */
+    VALUE(1, "value"),
+
+    /**
+     * Edges
+     */
+    EDGES(2, "edges");
+
+    /**
+     * Field index
+     */
+    private final int index;
+
+    /**
+     * Field name
+     */
+    private final String name;
+
+    /**
+     * Field constructor
+     * @param index of attribute
+     * @param name of attribute
+     */
+    Field(int index, String name) {
+      this.index = index;
+      this.name = name;
+    }
+
+    /**
+     * Gets index
+     * @return int of attribute.
+     */
+    public int getIndex() {
+      return index;
+    }
+
+    /**
+     * Gets name
+     * @return String of name.
+     */
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * Gets name
+     * @return String of name.
+     */
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
+
+  /**
+   * Array containing all fields.
+   */
+  private static final String[] ALL_FIELDS = {"vertexId", "value", "edges"};
+
+  static {
+    PersistentBase.registerFields(GVertexResult.class, ALL_FIELDS);
+  }
+
+  /**
+   * Vertex Id
+   */
+  private Utf8 vertexId;
+
+  /**
+   * Value
+   */
+  private float value;
+
+  /**
+   * Edges
+   */
+  private Map<Utf8, Utf8> edges;
+
+  /**
+   * Default constructor
+   */
+  public GVertexResult() {
+    this(new StateManagerImpl());
+  }
+
+  /**
+   * Constructor
+   * @param stateManager from which the object will be created.
+   */
+  public GVertexResult(StateManager stateManager) {
+    super(stateManager);
+    edges = new StatefulHashMap<Utf8, Utf8>();
+  }
+
+  /**
+   * Creates a new instance
+   * @param stateManager from which the object will be created.
+   * @return GVertexResult created
+   */
+  public GVertexResult newInstance(StateManager stateManager) {
+    return new GVertexResult(stateManager);
+  }
+
+  /**
+   * Gets the object schema
+   * @return Schema of the object.
+   */
+  public Schema getSchema() {
+    return OBJ_SCHEMA;
+  }
+
+  /**
+   * Gets field
+   * @param fieldIndex 0 (vertexId), 1 (value) or 2 (edges).
+   * @return Object from an index.
+   */
+  public Object get(int fieldIndex) {
+    switch (fieldIndex) {
+    case 0:
+      return vertexId;
+    case 1:
+      return value;
+    case 2:
+      return edges;
+    default:
+      throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Puts a value into a field and marks it dirty, unless it is unchanged.
+   * @param fieldIndex 0 (vertexId), 1 (value) or 2 (edges).
+   * @param fieldValue value of field used.
+   */
+  @SuppressWarnings(value = "unchecked")
+  public void put(int fieldIndex, Object fieldValue) {
+    if (isFieldEqual(fieldIndex, fieldValue)) {
+      return;
+    }
+    getStateManager().setDirty(this, fieldIndex);
+    switch (fieldIndex) {
+    case 0:
+      vertexId = (Utf8) fieldValue; break;
+    case 1:
+      value = (Float) fieldValue; break;
+    case 2:
+      edges = (Map<Utf8, Utf8>) fieldValue; break;
+    default:
+      throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets vertexId
+   * @return Utf8 vertexId
+   */
+  public Utf8 getVertexId() {
+    return (Utf8) get(0);
+  }
+
+  /**
+   * Sets vertexId
+   * @param value vertexId
+   */
+  public void setVertexId(Utf8 value) {
+    put(0, value);
+  }
+
+  /**
+   * Gets value
+   * @return float value of the vertex.
+   */
+  public float getValue() {
+    return (Float) get(1);
+  }
+
+  /**
+   * Sets value
+   * @param value new value of the vertex.
+   */
+  public void setValue(float value) {
+    put(1, value);
+  }
+
+  /**
+   * Get edges.
+   * @return Map of edges.
+   */
+  public Map<Utf8, Utf8> getEdges() {
+    return (Map<Utf8, Utf8>) get(2);
+  }
+
+  /**
+   * Gets value from edge.
+   * @param key Edge key.
+   * @return Utf8 containing the value of edge, null if there is no edge map.
+   */
+  public Utf8 getFromEdges(Utf8 key) {
+    if (edges == null) { return null; }
+    return edges.get(key);
+  }
+
+  /**
+   * Puts a new edge, creating the edge map first if it was set to null.
+   * @param key of new edge.
+   * @param value of new edge.
+   */
+  public void putToEdges(Utf8 key, Utf8 value) {
+    if (edges == null) { edges = new StatefulHashMap<Utf8, Utf8>(); }
+    getStateManager().setDirty(this, 2);
+    edges.put(key, value);
+  }
+
+  /**
+   * Remove from edges
+   * @param key of edge to be deleted.
+   * @return Utf8 containing value of deleted key.
+   */
+  public Utf8 removeFromEdges(Utf8 key) {
+    if (edges == null) { return null; }
+    getStateManager().setDirty(this, 2);
+    return edges.remove(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/package-info.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/package-info.java
new file mode 100644
index 0000000..6c218d1
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Example Gora persistent classes used by the Gora Input/Output formats
+ */
+package org.apache.giraph.io.gora.generated;

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/package-info.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/package-info.java
new file mode 100644
index 0000000..3a9b488
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Gora Input/Output for Giraph
+ */
+package org.apache.giraph.io.gora;

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/DefaultKeyFactory.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/DefaultKeyFactory.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/DefaultKeyFactory.java
new file mode 100644
index 0000000..8d814f5
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/DefaultKeyFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora.utils;
+
+/**
+ * Example key factory showing one way to construct Gora keys.
+ * The keys it hands out are plain strings.
+ */
+public class DefaultKeyFactory extends KeyFactory {
+
+  /**
+   * Builds a key from a string parameter.
+   * @param keyString the key object as a string.
+   * @return the key object, which here is the string itself.
+   * @throws RuntimeException if no data store has been set on the factory.
+   */
+  @Override
+  public Object buildKey(String keyString) {
+    if (getDataStore() == null) {
+      throw new RuntimeException(
+          "DataStore must be defined before using a key Builder.");
+    }
+
+    // Ask the data store for a fresh key, then apply the transformation:
+    // for plain string keys the key simply is the string passed in.
+    Object builtKey = getDataStore().newKey();
+    builtKey = keyString;
+    return builtKey;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/ExtraGoraInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/ExtraGoraInputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/ExtraGoraInputFormat.java
new file mode 100644
index 0000000..e11f910
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/ExtraGoraInputFormat.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.gora.mapreduce.GoraInputSplit;
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.mapreduce.GoraRecordReader;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * InputFormat that fetches its input from a Gora data store. The data
+ * store and the query must be set on the instance (setDataStore, setQuery)
+ * before the splits are computed.
+ *
+ * A query can also be stored in a Hadoop configuration through the static
+ *<code>setQuery()</code> and <code>setInput()</code> methods.
+ * @param <K> KeyClass.
+ * @param <T> PersistentClass.
+ */
+public class ExtraGoraInputFormat<K, T extends PersistentBase>
+  extends InputFormat<K, T> {
+
+  /**
+   * Configuration key under which the static helpers store the query.
+   */
+  public static final String QUERY_KEY = "gora.inputformat.query";
+
+  /**
+   * Data store the partitions are requested from.
+   */
+  private DataStore<K, T> dataStore;
+
+  /**
+   * Query handed to the data store when computing the partitions.
+   */
+  private Query<K, T> query;
+
+  /**
+   * Builds the reader for the partition query carried by a Gora split.
+   * @param split InputSplit to be used; it is cast to GoraInputSplit.
+   * @param context JobContext to be used.
+   * @return RecordReader record reader used inside Hadoop job.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordReader<K, T> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    // The split is expected to wrap the partition query to be read.
+    GoraInputSplit goraSplit = (GoraInputSplit) split;
+    PartitionQuery<K, T> partQuery =
+        (PartitionQuery<K, T>) goraSplit.getQuery();
+    return new GoraRecordReader<K, T>(partQuery, context);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException,
+      InterruptedException {
+    // One input split per partition the data store reports for the query.
+    List<PartitionQuery<K, T>> partitions =
+        getDataStore().getPartitions(getQuery());
+    List<InputSplit> inputSplits =
+        new ArrayList<InputSplit>(partitions.size());
+    for (PartitionQuery<K, T> partition : partitions) {
+      inputSplits.add(new GoraInputSplit(context.getConfiguration(), partition));
+    }
+    return inputSplits;
+  }
+
+  /**
+   * @return the data store used by this input format
+   */
+  public DataStore<K, T> getDataStore() {
+    return dataStore;
+  }
+
+  /**
+   * @param datStore the data store to use
+   */
+  public void setDataStore(DataStore<K, T> datStore) {
+    this.dataStore = datStore;
+  }
+
+  /**
+   * @return the query used by this input format
+   */
+  public Query<K, T> getQuery() {
+    return query;
+  }
+
+  /**
+   * @param query the query to use
+   */
+  public void setQuery(Query<K, T> query) {
+    this.query = query;
+  }
+
+  /**
+   * Stores a query inside a configuration object under QUERY_KEY.
+   * @param conf Configuration used.
+   * @param query Query to be stored.
+   * @param <K> Key class
+   * @param <T> Persistent class
+   * @throws IOException Exception that might be thrown.
+   */
+  public static <K, T extends Persistent> void setQuery(Configuration conf,
+      Query<K, T> query) throws IOException {
+    IOUtils.storeToConf(query, conf, QUERY_KEY);
+  }
+
+  /**
+   * Loads the query stored under QUERY_KEY in the conf object passed.
+   * @param conf Configuration object.
+   * @return the query read from the configuration object
+   * @throws IOException Exception that might be thrown.
+   */
+  public Query<K, T> getQuery(Configuration conf) throws IOException {
+    return IOUtils.loadFromConf(conf, QUERY_KEY);
+  }
+
+  /**
+   * Configures a job to read its input through this input format: sets the
+   * IO serializations and the input format class, then stores the query.
+   * @param job the job to set the properties for
+   * @param query the query to get the inputs from
+   * @param dataStore the datastore as the input (not used by this method)
+   * @param reuseObjects whether to reuse objects in serialization
+   * @param <K> Key class
+   * @param <V> Persistent class
+   * @throws IOException if configuring the job fails
+   */
+  public static <K, V extends Persistent> void setInput(Job job,
+      Query<K, V> query, DataStore<K, V> dataStore, boolean reuseObjects)
+    throws IOException {
+    Configuration jobConf = job.getConfiguration();
+    GoraMapReduceUtils.setIOSerializations(jobConf, reuseObjects);
+
+    job.setInputFormatClass(ExtraGoraInputFormat.class);
+    // The query is serialized into the job configuration under QUERY_KEY.
+    ExtraGoraInputFormat.setQuery(job.getConfiguration(), query);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/GoraUtils.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/GoraUtils.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/GoraUtils.java
new file mode 100644
index 0000000..c3fc268
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/GoraUtils.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora.utils;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Static helpers to create Gora data stores and to build and run queries.
+ */
+public class GoraUtils {
+
+  /**
+   * Data store class used by createDataStore; set by createSpecificDataStore.
+   */
+  private static Class<? extends DataStore> DATASTORECLASS;
+
+  /**
+   * Configuration passed to the factory whenever a data store is created.
+   */
+  private static Configuration CONF = new Configuration();
+
+  /**
+   * Private constructor: the class only offers static methods and is
+   * not meant to be instantiated.
+   */
+  private GoraUtils() { /* private constructor */ }
+
+  /**
+   * Creates a data store of the class previously registered through
+   * createSpecificDataStore, using the configuration from getConf().
+   * @param <K> key class
+   * @param <T> value class
+   * @param keyClass key class used
+   * @param persistentClass persistent class used
+   * @return created data store
+   * @throws GoraException exception thrown
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, T extends Persistent> DataStore<K, T>
+  createDataStore(Class<K> keyClass, Class<T> persistentClass)
+    throws GoraException {
+    DataStoreFactory.createProps();
+
+    // DATASTORECLASS is declared with a raw DataStore bound, so it has to be
+    // narrowed (unchecked) to the key/value types requested by the caller.
+    Class<? extends DataStore<K, T>> storeClass =
+        (Class<? extends DataStore<K, T>>) DATASTORECLASS;
+    return DataStoreFactory.createDataStore(
+        storeClass, keyClass, persistentClass, getConf());
+  }
+
+  /**
+   * Registers the data store class to use and creates a store of it.
+   * @param <K> key class
+   * @param <T> value class
+   * @param dataStoreClass  Defines the type of data store used.
+   * @param keyClass  Handles the key class to be used.
+   * @param persistentClass Handles the persistent class to be used.
+   * @return DataStore created using parameters passed.
+   * @throws GoraException  if an error occurs.
+   */
+  public static <K, T extends Persistent> DataStore<K, T>
+  createSpecificDataStore(Class<? extends DataStore> dataStoreClass,
+      Class<K> keyClass, Class<T> persistentClass) throws GoraException {
+    DATASTORECLASS = dataStoreClass;
+    return createDataStore(keyClass, persistentClass);
+  }
+
+  /**
+   * Builds a range query for the given keys and executes it.
+   * @param <K> key class
+   * @param <T> value class
+   * @param pDataStore  data store being used.
+   * @param pStartKey start key for the range query.
+   * @param pEndKey end key for the range query.
+   * @return Result containing all results for the query.
+   */
+  public static <K, T extends Persistent> Result<K, T>
+  getRequest(DataStore<K, T> pDataStore, K pStartKey, K pEndKey) {
+    return pDataStore.execute(
+        getQuery(pDataStore, pStartKey, pEndKey));
+  }
+
+  /**
+   * Executes a query against a Gora data store.
+   * @param pDataStore data store being used.
+   * @param query query executed over data stores.
+   * @param <K> key class
+   * @param <T> value class
+   * @return Result containing all results for the query.
+   */
+  public static <K, T extends Persistent> Result<K, T>
+  getRequest(DataStore<K, T> pDataStore, Query<K, T> query) {
+    return pDataStore.execute(query);
+  }
+
+  /**
+   * Executes a query that has a start key and a null end key.
+   * @param <K> key class
+   * @param <T> value class
+   * @param pDataStore  data store being used.
+   * @param pStartKey start key for the query.
+   * @return  Result containing all results for the query.
+   */
+  public static <K, T extends Persistent> Result<K, T>
+  getRequest(DataStore<K, T> pDataStore, K pStartKey) {
+    return getRequest(pDataStore, pStartKey, null);
+  }
+
+  /**
+   * Gets a query object to be used as a range query.
+   * @param pDataStore data store used.
+   * @param pStartKey range start key.
+   * @param pEndKey range end key.
+   * @param <K> key class
+   * @param <T> value class
+   * @return range query object.
+   */
+  public static <K, T extends Persistent> Query<K, T>
+  getQuery(DataStore<K, T> pDataStore, K pStartKey, K pEndKey) {
+    Query<K, T> rangeQuery = pDataStore.newQuery();
+    rangeQuery.setStartKey(pStartKey);
+    rangeQuery.setEndKey(pEndKey);
+    return rangeQuery;
+  }
+
+  /**
+   * Gets a query object whose start key is set and whose end key is null.
+   * @param pDataStore data store used.
+   * @param pStartKey start key.
+   * @param <K> key class
+   * @param <T> value class
+   * @return query object.
+   */
+  public static <K, T extends Persistent> Query<K, T>
+  getQuery(DataStore<K, T> pDataStore, K pStartKey) {
+    // Same as the three-argument variant with the end key explicitly
+    // set to null.
+    return getQuery(pDataStore, pStartKey, null);
+  }
+
+  /**
+   * Gets a query object whose start and end keys are both set to null.
+   * @param pDataStore data store used.
+   * @param <K> key class
+   * @param <T> value class
+   * @return query object.
+   */
+  public static <K, T extends Persistent> Query<K, T>
+  getQuery(DataStore<K, T> pDataStore) {
+    // Same as the three-argument variant with both keys explicitly
+    // set to null.
+    K noKey = null;
+    return getQuery(pDataStore, noKey, noKey);
+  }
+
+  /**
+   * Gets the configuration object.
+   * @return the configuration object.
+   */
+  public static Configuration getConf() {
+    return CONF;
+  }
+
+  /**
+   * Sets the configuration object.
+   * @param conf to be set as the configuration object.
+   */
+  public static void setConf(Configuration conf) {
+    CONF = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9ded6c37/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/KeyFactory.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/KeyFactory.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/KeyFactory.java
new file mode 100644
index 0000000..c8dd4a7
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/utils/KeyFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora.utils;
+
+import org.apache.gora.store.DataStore;
+
+/**
+ * Base class for factories that turn strings into Gora key objects.
+ */
+public abstract class KeyFactory {
+
+  /**
+   * Data store made available to subclasses when they build keys.
+   */
+  private DataStore dataStore;
+
+  /**
+   * Gets the data store used in this factory.
+   * @return the dataStore
+   */
+  public DataStore getDataStore() {
+    return this.dataStore;
+  }
+
+  /**
+   * Sets the data store used in this factory.
+   * @param dataStore the dataStore to set
+   */
+  public void setDataStore(DataStore dataStore) {
+    this.dataStore = dataStore;
+  }
+
+  /**
+   * Builds a key from a string parameter.
+   * @param keyString the key object as a string.
+   * @return the key object.
+   */
+  public abstract Object buildKey(String keyString);
+}


Mime
View raw message