incubator-s4-user mailing list archives

From Matthieu Morel <mmo...@apache.org>
Subject Re: Curious about extra things in 0.6 compared to 0.5
Date Tue, 23 Apr 2013 18:19:17 GMT

On Apr 23, 2013, at 07:27 , JiHyoun Park wrote:

> Hi Matthieu,
> 
> You said,
> But if we use an adapter, by default, senders will wait until the downstream app can process events.
> 
> Actually I had a problem where the Adapter failed when it sent too many events to the App, but the App was too slow to process all the events.
> I tested it by adding Thread.sleep() in the App intentionally, to see what happened if the App was slow (in other words, complex).
> Is the Adapter failure in this case because the queue of the RemoteStream between the Adapter and the App filled up and overflowed?

That would depend on how you implement the adapter. If the sender is blocking but you keep providing data to be sent, that data will be buffered, which may lead to memory issues if you don't throttle. I suppose that is what you are seeing.

> 
> 1. How can I change the event processing strategy of RemoteStream from 'Blocking' to 'Shedding'?

You need to override the default binding which is this one: https://github.com/apache/incubator-s4/blob/dev/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java#L92
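For illustration, a custom module could rebind the stream executor factory. This is a rough sketch assuming S4 0.6's Guice-based configuration; the interface and implementation class names below are placeholders and should be checked against the DefaultCoreModule.java linked above:

```java
import com.google.inject.AbstractModule;

// Sketch only: StreamExecutorServiceFactory and the blocking implementation
// are placeholder names; check DefaultCoreModule.java for the real ones.
public class BlockingStreamsModule extends AbstractModule {
    @Override
    protected void configure() {
        // Replace the default (shedding) binding with a blocking one so that
        // slow consumers apply backpressure instead of dropping events.
        bind(StreamExecutorServiceFactory.class)
                .to(BlockingStreamExecutorServiceFactory.class);
    }
}
```

You would then pass this module when deploying the app so that your binding replaces the default one; the configuration documentation describes how custom modules are taken into account.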


> 2. Are there any ways to take events out of the queue of the RemoteStream intentionally at the downstream app? (I mean, in a situation of too many incoming events and slow processing speed, I'd like to make the App take events from the queue quickly and bypass all the following analysis parts, just to offload the queue.)

You could probably do that by detecting that the queues are full and doing something special in that case (e.g. rerouting events), with a different implementation of the StreamExecutorFactory: for instance, you could modify the blocking executor implementation by specifying a timeout on the semaphore acquisition. The default is to block when events are slow to be consumed, which applies backpressure by propagating it upstream.
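To make the semaphore idea concrete, here is a standalone sketch using plain JDK primitives. This is not S4's actual executor code; the class and method names are invented for illustration:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Standalone sketch of "blocking with a timeout": senders acquire a slot
 * from a bounded pool and wait at most timeoutMs when all slots are taken;
 * if the timeout expires, the event is shed. A consumer thread would call
 * releaseSlot() after processing each event.
 */
public class TimedBlockingDispatcher {
    private final Semaphore permits;
    private final long timeoutMs;

    public TimedBlockingDispatcher(int capacity, long timeoutMs) {
        this.permits = new Semaphore(capacity);
        this.timeoutMs = timeoutMs;
    }

    /** Called by the sender. True = slot acquired; false = shed on timeout. */
    public boolean tryAcquireSlot() throws InterruptedException {
        return permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Called by the consumer once an event has been processed. */
    public void releaseSlot() {
        permits.release();
    }
}
```

With a timeout of 0 this degenerates to pure shedding; with an unbounded timeout it is pure blocking backpressure.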

> 
> By the way, why does S4 suggest having the Adapter-App structure for one application?
> Can't we merge them into one and make a so-called 'adapterPE' in App?
> What are the problems and concerns of one App for all?

There was a lengthy discussion about that in https://issues.apache.org/jira/browse/S4-22, with differing opinions. So right now this is the way it works, but what you suggest is conceptually possible, and probably convenient; it's just not implemented. Currently you can mimic it by injecting events from the start() method of the App class.
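For example, the injection could be sketched roughly as follows, assuming the S4 0.6 App API; `MyEvent` and the source-reading helpers are hypothetical, and the exact lifecycle hook names should be checked against the App class in s4-core:

```java
// Sketch only: MyEvent, sourceHasData() and nextEventFromSource() are
// hypothetical placeholders, not part of the S4 API.
public class MyApp extends App {

    private Stream<MyEvent> stream;

    @Override
    protected void onStart() {
        // Play the role of an "adapter PE": inject events into the stream
        // directly from the app's start hook, instead of running a
        // separate adapter app.
        while (sourceHasData()) {
            stream.put(nextEventFromSource());
        }
    }
}
```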

Regards,

Matthieu

> 
> Best Regards
> Jihyoun
> 
> 
> 
> On Tue, Apr 23, 2013 at 12:47 AM, Matthieu Morel <mmorel@apache.org> wrote:
> 
> On Apr 22, 2013, at 16:24 , saradindu kar wrote:
> 
>> Hi Matthieu,
>> 
>> Thanks for your clarifications.
>> Do you think my observations are correct regarding 0.3, 0.5, and 0.6?
>> 
>> In S4 0.6 we can define how to process events: blocking (no loss, but waits), shedding (drops events when the input (or output) rate is faster than the processing rate), custom (maybe, depending on the stream or some characteristic of the event itself).
>> 
>> If I understood correctly, in 0.6 by default there is event loss if the speed is high, but the lag time is 0.
>> 
>> But how can I define "blocking (no loss, but waits), shedding (drops events when the input (or output) rate is faster than the processing rate), custom (maybe, depending on the stream or some characteristic of the event itself)"? Is it that I need to write my logic inside the PEs based on the load, or is there some API I need to configure by setting limits?
> 
> That's more of a platform concern, so it should not be in the application code. You need to specify a custom module in which you define the bindings for the event processors. In other words, specify which processor you'd like for sending events, for instance. By default we bind the sender executor to a load shedding implementation: https://github.com/apache/incubator-s4/blob/dev/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java#L91

> But you can simply override that with a blocking implementation in a custom module so that no event is lost.
> 
> Currently there is no specific documentation for doing that, but you can follow the same idea as the one described in the checkpointing documentation: http://incubator.apache.org/s4/doc/0.6.0/fault_tolerance/ . There is also related documentation in the configuration section: http://incubator.apache.org/s4/doc/0.6.0/configuration/ .
> 
>> 
>> By default, within an S4 app, when downstream PEs cannot process events sufficiently fast, S4 drops events upstream. But if we use an adapter, by default, senders will wait until the downstream app can process events.
>> 
>> 
>> Yes, we can use an adapter, but then we need to handle the Adapter in a multi-threaded way to make it scalable. Or is there any way to make the adapter cope with the input speed?
> 
> An adapter is also an S4 app, so you can easily distribute the load.
> 
>> 
>> 
>> Because I usually end up in a situation where it leads to deadlock because of the queue size or mis-communication between PEs: PE1 sends input very fast but PE2 cannot keep up. PE1 and PE2 are at the same level and do the same task, only with different keys. It does not throw any error; after some time, as the nodes are idle, the whole topology expires.

>> 
>> Do you think it is an implementation Error or any issues in architecture?
> 
> It's unclear to me from your description. I can suggest making sure you don't have loops in the PE graph.
> 
> Regards,
> 
> Matthieu
> 
>> 
>> Thanks,
>> ~/Sara
>> 
>> On Mon, Apr 22, 2013 at 6:32 PM, Matthieu Morel <mmorel@apache.org> wrote:
>> Great questions! See my answers inline.
>> 
>> On Apr 22, 2013, at 14:38 , saradindu kar wrote:
>> 
>>> Hi,
>>> 
>>> I followed s4 from s4-0.3 version, then 0.5 and now 0.6.
>>> 
>>> I have some experimental outcomes, which I want to clarify;
>>> 
>>> In 0.3 (from PE to PE): no. of events lost: YES ----- mean lag: YES
>>> In 0.5 (from PE to PE): no. of events lost: "0" (no event loss) ----- mean lag: "still some lag, but less compared to 0.3"
>>> In 0.6 (from PE to PE): no. of events lost: "starts at 10000, then reduces to thousands" ----- mean lag: "0"
>>> 
>>> So my doubt is: as S4 has evolved from its inception, what are your current primary goals to address?
>> 
>> 0.5 was a complete refactoring, with a focus on providing a functional system with a new implementation.
>> 0.6 aimed to improve performance and usability / configurability.
>> 
>>> Can I deploy one system with 0 loss and 0 lag time, or is it that, based on my use-case needs, I should choose 0.5 or 0.6?
>> 
>> In S4 0.6 we can define how to process events: blocking (no loss, but waits), shedding (drops events when the input (or output) rate is faster than the processing rate), custom (maybe, depending on the stream or some characteristic of the event itself).
>> By default, within an S4 app, when downstream PEs cannot process events sufficiently fast, S4 drops events upstream. But if we use an adapter, by default, senders will wait until the downstream app can process events.
>> 
>>> 
>>> For addressing the above issue, I felt Storm has the upper hand over S4, but it has lower performance in terms of the number of events processed and processing speed; that can also be improved by using more machines.
>>> 
>>> Is it correct that, since Storm uses ZeroMQ, a "kind of pulling technique" for handling incoming events, it doesn't incur the above problem?
>>> Whereas S4 doesn't use ZeroMQ; if I understood correctly, it uses a push technique for handling incoming events, so it loses events to maintain the queue.
>> 
>> That depends on how you configure the processing of the queues. By blocking upstream based on backpressure from downstream, you can avoid losing events. Events won't be sent faster than the downstream system can process them.
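The blocking vs. shedding distinction maps directly onto the two insertion methods of a bounded queue in plain Java; a standalone illustration, not S4 code:

```java
import java.util.concurrent.BlockingQueue;

public class QueuePolicies {

    /** Blocking: the sender waits until the consumer frees a slot (backpressure, no loss). */
    static <E> void sendBlocking(BlockingQueue<E> q, E event) throws InterruptedException {
        q.put(event); // blocks while the queue is full
    }

    /** Shedding: the event is dropped immediately when the queue is full. */
    static <E> boolean sendShedding(BlockingQueue<E> q, E event) {
        return q.offer(event); // returns false when the event is shed
    }

    /** Convenience factory for a bounded queue. */
    static <E> BlockingQueue<E> boundedQueue(int capacity) {
        return new java.util.concurrent.ArrayBlockingQueue<E>(capacity);
    }
}
```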
>> 
>> Then it depends on your source of events. If you can pull from that source, then great: pull code can be implemented in the adapter, and events then passed to the S4 app. If you cannot pull, you can maintain some buffering, but you'll probably have to drop some events at some point, and S4 provides facilities for that.
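When the source cannot be pulled, one simple buffering policy is to keep a bounded buffer and evict the oldest event on overflow. A plain-Java sketch of that trade-off (this is not an S4 facility, just an illustration):

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Bounded buffer that drops the oldest event when a new one arrives into a full buffer. */
public class DropOldestBuffer<E> {
    private final Deque<E> buffer = new ArrayDeque<>();
    private final int capacity;
    private long dropped = 0;

    public DropOldestBuffer(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void add(E event) {
        if (buffer.size() == capacity) {
            buffer.pollFirst(); // evict the oldest event
            dropped++;
        }
        buffer.addLast(event);
    }

    /** Returns the next event, or null if the buffer is empty. */
    public synchronized E poll() {
        return buffer.pollFirst();
    }

    /** Number of events shed so far; useful for monitoring. */
    public synchronized long droppedCount() {
        return dropped;
    }
}
```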
>> 
>> 
>>> 
>>> Can you give me some idea about the concepts behind the queue implementation here?
>> 
>> More details here: http://incubator.apache.org/s4/doc/0.6.0/event_dispatch/ 
>> 
>>> 
>>> One more query about joining multiple streams: there was a provision for joining streams in 0.3; do you have any provision here (0.6) for joining or splitting any incoming streams based on their key? Currently we can do it by writing a common event file for the different event streams, and use that for our processing in PEs.
>>> If you have any way to do this in 0.6, can you redirect me to the right API for this?
>> 
>> There is no such API / facility yet, so you have to implement the corresponding logic in the code of the PE.
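As an illustration of what such PE-side logic could look like, here is a minimal plain-Java join of two streams: keep the latest event from each side and emit a pair once both have arrived. The class and method names are invented; in S4, keyed partitioning already routes both streams' events with the same key to the same PE instance, so each instance only joins one key:

```java
/**
 * Sketch of two-stream join logic to embed in a PE: remember the latest
 * event from each side, and emit a joined pair once both sides are present.
 */
public class TwoStreamJoin<A, B> {
    private A left;
    private B right;

    /** Called when an event arrives on the first stream. */
    public Object[] onLeft(A a) {
        left = a;
        return emit();
    }

    /** Called when an event arrives on the second stream. */
    public Object[] onRight(B b) {
        right = b;
        return emit();
    }

    /** Returns the joined pair, or null while one side is still missing. */
    private Object[] emit() {
        if (left == null || right == null) {
            return null;
        }
        return new Object[] { left, right };
    }
}
```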
>> 
>> Hope this helps,
>> 
>> Matthieu
>> 
>>> 
>>> Thanks,
>>> ~/Sk
>> 
>> 
> 
> 

