kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1518892 - in /kafka/site/08: design.html introduction.html
Date Fri, 30 Aug 2013 05:33:32 GMT
Author: jkreps
Date: Fri Aug 30 05:33:32 2013
New Revision: 1518892

URL: http://svn.apache.org/r1518892
Log:
Document replication in a little more detail. Cover unclean leader election.


Modified:
    kafka/site/08/design.html
    kafka/site/08/introduction.html

Modified: kafka/site/08/design.html
URL: http://svn.apache.org/viewvc/kafka/site/08/design.html?rev=1518892&r1=1518891&r2=1518892&view=diff
==============================================================================
--- kafka/site/08/design.html (original)
+++ kafka/site/08/design.html Fri Aug 30 05:33:32 2013
@@ -145,9 +145,9 @@ Now that we understand a little about ho
 
 It's worth noting that this breaks down into two problems: the durability guarantees for
publishing a message and the guarantees when consuming a message.
 <p>
-Many systems claim to provide "exactly once" delivery semantics, but it is important to read
the fine print, most of these claims are misleading (i.e. they don't translate to the case
where consumers or producers can fail or cases where there are multiple consumer processes).
+Many systems claim to provide "exactly once" delivery semantics, but it is important to read
the fine print: most of these claims are misleading (i.e. they don't translate to the case
where consumers or producers can fail, or cases where there are multiple consumer processes,
or cases where data written to disk can be lost).
 <p>
-Kafka's semantics are straight-forward. When publishing a message we have a notion of the
message being "committed" to the log. Once a published message is committed it will not be
lost as long as one broker remains "alive". The definition of alive, which will be described
in more detail later, translates roughly to "not crashed" and able to keep up with the leader.
If a producer attempts to publish a message and experiences a network error it cannot be sure
if this error happened before or after the message was committed. This is similar to the semantics
of inserting into a database table with an autogenerated key.
+Kafka's semantics are straightforward. When publishing a message we have a notion of the
message being "committed" to the log. Once a published message is committed it will not be
lost as long as one broker that replicates the partition to which this message was written
remains "alive". The definition of alive, as well as a description of which types of failures
we attempt to handle, is given in more detail in the next section. For now let's assume
a perfect, lossless broker and try to understand the guarantees to the producer and consumer.
If a producer attempts to publish a message and experiences a network error it cannot be sure
if this error happened before or after the message was committed. This is similar to the semantics
of inserting into a database table with an autogenerated key.
 <p>
 These are not the strongest possible semantics for publishers. Although we cannot be sure
of what happened in the case of a network error, it is possible to allow the producer to generate
a sort of "primary key" that makes retrying the produce request idempotent. This feature is
not trivial for a replicated system because of course it must work even (or especially) in
the case of a server failure. With this feature it would suffice for the producer to retry
until it receives acknowledgement of a successfully committed message at which point we would
guarantee the message had been published exactly once. We hope to add this in a future Kafka
version.
 <p>
@@ -164,40 +164,69 @@ So effectively Kafka guarantees at-least
 
 <h3><a id="replication">4.7 Replication</a></h3>
 <p>
-Kafka replicates the log for each topic's partitions across the number of servers configured
with each topic. This allows automatic failover when a server in the cluster fails so messages
remain available in the presence of failures.
+Kafka replicates the log for each topic's partitions across a configurable number of servers
(you can set this replication factor on a topic-by-topic basis). This allows automatic failover
to these replicas when a server in the cluster fails so messages remain available in the presence
of failures.
 <p>
-Other messaging systems provide some replication-related features but in our (biased) opinion
this appears to be a tacked-on thing not heavily used and with large downsides: slaves are
inactive, throughput is heavily impacted, it requires fiddly manual configuration, etc. Kafka
is meant to be used with replication by default&mdash;in fact we implement un-replicated
topics as replicated topics where the replication factor is one.
+Other messaging systems provide some replication-related features, but, in our (totally biased)
opinion, this appears to be a tacked-on thing, not heavily used, and with large downsides:
slaves are inactive, throughput is heavily impacted, it requires fiddly manual configuration,
etc. Kafka is meant to be used with replication by default&mdash;in fact we implement
un-replicated topics as replicated topics where the replication factor is one.
 <p>
-The unit of replication is the topic partition. For each partition Kafka has a single leader
and zero or more followers. The number of replicas is configurable at the topic level at topic
creation time. All reads and writes go to the leader of the partition; each node is the leader
for a share of partitions and a follower for others. The logs on the followers are identical&mdash;all
have the same offsets and messages in the same order (though, of course, at any given time
the leader may have a few as-yet unreplicated messages at the end of its log).
+The unit of replication is the topic partition. Under non-failure conditions, each partition
in Kafka has a single leader and zero or more followers. We call the total number of replicas
including the leader the replication factor. All reads and writes go to the leader of the
partition; each node is the leader for a share of partitions and a follower for others. The
logs on the followers are identical to the leader's log&mdash;all have the same offsets
and messages in the same order (though, of course, at any given time the leader may have a
few as-yet unreplicated messages at the end of its log).
 <p>
-Followers consume messages from the leader just as a normal Kafka consumer would and applying
them to their own log. Having the followers pull from the leader has the nice property of
allowing the follower to naturally batch together log entries they are applying to their log.
The leader keeps track of which slaves are "in sync"&mdash;meaning not too far behind
the leader's own log. If a consumer dies or falls behind, the leader will remove it from the
list of in sync replicas.
+Followers consume messages from the leader just as a normal Kafka consumer would and apply
them to their own log. Having the followers pull from the leader has the nice property of
allowing the follower to naturally batch together log entries they are applying to their log.
 <p>
-A message is considered "committed" when all in sync replicas for that partition have applied
it to their log. The leader only gives out committed messages to the consumer. This means
that the consumer need not worry about potentially seeing a message that could be lost if
the leader fails. Producers, on the other hand, have the option of either waiting for the
message to be committed or not, depending on their preference for latency and durability.

+As with most distributed systems, automatically handling failures requires having a precise
definition of what it means for a node to be "alive". For Kafka, node liveness has two conditions:
+<ol>
+	<li>A node must be able to maintain its session with Zookeeper (via Zookeeper's heartbeat
mechanism)
+	<li>If it is a slave it must replicate the writes happening on the leader and not
fall "too far" behind
+</ol>
+We refer to nodes satisfying these two conditions as being "in sync" to avoid the vagueness
of "alive" or "failed". The leader keeps track of the set of "in sync" nodes. If a follower
dies or falls behind, the leader will remove it from the list of in sync replicas. The definition
of how far behind is too far is controlled by the replica.lag.max.messages configuration.
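
The two liveness conditions above can be sketched as a small filter. This is only an illustration, not Kafka's broker code: the `Replica` class, the `in_sync_replicas` function and the `session_alive` flag (standing in for a live Zookeeper session) are hypothetical names, and `max_lag_messages` plays the role that the text assigns to replica.lag.max.messages.

```python
from dataclasses import dataclass


@dataclass
class Replica:
    broker_id: int
    log_end_offset: int   # offset of the next message this replica would write
    session_alive: bool   # hypothetical stand-in for a live Zookeeper session


def in_sync_replicas(leader_log_end_offset, replicas, max_lag_messages):
    """Return the ids of replicas that satisfy both liveness conditions."""
    isr = []
    for r in replicas:
        lag = leader_log_end_offset - r.log_end_offset
        # Condition 1: session is alive. Condition 2: not "too far" behind.
        if r.session_alive and lag <= max_lag_messages:
            isr.append(r.broker_id)
    return isr


replicas = [
    Replica(broker_id=1, log_end_offset=1000, session_alive=True),   # the leader itself
    Replica(broker_id=2, log_end_offset=990, session_alive=True),    # slightly behind
    Replica(broker_id=3, log_end_offset=400, session_alive=True),    # too far behind
    Replica(broker_id=4, log_end_offset=1000, session_alive=False),  # lost its session
]
print(in_sync_replicas(1000, replicas, max_lag_messages=100))  # prints [1, 2]
```

Whether the real lag comparison is strict or inclusive is not stated in the text; the inclusive check here is an assumption.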
+<p>
+In distributed systems terminology we only attempt to handle a "fail/recover" model of failures
where nodes suddenly cease working and then later recover (perhaps without knowing that they
have died). Kafka does not handle so-called "Byzantine" failures in which nodes produce arbitrary
or malicious responses (perhaps due to bugs or foul play).
+<p>
+A message is considered "committed" when all in sync replicas for that partition have applied
it to their log. Only committed messages are ever given out to the consumer. This means that
the consumer need not worry about potentially seeing a message that could be lost if the leader
fails. Producers, on the other hand, have the option of either waiting for the message to
be committed or not, depending on their preference for latency and durability. This preference
is controlled by the request.required.acks setting the producer uses.
 <p>
-The guarantee that Kafka offers is that a committed message will not be lost as long as a
single in sync replica survives.
+The guarantee that Kafka offers is that a committed message will not be lost as long as a
single in sync replica remains.
+<p>
+Kafka will remain available in the presence of node failures after a short fail-over period,
but may not remain available in the presence of network partitions.
 
 <h4>Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)</h4>
 
 At its heart a Kafka partition is a replicated log. The replicated log is one of the most
basic primitives in distributed data systems, and there are many approaches to implementation.
A replicated log can be used by other systems as a primitive for implementing other distributed
systems in the <a href="http://en.wikipedia.org/wiki/State_machine_replication">state-machine
style</a>.
 <p>
-A replicated log models the process of coming into consensus on the order of a series of
values (entries 0, 1, 2, ...). There are many ways to implement this, but the simplest and
fastest is with a leader who chooses the ordering of values provided to it. As long as the
leader remains alive and all followers need only copy the values and ordering it chooses.
+A replicated log models the process of coming into consensus on the order of a series of
values (generally numbering the log entries 0, 1, 2, ...). There are many ways to implement
this, but the simplest and fastest is with a leader who chooses the ordering of values provided
to it. As long as the leader remains alive, all followers need only copy the values and
ordering it chooses.
+<p>
+Of course if leaders didn't fail we wouldn't need followers! When the leader does die we
need to choose a new leader from among the followers. But followers themselves may fall behind
or crash so we must ensure we choose an up-to-date follower. The fundamental guarantee a log
replication algorithm must provide is that if we tell the client a message is committed, and
the leader fails, the new leader we elect must also have that message. This yields a tradeoff:
if the leader waits for more followers to acknowledge a message before declaring it committed
then there will be more potentially electable leaders.
 <p>
-Of course if leaders didn't fail we wouldn't need followers! When the leader does die we
need to choose a new leader from among the followers. But followers themselves may fall behind
or crash so we must ensure we choose an up-to-date follower. The fundamental guarantee a log
replication algorithm must provide is that if we tell the client a message is committed, and
the leader fails, the new leader we elect must also have that message. This yields a tradeoff:
if the leader waits for more followers to replicate a message before declaring it committed
then there will be more potentially electable leaders.
+If you choose the number of acknowledgements required and the number of logs that must be
compared to elect a leader such that there is guaranteed to be an overlap then this is called
a Quorum.
 <p>
-A common approach to this tradeoff is to use a majority (quorum) for both the commit decision
and the leader election. This is not what Kafka does, but let's explore it anyway to understand
the tradeoffs. Let's say we want to tolerate <i>f</i> failures. If 2<i>f</i>+1
servers must replicate a message prior to a commit being declared by the leader, and if we
elect a new leader by electing the follower with the most complete log from at least 2<i>f</i>+1
replicas, then, with no more than <i>f</i> failures there must be at least one
server in common between those that committed the message and the servers that were available
to take over as leader and hence the message will be preserved. There are many remaining details
that each algorithm must handle (such as ensuring log consistency during leader failure or
changing the set of servers in the replica set) but we will ignore these for now.
+A common approach to this tradeoff is to use a majority vote for both the commit decision
and the leader election. This is not what Kafka does, but let's explore it anyway to understand
the tradeoffs. Let's say we want to tolerate <i>f</i> failures with 2<i>f</i>+1 replicas. If <i>f</i>+1
replicas must receive a message prior to a commit being declared by the leader, and if we
elect a new leader by electing the follower with the most complete log from at least <i>f</i>+1
replicas, then, with no more than <i>f</i> failures there must be at least one
server in common between those that committed the message and the servers that were available
to take over as leader and hence the message will be preserved.  There are many remaining
details that each algorithm must handle (such as precisely defining what makes a log more complete,
ensuring log consistency during leader failure or changing the set of servers in the replica
set) but we will ignore these for now.
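
The overlap argument can be checked by brute force for small clusters. The sketch below assumes the usual majority sizing, which is not spelled out in exactly these terms above: 2f+1 replicas in total, f+1 acknowledgements before a commit, and f+1 logs compared during an election. The function name `quorums_always_overlap` is made up for this illustration.

```python
from itertools import combinations


def quorums_always_overlap(num_replicas, commit_acks, election_size):
    """Check that every possible commit set shares a member with every election set."""
    servers = range(num_replicas)
    for committed in combinations(servers, commit_acks):
        for voters in combinations(servers, election_size):
            if not set(committed) & set(voters):
                return False  # found a disjoint pair: a committed message could be missed
    return True


for f in (1, 2, 3):
    n = 2 * f + 1
    # f + 1 acknowledgements and f + 1 logs compared always intersect
    assert quorums_always_overlap(n, f + 1, f + 1)
    # one acknowledgement fewer leaves room for a disjoint pair
    assert not quorums_always_overlap(n, f, f + 1)
```

The second assertion shows why the sizes cannot be reduced: as soon as the two set sizes sum to no more than the replica count, an election can take place entirely among servers that never saw the committed message.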
 <p>
-This quorum approach has a very nice property: the latency is dependent on only the fastest
servers. That is, if the replication factor is three, the latency is determined by the faster
of the two slaves.
+This majority vote approach has a very nice property: the latency is dependent on only the
fastest servers. That is, if the replication factor is three, the latency is determined by
the faster slave not the slower one.
 <p>
 There are a rich variety of algorithms in this family including Zookeeper's <a href="http://www.stanford.edu/class/cs347/reading/zab.pdf">Zab</a>,
<a href="https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf">Raft</a>,
and <a href="http://pmg.csail.mit.edu/papers/vr-revisited.pdf">Viewstamped Replication</a>.
The most similar academic publication we are aware of to Kafka's actual implementation is
<a href="http://research.microsoft.com/apps/pubs/default.aspx?id=66814">PacificA</a>
from Microsoft.
 <p>
-The downside of the quorum is that the amount of replication to tolerate failures is a bit
high. To tolerate one failure requires five copies of the data, and to tolerate two failures
requires five copies of the data. In our experience a single failure is not enough, but doing
every write five times, with 5x the disk space requirements and 1/5th the throughput, is just
not practical for large volume data problems. This is likely why quorum algorithms more commonly
appear for shared cluster configuration such as in Zookeeper but are less common for primary
data storage. For example in HDFS the namenode's high-availability feature is built on quorum-based
journal but quorum journalling is not used for the data itself.
+The downside of majority vote is that it doesn't take many failures to leave you with no
electable leaders. To tolerate one failure requires three copies of the data, and to tolerate
two failures requires five copies of the data. In our experience having only enough redundancy
to tolerate a single failure is not enough for a practical system, but doing every write five
times, with 5x the disk space requirements and 1/5th the throughput, is not very practical
for large volume data problems. This is likely why quorum algorithms more commonly appear
for shared cluster configuration such as Zookeeper but are less common for primary data storage.
For example in HDFS the namenode's high-availability feature is built on a <a href="http://blog.cloudera.com/blog/2012/10/quorum-based-journaling-in-cdh4-1">majority-vote-based
journal</a> but this more expensive approach is not used for the data itself.
+<p>
+Kafka takes a slightly different approach to choosing its quorum set. Instead of majority
vote Kafka dynamically maintains a set of in-sync replicas that are caught-up to the leader.
Only members of this set are eligible for election as leader. A write to a Kafka partition
is not considered committed until <i>all</i> in-sync replicas have received the
write. This ISR set is persisted to zookeeper whenever it changes. Because of this, any replica
in the in-sync set is eligible to be elected leader. This is an important factor for Kafka's
usage model where there are many partitions and ensuring leadership balance is important.
With this ISR model and <i>N</i> replicas a Kafka topic can tolerate <i>N</i>-1
failures without losing committed messages.
+<p>
+In practice for most use cases we hope to handle we think this tradeoff is a reasonable one.
To tolerate <i>f</i> failures, both the majority vote and ISR approach
will wait for the same number of replicas to acknowledge before committing a message (e.g.
to survive one failure a majority quorum needs three replicas and one acknowledgement and
the ISR approach requires two replicas and one acknowledgement). The ability to commit without
the slowest servers is an advantage of the majority vote approach but we think it is ameliorated
by allowing the client to choose whether they block on the message commit or not, and the
additional throughput and disk space due to the lower required replication factor is worth
it.
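
The difference in required replication factor can be made concrete with a little arithmetic. This sketch uses only the figures given in the text (majority vote needs 2f+1 copies to tolerate f failures, while N in-sync replicas tolerate N-1 failures); the function names are hypothetical.

```python
def replicas_for_majority_vote(f):
    """Copies of the data needed to tolerate f failures with majority vote."""
    return 2 * f + 1


def replicas_for_isr(f):
    """Copies needed with the ISR approach: N replicas tolerate N - 1 failures."""
    return f + 1


print("failures  majority  isr")
for f in (1, 2, 3):
    print(f"{f:8d}  {replicas_for_majority_vote(f):8d}  {replicas_for_isr(f):3d}")
```

For two failures this gives five copies under majority vote against three under the ISR approach, which is the disk space and throughput saving the paragraph refers to.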
+<p>
+Another important design distinction is that Kafka does not require that crashed nodes recover
with all their data intact. It is not uncommon for replication algorithms in this space to
depend on the existence of "stable storage" that cannot be lost in any failure-recovery scenario
without potential consistency violations. There are two primary problems with this assumption.
First, disk errors are the most common problem we observe in real operation of persistent
data systems and they often do not leave data intact. Secondly, even if this were not a problem,
we do not want to require the use of fsync on every write for our consistency guarantees as
this can reduce performance by two to three orders of magnitude. Our protocol for allowing
a replica to rejoin the ISR ensures that before rejoining it must fully re-sync again even
if it lost unflushed data in its crash.
+
+<h4>Unclean leader election: What if they all die?</h4>
+
+Note that Kafka's guarantee with respect to data loss is predicated on at least one replica
remaining in sync. If all the nodes replicating a partition die, this guarantee no longer
holds.
+<p>
+However a practical system needs to do something reasonable when all the brokers die. If
you are unlucky enough to have this happen it is important to consider what will happen. There
are two behaviors that could be implemented:
+<ol>
+	<li>Wait for a broker in the last in-sync set to come back to life and choose this
broker as the leader (hopefully it still has all its data).
+	<li>Choose the first broker to come back to life as the leader.
+</ol>
 <p>
-Kafka takes a different approach from quorum replication. Instead Kafka dynamically maintains
a set of in-sync replicas that are caught-up to the leader. Only members of this set are eligible
for election as leader. A write to a Kafka partition is not considered committed until all
in-sync replicas have received the write. This ISR set is persisted to zookeeper whenever
it changes. Because of this any replica in the in-sync set is eligible to be elected leader.
This is an important factor in Kafka where there are many partitions and ensuring leadership
balance is important. With this ISR model <i>N</i> replicas a Kafka topic can
tolerate <i>N</i>-1 failures without losing committed messages.
+This is a simple tradeoff between availability and consistency. If we wait for a broker in
the ISR then we will remain unavailable as long as this broker is down. If this broker was
destroyed or its data was lost then we are permanently down. If, on the other hand, a non-in-sync
broker comes back to life and we allow it to become leader then its log becomes the source
of truth even though it is not guaranteed to have every committed message. In our current
release we choose the second strategy and favor choosing a potentially inconsistent broker
when all other machines are dead. In the future we would like to make this configurable to
better support uses where downtime is preferable to inconsistency.
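
The two recovery behaviors can be contrasted in a few lines. This is a sketch of the decision only, under the assumption that we know the last in-sync set and the order in which brokers came back; `choose_leader` and its parameters are hypothetical names, not Kafka's controller API.

```python
def choose_leader(last_isr, recovered_brokers, wait_for_isr):
    """Pick a leader after every replica of a partition has died.

    last_isr: set of broker ids in the last known in-sync set.
    recovered_brokers: broker ids in the order they came back to life.
    wait_for_isr: True for behavior 1 (consistency), False for behavior 2 (availability).
    Returns a broker id, or None if the partition stays unavailable for now.
    """
    if wait_for_isr:
        for broker in recovered_brokers:
            if broker in last_isr:
                return broker
        return None  # keep waiting; possibly forever if the ISR members are gone
    # Behavior 2: the first broker back wins, even if it missed committed messages.
    return recovered_brokers[0] if recovered_brokers else None


# Broker 2 was the last in-sync replica, but broker 3 restarts first.
print(choose_leader({2}, [3], wait_for_isr=True))      # None: still unavailable
print(choose_leader({2}, [3], wait_for_isr=False))     # 3: available, maybe inconsistent
print(choose_leader({2}, [3, 2], wait_for_isr=True))   # 2: consistent leader found
```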
 <p>
-In practice for most use cases we hope to handle we think this tradeoff is a reasonable one.
In practice to tolerate <i>f</i> failures both the quorum and ISR approach will
wait for the same number of replicas to acknowledge before committing a message (e.g. to survive
one failure a quorum needs three replicas and one acknowledgement and the ISR approach requires
two replicas and one acknowledgement). The ability to commit without the slowest servers is
an advantage of the quorum approach but we think it is ameliorated by allowing the client
to choose whether they block on the message commit or not, and the additional throughput and
disk space due to the lower required replication factor is worth it.
+This dilemma is not specific to Kafka; it exists in any quorum-based scheme. For example,
in a majority voting scheme if a majority of servers suffer a permanent failure then you must
either choose to lose 100% of your data or violate consistency by taking what remains on an
existing server as your new source of truth.
 
 <h4>Replica Management</h4>
 
-The above discussion on a replicated log covers only a single partition. However a Kafka
cluster will manage hundreds or thousands of these. We attempt to balance partitions within
a cluster in a round-robin fashion to avoid clustering all partitions for high-volume topics
on a small number of nodes. Likewise we try to balance leadership so that each node is the
leader for a proportional share of its partitions.
+The above discussion on replicated logs really covers only a single log, i.e. one topic partition.
However a Kafka cluster will manage hundreds or thousands of these. We attempt to balance
partitions within a cluster in a round-robin fashion to avoid clustering all partitions for
high-volume topics on a small number of nodes. Likewise we try to balance leadership so that
each node is the leader for a proportional share of its partitions.
 <p>
 It is also important to optimize the leadership election process as that is the critical
window of unavailability. A naive implementation of leader election would end up running an
election per partition for all partitions a node hosted when that node failed. Instead we
elect a single "controller" that is responsible for leadership assignment decisions. This
controller serves an analogous role to the role of leaders themselves&mdash;we avoid making
a sequence of leadership decisions by choosing a designated process to make all these decisions
and then handling faults in this process. The result is that we are able to batch together
many of the required leadership change notifications which makes the election process far
cheaper and faster for a large number of partitions.

Modified: kafka/site/08/introduction.html
URL: http://svn.apache.org/viewvc/kafka/site/08/introduction.html?rev=1518892&r1=1518891&r2=1518892&view=diff
==============================================================================
--- kafka/site/08/introduction.html (original)
+++ kafka/site/08/introduction.html Fri Aug 30 05:33:32 2013
@@ -11,7 +11,7 @@ First let's review some basic messaging 
 	<li>Kafka is run as a cluster comprised of one or more servers each of which is called
a <i>broker</i>.
 </ul>
 
-So, at a high level, producers are send messages over the network to the Kafka cluster which
in turn serves them up to consumers like this:
+So, at a high level, producers send messages over the network to the Kafka cluster which
in turn serves them up to consumers like this:
 <div style="text-align: center; width: 100%">
   <img src="../images/producer_consumer.png">
 </div>
@@ -73,9 +73,10 @@ Not that partitioning means Kafka only p
 
 <h4>Guarantees</h4>
 
-Kafka gives the following guarantees
+At a high level, Kafka gives the following guarantees:
 <ul>
   <li>Messages sent by a producer to a particular topic partition will be appended
in the order they are sent. That is if a message M1 is sent by the same producer as a message
M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the
log.
-  <li>A consumer instance sees messages in the order they are stored in the log
+  <li>A consumer instance sees messages in the order they are stored in the log.
   <li>For a topic with replication factor N, we will tolerate up to N-1 server failures
without losing any messages committed to the log.
-</ul>
\ No newline at end of file
+</ul>
+More details on these guarantees are given in the design section of the documentation.
\ No newline at end of file


