Subject: Re: MongoOutputFormat does not write back to collection
From: Stephan Ewen <sewen@apache.org>
To: user@flink.apache.org
Date: Thu, 23 Jul 2015 13:31:05 +0200

Does this make the MongoHadoopOutputFormat work for you?

On Thu, Jul 23, 2015 at 12:44 PM, Stefano Bortoli <s.bortoli@gmail.com> wrote:

https://issues.apache.org/jira/browse/FLINK-2394?filter=-2

Meanwhile, I have implemented a MongoHadoopOutputFormat overriding the
open, close and finalizeGlobal methods.
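The gist of the override is to stop assuming a FileOutputCommitter and
instead ask the wrapped Hadoop format for its own committer (a
MongoOutputCommitter in this case). Below is a simplified sketch, not the
actual patch; it assumes the wrapper's fields (mapreduceOutputFormat,
recordWriter, context, configuration) are visible to a subclass, and that
Flink's HadoopUtils offers instantiateTaskAttemptContext next to the
instantiateJobContext already used by the wrapper:

    import java.io.IOException;

    import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    // Illustrative sketch: commit through the wrapped format's own
    // OutputCommitter instead of the hard-wired FileOutputCommitter.
    public class MongoHadoopOutputFormat<K, V> extends HadoopOutputFormat<K, V> {

        public MongoHadoopOutputFormat(OutputFormat<K, V> mapreduceOutputFormat, Job job) {
            super(mapreduceOutputFormat, job);
        }

        // open() is overridden as well (not shown): it creates the
        // TaskAttemptContext and RecordWriter but skips the
        // FileOutputCommitter/temporary-path setup that fails when
        // mapred.output.dir is not set.

        @Override
        public void close() throws IOException {
            try {
                this.recordWriter.close(this.context);
                // Ask the wrapped format for ITS committer; for MongoOutputFormat
                // this is the MongoOutputCommitter that flushes the bulk writes.
                OutputCommitter committer = this.mapreduceOutputFormat.getOutputCommitter(this.context);
                if (committer.needsTaskCommit(this.context)) {
                    committer.commitTask(this.context);
                }
            } catch (InterruptedException e) {
                throw new IOException("Could not commit task.", e);
            }
        }

        @Override
        public void finalizeGlobal(int parallelism) throws IOException {
            try {
                JobContext jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
                TaskAttemptContext taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, new TaskAttemptID());
                OutputCommitter committer = this.mapreduceOutputFormat.getOutputCommitter(taskContext);
                committer.commitJob(jobContext);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

If those fields are private in your Flink version, copying the wrapper
class and patching it gives the same result.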
saluti,
Stefano

2015-07-22 17:11 GMT+02:00 Stephan Ewen <sewen@apache.org>:

Thanks for reporting this, Stefano!

It seems the HadoopOutputFormat wrapper is pretty much specialized on file
output formats.

Can you open an issue for that? Someone will need to look into this...

On Wed, Jul 22, 2015 at 4:48 PM, Stefano Bortoli <s.bortoli@gmail.com> wrote:

In fact, close() of the HadoopOutputFormat never commits the task, because
this.fileOutputCommitter.needsTaskCommit(this.context) returns false:

    /**
     * commit the task by moving the output file out from the temporary directory.
     * @throws java.io.IOException
     */
    @Override
    public void close() throws IOException {
        this.recordWriter.close(new HadoopDummyReporter());

        if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
            this.fileOutputCommitter.commitTask(this.context);
        }
    }

Also, both close() and finalizeGlobal() use a FileOutputCommitter, and
never the MongoOutputCommitter:

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        try {
            JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
            FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();

            // finalize HDFS output format
            fileOutputCommitter.commitJob(jobContext);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Can anyone have a look into that?
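My understanding of why needsTaskCommit() returns false: FileOutputCommitter
reports pending work only when the task attempt wrote files under its
temporary attempt path (derived from mapred.output.dir), and the Mongo
record writer keeps its temporary file elsewhere. Paraphrasing the Hadoop
2.x behaviour (not the literal source):

    // Paraphrase of FileOutputCommitter.needsTaskCommit(): a task "needs
    // commit" only if its temporary attempt directory exists, i.e. only if
    // the task wrote files there. A Mongo writer leaves it empty.
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        Path attemptPath = getTaskAttemptPath(context);
        FileSystem fs = attemptPath.getFileSystem(context.getConfiguration());
        return fs.exists(attemptPath);
    }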
saluti,
Stefano

2015-07-22 15:53 GMT+02:00 Stefano Bortoli <bortoli@okkam.it>:

Debugging, it seems the commitTask method of the MongoOutputCommitter is
never called. Is it possible that the 'bulk' approach of mongo-hadoop 1.4
does not fit the task execution model of Flink?

Any idea? Thanks a lot in advance.

saluti,
Stefano

Stefano Bortoli, PhD
ENS Technical Director
OKKAM Srl - www.okkam.it
Email: bortoli@okkam.it
Phone: +39 0461 1823912
Headquarters: Trento (Italy), Via Trener 8
Registered office: Trento (Italy), via Segantini 23

Confidentiality notice. This e-mail transmission may contain legally
privileged and/or confidential information. Please do not read it if you
are not the intended recipient(s). Any use, distribution, reproduction or
disclosure by any other person is strictly prohibited. If you have received
this e-mail in error, please notify the sender and destroy the original
transmission and its attachments without reading or saving it in any manner.

2015-07-22 14:26 GMT+02:00 Stefano Bortoli <s.bortoli@gmail.com>:

Hi,

I am trying to analyze and update a MongoDB collection with Apache Flink
0.9.0, Mongo Hadoop 1.4.0, and Hadoop 2.6.0.

The process is fairly simple, and the MongoInputFormat works smoothly;
however, nothing is written back to the collection. The pipeline itself
works, because writeAsText produces the expected output. I am quite puzzled
because, debugging, I can see it writes to some temporary directory.

The mapred.output.dir setting seems to serve only to produce a file named
_SUCCESS, and if I do not set it, the job fails with:

java.lang.IllegalArgumentException: Can not create a Path from a null string
    at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
    at org.apache.hadoop.fs.Path.<init>(Path.java:135)
    at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

Has anyone experienced something similar? Any hints on where to look?
Thanks a lot in advance!

saluti,
Stefano

====================================================
Configuration conf = new Configuration();
conf.set("mapred.output.dir", "/tmp/");
conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);

Job job = Job.getInstance(conf);

// create a MongodbInputFormat, using a Hadoop input format wrapper
InputFormat<Object, BSONObject> mapreduceInputFormat = new MyMongoInputFormat<Object, BSONObject>();
HadoopInputFormat<Object, BSONObject> hdIf = new HadoopInputFormat<Object, BSONObject>(
        mapreduceInputFormat, Object.class, BSONObject.class, job);

DataSet<Tuple2<Text, BSONWritable>> fin = input
        .flatMap(new myFlatMapFunction()).setParallelism(16);

MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);

fin.output(new HadoopOutputFormat<Text, BSONWritable>(
        new MongoOutputFormat<Text, BSONWritable>(), job));
// fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
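myFlatMapFunction is not shown above; for completeness, a hypothetical
version producing the Tuple2<Text, BSONWritable> records MongoOutputFormat
expects could look like this (the mutation it applies is illustrative only):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    import org.apache.hadoop.io.Text;
    import org.bson.BSONObject;

    import com.mongodb.hadoop.io.BSONWritable;

    // Hypothetical stand-in for the myFlatMapFunction referenced above:
    // takes the (id, document) pairs read by the input format and re-emits
    // each document keyed by its _id (assumed present) with a flag set.
    public class myFlatMapFunction implements
            FlatMapFunction<Tuple2<Object, BSONObject>, Tuple2<Text, BSONWritable>> {

        @Override
        public void flatMap(Tuple2<Object, BSONObject> record,
                Collector<Tuple2<Text, BSONWritable>> out) {
            BSONObject doc = record.f1;
            doc.put("processed", true); // example mutation only
            out.collect(new Tuple2<Text, BSONWritable>(
                    new Text(doc.get("_id").toString()), new BSONWritable(doc)));
        }
    }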