incubator-s4-dev mailing list archives

From Karthik Kambatla <kkamb...@cs.purdue.edu>
Subject Re: Thoughts on adding guaranteed message processing
Date Thu, 09 Aug 2012 15:40:51 GMT
On Thu, Aug 9, 2012 at 7:37 AM, Flavio Junqueira <fpj@yahoo-inc.com> wrote:

> The additional features I see needed are logging and a mechanism to notify
> consumption. Consuming means that an event has been processed. One issue
> here is the one I raised before: lacking confirmation that an event has been
> consumed doesn't mean it hasn't been; the node might have crashed before
> sending the message or writing to the journal. In principle, processing the
> message and notifying consumption need to be atomic for exactly-once
> semantics.
>
+1. This needs to be on the path to guaranteeing exactly-once semantics.
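
A minimal sketch of what "process and notify consumption atomically" could
look like, assuming the operator state and the consumption record live in the
same transactional store. SQLite is used purely for illustration; the table
names, the per-stream sequence number and the `fail` switch are hypothetical
and are not S4 APIs.

```python
import sqlite3


def open_store(path=":memory:"):
    """Open a store that holds both application state and the consumption record."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS counts (key TEXT PRIMARY KEY, n INTEGER NOT NULL)")
    conn.execute("CREATE TABLE IF NOT EXISTS consumed (stream TEXT PRIMARY KEY, seq INTEGER NOT NULL)")
    conn.commit()
    return conn


def last_consumed(conn, stream):
    """Return the highest consumed sequence number for a stream, or -1 if none."""
    row = conn.execute("SELECT seq FROM consumed WHERE stream = ?", (stream,)).fetchone()
    return row[0] if row else -1


def process(conn, stream, seq, key, fail=False):
    """Apply one event and record its consumption in a single transaction.

    Returns True if the event was applied, False if it was already consumed.
    `fail=True` simulates a crash between the state update and the record.
    """
    if seq <= last_consumed(conn, stream):
        return False  # already consumed: a replayed event is skipped
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute("INSERT OR IGNORE INTO counts (key, n) VALUES (?, 0)", (key,))
        conn.execute("UPDATE counts SET n = n + 1 WHERE key = ?", (key,))
        if fail:
            raise RuntimeError("simulated crash before recording consumption")
        conn.execute("INSERT OR REPLACE INTO consumed (stream, seq) VALUES (?, ?)", (stream, seq))
    return True
```

Because both writes commit or roll back together, replaying an event after a
crash applies it once. This only covers the case where state and consumption
record share one store; notifying a remote node is the harder problem
discussed in this thread.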


> About minimizing errors, I'm still not entirely sure what it buys us. Of
> course we don't want to lose lots of events if we can avoid it, but to my
> knowledge we are not doing poorly in that sense. For applications that
> strictly need to process every event, losing 2 instead of 4 does not make
> much difference, so I'm still stuck on what we gain with point 1 of your
> incremental approach.
>
I think Leo and Matthieu are concerned about the performance implications
of supporting full-blown exactly-once semantics.

Matthieu's point 1 takes us closest to loss-less delivery (not guaranteed)
without losing performance, and is indeed common to all use-cases. Also, it
is one of the necessary steps towards exactly-once semantics.

>
> -Flavio
>
> On Aug 9, 2012, at 11:22 AM, Matthieu Morel wrote:
>
> > Hi,
> >
> > Is it worth trying to achieve exactly-once or even at-least-once event
> > processing semantics, which are not useful for applications that process
> > streams in a statistical manner? It seems there are quite a few users
> > and applications interested in these semantics, and that can tolerate
> > the exceptional extra latency due to recovery. And indeed combining
> > checkpointing with an inbound logging mechanism such as something based
> > on Hedwig/Bookkeeper or Kafka is a possible approach. But it is not
> > sufficient, since - implementation-wise - we also have to add some
> > tracking of the messages, a mechanism to avoid load shedding by holding
> > upstream processes, and probably some kind of coordination messages or
> > mechanism.
> >
> > As Leo pointed out, this can get quite complex, and how far we go depends
> > on the use cases that committers/contributors are facing and are directly
> > involved with.
> >
> > So maybe an incremental approach would be worth following:
> >
> > 1. minimize errors (this implies providing metrics, and some other
> > things Leo may have in mind)
> > 2. control load shedding by notifying upstream processing and
> > potentially holding upstream processes (optional of course)
> > 3. integrate with replayable inbound logging systems such as Kafka or
> > Hedwig/logstage,
> > 4. implement a mechanism for coordinating recovery
> >
> > Each step brings very valuable benefits and we can adapt based on use
> > cases we are facing, workload, participants, priorities etc...
> >
> > For now, we have already scheduled adding insightful metrics in the next
> > version. 2 and 3 look fairly easy to implement.
> >
> >
> > Regards,
> >
> > Matthieu
> >
> >
> >
> >
> >
> >
> > On 8/8/12 11:13 PM, Flavio Junqueira wrote:
> >> Roughly, applications can either afford to lose events or not. If an app
> >> can't afford to lose events, then it does not do much good to reduce the
> >> number of lost events. One way to achieve fault tolerance is to log
> >> events, using e.g. BookKeeper. BookKeeper is both replicated and fast.
> >>
> >> Just logging, however, is not sufficient. If you can't stall the
> >> source, then guaranteeing no event loss might be very difficult when
> >> recovery can take arbitrarily long. In such cases, the amount of storage
> >> required for logging is unbounded.
> >>
> >> Along those lines, Matthieu has developed a system on top of BookKeeper
> >> (Hedwig to be more precise) called log stage; it might be useful in this
> >> context. What do you think, Matthieu?
> >>
> >> -Flavio
> >>
> >> On Aug 7, 2012, at 8:37 PM, Leo Neumeyer wrote:
> >>
> >>> Hi all. Some thoughts.
> >>>
> >>> Building a fault tolerant system would require queuing N events at the
> >>> source. The length of the queue would depend on the frequency of
> >>> checkpointing. Post failure, the system would need to restore the state
> >>> of the node from the checkpoint and re-apply all the events emitted since
> >>> the time of checkpointing. To increase reliability, we would also need to
> >>> replicate the nodes. One would also need to account for the peak data
> >>> rate to make sure there is enough capacity to re-process all the data and
> >>> comply with the real-time constraints.
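
A back-of-the-envelope sketch of the sizing argument above, under simplifying
assumptions that are mine rather than Leo's: events arrive at a constant peak
rate, the failure happens just before the next checkpoint (worst case), and
the recovered node drains the backlog at a fixed capacity while new events
keep arriving. All names and parameters are hypothetical.

```python
def replay_backlog(peak_rate, checkpoint_interval_s, downtime_s):
    """Worst-case number of events queued at the source when the node resumes:
    everything since the last checkpoint plus everything that arrived while
    the node was down."""
    return peak_rate * (checkpoint_interval_s + downtime_s)


def catch_up_seconds(peak_rate, capacity, checkpoint_interval_s, downtime_s):
    """Time needed to drain the backlog while events keep arriving at peak_rate."""
    if capacity <= peak_rate:
        raise ValueError("capacity must exceed the peak rate, or the backlog never drains")
    backlog = replay_backlog(peak_rate, checkpoint_interval_s, downtime_s)
    # Only the spare capacity (capacity - peak_rate) works off the backlog.
    return backlog / (capacity - peak_rate)
```

For example, at 1000 events/s with a 60 s checkpoint interval and 30 s of
downtime, the source has to hold 90000 events, and a node that can process
1500 events/s needs 180 s to catch up.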
> >>>
> >>> All of this is possible but I'm not sure this is the best platform for
> >>> applications that expect zero errors. It was designed for processing
> >>> large amounts of data in applications that can tolerate a small
> >>> probability of error. Improving the platform to reduce the errors is much
> >>> simpler than trying to achieve zero errors.
> >>>
> >>> Perhaps a better approach is to have good error detection so the
> >>> application can handle the recovery at a higher level (not in real-time).
> >>>
> >>> Rather than try to solve ALL the problems, I think that it is better to
> >>> focus on problems that involve statistical processing of massive amounts
> >>> of noisy and redundant data where a small probability of errors will not
> >>> affect the accuracy of the results (text processing, signal processing,
> >>> sensor data, market data, etc.). The advantage of focusing is that we can
> >>> solve one problem well and keep the system as simple as possible.
> >>>
> >>> Regarding the reliability at the communication layer, using TCP should
> >>> work fine, I think.
> >>>
> >>> Of course sending email is easy, I wish I had more time to put my code
> >>> where my mouth is :-)
> >>>
> >>> -leo
> >>>
> >>> On Tue, Aug 7, 2012 at 10:15 AM, Karthik Kambatla
> >>> <kkambatl@cs.purdue.edu> wrote:
> >>>
> >>>> Hi Flavio,
> >>>>
> >>>> We are in agreement. I was trying to push the discussion further, and
> >>>> get your inputs to decide on a plausible approach in S4.
> >>>>
> >>>> Regarding exactly-once semantics, I understand we need to plug multiple
> >>>> holes in a failing environment. To ensure we actually can support
> >>>> reliable delivery, should we outline possible failures and how they can
> >>>> be addressed? I might be able to think through and list the steps (and
> >>>> possible failures) later today/tomorrow.
> >>>>
> >>>> On a side note, I remember Kishore and Leo initiating a conversation on
> >>>> using ZeroMQ in the comm-layer. ZeroMQ, I have heard, supports reliable
> >>>> delivery, and they claim to be faster than TCP.
> >>>>
> >>>> Thanks
> >>>> Karthik
> >>>>
> >>>> On Tue, Aug 7, 2012 at 6:53 AM, Flavio Junqueira <fpj@yahoo-inc.com>
> >>>> wrote:
> >>>>
> >>>>> I didn't mean to suggest a different way, I was trying to understand
> >>>>> the definition of exactly-once. As for the use case Benjamin has posted,
> >>>>> he says that not even a single tag scan can be lost, but can we
> >>>>> guarantee that events are reliably delivered at-least-once with S4?
> >>>>>
> >>>>> -Flavio
> >>>>>
> >>>>> On Aug 7, 2012, at 7:38 AM, Karthik Kambatla wrote:
> >>>>>
> >>>>>> Given that nodes crash, it seems essential to build in reliability the
> >>>>>> way it is done in lower layers - think TCP. One trivial approach could
> >>>>>> be to use monotonically increasing sequence numbers to identify events
> >>>>>> in a stream.
> >>>>>>
> >>>>>>  1. Event ordering: Hold events until all the previous sequence
> >>>>>>  numbers have been received.
> >>>>>>  2. Exactly-once: If a sequence number is smaller than the previous
> >>>>>>  one, it is a duplicate.
> >>>>>>  3. Fault-tolerance: Store the latest sequence number along with the
> >>>>>>  checkpoint, replay events from there onwards.
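
The three rules above can be sketched as a small receiver. This is only an
illustration under assumptions not stated in the thread: sequence numbers
start at 0 and are dense per stream, "duplicate" is read as "below the next
expected number, or already held back", and the `delivered` list stands in
for the application state. The class and method names are hypothetical, not
part of S4.

```python
class SequencedReceiver:
    """Orders, de-duplicates and checkpoints events by sequence number."""

    def __init__(self, next_seq=0):
        self.next_seq = next_seq  # lowest sequence number not yet delivered
        self.pending = {}         # out-of-order events held back, keyed by seq
        self.delivered = []       # stand-in for handing events to the application

    def receive(self, seq, event):
        """Accept one event and return the events released in order."""
        if seq < self.next_seq or seq in self.pending:
            return []  # rule 2: already delivered or already held, so a duplicate
        self.pending[seq] = event
        released = []
        # rule 1: release only once every earlier sequence number has arrived
        while self.next_seq in self.pending:
            released.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        self.delivered.extend(released)
        return released

    def checkpoint(self):
        """rule 3: store the sequence position together with the state."""
        return {"next_seq": self.next_seq, "delivered": list(self.delivered)}

    @classmethod
    def restore(cls, snapshot):
        """Rebuild a receiver; upstream then replays from snapshot['next_seq']."""
        receiver = cls(snapshot["next_seq"])
        receiver.delivered = list(snapshot["delivered"])
        return receiver
```

Held-back events are deliberately left out of the checkpoint: after a
restore they are expected to arrive again through the replay, and anything
replayed from before `next_seq` is dropped as a duplicate.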
> >>>>>>
> >>>>>> This, of course, comes at a performance overhead and should be
> >>>>>> optional.
> >>>>>>
> >>>>>> As I said, this is the first approach that comes to mind. It is indeed
> >>>>>> an interesting problem, and I feel we should not need to re-do stuff
> >>>>>> that is done at lower layers.
> >>>>>>
> >>>>>> Please suggest improvements/alternatives as you see fit.
> >>>>>>
> >>>>>> Thanks
> >>>>>> Karthik
> >>>>>>
> >>>>>> On Mon, Aug 6, 2012 at 10:01 PM, Flavio Junqueira
> >>>>>> <fpj@yahoo-inc.com> wrote:
> >>>>>>
> >>>>>>>
> >>>>>>> On Aug 6, 2012, at 10:11 PM, Karthik Kambatla wrote:
> >>>>>>>
> >>>>>>> Flavio - it is indeed tricky to offer exactly-once semantics. My
> >>>>>>> understanding is that the underlying comm-layer could filter out
> >>>>>>> subsequent duplicate events; however, we need to sacrifice ordering.
> >>>>>>>
> >>>>>>>
> >>>>>>> I was also thinking that if a node crashes and we recover from
> >>>>>>> checkpoints, we may end up having messages applied twice.
> >>>>>>>
> >>>>>>> -Flavio
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Karthik
> >>>>>>>
> >>>>>>> On Mon, Aug 6, 2012 at 6:43 AM, "Benjamin Süß" <Gothic13@gmx.de>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Matthieu,
> >>>>>>>>
> >>>>>>>> thank you for your reply. I had a specific use case in mind, indeed:
> >>>>>>>>
> >>>>>>>> I am trying to track RFID tags in distributed systems. This means
> >>>>>>>> that not even a single tag scan may get lost. And of course, none are
> >>>>>>>> to be sent twice or even more often, as this would heavily confuse any
> >>>>>>>> surveillance routines I am going to implement.
> >>>>>>>>
> >>>>>>>> Regarding your answers, especially point 3, I do not think this can
> >>>>>>>> be done with S4 at the moment, can it?
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Benjamin
> >>>>>>>>
> >>>>>>>> -------- Original Message --------
> >>>>>>>>> Date: Tue, 31 Jul 2012 17:30:27 +0200
> >>>>>>>>> From: Matthieu Morel <mmorel@apache.org>
> >>>>>>>>> To: s4-user@incubator.apache.org
> >>>>>>>>> Subject: Re: Thoughts on adding guaranteed message processing
> >>>>>>>>
> >>>>>>>>> On 7/31/12 2:54 PM, "Benjamin Süß" wrote:
> >>>>>>>>>> Hi there,
> >>>>>>>>>>
> >>>>>>>>>> it is stated in several places that S4 does not include guaranteed
> >>>>>>>>>> one-time message processing. So my question is: are there currently
> >>>>>>>>>> any plans on adding this to S4? Or is it certain this is not going
> >>>>>>>>>> to happen? If there are any plans on this, can I find further
> >>>>>>>>>> information somewhere?
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> There are typically 3 requirements for guaranteeing one-time message
> >>>>>>>>> processing:
> >>>>>>>>>
> >>>>>>>>> 1. reliable communication channels
> >>>>>>>>>
> >>>>>>>>> 2. replayable input stream: you need an upstream component that is
> >>>>>>>>> able to store/buffer the whole stream and replay on demand.
> >>>>>>>>>
> >>>>>>>>> 3. tracking of messages, using some sort of piggybacking, possibly
> >>>>>>>>> requiring manual input from the user.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> In S4 0.5.0, we already address 1. by providing communications
> >>>>>>>>> through TCP by default. Requirement 2. is quite straightforward to
> >>>>>>>>> implement, by adding some machinery to connect to a component such as
> >>>>>>>>> Apache Kafka for instance. We are considering options for 3.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Do you have a specific use case in mind?
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>>
> >>>>>>>>> Matthieu
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> *Leo Neumeyer*
> >>> Software, data, algorithms.
> >>> leoneumeyer@gmail.com
> >>> *http://www.linkedin.com/in/leoneu*
> >>>
> >>
> >
>
>
