incubator-s4-dev mailing list archives

From Karthik Kambatla <kkamb...@cs.purdue.edu>
Subject Re: Thoughts on adding guaranteed message processing
Date Fri, 10 Aug 2012 06:59:52 GMT
From the conversation so far, S4's priorities are in the following order.
Please correct me where I am wrong.

   1. High throughput with at-most-once semantics
   2. Optional (on/off) guaranteed delivery (at-least-once semantics) at
   the expense of some performance/throughput
   3. Optional (on/off) exactly-once semantics at the expense of further
   performance/throughput loss

Since each case benefits some class of applications, it should be fine to
contribute toward any or all of these goals. We should be good as long as a
lower-priority item doesn't adversely affect a higher-priority one.
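
A small, purely illustrative sketch of how these optional guarantee levels
could be surfaced to applications (the enum below is hypothetical, not an
existing S4 setting):

    // Hypothetical per-stream setting; the names are illustrative only.
    enum DeliveryGuarantee {
        AT_MOST_ONCE,   // default: highest throughput, events may be lost
        AT_LEAST_ONCE,  // replayable inbound log + acks, duplicates possible
        EXACTLY_ONCE    // at-least-once plus de-duplication, highest overhead
    }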

Thanks
Karthik

On Thu, Aug 9, 2012 at 11:29 PM, kishore g <g.kishore@gmail.com> wrote:

> Nice discussion, and all points are valid. Exactly-once is pretty hard to
> get right, and I am not sure it is worth the effort. We already have
> at-most-once semantics by using TCP/UDP; this is the easiest semantics to
> program against. At-least-once is also quite useful, and as Matthieu
> mentioned, Kafka, BookKeeper/Hedwig/logstage, or any pub/sub system should
> help us achieve it. But it is quite hard to program against these
> semantics, and it will most likely require converting the streaming model
> into batch-based streaming because of the need to checkpoint at some
> intervals.
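>
> To make the "checkpoint at intervals, replay from the log" idea concrete,
> here is a minimal sketch in Java; ReplayableLog and Checkpointer are
> hypothetical interfaces standing in for Kafka/BookKeeper and the S4
> checkpointing layer, not existing APIs:
>
>     interface ReplayableLog {
>         Object read(long offset);              // blocking read at a given log offset
>     }
>
>     interface Checkpointer {
>         void save(long offset, Object state);  // persist state together with the log position
>         long lastOffset();                      // position to resume from after a crash
>     }
>
>     class AtLeastOnceConsumer {
>         private static final int CHECKPOINT_INTERVAL = 1000;  // events per "mini batch"
>
>         void run(ReplayableLog log, Checkpointer cp, Object state) {
>             long offset = cp.lastOffset();     // after a crash, events since the last
>             while (true) {                     // checkpoint are re-read and re-processed
>                 Object event = log.read(offset++);
>                 state = apply(state, event);
>                 if (offset % CHECKPOINT_INTERVAL == 0) {
>                     cp.save(offset, state);    // at-least-once: anything after this may run twice
>                 }
>             }
>         }
>
>         private Object apply(Object state, Object event) { return state; /* application logic */ }
>     }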
>
>
> thanks,
> Kishore G
>
>
> On Thu, Aug 9, 2012 at 8:43 AM, Karthik Kambatla <kkambatl@cs.purdue.edu
> >wrote:
>
> > Should we move this conversation to the JIRA?
> >
> > On Thu, Aug 9, 2012 at 8:40 AM, Karthik Kambatla <kkambatl@cs.purdue.edu
> > >wrote:
> >
> > > On Thu, Aug 9, 2012 at 7:37 AM, Flavio Junqueira <fpj@yahoo-inc.com
> > >wrote:
> > >
> > >> The additional features I see as needed are logging and a mechanism to
> > >> notify consumption. Consuming means that an event has been processed.
> > >> One issue here is the one I raised before: not confirming that an event
> > >> has been consumed doesn't mean it hasn't been; the node might have
> > >> crashed before sending the message or writing to the journal. In
> > >> principle, processing the message and notifying consumption need to be
> > >> atomic for exactly-once semantics.
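> > >>
> > >> As an illustration of the atomicity point (purely a sketch; Journal is
> > >> a hypothetical write-ahead-log interface, not an existing S4 or
> > >> BookKeeper API), one option is to write the state update and the
> > >> "event consumed" record as a single durable journal entry, and only ack
> > >> upstream once that entry is durable:
> > >>
> > >>     interface Journal {
> > >>         void append(long eventId, Object newState);  // one record: state change + consumption
> > >>     }
> > >>
> > >>     class AtomicConsumer {
> > >>         private final Journal journal;
> > >>         AtomicConsumer(Journal journal) { this.journal = journal; }
> > >>
> > >>         void handle(long eventId, Object event, Object state) {
> > >>             Object newState = process(state, event);
> > >>             journal.append(eventId, newState); // crash before this: the event is replayed, not lost
> > >>             ackUpstream(eventId);              // crash after it: replay sees the record and only re-acks
> > >>         }
> > >>
> > >>         private Object process(Object s, Object e) { return s; /* application logic */ }
> > >>         private void ackUpstream(long eventId) { /* notify the upstream log/broker */ }
> > >>     }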
> > >>
> > > +1. This needs to be on the path to guaranteeing exactly-once
> > > semantics.
> > >
> > >
> > >> About minimizing errors, I'm still not entirely sure what it buys us.
> > >> Of course we don't want to lose lots of events if we can avoid it, but
> > >> to my knowledge we are not doing poorly in that sense. For applications
> > >> that strictly need to process every event, losing 2 instead of 4 does
> > >> not make much difference, so I'm still stuck on what we gain with point
> > >> 1 of your incremental approach.
> > >>
> > > I think Leo and Matthieu are concerned about the performance
> > > implications of supporting full-blown exactly-once semantics.
> > >
> > > Matthieu's point 1 takes us closest to loss-less delivery (though not
> > > guaranteed) without losing performance, and is indeed common to all
> > > use-cases. Also, it is one of the necessary steps towards exactly-once
> > > semantics.
> > >
> > >>
> > >> -Flavio
> > >>
> > >> On Aug 9, 2012, at 11:22 AM, Matthieu Morel wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > Is it worth trying to achieve exactly-once or even at-least-once
> > >> > event processing semantics, which are not needed by applications that
> > >> > process streams in a statistical manner? It seems there are quite a
> > >> > few users and applications interested in these semantics that can
> > >> > tolerate the exceptional extra latency due to recovery. And indeed,
> > >> > combining checkpointing with an inbound logging mechanism such as
> > >> > something based on Hedwig/BookKeeper or Kafka is a possible approach.
> > >> > But it is not sufficient, since - implementation-wise - we also have
> > >> > to add some tracking of the messages, a mechanism to avoid load
> > >> > shedding by holding upstream processes, and probably some kind of
> > >> > coordination messages or mechanism.
> > >> >
> > >> > As Leo pointed out, this can get quite complex, and involvement is
> > >> > use-case driven: it depends on the use cases committers/contributors
> > >> > are facing and are directly involved with.
> > >> >
> > >> > So maybe an incremental approach would be worth following:
> > >> >
> > >> > 1. minimize errors (this implies providing metrics, and some other
> > >> > things Leo may have in mind)
> > >> > 2. control load shedding by notifying upstream processing and
> > >> > potentially holding upstream processes (optional of course)
> > >> > 3. integrate with replayable inbound logging systems such as Kafka or
> > >> > Hedwig/logstage
> > >> > 4. implement a mechanism for coordinating recovery
> > >> >
> > >> > Each step brings very valuable benefits, and we can adapt based on
> > >> > the use cases we are facing, workload, participants, priorities, etc.
> > >> >
> > >> > For now, we have already scheduled adding insightful metrics in the
> > >> > next version. 2 and 3 look fairly easy to implement.
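> > >> >
> > >> > As an illustration of point 2 (holding upstream processes instead of
> > >> > shedding load), a minimal sketch: replace a drop-on-overflow queue
> > >> > with a bounded queue whose put() blocks the sender until the consumer
> > >> > catches up. The class below is illustrative, not the actual S4
> > >> > comm-layer code:
> > >> >
> > >> >     import java.util.concurrent.ArrayBlockingQueue;
> > >> >     import java.util.concurrent.BlockingQueue;
> > >> >
> > >> >     class BackpressureChannel {
> > >> >         private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(10000);
> > >> >
> > >> >         void send(Object event) throws InterruptedException {
> > >> >             buffer.put(event);    // blocks the upstream thread instead of dropping the event
> > >> >         }
> > >> >
> > >> >         Object receive() throws InterruptedException {
> > >> >             return buffer.take(); // the downstream PE drains at its own pace
> > >> >         }
> > >> >     }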
> > >> >
> > >> >
> > >> > Regards,
> > >> >
> > >> > Matthieu
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On 8/8/12 11:13 PM, Flavio Junqueira wrote:
> > >> >> Roughly speaking, applications can either afford to lose events or
> > >> >> not. If an app can't afford to lose events, then it does not do much
> > >> >> good to merely reduce the number of lost events. One way to achieve
> > >> >> fault tolerance is to log events, using e.g. BookKeeper, which is
> > >> >> both replicated and fast.
> > >> >>
> > >> >> Just logging, however, is not sufficient. If you can't stall the
> > >> >> source, then guaranteeing no event loss might be very difficult when
> > >> >> recovery can take arbitrarily long. In such cases, the amount of
> > >> >> storage required for logging is unbounded.
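> > >> >>
> > >> >> (To make that concrete with made-up numbers: at 50,000 events/s of
> > >> >> 1 KB each, the inbound log grows by roughly 50 MB/s, so a recovery
> > >> >> window of one hour already means retaining on the order of 180 GB of
> > >> >> log just to avoid losing events; with no bound on recovery time
> > >> >> there is no bound on the log.)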
> > >> >>
> > >> >> Along those lines, Matthieu has developed a system on top of
> > >> >> BookKeeper (Hedwig, to be more precise) called logstage; it might be
> > >> >> useful in this context. What do you think, Matthieu?
> > >> >>
> > >> >> -Flavio
> > >> >>
> > >> >> On Aug 7, 2012, at 8:37 PM, Leo Neumeyer wrote:
> > >> >>
> > >> >>> Hi all. Some thoughts.
> > >> >>>
> > >> >>> Building a fault-tolerant system would require queuing N events at
> > >> >>> the source. The length of the queue would depend on the frequency of
> > >> >>> checkpointing. Post failure, the system would need to restore the
> > >> >>> state of the node from the checkpoint and re-apply all the events
> > >> >>> emitted since the time of the checkpoint. To increase reliability,
> > >> >>> we would also need to replicate the nodes. One would also need to
> > >> >>> account for the peak data rate to make sure there is enough capacity
> > >> >>> to re-process all the data and comply with the real-time constraints.
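> > >> >>>
> > >> >>> (Illustrative figures only: checkpointing every 60 s at a peak rate
> > >> >>> of 10,000 events/s means up to 600,000 events must be queued at the
> > >> >>> source and replayed after a failure. If a node can process at most
> > >> >>> 15,000 events/s while new events keep arriving at 10,000 events/s,
> > >> >>> draining that backlog takes 600,000 / (15,000 - 10,000) = 120 s,
> > >> >>> which is the headroom that has to be budgeted to meet the real-time
> > >> >>> constraints.)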
> > >> >>>
> > >> >>> All of this is possible, but I'm not sure this is the best platform
> > >> >>> for applications that expect zero errors. It was designed for
> > >> >>> processing large amounts of data in applications that can tolerate a
> > >> >>> small probability of error. Improving the platform to reduce errors
> > >> >>> is much simpler than trying to achieve zero errors.
> > >> >>>
> > >> >>> Perhaps a better approach is to have good error detection so the
> > >> >>> application can handle the recovery at a higher level (not in
> > >> >>> real-time).
> > >> >>>
> > >> >>> Rather than try to solve ALL the problems, I think it is better to
> > >> >>> focus on problems that involve statistical processing of massive
> > >> >>> amounts of noisy and redundant data, where a small probability of
> > >> >>> error will not affect the accuracy of the results (text processing,
> > >> >>> signal processing, sensor data, market data, etc.). The advantage of
> > >> >>> focusing is that we can solve one problem well and keep the system
> > >> >>> as simple as possible.
> > >> >>>
> > >> >>> Regarding the reliability at the communication layer, using TCP
> > >> >>> should work fine, I think.
> > >> >>>
> > >> >>> Of course sending email is easy; I wish I had more time to put my
> > >> >>> code where my mouth is :-)
> > >> >>>
> > >> >>> -leo
> > >> >>>
> > >> >>> On Tue, Aug 7, 2012 at 10:15 AM, Karthik Kambatla <
> > >> kkambatl@cs.purdue.edu>wrote:
> > >> >>>
> > >> >>>> Hi Flavio,
> > >> >>>>
> > >> >>>> We are in agreement. I was trying to push the discussion further,
> > >> >>>> and get your inputs to decide on a plausible approach in S4.
> > >> >>>>
> > >> >>>> Regarding exactly-once semantics, I understand we need to plug
> > >> >>>> multiple holes in a failing environment. To ensure we actually can
> > >> >>>> support reliable delivery, we should outline possible failures and
> > >> >>>> how they can be addressed. I might be able to think through and
> > >> >>>> list the steps (and possible failures) later today/tomorrow.
> > >> >>>>
> > >> >>>> On a side note, I remember Kishore and Leo initiating a
> > >> >>>> conversation on using ZeroMQ in the comm-layer. ZeroMQ, I have
> > >> >>>> heard, supports reliable delivery, and it is claimed to be faster
> > >> >>>> than TCP.
> > >> >>>>
> > >> >>>> Thanks
> > >> >>>> Karthik
> > >> >>>>
> > >> >>>> On Tue, Aug 7, 2012 at 6:53 AM, Flavio Junqueira <
> > fpj@yahoo-inc.com>
> > >> >>>> wrote:
> > >> >>>>
> > >> >>>>> I didn't mean to suggest a different way; I was trying to
> > >> >>>>> understand the definition of exactly-once. As for the use case
> > >> >>>>> Benjamin has posted, he says that not even a single tag scan can
> > >> >>>>> be lost, but can we guarantee that events are reliably delivered
> > >> >>>>> at-least-once with S4?
> > >> >>>>>
> > >> >>>>> -Flavio
> > >> >>>>>
> > >> >>>>> On Aug 7, 2012, at 7:38 AM, Karthik Kambatla wrote:
> > >> >>>>>
> > >> >>>>>> Given that nodes crash, it seems essential to build reliability
> > >> >>>>>> the way it is built in lower layers - think TCP. One trivial
> > >> >>>>>> approach could be to use monotonically increasing sequence
> > >> >>>>>> numbers to identify events in a stream.
> > >> >>>>>>
> > >> >>>>>>  1. Event ordering: hold events until all the previous sequence
> > >> >>>>>>  numbers have been received.
> > >> >>>>>>  2. Exactly-once: if a sequence number is smaller than a previous
> > >> >>>>>>  one, it is a duplicate.
> > >> >>>>>>  3. Fault-tolerance: store the latest sequence number along with
> > >> >>>>>>  the checkpoint, and replay events from there onwards.
> > >> >>>>>>
> > >> >>>>>> This, of course, comes at a performance overhead and should be
> > >> >>>>>> optional.
> > >> >>>>>>
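> > >> >>>>>> A minimal sketch of the per-stream sequence-number tracking
> > >> >>>>>> described above (class and field names are illustrative, not part
> > >> >>>>>> of the S4 API):
> > >> >>>>>>
> > >> >>>>>>     import java.util.SortedMap;
> > >> >>>>>>     import java.util.TreeMap;
> > >> >>>>>>
> > >> >>>>>>     class SequencedStreamReceiver {
> > >> >>>>>>         private long nextExpectedSeq = 0;  // restored from the checkpoint on recovery
> > >> >>>>>>         private final SortedMap<Long, Object> pending = new TreeMap<>(); // held-back events
> > >> >>>>>>
> > >> >>>>>>         void onEvent(long seq, Object event) {
> > >> >>>>>>             if (seq < nextExpectedSeq) {
> > >> >>>>>>                 return;                    // duplicate: already processed, drop it
> > >> >>>>>>             }
> > >> >>>>>>             pending.put(seq, event);       // hold until all earlier sequence numbers arrive
> > >> >>>>>>             while (pending.containsKey(nextExpectedSeq)) {
> > >> >>>>>>                 process(pending.remove(nextExpectedSeq));
> > >> >>>>>>                 nextExpectedSeq++;         // persisted together with the state checkpoint
> > >> >>>>>>             }
> > >> >>>>>>         }
> > >> >>>>>>
> > >> >>>>>>         private void process(Object event) { /* application logic */ }
> > >> >>>>>>     }
> > >> >>>>>>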
> > >> >>>>>> As I said, this is the first approach that comes to mind. It is
> > >> >>>>>> indeed an interesting problem, and I feel we should not need to
> > >> >>>>>> re-do work that is already done at lower layers.
> > >> >>>>>>
> > >> >>>>>> Please suggest improvements/alternatives as you see fit.
> > >> >>>>>>
> > >> >>>>>> Thanks
> > >> >>>>>> Karthik
> > >> >>>>>>
> > >> >>>>>> On Mon, Aug 6, 2012 at 10:01 PM, Flavio Junqueira
<
> > >> fpj@yahoo-inc.com>
> > >> >>>>> wrote:
> > >> >>>>>>
> > >> >>>>>>>
> > >> >>>>>>> On Aug 6, 2012, at 10:11 PM, Karthik Kambatla
wrote:
> > >> >>>>>>>
> > >> >>>>>>> Flavio - it is indeed tricky to offer exactly-once semantics. My
> > >> >>>>>>> understanding is that the underlying comm-layer could filter out
> > >> >>>>>>> subsequent duplicate events; however, we need to sacrifice
> > >> >>>>>>> ordering.
> > >> >>>>>>>
> > >> >>>>>>>
> > >> >>>>>>> I was also thinking that if a node crashes and we recover from
> > >> >>>>>>> checkpoints, we may end up having messages applied twice.
> > >> >>>>>>>
> > >> >>>>>>> -Flavio
> > >> >>>>>>>
> > >> >>>>>>> Thanks
> > >> >>>>>>> Karthik
> > >> >>>>>>>
> > >> >>>>>>> On Mon, Aug 6, 2012 at 6:43 AM, "Benjamin Süß" <Gothic13@gmx.de>
> > >> >>>>>>> wrote:
> > >> >>>>>>>
> > >> >>>>>>>> Hi Matthieu,
> > >> >>>>>>>>
> > >> >>>>>>>> thank you for your reply. I had a specific use case in mind,
> > >> >>>>>>>> indeed:
> > >> >>>>>>>>
> > >> >>>>>>>> I am trying to track RFID tags in distributed systems. This
> > >> >>>>>>>> means that not even a single tag scan may get lost. And of
> > >> >>>>>>>> course, none are to be sent twice or more often, as this would
> > >> >>>>>>>> heavily confuse any surveillance routines I am going to
> > >> >>>>>>>> implement.
> > >> >>>>>>>>
> > >> >>>>>>>> Regarding your answers, especially point 3, I do not think this
> > >> >>>>>>>> can be done with S4 at the moment, can it?
> > >> >>>>>>>>
> > >> >>>>>>>> Regards,
> > >> >>>>>>>> Benjamin
> > >> >>>>>>>>
> > >> >>>>>>>> -------- Original Message --------
> > >> >>>>>>>>> Date: Tue, 31 Jul 2012 17:30:27 +0200
> > >> >>>>>>>>> From: Matthieu Morel <mmorel@apache.org>
> > >> >>>>>>>>> To: s4-user@incubator.apache.org
> > >> >>>>>>>>> Subject: Re: Thoughts on adding guaranteed message processing
> > >> >>>>>>>>
> > >> >>>>>>>>> On 7/31/12 2:54 PM, "Benjamin Süß" wrote:
> > >> >>>>>>>>>> Hi there,
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> it is stated in several places that S4 does not include
> > >> >>>>>>>>>> guaranteed one-time message processing. So my question is:
> > >> >>>>>>>>>> are there currently any plans on adding this to S4? Or is it
> > >> >>>>>>>>>> certain this is not going to happen? If there are any plans
> > >> >>>>>>>>>> on this, can I find further information somewhere?
> > >> >>>>>>>>>>
> > >> >>>>>>>>>
> > >> >>>>>>>>> There are typically 3 requirements for guaranteeing one-time
> > >> >>>>>>>>> message processing:
> > >> >>>>>>>>>
> > >> >>>>>>>>> 1. reliable communication channels
> > >> >>>>>>>>>
> > >> >>>>>>>>> 2. replayable input stream: you need an upstream component
> > >> >>>>>>>>> that is able to store/buffer the whole stream and replay it on
> > >> >>>>>>>>> demand.
> > >> >>>>>>>>>
> > >> >>>>>>>>> 3. tracking of messages, using some sort of piggybacking,
> > >> >>>>>>>>> possibly requiring manual input from the user.
> > >> >>>>>>>>>
> > >> >>>>>>>>>
> > >> >>>>>>>>> In S4 0.5.0, we already address 1. by providing communications
> > >> >>>>>>>>> through TCP by default. Requirement 2. is quite
> > >> >>>>>>>>> straightforward to implement, by adding some machinery to
> > >> >>>>>>>>> connect to a component such as Apache Kafka, for instance. We
> > >> >>>>>>>>> are considering options for 3.
> > >> >>>>>>>>>
> > >> >>>>>>>>>
> > >> >>>>>>>>> Do you have a specific use case in mind?
> > >> >>>>>>>>>
> > >> >>>>>>>>> Regards,
> > >> >>>>>>>>>
> > >> >>>>>>>>> Matthieu
> > >> >>>>>>>>>
> > >> >>>>>>>>
> > >> >>>>>>>
> > >> >>>>>>>
> > >> >>>>>>>
> > >> >>>>>
> > >> >>>>>
> > >> >>>>
> > >> >>>
> > >> >>>
> > >> >>>
> > >> >>> --
> > >> >>> *Leo Neumeyer*
> > >> >>> Software, data, algorithms.
> > >> >>> leoneumeyer@gmail.com
> > >> >>> *http://www.linkedin.com/in/leoneu*
> > >> >>>
> > >> >>
> > >> >
> > >>
> > >>
> > >
> >
>
