giraph-user mailing list archives

From José Luis Larroque <larroques...@gmail.com>
Subject Correct use of the Writable interface - Vertex with a Text id and a complex value as value (pardon the redundancy)
Date Wed, 18 Nov 2015 02:52:58 GMT
Hi, I'm Jose, and I'm writing a Giraph program that reads a text file
like this:

Portada    1.0    Sugerencia    1.0
Sugerencia    1.0    Portada    1.0

It has this form:
VertexId VertexValue EdgeId EdgeValue

All fields are separated by a TAB (\t).
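For reference, here is a minimal standalone sketch of how one line in this format breaks down, assuming tab-separated fields and leaving Giraph out entirely. The names AdjacencyLineParser, ParsedVertex and ParsedEdge are hypothetical; the shape check mirrors the one in the preprocessLine method of the InputFormat that follows.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch (no Giraph dependency): splits one line of the
// "VertexId<TAB>VertexValue<TAB>EdgeId<TAB>EdgeValue..." format.
// AdjacencyLineParser, ParsedVertex and ParsedEdge are hypothetical names.
public class AdjacencyLineParser {

    /** One outgoing edge: target vertex id and edge value. */
    public static final class ParsedEdge {
        public final String targetId;
        public final double value;

        ParsedEdge(String targetId, double value) {
            this.targetId = targetId;
            this.value = value;
        }
    }

    /** One vertex line: id, vertex value and its outgoing edges. */
    public static final class ParsedVertex {
        public final String id;
        public final double value;
        public final List<ParsedEdge> edges;

        ParsedVertex(String id, double value, List<ParsedEdge> edges) {
            this.id = id;
            this.value = value;
            this.edges = edges;
        }
    }

    public static ParsedVertex parse(String line) {
        String[] values = line.split("\t");
        // Same shape check as preprocessLine: id and value, then (edgeId, edgeValue) pairs.
        if (values.length < 2 || values.length % 2 != 0) {
            throw new IllegalArgumentException("Line did not split correctly: " + line);
        }
        List<ParsedEdge> edges = new ArrayList<>();
        for (int i = 2; i < values.length; i += 2) {
            edges.add(new ParsedEdge(values[i], Double.parseDouble(values[i + 1])));
        }
        return new ParsedVertex(values[0], Double.parseDouble(values[1]), edges);
    }

    public static void main(String[] args) {
        ParsedVertex v = parse("Portada\t1.0\tSugerencia\t1.0");
        System.out.println(v.id + " " + v.value + " -> " + v.edges.get(0).targetId); // Portada 1.0 -> Sugerencia
    }
}
```

A line with an odd number of fields, such as one whose last edge is missing its value, is rejected rather than silently truncated.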

To read this, I implemented the following InputFormat:

package pruebas;
import java.io.IOException;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.io.formats.AdjacencyListTextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class IdTextWithComplexValueInputFormat
        extends AdjacencyListTextVertexInputFormat<Text,
                TextAndDoubleComplexWritable, DoubleWritable> {

    /**
     * Utility for doing any cleaning of each line before it is tokenized.
     */
    public interface LineSanitizer {
        /**
         * Clean string s before attempting to tokenize it.
         *
         * @param s
         *            String to be cleaned.
         * @return Sanitized string.
         */
        String sanitize(String s);
    }

    @Override
    public TextComplexValueDoubleAdjacencyListVertexReader createVertexReader(
            InputSplit inputSplit, TaskAttemptContext context) {
        return new TextComplexValueDoubleAdjacencyListVertexReader(null);
    }

    protected class TextComplexValueDoubleAdjacencyListVertexReader extends
            AdjacencyListTextVertexReader {

        /** Cached delimiter used for split */
        private String splitValue = null;

        /** Sanitizer from constructor. */
        private final LineSanitizer sanitizer;

        public TextComplexValueDoubleAdjacencyListVertexReader() {
            this(null);
        }

        public TextComplexValueDoubleAdjacencyListVertexReader(
                LineSanitizer sanitizer) {
            this.sanitizer = sanitizer;
        }

        @Override
        public void initialize(InputSplit inputSplit,
                TaskAttemptContext context)
                throws IOException, InterruptedException {
            super.initialize(inputSplit, context);
            splitValue = getConf().get(LINE_TOKENIZE_VALUE,
                    LINE_TOKENIZE_VALUE_DEFAULT);
        }

        @Override
        protected String[] preprocessLine(Text line) throws IOException {
            String sanitizedLine;
            if (sanitizer != null) {
                sanitizedLine = sanitizer.sanitize(line.toString());
            } else {
                sanitizedLine = line.toString();
            }
            String[] values = sanitizedLine.split(splitValue);
            if ((values.length < 2) || (values.length % 2 != 0)) {
                throw new IllegalArgumentException(
                        "Line did not split correctly: " + line);
            }
            return values;
        }

        @Override
        public Text decodeId(String s) {
            return new Text(s);
        }

        @Override
        public TextAndDoubleComplexWritable decodeValue(String s) {
            // "valorComplejo" means "complex value"
            TextAndDoubleComplexWritable valorComplejo =
                    new TextAndDoubleComplexWritable();
            valorComplejo.setVertexData(Double.valueOf(s));
            valorComplejo.setIds_vertices_anteriores("");
            return valorComplejo;
        }

        @Override
        public Edge<Text, DoubleWritable> decodeEdge(String s1, String s2) {
            return EdgeFactory.create(new Text(s1),
                    new DoubleWritable(Double.valueOf(s2)));
        }
    }

}


The purpose of the class above is to create vertices whose id is of type
Text (the same string as in the file) and whose value is a complex type,
TextAndDoubleComplexWritable. Its implementation is the following:

package pruebas;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class TextAndDoubleComplexWritable implements Writable {

    // "idsVerticesAnteriores" means "IDs of the previous vertices"
    private String idsVerticesAnteriores;

    private double vertexData;

    public TextAndDoubleComplexWritable() {
        super();
        this.idsVerticesAnteriores = "";
    }

    public TextAndDoubleComplexWritable(double vertexData) {
        super();
        this.vertexData = vertexData;
    }

    public TextAndDoubleComplexWritable(String ids_vertices_anteriores,
            double vertexData) {
        super();
        this.idsVerticesAnteriores = ids_vertices_anteriores;
        this.vertexData = vertexData;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(idsVerticesAnteriores);
    }

    public void readFields(DataInput in) throws IOException {
        idsVerticesAnteriores = in.readLine();
    }

    public String getIds_vertices_anteriores() {
        return idsVerticesAnteriores;
    }

    public void setIds_vertices_anteriores(String ids_vertices_anteriores) {
        this.idsVerticesAnteriores = ids_vertices_anteriores;
    }

    public double getVertexData() {
        return vertexData;
    }

    public void setVertexData(double vertexData) {
        this.vertexData = vertexData;
    }
}

I'm testing my Giraph program locally with InternalVertexRunner. I didn't
include that class here because this mail is already long, but I can send
it if needed.

Both classes work fine while the VertexReader is reading directly from the
file, but after that Giraph executes the doRequest method, and the
VertexIterator starts its deadly iterations.
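A simplified model of the pattern described above, assuming the vertices produced by the reader are written back to back into a byte buffer and later read out again one record at a time. This is not Giraph code; SerializedBatchModel, writeBatch and readBatch are hypothetical names, and only java.io is used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Giraph code: records are written back to back into one
// buffer, then a loop reads them out again one record at a time.
public class SerializedBatchModel {

    /** Writes each (id, value) record into a single buffer. */
    public static byte[] writeBatch(List<String> ids, List<Double> values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (int i = 0; i < ids.size(); i++) {
            out.writeUTF(ids.get(i));       // vertex id
            out.writeDouble(values.get(i)); // vertex value
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Reads records until no bytes remain, like an iterator over serialized vertices. */
    public static List<String> readBatch(byte[] buffer) throws IOException {
        ByteArrayInputStream bytes = new ByteArrayInputStream(buffer);
        DataInputStream in = new DataInputStream(bytes);
        List<String> records = new ArrayList<>();
        while (bytes.available() > 0) {
            // Each read must consume exactly the bytes its matching write produced;
            // otherwise every following record is decoded from the wrong offset.
            String id = in.readUTF();
            double value = in.readDouble();
            records.add(id + "=" + value);
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        byte[] buffer = writeBatch(List.of("Portada", "Sugerencia"), List.of(1.0, 1.0));
        System.out.println(buffer.length);     // 37
        System.out.println(readBatch(buffer)); // [Portada=1.0, Sugerencia=1.0]
    }
}
```

Because the records share one buffer, a read method that consumes more or fewer bytes than the matching write produced shifts every later record, and the final read can run past the end of the buffer.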

This is where the problem appears: the readFields method of my
TextAndDoubleComplexWritable is invoked, and I don't understand why. All
vertices have already been read, so why is that information read again? This
behavior produces the following error:

java.io.IOException: ensureRemaining: Only 0 bytes remaining, trying to read 1
<http://stackoverflow.com/questions/32468647/java-io-ioexception-ensureremaining-only-0-bytes-remaining-trying-to-read-1>
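The write/readFields pair in TextAndDoubleComplexWritable calls writeUTF on one side and readLine on the other. The small java.io-only sketch below shows how those two calls relate. writeUTF prefixes the string with a 2-byte length, while readLine simply scans bytes until a line terminator or the end of the stream, so it does not recover what writeUTF wrote. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Shows that readLine() does not undo writeUTF(): writeUTF writes a 2-byte length
// followed by the encoded characters, while readLine reads bytes until '\n', '\r'
// or end of stream, turning each byte (length bytes included) into a char.
public class WriteUtfVsReadLine {

    public static byte[] writeWithWriteUtf(String s) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF(s);
        out.flush();
        return bytes.toByteArray();
    }

    @SuppressWarnings("deprecation") // DataInputStream.readLine is deprecated
    public static String readWithReadLine(byte[] data) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(data)).readLine();
    }

    public static String readWithReadUtf(byte[] data) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(data)).readUTF();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = writeWithWriteUtf("abc");
        System.out.println(data.length);                     // 5: 2 length bytes + 3 chars
        System.out.println(readWithReadLine(data).length()); // 5: length bytes read as chars
        System.out.println(readWithReadUtf(data));           // abc
    }
}
```

With java.io.DataInputStream, readLine stops quietly at the end of the stream. A stream that refuses to read past its end would fail instead, which may be what the ensureRemaining message reports; that last point is an assumption and has not been checked against Giraph's own stream classes.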

I probably don't fully understand how a class that implements Writable must
be used and implemented; the examples I found online confuse me a little,
and I'm not sure what to do.
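Below is a sketch of a value class whose write and readFields mirror each other. Every field is written and then read back in the same order, with matching method pairs (writeUTF/readUTF, writeDouble/readDouble). In TextAndDoubleComplexWritable, by contrast, vertexData is never written at all. TextAndDoubleValueSketch is a hypothetical name. The class leaves out "implements org.apache.hadoop.io.Writable" only so the example compiles without Hadoop on the classpath; in a real job it would declare it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of a symmetric value class. In a Giraph job it would declare
// "implements org.apache.hadoop.io.Writable"; that is omitted here only so the
// example compiles without Hadoop on the classpath.
public class TextAndDoubleValueSketch {

    private String idsVerticesAnteriores = ""; // never null: writeUTF(null) throws
    private double vertexData;

    public TextAndDoubleValueSketch() {}

    public TextAndDoubleValueSketch(String ids, double vertexData) {
        this.idsVerticesAnteriores = ids;
        this.vertexData = vertexData;
    }

    /** Serializes every field, in a fixed order. */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(idsVerticesAnteriores);
        out.writeDouble(vertexData);
    }

    /** Reads the same fields, in the same order, with the matching methods. */
    public void readFields(DataInput in) throws IOException {
        idsVerticesAnteriores = in.readUTF();
        vertexData = in.readDouble();
    }

    public String getIdsVerticesAnteriores() { return idsVerticesAnteriores; }

    public double getVertexData() { return vertexData; }

    public static void main(String[] args) throws IOException {
        TextAndDoubleValueSketch original = new TextAndDoubleValueSketch("Portada", 1.0);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Start from an empty instance, then fill it from the serialized bytes.
        TextAndDoubleValueSketch copy = new TextAndDoubleValueSketch();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getIdsVerticesAnteriores() + " " + copy.getVertexData()); // Portada 1.0
    }
}
```

The string field starts as "" because writeUTF(null) throws a NullPointerException, and the original single-argument constructor leaves it null. The no-arg constructor matters because Hadoop-style frameworks typically create values reflectively and then call readFields on them.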

Any help will be appreciated!

Thanks for reading!
Jose
