incubator-kafka-users mailing list archives

From Ashutosh Singh <ashutoshvsi...@gmail.com>
Subject Re: Replication questions
Date Fri, 04 May 2012 22:32:23 GMT
Is this replication existing functionality, or is it the new functionality
that is planned to come?

On Tue, May 1, 2012 at 11:47 AM, Felix GV <felix@mate1inc.com> wrote:

> Ah, gotcha, so my usage of the term "in-memory replication" can be
> misleading: Kafka still doesn't retain the data in-app (i.e., in Kafka's
> allocated memory), but the data is in-memory nonetheless because of the OS'
> file system cache.
>
> Basically, on the individual node's level, this is not different from what
> we already have (without KAFKA-50), but the fact that KAFKA-50 will give us
> replication means that the data will reside in the OS' file system cache of
> many nodes, giving us much more reliable durability guarantees.
>
> Thanks for the nitty-gritty details, Jay :)
>
> --
> Felix
>
>
>
> On Tue, May 1, 2012 at 1:51 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>
> > Yes, that is correct. Technically we always write to the filesystem
> > immediately; it is just a question of when you fsync the file (that is
> > the slow part). So although the data is in memory, it is not in
> > application memory, so it always survives a kill -9 but not unplugging
> > the machine. Currently, when a broker fails, messages that were flushed
> > to disk come back if the broker returns with an intact filesystem (if
> > the broker's filesystem is destroyed, they are lost). With replication
> > we retain this same flexibility on the flush policy, so you can flush
> > every message to disk immediately if you like. However, having the
> > message on multiple machines is in some ways better durability than the
> > fsync gives, as the message will survive destruction of the filesystem,
> > so we think you can legitimately allow consumers to consume messages
> > independent of the flush policy.
> >
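
As a rough, illustrative sketch of the flush policy Jay describes: on the
broker it is governed by settings along these lines (the property names
follow the later 0.8-era server.properties and the values are made up, so
treat this as an approximation rather than a reference):

    # Force an fsync once this many messages have accumulated in a log...
    log.flush.interval.messages=10000
    # ...or once this much time has passed since the last flush, whichever comes first.
    log.flush.interval.ms=1000

With replication in place these can be left fairly loose, since durability
comes from the message existing on several brokers rather than from the
local fsync.
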
> > Also, when a broker fails it will lose unflushed messages; however, when
> > it comes back to life it will restore these messages from the other
> > replicas before it serves data to consumers. So the log will be
> > byte-for-byte identical across all servers, including both the contents
> > and the ordering of messages.
> >
> > -Jay
> >
> > On Tue, May 1, 2012 at 9:24 AM, Felix GV <felix@mate1inc.com> wrote:
> > > Hmm... interesting!
> > >
> > > So, if I understand correctly, what you're saying regarding point 2 is
> > > that the messages are going to be kept in memory on several nodes, and
> > > start being served to consumers as soon as this is completed, rather
> > > than after the data is flushed to disk? This way, we still benefit from
> > > the throughput gain of flushing data to disk in batches, but we consider
> > > that the added durability of having in-memory replication is good enough
> > > to start serving that data to consumers sooner.
> > >
> > > Furthermore, this means that in the unlikely event that several nodes
> > > fail simultaneously (a correlated failure), the data that is replicated
> > > to the failed nodes but not yet flushed on any of them would be lost.
> > > However, when a single node crashes and is then restarted, only the
> > > failed node will have lost its unflushed data, while the other nodes
> > > that had replicated that data will have had the opportunity to flush it
> > > to disk later on.
> > >
> > > Sorry if I'm repeating like a parrot. I just want to make sure I
> > > understand correctly :)
> > >
> > > Please correct me if I'm not interpreting this correctly!
> > >
> > > --
> > > Felix
> > >
> > >
> > >
> > > On Mon, Apr 30, 2012 at 5:59 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > >
> > >> Yes, it is also worth noting that there are couple of different ways
> > >> to think about latency:
> > >> 1. latency of the request from the producer's point-of-view
> > >> 2. end-to-end latency to the consumer
> > >>
> > >> As Jun mentions, (1) may go up a little because the producer used to
> > >> send data without checking for any answer from the server. Although
> > >> this gives a nice buffering effect, it leads to a number of corner
> > >> cases that are hard to deal with correctly. It should be the case that
> > >> setting the producer to async has the same effect from the producer's
> > >> point of view, without the corner cases of having no RPC response to
> > >> convey errors and other broker misbehavior.
> > >>
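
For a concrete picture of what "setting the producer to async" means, the
0.8-era producer exposes batching through properties roughly like the
following (names follow that era's producer configuration; the values are
made-up examples):

    producer.type=async            # batch sends on a background thread instead of blocking
    queue.buffering.max.ms=100     # how long to buffer messages before sending a batch
    batch.num.messages=200         # maximum number of messages per batch
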
> > >> (2) may actually get significantly better, especially for lower-volume
> > >> topics. The reason is that currently we wait until data is flushed to
> > >> disk before giving it to the consumer; this flush policy is controlled
> > >> by setting a number of messages or a timeout at which the flush is
> > >> forced. The reason to make this configurable is that on traditional
> > >> disks each flush is likely to incur at least one seek. In the new
> > >> model, replication can take the place of waiting on a disk flush to
> > >> provide durability (even if the log of the local server loses
> > >> unflushed data, no messages will be lost as long as all servers don't
> > >> crash at the same time). Doing 2 parallel replication round-trips
> > >> (perhaps surprisingly) looks like it may be a lot lower-latency than
> > >> doing a local disk flush (< 1ms versus >= 10ms). In our own usage the
> > >> desire for this kind of low-latency consumption is not common, but I
> > >> understand that this is a common need for messaging.
> > >>
> > >> -Jay
> > >>
> > >> On Thu, Apr 26, 2012 at 2:03 PM, Felix GV <felix@mate1inc.com> wrote:
> > >> > Thanks Jun :)
> > >> >
> > >> > --
> > >> > Felix
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Apr 26, 2012 at 3:26 PM, Jun Rao <junrao@gmail.com> wrote:
> > >> >
> > >> >> Some comments inlined below.
> > >> >>
> > >> >> Thanks,
> > >> >>
> > >> >> Jun
> > >> >>
> > >> >> On Thu, Apr 26, 2012 at 10:27 AM, Felix GV <felix@mate1inc.com> wrote:
> > >> >>
> > >> >> > Cool :) Thanks for those insights :) !
> > >> >> >
> > >> >> > I changed the subject of the thread, in order not to derail the
> > >> >> > original thread's subject...! I just want to recap to make sure I
> > >> >> > (and others) understand all of this correctly :)
> > >> >> >
> > >> >> > So, if I understand correctly, with acks == [0,1] Kafka should
> > >> >> > provide a latency that is similar to what we have now, but with
> > >> >> > the possibility of losing a small window of unreplicated events in
> > >> >> > the case of an unrecoverable hardware failure, and with acks > 1
> > >> >> > (or acks == -1) there will probably be a latency penalty but we
> > >> >> > will be completely protected from (non-correlated) hardware
> > >> >> > failures, right?
> > >> >> >
> > >> >> This is mostly true. The difference is that in 0.7, the producer
> > >> >> doesn't wait for a TCP response from the broker. In 0.8, the
> > >> >> producer always waits for a response from the broker. How quickly
> > >> >> the broker sends the response depends on acks. If acks is less than
> > >> >> the ideal value, you may get the response faster, but you have some
> > >> >> risk of losing data if there is a broker failure.
> > >> >>
> > >> >>
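
As a minimal sketch of the 0.8 producer behavior Jun describes (class names
and the request.required.acks property are taken from the 0.8 Java producer
API as it eventually shipped; the broker list and topic below are made up):

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class AckedSendExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // hypothetical brokers
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // 1  -> broker responds once the leader has the message
            // -1 -> broker responds once all in-sync replicas have the message
            props.put("request.required.acks", "1");

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

            // Unlike the 0.7 fire-and-forget producer, this call returns only
            // after the broker's response (or an error) comes back.
            producer.send(new KeyedMessage<String, String>("test-topic", "hello"));
            producer.close();
        }
    }
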
> > >> >> > Also, I guess the above assumptions are correct for a batch size
> > >> >> > of 1, and that bigger batch sizes could also lead to small windows
> > >> >> > of unwritten data in cases of failures, just like now...? Although,
> > >> >> > now that I think of it, I guess the vulnerability of bigger batch
> > >> >> > sizes would, again, only come into play in scenarios of
> > >> >> > unrecoverable correlated failures, since even if a machine fails
> > >> >> > with some partially committed batch, there would be other machines
> > >> >> > that received the same data (through replication) and would have
> > >> >> > enough time to commit those batches...
> > >> >> >
> > >> >> I want to add that if the producer itself dies, it could lose a
> > >> >> batch of events.
> > >> >>
> > >> >>
> > >> >> > Finally, I guess that replication (whatever the ack parameter is)
> > >> >> > will affect the overall throughput capacity of the Kafka cluster,
> > >> >> > since every node will now be writing its own data as well as the
> > >> >> > replicated data from +/- 2 other nodes, right?
> > >> >> >
> > >> >> > --
> > >> >> > Felix
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> > On Wed, Apr 25, 2012 at 6:32 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > >> >> >
> > >> >> > > Short answer is yes, both async (acks=0 or 1) and sync
> > >> >> > > replication (acks > 1) will both be supported.
> > >> >> > >
> > >> >> > > -Jay
> > >> >> > >
> > >> >> > > On Wed, Apr 25, 2012 at 11:22 AM, Jun Rao <junrao@gmail.com> wrote:
> > >> >> > > > Felix,
> > >> >> > > >
> > >> >> > > > Initially, we thought we could keep the option of not sending
> > >> >> > > > acks from the broker to the producer. However, this seems hard
> > >> >> > > > since in the new wire protocol, we need to send at least the
> > >> >> > > > error code to the producer (e.g., a request is sent to the
> > >> >> > > > wrong broker or wrong partition).
> > >> >> > > >
> > >> >> > > > So, what we allow in the current design is the following. The
> > >> >> > > > producer can specify the # of acks in the request. By default
> > >> >> > > > (acks = -1), the broker will wait for the message to be written
> > >> >> > > > to all replicas that are still synced up with the leader before
> > >> >> > > > acking the producer. Otherwise (acks >= 0), the broker will ack
> > >> >> > > > the producer after the message is written to acks replicas.
> > >> >> > > > Currently, acks=0 is treated the same as acks=1.
> > >> >> > > >
> > >> >> > > > Thanks,
> > >> >> > > >
> > >> >> > > > Jun
> > >> >> > > >
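
Restating Jun's mapping as a producer-side property sketch
(request.required.acks is the name the 0.8 producer ended up using; the
semantics in the comments are the ones described above, as of this point in
the design):

    # request.required.acks = -1   -> ack after all in-sync replicas have the message (the default described above)
    # request.required.acks = N>=1 -> ack after the message is written to N replicas
    # request.required.acks = 0    -> currently treated the same as 1
    request.required.acks=-1
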
> > >> >> > > > On Wed, Apr 25, 2012 at 10:39 AM, Felix GV <felix@mate1inc.com> wrote:
> > >> >> > > >
> > >> >> > > >> Just curious, but if I remember correctly from the time I
> > >> >> > > >> read KAFKA-50 and the related JIRA issues, you guys plan to
> > >> >> > > >> implement sync AND async replication, right?
> > >> >> > > >>
> > >> >> > > >> --
> > >> >> > > >> Felix
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> On Tue, Apr 24, 2012 at 4:42 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > >> >> > > >>
> > >> >> > > >> > Right now we do sloppy failover. That is, when a broker
> > >> >> > > >> > goes down, traffic is redirected to the remaining machines,
> > >> >> > > >> > but any unconsumed messages are stuck on that server until
> > >> >> > > >> > it comes back; if it is permanently gone, the messages are
> > >> >> > > >> > lost. This is acceptable for us in the near term since our
> > >> >> > > >> > pipeline is pretty real-time, so this window between
> > >> >> > > >> > production and consumption is pretty small. The complete
> > >> >> > > >> > solution is the intra-cluster replication in KAFKA-50, which
> > >> >> > > >> > is coming along fairly nicely now that we are working on it.
> > >> >> > > >> >
> > >> >> > > >> > -Jay
> > >> >> > > >> >
> > >> >> > > >> > On Tue, Apr 24, 2012 at 12:21 PM, Oliver Krohne <oliver.krohne@googlemail.com> wrote:
> > >> >> > > >> > > Hi,
> > >> >> > > >> > >
> > >> >> > > >> > > Indeed, I thought it could be used as a failover approach.
> > >> >> > > >> > >
> > >> >> > > >> > > We use RAID for local redundancy, but it does not protect
> > >> >> > > >> > > us in case of a machine failure, so I am looking for a way
> > >> >> > > >> > > to achieve a master/slave setup until KAFKA-50 has been
> > >> >> > > >> > > implemented.
> > >> >> > > >> > >
> > >> >> > > >> > > I think we can solve it for now by having multiple
> > >> >> > > >> > > brokers, so that the application can continue sending
> > >> >> > > >> > > messages if one broker goes down. My main concern is to
> > >> >> > > >> > > not introduce a new single point of failure which can stop
> > >> >> > > >> > > the application. However, as some consumers are not
> > >> >> > > >> > > developed by us and it is not clear how they store the
> > >> >> > > >> > > offset in zookeeper, we need to find out how we can manage
> > >> >> > > >> > > the consumers in case a broker never returns after a
> > >> >> > > >> > > failure. It will be acceptable to lose a couple of
> > >> >> > > >> > > messages if a broker dies and the consumers have not
> > >> >> > > >> > > consumed all messages at the point of failure.
> > >> >> > > >> > >
> > >> >> > > >> > > Thanks,
> > >> >> > > >> > > Oliver
> > >> >> > > >> > >
> > >> >> > > >> > >
> > >> >> > > >> > >
> > >> >> > > >> > >
> > >> >> > > >> > > On 23.04.2012 at 19:58, Jay Kreps wrote:
> > >> >> > > >> > >
> > >> >> > > >> > >> I think the confusion comes from the fact that we are
> > >> >> > > >> > >> using mirroring to handle geographic distribution, not
> > >> >> > > >> > >> failover. If I understand correctly, what Oliver is
> > >> >> > > >> > >> asking for is something to give fault tolerance, not
> > >> >> > > >> > >> something for distribution. I don't think that is really
> > >> >> > > >> > >> what the mirroring does out of the box, though
> > >> >> > > >> > >> technically I suppose you could just reset the offsets
> > >> >> > > >> > >> and point the consumer at the new cluster and have it
> > >> >> > > >> > >> start from "now".
> > >> >> > > >> > >>
> > >> >> > > >> > >> I think it would be helpful to document our use case in
> > >> >> > > >> > >> the mirroring docs since this is not the first time
> > >> >> > > >> > >> someone has asked about this.
> > >> >> > > >> > >>
> > >> >> > > >> > >> -Jay
> > >> >> > > >> > >>
> > >> >> > > >> > >> On Mon, Apr 23, 2012 at 10:38 AM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> > >> >> > > >> > >>
> > >> >> > > >> > >>> Hi Oliver,
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>> I was reading the mirroring guide and I wonder if it
> > >> >> > > >> > >>>> is required that the mirror runs its own zookeeper?
> > >> >> > > >> > >>>>
> > >> >> > > >> > >>>> We have a zookeeper cluster running which is used by
> > >> >> > > >> > >>>> different applications, so can we use that zookeeper
> > >> >> > > >> > >>>> cluster for the kafka source and kafka mirror?
> > >> >> > > >> > >>>>
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> You could have a single zookeeper cluster and use
> > >> >> > > >> > >>> different namespaces for the source/target mirror.
> > >> >> > > >> > >>> However, I don't think it is recommended to use a remote
> > >> >> > > >> > >>> zookeeper (if you have a cross-DC set up) since that
> > >> >> > > >> > >>> would potentially mean very high ZK latencies on one of
> > >> >> > > >> > >>> your clusters.
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>
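
One way to share a single zookeeper ensemble between the source and mirror
clusters, as Joel suggests, is to give each cluster its own chroot path in
zk.connect; the chroot suffix plays the role of the "namespace" (hostnames
here are made up):

    # source cluster configuration
    zk.connect=zk1:2181,zk2:2181,zk3:2181/kafka-source
    # mirror cluster configuration: same ensemble, different chroot
    zk.connect=zk1:2181,zk2:2181,zk3:2181/kafka-mirror
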
> > >> >> > > >> > >>>> What is the procedure, if the kafka source server
> > >> >> > > >> > >>>> fails, to switch the applications to use the mirrored
> > >> >> > > >> > >>>> instance?
> > >> >> > > >> > >>>>
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> I don't quite follow this question - can you clarify?
> > >> >> > > >> > >>> The mirror cluster is pretty much a separate instance.
> > >> >> > > >> > >>> There is no built-in automatic fail-over if your source
> > >> >> > > >> > >>> cluster goes down.
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>
> > >> >> > > >> > >>>> Are there any backup best practices if we do not use
> > >> >> > > >> > >>>> mirroring?
> > >> >> > > >> > >>>>
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> You can use RAID arrays for (local) data redundancy.
> > >> >> > > >> > >>> You may also be interested in the (intra-DC) replication
> > >> >> > > >> > >>> feature (KAFKA-50) that is currently being developed. I
> > >> >> > > >> > >>> believe some folks on this list have also used plain
> > >> >> > > >> > >>> rsyncs as an alternative to mirroring.
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> Thanks,
> > >> >> > > >> > >>>
> > >> >> > > >> > >>> Joel
> > >> >> > > >> > >>>
> > >> >> > > >> > >
> > >> >> > > >> >
> > >> >> > > >>
> > >> >> > >
> > >> >> >
> > >> >>
> > >>
> >
>
