storm-user mailing list archives

From "Cheng-Kang Hsieh (Andy)" <chan...@cs.ucla.edu>
Subject Re: How to efficiently store the intermediate result of a bolt, and so it can be replayed after the crashes?
Date Wed, 12 Feb 2014 15:51:29 GMT
Hi Adrian,

Yes, that is my understanding too. Sometimes I wonder whether Storm is
really a good fit for this kind of computation (i.e. aggregating data over a
certain time window when the aggregation operation is not additive).

Abhishek, thank you for pointing that out! I get the idea behind
storm-redis now, but I hadn't thought about extending the same approach to a
larger topology.

My idea now (similar to what Tom suggested) is to have the bolt in the
middle output its results to a Kafka queue, and to add a spout that listens
to this queue, emits the data to the downstream bolts, and handles
re-emission upon failure. Ideally, I want to make a TopologyBuilder that
constructs this automatically, so that, on the surface, it looks the same as
an ordinary topology.
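
Roughly, the wiring I have in mind would look like the sketch below. This is
just a sketch: SourceSpout, AggregatingBolt, KafkaBridgeBolt, and
DownstreamBolt are placeholder names of mine, and I'm assuming the
storm-kafka contrib classes (KafkaSpout, SpoutConfig, ZkHosts) for the
consuming side:

TopologyBuilder builder = new TopologyBuilder();

// first half: source -> aggregation -> publish each result to Kafka
builder.setSpout("source", new SourceSpout());
builder.setBolt("aggregate", new AggregatingBolt()).shuffleGrouping("source");
builder.setBolt("bridge", new KafkaBridgeBolt()).shuffleGrouping("aggregate");

// second half: the KafkaSpout owns replay for everything downstream, so a
// failure below replays only the aggregated result, not the raw input
SpoutConfig spoutConf = new SpoutConfig(new ZkHosts("localhost:2181"),
        "intermediate-topic", "/kafka", "replay-spout");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("replay", new KafkaSpout(spoutConf));
builder.setBolt("downstream", new DownstreamBolt()).shuffleGrouping("replay");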

Does that make sense?

Thank you all so much for the kind replies. This community is great!

Best,
Andy



On Wed, Feb 12, 2014 at 10:22 AM, Adrian Mocanu
<amocanu@verticalscope.com> wrote:

>  Hi
>
>
>
> You can fail a tuple from any intermediate bolt, but AFAIK you can't stop
> it from being resent from the spout. So your precomputed cached result is useless.
> I know in Spark you can save your RDD to a db/cache, but that wouldn't work
> in Storm.
>
>
>
> If I'm wrong someone correct me.
>
>
>
> A
>
>
>
> *From:* Abhishek Bhattacharjee [mailto:abhishek.bhattacharjee11@gmail.com]
>
> *Sent:* February-12-14 3:37 AM
>
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: How to efficiently store the intermediate result of a
> bolt, and so it can be replayed after the crashes?
>
>
>
> Hi Cheng,
>
> If you look at the repo that Aniket posted the link to and read its README,
> you'll find what you are asking for in the above mail.
>
> I'll repost the link here: https://github.com/pict2014/storm-redis . This
> does what you are asking for.
>
> It uses *kafka* for replaying and *redis* for caching the intermediate
> state in batches. If you have a good understanding of Storm you can read the
> code and understand how it works. It uses transactional topologies.
>
> Thanks,
>
>
>
> On Wed, Feb 12, 2014 at 4:17 AM, Cheng-Kang Hsieh (Andy) <
> changun@cs.ucla.edu> wrote:
>
>  Hi Adrian
>
>
>
> Thank you so much for the input! If I understand how the Spout works
> correctly, wouldn't the tuple be regarded as failed if it has not been fully
> acked before the timeout (which, by default, is 30 secs)? From my
> understanding (which can be totally wrong), the Storm-ish way to respond to
> a failed tuple is to call the *fail* method on the root Spout, which, in
> turn, re-emits the failed tuple into the topology.
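>
> To illustrate what I mean, a rough, untested sketch (the pending map, the
> in-memory source queue, and the class name are all made up by me; imports
> from backtype.storm.* and java.util omitted as in the snippets below):
>
> public class ReplaySpout extends BaseRichSpout {
>     private SpoutOutputCollector _collector;
>     private final Queue<String> source = new LinkedList<String>();
>     private final Map<Object, List<Object>> pending =
>             new HashMap<Object, List<Object>>();
>
>     @Override
>     public void open(Map conf, TopologyContext ctx,
>                      SpoutOutputCollector collector) {
>         _collector = collector;
>     }
>
>     @Override
>     public void nextTuple() {
>         String next = source.poll();
>         if (next == null) return;
>         Object msgId = UUID.randomUUID();    // the UID the acker tracks
>         List<Object> tuple = new Values(next);
>         pending.put(msgId, tuple);
>         _collector.emit(tuple, msgId);
>     }
>
>     @Override
>     public void ack(Object msgId) { pending.remove(msgId); }
>
>     @Override
>     public void fail(Object msgId) {
>         _collector.emit(pending.get(msgId), msgId);  // re-emit on failure
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("data"));
>     }
> }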
>
>
>
> It would be nice if there were a *fail* method on an intermediate bolt
> that is called when a downstream bolt fails; then the bolt could simply
> re-emit its intermediate results to the downstream bolt without restarting
> the process all the way up from the root spout.
>
>
>
> As a use case, say I have 3 components chained together as follows:
> Spout -> Bolt1 -> Bolt2. Bolt1 aggregates the data within every fixed-size
> time window in a day and computes some measurements based on it (e.g. the
> user's activities for each hour in a day). With the current design of Storm,
> when Bolt2 fails, the Spout has to manage to resend all the data in the
> corresponding time window for Bolt1 to recompute the results. It would be
> nice if Bolt1 could cache the results and resend them when Bolt2 fails.
>
>
>
> Does that make sense?
>
> Any input is appreciated!
>
>
>
> Best,
>
> Andy
>
>
>
> On Tue, Feb 11, 2014 at 5:03 PM, Adrian Mocanu <amocanu@verticalscope.com>
> wrote:
>
>  You can have acks from bolt to bolt.
>
>
>
> Spout:
>
>  // emitting with a message ID ties the tuple to this UID for ack tracking
>
> _collector.emit(new Values(queue.dequeue()), *uniqueID*);
>
>
>
> Then Bolt1 will ack the tuple only after it emits it to Bolt2, so that the
> ack can be tied to the tuple.
>
> Bolt1:
>
>  // emit first, then ack
>
> _collector.emit(tuple, new Values("stuff")); // **anchoring** - read below
> // to see what it means
>
> _collector.ack(tuple);
>
>
>
> At this point the tuple from the Spout has been acked in Bolt1, but at the
> same time the newly emitted tuple "stuff" sent to Bolt2 is "anchored" to the
> tuple from the Spout. What this means is that it still needs to be acked
> later on; otherwise, on timeout, it will be resent by the Spout.
>
> Bolt2:
>
> _collector.ack(tuple);
>
> Bolt2 needs to ack the tuple received from Bolt1, which sends the last ack
> that the Spout was waiting for. If at this point Bolt2 emits a tuple, then
> there must be a Bolt3 which will receive it and ack it. If the tuple is not
> acked at the last point, the Spout will time it out and resend it.
>
> Each time anchoring is done on an emit from bolt to bolt, a new node in a
> "tree" structure is built... well, more like a list in my case, since I
> never send the same tuple to 2 or more bolts; I have a 1-to-1 relationship.
>
> All nodes in the tree need to be acked; only then is the tuple marked as
> fully processed. If the tuple is sent with a UID, anchored later on, and
> never acked, it will be kept in memory forever (until acked).
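>
> Putting the Bolt1 pieces together, a minimal sketch (assuming a standard
> BaseRichBolt; the class and field names are illustrative, imports omitted):
>
> public class Bolt1 extends BaseRichBolt {
>     private OutputCollector _collector;
>
>     @Override
>     public void prepare(Map conf, TopologyContext ctx,
>                         OutputCollector collector) {
>         _collector = collector;
>     }
>
>     @Override
>     public void execute(Tuple input) {
>         // anchored emit: the new tuple becomes a node in the input's tree,
>         // so it must be acked downstream or the spout resends on timeout
>         _collector.emit(input, new Values("stuff"));
>         _collector.ack(input);   // ack only after the anchored emit
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("stuff"));
>     }
> }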
>
> Hope this helps.
>
>
>
> *From:* Tom Brown [mailto:tombrown52@gmail.com]
> *Sent:* February-11-14 4:57 PM
>
>
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: How to efficiently store the intermediate result of a
> bolt, and so it can be replayed after the crashes?
>
>
>
> We use 2 Storm topologies, with Kafka in between: Kafka --> TopologyA -->
> Kafka --> TopologyB --> Final output
>
>
>
> This allows the two halves of computation to be scaled and maintained
> independently.
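>
> As a sketch of the bridge step, the last bolt of TopologyA can publish each
> result to Kafka and only ack once the send succeeds. The class and topic
> names here are mine, and I'm assuming the 0.8-era Kafka producer API
> (kafka.javaapi.producer.Producer), so treat it as illustrative:
>
> public class KafkaBridgeBolt extends BaseRichBolt {
>     private transient Producer<String, String> producer;
>     private OutputCollector collector;
>
>     @Override
>     public void prepare(Map conf, TopologyContext ctx,
>                         OutputCollector collector) {
>         this.collector = collector;
>         Properties props = new Properties();
>         props.put("metadata.broker.list", "localhost:9092"); // assumed broker
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>         producer = new Producer<String, String>(new ProducerConfig(props));
>     }
>
>     @Override
>     public void execute(Tuple tuple) {
>         try {
>             producer.send(new KeyedMessage<String, String>(
>                     "intermediate-topic", tuple.getString(0)));
>             collector.ack(tuple);   // the result is durable in Kafka now
>         } catch (Exception e) {
>             collector.fail(tuple);  // let the upstream topology replay it
>         }
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         // terminal bolt: nothing emitted downstream in this topology
>     }
> }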
>
>
>
> --Tom
>
>
>
> On Tue, Feb 11, 2014 at 2:36 PM, Cheng-Kang Hsieh (Andy) <
> changun@cs.ucla.edu> wrote:
>
 Hi Aniket & Adrian,
>
>
>
> Thank you guys so much for the kind replies! Although the replies don't
> directly solve my problem, it has been very rewarding to follow the code of
> storm-redis and Trident.
>
>
>
> I guess storing the intermediate data in an external db (like Cassandra,
> as suggested by Adrian) would work, but what if the Bolt that is supposed
> to receive the intermediate data fails? In this case, the emitter is also a
> Bolt, and does not have the nice ACK mechanism to rely on, so the emitting
> Bolt might never know when it should resend the data to the receiving Bolt.
>
>
>
> In other frameworks like Samza or Spark Streaming, all the emitted data,
> no matter whether from a Spout or a Bolt, is treated the same way and so
> benefits from the same fault-tolerance mechanism (they are not as easy to
> use as Storm, though). For example, in Samza, all the data output of a
> component is pushed to a Kafka queue with the receiving components as the
> listeners (see here<http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html>
> ).
>
>
>
> Conceptually, maybe a more general solution for Storm is to make a Bolt
> also a Spout, which can receive ACKs from the receiving Bolts; however, that
> seems to violate the assumptions of Storm?
>
>
>
> Again I appreciate any advice or suggestion. Thank you!
>
>
>
> Best,
>
> Andy
>
>
>
> On Fri, Feb 7, 2014 at 9:37 AM, Adrian Mocanu <amocanu@verticalscope.com>
> wrote:
>
>  Hi Andy,
>
> I think you can use Trident to persist the results at any point in your
> stream processing.
>
> I believe the way you do that is by using STREAM.persistentAggregate(...)
>
>
>
> Here's an example from
> https://github.com/nathanmarz/storm/wiki/Trident-tutorial
>
>
>
> TridentTopology topology = new TridentTopology();
> TridentState wordCounts =
>      topology.newStream("spout1", spout)
>        .each(new Fields("sentence"), new Split(), new Fields("word"))
>        .groupBy(new Fields("word"))
>        .persistentAggregate(new MemoryMapState.Factory(), new Count(),
>                             new Fields("count"))
>        .parallelismHint(6);
>
>
>
> In this case the counts (replace counts with whatever operation you are
> doing) are stored in a memory map, but you can make another class that
> saves this intermediate result to a db... at least that's my understanding... I
> am currently also learning these things.
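>
> As far as I can tell, the hook for that is the factory argument; a rough
> sketch of the shape (DbBackedMapState is a made-up class that would
> implement Trident's MapState so persistentAggregate can use it):
>
> public class DbBackedStateFactory implements StateFactory {
>     @Override
>     public State makeState(Map conf, IMetricsContext metrics,
>                            int partitionIndex, int numPartitions) {
>         return new DbBackedMapState(conf);  // e.g. open the db connection here
>     }
> }
>
> // then swap it in:
> // .persistentAggregate(new DbBackedStateFactory(), new Count(),
> //                      new Fields("count"))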
>
> I'm currently working on a similar problem and I'm attempting to store
> into Cassandra. Feel free to watch my conversation threads (with Svend and
> Taylor Goetz)
>
>
>
> -A
>
>
>
> *From:* Aniket Alhat [mailto:aniket.alhat@gmail.com]
> *Sent:* February-06-14 11:57 PM
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: How to efficiently store the intermediate result of a
> bolt, and so it can be replayed after the crashes?
>
>
>
> I hope this helps
>
> https://github.com/pict2014/storm-redis
>
> On Feb 7, 2014 12:07 AM, "Cheng-Kang Hsieh (Andy)" <changun@cs.ucla.edu>
> wrote:
>
 Sorry, I realized that question was badly written. Simply put, my
> question is: is there a recommended way to store the tuples emitted by a
> BOLT so that the tuples can be replayed after a crash without repeating the
> process all the way up from the source spout? Any advice would be
> appreciated. Thank you!
>
>
>
> Best,
>
> Andy
>
>
>
> On Tue, Feb 4, 2014 at 11:58 AM, Cheng-Kang Hsieh (Andy) <
> changun@cs.ucla.edu> wrote:
>
>  Hi all,
>
> First of all, thanks to Nathan and all the contributors for putting out
> such a great framework! I am learning a lot, even just reading the
> discussion threads.
>
> I am building a topology that contains one spout along with a chain of
> bolts (e.g. S -> A -> B, where S is the spout and A, B are bolts).
>
> When S emits a tuple, the next bolt A will buffer the tuple in a DFS,
> compute some aggregated values once it has received a sufficient amount of
> data, and then emit the aggregation results to the next bolt B.
>
> Here comes my question: is there a recommended way to store the
> intermediate results emitted by a bolt, so that when a machine crashes, the
> results can be replayed to the downstream bolts (i.e. bolt B)?
>
> One possible solution could be: don't keep any intermediate results, but
> resort to Storm's ack framework, so that the raw data will be replayed
> from spout S when a crash happens.
>
> However, this approach might not be appropriate in my case, as it might
> take a pretty long time (like a couple of hours) before bolt A has received
> all the required data and emits the aggregated results, so it would be very
> expensive for the ack framework to keep tracking that many tuples for that
> long.
>
> An alternative solution could be *making bolt A also a spout* and keeping
> the emitted data in a DFS queue. When a result has been acked, bolt A
> removes it from the queue.
>
> I am wondering if it is reasonable to make a task both a bolt and a spout
> at the same time, or if there is a better approach.
>
> Thank you!
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> *Abhishek Bhattacharjee*
>
> *Pune Institute of Computer Technology*
>



-- 
Cheng-Kang Hsieh
UCLA Computer Science PhD Student
M: (310) 990-4297
A: 3770 Keystone Ave. Apt 402,
     Los Angeles, CA 90034
