flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Madhire, Naveen" <Naveen.Madh...@capitalone.com>
Subject Re: Flink Storm
Date Mon, 07 Dec 2015 22:05:39 GMT
Hi Matthias, Sorry for the confusion. I just used a simple code in the
Count Bolt to write the bolt output into a file and was not using
BiltFileSink.

OutputStream o;
try {
    o = new FileOutputStream("/tmp/wordcount.txt", true);
    o.write((word + " " + count.toString() + "\n").getBytes());
    o.close();
} catch (IOException e) {
    e.printStackTrace();
}




Coming to BoltFileSink, I tried using cluster.shutdown at the end which
stops the local cluster but getting the below exception,

java.lang.Exception: TaskManager is shutting down.
	at 
org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala
:216)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
	at 
org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager
.scala:119)
	at 
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$fi
nishTerminate(FaultHandling.scala:210)
	at 
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:
1339)
	at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.jav
a:107)



I added the below lines of code for stoping the local cluster at the end,
the code is same as flink-storm-examples one.

Utils.sleep(10 * 1000);

cluster.shutdown();




Thanks,
Naveen




On 12/5/15, 7:54 AM, "Matthias J. Sax" <mjsax@apache.org> wrote:

>Hi Naveen,
>
>in you previous mail you mention that
>
>> Yeah, I did route the ³count² bolt output to a file and I see the
>>output.
>> I can see the Storm and Flink output matching.
>
>How did you do this? Modifying the "count bolt" code? Or did you use
>some other bolt that consumes the "count bolt" output?
>
>One more thought: how much data do you have and did you terminate you
>program before looking into the result file? I am asking because
>BoltFileSink uses a BufferedOutputWriter internally -- if you have only
>a few records in your result and do not terminate, the data might still
>be buffered. I would get flushed to disc if you terminate the program.
>
>Otherwise, I could not spot any issue with your code. And as Max
>mentioned that the console output worked for him using you program I am
>little puzzled what might go wrong in your setup. The program seems to
>be correct.
>
>
>-Matthias
>
>
>On 12/04/2015 08:55 PM, Madhire, Naveen wrote:
>> Hi Max,
>> 
>> I forgot to include flink-storm-examples dependency in the application
>>to
>> use BoltFileSink.
>> 
>> However, the file created by the BoltFileSink is empty. Is there any
>>other
>> stuff which I need to do to write it into a file by using BoltFileSink?
>> 
>> I am using the same code what you mentioned,
>> 
>> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
>> OutputFormatter() {
>>    @Override
>>    public String format(Tuple tuple) {
>>       return tuple.toString();
>>    }
>> }), 1).shuffleGrouping("count");
>> 
>> 
>> 
>> 
>> Thanks,
>> Naveen
>> 
>> 
>> 
>> 
>>>
>>> On 12/4/15, 5:36 AM, "Maximilian Michels" <mxm@apache.org> wrote:
>>>
>>>> Hi Naveen,
>>>>
>>>> Were you using Maven before? The syncing of changes in the master
>>>> always takes a while for Maven. The documentation happened to be
>>>> updated before Maven synchronized. Building and installing manually
>>>> (what you did) solves the problem.
>>>>
>>>> Strangely, when I run your code on my machine with the latest
>>>> 1.0-SNAPSHOT I see a lot of output on my console.
>>>>
>>>> Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89
>>>>
>>>> Could you add bolt which writes the Storm tuples to a file? Is that
>>>> file also empty?
>>>>
>>>> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
>>>> OutputFormatter() {
>>>>   @Override
>>>>   public String format(Tuple tuple) {
>>>>      return tuple.toString();
>>>>   }
>>>> }), 1).shuffleGrouping("count");
>>>>
>>>>
>>>> Thanks,
>>>> Max
>>>
>>> ________________________________________________________
>>>
>>> The information contained in this e-mail is confidential and/or
>>> proprietary to Capital One and/or its affiliates and may only be used
>>> solely in performance of work or services for Capital One. The
>>> information transmitted herewith is intended only for use by the
>>> individual or entity to which it is addressed. If the reader of this
>>> message is not the intended recipient, you are hereby notified that any
>>> review, retransmission, dissemination, distribution, copying or other
>>>use
>>> of, or taking of any action in reliance upon this information is
>>>strictly
>>> prohibited. If you have received this communication in error, please
>>> contact the sender and delete the material from your computer.
>>>
>> 
>> ________________________________________________________
>> 
>> The information contained in this e-mail is confidential and/or
>>proprietary to Capital One and/or its affiliates and may only be used
>>solely in performance of work or services for Capital One. The
>>information transmitted herewith is intended only for use by the
>>individual or entity to which it is addressed. If the reader of this
>>message is not the intended recipient, you are hereby notified that any
>>review, retransmission, dissemination, distribution, copying or other
>>use of, or taking of any action in reliance upon this information is
>>strictly prohibited. If you have received this communication in error,
>>please contact the sender and delete the material from your computer.
>> 
>

________________________________________________________

The information contained in this e-mail is confidential and/or proprietary to Capital One
and/or its affiliates and may only be used solely in performance of work or services for Capital
One. The information transmitted herewith is intended only for use by the individual or entity
to which it is addressed. If the reader of this message is not the intended recipient, you
are hereby notified that any review, retransmission, dissemination, distribution, copying
or other use of, or taking of any action in reliance upon this information is strictly prohibited.
If you have received this communication in error, please contact the sender and delete the
material from your computer.
Mime
View raw message