flink-user mailing list archives

From Stefano Bortoli <s.bort...@gmail.com>
Subject MongoOutputFormat does not write back to collection
Date Wed, 22 Jul 2015 12:26:27 GMT
Hi,

I am trying to analyze and update a MongoDB collection with Apache Flink
0.9.0, Mongo Hadoop 1.4.0, and Hadoop 2.6.0.

The process is fairly simple, and the MongoInputFormat works smoothly.
However, nothing is written back to the collection. The processing itself
works, because writeAsText produces the expected output. I am quite puzzled
because, while debugging, I can see it writing to some temporary directory.

The mapred.output.dir setting seems to serve only to output a file named
_SUCCESS, and if I do not set it, the job fails with:
java.lang.IllegalArgumentException: Can not create a Path from a null string
    at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
    at org.apache.hadoop.fs.Path.<init>(Path.java:135)
    at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
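
A minimal sketch of why the exception shows up, using only the Java standard
library. It assumes that the wrapper's open() reads "mapred.output.dir" from
the job configuration and builds a Hadoop Path from it, which is what the
stack trace suggests. The class NullOutputDirSketch and its methods are
hypothetical stand-ins, not Flink or Hadoop code.

```java
import java.util.HashMap;
import java.util.Map;

public class NullOutputDirSketch {

    // Stand-in for Hadoop's Path argument check (assumption: it rejects null
    // with the message seen in the stack trace above).
    static String checkPathArg(String path) {
        if (path == null) {
            throw new IllegalArgumentException(
                    "Can not create a Path from a null string");
        }
        return path;
    }

    // Stand-in for the lookup the output wrapper appears to perform in open():
    // an unset key yields null, which the path check then rejects.
    static String resolveOutputDir(Map<String, String> conf) {
        return checkPathArg(conf.get("mapred.output.dir"));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        try {
            resolveOutputDir(conf);
        } catch (IllegalArgumentException e) {
            System.out.println("unset: " + e.getMessage());
        }

        // Once the key is set, the lookup succeeds.
        conf.put("mapred.output.dir", "/tmp/");
        System.out.println("set: " + resolveOutputDir(conf));
    }
}
```

If that assumption holds, the directory is needed only so the wrapper can be
opened, which would explain why only a _SUCCESS marker ends up there. It does
not explain why the documents never reach the collection.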

Has anyone experienced something similar? Any hints on where to look? Thanks
a lot in advance!

Regards,
Stefano

====================================================
Configuration conf = new Configuration();
// Flink's Hadoop output wrapper fails without an output directory,
// even though the data is meant to go to MongoDB.
conf.set("mapred.output.dir", "/tmp/");
conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);

Job job = Job.getInstance(conf);

// create the Mongo input format, using a Hadoop input format wrapper
InputFormat<Object, BSONObject> mapreduceInputFormat =
        new MyMongoInputFormat<Object, BSONObject>();
HadoopInputFormat<Object, BSONObject> hdIf =
        new HadoopInputFormat<Object, BSONObject>(
                mapreduceInputFormat, Object.class, BSONObject.class, job);

// Not in the original post: `input` was presumably created from hdIf,
// with env being the job's ExecutionEnvironment.
DataSet<Tuple2<Object, BSONObject>> input = env.createInput(hdIf);

DataSet<Tuple2<Text, BSONWritable>> fin = input
        .flatMap(new myFlatMapFunction()).setParallelism(16);

// set the output URI on the job's own copy of the configuration
MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);

fin.output(new HadoopOutputFormat<Text, BSONWritable>(
        new MongoOutputFormat<Text, BSONWritable>(), job));
// fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
