flink-user mailing list archives

From Stefano Bortoli <s.bort...@gmail.com>
Subject Re: MongoOutputFormat does not write back to collection
Date Wed, 22 Jul 2015 14:48:09 GMT
In fact, in close() of the HadoopOutputFormat, the check
this.fileOutputCommitter.needsTaskCommit(this.context) returns false,
so commitTask() is never executed.

    /**
     * commit the task by moving the output file out from the temporary directory.
     * @throws java.io.IOException
     */
    @Override
    public void close() throws IOException {
        this.recordWriter.close(new HadoopDummyReporter());

        if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
            this.fileOutputCommitter.commitTask(this.context);
        }
    }


Also, both close() and finalizeGlobal() use a FileOutputCommitter, and
never the MongoOutputCommitter:

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {

        try {
            JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
            FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();

            // finalize HDFS output format
            fileOutputCommitter.commitJob(jobContext);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
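
A minimal sketch of the difference, using stand-in types rather than the real
Hadoop, Flink, or mongo-hadoop classes. All names below (Committer, Format,
FileLikeCommitter, StoreLikeCommitter) are hypothetical. The only fact borrowed
from Hadoop is that an OutputFormat declares getOutputCommitter(TaskAttemptContext);
whether the wrapper can simply switch to that committer is an assumption, not
something verified against the Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in types only: these are NOT the Hadoop, Flink, or mongo-hadoop classes. */
final class CommitterDelegationSketch {

    /** Hypothetical, simplified counterpart of an output committer. */
    interface Committer {
        boolean needsTaskCommit();
        void commitTask();
    }

    /** Hypothetical, simplified counterpart of an output format that owns a committer. */
    interface Format {
        Committer getOutputCommitter();
    }

    /** Models a file committer that finds nothing to commit (no task file was written). */
    static final class FileLikeCommitter implements Committer {
        public boolean needsTaskCommit() { return false; }
        public void commitTask() { throw new IllegalStateException("nothing to commit"); }
    }

    /** Models a committer that flushes buffered records to an external store. */
    static final class StoreLikeCommitter implements Committer {
        final List<String> buffered = new ArrayList<>();
        final List<String> stored = new ArrayList<>();
        public boolean needsTaskCommit() { return !buffered.isEmpty(); }
        public void commitTask() { stored.addAll(buffered); buffered.clear(); }
    }

    /** Same shape as the quoted close(): the committer type is fixed by the wrapper. */
    static boolean closeWithFixedCommitter() {
        Committer committer = new FileLikeCommitter();
        if (committer.needsTaskCommit()) {
            committer.commitTask();
            return true;
        }
        return false; // the buffered records of any other committer are never flushed
    }

    /** Alternative: ask the wrapped format for its own committer. */
    static boolean closeWithFormatCommitter(Format format) {
        Committer committer = format.getOutputCommitter();
        if (committer.needsTaskCommit()) {
            committer.commitTask();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        StoreLikeCommitter store = new StoreLikeCommitter();
        store.buffered.add("doc-1");
        System.out.println(closeWithFixedCommitter());           // no commit happens
        System.out.println(closeWithFormatCommitter(() -> store)); // the store committer runs
        System.out.println(store.stored);
    }
}
```

In the fixed variant the committer belonging to the format is never consulted,
which matches what the debugger shows for MongoOutputCommitter.commitTask.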

Could anyone have a look into that?

saluti,
Stefano

2015-07-22 15:53 GMT+02:00 Stefano Bortoli <bortoli@okkam.it>:

> While debugging, it seems the commitTask method of the MongoOutputCommitter
> is never called. Is it possible that this 'bulk' approach of mongo-hadoop 1.4
> does not fit the task execution method of Flink?
>
> Any ideas? Thanks a lot in advance.
>
> saluti,
> Stefano
>
> 2015-07-22 14:26 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:
>
>> Hi,
>>
>> I am trying to analyze and update a MongoDB collection with Apache Flink
>> 0.9.0, Mongo Hadoop 1.4.0, and Hadoop 2.6.0.
>>
>> The process is fairly simple, and the MongoInputFormat works smoothly;
>> however, the job does not write back to the collection. The processing
>> itself works, because writeAsText produces the expected output. I am quite
>> puzzled because, when debugging, I can see it writes to some temporary
>> directory.
>>
>> The mapred.output.dir setting seems to serve just to output a file named
>> _SUCCESS, and if I do not set it, the job fails with
>> java.lang.IllegalArgumentException: Can not create a Path from a null
>> string
>>     at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>>     at org.apache.hadoop.fs.Path.<init>(Path.java:135)
>>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
>>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> Has anyone experienced something similar? Any hints on where to look?
>> Thanks a lot in advance!
>>
>> saluti,
>> Stefano
>>
>> ====================================================
>>         Configuration conf = new Configuration();
>>         conf.set("mapred.output.dir", "/tmp/");
>>         conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY,
>>                 collectionsUri);
>>         conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY,
>>                 collectionsUri);
>>
>>         Job job = Job.getInstance(conf);
>>
>>         // create a MongodbInputFormat, using a Hadoop input format wrapper
>>         InputFormat<Object, BSONObject> mapreduceInputFormat =
>>                 new MyMongoInputFormat<Object, BSONObject>();
>>         HadoopInputFormat<Object, BSONObject> hdIf =
>>                 new HadoopInputFormat<Object, BSONObject>(
>>                 mapreduceInputFormat, Object.class, BSONObject.class,
>>                 job);
>>         DataSet<Tuple2<Text, BSONWritable>> fin = input
>>                 .flatMap(new myFlatMapFunction()).setParallelism(16);
>>
>>         MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
>>
>>         fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>>                 new MongoOutputFormat<Text, BSONWritable>(),
>>                 job));
>> //        fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
>>
>>
>
