flink-user mailing list archives

From "Matthias J. Sax" <mj...@apache.org>
Subject Re: Flink Storm
Date Wed, 09 Dec 2015 20:57:12 GMT
Hi Naveen,

just for completeness: Max fixed this bug today and we also updated the
documentation.

As you are using the SNAPSHOT version, you no longer need to include
"flink-java" once you update to the latest version containing the fix.

Furthermore, *do not* include "storm-core" as a dependency -- this will
result in a Kryo problem due to a Flink/Storm Kryo version conflict.

(The dependency is not needed anyway, as you get it automatically via
"flink-storm-examples" or "flink-storm".)
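For reference, a dependency section along these lines should be all that is needed (artifact name and version as used in the 1.0-SNAPSHOT era; double-check against the docs for your Flink version):

```xml
<!-- flink-storm pulls in the Storm compatibility layer and its
     dependencies transitively; do NOT add storm-core yourself,
     or you reintroduce the Kryo version conflict. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-storm</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
```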

This Kryo version conflict was the problem in the first place. It
resulted in a Kryo exception when running your program for longer than
10 seconds. As you stopped after 10 seconds, you did not see the
exception, just an empty result file :/

-Matthias


On 12/08/2015 05:22 PM, Maximilian Michels wrote:
> Hi Naveen,
> 
> Turns out I had changed the pom.xml after I checked out your code while
> trying to get your example working. I have found the real issue of your
> problem. Please make sure you have the following dependency in your
> pom.xml (in addition to the storm modules).
> 
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>1.0-SNAPSHOT</version>
> </dependency>
> 
> The quickstart also contains this. It shouldn't be necessary but it's a
> workaround for a bug which we just discovered with your help. Thank you
> for reporting!
> 
> Best regards,
> Max
> 
> On Tue, Dec 8, 2015 at 2:56 PM, Maximilian Michels <mxm@apache.org
> <mailto:mxm@apache.org>> wrote:
> 
>     Hi Naveen,
> 
>     In your code on GitHub, please remove the following from the
>     WordCount file:
> 
> 
>     OutputStream o;
>     try {
>         o = new FileOutputStream("/tmp/wordcount1.txt", true);
>         o.write((word + " " + count.toString() + "\n").getBytes());
>         o.close();
>     } catch (IOException e) {
>         e.printStackTrace();
>     }
> 
> 
>     It is not necessary because you already have a bolt which prints to
>     a file. What this code did was rewrite the wordcount1.txt file on
>     every incoming tuple.
> 
>     You were not seeing console output because you didn't set up a
>     log4j.properties file. Put the following into a file called
>     log4j.properties under src/main/resources:
> 
>     log4j.rootLogger=INFO, console
> 
>     log4j.appender.console=org.apache.log4j.ConsoleAppender
>     log4j.appender.console.layout=org.apache.log4j.PatternLayout
>     log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
> 
>     Then you will also see console output. We will fix the Storm
>     submission code so that this won't be necessary in the future. By
>     the way, the recommended template for Flink jobs using Storm is to
>     start off with the Flink Quickstart project:
>     https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/java_api_quickstart.html
>     It already contains the log4j.properties file.
> 
>     Best,
>     Max
> 
> 
>     On Mon, Dec 7, 2015 at 11:05 PM, Madhire, Naveen
>     <Naveen.Madhire@capitalone.com
>     <mailto:Naveen.Madhire@capitalone.com>> wrote:
> 
>         Hi Matthias, sorry for the confusion. I just used a simple piece
>         of code in the Count Bolt to write the bolt output into a file
>         and was not using BoltFileSink.
> 
>         OutputStream o;
>         try {
>             o = new FileOutputStream("/tmp/wordcount.txt", true);
>             o.write((word + " " + count.toString() + "\n").getBytes());
>             o.close();
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
> 
> 
> 
> 
>         Coming to BoltFileSink, I tried using cluster.shutdown at the
>         end, which stops the local cluster, but I am getting the below
>         exception:
> 
>         java.lang.Exception: TaskManager is shutting down.
>                 at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:216)
>                 at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>                 at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:119)
>                 at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>                 at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>                 at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>                 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>                 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>                 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>                 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>                 at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>                 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>                 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>                 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>                 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>                 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 
> 
>         I added the below lines of code to stop the local cluster at the
>         end; the code is the same as in the flink-storm-examples.
> 
>         Utils.sleep(10 * 1000);
> 
>         cluster.shutdown();
> 
> 
> 
> 
>         Thanks,
>         Naveen
> 
> 
> 
> 
>         On 12/5/15, 7:54 AM, "Matthias J. Sax" <mjsax@apache.org
>         <mailto:mjsax@apache.org>> wrote:
> 
>         >Hi Naveen,
>         >
>         >in your previous mail you mention that
>         >
>         >> Yeah, I did route the "count" bolt output to a file and I see the
>         >>output.
>         >> I can see the Storm and Flink output matching.
>         >
>         >How did you do this? Modifying the "count bolt" code? Or did
>         >you use some other bolt that consumes the "count bolt" output?
>         >
>         >One more thought: how much data do you have, and did you
>         >terminate your program before looking into the result file? I am
>         >asking because BoltFileSink uses a BufferedOutputWriter
>         >internally -- if you have only a few records in your result and
>         >do not terminate, the data might still be buffered. It would get
>         >flushed to disk when you terminate the program.
>         >
>         >Otherwise, I could not spot any issue with your code. And as Max
>         >mentioned that the console output worked for him using your
>         >program, I am a little puzzled about what might go wrong in your
>         >setup. The program seems to be correct.
>         >
>         >
>         >-Matthias
>         >
>         >
>         >On 12/04/2015 08:55 PM, Madhire, Naveen wrote:
>         >> Hi Max,
>         >>
>         >> I forgot to include the flink-storm-examples dependency in
>         >>the application to use BoltFileSink.
>         >>
>         >> However, the file created by the BoltFileSink is empty. Is
>         >>there anything else I need to do to write into a file using
>         >>BoltFileSink?
>         >>
>         >> I am using the same code what you mentioned,
>         >>
>         >> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
>         >> OutputFormatter() {
>         >>    @Override
>         >>    public String format(Tuple tuple) {
>         >>       return tuple.toString();
>         >>    }
>         >> }), 1).shuffleGrouping("count");
>         >>
>         >>
>         >>
>         >>
>         >> Thanks,
>         >> Naveen
>         >>
>         >>
>         >>
>         >>
>         >>>
>         >>> On 12/4/15, 5:36 AM, "Maximilian Michels" <mxm@apache.org
>         <mailto:mxm@apache.org>> wrote:
>         >>>
>         >>>> Hi Naveen,
>         >>>>
>         >>>> Were you using Maven before? The syncing of changes in the
>         >>>> master always takes a while for Maven. The documentation
>         >>>> happened to be updated before Maven synchronized. Building
>         >>>> and installing manually (what you did) solves the problem.
>         >>>>
>         >>>> Strangely, when I run your code on my machine with the latest
>         >>>> 1.0-SNAPSHOT I see a lot of output on my console.
>         >>>>
>         >>>> Here's the output:
>         https://gist.github.com/mxm/98cd927866b193ce0f89
>         >>>>
>         >>>> Could you add a bolt which writes the Storm tuples to a
>         >>>> file? Is that file also empty?
>         >>>>
>         >>>> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
>         >>>> OutputFormatter() {
>         >>>>   @Override
>         >>>>   public String format(Tuple tuple) {
>         >>>>      return tuple.toString();
>         >>>>   }
>         >>>> }), 1).shuffleGrouping("count");
>         >>>>
>         >>>>
>         >>>> Thanks,
>         >>>> Max
>         >>>
>         >>> ________________________________________________________
>         >>>
>         >>> The information contained in this e-mail is confidential and/or
>         >>> proprietary to Capital One and/or its affiliates and may
>         only be used
>         >>> solely in performance of work or services for Capital One. The
>         >>> information transmitted herewith is intended only for use by the
>         >>> individual or entity to which it is addressed. If the reader
>         of this
>         >>> message is not the intended recipient, you are hereby
>         notified that any
>         >>> review, retransmission, dissemination, distribution, copying
>         or other
>         >>>use
>         >>> of, or taking of any action in reliance upon this information is
>         >>>strictly
>         >>> prohibited. If you have received this communication in
>         error, please
>         >>> contact the sender and delete the material from your computer.
>         >>>
>         >>
>         >
> 
> 
> 
> 

