giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [10/23] GIRAPH-409: Refactor / cleanups (nitay)
Date Fri, 04 Jan 2013 20:52:39 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
new file mode 100644
index 0000000..293bd0e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * edge output format.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextEdgeInputFormat<I extends WritableComparable,
+    E extends Writable> extends EdgeInputFormat<I, E> {
+  /** Underlying GiraphTextInputFormat. */
+  protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
+
+  @Override
+  public List<InputSplit> getSplits(
+      JobContext context, int numWorkers) throws IOException,
+      InterruptedException {
+    // Ignore the hint of numWorkers here since we are using
+    // GiraphTextInputFormat to do this for us
+    return textInputFormat.getEdgeSplits(context);
+  }
+
+  @Override
+  public abstract TextEdgeReader createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException;
+
+  /**
+   * {@link EdgeReader} for {@link TextEdgeInputFormat}.
+   */
+  protected abstract class TextEdgeReader implements EdgeReader<I, E> {
+    /** Internal line record reader */
+    private RecordReader<LongWritable, Text> lineRecordReader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+    /**
+     * Cached configuration. We don't care about vertex value and message type.
+     */
+    private ImmutableClassesGiraphConfiguration<I, NullWritable, E,
+        NullWritable> conf;
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      this.context = context;
+      conf = new ImmutableClassesGiraphConfiguration<I, NullWritable, E,
+          NullWritable>(context.getConfiguration());
+      lineRecordReader = createLineRecordReader(inputSplit, context);
+      lineRecordReader.initialize(inputSplit, context);
+    }
+
+    /**
+     * Create the line record reader. Override this to use a different
+     * underlying record reader (useful for testing).
+     *
+     * @param inputSplit
+     *          the split to read
+     * @param context
+     *          the context passed to initialize
+     * @return
+     *         the record reader to be used
+     * @throws IOException
+     *           exception that can be thrown during creation
+     * @throws InterruptedException
+     *           exception that can be thrown during creation
+     */
+    protected RecordReader<LongWritable, Text>
+    createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      return textInputFormat.createRecordReader(inputSplit, context);
+    }
+
+    @Override
+    public void close() throws IOException {
+      lineRecordReader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return lineRecordReader.getProgress();
+    }
+
+    /**
+     * Get the line record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected RecordReader<LongWritable, Text> getRecordReader() {
+      return lineRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+
+    /**
+     * Get the configuration.
+     *
+     * @return Configuration for this reader
+     */
+    protected ImmutableClassesGiraphConfiguration<I, NullWritable, E,
+        NullWritable> getConf() {
+      return conf;
+    }
+  }
+
+  /**
+   * Abstract class to be implemented by the user to read an edge from each
+   * text line.
+   */
+  protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader {
+    @Override
+    public final EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+        InterruptedException {
+      Text line = getRecordReader().getCurrentValue();
+      I sourceVertexId = getSourceVertexId(line);
+      I targetVertexId = getTargetVertexId(line);
+      E edgeValue = getValue(line);
+      return new EdgeWithSource<I, E>(sourceVertexId,
+          new Edge<I, E>(targetVertexId, edgeValue));
+    }
+
+    @Override
+    public final boolean nextEdge() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Reads source vertex id from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the source vertex id corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getSourceVertexId(Text line) throws IOException;
+
+
+    /**
+     * Reads target vertex id from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the target vertex id corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getTargetVertexId(Text line) throws IOException;
+
+    /**
+     * Reads edge value from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the edge value corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract E getValue(Text line) throws IOException;
+  }
+
+  /**
+   * Abstract class to be implemented by the user to read an edge from each
+   * text line after preprocessing it.
+   *
+   * @param <T>
+   *          The resulting type of preprocessing.
+   */
+  protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends
+      TextEdgeReader {
+    @Override
+    public final EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+        InterruptedException {
+      Text line = getRecordReader().getCurrentValue();
+      T processed = preprocessLine(line);
+      I sourceVertexId = getSourceVertexId(processed);
+      I targetVertexId = getTargetVertexId(processed);
+      E edgeValue = getValue(processed);
+      return new EdgeWithSource<I, E>(sourceVertexId,
+          new Edge<I, E>(targetVertexId, edgeValue));
+    }
+
+    @Override
+    public final boolean nextEdge() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Preprocess the line so other methods can easily read necessary
+     * information for creating edge
+     *
+     * @param line
+     *          the current line to be read
+     * @return
+     *         the preprocessed object
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract T preprocessLine(Text line) throws IOException;
+
+    /**
+     * Reads target vertex id from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the target vertex id
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getTargetVertexId(T line) throws IOException;
+
+    /**
+     * Reads source vertex id from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the source vertex id
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getSourceVertexId(T line) throws IOException;
+
+    /**
+     * Reads edge value from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the edge value
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract E getValue(T line) throws IOException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
new file mode 100644
index 0000000..7beddb8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * vertex input format.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends VertexInputFormat<I, V, E, M> {
+
+  /** Uses the GiraphTextInputFormat to do everything */
+  protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // Ignore the hint of numWorkers here since we are using
+    // GiraphTextInputFormat to do this for us
+    return textInputFormat.getVertexSplits(context);
+  }
+
+  /**
+   * The factory method which produces the {@link TextVertexReader} used by this
+   * input format.
+   *
+   * @param split
+   *          the split to be read
+   * @param context
+   *          the information about the task
+   * @return
+   *         the text vertex reader to be used
+   */
+  @Override
+  public abstract TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context) throws IOException;
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * vertex input. Easiest to ignore the key value separator and only use
+   * key instead.
+   *
+   * When reading a vertex from each line, extend
+   * {@link TextVertexReaderFromEachLine}. If you need to preprocess each line
+   * first, then extend {@link TextVertexReaderFromEachLineProcessed}. If you
+   * need common exception handling while preprocessing, then extend
+   * {@link TextVertexReaderFromEachLineProcessedHandlingExceptions}.
+   */
+  protected abstract class TextVertexReader implements
+    VertexReader<I, V, E, M> {
+    /** Internal line record reader */
+    private RecordReader<LongWritable, Text> lineRecordReader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+    /** Cached configuration */
+    private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      this.context = context;
+      conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+          context.getConfiguration());
+      lineRecordReader = createLineRecordReader(inputSplit, context);
+      lineRecordReader.initialize(inputSplit, context);
+    }
+
+    /**
+     * Create the line record reader. Override this to use a different
+     * underlying record reader (useful for testing).
+     *
+     * @param inputSplit
+     *          the split to read
+     * @param context
+     *          the context passed to initialize
+     * @return
+     *         the record reader to be used
+     * @throws IOException
+     *           exception that can be thrown during creation
+     * @throws InterruptedException
+     *           exception that can be thrown during creation
+     */
+    protected RecordReader<LongWritable, Text>
+    createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      return textInputFormat.createRecordReader(inputSplit, context);
+    }
+
+    @Override
+    public void close() throws IOException {
+      lineRecordReader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return lineRecordReader.getProgress();
+    }
+
+    /**
+     * Get the line record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected RecordReader<LongWritable, Text> getRecordReader() {
+      return lineRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+
+    /**
+     * Get the configuration.
+     *
+     * @return Configuration for this reader
+     */
+    protected ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+      return conf;
+    }
+  }
+
+  /**
+   * Abstract class to be implemented by the user to read a vertex from each
+   * text line.
+   */
+  protected abstract class TextVertexReaderFromEachLine extends
+    TextVertexReader {
+
+    @Override
+    public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+    InterruptedException {
+      Text line = getRecordReader().getCurrentValue();
+      Vertex<I, V, E, M> vertex = getConf().createVertex();
+      vertex.initialize(getId(line), getValue(line), getEdges(line));
+      return vertex;
+    }
+
+    @Override
+    public final boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Reads vertex id from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the vertex id corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getId(Text line) throws IOException;
+
+    /**
+     * Reads vertex value from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the vertex value corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract V getValue(Text line) throws IOException;
+
+    /**
+     * Reads edges value from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the edges
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract Iterable<Edge<I, E>> getEdges(Text line) throws
+        IOException;
+
+  }
+
+  /**
+   * Abstract class to be implemented by the user to read a vertex from each
+   * text line after preprocessing it.
+   *
+   * @param <T>
+   *          The resulting type of preprocessing.
+   */
+  protected abstract class TextVertexReaderFromEachLineProcessed<T> extends
+      TextVertexReader {
+
+    @Override
+    public final boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    @Override
+    public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+    InterruptedException {
+      Text line = getRecordReader().getCurrentValue();
+      Vertex<I, V, E, M> vertex;
+      T processed = preprocessLine(line);
+      vertex = getConf().createVertex();
+      vertex.initialize(getId(processed), getValue(processed),
+          getEdges(processed));
+      return vertex;
+    }
+
+    /**
+     * Preprocess the line so other methods can easily read necessary
+     * information for creating vertex.
+     *
+     * @param line
+     *          the current line to be read
+     * @return
+     *         the preprocessed object
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract T preprocessLine(Text line) throws IOException;
+
+    /**
+     * Reads vertex id from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the vertex id
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getId(T line) throws IOException;
+
+    /**
+     * Reads vertex value from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the vertex value
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract V getValue(T line) throws IOException;
+
+    /**
+     * Reads edges from the preprocessed line.
+     *
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the edges
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract Iterable<Edge<I, E>> getEdges(T line) throws
+        IOException;
+
+  }
+
+  // CHECKSTYLE: stop RedundantThrows
+  /**
+   * Abstract class to be implemented by the user to read a vertex from each
+   * text line after preprocessing it with exception handling.
+   *
+   * @param <T>
+   *          The resulting type of preprocessing.
+   * @param <X>
+   *          The exception type that can be thrown due to preprocessing.
+   */
+  protected abstract class
+  TextVertexReaderFromEachLineProcessedHandlingExceptions<T, X extends
+    Throwable> extends TextVertexReader {
+
+    @Override
+    public final boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+        InterruptedException {
+      // Note we are reading from value only since key is the line number
+      Text line = getRecordReader().getCurrentValue();
+      Vertex<I, V, E, M> vertex;
+      T processed = null;
+      try {
+        processed = preprocessLine(line);
+        Configuration conf = getContext().getConfiguration();
+        vertex = getConf().createVertex();
+        vertex.initialize(getId(processed), getValue(processed),
+            getEdges(processed));
+      } catch (IOException e) {
+        throw e;
+      // CHECKSTYLE: stop IllegalCatch
+      } catch (Throwable t) {
+        return handleException(line, processed, (X) t);
+      // CHECKSTYLE: resume IllegalCatch
+      }
+      return vertex;
+    }
+
+    /**
+     * Preprocess the line so other methods can easily read necessary
+     * information for creating vertex.
+     *
+     * @param line
+     *          the current line to be read
+     * @return
+     *         the preprocessed object
+     * @throws X
+     *           exception that can be thrown while preprocessing the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract T preprocessLine(Text line) throws X, IOException;
+
+    /**
+     * Reads vertex id from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the vertex id
+     * @throws X
+     *           exception that can be thrown while reading the preprocessed
+     *           object
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getId(T line) throws X, IOException;
+
+    /**
+     * Reads vertex value from the preprocessed line.
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the vertex value
+     * @throws X
+     *           exception that can be thrown while reading the preprocessed
+     *           object
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract V getValue(T line) throws X, IOException;
+
+    /**
+     * Reads edges from the preprocessed line.
+     *
+     *
+     * @param line
+     *          the object obtained by preprocessing the line
+     * @return
+     *         the edges
+     * @throws X
+     *           exception that can be thrown while reading the preprocessed
+     *           object
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract Iterable<Edge<I, E>> getEdges(T line) throws X,
+        IOException;
+
+    /**
+     * Handles exceptions while reading vertex from each line.
+     *
+     * @param line
+     *          the line that was being read when the exception was thrown
+     * @param processed
+     *          the object obtained by preprocessing the line. Can be null if
+     *          exception was thrown during preprocessing.
+     * @param e
+     *          the exception thrown while reading the line
+     * @return the recovered/alternative vertex to be used
+     */
+    protected Vertex<I, V, E, M> handleException(Text line, T processed, X e) {
+      throw new IllegalArgumentException(e);
+    }
+
+  }
+  // CHECKSTYLE: resume RedundantThrows
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
new file mode 100644
index 0000000..9f1fe1f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * vertex output format.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexOutputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends VertexOutputFormat<I, V, E> {
+
+  /** Uses the TextOutputFormat to do everything */
+  protected TextOutputFormat<Text, Text> textOutputFormat =
+      new TextOutputFormat<Text, Text>();
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    textOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return textOutputFormat.getOutputCommitter(context);
+  }
+
+  /**
+   * The factory method which produces the {@link TextVertexWriter} used by this
+   * output format.
+   *
+   * @param context
+   *          the information about the task
+   * @return
+   *         the text vertex writer to be used
+   * @throws IOException
+   *           exception that can be thrown during creation
+   * @throws InterruptedException
+   *           exception that can be thrown during creation
+   */
+  @Override
+  public abstract TextVertexWriter createVertexWriter(TaskAttemptContext
+      context) throws IOException, InterruptedException;
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * vertex output.  Easiest to ignore the key value separator and only use
+   * key instead.
+   */
+  protected abstract class TextVertexWriter implements VertexWriter<I, V, E> {
+    /** Internal line record writer */
+    private RecordWriter<Text, Text> lineRecordWriter;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+           InterruptedException {
+      // Store the context before creating the writer so that overrides of
+      // createLineRecordWriter can already rely on getContext().
+      this.context = context;
+      lineRecordWriter = createLineRecordWriter(context);
+    }
+
+    /**
+     * Create the line record writer. Override this to use a different
+     * underlying record writer (useful for testing).
+     *
+     * @param context
+     *          the context passed to initialize
+     * @return
+     *         the record writer to be used
+     * @throws IOException
+     *           exception that can be thrown during creation
+     * @throws InterruptedException
+     *           exception that can be thrown during creation
+     */
+    protected RecordWriter<Text, Text> createLineRecordWriter(
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      return textOutputFormat.getRecordWriter(context);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      // initialize() may never have created the writer (e.g. it failed in
+      // createLineRecordWriter); closing must not throw a
+      // NullPointerException in that case.
+      if (lineRecordWriter != null) {
+        lineRecordWriter.close(context);
+      }
+    }
+
+    /**
+     * Get the line record writer.
+     *
+     * @return Record writer to be used for writing.
+     */
+    public RecordWriter<Text, Text> getRecordWriter() {
+      return lineRecordWriter;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    public TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Abstract class to be implemented by the user to write a line for each
+   * vertex.
+   */
+  protected abstract class TextVertexWriterToEachLine extends TextVertexWriter {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public final void writeVertex(Vertex vertex) throws
+      IOException, InterruptedException {
+      // Note we are writing line as key with null value
+      getRecordWriter().write(convertVertexToLine(vertex), null);
+    }
+
+    /**
+     * Writes a line for the given vertex.
+     *
+     * @param vertex
+     *          the current vertex for writing
+     * @return the text line to be written
+     * @throws IOException
+     *           exception that can be thrown while writing
+     */
+    protected abstract Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+      throws IOException;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
new file mode 100644
index 0000000..4e607c2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.io.VertexValueInputFormat;
+import org.apache.giraph.io.VertexValueReader;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Abstract class that users should subclass to use their own text based
+ * vertex value input format.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class TextVertexValueInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends VertexValueInputFormat<I, V, E, M> {
+  /** Uses the GiraphTextInputFormat to do everything */
+  protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // Ignore the hint of numWorkers here since we are using
+    // GiraphTextInputFormat to do this for us
+    return textInputFormat.getVertexSplits(context);
+  }
+
+  @Override
+  public abstract TextVertexValueReader createVertexValueReader(
+      InputSplit split, TaskAttemptContext context) throws IOException;
+
+  /**
+   * {@link VertexValueReader} for {@link VertexValueInputFormat}.
+   */
+  protected abstract class TextVertexValueReader extends
+      VertexValueReader<I, V, E, M> {
+    /** Internal line record reader */
+    private RecordReader<LongWritable, Text> lineRecordReader;
+    /** Context passed to initialize */
+    private TaskAttemptContext context;
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      super.initialize(inputSplit, context);
+      this.context = context;
+      lineRecordReader = createLineRecordReader(inputSplit, context);
+      lineRecordReader.initialize(inputSplit, context);
+    }
+
+    /**
+     * Create the line record reader. Override this to use a different
+     * underlying record reader (useful for testing).
+     *
+     * @param inputSplit the split to read
+     * @param context the context passed to initialize
+     * @return the record reader to be used
+     * @throws IOException exception that can be thrown during creation
+     * @throws InterruptedException if creation is interrupted
+     */
+    protected RecordReader<LongWritable, Text>
+    createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      return textInputFormat.createRecordReader(inputSplit, context);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // lineRecordReader is still null if initialize() was never called or
+      // threw before creating it; skip closing rather than throw an NPE.
+      if (lineRecordReader != null) {
+        lineRecordReader.close();
+      }
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return lineRecordReader.getProgress();
+    }
+
+    /**
+     * Get the line record reader.
+     *
+     * @return Record reader to be used for reading.
+     */
+    protected RecordReader<LongWritable, Text> getRecordReader() {
+      return lineRecordReader;
+    }
+
+    /**
+     * Get the context.
+     *
+     * @return Context passed to initialize.
+     */
+    protected TaskAttemptContext getContext() {
+      return context;
+    }
+  }
+
+  /**
+   * Abstract class to be implemented by the user to read a vertex value from
+   * each text line.
+   */
+  protected abstract class TextVertexValueReaderFromEachLine extends
+      TextVertexValueReader {
+    @Override
+    public final I getCurrentVertexId() throws IOException,
+        InterruptedException {
+      return getId(getRecordReader().getCurrentValue());
+    }
+
+    @Override
+    public final V getCurrentVertexValue() throws IOException,
+        InterruptedException {
+      return getValue(getRecordReader().getCurrentValue());
+    }
+
+    @Override
+    public final boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Reads vertex id from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the vertex id corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract I getId(Text line) throws IOException;
+
+    /**
+     * Reads vertex value from the current line.
+     *
+     * @param line
+     *          the current line
+     * @return
+     *         the vertex value corresponding to the line
+     * @throws IOException
+     *           exception that can be thrown while reading
+     */
+    protected abstract V getValue(Text line) throws IOException;
+  }
+
+  /**
+   * Abstract class to be implemented by the user to read a vertex value from
+   * each text line after preprocessing it.
+   *
+   * @param <T>
+   *          The resulting type of preprocessing.
+   */
+  protected abstract class TextVertexValueReaderFromEachLineProcessed<T>
+      extends TextVertexValueReader {
+    /** Last preprocessed line. */
+    private T processedLine = null;
+    /**
+     * Whether processedLine holds the result for the current line. Tracked
+     * separately from processedLine so a null result is cached as well.
+     */
+    private boolean processedLineValid = false;
+
+    /**
+     * Get last preprocessed line. Generate it if missing.
+     *
+     * @return The last preprocessed line
+     * @throws IOException if reading or preprocessing the line fails
+     * @throws InterruptedException if reading the line is interrupted
+     */
+    private T getProcessedLine() throws IOException, InterruptedException {
+      if (!processedLineValid) {
+        processedLine = preprocessLine(getRecordReader().getCurrentValue());
+        processedLineValid = true;
+      }
+      return processedLine;
+    }
+
+    @Override
+    public I getCurrentVertexId() throws IOException,
+        InterruptedException {
+      return getId(getProcessedLine());
+    }
+
+    @Override
+    public V getCurrentVertexValue() throws IOException,
+        InterruptedException {
+      return getValue(getProcessedLine());
+    }
+
+    @Override
+    public final boolean nextVertex() throws IOException, InterruptedException {
+      // Drop the cached result; the next line must be preprocessed afresh.
+      processedLine = null;
+      processedLineValid = false;
+      return getRecordReader().nextKeyValue();
+    }
+
+    /**
+     * Preprocess the line so other methods can easily read necessary
+     * information for creating vertex.
+     *
+     * @param line the current line to be read
+     * @return the preprocessed object (may be null; the result is cached
+     *         until the next call to nextVertex())
+     * @throws IOException exception that can be thrown while reading
+     */
+    protected abstract T preprocessLine(Text line) throws IOException;
+
+    /**
+     * Reads vertex id from the preprocessed line.
+     *
+     * @param line the object obtained by preprocessing the line
+     * @return the vertex id
+     * @throws IOException exception that can be thrown while reading
+     */
+    protected abstract I getId(T line) throws IOException;
+
+    /**
+     * Reads vertex value from the preprocessed line.
+     *
+     * @param line the object obtained by preprocessing the line
+     * @return the vertex value
+     * @throws IOException exception that can be thrown while reading
+     */
+    protected abstract V getValue(T line) throws IOException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/formats/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/package-info.java
new file mode 100644
index 0000000..27df034
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of reusable Giraph input and output formats.
+ */
+package org.apache.giraph.io.formats;

http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/io/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/package-info.java
index 3b0519a..fd631db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/package-info.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/package-info.java
@@ -16,6 +16,6 @@
  * limitations under the License.
  */
 /**
- * Package of reusable library Giraph objects.
+ * Input/Output related things.
  */
 package org.apache.giraph.io;


Mime
View raw message