flink-user mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: Flink Storm
Date Tue, 08 Dec 2015 13:56:14 GMT
Hi Naveen,

In your code on GitHub, please remove the following from the WordCount file:


OutputStream o;
try {
    o = new FileOutputStream("/tmp/wordcount1.txt", true);
    o.write((word + " " + count.toString() + "\n").getBytes());
    o.close();
} catch (IOException e) {
    e.printStackTrace();
}


It is not necessary because you already have a bolt which prints to a
file. What this code did was open, append to, and close the
wordcount1.txt file for every incoming tuple.
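If you do want a quick file dump, opening the stream once, rather than per tuple, avoids that problem. A minimal plain-Java sketch (the temp file and the word counts are made up for illustration; in a real bolt you would open the stream in prepare() and close it in cleanup()):

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class AppendOnce {
    public static void main(String[] args) throws IOException {
        Path out = Files.createTempFile("wordcount", ".txt");
        // Open the stream once and reuse it for every record,
        // instead of opening and closing it per incoming tuple.
        try (OutputStream o = new FileOutputStream(out.toFile(), true)) {
            String[][] counts = {{"flink", "3"}, {"storm", "2"}};
            for (String[] wc : counts) {
                o.write((wc[0] + " " + wc[1] + "\n").getBytes());
            }
        } // try-with-resources closes (and flushes) the stream here
        System.out.print(new String(Files.readAllBytes(out)));
    }
}
```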

You were not seeing console output because you didn't set up a
log4j.properties file. Put the following into a file called
log4j.properties under src/main/resources:

log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

Then you will also see console output. We will fix the Storm submission
code such that this won't be necessary in the future. By the way, the
recommended starting point for Flink jobs using the Storm compatibility
layer is the Flink Quickstart project:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/java_api_quickstart.html
It already contains the log4j.properties file.

Best,
Max


On Mon, Dec 7, 2015 at 11:05 PM, Madhire, Naveen <
Naveen.Madhire@capitalone.com> wrote:

> Hi Matthias, sorry for the confusion. I just used a simple piece of code
> in the count bolt to write the bolt output into a file and was not using
> BoltFileSink.
>
> OutputStream o;
> try {
>     o = new FileOutputStream("/tmp/wordcount.txt", true);
>     o.write((word + " " + count.toString() + "\n").getBytes());
>     o.close();
> } catch (IOException e) {
>     e.printStackTrace();
> }
>
>
>
>
> Coming to BoltFileSink, I tried calling cluster.shutdown() at the end,
> which stops the local cluster, but I am getting the exception below:
>
> java.lang.Exception: TaskManager is shutting down.
>         at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:216)
>         at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>         at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:119)
>         at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>         at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>         at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>         at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> I added the lines of code below to stop the local cluster at the end;
> the code is the same as in the flink-storm-examples.
>
> Utils.sleep(10 * 1000);
>
> cluster.shutdown();
>
>
>
>
> Thanks,
> Naveen
>
>
>
>
> On 12/5/15, 7:54 AM, "Matthias J. Sax" <mjsax@apache.org> wrote:
>
> >Hi Naveen,
> >
> >in your previous mail you mention that
> >
> >> Yeah, I did route the "count" bolt output to a file and I see the
> >> output. I can see the Storm and Flink output matching.
> >
> >How did you do this? Modifying the "count bolt" code? Or did you use
> >some other bolt that consumes the "count bolt" output?
> >
> >One more thought: how much data do you have, and did you terminate your
> >program before looking into the result file? I am asking because
> >BoltFileSink uses a BufferedOutputWriter internally -- if you have only
> >a few records in your result and do not terminate, the data might still
> >be buffered. It would get flushed to disk when you terminate the program.
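The flush-on-close behaviour described above is easy to reproduce with plain java.io, independent of Flink or Storm (the temp file and the record are made up for illustration):

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class BufferDemo {
    public static void main(String[] args) throws IOException {
        Path out = Files.createTempFile("result", ".txt");
        BufferedWriter w = new BufferedWriter(new FileWriter(out.toFile()));
        w.write("flink 3\n");
        // The record is still sitting in the writer's in-memory buffer,
        // so nothing has reached the file yet.
        System.out.println("before close: " + Files.size(out) + " bytes");
        w.close(); // closing flushes the buffer to disk
        System.out.println("after close: " + Files.size(out) + " bytes");
    }
}
```

With only a handful of small records the same thing happens inside BoltFileSink: until the topology terminates and the sink is closed, the result file can look empty.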
> >
> >Otherwise, I could not spot any issue with your code. And since Max
> >mentioned that the console output worked for him using your program, I
> >am a little puzzled about what might be going wrong in your setup. The
> >program seems to be correct.
> >
> >
> >-Matthias
> >
> >
> >On 12/04/2015 08:55 PM, Madhire, Naveen wrote:
> >> Hi Max,
> >>
> >> I forgot to include flink-storm-examples dependency in the application
> >>to
> >> use BoltFileSink.
> >>
> >> However, the file created by the BoltFileSink is empty. Is there any
> >>other
> >> stuff which I need to do to write it into a file by using BoltFileSink?
> >>
> >> I am using the same code what you mentioned,
> >>
> >> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
> >> OutputFormatter() {
> >>    @Override
> >>    public String format(Tuple tuple) {
> >>       return tuple.toString();
> >>    }
> >> }), 1).shuffleGrouping("count");
> >>
> >>
> >>
> >>
> >> Thanks,
> >> Naveen
> >>
> >>
> >>
> >>
> >>>
> >>> On 12/4/15, 5:36 AM, "Maximilian Michels" <mxm@apache.org> wrote:
> >>>
> >>>> Hi Naveen,
> >>>>
> >>>> Were you using Maven before? Syncing changes from the master to the
> >>>> Maven snapshot repository always takes a while. The documentation
> >>>> happened to be updated before Maven synchronized. Building and
> >>>> installing manually (what you did) solves the problem.
> >>>>
> >>>> Strangely, when I run your code on my machine with the latest
> >>>> 1.0-SNAPSHOT I see a lot of output on my console.
> >>>>
> >>>> Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89
> >>>>
> >>>> Could you add a bolt which writes the Storm tuples to a file? Is
> >>>> that file also empty?
> >>>>
> >>>> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
> >>>> OutputFormatter() {
> >>>>   @Override
> >>>>   public String format(Tuple tuple) {
> >>>>      return tuple.toString();
> >>>>   }
> >>>> }), 1).shuffleGrouping("count");
> >>>>
> >>>>
> >>>> Thanks,
> >>>> Max
> >>>
> >>> ________________________________________________________
> >>>
> >>> The information contained in this e-mail is confidential and/or
> >>> proprietary to Capital One and/or its affiliates and may only be used
> >>> solely in performance of work or services for Capital One. The
> >>> information transmitted herewith is intended only for use by the
> >>> individual or entity to which it is addressed. If the reader of this
> >>> message is not the intended recipient, you are hereby notified that any
> >>> review, retransmission, dissemination, distribution, copying or other
> >>>use
> >>> of, or taking of any action in reliance upon this information is
> >>>strictly
> >>> prohibited. If you have received this communication in error, please
> >>> contact the sender and delete the material from your computer.
> >>>
> >>
> >>
> >
>
>
