On Apr 23, 2013, at 07:27 , JiHyoun Park wrote:

Hi Matthieu,

You said,
But if we use an adapter, by default, senders will wait until the downstream app can process events.

Actually, I had a problem where the Adapter failed when it sent too many events to the App and the App was too slow to process them all.
I tested it by intentionally adding Thread.sleep() in the App, to see what happened if the App was slow (in other words, complex).
Is the Adapter failure in this case because the RemoteStream queue between the Adapter and the App filled up and overflowed?

That would depend on how you implement the adapter. If the sender is blocking but you keep providing data to be sent, there will be buffering, and that may lead to memory issues if you don't throttle. I suppose that is what you are seeing.


1. How can I change the event processing strategy of RemoteStream from 'Blocking' to 'Shedding'?

You need to override the default binding which is this one: https://github.com/apache/incubator-s4/blob/dev/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java#L92
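As a sketch of what such an override could look like (the factory and implementation class names below are assumptions for illustration; check the linked DefaultCoreModule for the actual bindings in your S4 version):

```java
// Hypothetical custom Guice module (class names are illustrative, not the
// verified S4 0.6 API): rebind the stream executor factory so that events
// are shed instead of blocking senders.
public class SheddingModule extends com.google.inject.AbstractModule {
    @Override
    protected void configure() {
        bind(StreamExecutorFactory.class)
            .to(LoadSheddingStreamExecutorFactory.class);
    }
}
```

You would then reference this module in the node configuration so that Guice uses your binding instead of the default one.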


2. Are there any ways to take events out from the queue of RemoteStream intentionally at the downstream app? (I mean, in the situation of too many incoming events and slow processing speed, I'd like to make App take events from the queue quickly and bypass all the following analysis parts just to offload the queue.)

You could probably do that by detecting that queues are full and doing something special in that case (like rerouting events), with a different implementation of the StreamExecutorFactory: you could modify the blocking executor implementation by specifying a timeout on the semaphore acquisition, for instance. The default is to block when events are slow to be consumed, which applies backpressure by propagating upstream.
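As a self-contained sketch of the timeout idea (plain java.util.concurrent, not the actual S4 executor classes): acquire the queue permit with a timeout, and shed the event when the timeout expires instead of blocking forever.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustration only: a bounded admission gate that blocks up to a timeout
// and sheds (drops) the event when the queue stays full. The real S4
// executors live elsewhere; this just demonstrates the semaphore trick.
public class TimedShedder {
    private final Semaphore permits;
    private int dropped = 0;

    public TimedShedder(int capacity) {
        this.permits = new Semaphore(capacity);
    }

    /** Returns true if the event was admitted, false if it was shed. */
    public boolean offer(long timeoutMs) {
        try {
            if (permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                return true; // admitted; the consumer calls release() later
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        dropped++; // queue stayed full past the timeout: shed the event
        return false;
    }

    /** Called by the consumer once an event has been processed. */
    public void release() {
        permits.release();
    }

    public int droppedCount() {
        return dropped;
    }
}
```

With a zero timeout this behaves like pure shedding; with an infinite timeout it behaves like the default blocking executor.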


By the way, why does S4 suggest having the Adapter-App structure for one application?
Can't we merge them into one and make a so-called 'adapterPE' in App?
What are the problems and concerns of one App for all?

There was a lengthy discussion about that in https://issues.apache.org/jira/browse/S4-22, with different opinions. So right now this is the way it works, but what you suggest is conceptually possible, and probably convenient; it's just not implemented. Currently you can mimic that by injecting events from the start() method of the App class.
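A rough sketch of that pattern (hypothetical: the method names, the Stream wiring, and pollSource() are illustrative and should be checked against the 0.6 App API):

```java
// Hypothetical "adapterPE-like" app: it injects events itself when the
// app starts, instead of relying on a separate adapter app.
public class SelfFeedingApp extends App {
    private Stream<MyEvent> stream; // wired during app initialization, not shown

    @Override
    protected void onStart() {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                MyEvent e = pollSource(); // hypothetical external source
                stream.put(e);            // inject into the S4 stream
            }
        }).start();
    }
}
```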

Regards,

Matthieu


Best Regards
Jihyoun



On Tue, Apr 23, 2013 at 12:47 AM, Matthieu Morel <mmorel@apache.org> wrote:

On Apr 22, 2013, at 16:24 , saradindu kar wrote:

Hi Matthieu,

Thanks for your Clarifications.
Do you think my observations regarding 0.3, 0.5, and 0.6 are correct?

In S4 0.6 we can define how to process events: blocking (no loss, but waits), shedding (drops events when input (or output) rate is faster than processing rate), custom (maybe, depending on the stream or some characteristic of the event itself).

If I understood correctly, by default there is event loss in 0.6 if the rate is high, even though the lag time is 0.

But how can I define "blocking (no loss, but waits), shedding (drops events when input (or output) rate is faster than processing rate), custom (maybe, depending on the stream or some characteristic of the event itself)"? Do I need to write my logic inside the PEs based on the load, or is there an API to configure with some limits?


That's more of a platform concern, so it should not be in the application code. You need to specify a custom module in which you define the bindings for the event processors. In other words, specify which processor you'd like to use for sending events, for instance. By default we bind the sender executor to a load shedding implementation: https://github.com/apache/incubator-s4/blob/dev/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java#L91
But you can simply override that with a blocking implementation in a custom module so that no event is lost.

Currently there is no specific documentation for doing that, but you can follow the same idea as the one described in the checkpointing documentation http://incubator.apache.org/s4/doc/0.6.0/fault_tolerance/ . There is also related documentation in the configuration section  http://incubator.apache.org/s4/doc/0.6.0/configuration/ . 


By default, within an S4 app, when downstream PEs cannot process events sufficiently fast, S4 drops events upstream. But if we use an adapter, by default, senders will wait until the downstream app can process events.


Yes, we can use an adapter, but then we need to handle the Adapter in a multi-threaded way to make it scalable. Or is there any way to make the Adapter cope with the input speed?

An adapter is also an S4 app, so you can easily distribute the load.



Because I usually end up in a situation that leads to deadlock because of the queue size, or to miscommunication between PEs where PE1 sends input very fast but PE2 cannot keep up. PE1 and PE2 are at the same level and do the same task, only with different keys. It does not throw any error; after some time, as the nodes are idle, the whole topology expires.

Do you think it is an implementation error or an issue in the architecture?

It's unclear to me from your description. I suggest making sure you don't have loops in the PE graph.

Regards,

Matthieu


Thanks,
~/Sara

On Mon, Apr 22, 2013 at 6:32 PM, Matthieu Morel <mmorel@apache.org> wrote:
Great questions! See my answers inline.

On Apr 22, 2013, at 14:38 , saradindu kar wrote:

Hi,

I followed s4 from s4-0.3 version, then 0.5 and now 0.6.

I have some experimental outcomes, which I want to clarify;

In 0.3 (from PE to PE): events lost: yes ----- mean lag: yes
In 0.5 (from PE to PE): events lost: 0 (no event loss) ----- mean lag: still present, but less than in 0.3
In 0.6 (from PE to PE): events lost: starts at 10000, then drops to the thousands ----- mean lag: 0

So my question is: as S4 has evolved since its inception, what are your current primary goals?

0.5 was a complete refactoring, focused on providing a functional system with a new implementation.
0.6 aimed to improve performance and usability/configurability.

Can I deploy one system with 0 loss and 0 lag time, or should I choose 0.5 or 0.6 based on my use-case needs?

In S4 0.6 we can define how to process events: blocking (no loss, but waits), shedding (drops events when input (or output) rate is faster than processing rate), custom (maybe, depending on the stream or some characteristic of the event itself).
By default, within an S4 app, when downstream PEs cannot process events sufficiently fast, S4 drops events upstream. But if we use an adapter, by default, senders will wait until the downstream app can process events.


For addressing the above issue, I felt Storm has the upper hand over S4, but it has lower performance in terms of the number of events processed and processing speed, though that can also be improved by using more machines.

Is it correct that because Storm uses ZeroMQ, a kind of pulling technique for handling incoming events, it doesn't incur the above problem?
S4, on the other hand, doesn't use ZeroMQ; if I understood correctly, it uses a push technique for handling incoming events, so it loses events to maintain the queue.

That depends on how you configure the processing of the queues. By blocking upstream based on back pressure from downstream, you can avoid losing events. Events won't be sent faster than the downstream system can process them. 

Then it depends on your source of events. If you can pull from that source, then great, pull code can be implemented in the adapter, then passed to the S4 app. If you cannot pull, you can maintain some buffering, but you'll probably have to drop some events at some point, and S4 provides facilities for that.
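To make the pull-with-backpressure idea concrete, here is a self-contained sketch using only java.util.concurrent (a bounded queue stands in for the downstream S4 app; none of this is S4 API):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: an adapter that pulls from its source and hands
// events to a bounded queue. put() blocks when the queue is full, so the
// pull rate is naturally limited by the consumer and no event is lost.
public class PullAdapterDemo {
    /** Pulls nEvents from a source and pushes them through a bounded queue. */
    public static int transfer(int nEvents, int capacity) {
        BlockingQueue<Integer> downstream = new ArrayBlockingQueue<>(capacity);
        AtomicInteger received = new AtomicInteger();
        Thread app = new Thread(() -> {
            try {
                for (int i = 0; i < nEvents; i++) {
                    downstream.take();          // the (possibly slow) app consumes
                    received.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        app.start();
        try {
            for (int i = 0; i < nEvents; i++) {
                downstream.put(i); // blocks when the app lags: backpressure
            }
            app.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }
}
```

The producer never outruns the consumer by more than the queue capacity; every event it pulls is eventually delivered.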



Can you give me some idea about the concepts behind the queue implementation here?

More details here: http://incubator.apache.org/s4/doc/0.6.0/event_dispatch/ 


One more query about joining multiple streams: there was a provision for joining streams in 0.3. Is there any provision here (0.6) for joining or splitting incoming streams based on their key? Currently we can do it by writing a common event file for the different event streams and using that for our processing in PEs.
If there is any way to do this in 0.6, can you point me to the right API?

There is no such API / facility yet, so you have to implement the corresponding logic in the code of the PE.
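As a sketch of what that join logic inside a PE could look like (illustrative only, plain Java; the class and method names are made up): buffer the first half of each keyed pair and emit the joined result when the matching half arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: keyed join of two streams inside a PE. Each handler
// checks whether the other stream already delivered a value for this key;
// if so it emits the join, otherwise it buffers and waits.
public class KeyedJoiner {
    private final Map<String, String> pendingA = new HashMap<>();
    private final Map<String, String> pendingB = new HashMap<>();
    private final List<String> joined = new ArrayList<>();

    public void onStreamA(String key, String value) {
        String b = pendingB.remove(key);
        if (b != null) joined.add(key + ":" + value + "|" + b);
        else pendingA.put(key, value);
    }

    public void onStreamB(String key, String value) {
        String a = pendingA.remove(key);
        if (a != null) joined.add(key + ":" + a + "|" + value);
        else pendingB.put(key, value);
    }

    public List<String> results() {
        return joined;
    }
}
```

In a real PE you would also bound or expire the pending maps, since an unmatched key would otherwise be buffered forever.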

Hope this helps,

Matthieu


Thanks,
~/Sk