Subject: Re: Flink Storm
From: "Matthias J. Sax"
To: user@flink.apache.org
Date: Wed, 9 Dec 2015 21:57:12 +0100
Message-ID: <566895A8.6010408@apache.org>
References: <5662EC96.1060507@apache.org>

Hi Naveen,

just for completeness: Max fixed this bug today and we also updated the
documentation. As you are using a SNAPSHOT version, you no longer need to
include "flink-java" once you update to the latest version containing the
fix.

Furthermore, *do not* include "storm-core" as a dependency -- this will
result in a Kryo problem due to a Flink/Storm Kryo version conflict. (The
dependency is not needed anyway, as you get it automatically via
"flink-storm-examples" or "flink-storm".)

This Kryo version conflict was the problem in the first place. It resulted
in a Kryo exception when running your program for longer than 10 seconds.
As you stopped after 10 seconds, you did not see the exception, just an
empty result file :/

-Matthias
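
A minimal sketch of what the dependencies section of the pom.xml could look
like after this fix, assuming the 1.0-SNAPSHOT version used elsewhere in this
thread; neither "flink-java" nor "storm-core" is declared, since storm-core
comes in transitively:

    <dependencies>
        <!-- Storm compatibility layer; storm-core is pulled in transitively,
             so it is deliberately not declared here (sketch only) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-storm</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- add flink-storm-examples the same way if BoltFileSink is used -->
    </dependencies>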
On 12/08/2015 05:22 PM, Maximilian Michels wrote:
> Hi Naveen,
>
> Turns out I had changed the pom.xml after I checked out your code while
> trying to get your example working. I have found the real issue behind
> your problem. Please make sure you have the following dependency in your
> pom.xml (in addition to the storm modules):
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>1.0-SNAPSHOT</version>
>     </dependency>
>
> The quickstart also contains this. It shouldn't be necessary, but it's a
> workaround for a bug which we just discovered with your help. Thank you
> for reporting!
>
> Best regards,
> Max
>
> On Tue, Dec 8, 2015 at 2:56 PM, Maximilian Michels wrote:
>
> Hi Naveen,
>
> In your code on GitHub, please remove the following from the WordCount
> file:
>
>     OutputStream o;
>     try {
>         o = new FileOutputStream("/tmp/wordcount1.txt", true);
>         o.write((word + " " + count.toString() + "\n").getBytes());
>         o.close();
>     } catch (IOException e) {
>         e.printStackTrace();
>     }
>
> It is not necessary because you already have a bolt which prints to a
> file. What this code did is overwrite the wordcount1.txt file on every
> incoming tuple.
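
If a hand-written file bolt is wanted anyway, the usual pattern is to open the
stream once per bolt instance (in prepare()) and close it in cleanup(), rather
than reopening the file for every tuple. The following is only a rough sketch
against the plain Storm bolt API used in this thread; the class name, field
name, and the reuse of the /tmp/wordcount1.txt path are illustrative
assumptions:

    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    // Sketch only: writes each incoming tuple as "word count" to a file,
    // keeping a single writer open for the lifetime of the bolt instance.
    public class FileWritingBolt extends BaseRichBolt {
        // transient: created on the worker in prepare(), never serialized
        private transient BufferedWriter writer;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            try {
                writer = new BufferedWriter(new FileWriter("/tmp/wordcount1.txt", true));
            } catch (IOException e) {
                throw new RuntimeException("Could not open output file", e);
            }
        }

        @Override
        public void execute(Tuple tuple) {
            try {
                // assumes the upstream "count" bolt emits (word, count)
                writer.write(tuple.getValue(0) + " " + tuple.getValue(1) + "\n");
            } catch (IOException e) {
                throw new RuntimeException("Could not write tuple", e);
            }
        }

        @Override
        public void cleanup() {
            try {
                if (writer != null) {
                    writer.close(); // flushes buffered output when the topology stops
                }
            } catch (IOException e) {
                // ignore failures during shutdown
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // pure sink: no output fields to declare
        }
    }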
>
> You were not seeing console output because you didn't set up a
> log4j.properties file. Put the following into a file called
> log4j.properties under src/main/resources:
>
>     log4j.rootLogger=INFO, console
>
>     log4j.appender.console=org.apache.log4j.ConsoleAppender
>     log4j.appender.console.layout=org.apache.log4j.PatternLayout
>     log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>
> Then you will also see console output. We will fix the submission code of
> Storm such that this won't be necessary in the future. By the way, the
> recommended template for Flink jobs on Storm is to start off with the
> Flink Quickstart project:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/java_api_quickstart.html
> This already contains the log4j.properties file.
>
> Best,
> Max
>
> On Mon, Dec 7, 2015 at 11:05 PM, Madhire, Naveen wrote:
>
> Hi Matthias, sorry for the confusion. I just used a simple piece of code
> in the count bolt to write the bolt output into a file and was not using
> BoltFileSink.
>
>     OutputStream o;
>     try {
>         o = new FileOutputStream("/tmp/wordcount.txt", true);
>         o.write((word + " " + count.toString() + "\n").getBytes());
>         o.close();
>     } catch (IOException e) {
>         e.printStackTrace();
>     }
>
> Coming to BoltFileSink, I tried using cluster.shutdown at the end, which
> stops the local cluster, but I am getting the exception below:
>
>     java.lang.Exception: TaskManager is shutting down.
>         at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:216)
>         at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>         at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:119)
>         at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>         at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>         at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>         at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I added the lines of code below for stopping the local cluster at the
> end; the code is the same as the flink-storm-examples one.
>
>     Utils.sleep(10 * 1000);
>     cluster.shutdown();
>
> Thanks,
> Naveen
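
For context, the overall local-run pattern those two lines belong to is
sketched below: submit the topology to a local cluster, let it run for a
while, then stop the job and shut the cluster down. The sketch uses the plain
Storm API (LocalCluster, TopologyBuilder); when running through flink-storm,
the compatibility layer's own local-cluster and topology classes take their
place, as in the flink-storm-examples code mentioned above, so treat the
class names here as assumptions:

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.utils.Utils;

    public class LocalRunSketch {
        // Runs an already assembled topology on a local cluster for ten
        // seconds and then tears everything down.
        public static void runLocally(TopologyBuilder builder) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordcount", new Config(), builder.createTopology());

            Utils.sleep(10 * 1000);            // let the topology process data

            cluster.killTopology("wordcount"); // stop the running job first
            cluster.shutdown();                // then shut the local cluster down
        }
    }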
>
> On 12/5/15, 7:54 AM, "Matthias J. Sax" wrote:
>
> >Hi Naveen,
> >
> >in your previous mail you mention that
> >
> >> Yeah, I did route the "count" bolt output to a file and I see the
> >> output. I can see the Storm and Flink output matching.
> >
> >How did you do this? Modifying the "count" bolt code? Or did you use
> >some other bolt that consumes the "count" bolt output?
> >
> >One more thought: how much data do you have, and did you terminate your
> >program before looking into the result file? I am asking because
> >BoltFileSink uses a BufferedOutputWriter internally -- if you have only
> >a few records in your result and do not terminate, the data might still
> >be buffered. It would get flushed to disk when you terminate the
> >program.
> >
> >Otherwise, I could not spot any issue with your code. And as Max
> >mentioned that the console output worked for him with your program, I
> >am a little puzzled about what might go wrong in your setup. The
> >program seems to be correct.
> >
> >-Matthias
> >
> >On 12/04/2015 08:55 PM, Madhire, Naveen wrote:
> >> Hi Max,
> >>
> >> I forgot to include the flink-storm-examples dependency in the
> >> application in order to use BoltFileSink.
> >>
> >> However, the file created by BoltFileSink is empty. Is there anything
> >> else I need to do to write the output to a file using BoltFileSink?
> >>
> >> I am using the same code you mentioned:
> >>
> >>     builder.setBolt("file", new BoltFileSink("/tmp/storm", new OutputFormatter() {
> >>         @Override
> >>         public String format(Tuple tuple) {
> >>             return tuple.toString();
> >>         }
> >>     }), 1).shuffleGrouping("count");
> >>
> >> Thanks,
> >> Naveen
> >>
> >>> On 12/4/15, 5:36 AM, "Maximilian Michels" wrote:
> >>>
> >>>> Hi Naveen,
> >>>>
> >>>> Were you using Maven before? The syncing of changes in the master
> >>>> always takes a while for Maven. The documentation happened to be
> >>>> updated before Maven synchronized. Building and installing manually
> >>>> (what you did) solves the problem.
> >>>>
> >>>> Strangely, when I run your code on my machine with the latest
> >>>> 1.0-SNAPSHOT, I see a lot of output on my console.
> >>>>
> >>>> Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89
> >>>>
> >>>> Could you add a bolt which writes the Storm tuples to a file? Is
> >>>> that file also empty?
> >>>>
> >>>>     builder.setBolt("file", new BoltFileSink("/tmp/storm", new OutputFormatter() {
> >>>>         @Override
> >>>>         public String format(Tuple tuple) {
> >>>>             return tuple.toString();
> >>>>         }
> >>>>     }), 1).shuffleGrouping("count");
> >>>>
> >>>> Thanks,
> >>>> Max
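
A small variant of the snippet above: the OutputFormatter can produce the same
"word count" lines as the hand-written file code earlier in this thread
instead of the raw tuple string. The field indices are an assumption about
what the "count" bolt emits:

    builder.setBolt("file", new BoltFileSink("/tmp/storm", new OutputFormatter() {
        @Override
        public String format(Tuple tuple) {
            // assumes the "count" bolt emits (word, count) in that order
            return tuple.getValue(0) + " " + tuple.getValue(1);
        }
    }), 1).shuffleGrouping("count");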