giraph-user mailing list archives

From José Luis Larroque <larroques...@gmail.com>
Subject Re: java.io.IOException: ensureRemaining: Only * bytes remaining, trying to read *
Date Sun, 13 Sep 2015 02:50:38 GMT
Solved. The problem was difficult to track down.

My data was invalid. With my Giraph configuration, if a vertex has an
outgoing edge to another vertex, that target vertex must exist in the
input. If it doesn't, the reader keeps consuming data looking for it until
it reaches EOF, but for some reason Giraph doesn't stop there and keeps
reading until it throws that exception.

The solution is to use input data with no missing vertices.

PS: maybe Giraph can be configured to allow this situation; I'm really not
sure about it, but right now, after losing a lot of time on this, I'll go
for the easy way ;)

2015-09-08 20:01 GMT-03:00 José Luis Larroque <larroquester@gmail.com>:

> Hi guys, I'm having some problems with custom classes in Giraph. I made a
> VertexInput and Output format, but I always get the following error:
>
> *java.io.IOException: ensureRemaining: Only * bytes remaining, trying to
> read **
>
> with different values where the "*" are placed. The problem happens when a
> vertex iterator calls next() and there are no more vertices left. The
> iterator is invoked from a flush method, but I don't understand, basically,
> why the next() method is failing. Here are some logs and classes...
>
> *My log is the following*:
>
> 15/09/08 00:52:21 INFO bsp.BspService: BspService: Connecting to ZooKeeper
> with job giraph_yarn_application_1441683854213_0001, 1 on localhost:22181
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client environment:host.name
> =localhost
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:java.version=1.7.0_79
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:java.vendor=Oracle Corporation
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:java.class.path=.:${CLASSPATH}:./*:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/share/hadoop/common/*:$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:$HADOOP_H$
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/l$
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:java.io.tmpdir=/tmp
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:java.compiler=<NA>
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client environment:os.name
> =Linux
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:os.arch=amd64
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:os.version=3.13.0-62-generic
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client environment:user.name
> =hduser
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:user.home=/home/hduser
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Client
> environment:user.dir=/app/hadoop/tmp/nm-local-dir/usercache/hduser/appcache/application_1441683854213_0001/container_1441683854213_0001_01_000003
> 15/09/08 00:52:21 INFO zookeeper.ZooKeeper: Initiating client connection,
> connectString=localhost:22181 sessionTimeout=60000
> watcher=org.apache.giraph.worker.BspServiceWorker@4256d3a0
> 15/09/08 00:52:21 INFO zookeeper.ClientCnxn: Opening socket connection to
> server localhost/127.0.0.1:22181. Will not attempt to authenticate using
> SASL (unknown error)
> 15/09/08 00:52:21 INFO zookeeper.ClientCnxn: Socket connection established
> to localhost/127.0.0.1:22181, initiating session
> 15/09/08 00:52:21 INFO zookeeper.ClientCnxn: Session establishment
> complete on server localhost/127.0.0.1:22181, sessionid =
> 0x14fab0de0bb0002, negotiated timeout = 40000
> 15/09/08 00:52:21 INFO bsp.BspService: process: Asynchronous connection
> complete.
> 15/09/08 00:52:21 INFO netty.NettyServer: NettyServer: Using execution
> group with 8 threads for requestFrameDecoder.
> 15/09/08 00:52:21 INFO Configuration.deprecation: mapred.map.tasks is
> deprecated. Instead, use mapreduce.job.maps
> 15/09/08 00:52:21 INFO netty.NettyServer: start: Started server
> communication server: localhost/127.0.0.1:30001 with up to 16 threads on
> bind attempt 0 with sendBufferSize = 32768 receiveBufferSize = 524288
> 15/09/08 00:52:21 INFO netty.NettyClient: NettyClient: Using execution
> handler with 8 threads after request-encoder.
> 15/09/08 00:52:21 INFO graph.GraphTaskManager: setup: Registering health
> of this worker...
> 15/09/08 00:52:21 INFO yarn.GiraphYarnTask: [STATUS: task-1] WORKER_ONLY
> starting...
> 15/09/08 00:52:22 INFO bsp.BspService: getJobState: Job state already
> exists
> (/_hadoopBsp/giraph_yarn_application_1441683854213_0001/_masterJobState)
> 15/09/08 00:52:22 INFO bsp.BspService: getApplicationAttempt: Node
> /_hadoopBsp/giraph_yarn_application_1441683854213_0001/_applicationAttemptsDir
> already exists!
> 15/09/08 00:52:22 INFO bsp.BspService: getApplicationAttempt: Node
> /_hadoopBsp/giraph_yarn_application_1441683854213_0001/_applicationAttemptsDir
> already exists!
> 15/09/08 00:52:22 INFO worker.BspServiceWorker: registerHealth: Created my
> health node for attempt=0, superstep=-1 with
> /_hadoopBsp/giraph_yarn_application_1441683854213_0001/_applicationAttemptsDir/0/_superstepD$
> 15/09/08 00:52:22 INFO netty.NettyServer: start: Using Netty without
> authentication.
> 15/09/08 00:52:22 INFO bsp.BspService: process:
> partitionAssignmentsReadyChanged (partitions are assigned)
> 15/09/08 00:52:22 INFO worker.BspServiceWorker: startSuperstep:
> Master(hostname=localhost, MRtaskID=0, port=30000)
> 15/09/08 00:52:22 INFO worker.BspServiceWorker: startSuperstep: Ready for
> computation on superstep -1 since worker selection and vertex range
> assignments are done in /_hadoopBsp/giraph_yarn_application_1441683854$
> 15/09/08 00:52:22 INFO yarn.GiraphYarnTask: [STATUS: task-1]
> startSuperstep: WORKER_ONLY - Attempt=0, Superstep=-1
> 15/09/08 00:52:22 INFO netty.NettyClient: Using Netty without
> authentication.
> 15/09/08 00:52:22 INFO netty.NettyClient: Using Netty without
> authentication.
> 15/09/08 00:52:22 INFO netty.NettyClient: connectAllAddresses:
> Successfully added 2 connections, (2 total connected) 0 failed, 0 failures
> total.
> 15/09/08 00:52:22 INFO netty.NettyServer: start: Using Netty without
> authentication.
> 15/09/08 00:52:22 INFO handler.RequestDecoder: decode: Server window
> metrics MBytes/sec received = 0, MBytesReceived = 0.0001, ave received req
> MBytes = 0.0001, secs waited = 1.44168435E9
> 15/09/08 00:52:22 INFO worker.BspServiceWorker: loadInputSplits: Using 1
> thread(s), originally 1 threads(s) for 1 total splits.
> 15/09/08 00:52:22 INFO worker.InputSplitsHandler: reserveInputSplit:
> Reserved input split path
> /_hadoopBsp/giraph_yarn_application_1441683854213_0001/_vertexInputSplitDir/0,
> overall roughly 0.0% input splits rese$
> 15/09/08 00:52:22 INFO worker.InputSplitsCallable: getInputSplit: Reserved
> /_hadoopBsp/giraph_yarn_application_1441683854213_0001/_vertexInputSplitDir/0
> from ZooKeeper and got input split 'hdfs://hdnode01:54310/u$
> 15/09/08 00:52:22 INFO worker.InputSplitsCallable: loadFromInputSplit:
> Finished loading
> /_hadoopBsp/giraph_yarn_application_1441683854213_0001/_vertexInputSplitDir/0
> (v=6, e=10)
> 15/09/08 00:52:22 INFO worker.InputSplitsCallable: call: Loaded 1 input
> splits in 0.16241108 secs, (v=6, e=10) 36.94329 vertices/sec, 61.572155
> edges/sec
> 15/09/08 00:52:22 ERROR utils.LogStacktraceCallable: Execution of callable
> failed
>
> java.lang.IllegalStateException: next: IOException
>         at
> org.apache.giraph.utils.VertexIterator.next(VertexIterator.java:101)
>         at
> org.apache.giraph.partition.BasicPartition.addPartitionVertices(BasicPartition.java:99)
>         at
> org.apache.giraph.comm.requests.SendWorkerVerticesRequest.doRequest(SendWorkerVerticesRequest.java:115)
>         at
> org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.doRequest(NettyWorkerClientRequestProcessor.java:466)
>         at
> org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.flush(NettyWorkerClientRequestProcessor.java:412)
>         at
> org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:241)
>         at
> org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:60)
>         at
> org.apache.giraph.utils.LogStacktraceCallable.call(LogStacktraceCallable.java:51)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: ensureRemaining: Only 0 bytes remaining,
> trying to read 1
>         at
> org.apache.giraph.utils.UnsafeReads.ensureRemaining(UnsafeReads.java:77)
>         at
> org.apache.giraph.utils.UnsafeArrayReads.readByte(UnsafeArrayReads.java:123)
>         at
> org.apache.giraph.utils.UnsafeReads.readLine(UnsafeReads.java:100)
>         at
> pruebas.TextAndDoubleComplexWritable.readFields(TextAndDoubleComplexWritable.java:37)
>         at
> org.apache.giraph.utils.WritableUtils.reinitializeVertexFromDataInput(WritableUtils.java:540)
>         at
> org.apache.giraph.utils.VertexIterator.next(VertexIterator.java:98)
>         ... 11 more
> 15/09/08 00:52:22 ERROR worker.BspServiceWorker: unregisterHealth: Got
> failure, unregistering health on
> /_hadoopBsp/giraph_yarn_application_1441683854213_0001/_applicationAttemptsDir/0/_superstepDir/-1/_workerHea$
> 15/09/08 00:52:22 ERROR yarn.GiraphYarnTask: GiraphYarnTask threw a
> top-level exception, failing task
> java.lang.RuntimeException: run: Caught an unrecoverable exception
> waitFor: ExecutionException occurred while waiting for
> org.apache.giraph.utils.ProgressableUtils$FutureWaitable@4bbf48f0
>         at
> org.apache.giraph.yarn.GiraphYarnTask.run(GiraphYarnTask.java:104)
>         at
> org.apache.giraph.yarn.GiraphYarnTask.main(GiraphYarnTask.java:183)
> Caused by: java.lang.IllegalStateException: waitFor: ExecutionException
> occurred while waiting for
> org.apache.giraph.utils.ProgressableUtils$FutureWaitable@4bbf48f0
>         at
> org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:193)
>         at
> org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:151)
>         at
> org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:136)
>         at
> org.apache.giraph.utils.ProgressableUtils.getFutureResult(ProgressableUtils.java:99)
>         at
> org.apache.giraph.utils.ProgressableUtils.getResultsWithNCallables(ProgressableUtils.java:233)
>         at
> org.apache.giraph.worker.BspServiceWorker.loadInputSplits(BspServiceWorker.java:316)
>         at
> org.apache.giraph.worker.BspServiceWorker.loadVertices(BspServiceWorker.java:409)
>         at
> org.apache.giraph.worker.BspServiceWorker.setup(BspServiceWorker.java:629)
>         at
> org.apache.giraph.graph.GraphTaskManager.execute(GraphTaskManager.java:284)
>         at
> org.apache.giraph.yarn.GiraphYarnTask.run(GiraphYarnTask.java:92)
>         ... 1 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.IllegalStateException: next: IOException
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:202)
>         at
> org.apache.giraph.utils.ProgressableUtils$FutureWaitable.waitFor(ProgressableUtils.java:312)
>         at
> org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:185)
>         ... 10 more
> Caused by: java.lang.IllegalStateException: next: IOException
>         at
> org.apache.giraph.utils.VertexIterator.next(VertexIterator.java:101)
>         at
> org.apache.giraph.partition.BasicPartition.addPartitionVertices(BasicPartition.java:99)
>         at
> org.apache.giraph.comm.requests.SendWorkerVerticesRequest.doRequest(SendWorkerVerticesRequest.java:115)
>         at
> org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.doRequest(NettyWorkerClientRequestProcessor.java:466)
>         at
> org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor.flush(NettyWorkerClientRequestProcessor.java:412)
>         at
> org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:241)
>         at
> org.apache.giraph.worker.InputSplitsCallable.call(InputSplitsCallable.java:60)
>         at
> org.apache.giraph.utils.LogStacktraceCallable.call(LogStacktraceCallable.java:51)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> *Caused by: java.io.IOException: ensureRemaining: Only 0 bytes remaining,
> trying to read 1*
>         at
> org.apache.giraph.utils.UnsafeReads.ensureRemaining(UnsafeReads.java:77)
>         at
> org.apache.giraph.utils.UnsafeArrayReads.readByte(UnsafeArrayReads.java:123)
>         at
> org.apache.giraph.utils.UnsafeReads.readLine(UnsafeReads.java:100)
>         at
> pruebas.TextAndDoubleComplexWritable.readFields(TextAndDoubleComplexWritable.java:37)
>         at
> org.apache.giraph.utils.WritableUtils.reinitializeVertexFromDataInput(WritableUtils.java:540)
>         at
> org.apache.giraph.utils.VertexIterator.next(VertexIterator.java:98)
>         ... 11 more
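One more observation on the trace above: the failing frames are UnsafeReads.readLine called from TextAndDoubleComplexWritable.readFields. A Writable's readFields has to read back exactly the bytes its write emitted; readLine scans for a line terminator that the serialized vertex bytes never contain, which is consistent with the reader running past the record boundary and exhausting the buffer. A hedged sketch of a symmetric write/readFields pair (the real class is not shown in the thread, so the field names are assumptions):

```java
import java.io.*;

// Hedged sketch, not the poster's actual class: TextAndDoubleComplexWritable
// is not shown in the thread, so the field names here are assumptions. The
// point illustrated: readFields must consume exactly the bytes that write
// produced, field for field, with no line-oriented reads.
public class TextAndDoubleSketch {
    public String previousVertexIds = "";
    public double vertexData;

    // In the real class these would implement org.apache.hadoop.io.Writable;
    // plain DataOutput/DataInput keep this sketch self-contained.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(previousVertexIds); // length-prefixed, no newline needed
        out.writeDouble(vertexData);
    }

    public void readFields(DataInput in) throws IOException {
        previousVertexIds = in.readUTF(); // mirrors write() exactly
        vertexData = in.readDouble();
    }
}
```

A quick round trip through DataOutputStream/DataInputStream is an easy way to verify that the pair is symmetric before running it inside Giraph.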
>
>
>
> *My computation class:*
> import java.io.IOException;
>
> import org.apache.giraph.conf.StrConfOption;
> import org.apache.giraph.edge.Edge;
> import org.apache.giraph.examples.Algorithm;
> import org.apache.giraph.graph.BasicComputation;
> import org.apache.giraph.graph.Vertex;
> import org.apache.hadoop.io.DoubleWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.log4j.Logger;
>
> import pruebas.TextAndDoubleComplexWritable;
>
> /**
>  * Finds all navigational paths. Adaptation of
>  * https://github.com/MarcoLotz
>  * /GiraphBFSSO/blob/master/src/uk/co/qmul/giraph/structurebfs
>  * /SimpleBFSStructureComputation.java by @MarcoLotz to the wikiquotes
>  * data generated by the generacion_de_grafo_wikiquotes project
>  */
> @Algorithm(name = "Busqueda de Caminos Navegacionales", description =
> "Busca todos los caminos navegacionales de un grafo")
> public class BusquedaDeCaminosNavegacionalesWikiquote
>         extends
>         BasicComputation<Text, TextAndDoubleComplexWritable,
> DoubleWritable, Text> {
>     /**
>      * Source id of the navigational-path search; indicates the first
>      * vertex to process in superstep 0
>      */
>     public static final StrConfOption SOURCE_ID = new StrConfOption(
>             "BusquedaDeCaminosNavegacionales.sourceId", "Portada",
>             "El vertice de origen, de la busqueda de todos los caminos
> navegacionales");
>     // public static final LongConfOption DEST_ID = new LongConfOption(
>     // "BusquedaDeCaminosNavegacionales.destId", 2,
>     // "El vertice de destino, de la busqueda de todos los caminos
> navegacionales");
>     /** Class logger */
>     private static final Logger LOG = Logger
>             .getLogger(BusquedaDeCaminosNavegacionalesWikiquote.class);
>
>     /**
>      * Define a maximum number of supersteps
>      */
>     public final int MAX_SUPERSTEPS = 5;
>
>     /**
>      * Is this vertex the source id?
>      *
>      * @param vertex
>      *            Vertex
>      * @return True if the source id
>      */
>     private boolean isStart(Vertex<Text, ?, ?> vertex) {
>         return vertex.getId().toString()
>                 .equals(SOURCE_ID.get(getConf()).toString());
>     }
>
>     /**
>      * Send messages to all the connected vertices. The content of the
> messages
>      * is not important, since just the event of receiving a message
> removes the
>      * vertex from the inactive status.
>      *
>      * @param vertex
>      */
>     public void BFSMessages(
>             Vertex<Text, TextAndDoubleComplexWritable, DoubleWritable>
> vertex) {
>         for (Edge<Text, DoubleWritable> edge : vertex.getEdges()) {
>             sendMessage(edge.getTargetVertexId(), vertex.getId());
>         }
>     }
>
>     @Override
>     public void compute(
>             Vertex<Text, TextAndDoubleComplexWritable, DoubleWritable>
> vertex,
>             Iterable<Text> messages) throws IOException {
>
>         // Forces convergence in maximum superstep
>         if (!(getSuperstep() == MAX_SUPERSTEPS)) {
>             // Only start vertex should work in the first superstep
>             // All the other should vote to halt and wait for
>             // messages.
>
>             // When run from JUnit the first superstep is 1, but when
>             // invoked from the command line it is 0
>             if (getSuperstep() == 0) {
>                 if (isStart(vertex)) {
>                     vertex.getValue().setVertexData(new
> Double(getSuperstep()));
>                     BFSMessages(vertex);
>                     if (LOG.isInfoEnabled()) {
>                         LOG.info("[Start Vertex] Vertex ID: " +
> vertex.getId());
>                     }
>                 } else { // Initialise the other vertices with infinite depth
>                     vertex.getValue().setVertexData(
>                             new Double(Integer.MAX_VALUE));
>                 }
>             }
>
>             // if it is not the first Superstep (Superstep 0) :
>             // Check vertex ID
>
>             else {
>                 // It is the first time that this vertex is being computed
>                 if (vertex.getValue().getVertexData() ==
> Integer.MAX_VALUE) {
>                     // The depth has the same value that the superstep
>                     vertex.getValue().setVertexData(new
> Double(getSuperstep()));
>
>                     String idsDeVerticesPredecesores = "";
>                     for (Text message : messages) {
>                         idsDeVerticesPredecesores += message.toString();
>                     }
>                     vertex.getValue().setIds_vertices_anteriores(
>                             idsDeVerticesPredecesores);
>
>                     // Continue on the structure
>                     BFSMessages(vertex);
>                 }
>                 // Else this vertex was already analysed in a previous
>                 // iteration.
>             }
>             vertex.voteToHalt();
>         }
>     }
> }
>
> My *Input format:*
>
> package pruebas;
>
> import org.apache.giraph.edge.Edge;
> import org.apache.giraph.edge.EdgeFactory;
> import org.apache.giraph.io.formats.AdjacencyListTextVertexInputFormat;
> import org.apache.hadoop.io.DoubleWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.InputSplit;
> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>
> /**
>  * @author hduser
>  *
>  */
> public class IdTextWithComplexValueInputFormat
>         extends
>         AdjacencyListTextVertexInputFormat<Text,
> TextAndDoubleComplexWritable, DoubleWritable> {
>
>     @Override
>     public AdjacencyListTextVertexReader createVertexReader(InputSplit
> split,
>             TaskAttemptContext context) {
>         return new TextComplexValueDoubleAdjacencyListVertexReader();
>     }
>
>     protected class TextComplexValueDoubleAdjacencyListVertexReader extends
>             AdjacencyListTextVertexReader {
>
>         /**
>          * Constructor with
>          * {@link AdjacencyListTextVertexInputFormat.LineSanitizer}.
>          *
>          * @param lineSanitizer
>          *            the sanitizer to use for reading
>          */
>         public TextComplexValueDoubleAdjacencyListVertexReader() {
>             super();
>         }
>
>         @Override
>         public Text decodeId(String s) {
>             return new Text(s);
>         }
>
>         @Override
>         public TextAndDoubleComplexWritable decodeValue(String s) {
>             TextAndDoubleComplexWritable valorComplejo = new
> TextAndDoubleComplexWritable();
>             valorComplejo.setVertexData(Double.valueOf(s));
>             valorComplejo.setIds_vertices_anteriores("");
>             return valorComplejo;
>         }
>
>         @Override
>         public Edge<Text, DoubleWritable> decodeEdge(String s1, String s2)
> {
>             return EdgeFactory.create(new Text(s1),
>                     new DoubleWritable(Double.valueOf(s2)));
>         }
>     }
>
> }
>
> My *Output format:*
> package pruebas;
>
> import java.io.IOException;
>
> import org.apache.giraph.graph.Vertex;
> import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
> import org.apache.giraph.io.formats.TextVertexOutputFormat;
> import org.apache.hadoop.io.DoubleWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>
> /**
>  * @author hduser
>  *
>  */
> public class IdTextWithComplexValueOutputFormat
>         extends
>         TextVertexOutputFormat<Text, TextAndDoubleComplexWritable,
> DoubleWritable> {
>
>     /** Specify the output delimiter */
>     public static final String LINE_TOKENIZE_VALUE = "output.delimiter";
>     /** Default output delimiter */
>     public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
>     /** Reverse id and value order? */
>     public static final String REVERSE_ID_AND_VALUE =
> "reverse.id.and.value";
>     /** Default is to not reverse id and value order. */
>     public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false;
>
>     @Override
>     public TextVertexOutputFormat<Text, TextAndDoubleComplexWritable,
> DoubleWritable>.TextVertexWriter createVertexWriter(
>             TaskAttemptContext context) throws IOException,
>             InterruptedException {
>
>         return new IdWithValueVertexWriter();
>     }
>
>     /**
>      * Vertex writer used with {@link IdWithValueTextOutputFormat}.
>      */
>     protected class IdWithValueVertexWriter extends
> TextVertexWriterToEachLine {
>         /** Saved delimiter */
>         private String delimiter;
>         /** Cached reserve option */
>         private boolean reverseOutput;
>
>         @Override
>         public void initialize(TaskAttemptContext context) throws
> IOException,
>                 InterruptedException {
>             super.initialize(context);
>             delimiter = getConf().get(LINE_TOKENIZE_VALUE,
>                     LINE_TOKENIZE_VALUE_DEFAULT);
>             reverseOutput = getConf().getBoolean(REVERSE_ID_AND_VALUE,
>                     REVERSE_ID_AND_VALUE_DEFAULT);
>         }
>
>         protected Text convertVertexToLine(
>                 Vertex<Text, TextAndDoubleComplexWritable, DoubleWritable>
> vertex)
>                 throws IOException {
>             StringBuilder str = new StringBuilder();
>             if (reverseOutput) {
>                 imprimirCaminosNavegacionales(vertex, str);
>                 str.append(delimiter);
>                 str.append(vertex.getId().toString());
>             } else {
>                 str.append(vertex.getId().toString());
>                 str.append(delimiter);
>                 imprimirCaminosNavegacionales(vertex, str);
>             }
>             return new Text(str.toString());
>         }
>
>         private void imprimirCaminosNavegacionales(
>                 Vertex<Text, TextAndDoubleComplexWritable, DoubleWritable>
> vertex,
>                 StringBuilder str) {
>             str.append(vertex.getId());
>             str.append(LINE_TOKENIZE_VALUE_DEFAULT);
>             for (String idVerticeAnterior : vertex.getValue()
>                     .getIds_vertices_anteriores()
>                     .split(LINE_TOKENIZE_VALUE_DEFAULT)) {
>                 str.append(idVerticeAnterior + "/" + vertex.getId());
>                 str.append(LINE_TOKENIZE_VALUE_DEFAULT);
>             }
>             str.append(vertex.getValue().toString());
>         }
>     }
> }
>
> *My input file:*
> Portada    0.0    Sugerencias    1.0
> Proverbios    0.0
> Neil    0.0    Luna    1.0    ideal    1.0    verdad    1.0
> Categoria:Ingenieros    2.0    Categoria:Estadounidenses    2.0
> Categoria:Astronautas    2.0
> Categoria:Ingenieros    1.0    Neil    2.0
> Categoria:Estadounidenses    1.0    Neil    2.0
> Categoria:Astronautas    1.0    Neil    2.0
>
> and I execute it with this *command*:
>
> $HADOOP_HOME/bin/yarn jar
> $GIRAPH_HOME/giraph-examples/target/giraph-examples-1.1.0-for-hadoop-2.4.0-jar-with-dependencies.jar
> org.apache.giraph.GiraphRunner
> lectura_de_grafo.BusquedaDeCaminosNavegacionalesWikiquote -vif
> pruebas.IdTextWithComplexValueInputFormat -vip
> /user/hduser/input/wiki-graph-chiquito.txt -vof
> pruebas.IdTextWithComplexValueOutputFormat -op
> /user/hduser/output/caminosNavegacionales -w 2 -yh 250
>
>
