giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [30/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:32 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
new file mode 100644
index 0000000..04d2282
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.partition.BasicPartitionOwner;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** simplify mocking for unit testing vertices */
+public class MockUtils {
+
+    private MockUtils() {
+    }
+
+    /**
+     * mocks and holds "environment objects" that are injected into a vertex
+     *
+     * @param <I> vertex id
+     * @param <V> vertex data
+     * @param <E> edge data
+     * @param <M> message data
+     */
+    public static class MockedEnvironment<I extends WritableComparable,
+            V extends Writable, E extends Writable, M extends Writable> {
+
+        private final GraphState<I, V, E, M> graphState;
+        private final Mapper.Context context;
+        private final Configuration conf;
+        private final WorkerClientRequestProcessor workerClientRequestProcessor;
+
+        public MockedEnvironment() {
+            graphState = Mockito.mock(GraphState.class);
+            context = Mockito.mock(Mapper.Context.class);
+            conf = Mockito.mock(Configuration.class);
+            workerClientRequestProcessor =
+                Mockito.mock(WorkerClientRequestProcessor.class);
+        }
+
+        /** the injected graph state */
+        public GraphState getGraphState() {
+            return graphState;
+        }
+
+        /** the injected mapper context  */
+        public Mapper.Context getContext() {
+            return context;
+        }
+
+        /** the injected hadoop configuration */
+        public Configuration getConfiguration() {
+            return conf;
+        }
+
+        /** the injected worker communications */
+        public WorkerClientRequestProcessor getWorkerClientRequestProcessor() {
+            return workerClientRequestProcessor;
+        }
+
+        /** assert that the test vertex message has been sent to a particular vertex */
+        public void verifyMessageSent(I targetVertexId, M message) {
+            Mockito.verify(workerClientRequestProcessor).sendMessageRequest
+                (targetVertexId, message);
+        }
+
+        /** assert that the test vertex has sent no message to a particular vertex */
+        public void verifyNoMessageSent() {
+            Mockito.verifyZeroInteractions(workerClientRequestProcessor);
+        }
+    }
+
+    /**
+     * prepare a vertex for use in a unit test by setting its internal state and injecting mocked
+     * dependencies.
+     *
+     * @param vertex
+     * @param superstep the superstep to emulate
+     * @param vertexId initial vertex id
+     * @param vertexValue initial vertex value
+     * @param isHalted initial halted state of the vertex
+     * @param <I> vertex id
+     * @param <V> vertex data
+     * @param <E> edge data
+     * @param <M> message data
+     * @return the mocked environment wired into the vertex
+     * @throws Exception
+     */
+    public static <I extends WritableComparable, V extends Writable,
+            E extends Writable, M extends Writable>
+            MockedEnvironment<I, V, E, M> prepareVertex(
+            Vertex<I, V, E, M> vertex, long superstep, I vertexId,
+            V vertexValue, boolean isHalted) throws Exception {
+
+        MockedEnvironment<I, V, E, M>  env =
+                new MockedEnvironment<I, V, E, M>();
+
+        Mockito.when(env.getGraphState().getSuperstep()).thenReturn(superstep);
+        Mockito.when(env.getGraphState().getContext())
+                .thenReturn(env.getContext());
+        Mockito.when(env.getContext().getConfiguration())
+                .thenReturn(env.getConfiguration());
+        Mockito.when(env.getGraphState().getWorkerClientRequestProcessor())
+                .thenReturn(env.getWorkerClientRequestProcessor());
+
+        ReflectionUtils.setField(vertex, "id", vertexId);
+        ReflectionUtils.setField(vertex, "value", vertexValue);
+        ReflectionUtils.setField(vertex, "graphState", env.getGraphState());
+        ReflectionUtils.setField(vertex, "halt", isHalted);
+
+        return env;
+    }
+
+  public static CentralizedServiceWorker<IntWritable, IntWritable,
+      IntWritable, IntWritable> mockServiceGetVertexPartitionOwner(final int
+      numOfPartitions) {
+    CentralizedServiceWorker<IntWritable, IntWritable, IntWritable,
+        IntWritable> service = Mockito.mock(CentralizedServiceWorker.class);
+    Answer<PartitionOwner> answer = new Answer<PartitionOwner>() {
+      @Override
+      public PartitionOwner answer(InvocationOnMock invocation) throws
+          Throwable {
+        IntWritable vertexId = (IntWritable) invocation.getArguments()[0];
+        return new BasicPartitionOwner(vertexId.get() % numOfPartitions, null);
+      }
+    };
+    Mockito.when(service.getVertexPartitionOwner(
+        Mockito.any(IntWritable.class))).thenAnswer(answer);
+    return service;
+  }
+
+  public static ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+  createNewServerData(ImmutableClassesGiraphConfiguration conf,
+      Mapper.Context context) {
+    return new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+        conf,
+        ByteArrayMessagesPerVertexStore.newFactory(
+            MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
+        context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/test/java/zk/TestZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/zk/TestZooKeeperManager.java b/giraph-core/src/test/java/zk/TestZooKeeperManager.java
new file mode 100644
index 0000000..41c94a8
--- /dev/null
+++ b/giraph-core/src/test/java/zk/TestZooKeeperManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zk;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.zk.ZooKeeperManager;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestZooKeeperManager {
+  @Test
+  public void testGetBasePath() {
+    Configuration conf = new Configuration();
+
+    // Default is empty, everything goes in root znode
+    assertEquals("Default value for base path should be empty",
+        "", ZooKeeperManager.getBasePath(conf));
+
+    conf.set(GiraphConstants.BASE_ZNODE_KEY, "/howdy");
+    assertEquals("Base path should reflect value of " +
+        GiraphConstants.BASE_ZNODE_KEY,
+        "/howdy", ZooKeeperManager.getBasePath(conf));
+
+    conf.set(GiraphConstants.BASE_ZNODE_KEY, "no_slash");
+    try {
+      ZooKeeperManager.getBasePath(conf);
+      fail("Should not have allowed path without starting slash");
+    } catch (IllegalArgumentException iae) {
+      assertTrue(iae.getMessage().contains(GiraphConstants.BASE_ZNODE_KEY));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/README
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/README b/giraph-formats-contrib/README
deleted file mode 100644
index 9e1e75a..0000000
--- a/giraph-formats-contrib/README
+++ /dev/null
@@ -1,16 +0,0 @@
-giraph-formats-contrib hosts additional custom input/output formats that contain bulky or heavy-weight dependencies, and
-can't be bundled directly in the main giraph jar.
-
-In order to build, the following prerequisite conditions must be met.
-
-1) You must 'mvn install' the latest giraph maven artifact in your local m2 repo. In future releases this will
-not be required in favor of a maven central artifact available for download.
-
-2) A build of core giraph jar with the following absolute path must be available:
-    ${giraph.trunk.base}/target/giraph-${project.version}-jar-with-dependencies.jar
-
-Default checkout has ${giraph.trunk.base} set to the parent directory '..'. This is a relative path to the trunk directory.
-
-Once these conditions have been met, you will be able to compile and test giraph-formats-contrib.jar
-
-See https://cwiki.apache.org/confluence/display/GIRAPH/Giraph+formats+contrib for usage information.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/pom.xml b/giraph-formats-contrib/pom.xml
deleted file mode 100644
index f8df115..0000000
--- a/giraph-formats-contrib/pom.xml
+++ /dev/null
@@ -1,248 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.giraph</groupId>
-    <artifactId>parent</artifactId>
-    <version>0.2-SNAPSHOT</version>
-  </parent>
-  <artifactId>giraph-formats-contrib</artifactId>
-  <packaging>jar</packaging>
-
-  <name>Apache Giraph Formats</name>
-
-  <properties>
-    <top.dir>${project.basedir}/..</top.dir>
-  </properties>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <version>2.9</version>
-        <configuration>
-          <configLocation>${top.dir}/checkstyle.xml</configLocation>
-          <headerLocation>${top.dir}/license-header.txt</headerLocation>
-          <enableRulesSummary>false</enableRulesSummary>
-          <failOnError>true</failOnError>
-          <includeTestSourceDirectory>false</includeTestSourceDirectory>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>verify</phase>
-            <goals>
-               <goal>check</goal>
-             </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>2.6</version>
-        <configuration>
-          <systemProperties>
-            <property>
-              <name>prop.jarLocation</name>
-              <value>${top.dir}/giraph/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value>
-            </property>
-          </systemProperties>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
-        <version>2.5.1</version>
-        <configuration>
-          <xmlOutput>true</xmlOutput>
-          <findbugsXmlOutput>false</findbugsXmlOutput>
-          <excludeFilterFile>${top.dir}/findbugs-exclude.xml</excludeFilterFile>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>verify</phase>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-  <repositories>
-    <!-- This is the main maven repository. Normally we wouldn't need to put
-       it here when it's the only one being used, but since we need to add
-       special repositories to get hcatalog we need to mention this one
-       specifically otherwise it won't be included. -->
-    <repository>
-      <id>central</id>
-      <name>Maven Repository</name>
-      <url>http://repo1.maven.org/maven2</url>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
-    </repository>
-    <!-- This is necessary for hcatalog. -->
-    <repository>
-      <id>apache</id>
-      <name>Apache Repository</name>
-      <url>https://repository.apache.org/content/repositories/snapshots</url>
-      <snapshots>
-        <enabled>true</enabled>
-      </snapshots>
-    </repository>
-    <!-- This is necessary for hive-metastore dependencies for hcatalog. -->
-    <repository>
-      <id>datanucleus</id>
-      <name>datanucleus maven repository</name>
-      <url>http://www.datanucleus.org/downloads/maven2</url>
-      <layout>default</layout>
-      <releases>
-        <enabled>true</enabled>
-        <checksumPolicy>warn</checksumPolicy>
-      </releases>
-    </repository>
-  </repositories>
-
-  <profiles>
-    <profile>
-      <id>hadoop_0.20.203</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-test</artifactId>
-          <version>${hadoop.version}</version>
-          <scope>test</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <profile>
-      <id>hadoop_1.0</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-test</artifactId>
-          <version>${hadoop.version}</version>
-          <scope>test</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <profile>
-      <id>hadoop_non_secure</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-test</artifactId>
-          <version>${hadoop.version}</version>
-          <scope>test</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <profile>
-      <id>hadoop_facebook</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-test</artifactId>
-          <version>${hadoop.version}</version>
-          <scope>system</scope>
-          <systemPath>${lib.dir}/facebook-hadoop-0.20-test.jar</systemPath>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
-
-  <dependencies>
-    <!-- compile dependencies. sorted lexicographically. -->
-    <dependency>
-      <groupId>commons-cli</groupId>
-      <artifactId>commons-cli</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.giraph</groupId>
-      <artifactId>giraph</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hcatalog</groupId>
-      <artifactId>hcatalog-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-exec</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-metastore</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.giraph</groupId>
-      <artifactId>giraph</artifactId>
-      <version>0.2-SNAPSHOT</version>
-      <type>test-jar</type>
-    </dependency>
-
-    <!-- provided dependencies. sorted lexicographically. -->
-    <dependency>
-      <groupId>org.apache.accumulo</groupId>
-      <artifactId>accumulo-core</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <!-- test dependencies. sorted lexicographically. -->
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/src/main/assembly/compile.xml
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/src/main/assembly/compile.xml b/giraph-formats-contrib/src/main/assembly/compile.xml
deleted file mode 100644
index d950c2c..0000000
--- a/giraph-formats-contrib/src/main/assembly/compile.xml
+++ /dev/null
@@ -1,40 +0,0 @@
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
-  <id>jar-with-dependencies</id>
-   <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-
-  <dependencySets>
-    <dependencySet>
-      <useProjectArtifact>true</useProjectArtifact>
-      <outputDirectory>/</outputDirectory>
-      <unpackOptions>
-          <excludes>
-              <exclude>META-INF/LICENSE
-              </exclude>
-          </excludes>
-      </unpackOptions>
-      <unpack>false</unpack>
-      <scope>runtime</scope>
-    </dependencySet>
-  </dependencySets>
-</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java b/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
deleted file mode 100644
index 92328b7..0000000
--- a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io.accumulo;
-
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- *  Class which wraps the AccumuloInputFormat. It's designed
- *  as an extension point to VertexInputFormat subclasses who wish
- *  to read from AccumuloTables.
- *
- *  Works with
- *  {@link org.apache.giraph.io.accumulo.AccumuloVertexOutputFormat}
- *
- * @param <I> vertex id type
- * @param <V>  vertex value type
- * @param <E>  edge type
- * @param <M>  message type
- */
-public abstract class AccumuloVertexInputFormat<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        extends VertexInputFormat<I, V, E, M> {
-  /**
-   * delegate input format for all accumulo operations.
-   */
-  protected AccumuloInputFormat accumuloInputFormat =
-      new AccumuloInputFormat();
-
-  /**
-  * Abstract class which provides a template for instantiating vertices
-  * from Accumulo Key/Value pairs.
-  *
-  * @param <I>  vertex id type
-  * @param <V>  vertex value type
-  * @param <E>  edge type
-  * @param <M>  message type
-  */
-  public abstract static class AccumuloVertexReader<
-      I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable>
-      implements VertexReader<I, V, E, M> {
-
-    /** Giraph configuration */
-    private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
-    /**
-     * Used by subclasses to read key/value pairs.
-     */
-    private final RecordReader<Key, Value> reader;
-    /** Context passed to initialize */
-    private TaskAttemptContext context;
-
-    /**
-     * Constructor used to pass Record Reader instance
-     * @param reader  Accumulo record reader
-     */
-    public AccumuloVertexReader(RecordReader<Key, Value> reader) {
-      this.reader = reader;
-    }
-
-    public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
-      return configuration;
-    }
-
-    /**
-     * initialize the reader.
-     *
-     * @param inputSplit Input split to be used for reading vertices.
-     * @param context Context from the task.
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public void initialize(InputSplit inputSplit,
-                           TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      reader.initialize(inputSplit, context);
-      this.context = context;
-      this.configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
-          context.getConfiguration());
-    }
-
-    /**
-     * close
-     *
-     * @throws IOException
-     */
-    public void close() throws IOException {
-      reader.close();
-    }
-
-    /**
-     * getProgress
-     *
-     * @return progress
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public float getProgress() throws IOException, InterruptedException {
-      return reader.getProgress();
-    }
-
-    /**
-    * Get the result record reader
-    *
-    * @return Record reader to be used for reading.
-    */
-    protected RecordReader<Key, Value> getRecordReader() {
-      return reader;
-    }
-
-    /**
-     * getContext
-     *
-     * @return Context passed to initialize.
-     */
-    protected TaskAttemptContext getContext() {
-      return context;
-    }
-
-  }
-
-  /**
-   * getSplits
-   *
-   * @param context Context of the job
-   * @param numWorkers Number of workers used for this job
-   * @return  tablet splits
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  @Override
-  public List<InputSplit> getSplits(
-    JobContext context, int numWorkers)
-    throws IOException, InterruptedException {
-    List<InputSplit> splits = null;
-    try {
-      splits = accumuloInputFormat.getSplits(context);
-    } catch (IOException e) {
-      if (e.getMessage().contains("Input info has not been set")) {
-        throw new IOException(e.getMessage() +
-                " Make sure you initialized" +
-                " AccumuloInputFormat static setters " +
-                "before passing the config to GiraphJob.");
-      }
-    }
-    return splits;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexOutputFormat.java b/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexOutputFormat.java
deleted file mode 100644
index 24e9061..0000000
--- a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexOutputFormat.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io.accumulo;
-
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.giraph.graph.VertexOutputFormat;
-import org.apache.giraph.graph.VertexWriter;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-/**
- *
- *  Class which wraps the AccumuloOutputFormat. It's designed
- *  as an extension point to VertexOutputFormat subclasses who wish
- *  to write vertices back to an Accumulo table.
- *
- *  Works with
- *  {@link org.apache.giraph.io.accumulo.AccumuloVertexInputFormat}
- *
- *
- * @param <I> vertex id type
- * @param <V>  vertex value type
- * @param <E>  edge type
- */
-public abstract class AccumuloVertexOutputFormat<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable>
-        extends VertexOutputFormat<I, V, E> {
-
-
-  /**
-   * Output table parameter
-   */
-  public static final String OUTPUT_TABLE = "OUTPUT_TABLE";
-
-  /**
-   * Accumulo delegate for table output
-   */
-  protected AccumuloOutputFormat accumuloOutputFormat =
-          new AccumuloOutputFormat();
-
-  /**
-   *
-   * Main abstraction point for vertex writers to persist back
-   * to Accumulo tables.
-   *
-   * @param <I> vertex id type
-   * @param <V> vertex value type
-   * @param <E>  edge type
-   */
-  public abstract static class AccumuloVertexWriter<
-          I extends WritableComparable,
-          V extends Writable,
-          E extends Writable>
-          implements VertexWriter<I, V, E> {
-
-    /**
-     * task attempt context.
-     */
-    private TaskAttemptContext context;
-
-    /**
-     * Accumulo record writer
-     */
-    private RecordWriter<Text, Mutation> recordWriter;
-
-    /**
-     * Constructor for use with subclasses
-     *
-     * @param recordWriter accumulo record writer
-     */
-    public AccumuloVertexWriter(RecordWriter<Text, Mutation> recordWriter) {
-      this.recordWriter = recordWriter;
-    }
-
-    /**
-     * initialize
-     *
-     * @param context Context used to write the vertices.
-     * @throws IOException
-     */
-    public void initialize(TaskAttemptContext context) throws IOException {
-      this.context = context;
-    }
-
-    /**
-     *  close
-     *
-     * @param context the context of the task
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public void close(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      recordWriter.close(context);
-    }
-
-    /**
-     * Get the table record writer;
-     *
-     * @return Record writer to be used for writing.
-     */
-    public RecordWriter<Text, Mutation> getRecordWriter() {
-      return recordWriter;
-    }
-
-    /**
-     * Get the context.
-     *
-     * @return Context passed to initialize.
-     */
-    public TaskAttemptContext getContext() {
-      return context;
-    }
-
-  }
-  /**
-   *
-   * checkOutputSpecs
-   *
-   * @param context information about the job
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  @Override
-  public void checkOutputSpecs(JobContext context)
-    throws IOException, InterruptedException {
-    try {
-      accumuloOutputFormat.checkOutputSpecs(context);
-    } catch (IOException e) {
-      if (e.getMessage().contains("Output info has not been set")) {
-        throw new IOException(e.getMessage() + " Make sure you initialized" +
-                " AccumuloOutputFormat static setters " +
-                "before passing the config to GiraphJob.");
-      }
-    }
-  }
-
-  /**
-   * getOutputCommitter
-   *
-   * @param context the task context
-   * @return OutputCommitter
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    return accumuloOutputFormat.getOutputCommitter(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/package-info.java b/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/package-info.java
deleted file mode 100644
index c5560a7..0000000
--- a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of all generic utility classes.
- */
-package org.apache.giraph.io.accumulo;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java b/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
deleted file mode 100644
index cf87035..0000000
--- a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.io.hbase;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- *
- * Base class that wraps an HBase TableInputFormat and underlying Scan object
- * to help instantiate vertices from an HBase table. All
- * the static TableInputFormat properties necessary to configure
- * an HBase job are available.
- *
- * For example, setting conf.set(TableInputFormat.INPUT_TABLE, "in_table");
- * from the job setup routine will properly delegate to the
- * TableInputFormat instance. The Configurable interface prevents specific
- * wrapper methods from having to be called.
- *
- * Works with {@link HBaseVertexOutputFormat}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class HBaseVertexInputFormat<
-    I extends WritableComparable,
-    V extends Writable,
-    E extends Writable,
-    M extends Writable>
-    extends VertexInputFormat<I, V, E, M>  {
-
-
-   /**
-   * delegate HBase table input format
-   */
-  protected static final TableInputFormat BASE_FORMAT =
-          new TableInputFormat();
-  /**
-  * logger
-  */
-  private static final Logger LOG =
-          Logger.getLogger(HBaseVertexInputFormat.class);
-
-  /**
-   * Takes an instance of RecordReader that supports
-   * HBase row-key, result records.  Subclasses can focus on
-   * vertex instantiation details without worrying about connection
-   * semantics. Subclasses are expected to implement nextVertex() and
-   * getCurrentVertex()
-   *
-   *
-   *
-   * @param <I> Vertex index value
-   * @param <V> Vertex value
-   * @param <E> Edge value
-   * @param <M> Message data
-   */
-  public abstract static class HBaseVertexReader<
-          I extends WritableComparable,
-          V extends Writable,
-          E extends Writable, M extends Writable>
-          implements VertexReader<I, V, E, M> {
-    /** Giraph configuration */
-    private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
-    /** Reader instance */
-    private final RecordReader<ImmutableBytesWritable, Result> reader;
-    /** Context passed to initialize */
-    private TaskAttemptContext context;
-
-    /**
-     * Sets the base TableInputFormat and creates a record reader.
-     *
-     * @param split InputSplit
-     * @param context Context
-     * @throws IOException
-     */
-    public HBaseVertexReader(InputSplit split, TaskAttemptContext context)
-      throws IOException {
-      BASE_FORMAT.setConf(context.getConfiguration());
-      this.reader = BASE_FORMAT.createRecordReader(split, context);
-    }
-
-    public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
-      return configuration;
-    }
-
-    /**
-     * initialize
-     *
-     * @param inputSplit Input split to be used for reading vertices.
-     * @param context Context from the task.
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public void initialize(InputSplit inputSplit,
-                           TaskAttemptContext context)
-      throws IOException,
-      InterruptedException {
-      reader.initialize(inputSplit, context);
-      this.context = context;
-      this.configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
-          context.getConfiguration());
-    }
-
-    /**
-     * close
-     * @throws IOException
-     */
-    public void close() throws IOException {
-      reader.close();
-    }
-
-    /**
-     * getProgress
-     *
-     * @return progress
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public float getProgress() throws
-      IOException, InterruptedException {
-      return reader.getProgress();
-    }
-
-    /**
-     * getRecordReader
-     *
-     * @return Record reader to be used for reading.
-     */
-    protected RecordReader<ImmutableBytesWritable,
-      Result> getRecordReader() {
-      return reader;
-    }
-
-   /**
-    * getContext
-    *
-    * @return Context passed to initialize.
-    */
-    protected TaskAttemptContext getContext() {
-      return context;
-    }
-
-  }
-
-  /**
-   * getSplits
-   *
-   * @param context Context of the job
-   * @param numWorkers Number of workers used for this job
-   * @return HBase region splits
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public List<InputSplit> getSplits(
-  JobContext context, int numWorkers)
-    throws IOException, InterruptedException {
-    BASE_FORMAT.setConf(context.getConfiguration());
-    return BASE_FORMAT.getSplits(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexOutputFormat.java b/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexOutputFormat.java
deleted file mode 100644
index 2a27b63..0000000
--- a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexOutputFormat.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hbase;
-
-import org.apache.giraph.graph.VertexOutputFormat;
-import org.apache.giraph.graph.VertexWriter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- *
- * Base class for writing Vertex mutations back to specific
- * rows in an HBase table. This class wraps an instance of TableOutputFormat
- * for easy configuration with the existing properties.
- *
- * Setting conf.set(TableOutputFormat.OUTPUT_TABLE, "out_table");
- * will properly delegate to the TableOutputFormat instance contained
- * in this class. The Configurable interface prevents specific
- * wrapper methods from having to be called.
- *
- * Works with {@link HBaseVertexInputFormat}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class HBaseVertexOutputFormat<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable>
-        extends VertexOutputFormat
-                <I, V, E> {
-
-  /**
-   * delegate output format that writes to HBase
-   */
-  protected static final TableOutputFormat<ImmutableBytesWritable>
-  BASE_FORMAT = new TableOutputFormat<ImmutableBytesWritable>();
-
-  /**
-   *   Constructor
-   *
-   *   Simple class which takes an instance of RecordWriter
-   *   over Writable objects. Subclasses are
-   *   expected to implement writeVertex()
-   *
-   * @param <I> Vertex index value
-   * @param <V> Vertex value
-   * @param <E> Edge value
-   */
-  public abstract static class HBaseVertexWriter<
-          I extends WritableComparable,
-          V extends Writable,
-          E extends Writable>
-          implements VertexWriter<I, V, E> {
-
-    /**
-     * context
-     */
-    private TaskAttemptContext context;
-
-    /**
-     * record writer instance
-     */
-    private RecordWriter<ImmutableBytesWritable,
-              Writable> recordWriter;
-
-   /**
-    * Sets up base table output format and creates a record writer.
-    * @param context task attempt context
-    */
-    public HBaseVertexWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      BASE_FORMAT.setConf(context.getConfiguration());
-      this.recordWriter = BASE_FORMAT.getRecordWriter(context);
-    }
-
-    /**
-     * initialize
-     *
-     * @param context Context used to write the vertices.
-     * @throws IOException
-     */
-    public void initialize(TaskAttemptContext context)
-      throws IOException {
-      this.context = context;
-    }
-
-    /**
-     * close
-     *
-     * @param context the context of the task
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public void close(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      recordWriter.close(context);
-    }
-
-    /**
-     * Get the table record writer;
-     *
-     * @return Record writer to be used for writing.
-     */
-    public RecordWriter<ImmutableBytesWritable,
-            Writable> getRecordWriter() {
-      return recordWriter;
-    }
-
-    /**
-     * getContext
-     *
-     * @return Context passed to initialize.
-     */
-    public TaskAttemptContext getContext() {
-      return context;
-    }
-
-  }
-
-  /**
-   * checkOutputSpecs
-   *
-   * @param context information about the job
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public void checkOutputSpecs(JobContext context)
-    throws IOException, InterruptedException {
-    BASE_FORMAT.checkOutputSpecs(context);
-  }
-
-  /**
-   * getOutputCommitter
-   *
-   * @param context the task context
-   * @return  OutputCommitter ouputCommitter
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public OutputCommitter getOutputCommitter(
-    TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    BASE_FORMAT.setConf(context.getConfiguration());
-    return BASE_FORMAT.getOutputCommitter(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/package-info.java b/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/package-info.java
deleted file mode 100644
index 36b546d..0000000
--- a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of all generic utility classes.
- */
-package org.apache.giraph.io.hbase;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java b/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
deleted file mode 100644
index 2e91cba..0000000
--- a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/GiraphHCatInputFormat.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
-import org.apache.hcatalog.mapreduce.HCatSplit;
-import org.apache.hcatalog.mapreduce.HCatStorageHandler;
-import org.apache.hcatalog.mapreduce.HCatUtils;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
-import org.apache.hcatalog.mapreduce.PartInfo;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Provides functionality similar to
- * {@link org.apache.hcatalog.mapreduce.HCatInputFormat},
- * but allows for different data sources (vertex and edge data).
- */
-public class GiraphHCatInputFormat extends HCatBaseInputFormat {
-  /** Vertex input job info for HCatalog. */
-  public static final String VERTEX_INPUT_JOB_INFO =
-      "giraph.hcat.vertex.input.job.info";
-  /** Edge input job info for HCatalog. */
-  public static final String EDGE_INPUT_JOB_INFO =
-      "giraph.hcat.edge.input.job.info";
-
-  /**
-   * Set vertex {@link InputJobInfo}.
-   *
-   * @param job The job
-   * @param inputJobInfo Vertex input job info
-   * @throws IOException
-   */
-  public static void setVertexInput(Job job,
-                                    InputJobInfo inputJobInfo)
-    throws IOException {
-    InputJobInfo vertexInputJobInfo = InputJobInfo.create(
-        inputJobInfo.getDatabaseName(),
-        inputJobInfo.getTableName(),
-        inputJobInfo.getFilter());
-    vertexInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
-    Configuration conf = job.getConfiguration();
-    conf.set(VERTEX_INPUT_JOB_INFO, HCatUtil.serialize(
-        HCatUtils.getInputJobInfo(conf, vertexInputJobInfo)));
-  }
-
-  /**
-   * Set edge {@link InputJobInfo}.
-   *
-   * @param job The job
-   * @param inputJobInfo Edge input job info
-   * @throws IOException
-   */
-  public static void setEdgeInput(Job job,
-                                  InputJobInfo inputJobInfo)
-    throws IOException {
-    InputJobInfo edgeInputJobInfo = InputJobInfo.create(
-        inputJobInfo.getDatabaseName(),
-        inputJobInfo.getTableName(),
-        inputJobInfo.getFilter());
-    edgeInputJobInfo.getProperties().putAll(inputJobInfo.getProperties());
-    Configuration conf = job.getConfiguration();
-    conf.set(EDGE_INPUT_JOB_INFO, HCatUtil.serialize(
-        HCatUtils.getInputJobInfo(conf, edgeInputJobInfo)));
-  }
-
-  /**
-   * Get table schema from input job info.
-   *
-   * @param inputJobInfo Input job info
-   * @return Input table schema
-   * @throws IOException
-   */
-  private static HCatSchema getTableSchema(InputJobInfo inputJobInfo)
-    throws IOException {
-    HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-    for (HCatFieldSchema field :
-        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
-      allCols.append(field);
-    }
-    for (HCatFieldSchema field :
-        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
-      allCols.append(field);
-    }
-    return allCols;
-  }
-
-  /**
-   * Get vertex input table schema.
-   *
-   * @param conf Job configuration
-   * @return Vertex input table schema
-   * @throws IOException
-   */
-  public static HCatSchema getVertexTableSchema(Configuration conf)
-    throws IOException {
-    return getTableSchema(getVertexJobInfo(conf));
-  }
-
-  /**
-   * Get edge input table schema.
-   *
-   * @param conf Job configuration
-   * @return Edge input table schema
-   * @throws IOException
-   */
-  public static HCatSchema getEdgeTableSchema(Configuration conf)
-    throws IOException {
-    return getTableSchema(getEdgeJobInfo(conf));
-  }
-
-  /**
-   * Set input path for job.
-   *
-   * @param jobConf Job configuration
-   * @param location Location of input files
-   * @throws IOException
-   */
-  private void setInputPath(JobConf jobConf, String location)
-    throws IOException {
-    int length = location.length();
-    int curlyOpen = 0;
-    int pathStart = 0;
-    boolean globPattern = false;
-    List<String> pathStrings = new ArrayList<String>();
-
-    for (int i = 0; i < length; i++) {
-      char ch = location.charAt(i);
-      switch (ch) {
-      case '{':
-        curlyOpen++;
-        if (!globPattern) {
-          globPattern = true;
-        }
-        break;
-      case '}':
-        curlyOpen--;
-        if (curlyOpen == 0 && globPattern) {
-          globPattern = false;
-        }
-        break;
-      case ',':
-        if (!globPattern) {
-          pathStrings.add(location.substring(pathStart, i));
-          pathStart = i + 1;
-        }
-        break;
-      default:
-      }
-    }
-    pathStrings.add(location.substring(pathStart, length));
-
-    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
-
-    FileSystem fs = FileSystem.get(jobConf);
-    Path path = paths[0].makeQualified(fs);
-    StringBuilder str = new StringBuilder(StringUtils.escapeString(
-        path.toString()));
-    for (int i = 1; i < paths.length; i++) {
-      str.append(StringUtils.COMMA_STR);
-      path = paths[i].makeQualified(fs);
-      str.append(StringUtils.escapeString(path.toString()));
-    }
-
-    jobConf.set("mapred.input.dir", str.toString());
-  }
-
-  /**
-   * Get input splits for job.
-   *
-   * @param jobContext Job context
-   * @param inputJobInfo Input job info
-   * @return MapReduce setting for file input directory
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private List<InputSplit> getSplits(JobContext jobContext,
-                                     InputJobInfo inputJobInfo)
-    throws IOException, InterruptedException {
-    Configuration conf = jobContext.getConfiguration();
-
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-    List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
-    if (partitionInfoList == null) {
-      //No partitions match the specified partition filter
-      return splits;
-    }
-
-    HCatStorageHandler storageHandler;
-    JobConf jobConf;
-    //For each matching partition, call getSplits on the underlying InputFormat
-    for (PartInfo partitionInfo : partitionInfoList) {
-      jobConf = HCatUtil.getJobConfFromContext(jobContext);
-      setInputPath(jobConf, partitionInfo.getLocation());
-      Map<String, String> jobProperties = partitionInfo.getJobProperties();
-
-      HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      for (HCatFieldSchema field :
-          inputJobInfo.getTableInfo().getDataColumns().getFields()) {
-        allCols.append(field);
-      }
-      for (HCatFieldSchema field :
-          inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
-        allCols.append(field);
-      }
-
-      HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
-
-      storageHandler = HCatUtil.getStorageHandler(
-          jobConf, partitionInfo);
-
-      //Get the input format
-      Class inputFormatClass = storageHandler.getInputFormatClass();
-      org.apache.hadoop.mapred.InputFormat inputFormat =
-          getMapRedInputFormat(jobConf, inputFormatClass);
-
-      //Call getSplit on the InputFormat, create an HCatSplit for each
-      //underlying split. When the desired number of input splits is missing,
-      //use a default number (denoted by zero).
-      //TODO: Currently each partition is split independently into
-      //a desired number. However, we want the union of all partitions to be
-      //split into a desired number while maintaining balanced sizes of input
-      //splits.
-      int desiredNumSplits =
-          conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);
-      org.apache.hadoop.mapred.InputSplit[] baseSplits =
-          inputFormat.getSplits(jobConf, desiredNumSplits);
-
-      for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
-        splits.add(new HCatSplit(partitionInfo, split, allCols));
-      }
-    }
-
-    return splits;
-  }
-
-  /**
-   * Get vertex {@link InputJobInfo}.
-   *
-   * @param conf Configuration
-   * @return Vertex input job info
-   * @throws IOException
-   */
-  private static InputJobInfo getVertexJobInfo(Configuration conf)
-    throws IOException {
-    String jobString = conf.get(VERTEX_INPUT_JOB_INFO);
-    if (jobString == null) {
-      throw new IOException("Vertex job information not found in JobContext." +
-          " GiraphHCatInputFormat.setVertexInput() not called?");
-    }
-    return (InputJobInfo) HCatUtil.deserialize(jobString);
-  }
-
-  /**
-   * Get edge {@link InputJobInfo}.
-   *
-   * @param conf Configuration
-   * @return Edge input job info
-   * @throws IOException
-   */
-  private static InputJobInfo getEdgeJobInfo(Configuration conf)
-    throws IOException {
-    String jobString = conf.get(EDGE_INPUT_JOB_INFO);
-    if (jobString == null) {
-      throw new IOException("Edge job information not found in JobContext." +
-          " GiraphHCatInputFormat.setEdgeInput() not called?");
-    }
-    return (InputJobInfo) HCatUtil.deserialize(jobString);
-  }
-
-  /**
-   * Get vertex input splits.
-   *
-   * @param jobContext Job context
-   * @return List of vertex {@link InputSplit}s
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public List<InputSplit> getVertexSplits(JobContext jobContext)
-    throws IOException, InterruptedException {
-    return getSplits(jobContext,
-        getVertexJobInfo(jobContext.getConfiguration()));
-  }
-
-  /**
-   * Get edge input splits.
-   *
-   * @param jobContext Job context
-   * @return List of edge {@link InputSplit}s
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public List<InputSplit> getEdgeSplits(JobContext jobContext)
-    throws IOException, InterruptedException {
-    return getSplits(jobContext,
-        getEdgeJobInfo(jobContext.getConfiguration()));
-  }
-
-  /**
-   * Create an {@link org.apache.hcatalog.mapreduce.HCatRecordReader}.
-   *
-   * @param split Input split
-   * @param schema Table schema
-   * @param taskContext Context
-   * @return Record reader
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private RecordReader<WritableComparable, HCatRecord>
-  createRecordReader(InputSplit split,
-                     HCatSchema schema,
-                     TaskAttemptContext taskContext)
-    throws IOException, InterruptedException {
-    HCatSplit hcatSplit = HCatUtils.castToHCatSplit(split);
-    PartInfo partitionInfo = hcatSplit.getPartitionInfo();
-    JobContext jobContext = taskContext;
-    Configuration conf = jobContext.getConfiguration();
-
-    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
-        conf, partitionInfo);
-
-    JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
-    Map<String, String> jobProperties = partitionInfo.getJobProperties();
-    HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
-
-    Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
-        schema, partitionInfo);
-
-    return HCatUtils.newHCatReader(storageHandler, valuesNotInDataCols);
-  }
-
-  /**
-   * Create a {@link RecordReader} for vertices.
-   *
-   * @param split Input split
-   * @param taskContext Context
-   * @return Record reader
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public RecordReader<WritableComparable, HCatRecord>
-  createVertexRecordReader(InputSplit split, TaskAttemptContext taskContext)
-    throws IOException, InterruptedException {
-    return createRecordReader(split, getVertexTableSchema(
-        taskContext.getConfiguration()), taskContext);
-  }
-
-  /**
-   * Create a {@link RecordReader} for edges.
-   *
-   * @param split Input split
-   * @param taskContext Context
-   * @return Record reader
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public RecordReader<WritableComparable, HCatRecord>
-  createEdgeRecordReader(InputSplit split, TaskAttemptContext taskContext)
-    throws IOException, InterruptedException {
-    return createRecordReader(split, getEdgeTableSchema(
-        taskContext.getConfiguration()), taskContext);
-  }
-
-  /**
-   * Get values for fields requested by output schema which will not be in the
-   * data.
-   *
-   * @param outputSchema Output schema
-   * @param partInfo Partition info
-   * @return Values not in data columns
-   */
-  private static Map<String, String> getColValsNotInDataColumns(
-      HCatSchema outputSchema,
-      PartInfo partInfo) {
-    HCatSchema dataSchema = partInfo.getPartitionSchema();
-    Map<String, String> vals = new HashMap<String, String>();
-    for (String fieldName : outputSchema.getFieldNames()) {
-      if (dataSchema.getPosition(fieldName) == null) {
-        // this entry of output is not present in the output schema
-        // so, we first check the table schema to see if it is a part col
-        if (partInfo.getPartitionValues().containsKey(fieldName)) {
-          vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
-        } else {
-          vals.put(fieldName, null);
-        }
-      }
-    }
-    return vals;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
deleted file mode 100644
index 2112df3..0000000
--- a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeInputFormat;
-import org.apache.giraph.graph.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.data.HCatRecord;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * HCatalog {@link EdgeInputFormat} for reading edges from Hive/Pig.
- *
- * @param <I> Vertex id
- * @param <E> Edge value
- */
-public abstract class HCatalogEdgeInputFormat<
-    I extends WritableComparable,
-    E extends Writable>
-    extends EdgeInputFormat<I, E> {
-  /**
-   * HCatalog input format.
-   */
-  private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
-
-  @Override
-  public final List<InputSplit> getSplits(JobContext context, int numWorkers)
-    throws IOException, InterruptedException {
-    return hCatInputFormat.getEdgeSplits(context);
-  }
-
-  /**
-   * {@link EdgeReader} for {@link HCatalogEdgeInputFormat}.
-   */
-  protected abstract class HCatalogEdgeReader implements EdgeReader<I, E> {
-    /** Internal {@link RecordReader}. */
-    private RecordReader<WritableComparable, HCatRecord> hCatRecordReader;
-    /** Context passed to initialize. */
-    private TaskAttemptContext context;
-
-    @Override
-    public final void initialize(InputSplit inputSplit,
-                                 TaskAttemptContext context)
-      throws IOException, InterruptedException {
-      hCatRecordReader =
-          hCatInputFormat.createEdgeRecordReader(inputSplit, context);
-      hCatRecordReader.initialize(inputSplit, context);
-      this.context = context;
-    }
-
-    @Override
-    public boolean nextEdge() throws IOException, InterruptedException {
-      return hCatRecordReader.nextKeyValue();
-    }
-
-    @Override
-    public final void close() throws IOException {
-      hCatRecordReader.close();
-    }
-
-    @Override
-    public final float getProgress() throws IOException, InterruptedException {
-      return hCatRecordReader.getProgress();
-    }
-
-    /**
-     * Get the record reader.
-     *
-     * @return Record reader to be used for reading.
-     */
-    protected final RecordReader<WritableComparable, HCatRecord>
-    getRecordReader() {
-      return hCatRecordReader;
-    }
-
-    /**
-     * Get the context.
-     *
-     * @return Context passed to initialize.
-     */
-    protected final TaskAttemptContext getContext() {
-      return context;
-    }
-  }
-
-  /**
-   * Create {@link EdgeReader}.
-   *
-   * @return {@link HCatalogEdgeReader} instance.
-   */
-  protected abstract HCatalogEdgeReader createEdgeReader();
-
-  @Override
-  public EdgeReader<I, E>
-  createEdgeReader(InputSplit split, TaskAttemptContext context)
-    throws IOException {
-    try {
-      HCatalogEdgeReader reader = createEdgeReader();
-      reader.initialize(split, context);
-      return reader;
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "createEdgeReader: Interrupted creating reader.", e);
-    }
-  }
-
-  /**
-   * {@link HCatalogEdgeReader} for tables holding a complete edge
-   * in each row.
-   */
-  protected abstract class SingleRowHCatalogEdgeReader
-      extends HCatalogEdgeReader {
-    /**
-     * Get source vertex id from a record.
-     *
-     * @param record Input record
-     * @return I Source vertex id
-     */
-    protected abstract I getSourceVertexId(HCatRecord record);
-
-    /**
-     * Get target vertex id from a record.
-     *
-     * @param record Input record
-     * @return I Target vertex id
-     */
-    protected abstract I getTargetVertexId(HCatRecord record);
-
-    /**
-     * Get edge value from a record.
-     *
-     * @param record Input record
-     * @return E Edge value
-     */
-    protected abstract E getEdgeValue(HCatRecord record);
-
-    @Override
-    public EdgeWithSource<I, E> getCurrentEdge() throws IOException,
-        InterruptedException {
-      HCatRecord record = getRecordReader().getCurrentValue();
-      return new EdgeWithSource<I, E>(
-          getSourceVertexId(record),
-          new Edge<I, E>(getTargetVertexId(record), getEdgeValue(record)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java b/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
deleted file mode 100644
index ec49137..0000000
--- a/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io.hcatalog;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.graph.VertexReader;
-import org.apache.giraph.utils.TimedLogger;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Abstract class that users should subclass to load data from a Hive or Pig
- * table. You can easily implement a {@link HCatalogVertexReader} by extending
- * either {@link SingleRowHCatalogVertexReader} or
- * {@link MultiRowHCatalogVertexReader} depending on how data for each vertex is
- * stored in the input table.
- * <p>
- * The desired database and table name to load from can be specified via
- * {@link GiraphHCatInputFormat#setVertexInput(org.apache.hadoop.mapreduce.Job,
- * org.apache.hcatalog.mapreduce.InputJobInfo)}
- * as you setup your vertex input format with
- * {@link org.apache.giraph.conf.GiraphConfiguration#
- * setVertexInputFormatClass(Class)}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-
-@SuppressWarnings("rawtypes")
-public abstract class HCatalogVertexInputFormat<
-    I extends WritableComparable,
-    V extends Writable,
-    E extends Writable,
-    M extends Writable>
-    extends VertexInputFormat<I, V, E, M> {
-  /**
-   * HCatalog input format.
-   */
-  private GiraphHCatInputFormat hCatInputFormat = new GiraphHCatInputFormat();
-
-  @Override
-  public final List<InputSplit> getSplits(
-      final JobContext context, final int numWorkers)
-    throws IOException, InterruptedException {
-    return hCatInputFormat.getVertexSplits(context);
-  }
-
-  /**
-   * Abstract class that users should subclass
-   * based on their specific vertex
-   * input. HCatRecord can be parsed to get the
-   * required data for implementing
-   * getCurrentVertex(). If the vertex spans more
-   * than one HCatRecord,
-   * nextVertex() should be overridden to handle that logic as well.
-   */
-  protected abstract class HCatalogVertexReader implements
-      VertexReader<I, V, E, M> {
-    /** Giraph configuration */
-    private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
-    /** Internal HCatRecordReader. */
-    private RecordReader<WritableComparable,
-        HCatRecord> hCatRecordReader;
-    /** Context passed to initialize. */
-    private TaskAttemptContext context;
-
-    public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
-      return configuration;
-    }
-
-    /**
-     * Initialize with the HCatRecordReader.
-     *
-     * @param recordReader internal reader
-     */
-    private void initialize(
-        final RecordReader<
-            WritableComparable, HCatRecord>
-            recordReader) {
-      this.hCatRecordReader = recordReader;
-    }
-
-    @Override
-    public final void initialize(
-        final InputSplit inputSplit,
-        final TaskAttemptContext ctxt)
-      throws IOException, InterruptedException {
-      hCatRecordReader.initialize(inputSplit, ctxt);
-      this.context = ctxt;
-      this.configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
-          context.getConfiguration());
-    }
-
-    @Override
-    public boolean nextVertex() throws IOException, InterruptedException {
-      // Users can override this if desired,
-      // and a vertex is bigger than
-      // a single row.
-      return hCatRecordReader.nextKeyValue();
-    }
-
-    @Override
-    public final void close() throws IOException {
-      hCatRecordReader.close();
-    }
-
-    @Override
-    public final float getProgress() throws IOException, InterruptedException {
-      return hCatRecordReader.getProgress();
-    }
-
-    /**
-     * Get the record reader.
-     * @return Record reader to be used for reading.
-     */
-    protected final RecordReader<WritableComparable, HCatRecord>
-    getRecordReader() {
-      return hCatRecordReader;
-    }
-
-    /**
-     * Get the context.
-     *
-     *
-     *
-     * @return Context passed to initialize.
-     */
-    protected final TaskAttemptContext getContext() {
-      return context;
-    }
-  }
-
-  /**
-   * create vertex reader instance.
-   * @return HCatalogVertexReader
-   */
-  protected abstract HCatalogVertexReader createVertexReader();
-
-  @Override
-  public final VertexReader<I, V, E, M>
-  createVertexReader(final InputSplit split,
-                     final TaskAttemptContext context)
-    throws IOException {
-    try {
-      HCatalogVertexReader reader = createVertexReader();
-      reader.initialize(hCatInputFormat.
-          createVertexRecordReader(split, context));
-      return reader;
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "createVertexReader: " +
-              "Interrupted creating reader.", e);
-    }
-  }
-
-  /**
-   * HCatalogVertexReader for tables holding
-   * complete vertex info within each
-   * row.
-   */
-  protected abstract class SingleRowHCatalogVertexReader
-      extends HCatalogVertexReader {
-    /**
-     * 1024 const.
-     */
-    private static final int BYTE_CONST = 1024;
-    /**
-     *  logger
-     */
-    private final Logger log =
-        Logger.getLogger(SingleRowHCatalogVertexReader.class);
-    /**
-     * record count.
-     */
-    private int recordCount = 0;
-    /**
-     * modulus check counter.
-     */
-    private final int recordModLimit = 1000;
-    /**
-     * Timed logger to print every 30 seconds
-     */
-    private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
-        log);
-
-    /**
-     * get vertex id.
-     * @param record hcat record
-     * @return I id
-     */
-    protected abstract I getVertexId(HCatRecord record);
-
-    /**
-     * get vertex value.
-     * @param record hcat record
-     * @return V value
-     */
-    protected abstract V getVertexValue(HCatRecord record);
-
-    /**
-     * get edges.
-     * @param record hcat record
-     * @return Edges
-     */
-    protected abstract Iterable<Edge<I, E>> getEdges(HCatRecord record);
-
-    @Override
-    public final Vertex<I, V, E, M> getCurrentVertex()
-      throws IOException, InterruptedException {
-      HCatRecord record = getRecordReader().getCurrentValue();
-      Vertex<I, V, E, M> vertex = getConfiguration().createVertex();
-      vertex.initialize(getVertexId(record), getVertexValue(record),
-          getEdges(record));
-      ++recordCount;
-      if (log.isInfoEnabled() &&
-          ((recordCount % recordModLimit) == 0)) {
-        // memory usage
-        Runtime runtime = Runtime.getRuntime();
-        double gb = BYTE_CONST *
-            BYTE_CONST *
-            BYTE_CONST;
-        timedLogger.info(
-            "read " + recordCount + " records. Memory: " +
-            (runtime.totalMemory() / gb) +
-            "GB total = " +
-            ((runtime.totalMemory() - runtime.freeMemory()) / gb) +
-            "GB used + " + (runtime.freeMemory() / gb) +
-            "GB free, " + (runtime.maxMemory() / gb) + "GB max");
-      }
-      return vertex;
-    }
-  }
-  /**
-   * HCatalogVertexReader for tables
-   * holding vertex info across multiple rows
-   * sorted by vertex id column,
-   * so that they appear consecutively to the
-   * RecordReader.
-   */
-  protected abstract class MultiRowHCatalogVertexReader extends
-      HCatalogVertexReader {
-    /**
-     * modulus check counter.
-     */
-    private static final int RECORD_MOD_LIMIT = 1000;
-    /**
-     *  logger
-     */
-    private final Logger log =
-        Logger.getLogger(MultiRowHCatalogVertexReader.class);
-    /**
-     * current vertex id.
-     */
-    private I currentVertexId = null;
-    /**
-     * current vertex edges.
-     */
-    private List<Edge<I, E>> currentEdges = Lists.newLinkedList();
-    /**
-     * record for vertex.
-     */
-    private List<HCatRecord> recordsForVertex = Lists.newArrayList();
-    /**
-     * record count.
-     */
-    private int recordCount = 0;
-    /**
-     * vertex.
-     */
-    private Vertex<I, V, E, M> vertex = null;
-    /**
-     * Timed logger to print every 30 seconds
-     */
-    private final TimedLogger timedLogger = new TimedLogger(30 * 1000,
-        log);
-
-
-    /**
-     * get vertex id from record.
-     *
-     * @param record hcat
-     * @return I vertex id
-     */
-    protected abstract I getVertexId(HCatRecord record);
-
-    /**
-     * get vertex value from record.
-     * @param records all vertex values
-     * @return V iterable of record values
-     */
-    protected abstract V getVertexValue(
-        Iterable<HCatRecord> records);
-
-    /**
-     * get target vertex id from record.
-     *
-     * @param record hcat
-     * @return I vertex id of target.
-     */
-    protected abstract I getTargetVertexId(HCatRecord record);
-
-    /**
-     * get edge value from record.
-     *
-     * @param record hcat.
-     * @return E edge value.
-     */
-    protected abstract E getEdgeValue(HCatRecord record);
-
-    @Override
-    public final Vertex<I, V, E, M>
-    getCurrentVertex() throws IOException, InterruptedException {
-      return vertex;
-    }
-
-    @Override
-    public boolean nextVertex() throws IOException, InterruptedException {
-      while (getRecordReader().nextKeyValue()) {
-        HCatRecord record =
-            getRecordReader().getCurrentValue();
-        if (currentVertexId == null) {
-          currentVertexId = getVertexId(record);
-        }
-        if (currentVertexId.equals(getVertexId(record))) {
-          currentEdges.add(new Edge<I, E>(
-                  getTargetVertexId(record),
-                  getEdgeValue(record)));
-          recordsForVertex.add(record);
-        } else {
-          createCurrentVertex();
-          if (log.isInfoEnabled() && (recordCount % RECORD_MOD_LIMIT) == 0) {
-            timedLogger.info("read " + recordCount);
-          }
-          currentVertexId = getVertexId(record);
-          recordsForVertex.add(record);
-          return true;
-        }
-      }
-
-      if (currentEdges.isEmpty()) {
-        return false;
-      } else {
-        createCurrentVertex();
-        return true;
-      }
-    }
-
-    /**
-     * create current vertex.
-     */
-    private void createCurrentVertex() {
-      vertex = getConfiguration().createVertex();
-      vertex.initialize(currentVertexId, getVertexValue(recordsForVertex),
-          currentEdges);
-      currentEdges.clear();
-      recordsForVertex.clear();
-      ++recordCount;
-    }
-  }
-}


Mime
View raw message