flink-user mailing list archives

From Stefano Bortoli <bort...@okkam.it>
Subject Re: MongoOutputFormat does not write back to collection
Date Wed, 22 Jul 2015 13:53:52 GMT
Debugging, it seems the commitTask method of the MongoOutputCommitter is
never called. Is it possible that the 'bulk' approach of mongo-hadoop 1.4
does not fit Flink's task execution model?
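If commitTask is really skipped, that would explain the empty collection: a "bulk" committer buffers writes in the task and only flushes them to the target store when the framework commits the task. The following is a stdlib-only mock of that protocol (the class and method names are hypothetical illustrations, not mongo-hadoop's actual API) showing why records vanish when commitTask is never invoked:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mock of a bulk-committing writer (NOT mongo-hadoop's real
// classes): records are buffered in the task and only reach the
// "collection" when commitTask() runs. Closing without committing drops
// the whole batch silently -- the symptom described above.
class BulkCommitDemo {
    static class BufferingWriter {
        final List<String> buffer = new ArrayList<>();
        final List<String> collection; // stands in for the Mongo collection
        BufferingWriter(List<String> collection) { this.collection = collection; }
        void write(String record)  { buffer.add(record); }  // buffered only
        void commitTask()          { collection.addAll(buffer); buffer.clear(); }
        void close()               { buffer.clear(); }      // no commit: batch lost
    }

    static List<String> run(boolean callCommit) {
        List<String> collection = new ArrayList<>();
        BufferingWriter w = new BufferingWriter(collection);
        w.write("doc1");
        w.write("doc2");
        if (callCommit) w.commitTask(); // the step that appears to be skipped
        w.close();
        return collection;
    }

    public static void main(String[] args) {
        System.out.println("with commit:    " + run(true).size());
        System.out.println("without commit: " + run(false).size());
    }
}
```

With the commit step the mock "collection" receives both records; without it, nothing arrives, even though every write() call succeeded.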

Any ideas? Thanks a lot in advance.

saluti,
Stefano

Stefano Bortoli, PhD

ENS Technical Director
_______________________________________________
OKKAM Srl - www.okkam.it <http://www.okkam.it/>

Email: bortoli@okkam.it
Phone nr: +39 0461 1823912

Headquarters: Trento (Italy), Via Trener 8
Registered office: Trento (Italy), via Segantini 23

Confidentiality notice. This e-mail transmission may contain legally
privileged and/or confidential information. Please do not read it if you
are not the intended recipient(s). Any use, distribution, reproduction or
disclosure by any other person is strictly prohibited. If you have received
this e-mail in error, please notify the sender and destroy the original
transmission and its attachments without reading or saving it in any manner.

2015-07-22 14:26 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:

> Hi,
>
> I am trying to analyze and update a MongoDB collection with Apache Flink
> 0.9.0, Mongo Hadoop 1.4.0, and Hadoop 2.6.0.
>
> The process is fairly simple: the MongoInputFormat reads smoothly, but
> nothing is written back to the collection. The pipeline itself works,
> because writeAsText produces the expected output. I am quite puzzled
> because, while debugging, I can see records written to some temporary
> directory.
>
> The mapred.output.dir setting seems to serve only to output a file named
> _SUCCESS, and if I do not set it, the job fails with:
> java.lang.IllegalArgumentException: Can not create a Path from a null
> string
>     at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>     at org.apache.hadoop.fs.Path.<init>(Path.java:135)
>     at
> org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
>     at
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> Has anyone experienced something similar? Any hints on where to look?
> Thanks a lot in advance!
>
> saluti,
> Stefano
>
> ====================================================
> Configuration conf = new Configuration();
> conf.set("mapred.output.dir", "/tmp/");
> conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
> conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);
>
> Job job = Job.getInstance(conf);
>
> // create a MongoInputFormat, using a Hadoop input format wrapper
> InputFormat<Object, BSONObject> mapreduceInputFormat =
>         new MyMongoInputFormat<Object, BSONObject>();
> HadoopInputFormat<Object, BSONObject> hdIf =
>         new HadoopInputFormat<Object, BSONObject>(
>                 mapreduceInputFormat, Object.class, BSONObject.class, job);
>
> // (this step was missing from the original snippet: create the DataSet
> // from the wrapped input format)
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Tuple2<Object, BSONObject>> input = env.createInput(hdIf);
>
> DataSet<Tuple2<Text, BSONWritable>> fin = input
>         .flatMap(new myFlatMapFunction()).setParallelism(16);
>
> MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
>
> fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>         new MongoOutputFormat<Text, BSONWritable>(),
>         job));
> // fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
>
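The stack trace above shows HadoopOutputFormatBase.open building a Path from the configured output directory; when mapred.output.dir is unset the string is null and Path's constructor rejects it before any record is written. A stdlib-only sketch of that guard (hypothetical class and method names, loosely mimicking Hadoop's Path.checkPathArg, not its actual code):

```java
import java.util.HashMap;
import java.util.Map;

// Loose, hypothetical imitation of Hadoop's null-path check: an unset or
// empty "mapred.output.dir" is rejected up front, which is why the job in
// the mail fails immediately when the key is not configured.
class OutputDirCheck {
    static String resolveOutputDir(Map<String, String> conf) {
        String dir = conf.get("mapred.output.dir");
        if (dir == null || dir.isEmpty()) {
            throw new IllegalArgumentException(
                    "Can not create a Path from a null string");
        }
        return dir;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("mapred.output.dir", "/tmp/"); // the workaround used above
        System.out.println(resolveOutputDir(conf));
    }
}
```

This is why setting mapred.output.dir to a dummy directory silences the exception even though the real records are meant to go to MongoDB, not to that path.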
