incubator-s4-dev mailing list archives

From Leo Neumeyer <leoneume...@gmail.com>
Subject Re: Thoughts on adding guaranteed message processing
Date Fri, 10 Aug 2012 20:19:47 GMT
These are great projects if someone wants to champion them. However, with limited bandwidth,
I would focus first on usability, dev tools, dashboard, DSL support, etc. That is, making
it very easy to write, share, and deploy apps. 

-leo


On Aug 9, 2012, at 11:59 PM, Karthik Kambatla <kkambatl@cs.purdue.edu> wrote:

> From the conversation so far, S4's priorities are in the following order.
> Please correct me where I am wrong.
> 
>   1. High throughput supporting at-most-once semantics
>   2. Optional (on/off) guaranteed delivery (at-least-once semantics) at
>   the expense of some performance/throughput
>   3. Optional (on/off) exactly-once semantics at the expense of further
>   performance/throughput loss
> 
> As each case benefits some application, it should be okay to contribute to
> any/all of the goals. We should be good as long as a lower-priority item
> doesn't adversely affect a higher-priority item.
> 
> Thanks
> Karthik
> 
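[Editor's note: the three semantics in the list above can be illustrated with a small simulation over a lossy channel. This is a hypothetical sketch for intuition only; the function and its parameters are invented here and are not part of S4.]

```python
import random

def deliver(events, mode, drop_rate=0.3, seed=42):
    """Simulate delivery semantics over a lossy channel.

    events: list of (event_id, payload); mode is one of
    "at-most-once", "at-least-once", "exactly-once".
    """
    rng = random.Random(seed)
    received = []     # what the downstream node sees
    seen = set()      # receiver-side dedup state (exactly-once only)
    for eid, payload in events:
        while True:
            lost = rng.random() < drop_rate
            if not lost:
                if mode == "exactly-once" and eid in seen:
                    pass                    # duplicate suppressed by receiver
                else:
                    seen.add(eid)
                    received.append((eid, payload))
            if mode == "at-most-once":
                break                       # fire and forget: never retry
            ack_lost = rng.random() < drop_rate
            if not lost and not ack_lost:
                break                       # sender saw the ack, stop
            # otherwise resend (retry until acknowledged)
    return received
```

With retries, a delivered event whose ack is lost gets resent, which is why at-least-once can produce duplicates; exactly-once adds receiver-side dedup on top, at the cost of extra state, which mirrors the ordering of the priorities above.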
> On Thu, Aug 9, 2012 at 11:29 PM, kishore g <g.kishore@gmail.com> wrote:
> 
>> Nice discussion, and all points are valid. Exactly-once is pretty hard to
>> get right, and I'm not sure it will be worth the effort. We already have
>> at-most-once logic via TCP/UDP. This is the easiest semantics to program
>> against. At-least-once is also quite useful, and as Matthieu mentioned,
>> Kafka, BookKeeper/Hedwig/logstage, or any pub/sub system should help us
>> achieve it. But it is quite hard to program against these semantics, and
>> it will most likely require converting the streaming model into
>> batch-based streaming because of the need to checkpoint at some intervals.
>> 
>> 
>> thanks,
>> Kishore G
>> 
>> 
>> On Thu, Aug 9, 2012 at 8:43 AM, Karthik Kambatla <kkambatl@cs.purdue.edu>
>> wrote:
>> 
>>> Should we move this conversation to the JIRA?
>>> 
>>> On Thu, Aug 9, 2012 at 8:40 AM, Karthik Kambatla <kkambatl@cs.purdue.edu>
>>> wrote:
>>> 
>>>> On Thu, Aug 9, 2012 at 7:37 AM, Flavio Junqueira <fpj@yahoo-inc.com>
>>>> wrote:
>>>> 
>>>>> The additional features I see needed are logging and a mechanism to
>>>>> notify consumption. Consuming means that an event has been processed.
>>>>> One issue here is the one I raised before: not confirming that an
>>>>> event has been consumed doesn't mean it hasn't been; the node might
>>>>> have crashed before sending the message or writing to the journal. In
>>>>> principle, processing the message and notifying consumption need to
>>>>> be atomic for exactly-once semantics.
>>>>> 
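[Editor's note: the atomicity point above can be made concrete with a toy replay loop. This is purely illustrative; names such as `acked` and `crash_after` are invented for this sketch and are not S4 code. If the consumption notification is journaled separately from processing, a crash between the two steps causes the event to be processed again on recovery.]

```python
def run(events, crash_after=None):
    """Process events, journaling an ack after each one. A simulated
    crash between 'process' and 'ack' forces a replay on recovery."""
    acked = set()      # journal of consumption notifications
    processed = []     # observable side effects of processing
    crash_pending = crash_after is not None

    def attempt():
        nonlocal crash_pending
        for i, e in enumerate(events):
            if i in acked:
                continue               # consumption confirmed: skip on replay
            processed.append(e)        # step 1: process (side effect)
            if crash_pending and i == crash_after:
                crash_pending = False
                return False           # crash: ack for event i never written
            acked.add(i)               # step 2: journal consumption
        return True

    if not attempt():
        attempt()                      # recovery: replay unacked events
    return processed
```

Without a crash, each event is processed once; with a crash after processing event 1 but before its ack, event 1 is replayed and processed twice — exactly the gap that atomic process-plus-notify would close.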
>>>> +1. This needs to be on the path to guaranteeing exactly-once
>> semantics.
>>>> 
>>>> 
>>>>> About minimizing errors, I'm still not entirely sure what it buys us.
>>>>> Of course we don't want to lose lots of events if we can avoid it,
>>>>> but to my knowledge we are not doing poorly in that sense. For
>>>>> applications that strictly need to process every event, losing 2
>>>>> instead of 4 does not make much difference, so I'm still stuck on
>>>>> what we gain with point 1 of your incremental approach.
>>>>> 
>>>> I think Leo and Matthieu are concerned about the performance
>>>> implications of supporting full-blown exactly-once semantics.
>>>> 
>>>> Matthieu's point 1 takes us closest to loss-less delivery (not
>>>> guaranteed) without losing performance, and is indeed common to all
>>>> use cases. Also, it is one of the necessary steps towards exactly-once
>>>> semantics.
>>>> 
>>>>> 
>>>>> -Flavio
>>>>> 
>>>>> On Aug 9, 2012, at 11:22 AM, Matthieu Morel wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> Is it worth trying to achieve exactly-once or even at-least-once
>>>>>> event processing semantics, which are not useful for applications
>>>>>> that process streams in a statistical manner? It seems there are
>>>>>> quite a few users and applications interested in these semantics,
>>>>>> and that can tolerate the exceptional extra latency due to recovery.
>>>>>> And indeed, combining checkpointing with an inbound logging
>>>>>> mechanism such as something based on Hedwig/BookKeeper or Kafka is a
>>>>>> possible approach. But it is not sufficient, since -
>>>>>> implementation-wise - we also have to add some tracking of the
>>>>>> messages, a mechanism to avoid load shedding by holding upstream
>>>>>> processes, and probably some kind of coordination messages or
>>>>>> mechanism.
>>>>>> 
>>>>>> As Leo pointed out, this can get quite complex, and involvement is
>>>>>> use-case driven: it depends on the use cases that committers and
>>>>>> contributors are facing and are directly involved with.
>>>>>> 
>>>>>> So maybe an incremental approach would be worth following:
>>>>>> 
>>>>>> 1. minimize errors (this implies providing metrics, and some other
>>>>>> things Leo may have in mind)
>>>>>> 2. control load shedding by notifying upstream processing and
>>>>>> potentially holding upstream processes (optional of course)
>>>>>> 3. integrate with replayable inbound logging systems such as Kafka
>>>>>> or Hedwig/logstage,
>>>>>> 4. implement a mechanism for coordinating recovery
>>>>>> 
>>>>>> Each step brings very valuable benefits, and we can adapt based on
>>>>>> use cases we are facing, workload, participants, priorities, etc.
>>>>>> 
>>>>>> For now, we have already scheduled adding insightful metrics in the
>>>>>> next version. Steps 2 and 3 look fairly easy to implement.
>>>>>> 
>>>>>> 
>>>>>> Regards,
>>>>>> 
>>>>>> Matthieu
>>>>>> 
>>>>>> On 8/8/12 11:13 PM, Flavio Junqueira wrote:
>>>>>>> Roughly, applications can either afford to lose events or not. If
>>>>>>> an app can't afford to lose events, then it does not do much good
>>>>>>> to reduce the amount of lost events. One way to achieve fault
>>>>>>> tolerance is to log events, using e.g. BookKeeper. BookKeeper is
>>>>>>> both replicated and fast.
>>>>>>> 
>>>>>>> Just logging, however, is not sufficient. If you can't stall the
>>>>>>> source, then guaranteeing no event loss might be very difficult in
>>>>>>> the case recovery can be arbitrarily long. In such cases, the
>>>>>>> amount of storage required for logging is unbounded.
>>>>>>> 
>>>>>>> Along those lines, Matthieu has developed a system on top of
>>>>>>> BookKeeper (Hedwig, to be more precise) called logstage; it might
>>>>>>> be useful in this context. What do you think, Matthieu?
>>>>>>> 
>>>>>>> -Flavio
>>>>>>> 
>>>>>>> On Aug 7, 2012, at 8:37 PM, Leo Neumeyer wrote:
>>>>>>> 
>>>>>>>> Hi all. Some thoughts.
>>>>>>>> 
>>>>>>>> Building a fault-tolerant system would require queuing N events
>>>>>>>> at the source. The length of the queue would depend on the
>>>>>>>> frequency of checkpointing. Post failure, the system would need to
>>>>>>>> restore the state of the node from the checkpoint and re-apply all
>>>>>>>> the events emitted since the time of checkpointing. To increase
>>>>>>>> reliability, we would also need to replicate the nodes. One would
>>>>>>>> also need to account for the peak data rate to make sure there is
>>>>>>>> enough capacity to re-process all the data and comply with the
>>>>>>>> real-time constraints.
>>>>>>>> 
>>>>>>>> All of this is possible, but I'm not sure this is the best
>>>>>>>> platform for applications that expect zero errors. It was designed
>>>>>>>> for processing large amounts of data in applications that can
>>>>>>>> tolerate a small probability of error. Improving the platform to
>>>>>>>> reduce errors is much simpler than trying to achieve zero errors.
>>>>>>>> 
>>>>>>>> Perhaps a better approach is to have good error detection, so the
>>>>>>>> application can handle the recovery at a higher level (not in
>>>>>>>> real-time).
>>>>>>>> 
>>>>>>>> Rather than try to solve ALL the problems, I think it is better to
>>>>>>>> focus on problems that involve statistical processing of massive
>>>>>>>> amounts of noisy and redundant data, where a small probability of
>>>>>>>> errors will not affect the accuracy of the results (text
>>>>>>>> processing, signal processing, sensor data, market data, etc.).
>>>>>>>> The advantage of focusing is that we can solve one problem well
>>>>>>>> and keep the system as simple as possible.
>>>>>>>> 
>>>>>>>> Regarding reliability at the communication layer, using TCP should
>>>>>>>> work fine, I think.
>>>>>>>> 
>>>>>>>> Of course sending email is easy; I wish I had more time to put my
>>>>>>>> code where my mouth is :-)
>>>>>>>> 
>>>>>>>> -leo
>>>>>>>> 
>>>>>>>> On Tue, Aug 7, 2012 at 10:15 AM, Karthik Kambatla
>>>>>>>> <kkambatl@cs.purdue.edu> wrote:
>>>>>>>> 
>>>>>>>>> Hi Flavio,
>>>>>>>>> 
>>>>>>>>> We are in agreement. I was trying to push the discussion
>>>>>>>>> further, and get your inputs to decide on a plausible approach in
>>>>>>>>> S4.
>>>>>>>>> 
>>>>>>>>> Regarding exactly-once semantics, I understand we need to plug
>>>>>>>>> multiple holes in a failing environment. To ensure we actually
>>>>>>>>> can support reliable delivery, we should outline possible
>>>>>>>>> failures and how they can be addressed. I might be able to think
>>>>>>>>> through and list the steps (and possible failures) later
>>>>>>>>> today/tomorrow.
>>>>>>>>> 
>>>>>>>>> On a side note, I remember Kishore and Leo initiating a
>>>>>>>>> conversation on using ZeroMQ in the comm-layer. ZeroMQ, I have
>>>>>>>>> heard, supports reliable delivery, and they claim it is faster
>>>>>>>>> than TCP.
>>>>>>>>> 
>>>>>>>>> Thanks
>>>>>>>>> Karthik
>>>>>>>>> 
>>>>>>>>> On Tue, Aug 7, 2012 at 6:53 AM, Flavio Junqueira
>>>>>>>>> <fpj@yahoo-inc.com> wrote:
>>>>>>>>> 
>>>>>>>>>> I didn't mean to suggest a different way; I was trying to
>>>>>>>>>> understand the definition of exactly-once. As for the use case
>>>>>>>>>> Benjamin has posted, he says that not even a single tag scan can
>>>>>>>>>> be lost, but can we guarantee that events are reliably delivered
>>>>>>>>>> at-least-once with S4?
>>>>>>>>>> 
>>>>>>>>>> -Flavio
>>>>>>>>>> 
>>>>>>>>>> On Aug 7, 2012, at 7:38 AM, Karthik Kambatla wrote:
>>>>>>>>>> 
>>>>>>>>>>> Given that nodes crash, it seems essential to build reliability
>>>>>>>>>>> like it is in lower layers - think TCP. One trivial approach
>>>>>>>>>>> could be to use monotonically increasing sequence numbers to
>>>>>>>>>>> identify events in a stream.
>>>>>>>>>>> 
>>>>>>>>>>>   1. Event ordering: Hold events until all the previous
>>>>>>>>>>>   sequence numbers have been received.
>>>>>>>>>>>   2. Exactly-once: If a sequence number is smaller than a
>>>>>>>>>>>   previous one, it is a duplicate.
>>>>>>>>>>>   3. Fault-tolerance: Store the latest sequence number along
>>>>>>>>>>>   with the checkpoint, and replay events from there onwards.
>>>>>>>>>>> 
>>>>>>>>>>> This, of course, comes at a performance overhead and should be
>>>>>>>>>>> optional.
>>>>>>>>>>> 
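[Editor's note: the three points above can be sketched as a receiver-side buffer. This is a hypothetical illustration; `SequencedReceiver` and its methods are invented names, not an S4 API.]

```python
class SequencedReceiver:
    """Receiver sketch for the sequence-number scheme: reordering,
    duplicate suppression, and checkpoint-based replay."""

    def __init__(self, checkpointed_seq=-1):
        # 3. Fault-tolerance: resume just after the checkpointed sequence
        self.next_seq = checkpointed_seq + 1
        self.pending = {}      # out-of-order events held back (point 1)
        self.delivered = []

    def receive(self, seq, payload):
        if seq < self.next_seq:
            return                       # 2. Exactly-once: old seq = duplicate
        self.pending[seq] = payload      # duplicate of a held event overwrites
        # 1. Ordering: release events only once the gap is filled
        while self.next_seq in self.pending:
            self.delivered.append(self.pending.pop(self.next_seq))
            self.next_seq += 1

    def checkpoint(self):
        # persisted with PE state; upstream replays from here after a crash
        return self.next_seq - 1
```

Feeding a fresh receiver (1, 'b'), (0, 'a'), (0, 'a'), (2, 'c') delivers ['a', 'b', 'c'] in order with the duplicate dropped, which is the performance overhead being discussed: every receiver pays for a reorder buffer and persistent sequence state.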
>>>>>>>>>>> As I said, this is the first approach that comes to mind. It
>>>>>>>>>>> is indeed an interesting problem, and I feel we should not need
>>>>>>>>>>> to re-do stuff that is done at lower layers.
>>>>>>>>>>> 
>>>>>>>>>>> Please suggest improvements/alternatives as you see fit.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks
>>>>>>>>>>> Karthik
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Aug 6, 2012 at 10:01 PM, Flavio Junqueira
>>>>>>>>>>> <fpj@yahoo-inc.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Aug 6, 2012, at 10:11 PM, Karthik Kambatla wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Flavio - it is indeed tricky to offer exactly-once semantics.
>>>>>>>>>>>> My understanding is that the underlying comm-layer could
>>>>>>>>>>>> filter out subsequent duplicate events; however, we need to
>>>>>>>>>>>> sacrifice ordering.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> I was also thinking that if a node crashes and we recover
>>>>>>>>>>>> from checkpoints, we may end up having messages applied twice.
>>>>>>>>>>>> 
>>>>>>>>>>>> -Flavio
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Karthik
>>>>>>>>>>>> 
>>>>>>>>>>>> On Mon, Aug 6, 2012 at 6:43 AM, "Benjamin Süß"
>>>>>>>>>>>> <Gothic13@gmx.de> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Matthieu,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> thank you for your reply. I had a specific use case in mind,
>>>>>>>>>>>>> indeed:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I am trying to track RFID tags in distributed systems. This
>>>>>>>>>>>>> means that not even a single tag scan may get lost. And of
>>>>>>>>>>>>> course, none are to be sent twice or even more often, as
>>>>>>>>>>>>> this would heavily confuse any surveillance routines I am
>>>>>>>>>>>>> going to implement.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regarding your answers, especially point 3, I do not think
>>>>>>>>>>>>> this can be done with S4 at the moment, can it?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Benjamin
>>>>>>>>>>>>> 
>>>>>>>>>>>>> -------- Original Message --------
>>>>>>>>>>>>>> Date: Tue, 31 Jul 2012 17:30:27 +0200
>>>>>>>>>>>>>> From: Matthieu Morel <mmorel@apache.org>
>>>>>>>>>>>>>> To: s4-user@incubator.apache.org
>>>>>>>>>>>>>> Subject: Re: Thoughts on adding guaranteed message processing
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 7/31/12 2:54 PM, "Benjamin Süß" wrote:
>>>>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> it is stated in several places that S4 does not include
>>>>>>>>>>>>>>> guaranteed one-time message processing. So my question is:
>>>>>>>>>>>>>>> are there currently any plans on adding this to S4? Or is
>>>>>>>>>>>>>>> it certain this is not going to happen? If there are any
>>>>>>>>>>>>>>> plans on this, can I find further information somewhere?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> There are typically 3 requirements for guaranteeing
>>>>>>>>>>>>>> one-time message processing:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1. reliable communication channels
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 2. replayable input stream: you need an upstream component
>>>>>>>>>>>>>> that is able to store/bufferize the whole stream and replay
>>>>>>>>>>>>>> it on demand.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 3. tracking of messages, using some sort of piggybacking,
>>>>>>>>>>>>>> possibly requiring manual input from the user.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> In S4 0.5.0, we already address 1. by providing
>>>>>>>>>>>>>> communications through TCP by default. Requirement 2. is
>>>>>>>>>>>>>> quite straightforward to implement, by adding some machinery
>>>>>>>>>>>>>> to connect to a component such as Apache Kafka, for
>>>>>>>>>>>>>> instance. We are considering options for 3.
>>>>>>>>>>>>>> 
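[Editor's note: requirement 2 can be mocked in a few lines to show the recovery contract. This is an in-memory stand-in; the `ReplayableLog` name and API are invented for this sketch, and a real deployment would use Kafka or Hedwig offsets instead.]

```python
class ReplayableLog:
    """Minimal in-memory stand-in for a replayable inbound log."""

    def __init__(self):
        self._log = []

    def append(self, event):
        self._log.append(event)
        return len(self._log) - 1      # offset assigned to the event

    def replay(self, from_offset=0):
        # After a crash, a node restores its checkpoint and re-reads
        # every event stored at or after the given offset.
        return iter(self._log[from_offset:])
```

For example, after appending e0..e3 and checkpointing through offset 1, replaying from offset 2 re-delivers only e2 and e3; the storage-growth concern raised earlier in the thread is exactly that `_log` grows without bound unless the source can be stalled or old offsets truncated.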
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Do you have a specific use case in mind?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Matthieu
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> *Leo Neumeyer*
>>>>>>>> Software, data, algorithms.
>>>>>>>> leoneumeyer@gmail.com
>>>>>>>> *http://www.linkedin.com/in/leoneu*
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
