Subject: Re: How to efficiently store the intermediate result of a bolt, and so it can be replayed after the crashes?
From: Abhishek Bhattacharjee
To: user@storm.incubator.apache.org
Date: Wed, 12 Feb 2014 14:06:44 +0530

Hi Cheng,
If you look at the repo that Aniket linked and read its README, you'll find what you are asking for in the mail above.
I'll repost the link here: https://github.com/pict2014/storm-redis . This does what you are asking for.
It uses kafka for replaying and redis for caching the intermediate state in batches. If you have a good understanding of storm, you can read the
code and see how it works. It uses transactional topologies.
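To make the batch idea concrete, here is a minimal sketch of how a replayed batch can be applied to cached state without double counting. It is not taken from the storm-redis code: the class name TxStateSketch and the method applyBatch are made up for illustration, a HashMap stands in for redis, and it assumes batches are committed in strictly increasing transaction-id order.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the storm-redis implementation. A HashMap stands in for redis.
class TxStateSketch {
    // Each key stores the running value together with the id of the last batch applied to it.
    static final class Entry {
        final long lastTxId;
        final long value;
        Entry(long lastTxId, long value) {
            this.lastTxId = lastTxId;
            this.value = value;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Apply one batch's partial sum. A replayed batch (same or older txId) is ignored.
    // Assumption: batches are committed in strictly increasing txId order.
    boolean applyBatch(long txId, String key, long delta) {
        Entry current = store.get(key);
        if (current != null && txId <= current.lastTxId) {
            return false; // already applied, so the replay is a no-op
        }
        long base = (current == null) ? 0L : current.value;
        store.put(key, new Entry(txId, base + delta));
        return true;
    }

    long get(String key) {
        Entry e = store.get(key);
        return (e == null) ? 0L : e.value;
    }

    public static void main(String[] args) {
        TxStateSketch state = new TxStateSketch();
        state.applyBatch(1, "clicks", 10);
        state.applyBatch(2, "clicks", 5);
        state.applyBatch(2, "clicks", 5); // batch 2 replayed after a failure
        System.out.println(state.get("clicks")); // prints 15
    }
}
```

Storing the batch id next to the value is what lets the replay source resend a whole batch safely: the second copy of batch 2 above changes nothing.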

Thanks,


On Wed, Feb 12, 2014 at 4:17 AM, Cheng-Kang Hsieh (Andy) <changun@cs.ucla.edu> wrote:
Hi Adrian

Thank you so much for the input! If I understand how the Spout works correctly, wouldn't the tuple be regarded as failed if it has not been fully acked before the timeout (which, by default, is 30 secs)? From my understanding (which could be totally wrong), the Storm-ish way to respond to a failed tuple is to call the fail method in the root Spout, which, in turn, re-emits the failed tuple to the topology.
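The timeout-then-replay behaviour described above can be sketched in plain Java as follows. This is only a model under stated assumptions: no Storm classes are used, the class PendingTuplesSketch and its methods are hypothetical, and the sketch folds the framework's timeout check and the spout's own re-emit into one object, whereas in Storm the framework notifies the spout of the failure.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a reliable spout's bookkeeping; no Storm classes are used.
class PendingTuplesSketch {
    private static final class Pending {
        final String payload;
        final long emittedAtMillis;
        Pending(String payload, long emittedAtMillis) {
            this.payload = payload;
            this.emittedAtMillis = emittedAtMillis;
        }
    }

    private final long timeoutMillis;
    private final Map<Long, Pending> pending = new LinkedHashMap<>();

    PendingTuplesSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Remember every emitted tuple by message id until it is fully acked.
    void emit(long messageId, String payload, long nowMillis) {
        pending.put(messageId, new Pending(payload, nowMillis));
    }

    // A fully acked tuple no longer needs to be kept for replay.
    void ack(long messageId) {
        pending.remove(messageId);
    }

    // Return the payloads that were not acked in time and re-stamp them as re-emitted.
    List<String> replayTimedOut(long nowMillis) {
        List<String> replayed = new ArrayList<>();
        for (Map.Entry<Long, Pending> e : pending.entrySet()) {
            Pending p = e.getValue();
            if (nowMillis - p.emittedAtMillis >= timeoutMillis) {
                replayed.add(p.payload);
                e.setValue(new Pending(p.payload, nowMillis));
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        PendingTuplesSketch spout = new PendingTuplesSketch(30_000L); // 30 secs
        spout.emit(1L, "a", 0L);
        spout.emit(2L, "b", 0L);
        spout.ack(1L);
        System.out.println(spout.replayTimedOut(30_000L)); // prints [b]
    }
}
```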

It would be nice if there were a fail method in the intermediate bolt that is called when the downstream bolts fail; then this bolt could just re-emit the intermediate results to the downstream bolt without restarting the process all the way up from the root spout.

A use case for that: say I have 3 components chained together as follows: Spout -> Bolt1 -> Bolt2. What Bolt1 does is aggregate the data within every fixed-size time window in a day and compute some measurements based on that (e.g. the user's activities in each hour of a day). With the current design of Storm, when Bolt2 fails, the Spout has to resend all the data in the corresponding time window for Bolt1 to recompute the results. It would be nice if Bolt1 could cache the results and resend them when Bolt2 fails.
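A minimal sketch of the Bolt1 behaviour wished for above: aggregate per hour, cache each finished window, and resend from the cache instead of recomputing. Everything here is hypothetical (the class HourlyWindowSketch and the methods record, close, resend and confirm are invented names), the cache is an in-memory map rather than durable storage, and Storm itself does not give a bolt such a failure callback.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a windowed aggregator that caches finished results for resend.
class HourlyWindowSketch {
    private final Map<Integer, Long> open = new HashMap<>();        // hour -> running total
    private final Map<Integer, Long> unconfirmed = new HashMap<>(); // hour -> result awaiting confirmation

    // Add one observation to the window of the given hour.
    void record(int hourOfDay, long activityCount) {
        open.merge(hourOfDay, activityCount, Long::sum);
    }

    // Close a window: compute the result once, cache it, and return it for emission.
    long close(int hourOfDay) {
        Long cached = unconfirmed.get(hourOfDay);
        if (cached != null) {
            return cached; // already closed, reuse the cached result
        }
        Long total = open.remove(hourOfDay);
        long result = (total == null) ? 0L : total;
        unconfirmed.put(hourOfDay, result);
        return result;
    }

    // Downstream failed: hand back the cached result (null if nothing is cached).
    Long resend(int hourOfDay) {
        return unconfirmed.get(hourOfDay);
    }

    // Downstream confirmed the result, so the cache entry can be dropped.
    void confirm(int hourOfDay) {
        unconfirmed.remove(hourOfDay);
    }

    public static void main(String[] args) {
        HourlyWindowSketch bolt1 = new HourlyWindowSketch();
        bolt1.record(9, 3);
        bolt1.record(9, 4);
        System.out.println(bolt1.close(9));  // prints 7
        System.out.println(bolt1.resend(9)); // prints 7
    }
}
```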

Does it make any sense?
Any input is appreciated!

Best,
Andy


On Tue, Feb 11, 2014 at 5:03 PM, Adrian Mocanu <amocanu@verticalscope.com> wrote:

You can have acks from bolt to bolt.

 

Spout:

 // ties the tuple to this UID (the message id is the second argument to emit)

_collector.emit(new Values(queue.dequeue()), uniqueID);

 

Then Bolt1 will ack the tuple only after it emits it to Bolt2 so that the ack can be tied to the tuple

Bolt1:

 // emit first, then ack

_collector.emit(tuple, new Values("stuff")); // anchoring: read below to see what it means

_collector.ack(tuple);

 

At this point the tuple from the Spout has been acked in Bolt1, but at the same time the newly emitted tuple "stuff" sent to Bolt2 is "anchored" to the tuple from the Spout. This means it still needs to be acked later on; otherwise, on timeout, it will be resent by the spout.

Bolt2:

_collector.ack(tuple);

Bolt2 needs to ack the tuple received from Bolt1, which will send in the last ack that the Spout was waiting for. If at this point Bolt2 emits a tuple, then there must be a Bolt3 which will get it and ack it. If the tuple is not acked at the last point, the Spout will time it out and resend it.

Each time anchoring is done on an emit statement from bolt to bolt, a new node in a "tree" structure is built... well, more like a list in my case, since I never send the same tuple to 2 or more bolts; I have a 1 to 1 relationship.

All nodes in the tree need to be acked, and only then the tuple is marked as fully arrived. If the tuple is not acked and it is sent with a UID and anchored later on then it will be kept in memory forever (until acked).
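The "all nodes in the tree need to be acked" rule can be tracked with very little memory. Below is a simplified sketch of the XOR bookkeeping that Storm's acker tasks are documented to use: every tuple id is XORed into a per-root value once when the tuple is created and once when it is acked, so the value returns to zero exactly when the tree is complete. The class AckerSketch and its method names are made up, and the small hand-picked ids are an assumption for readability (Storm uses random 64-bit ids).

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of XOR-based ack tracking; not Storm's actual acker code.
class AckerSketch {
    // root id -> XOR of the ids of all tuples in the tree that are still outstanding
    private final Map<Long, Long> ackVals = new HashMap<>();

    // The spout registers the first tuple of a new tree.
    void init(long rootId, long tupleId) {
        ackVals.put(rootId, tupleId);
    }

    // A bolt acks ackedId and reports the ids of the tuples it anchored to it.
    // Returns true when every tuple in the tree has been acked.
    boolean ack(long rootId, long ackedId, long... anchoredChildIds) {
        long val = ackVals.getOrDefault(rootId, 0L) ^ ackedId;
        for (long child : anchoredChildIds) {
            val ^= child;
        }
        if (val == 0L) {
            ackVals.remove(rootId); // tree complete, the spout can be told
            return true;
        }
        ackVals.put(rootId, val);
        return false;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        acker.init(100L, 11L);                          // Spout emits tuple 11
        System.out.println(acker.ack(100L, 11L, 22L));  // Bolt1 anchors 22, acks 11: prints false
        System.out.println(acker.ack(100L, 22L));       // Bolt2 acks 22: prints true
    }
}
```

The same code covers the tree case: if Bolt1 anchored two tuples, the value only reaches zero after both have been acked.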

Hope this helps.

 

From: Tom Brown [mailto:tombrown52@gmail.com]
Sent: February-11-14 4:57 PM


To: user@storm.incubator.apache.org
Subject: Re: How to efficiently store the intermediate result of a bolt, and so it can be replayed after the crashes?

 

We use 2 storm topologies, with kafka in between: Kafka --> TopologyA --> Kafka --> TopologyB --> Final output

 

This allows the two halves of computation to be scaled and maintained independently.
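Why the queue in the middle helps with replay can be shown with a tiny in-memory stand-in for it. This sketch uses no Kafka client; the class ReplayLogSketch and its methods are hypothetical. The point is only that the downstream side reads from its last committed offset, so results written by the upstream side survive a downstream crash and do not have to be recomputed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the log between TopologyA and TopologyB.
class ReplayLogSketch {
    private final List<String> records = new ArrayList<>();
    private int committedOffset = 0; // first record the consumer has not confirmed yet

    // TopologyA side: append a finished intermediate result.
    void append(String record) {
        records.add(record);
    }

    // TopologyB side: read everything not yet committed. Reading does not advance the offset.
    List<String> poll() {
        return new ArrayList<>(records.subList(committedOffset, records.size()));
    }

    // TopologyB side: confirm that the next `count` records were fully processed.
    void commit(int count) {
        int advanced = committedOffset + Math.max(0, count);
        committedOffset = Math.min(records.size(), advanced);
    }

    public static void main(String[] args) {
        ReplayLogSketch log = new ReplayLogSketch();
        log.append("hour-09=7");
        log.append("hour-10=3");
        log.poll();                     // consumer crashes before committing
        System.out.println(log.poll()); // prints [hour-09=7, hour-10=3]
        log.commit(1);
        System.out.println(log.poll()); // prints [hour-10=3]
    }
}
```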

 

--Tom

On Tue, Feb 11, 2014 at 2:36 PM, Cheng-Kang Hsieh (Andy) <changun@cs.ucla.edu> wrote:

Hi Aniket & Adrian,

 

Thank you guys so much for the kind replies! Although the replies don't directly solve my problem, it has been very rewarding following the code of storm-redis and Trident.

 

I guess storing the intermediate data in an external db (like Cassandra, as suggested by Adrian) would work, but what if the Bolt that is supposed to receive the intermediate data fails? In this case, the emitter is also a Bolt and does not have the nice ACK mechanism to rely on, so the emitting Bolt might never know when it should resend the data to the receiving Bolt.

 

In other frameworks like Samza or Spark Streaming, all emitted data, whether from a Spout or a Bolt, is treated the same way and so benefits from the same fault tolerance mechanism (they are not as easy to use as Storm though). For example, in Samza, all the data output by a component is pushed to a Kafka queue, with the receiving components as the listeners (see here).

 

Conceptually, maybe a more general solution for Storm is to make a Bolt also a Spout which can receive ACKs from the receiving Bolts; however, that seems to violate the assumptions of Storm?

 

Again, I appreciate any advice or suggestion. Thank you!

 

Best,

Andy

On Fri, Feb 7, 2014 at 9:37 AM, Adrian Mocanu <amocanu@verticalscope.com> wrote:

Hi Andy,

I think you can use Trident to persist the results at any point in your stream processing.

I believe the way you do that is by using STREAM.persistentAggregate(...)

Here's an example from https://github.com/nathanmarz/storm/wiki/Trident-tutorial

 

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
       .parallelismHint(6);

 

In this case the counts (replace counts with whatever operations you are doing) are stored in a memory map, but you can make another class that saves this intermediate result to a db... at least that's my understanding... I am currently also learning these things.
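As a rough illustration of "another class that saves this intermediate result to a db", the sketch below separates the counting logic from where the counts live. It does not use the Trident state API: the interface BackingStore, the class InMemoryStore and the method countBatch are hypothetical names, and a Cassandra- or redis-backed class would be a second implementation of the same interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; these are not Trident classes.
class PersistentCountSketch {
    // Storage abstraction: swap the implementation to change where the counts are kept.
    interface BackingStore {
        List<Long> multiGet(List<String> keys);              // null for keys never written
        void multiPut(List<String> keys, List<Long> values);
    }

    // In-memory implementation; a database-backed one would implement the same two methods.
    static final class InMemoryStore implements BackingStore {
        private final Map<String, Long> data = new HashMap<>();

        public List<Long> multiGet(List<String> keys) {
            List<Long> out = new ArrayList<>();
            for (String k : keys) {
                out.add(data.get(k));
            }
            return out;
        }

        public void multiPut(List<String> keys, List<Long> values) {
            for (int i = 0; i < keys.size(); i++) {
                data.put(keys.get(i), values.get(i));
            }
        }
    }

    // Fold one batch of words into the stored counts with one bulk read and one bulk write.
    static void countBatch(BackingStore store, List<String> words) {
        Map<String, Long> deltas = new LinkedHashMap<>();
        for (String w : words) {
            deltas.merge(w, 1L, Long::sum);
        }
        List<String> keys = new ArrayList<>(deltas.keySet());
        List<Long> current = store.multiGet(keys);
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            long base = (current.get(i) == null) ? 0L : current.get(i);
            updated.add(base + deltas.get(keys.get(i)));
        }
        store.multiPut(keys, updated);
    }

    public static void main(String[] args) {
        BackingStore store = new InMemoryStore();
        countBatch(store, Arrays.asList("storm", "kafka", "storm"));
        System.out.println(store.multiGet(Arrays.asList("storm", "kafka"))); // prints [2, 1]
    }
}
```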

I'm currently working on a similar problem and I'm attempting to store into Cassandra. Feel free to watch my conversation threads (with Svend and Taylor Goetz)

 

-A

 

From: Aniket Alhat [mailto:aniket.alhat@gmail.com]
Sent: February-06-14 11:57 PM
To: user@storm.incubator.apache.org
Subject: Re: How to efficiently store the intermediate result of a bolt, and so it can be replayed after the crashes?

 

I hope this helps

https://github.com/pict2014/storm-redis

On Feb 7, 2014 12:07 AM, "Cheng-Kang Hsieh (Andy)" <changun@cs.ucla.edu> wrote:

Sorry, I realized that question was badly written. Simply put, my question is: is there a recommended way to store the tuples emitted by a BOLT so that the tuples can be replayed after a crash without repeating the process all the way up from the source spout? Any advice would be appreciated. Thank you!

 

Best,

Andy

On Tue, Feb 4, 2014 at 11:58 AM, Cheng-Kang Hsieh (Andy) <changun@cs.ucla.edu> wrote:

Hi all,

First of all, thanks to Nathan and all the contributors for putting together such a
great framework! I am learning a lot, even just reading the discussion
threads.

I am building a topology that contains one spout along with a chain of
bolts (e.g. S -> A -> B, where S is the spout and A, B are bolts).

When S emits a tuple, the next bolt A will buffer the tuple in a DFS and
compute some aggregated values when it has received a sufficient amount of
data, and then emit the aggregation results to the next bolt B.

Here comes my question: is there a recommended way to store the
intermediate results emitted by a bolt, so that when a machine crashes, the results can be replayed to the downstream bolts (i.e. bolt B)?

One possible solution could be: don't keep any intermediate results,
but rely on Storm's ack framework, so that the raw data will be replayed from spout S when a crash happens.

However, this approach might not be appropriate in my case, as it might
take a pretty long time (like a couple of hours) before bolt A has received all the required data and emitted the aggregated results, so it would be very expensive for the ack framework to keep tracking that many tuples for that
long.

An alternative solution could be: *making bolt A also a spout* and keeping the
emitted data in a DFS queue. When a result has been acked, bolt A
removes it from the queue.

I am wondering if it is reasonable to make a task both a bolt and a spout at the same time, or if there is a better approach.

Thank you!

--
Cheng-Kang Hsieh
UCLA Computer Science PhD Student
M: (310) 990-4297
A: 3770 Keystone Ave. Apt 402,
     Los Angeles, CA 90034



 

--
Abhishek Bhattacharjee
Pune Institute of Computer Technology