giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [20/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:31 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
new file mode 100644
index 0000000..4dfe1e2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A {@link BasicPartitionOwner} that additionally records the maximum vertex
+ * index covered by its partition, for use by range-based partitioning.
+ *
+ * @param <I> Vertex index type
+ */
+@SuppressWarnings("rawtypes")
+public class RangePartitionOwner<I extends WritableComparable>
+    extends BasicPartitionOwner {
+  /** Max index for this partition (null until set or deserialized) */
+  private I maxIndex;
+
+  /**
+   * Default constructor, used before {@link #readFields(DataInput)}.
+   */
+  public RangePartitionOwner() { }
+
+  /**
+   * Constructor with the max index.
+   *
+   * @param maxIndex Max index of this partition.
+   */
+  public RangePartitionOwner(I maxIndex) {
+    this.maxIndex = maxIndex;
+  }
+
+  /**
+   * Get the maximum index of this partition owner.
+   *
+   * @return Maximum index, or null if it has not been set yet
+   */
+  public I getMaxIndex() {
+    return maxIndex;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput input) throws IOException {
+    super.readFields(input);
+    // NOTE(review): the cast assumes the configuration's vertex id class is
+    // I; createVertexId() itself is not type-checked here.
+    maxIndex = (I) getConf().createVertexId();
+    maxIndex.readFields(input);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    if (maxIndex == null) {
+      throw new IllegalStateException(
+          "write: maxIndex must be set before serializing");
+    }
+    super.write(output);
+    maxIndex.write(output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
new file mode 100644
index 0000000..3ab43e8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Same as {@link PartitionStats}, but also includes the hint for range-based
+ * partitioning.  A configuration must be supplied through {@link #setConf}
+ * before deserializing stats that carry a hint, since the hint needs it to
+ * create its split vertex id.
+ *
+ * @param <I> Vertex index type
+ */
+@SuppressWarnings("rawtypes")
+public class RangePartitionStats<I extends WritableComparable>
+    extends PartitionStats implements ImmutableClassesGiraphConfigurable {
+  /** Can be null if no hint, otherwise a splitting hint */
+  private RangeSplitHint<I> hint;
+  /** Configuration, handed to a deserialized {@link RangeSplitHint} */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Get the range split hint (if any)
+   *
+   * @return Hint of how to split the range if desired, null otherwise
+   */
+  public RangeSplitHint<I> getRangeSplitHint() {
+    return hint;
+  }
+
+  /**
+   * Set the range split hint.
+   *
+   * @param hint Hint of how to split the range, or null for no hint
+   */
+  public void setRangeSplitHint(RangeSplitHint<I> hint) {
+    this.hint = hint;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    super.readFields(input);
+    boolean hintExists = input.readBoolean();
+    if (hintExists) {
+      // RangeSplitHint.readFields needs a configuration to create its id.
+      if (conf == null) {
+        throw new IllegalStateException(
+            "readFields: configuration must be set before reading a " +
+            "range split hint");
+      }
+      hint = new RangeSplitHint<I>();
+      hint.setConf(conf);
+      hint.readFields(input);
+    } else {
+      hint = null;
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    super.write(output);
+    output.writeBoolean(hint != null);
+    if (hint != null) {
+      hint.write(output);
+    }
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java
new file mode 100644
index 0000000..5855c0e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory base for range-based partitioning, which splits vertices into
+ * partitions by key ranges of a generic vertex id type.  When vertex ids
+ * carry locality, this reduces the number of messages sent.  In exchange,
+ * range partitioning is more prone to hot spots when keys are not randomly
+ * distributed, and the user must implement part of the logic for splitting
+ * the key range.
+ *
+ * See {@link RangeWorkerPartitioner}
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangePartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements GraphPartitionerFactory<I, V, E, M> {
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
new file mode 100644
index 0000000..e415b9b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Hint to the {@link RangeMasterPartitioner} about how a
+ * {@link RangePartitionOwner} can be split.
+ *
+ * @param <I> Vertex index to split around
+ */
+@SuppressWarnings("rawtypes")
+public class RangeSplitHint<I extends WritableComparable>
+    implements Writable, ImmutableClassesGiraphConfigurable {
+  /** Hinted split index */
+  private I splitIndex;
+  /** Number of vertices in this range before the split */
+  private long preSplitVertexCount;
+  /** Number of vertices in this range after the split */
+  private long postSplitVertexCount;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, ?, ?, ?> conf;
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    splitIndex = conf.createVertexId();
+    splitIndex.readFields(input);
+    preSplitVertexCount = input.readLong();
+    postSplitVertexCount = input.readLong();
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    splitIndex.write(output);
+    output.writeLong(preSplitVertexCount);
+    output.writeLong(postSplitVertexCount);
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
new file mode 100644
index 0000000..b963d86
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Range partitioning will split the vertices by a key range based on a generic
+ * type.  This allows vertices that have some locality with the vertex ids
+ * to reduce the amount of messages sent.  The tradeoffs are that
+ * range partitioning is more susceptible to hot spots if the keys
+ * are not randomly distributed.  Another negative is the user must implement
+ * some of the functionality around how to split the key range.
+ *
+ * Note:  This implementation is incomplete, the developer must implement the
+ * various methods based on their index type.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RangeWorkerPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    WorkerGraphPartitioner<I, V, E, M> {
+  /**
+   * Mapping of each partition's maximum vertex id to its
+   * {@link PartitionOwner}
+   */
+  protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
+      new TreeMap<I, RangePartitionOwner<I>>();
+
+  @Override
+  public PartitionOwner createPartitionOwner() {
+    return new RangePartitionOwner<I>();
+  }
+
+  /**
+   * Find the owner of a vertex: the partition with the smallest maximum
+   * id that is greater than or equal to the vertex id.  Vertex ids beyond
+   * every partition's maximum go to the last partition.
+   *
+   * @param vertexId Vertex id to get the partition for
+   * @return Owner of the partition containing the vertex
+   */
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    if (vertexId == null) {
+      throw new IllegalArgumentException(
+          "getPartitionOwner: Illegal null vertex id");
+    }
+    // Resolve through the ceiling entry (the partition's max id) rather than
+    // looking up vertexId itself, which only matches exact max ids.
+    Map.Entry<I, RangePartitionOwner<I>> entry =
+        vertexRangeMap.ceilingEntry(vertexId);
+    if (entry == null) {
+      entry = vertexRangeMap.lastEntry();
+      if (entry == null) {
+        throw new IllegalStateException(
+            "getPartitionOwner: No partition owners available for " +
+            vertexId);
+      }
+    }
+    return entry.getValue();
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> getPartitionOwners() {
+    return vertexRangeMap.values();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java
new file mode 100644
index 0000000..0706660
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.collect.Maps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * A simple map-based container that stores vertices.  Vertex ids will map to
+ * exactly one partition.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class SimplePartition<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Partition<I, V, E, M> {
+  /** Configuration from the worker */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Partition id */
+  private int id;
+  /** Vertex map for this range (keyed by index) */
+  private ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
+  /** Context used to report progress */
+  private Progressable progressable;
+
+  /**
+   * Constructor for reflection.
+   */
+  public SimplePartition() { }
+
+  @Override
+  public void initialize(int partitionId, Progressable progressable) {
+    setId(partitionId);
+    setProgressable(progressable);
+    if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
+        GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
+    } else {
+      vertexMap = Maps.newConcurrentMap();
+    }
+  }
+
+  @Override
+  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
+    return vertexMap.get(vertexIndex);
+  }
+
+  @Override
+  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
+    return vertexMap.put(vertex.getId(), vertex);
+  }
+
+  @Override
+  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
+    return vertexMap.remove(vertexIndex);
+  }
+
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    for (Vertex<I, V, E , M> vertex : partition) {
+      vertexMap.put(vertex.getId(), vertex);
+    }
+  }
+
+  @Override
+  public long getVertexCount() {
+    return vertexMap.size();
+  }
+
+  @Override
+  public long getEdgeCount() {
+    long edges = 0;
+    for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
+      edges += vertex.getNumEdges();
+    }
+    return edges;
+  }
+
+  @Override
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  @Override
+  public void setProgressable(Progressable progressable) {
+    this.progressable = progressable;
+  }
+
+  @Override
+  public void saveVertex(Vertex<I, V, E, M> vertex) {
+    // No-op, vertices are stored as Java objects in this partition
+  }
+
+  @Override
+  public String toString() {
+    return "(id=" + getId() + ",V=" + vertexMap.size() + ")";
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
+        GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
+    } else {
+      vertexMap = Maps.newConcurrentMap();
+    }
+    id = input.readInt();
+    int vertices = input.readInt();
+    for (int i = 0; i < vertices; ++i) {
+      Vertex<I, V, E, M> vertex = conf.createVertex();
+      if (progressable != null) {
+        progressable.progress();
+      }
+      vertex.readFields(input);
+      if (vertexMap.put(vertex.getId(), vertex) != null) {
+        throw new IllegalStateException(
+            "readFields: " + this +
+            " already has same id " + vertex);
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(id);
+    output.writeInt(vertexMap.size());
+    for (Vertex vertex : vertexMap.values()) {
+      if (progressable != null) {
+        progressable.progress();
+      }
+      vertex.write(output);
+    }
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  @Override
+  public Iterator<Vertex<I, V, E, M>> iterator() {
+    return vertexMap.values().iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
new file mode 100644
index 0000000..37f9cac
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.google.common.collect.Maps;
+
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A simple in-memory partition store.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class SimplePartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends PartitionStore<I, V, E, M> {
+  /** Map of stored partitions. */
+  private final ConcurrentMap<Integer, Partition<I, V, E, M>> partitions =
+      Maps.newConcurrentMap();
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Context used to report progress */
+  private final Mapper<?, ?, ?, ?>.Context context;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   * @param context Mapper context
+   */
+  public SimplePartitionStore(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+      Mapper<?, ?, ?, ?>.Context context) {
+    this.conf = conf;
+    this.context = context;
+  }
+
+  /**
+   * Add a partition.  If a partition with the same id is already stored,
+   * the new partition's vertices are merged into the stored one instead.
+   *
+   * @param partition Partition to add
+   */
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    // Plain get first; putIfAbsent handles a concurrent insert of the same id.
+    Partition<I, V, E, M> oldPartition = partitions.get(partition.getId());
+    if (oldPartition == null) {
+      oldPartition = partitions.putIfAbsent(partition.getId(), partition);
+      if (oldPartition == null) {
+        return;
+      }
+    }
+    oldPartition.addPartition(partition);
+  }
+
+  @Override
+  public Partition<I, V, E, M> getPartition(Integer partitionId) {
+    return partitions.get(partitionId);
+  }
+
+  @Override
+  public Partition<I, V, E, M> removePartition(Integer partitionId) {
+    return partitions.remove(partitionId);
+  }
+
+  @Override
+  public void deletePartition(Integer partitionId) {
+    partitions.remove(partitionId);
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    return partitions.containsKey(partitionId);
+  }
+
+  @Override
+  public Iterable<Integer> getPartitionIds() {
+    return partitions.keySet();
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return partitions.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
new file mode 100644
index 0000000..2364cc1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Collection;
+
+/**
+ * Worker-side partitioner: holds the {@link PartitionOwner} objects sent by
+ * the master, maps each vertex to its {@link PartitionOwner}, and creates
+ * the partition owner implementation.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface WorkerGraphPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Create an instance of the {@link PartitionOwner} implementation that is
+   * used to read the assignments made by the master.
+   *
+   * @return New {@link PartitionOwner} object
+   */
+  PartitionOwner createPartitionOwner();
+
+  /**
+   * Find the partition owner of a vertex.
+   *
+   * @param vertexId Vertex id to get the partition for
+   * @return Owner of the partition the vertex belongs to
+   */
+  PartitionOwner getPartitionOwner(I vertexId);
+
+  /**
+   * Called at the end of a superstep with the {@link PartitionStats} the
+   * worker generated for each of its partitions.  Implementations may
+   * modify them or create their own {@link PartitionStats} objects, which
+   * are then sent to the master.
+   *
+   * @param workerPartitionStats Stats generated by the infrastructure during
+   *        the superstep
+   * @param partitionStore Partition store for this worker
+   *        (could be used to provide more useful stat information)
+   * @return Final partition stats
+   */
+  Collection<PartitionStats> finalizePartitionStats(
+      Collection<PartitionStats> workerPartitionStats,
+      PartitionStore<I, V, E, M> partitionStore);
+
+  /**
+   * Take the partition owners set by the master and update the local view.
+   * Returns the partitions to send to other workers and other dependencies.
+   *
+   * @param myWorkerInfo Worker info.
+   * @param masterSetPartitionOwners Master set partition owners, received
+   *        prior to beginning the superstep
+   * @param partitionStore Partition store for this worker
+   *        (can be used to fill the return map of partitions to send)
+   * @return Information for the partition exchange.
+   */
+  PartitionExchange updatePartitionOwners(
+      WorkerInfo myWorkerInfo,
+      Collection<? extends PartitionOwner> masterSetPartitionOwners,
+      PartitionStore<I, V, E, M> partitionStore);
+
+  /**
+   * Get a collection of the {@link PartitionOwner} objects.
+   *
+   * @return Collection of owners for every partition.
+   */
+  Collection<? extends PartitionOwner> getPartitionOwners();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/package-info.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/package-info.java
new file mode 100644
index 0000000..4d6f6c3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of partitioning related objects.
+ */
+package org.apache.giraph.graph.partition;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
new file mode 100644
index 0000000..5a6c12d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.integration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.BasicPartitionOwner;
+import org.apache.giraph.graph.partition.HashMasterPartitioner;
+import org.apache.giraph.graph.partition.HashPartitionerFactory;
+import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Example graph partitioner that builds on {@link HashMasterPartitioner} to
+ * send the partitions to the worker that matches the superstep.  It is for
+ * testing only and should never be used in practice.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class SuperstepHashPartitionerFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends HashPartitionerFactory<I, V, E, M> {
+  /**
+   * Changes the {@link HashMasterPartitioner} to make ownership of the
+   * partitions based on a superstep.  For testing only as it is totally
+   * unbalanced: every partition is assigned to the same worker.
+   *
+   * @param <I> vertex id
+   * @param <V> vertex data
+   * @param <E> edge data
+   * @param <M> message data
+   */
+  private static class SuperstepMasterPartition<I extends WritableComparable,
+      V extends Writable, E extends Writable, M extends Writable>
+      extends HashMasterPartitioner<I, V, E, M> {
+    /** Class logger */
+    private static final Logger LOG =
+        Logger.getLogger(SuperstepMasterPartition.class);
+
+    /**
+     * Construction with configuration.
+     *
+     * @param conf Configuration to be stored.
+     */
+    public SuperstepMasterPartition(ImmutableClassesGiraphConfiguration conf) {
+      super(conf);
+    }
+
+    /**
+     * Assign every current partition to one worker: the element at position
+     * {@code superstep mod availableWorkerInfos.size()} in the iteration
+     * order of {@code availableWorkerInfos}.  With more than one worker, the
+     * choice differs between consecutive supersteps as long as the workers
+     * (and their order) do not change.
+     *
+     * @param allPartitionStatsList Partition stats (not used)
+     * @param availableWorkerInfos Workers to choose from; must not be empty
+     * @param maxWorkers Maximum number of workers (not used)
+     * @param superstep Current superstep
+     * @return New owner of every partition (also stored as the current
+     *         partition owner list)
+     * @throws IllegalArgumentException if no workers are available
+     */
+    @Override
+    public Collection<PartitionOwner> generateChangedPartitionOwners(
+        Collection<PartitionStats> allPartitionStatsList,
+        Collection<WorkerInfo> availableWorkerInfos,
+        int maxWorkers,
+        long superstep) {
+      int numWorkers = availableWorkerInfos.size();
+      if (numWorkers == 0) {
+        throw new IllegalArgumentException(
+            "generateChangedPartitionOwners: No available workers for " +
+            "superstep " + superstep);
+      }
+      // Java's % keeps the sign of the dividend; normalize so a negative
+      // superstep still maps to a valid worker position.
+      long workerIndex =
+          ((superstep % numWorkers) + numWorkers) % numWorkers;
+      WorkerInfo chosenWorkerInfo = null;
+      int position = 0;
+      for (WorkerInfo workerInfo : availableWorkerInfos) {
+        if (position == workerIndex) {
+          chosenWorkerInfo = workerInfo;
+          break;
+        }
+        ++position;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("generateChangedPartitionOwners: Chosen worker " +
+                 "for superstep " + superstep + " is " +
+                 chosenWorkerInfo);
+      }
+
+      List<PartitionOwner> partitionOwnerList = new ArrayList<PartitionOwner>();
+      for (PartitionOwner partitionOwner : getCurrentPartitionOwners()) {
+        // Only record a previous worker when the partition actually moves.
+        // chosenWorkerInfo is non-null here, so compare from its side.
+        WorkerInfo prevWorkerInfo =
+            chosenWorkerInfo.equals(partitionOwner.getWorkerInfo()) ?
+                null : partitionOwner.getWorkerInfo();
+        PartitionOwner tmpPartitionOwner =
+            new BasicPartitionOwner(partitionOwner.getPartitionId(),
+                                    chosenWorkerInfo,
+                                    prevWorkerInfo,
+                                    null);
+        partitionOwnerList.add(tmpPartitionOwner);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("partition owner was " + partitionOwner +
+              ", new " + tmpPartitionOwner);
+        }
+      }
+      setPartitionOwnerList(partitionOwnerList);
+      return partitionOwnerList;
+    }
+  }
+
+  @Override
+  public MasterGraphPartitioner<I, V, E, M>
+  createMasterGraphPartitioner() {
+    return new SuperstepMasterPartition<I, V, E, M>(getConf());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java b/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java
new file mode 100644
index 0000000..4c6ae30
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of all helper integration test objects.
+ */
+package org.apache.giraph.integration;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
new file mode 100644
index 0000000..3b047f3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexInputFormat.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * VertexInputFormat that reads lines of text with vertices encoded as
+ * adjacency lists and converts each token to the correct type.  For example,
+ * a graph with vertices as integers and values as doubles could be encoded
+ * as:
+ *   1 0.1 2 0.2 3 0.3
+ * to represent a vertex named 1, with 0.1 as its value and two edges, to
+ * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.  (Tokens
+ * are shown space-separated here; the actual delimiter is read from
+ * {@link #LINE_TOKENIZE_VALUE} and defaults to a tab.)
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class AdjacencyListTextVertexInputFormat<I extends
+    WritableComparable, V extends Writable, E extends Writable, M extends
+    Writable> extends TextVertexInputFormat<I, V, E, M> {
+  /** Delimiter for split */
+  public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
+  /** Default delimiter for split */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+
+  /**
+   * Utility for doing any cleaning of each line before it is tokenized.
+   */
+  public interface LineSanitizer {
+    /**
+     * Clean string s before attempting to tokenize it.
+     *
+     * @param s String to be cleaned.
+     * @return Sanitized string.
+     */
+    String sanitize(String s);
+  }
+
+  @Override
+  public abstract AdjacencyListTextVertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context);
+
+  /**
+   * Vertex reader associated with {@link AdjacencyListTextVertexInputFormat}.
+   * Each line is optionally sanitized, split on the configured delimiter and
+   * decoded as: id, value, then zero or more (target id, edge value) pairs.
+   */
+  protected abstract class AdjacencyListTextVertexReader extends
+    TextVertexReaderFromEachLineProcessed<String[]> {
+    /**
+     * Cached configuration.
+     */
+    private Configuration conf;
+
+    /** Cached delimiter used for split (interpreted as a regular expression) */
+    private String splitValue = null;
+
+    /**
+     * Sanitizer from constructor; null means lines are used as-is.
+     */
+    private final LineSanitizer sanitizer;
+
+    /**
+     * Constructor without line sanitizer.
+     */
+    public AdjacencyListTextVertexReader() {
+      this(null);
+    }
+
+    /**
+     * Constructor with line sanitizer.
+     *
+     * @param sanitizer Sanitizer to be used, or null for none.
+     */
+    public AdjacencyListTextVertexReader(LineSanitizer sanitizer) {
+      this.sanitizer = sanitizer;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      super.initialize(inputSplit, context);
+      conf = context.getConfiguration();
+      splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+    }
+
+    @Override
+    protected String[] preprocessLine(Text line) throws IOException {
+      String sanitizedLine;
+      if (sanitizer != null) {
+        sanitizedLine = sanitizer.sanitize(line.toString());
+      } else {
+        sanitizedLine = line.toString();
+      }
+      // String.split() treats splitValue as a regular expression and drops
+      // trailing empty tokens.
+      String[] values = sanitizedLine.split(splitValue);
+      // Expect id and value followed by complete (target id, edge value)
+      // pairs, so the token count must be even and at least 2.
+      if ((values.length < 2) || (values.length % 2 != 0)) {
+        throw new IllegalArgumentException(
+          "Line did not split correctly: " + line);
+      }
+      return values;
+    }
+
+    @Override
+    protected I getId(String[] values) throws IOException {
+      return decodeId(values[0]);
+    }
+
+    /**
+     * Decode the id for this line into an instance of its correct type.
+     *
+     * @param s Id of vertex from line
+     * @return Vertex id
+     */
+    public abstract I decodeId(String s);
+
+    @Override
+    protected V getValue(String[] values) throws IOException {
+      return decodeValue(values[1]);
+    }
+
+    /**
+     * Decode the value for this line into an instance of its correct type.
+     *
+     * @param s Value from line
+     * @return Vertex value
+     */
+    public abstract V decodeValue(String s);
+
+    @Override
+    protected Iterable<Edge<I, E>> getEdges(String[] values) throws
+        IOException {
+      // Tokens after the id and the value come in (target id, edge value)
+      // pairs; size the list for exactly that many edges.
+      List<Edge<I, E>> edges =
+          Lists.newArrayListWithCapacity(Math.max(0, (values.length - 2) / 2));
+      for (int i = 2; i < values.length; i += 2) {
+        edges.add(decodeEdge(values[i], values[i + 1]));
+      }
+      return edges;
+    }
+
+    /**
+     * Decode an edge from the line into an instance of a correctly typed Edge
+     *
+     * @param id The edge's id from the line
+     * @param value The edge's value from the line
+     * @return Edge with given target id and value
+     */
+    public abstract Edge<I, E> decodeEdge(String id, String value);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
new file mode 100644
index 0000000..77abb85
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/AdjacencyListTextVertexOutputFormat.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * OutputFormat to write out the graph nodes as text, value-separated (by
+ * tabs, by default; configurable via {@link #LINE_TOKENIZE_VALUE}).  With
+ * the default delimiter, a vertex is written out as:
+ *
+ * {@code <VertexId><tab><VertexValue>[<tab><EdgeId><tab><EdgeValue>]*}
+ *
+ * so a vertex without edges is written as just its id and value.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class AdjacencyListTextVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextVertexOutputFormat<I, V, E> {
+
+  /** Split delimiter */
+  public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+  /** Default split delimiter */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT =
+    AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE_DEFAULT;
+
+  @Override
+  public AdjacencyListTextVertexWriter createVertexWriter(
+      TaskAttemptContext context) {
+    return new AdjacencyListTextVertexWriter();
+  }
+
+  /**
+   * Vertex writer associated with {@link AdjacencyListTextVertexOutputFormat}.
+   */
+  protected class AdjacencyListTextVertexWriter extends
+    TextVertexWriterToEachLine {
+    /** Cached split delimiter */
+    private String delimiter;
+
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      super.initialize(context);
+      delimiter = context.getConfiguration()
+          .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+    }
+
+    @Override
+    public Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+      throws IOException {
+      // The buffer never escapes this method, so the unsynchronized
+      // StringBuilder is sufficient.
+      StringBuilder sb = new StringBuilder(vertex.getId().toString());
+      sb.append(delimiter).append(vertex.getValue());
+
+      for (Edge<I, E> edge : vertex.getEdges()) {
+        sb.append(delimiter).append(edge.getTargetVertexId());
+        sb.append(delimiter).append(edge.getValue());
+      }
+
+      return new Text(sb.toString());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
new file mode 100644
index 0000000..ad4f2bf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GeneratedVertexInputFormat.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This VertexInputFormat is meant for testing/debugging.  It simply generates
+ * some vertex data that can be consumed by test applications.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedVertexInputFormat<
+    I extends WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable> extends VertexInputFormat<I, V, E, M> {
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // This is meaningless, the VertexReader will generate all the test
+    // data.
+    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+    for (int i = 0; i < numWorkers; ++i) {
+      inputSplitList.add(new BspInputSplit(i, numWorkers));
+    }
+    return inputSplitList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
new file mode 100644
index 0000000..114e75f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]
+import org.apache.hadoop.mapreduce.security.TokenCache;
+end[HADOOP_NON_SECURE]*/
+
+/**
+ * Provides functionality similar to {@link FileInputFormat},
+ * but allows for different data sources (vertex and edge data).
+ * Vertex and edge input paths are kept as separate comma-separated,
+ * escaped lists in the job configuration.
+ *
+ * @param <K> Key
+ * @param <V> Value
+ */
+public abstract class GiraphFileInputFormat<K, V>
+    extends FileInputFormat<K, V> {
+  /** Vertex input file paths. */
+  public static final String VERTEX_INPUT_DIR = "giraph.vertex.input.dir";
+  /** Edge input file paths. */
+  public static final String EDGE_INPUT_DIR = "giraph.edge.input.dir";
+  /** Number of vertex input files. */
+  public static final String NUM_VERTEX_INPUT_FILES =
+      "giraph.input.vertex.num.files";
+  /** Number of edge input files. */
+  public static final String NUM_EDGE_INPUT_FILES =
+      "giraph.input.edge.num.files";
+
+  /**
+   * Split slop: full-size splits are cut only while the remaining bytes
+   * exceed this multiple of the split size, so the last split of a file may
+   * be up to 10% larger instead of leaving a tiny tail split.
+   */
+  private static final double SPLIT_SLOP = 1.1; // 10% slop
+
+  /** Filter for hidden files (names starting with "_" or "."). */
+  private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(GiraphFileInputFormat.class);
+
+  /**
+   * Add a {@link org.apache.hadoop.fs.Path} to the list of vertex inputs.
+   *
+   * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
+   * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
+   *                                              vertex inputs
+   * @throws IOException if the path cannot be resolved against its file
+   *                     system
+   */
+  public static void addVertexInputPath(Job job, Path path) throws IOException {
+    addInputPath(job, path, VERTEX_INPUT_DIR);
+  }
+
+  /**
+   * Add a {@link org.apache.hadoop.fs.Path} to the list of edge inputs.
+   *
+   * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
+   * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
+   *                                              edge inputs
+   * @throws IOException if the path cannot be resolved against its file
+   *                     system
+   */
+  public static void addEdgeInputPath(Job job, Path path) throws IOException {
+    addInputPath(job, path, EDGE_INPUT_DIR);
+  }
+
+  /**
+   * Qualify a path against its file system, escape it and append it to the
+   * comma-separated list stored in the configuration under {@code key}.
+   *
+   * @param job The job whose configuration is modified
+   * @param path Path to add
+   * @param key Configuration key holding the path list
+   * @throws IOException if the path cannot be resolved against its file
+   *                     system
+   */
+  private static void addInputPath(Job job, Path path, String key)
+    throws IOException {
+    Configuration conf = job.getConfiguration();
+    Path qualified = path.getFileSystem(conf).makeQualified(path);
+    String dirStr = StringUtils.escapeString(qualified.toString());
+    String dirs = conf.get(key);
+    conf.set(key, dirs == null ? dirStr : dirs + "," + dirStr);
+  }
+
+  /**
+   * Get the list of vertex input {@link Path}s.
+   *
+   * @param context The job
+   * @return The list of input {@link Path}s
+   */
+  public static Path[] getVertexInputPaths(JobContext context) {
+    return getInputPaths(context, VERTEX_INPUT_DIR);
+  }
+
+  /**
+   * Get the list of edge input {@link Path}s.
+   *
+   * @param context The job
+   * @return The list of input {@link Path}s
+   */
+  public static Path[] getEdgeInputPaths(JobContext context) {
+    return getInputPaths(context, EDGE_INPUT_DIR);
+  }
+
+  /**
+   * Decode the comma-separated, escaped path list stored under {@code key}.
+   *
+   * @param context The job
+   * @param key Configuration key holding the path list
+   * @return The decoded paths (empty if the key is unset)
+   */
+  private static Path[] getInputPaths(JobContext context, String key) {
+    String dirs = context.getConfiguration().get(key, "");
+    String[] list = StringUtils.split(dirs);
+    Path[] result = new Path[list.length];
+    for (int i = 0; i < list.length; i++) {
+      result[i] = new Path(StringUtils.unEscapeString(list[i]));
+    }
+    return result;
+  }
+
+  /**
+   * Proxy PathFilter that accepts a path only if all filters given in the
+   * constructor do. Used by the listPaths() to apply the built-in
+   * HIDDEN_FILE_FILTER together with a user provided one (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    /** List of filters. */
+    private final List<PathFilter> filters;
+
+    /**
+     * Constructor.
+     *
+     * @param filters The list of filters
+     */
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    /**
+     * True iff all filters accept the given path.
+     *
+     * @param path The path to check
+     * @return Whether the path is accepted
+     */
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (!filter.accept(path)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Common method for listing vertex/edge input files.  Each input path may
+   * be a glob; matched directories are expanded one level (not recursively).
+   * Hidden files and anything rejected by the job's input path filter are
+   * skipped, both among the glob matches and inside matched directories.
+   *
+   * @param job The job
+   * @param dirs list of vertex/edge input paths
+   * @return List of FileStatus objects
+   * @throws IOException if no input paths are given, or (as an
+   *                     {@link InvalidInputException}) if any path does not
+   *                     exist or matches no files
+   */
+  private List<FileStatus> listStatus(JobContext job, Path[] dirs)
+    throws IOException {
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+    List<FileStatus> result = new ArrayList<FileStatus>();
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]
+    // get tokens for all the required FileSystems..
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
+        job.getConfiguration());
+end[HADOOP_NON_SECURE]*/
+
+    List<IOException> errors = new ArrayList<IOException>();
+
+    // creates a MultiPathFilter with the HIDDEN_FILE_FILTER and the
+    // user provided one (if any).
+    List<PathFilter> filters = new ArrayList<PathFilter>();
+    filters.add(HIDDEN_FILE_FILTER);
+    PathFilter jobFilter = getInputPathFilter(job);
+    if (jobFilter != null) {
+      filters.add(jobFilter);
+    }
+    PathFilter inputFilter = new MultiPathFilter(filters);
+
+    for (Path p : dirs) {
+      FileSystem fs = p.getFileSystem(job.getConfiguration());
+      FileStatus[] matches = fs.globStatus(p, inputFilter);
+      if (matches == null) {
+        errors.add(new IOException("Input path does not exist: " + p));
+      } else if (matches.length == 0) {
+        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+      } else {
+        for (FileStatus globStat : matches) {
+          if (globStat.isDir()) {
+            // Filter directory contents too, so hidden entries (e.g. names
+            // starting with "_" such as _SUCCESS) are not read as input.
+            Collections.addAll(result,
+                fs.listStatus(globStat.getPath(), inputFilter));
+          } else {
+            result.add(globStat);
+          }
+        }
+      }
+    }
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result;
+  }
+
+  /**
+   * List vertex input files.
+   *
+   * @param job the job to list vertex input paths for
+   * @return list of FileStatus objects
+   * @throws IOException if no vertex input paths are configured or any of
+   *                     them is missing or matches no files
+   */
+  protected List<FileStatus> listVertexStatus(JobContext job)
+    throws IOException {
+    return listStatus(job, getVertexInputPaths(job));
+  }
+
+  /**
+   * List edge input files.
+   *
+   * @param job the job to list edge input paths for
+   * @return list of FileStatus objects
+   * @throws IOException if no edge input paths are configured or any of
+   *                     them is missing or matches no files
+   */
+  protected List<FileStatus> listEdgeStatus(JobContext job)
+    throws IOException {
+    return listStatus(job, getEdgeInputPaths(job));
+  }
+
+  /**
+   * Common method for generating the list of vertex/edge input splits.
+   * Splittable files are cut into splits of
+   * {@code computeSplitSize(blockSize, minSize, maxSize)} bytes (the last
+   * one absorbing up to {@link #SPLIT_SLOP} extra); other non-empty files
+   * become one split; empty files become one split with no hosts.
+   *
+   * @param job The job
+   * @param files List of FileStatus objects for vertex/edge input files
+   * @return The list of vertex/edge input splits
+   * @throws IOException if block locations cannot be obtained
+   */
+  private List<InputSplit> getSplits(JobContext job, List<FileStatus> files)
+    throws IOException {
+    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+    long maxSize = getMaxSplitSize(job);
+
+    // generate splits
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    for (FileStatus file : files) {
+      Path path = file.getPath();
+      FileSystem fs = path.getFileSystem(job.getConfiguration());
+      long length = file.getLen();
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+      if ((length != 0) && isSplitable(job, path)) {
+        long blockSize = file.getBlockSize();
+        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+
+        long bytesRemaining = length;
+        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+              blkLocations[blkIndex].getHosts()));
+          bytesRemaining -= splitSize;
+        }
+
+        if (bytesRemaining != 0) {
+          splits.add(new FileSplit(path, length - bytesRemaining,
+              bytesRemaining,
+              blkLocations[blkLocations.length - 1].getHosts()));
+        }
+      } else if (length != 0) {
+        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+      } else {
+        //Create empty hosts array for zero length files
+        splits.add(new FileSplit(path, 0, length, new String[0]));
+      }
+    }
+    return splits;
+  }
+
+  /**
+   * Generate the list of vertex input splits.  Also records the number of
+   * vertex input files under {@link #NUM_VERTEX_INPUT_FILES}.
+   *
+   * @param job The job
+   * @return The list of vertex input splits
+   * @throws IOException if the vertex inputs cannot be listed or split
+   */
+  public List<InputSplit> getVertexSplits(JobContext job) throws IOException {
+    List<FileStatus> files = listVertexStatus(job);
+    List<InputSplit> splits = getSplits(job, files);
+    // Save the number of input files in the job-conf
+    job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, files.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total # of vertex splits: " + splits.size());
+    }
+    return splits;
+  }
+
+  /**
+   * Generate the list of edge input splits.  Also records the number of
+   * edge input files under {@link #NUM_EDGE_INPUT_FILES}.
+   *
+   * @param job The job
+   * @return The list of edge input splits
+   * @throws IOException if the edge inputs cannot be listed or split
+   */
+  public List<InputSplit> getEdgeSplits(JobContext job) throws IOException {
+    List<FileStatus> files = listEdgeStatus(job);
+    List<InputSplit> splits = getSplits(job, files);
+    // Save the number of input files in the job-conf
+    job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, files.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total # of edge splits: " + splits.size());
+    }
+    return splits;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
new file mode 100644
index 0000000..113b2ea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+
+/**
+ * Provides functionality similar to
+ * {@link org.apache.hadoop.mapreduce.lib.input.TextInputFormat}, but allows
+ * for different data sources (vertex and edge data).
+ *
+ * Records are read line by line with {@link LineRecordReader}. Files that a
+ * configured compression codec recognizes are never split.
+ */
+public class GiraphTextInputFormat
+    extends GiraphFileInputFormat<LongWritable, Text> {
+  /**
+   * Creates a line-oriented record reader for the given split.
+   *
+   * @param split The split to read
+   * @param context The task attempt context
+   * @return A new {@link LineRecordReader}
+   */
+  @Override
+  public RecordReader<LongWritable, Text>
+  createRecordReader(InputSplit split, TaskAttemptContext context) {
+    return new LineRecordReader();
+  }
+
+  /**
+   * A file is splittable only when no compression codec matches it, since
+   * a compressed stream cannot be read starting from an arbitrary offset.
+   *
+   * @param context The job context (provides the configuration)
+   * @param file The file to check
+   * @return true if no codec is associated with the file
+   */
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    CompressionCodec codec =
+        new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    return codec == null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
new file mode 100644
index 0000000..53dd112
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IdWithValueTextOutputFormat.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Write out Vertices' IDs and values, but not their edges nor edges' values.
+ * This is a useful output format when the final value of the vertex is
+ * all that's needed. Each output line holds the id and the value joined by
+ * the delimiter configured under output.delimiter (a tab by default). The
+ * boolean configuration parameter reverse.id.and.value allows reversing the
+ * output of id and value.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class IdWithValueTextOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends TextVertexOutputFormat<I, V, E> {
+
+  /** Specify the output delimiter */
+  public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
+  /** Default output delimiter */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+  /** Reverse id and value order? */
+  public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value";
+  /** Default is to not reverse id and value order. */
+  public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
+
+  @Override
+  public TextVertexWriter createVertexWriter(TaskAttemptContext context) {
+    return new IdWithValueVertexWriter();
+  }
+
+  /**
+   * Vertex writer used with {@link IdWithValueTextOutputFormat}.
+   */
+  protected class IdWithValueVertexWriter extends TextVertexWriterToEachLine {
+    /** Saved delimiter */
+    private String delimiter;
+    /** Cached reverse option (write value before id when true) */
+    private boolean reverseOutput;
+
+    /**
+     * Reads the delimiter and the reverse option from the configuration
+     * once, so they are not looked up for every vertex.
+     *
+     * @param context The task attempt context
+     * @throws IOException Propagated from the superclass initialization
+     * @throws InterruptedException Propagated from the superclass
+     */
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      super.initialize(context);
+      Configuration conf = context.getConfiguration();
+      delimiter = conf
+          .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+      reverseOutput = conf
+          .getBoolean(REVERSE_ID_AND_VALUE, REVERSE_ID_AND_VALUE_DEFAULT);
+    }
+
+    /**
+     * Formats a vertex as "id delimiter value", or "value delimiter id" when
+     * the reverse option is set.
+     *
+     * @param vertex The vertex to format
+     * @return The output line
+     * @throws IOException Declared by the overridden method
+     */
+    @Override
+    protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+      throws IOException {
+      String first;
+      String second;
+      if (reverseOutput) {
+        first = vertex.getValue().toString();
+        second = vertex.getId().toString();
+      } else {
+        first = vertex.getId().toString();
+        second = vertex.getValue().toString();
+      }
+      return new Text(first + delimiter + second);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
new file mode 100644
index 0000000..9aa21c7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IntIntNullIntTextInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * unweighted graphs with int ids.
+ *
+ * Each line consists of: vertex neighbor1 neighbor2 ...
+ * with tokens separated by a single tab or space. The vertex value is
+ * initialized to the vertex id.
+ */
+public class IntIntNullIntTextInputFormat extends
+    TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
+    IntWritable> {
+  /** Separator of the vertex and neighbors */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context)
+    throws IOException {
+    return new IntIntNullIntVertexReader();
+  }
+
+  /**
+   * Vertex reader associated with {@link IntIntNullIntTextInputFormat}.
+   */
+  public class IntIntNullIntVertexReader extends
+    TextVertexReaderFromEachLineProcessed<String[]> {
+    /**
+     * Cached vertex id for the current line
+     */
+    private IntWritable id;
+
+    /**
+     * Splits the line into tokens and parses the first token as the id.
+     *
+     * @param line The input line
+     * @return The tokens of the line
+     * @throws IOException Declared by the overridden method
+     */
+    @Override
+    protected String[] preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      id = new IntWritable(Integer.parseInt(tokens[0]));
+      return tokens;
+    }
+
+    @Override
+    protected IntWritable getId(String[] tokens) throws IOException {
+      return id;
+    }
+
+    /**
+     * Returns the initial vertex value, equal to the vertex id.
+     *
+     * @param tokens The tokens of the line (unused)
+     * @return A fresh IntWritable holding the vertex id
+     * @throws IOException Declared by the overridden method
+     */
+    @Override
+    protected IntWritable getValue(String[] tokens) throws IOException {
+      // Return a separate instance: sharing the id object would make any
+      // in-place update of the vertex value silently change the vertex id.
+      return new IntWritable(id.get());
+    }
+
+    /**
+     * Builds one edge, with a null value, per neighbor token.
+     *
+     * @param tokens The tokens of the line; tokens[1..] are neighbor ids
+     * @return The edges of the vertex
+     * @throws IOException Declared by the overridden method
+     */
+    @Override
+    protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
+        String[] tokens) throws IOException {
+      List<Edge<IntWritable, NullWritable>> edges =
+          Lists.newArrayListWithCapacity(tokens.length - 1);
+      for (int n = 1; n < tokens.length; n++) {
+        edges.add(new Edge<IntWritable, NullWritable>(
+            new IntWritable(Integer.parseInt(tokens[n])),
+            NullWritable.get()));
+      }
+      return edges;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
new file mode 100644
index 0000000..016e74c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.utils.IntPair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexValueInputFormat}
+ * for integer ids and values.
+ *
+ * Each line consists of: id value, separated by a single tab or space.
+ *
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class IntIntTextVertexValueInputFormat<E extends Writable,
+    M extends Writable> extends
+    TextVertexValueInputFormat<IntWritable, IntWritable, E, M> {
+  /** Separator for id and value */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextVertexValueReader createVertexValueReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new IntIntTextVertexValueReader();
+  }
+
+  /**
+   * {@link org.apache.giraph.graph.VertexValueReader} associated with
+   * {@link IntIntTextVertexValueInputFormat}.
+   */
+  public class IntIntTextVertexValueReader extends
+      TextVertexValueReaderFromEachLineProcessed<IntPair> {
+
+    /**
+     * Parses the first two tokens of the line as the id and the value.
+     *
+     * @param line The input line
+     * @return The (id, value) pair
+     * @throws IOException Declared by the overridden method
+     */
+    @Override
+    protected IntPair preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      // parseInt yields primitives directly, avoiding Integer boxing
+      return new IntPair(Integer.parseInt(tokens[0]),
+          Integer.parseInt(tokens[1]));
+    }
+
+    @Override
+    protected IntWritable getId(IntPair data) throws IOException {
+      return new IntWritable(data.getFirst());
+    }
+
+    @Override
+    protected IntWritable getValue(IntPair data) throws IOException {
+      return new IntWritable(data.getSecond());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
new file mode 100644
index 0000000..4d98657
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * unweighted graphs without edges or values, just vertices with ids.
+ *
+ * Each line is just simply the vertex id; surrounding whitespace is ignored.
+ */
+public class IntNullNullNullTextInputFormat extends TextVertexInputFormat<
+    IntWritable, NullWritable, NullWritable, NullWritable> {
+  @Override
+  public TextVertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new IntNullNullNullVertexReader();
+  }
+
+  /**
+   * Reader for this InputFormat.
+   */
+  public class IntNullNullNullVertexReader extends
+      TextVertexReaderFromEachLineProcessed<String> {
+    /** Cached vertex id */
+    private IntWritable id;
+
+    /**
+     * Parses the (trimmed) line as the vertex id.
+     *
+     * @param line The input line
+     * @return The trimmed line text
+     * @throws IOException Declared by the overridden method
+     */
+    @Override
+    protected String preprocessLine(Text line) throws IOException {
+      // Convert once and tolerate surrounding whitespace such as trailing
+      // spaces, which would otherwise make parseInt fail
+      String idText = line.toString().trim();
+      id = new IntWritable(Integer.parseInt(idText));
+      return idText;
+    }
+
+    @Override
+    protected IntWritable getId(String line) throws IOException {
+      return id;
+    }
+
+    @Override
+    protected NullWritable getValue(String line) throws IOException {
+      return NullWritable.get();
+    }
+
+    @Override
+    protected Iterable<Edge<IntWritable, NullWritable>> getEdges(String line)
+      throws IOException {
+      return ImmutableList.of();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
new file mode 100644
index 0000000..ed13b45
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.utils.IntPair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.EdgeInputFormat} for
+ * unweighted graphs with int ids.
+ *
+ * Each line consists of: source_vertex target_vertex, separated by a single
+ * tab or space.
+ */
+public class IntNullTextEdgeInputFormat extends
+    TextEdgeInputFormat<IntWritable, NullWritable> {
+  /** Splitter for endpoints */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextEdgeReader createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new IntNullTextEdgeReader();
+  }
+
+  /**
+   * {@link org.apache.giraph.graph.EdgeReader} associated with
+   * {@link IntNullTextEdgeInputFormat}.
+   */
+  public class IntNullTextEdgeReader extends
+      TextEdgeReaderFromEachLineProcessed<IntPair> {
+    /**
+     * Parses the first two tokens of the line as the edge endpoints.
+     *
+     * @param line The input line
+     * @return The (source, target) pair
+     * @throws IOException Declared by the overridden method
+     */
+    @Override
+    protected IntPair preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      // parseInt yields primitives directly, avoiding Integer boxing
+      return new IntPair(Integer.parseInt(tokens[0]),
+          Integer.parseInt(tokens[1]));
+    }
+
+    @Override
+    protected IntWritable getSourceVertexId(IntPair endpoints)
+      throws IOException {
+      return new IntWritable(endpoints.getFirst());
+    }
+
+    @Override
+    protected IntWritable getTargetVertexId(IntPair endpoints)
+      throws IOException {
+      return new IntWritable(endpoints.getSecond());
+    }
+
+    @Override
+    protected NullWritable getValue(IntPair endpoints) throws IOException {
+      return NullWritable.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
new file mode 100644
index 0000000..c2c1cbb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexFormat.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+/**
+ * Keeps the vertex keys for the input/output vertex format
+ */
+public class JsonBase64VertexFormat {
+  /** Vertex id key */
+  public static final String VERTEX_ID_KEY = "vertexId";
+  /** Vertex value key*/
+  public static final String VERTEX_VALUE_KEY = "vertexValue";
+  /** Edge value array key (all the edges are stored here) */
+  public static final String EDGE_ARRAY_KEY = "edgeArray";
+
+  /**
+   * Don't construct.
+   */
+  private JsonBase64VertexFormat() { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
new file mode 100644
index 0000000..cc5872c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import net.iharder.Base64;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Simple way to represent the structure of the graph with a JSON object.
+ * The actual vertex ids, values, edges are stored by the
+ * Writable serialized bytes that are Base64 encoded.
+ * Works with {@link JsonBase64VertexOutputFormat}
+ *
+ * Each input line is a JSON object. The entry under
+ * {@link JsonBase64VertexFormat#VERTEX_ID_KEY} holds the encoded id, the
+ * entry under {@link JsonBase64VertexFormat#VERTEX_VALUE_KEY} the encoded
+ * value, and the array under {@link JsonBase64VertexFormat#EDGE_ARRAY_KEY}
+ * one encoded element per edge (target id bytes followed by edge value
+ * bytes).
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class JsonBase64VertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends TextVertexInputFormat<I, V, E, M> {
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context) {
+    return new JsonBase64VertexReader();
+  }
+
+  /**
+   * Simple reader that supports {@link JsonBase64VertexInputFormat}
+   */
+  protected class JsonBase64VertexReader extends
+    TextVertexReaderFromEachLineProcessed<JSONObject> {
+
+    /**
+     * Parses the line as a JSON object.
+     *
+     * @param line The input line
+     * @return The parsed JSON object
+     * @throws IllegalArgumentException If the line is not valid JSON
+     */
+    @Override
+    protected JSONObject preprocessLine(Text line) {
+      try {
+        return new JSONObject(line.toString());
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get the vertex", e);
+      }
+    }
+
+    /**
+     * Decodes the vertex id from its Base64-encoded Writable bytes.
+     *
+     * @param vertexObject The JSON object of the vertex
+     * @return The deserialized vertex id
+     * @throws IOException Propagated from Base64 decoding or readFields
+     * @throws IllegalArgumentException If the id key is missing
+     */
+    @Override
+    protected I getId(JSONObject vertexObject) throws IOException {
+      String encodedId;
+      try {
+        encodedId = vertexObject.getString(
+            JsonBase64VertexFormat.VERTEX_ID_KEY);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get vertex id", e);
+      }
+      DataInput input = decodeToDataInput(encodedId);
+      I vertexId = getConf().createVertexId();
+      vertexId.readFields(input);
+      return vertexId;
+    }
+
+    /**
+     * Decodes the vertex value from its Base64-encoded Writable bytes.
+     *
+     * @param vertexObject The JSON object of the vertex
+     * @return The deserialized vertex value
+     * @throws IOException Propagated from Base64 decoding or readFields
+     * @throws IllegalArgumentException If the value key is missing
+     */
+    @Override
+    protected V getValue(JSONObject vertexObject) throws IOException {
+      String encodedValue;
+      try {
+        encodedValue = vertexObject.getString(
+            JsonBase64VertexFormat.VERTEX_VALUE_KEY);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get vertex value", e);
+      }
+      DataInput input = decodeToDataInput(encodedValue);
+      V vertexValue = getConf().createVertexValue();
+      vertexValue.readFields(input);
+      return vertexValue;
+    }
+
+    /**
+     * Decodes every edge of the edge array. Each element holds the target
+     * vertex id bytes followed by the edge value bytes.
+     *
+     * @param vertexObject The JSON object of the vertex
+     * @return The deserialized edges
+     * @throws IOException Propagated from Base64 decoding or readFields
+     * @throws IllegalArgumentException If the edge array or an element of it
+     *                                  cannot be read
+     */
+    @Override
+    protected Iterable<Edge<I, E>> getEdges(JSONObject vertexObject) throws
+    IOException {
+      JSONArray edgeArray;
+      try {
+        edgeArray = vertexObject.getJSONArray(
+          JsonBase64VertexFormat.EDGE_ARRAY_KEY);
+      } catch (JSONException e) {
+        throw new IllegalArgumentException(
+          "next: Failed to get edge array", e);
+      }
+      List<Edge<I, E>> edges = Lists.newArrayListWithCapacity(
+          edgeArray.length());
+      for (int i = 0; i < edgeArray.length(); ++i) {
+        String encodedEdge;
+        try {
+          encodedEdge = edgeArray.getString(i);
+        } catch (JSONException e) {
+          throw new IllegalArgumentException(
+            "next: Failed to get edge value", e);
+        }
+        // The target id and the edge value are read from the same stream
+        DataInput input = decodeToDataInput(encodedEdge);
+        I targetVertexId = getConf().createVertexId();
+        targetVertexId.readFields(input);
+        E edgeValue = getConf().createEdgeValue();
+        edgeValue.readFields(input);
+        edges.add(new Edge<I, E>(targetVertexId, edgeValue));
+      }
+      return edges;
+    }
+
+    /**
+     * Base64-decodes a string and wraps the resulting bytes for reading
+     * with {@link Writable#readFields(DataInput)}.
+     *
+     * @param encoded Base64-encoded serialized Writable bytes
+     * @return Input positioned at the start of the decoded bytes
+     * @throws IOException Propagated from Base64 decoding
+     */
+    private DataInput decodeToDataInput(String encoded) throws IOException {
+      return new DataInputStream(
+          new ByteArrayInputStream(Base64.decode(encoded)));
+    }
+
+  }
+
+}


Mime
View raw message