storm-user mailing list archives

From Adrian Mocanu <amoc...@verticalscope.com>
Subject RE: How to efficiently store the intermediate result of a bolt, and so it can be replayed after the crashes?
Date Tue, 11 Feb 2014 22:03:52 GMT
You can have acks from bolt to bolt.

Spout:
 //ties the tuple to this UID so the ack framework can track it
_collector.emit(new Values(queue.dequeue()), uniqueID);

Then Bolt1 acks the tuple only after it has emitted to Bolt2, so that the ack can be tied
to the tuple.
Bolt1:
 //emit first, then ack
_collector.emit(tuple, new Values("stuff")); //anchoring - read below to see what it means
_collector.ack(tuple);

At this point the tuple from the Spout has been acked in Bolt1, but at the same time the newly
emitted tuple "stuff" sent to Bolt2 is "anchored" to the tuple from the Spout. This means it
still needs to be acked later on; otherwise, on timeout, the Spout will resend it.
Bolt2:
_collector.ack(tuple);
Bolt2 needs to ack the tuple received from Bolt1, which sends the last ack the Spout was
waiting for. If at this point Bolt2 emits an anchored tuple, then there must be a Bolt3 which
will receive and ack it. If the tuple is not acked at that last point, the Spout will time it
out and resend it.
Each time anchoring is done on an emit statement from bolt to bolt, a new node in a "tree"
structure is built... well, more like a list in my case, since I never send the same tuple to
2 or more bolts; I have a 1 to 1 relationship.
All nodes in the tree need to be acked; only then is the tuple marked as fully processed.
If a tuple is sent with a UID and anchored later on but never acked, it will be kept in
memory (until acked).
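
Putting the pieces together, here is a minimal end-to-end sketch of the chain (using this
Storm version's backtype.storm classes; the queue read, field names, and class names are
made up, so treat it as an illustration of the pattern rather than a drop-in implementation):

import java.util.Map;
import java.util.UUID;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AckChainSketch {

    public static class QueueSpout extends BaseRichSpout {
        private SpoutOutputCollector _collector;

        public void open(Map conf, TopologyContext ctx, SpoutOutputCollector collector) {
            _collector = collector;
        }

        public void nextTuple() {
            String msg = dequeue(); // stand-in for a real queue read
            if (msg == null) return;
            // the message ID ties the tuple to this UID; Storm calls
            // ack(uid) only after every anchored descendant is acked
            _collector.emit(new Values(msg), UUID.randomUUID().toString());
        }

        public void declareOutputFields(OutputFieldsDeclarer d) {
            d.declare(new Fields("msg"));
        }

        private String dequeue() { return null; } // stand-in
    }

    public static class Bolt1 extends BaseRichBolt {
        private OutputCollector _collector;

        public void prepare(Map conf, TopologyContext ctx, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            // emit first (anchored to the input), then ack the input
            _collector.emit(tuple, new Values("stuff"));
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer d) {
            d.declare(new Fields("stuff"));
        }
    }

    public static class Bolt2 extends BaseRichBolt {
        private OutputCollector _collector;

        public void prepare(Map conf, TopologyContext ctx, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            // last node of the tuple tree: this ack completes it at the spout
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer d) { /* terminal bolt */ }
    }
}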
Hope this helps.

From: Tom Brown [mailto:tombrown52@gmail.com]
Sent: February-11-14 4:57 PM
To: user@storm.incubator.apache.org
Subject: Re: How to efficiently store the intermediate result of a bolt, and so it can be
replayed after the crashes?

We use 2 Storm topologies, with Kafka in between: Kafka --> TopologyA --> Kafka -->
TopologyB --> Final output

This allows the two halves of computation to be scaled and maintained independently.
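
As a rough illustration of the handoff, the last bolt of TopologyA might publish to Kafka
along these lines (a sketch only: the broker address, topic name, and serialization are made
up, and it uses the standalone Kafka producer client rather than any Storm-specific
integration):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

// Final bolt of TopologyA: durably hands results to Kafka, where
// TopologyB's spout picks them up.
public class KafkaBridgeBolt extends BaseRichBolt {
    private transient KafkaProducer<String, String> producer;
    private OutputCollector _collector;

    public void prepare(Map conf, TopologyContext ctx, OutputCollector collector) {
        _collector = collector;
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public void execute(Tuple tuple) {
        try {
            // get() blocks until Kafka confirms the write
            producer.send(new ProducerRecord<>("intermediate-results",
                    tuple.getValue(0).toString())).get();
            _collector.ack(tuple);
        } catch (Exception e) {
            _collector.fail(tuple); // let upstream replay it
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer d) { /* terminal bolt */ }
}

Acking only after a confirmed send means a crash before the write causes a replay upstream
rather than data loss.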

--Tom

On Tue, Feb 11, 2014 at 2:36 PM, Cheng-Kang Hsieh (Andy) <changun@cs.ucla.edu>
wrote:
Hi Aniket & Adrian,

Thank you guys so much for the kind replies! Although they don't directly solve my problem,
it has been very rewarding following the code of storm-redis and Trident.

I guess storing the intermediate data in an external db (like Cassandra, as suggested by Adrian)
would work, but what if the Bolt that is supposed to receive the intermediate data fails?
In this case the emitter is also a Bolt, and does not have the nice ACK mechanism to rely
on, so the emitting Bolt might never know when it should resend the data to the receiving
Bolt.

In other frameworks like Samza or Spark Streaming, all the emitted data, whether from a Spout
or a Bolt, is treated the same way and so benefits from the same fault-tolerance mechanism
(they are not as easy to use as Storm, though). For example, in Samza, all the data output
by a component is pushed to a Kafka queue with the receiving components as the listeners (see
http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html).

Conceptually, maybe a more general solution for Storm would be to make a Bolt also act as a
Spout that can receive ACKs from the receiving Bolts; however, that seems to violate Storm's
assumptions?

Again I appreciate any advice or suggestion. Thank you!

Best,
Andy

On Fri, Feb 7, 2014 at 9:37 AM, Adrian Mocanu <amocanu@verticalscope.com>
wrote:
Hi Andy,
I think you can use Trident to persist the results at any point in your stream processing.
I believe the way you do that is by using STREAM.persistentAggregate(...)

Here's an example from https://github.com/nathanmarz/storm/wiki/Trident-tutorial

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
       .parallelismHint(6);

In this case the counts (replace counts with whatever operation you are doing) are stored
in a memory map, but you can make another class that saves this intermediate result to a db
(see the sketch below)... at least that's my understanding... I am currently also learning
these things.
I'm currently working on a similar problem and I'm attempting to store into Cassandra. Feel
free to watch my conversation threads (with Svend and Taylor Goetz).
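
For example, a rough sketch of a db-backed state could look like this (a sketch only, under
the assumption that Trident's IBackingMap/NonTransactionalMap helpers are the right hooks;
the db calls are stand-ins):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.IMetricsContext;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.state.map.IBackingMap;
import storm.trident.state.map.NonTransactionalMap;

// A db-backed replacement for MemoryMapState: Trident calls multiGet/
// multiPut in batches, so each batch becomes one round trip to the store.
public class DbMapState implements IBackingMap<Long> {

    public List<Long> multiGet(List<List<Object>> keys) {
        List<Long> vals = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            vals.add(dbLookup(key)); // hypothetical db read, null if absent
        }
        return vals;
    }

    public void multiPut(List<List<Object>> keys, List<Long> vals) {
        for (int i = 0; i < keys.size(); i++) {
            dbWrite(keys.get(i), vals.get(i)); // hypothetical db write
        }
    }

    private Long dbLookup(List<Object> key) { return null; } // stand-in
    private void dbWrite(List<Object> key, Long val) {}      // stand-in

    public static class Factory implements StateFactory {
        public State makeState(Map conf, IMetricsContext metrics,
                               int partitionIndex, int numPartitions) {
            return NonTransactionalMap.build(new DbMapState());
        }
    }
}

With something like that, you'd swap new MemoryMapState.Factory() for new DbMapState.Factory()
in the persistentAggregate call above. One caveat: a plain non-transactional map can
double-count when Trident replays a batch; the transactional/opaque state variants deal with
that, but they're beyond this sketch.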

-A

From: Aniket Alhat [mailto:aniket.alhat@gmail.com]
Sent: February-06-14 11:57 PM
To: user@storm.incubator.apache.org
Subject: Re: How to efficiently store the intermediate result of a bolt, and so it can be
replayed after the crashes?


I hope this helps

https://github.com/pict2014/storm-redis
On Feb 7, 2014 12:07 AM, "Cheng-Kang Hsieh (Andy)" <changun@cs.ucla.edu>
wrote:
Sorry, I realized that question was badly written. Simply put, my question is: is there
a recommended way to store the tuples emitted by a BOLT so that the tuples can be replayed
after a crash without repeating the process all the way up from the source spout? Any advice
would be appreciated. Thank you!

Best,
Andy

On Tue, Feb 4, 2014 at 11:58 AM, Cheng-Kang Hsieh (Andy) <changun@cs.ucla.edu>
wrote:
Hi all,

First of all, thanks to Nathan and all the contributors for putting out such a
great framework! I am learning a lot, even just reading the discussion
threads.

I am building a topology that contains one spout along with a chain of
bolts (e.g. S -> A -> B, where S is the spout and A, B are bolts).

When S emits a tuple, the next bolt A will buffer the tuple in a DFS,
compute some aggregated values once it has received a sufficient amount of
data, and then emit the aggregation results to the next bolt B.

Here comes my question: is there a recommended way to store the
intermediate results emitted by a bolt, so that when a machine crashes, the
results can be replayed to the downstream bolts (i.e. bolt B)?

One possible solution could be: don't keep any intermediate results,
but resort to Storm's ack framework, so that the raw data will be
replayed from spout S when a crash happens.

However, this approach might not be appropriate in my case, as it might
take a pretty long time (like a couple of hours) before bolt A has received
all the required data and emits the aggregated results, so it would be
very expensive for the ack framework to keep tracking that many tuples for
that long.

An alternative solution could be: *making bolt A also a spout* and keeping the
emitted data in a DFS queue. When a result has been acked, bolt A
removes it from the queue.

I am wondering if it is reasonable to make a task both a bolt and a spout at
the same time, or if there is a better approach; roughly, I imagine something
like the sketch below.
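
(A hypothetical sketch of the bolt-as-spout idea, just to make the question concrete;
DurableQueue and its methods are made up, standing in for whatever DFS-backed queue would
hold bolt A's emitted results.)

import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Replays bolt A's buffered results from a durable queue. Entries are
// removed only on ack, so a crash before the ack just means a replay.
public class ReplayableResultSpout extends BaseRichSpout {

    // stand-in for a real DFS-backed queue of bolt A's emitted results
    public interface DurableQueue {
        Entry nextUnsent();            // null if nothing is pending
        void delete(String id);        // drop a fully processed entry
        void requeue(String id);       // make a failed entry replayable
        interface Entry { String id(); String payload(); }
    }

    private SpoutOutputCollector _collector;
    private DurableQueue queue;

    public void open(Map conf, TopologyContext ctx, SpoutOutputCollector collector) {
        _collector = collector;
        queue = connect(); // stand-in for opening the DFS queue
    }

    public void nextTuple() {
        if (queue == null) return;
        DurableQueue.Entry e = queue.nextUnsent();
        if (e != null) {
            // the queue entry id doubles as the message ID
            _collector.emit(new Values(e.payload()), e.id());
        }
    }

    public void ack(Object msgId) {
        queue.delete((String) msgId);  // downstream done: safe to remove
    }

    public void fail(Object msgId) {
        queue.requeue((String) msgId); // timed out or failed: replay later
    }

    public void declareOutputFields(OutputFieldsDeclarer d) {
        d.declare(new Fields("aggregate"));
    }

    private DurableQueue connect() { return null; } // stand-in
}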

Thank you!

--
Cheng-Kang Hsieh
UCLA Computer Science PhD Student
M: (310) 990-4297
A: 3770 Keystone Ave. Apt 402,
     Los Angeles, CA 90034




