kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1520956 - /kafka/site/08/design.html
Date Mon, 09 Sep 2013 03:20:03 GMT
Author: junrao
Date: Mon Sep  9 03:20:03 2013
New Revision: 1520956

URL: http://svn.apache.org/r1520956
Log:
minor changes to 0.8 replication design doc

Modified:
    kafka/site/08/design.html

Modified: kafka/site/08/design.html
URL: http://svn.apache.org/viewvc/kafka/site/08/design.html?rev=1520956&r1=1520955&r2=1520956&view=diff
==============================================================================
--- kafka/site/08/design.html (original)
+++ kafka/site/08/design.html Mon Sep  9 03:20:03 2013
@@ -37,7 +37,7 @@ This style of pagecache-centric design i
 
 <h4>Constant Time Suffices</h4>
 <p>
-The persistent data structure used in messaging systems are often a per-consumer queue with
an associated BTree or other general-purpose random access data structureto maintain metadata
about messages. BTrees are the most versatile data structure available, and make it possible
to support a wide variety of transactional and non-transactional semantics in the messaging
system. They do come with a fairly high cost, though: Btree operations are O(log N). Normally
O(log N) is considered essentially equivalent to constant time, but this is not true for disk
operations. Disk seeks come at 10 ms a pop, and each disk can do only one seek at a time so
parallelism is limited. Hence even a handful of disk seeks leads to very high overhead. Since
storage systems mix very fast cached operations with very slow physical disk operations, the
observed performance of tree structures is often superlinear as data increases with fixed
cache--i.e. doubling your data makes things much worse then twice as slow.
+The persistent data structure used in messaging systems is often a per-consumer queue with
an associated BTree or other general-purpose random access data structures to maintain metadata
about messages. BTrees are the most versatile data structure available, and make it possible
to support a wide variety of transactional and non-transactional semantics in the messaging
system. They do come with a fairly high cost, though: BTree operations are O(log N). Normally
O(log N) is considered essentially equivalent to constant time, but this is not true for disk
operations. Disk seeks come at 10 ms a pop, and each disk can do only one seek at a time so
parallelism is limited. Hence even a handful of disk seeks leads to very high overhead. Since
storage systems mix very fast cached operations with very slow physical disk operations, the
observed performance of tree structures is often superlinear as data increases with fixed
cache--i.e. doubling your data makes things much worse than twice as slow.
 <p>
 Intuitively a persistent queue could be built on simple reads and appends to files as is
commonly the case with logging solutions. This structure has the advantage that all operations
are O(1) and reads do not block writes or each other. This has obvious performance advantages
since the performance is completely decoupled from the data size&mdash;one server can
now take full advantage of a number of cheap, low-rotational speed 1+TB SATA drives. Though
they have poor seek performance, these drives have acceptable performance for large reads
and writes and come at 1/3 the price and 3x the capacity.
 <p>
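The contrast between per-message tree lookups and simple appends can be made concrete with a small sketch. This is a minimal illustration under stated assumptions, not Kafka's log code: a hypothetical `AppendOnlyLog` class writes length-prefixed records to the end of one fresh file and keeps each record's byte position in memory, so an append is one write at the end of the file and a read by offset is one seek, however much data the file already holds. Segment files, on-disk indexes and retention are deliberately left out.

```python
import os
import struct
import tempfile


class AppendOnlyLog:
    """Hypothetical single-file log: each record is a 4-byte big-endian length plus payload.

    Assumes a new, empty file; existing contents are not re-indexed.
    """

    def __init__(self, path):
        # positions[offset] is the byte position of that record in the file.
        self.positions = []
        # "a+b" creates the file if needed; every write lands at the end of the file.
        self.file = open(path, "a+b")

    def append(self, payload: bytes) -> int:
        """Append one record and return its logical offset; cost does not grow with log size."""
        self.file.seek(0, os.SEEK_END)
        position = self.file.tell()
        self.file.write(struct.pack(">I", len(payload)) + payload)
        self.file.flush()
        self.positions.append(position)
        return len(self.positions) - 1

    def read(self, offset: int) -> bytes:
        """Read one record by offset: a single seek, no tree traversal."""
        self.file.seek(self.positions[offset])
        (length,) = struct.unpack(">I", self.file.read(4))
        return self.file.read(length)

    def close(self):
        self.file.close()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        log = AppendOnlyLog(os.path.join(tmp, "partition-0.log"))
        for word in (b"alpha", b"beta", b"gamma"):
            log.append(word)
        print(log.read(1))  # b'beta'
        log.close()
```

Neither operation touches more than the record it writes or reads, which is the O(1) property the paragraph relies on.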
@@ -160,7 +160,7 @@ Now let's describe the semantics from th
  <li>So what about exactly once semantics (i.e. the thing you actually want)? The
limitation here is not actually a feature of the messaging system but rather the need to co-ordinate
the consumer's position with what is actually stored as output. The classic way of achieving
this would be to introduce a two-phase commit between the storage for the consumer position
and the storage of the consumer's output. But this can be handled more simply and generally
by letting the consumer store its offset in the same place as its output. This is better
because many of the output systems a consumer might want to write to will not support a two-phase
commit. As an example of this, our Hadoop ETL that populates data in HDFS stores its offsets in
HDFS with the data it reads so that it is guaranteed that either the data and offsets are both
updated or neither is. We follow similar patterns for many other data systems which require
these stronger semantics and for which the messages do not have a primary key to allow for deduplication.
 </ol>
 <p>
-So effectively Kafka guarantees at-least-once delivery by default and allows the user to
implement at most once delivery by disabling retries on the producer and commiting its offset
prior to processing a batch of messages. Exactly-once delivery requires co-operation with
the destination storage system but Kafka gives the offset which makes implementing this straight-forward.
+So effectively Kafka guarantees at-least-once delivery by default and allows the user to
implement at-most-once delivery by disabling retries on the producer and having the consumer commit its offset
prior to processing a batch of messages. Exactly-once delivery requires co-operation with
the destination storage system, but Kafka provides the offset, which makes implementing this straightforward.
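Keeping the consumer's offset in the same place as its output only needs a destination that supports atomic updates. The sketch below uses SQLite (Python's standard `sqlite3` module) as a stand-in for that destination; the table layout and the functions `open_store`, `stored_offset` and `process_batch` are hypothetical, and no Kafka client API is involved. Because the output rows and the next offset are written in one transaction, a crash in the middle of a batch leaves both unchanged.

```python
import sqlite3


def open_store(path=":memory:"):
    """Hypothetical destination store that holds both the output and the consumer offset."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS output (msg_offset INTEGER PRIMARY KEY, result TEXT)")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS consumer_position "
        "(id INTEGER PRIMARY KEY CHECK (id = 0), next_offset INTEGER NOT NULL)"
    )
    conn.execute("INSERT OR IGNORE INTO consumer_position VALUES (0, 0)")
    conn.commit()
    return conn


def stored_offset(conn) -> int:
    """Offset to resume fetching from, read from the same store as the output."""
    return conn.execute("SELECT next_offset FROM consumer_position WHERE id = 0").fetchone()[0]


def process_batch(conn, batch, crash_after=None):
    """Write (offset, value) pairs and advance the stored offset in one transaction.

    crash_after simulates the consumer dying after that many writes; the transaction
    is rolled back, so the output and the offset are both left unchanged.
    """
    if not batch:
        return
    with conn:  # commits on success, rolls back if an exception escapes
        for i, (offset, value) in enumerate(batch):
            if crash_after is not None and i == crash_after:
                raise RuntimeError("simulated consumer crash")
            # value.upper() stands in for whatever processing the consumer does.
            conn.execute("INSERT INTO output VALUES (?, ?)", (offset, value.upper()))
        conn.execute(
            "UPDATE consumer_position SET next_offset = ? WHERE id = 0", (batch[-1][0] + 1,)
        )
```

On restart, a consumer built this way would begin fetching from `stored_offset(conn)`. Output whose offset was not recorded cannot exist, so a retried batch is neither duplicated nor lost.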
 
 <h3><a id="replication">4.7 Replication</a></h3>
 <p>
@@ -168,7 +168,7 @@ Kafka replicates the log for each topic'
 <p>
 Other messaging systems provide some replication-related features, but, in our (totally biased)
opinion, this appears to be a tacked-on thing, not heavily used, and with large downsides:
slaves are inactive, throughput is heavily impacted, it requires fiddly manual configuration,
etc. Kafka is meant to be used with replication by default&mdash;in fact we implement
un-replicated topics as replicated topics where the replication factor is one.
 <p>
-The unit of replication is the topic partition. Under non-failure conditions each partition
Kafka has a single leader and zero or more followers. We call the total number of replicas
including the leader the replication factor. All reads and writes go to the leader of the
partition; each node is the leader for a share of partitions and a follower for others. The
logs on the followers are identical to the leader's log&mdash;all have the same offsets
and messages in the same order (though, of course, at any given time the leader may have a
few as-yet unreplicated messages at the end of its log).
+The unit of replication is the topic partition. Under non-failure conditions, each partition
in Kafka has a single leader and zero or more followers. We call the total number of replicas
including the leader the replication factor. All reads and writes go to the leader of the
partition. Typically, there are many more partitions than brokers and the leaders are evenly
distributed among brokers. The logs on the followers are identical to the leader's log&mdash;all
have the same offsets and messages in the same order (though, of course, at any given time
the leader may have a few as-yet unreplicated messages at the end of its log).
 <p>
 Followers consume messages from the leader just as a normal Kafka consumer would and apply
them to their own log. Having the followers pull from the leader has the nice property of
allowing the follower to naturally batch together log entries it is applying to its log.
 <p>
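The pull model described above can be sketched as a follower repeatedly fetching from the end of its own log. The names here (`Replica`, `fetch`, `follower_fetch_round`) and the `max_messages` cap are hypothetical; logs are plain Python lists, and networking and failures are ignored. What it illustrates is the batching: a follower that is further behind receives a larger batch in a single round trip.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    """Hypothetical in-memory replica; `log` holds messages in offset order."""
    log: list = field(default_factory=list)

    def log_end_offset(self) -> int:
        return len(self.log)

    def fetch(self, from_offset: int, max_messages: int) -> list:
        """Serve a fetch like a consumer request: messages from `from_offset`, capped."""
        return self.log[from_offset:from_offset + max_messages]


def follower_fetch_round(leader: Replica, follower: Replica, max_messages: int = 100) -> int:
    """One pull by the follower: fetch from its own log end and append the whole batch.

    Returns the number of messages applied in this round.
    """
    batch = leader.fetch(follower.log_end_offset(), max_messages)
    follower.log.extend(batch)
    return len(batch)
```

A follower 250 messages behind catches up in three rounds of 100, 100 and 50 messages rather than 250 separate requests.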
@@ -177,11 +177,11 @@ As with most distributed systems automat
 	<li>A node must be able to maintain its session with Zookeeper (via Zookeeper's heartbeat
mechanism)
 	<li>If it is a follower it must replicate the writes happening on the leader and not
fall "too far" behind
 </ol>
-We refer to nodes satisfying these two conditions as being "in sync" to avoid the vagueness
of "alive" or "failed". The leader keeps track of the set of "in sync" nodes. If a consumer
dies or falls behind, the leader will remove it from the list of in sync replicas. The definition
of how far behind is too far is controlled by the replica.lag.max.messages configuration.
+We refer to nodes satisfying these two conditions as being "in sync" to avoid the vagueness
of "alive" or "failed". The leader keeps track of the set of "in sync" nodes. If a follower
dies or falls behind, the leader will remove it from the list of in sync replicas. The definition
of how far behind is too far is controlled by the replica.lag.max.messages configuration.
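The leader's bookkeeping for the two "in sync" conditions can be sketched as a pure function. This is a hypothetical model, not broker code: `alive` stands for the replicas that still hold a Zookeeper session, `lag_max_messages` stands in for the replica.lag.max.messages setting, and only removal from the set is modeled.

```python
def update_isr(isr, leader_id, leader_end_offset, follower_end_offsets, alive, lag_max_messages):
    """Return the in-sync set after one check by the leader (hypothetical helper).

    isr:                  current set of in-sync replica ids, including the leader
    follower_end_offsets: replica id -> log end offset the follower has replicated
    alive:                replica ids that still hold a Zookeeper session
    lag_max_messages:     stands in for the replica.lag.max.messages setting
    Only removal is modeled; rejoining the set is not.
    """
    new_isr = {leader_id}
    for replica_id in isr:
        if replica_id == leader_id:
            continue
        lag = leader_end_offset - follower_end_offsets.get(replica_id, 0)
        # A follower stays in sync only if it is alive AND not too far behind.
        if replica_id in alive and lag <= lag_max_messages:
            new_isr.add(replica_id)
    return new_isr
```

Whether a lag exactly equal to the threshold counts as in sync is a choice made by this sketch (`<=`), not something the text specifies.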
 <p>
 In distributed systems terminology we only attempt to handle a "fail/recover" model of failures
where nodes suddenly cease working and then later recover (perhaps without knowing that they
have died). Kafka does not handle so-called "Byzantine" failures in which nodes produce arbitrary
or malicious responses (perhaps due to bugs or foul play).
 <p>
-A message is considered "committed" when all in sync replicas for that partition have applied
it to their log. Only committed messages are ever given out to the consumer. This means that
the consumer need not worry about potentially seeing a message that could be lost if the leader
fails. Producers, on the other hand, have the option of either waiting for the message to
be committed or not, depending on their preference for latency and durability. This preference
is controlled by the request.required.acks setting the producer uses.
+A message is considered "committed" when all in sync replicas for that partition have applied
it to their log. Only committed messages are ever given out to the consumer. This means that
the consumer need not worry about potentially seeing a message that could be lost if the leader
fails. Producers, on the other hand, have the option of either waiting for the message to
be committed or not, depending on their preference for latency and durability. This preference
is controlled by the request.required.acks setting that the producer uses.
 <p>
 The guarantee that Kafka offers is that a committed message will not be lost as long as a
single in sync replica remains.
 <p>
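The commit rule reduces to a minimum over the in-sync replicas. A minimal sketch, assuming the leader knows the log end offset (number of messages applied) of every ISR member; the function names are hypothetical. It also shows why the guarantee holds: every ISR member has, by construction, every message below the commit point, so any one of them that survives still has all committed messages.

```python
def committed_offset(isr, log_end_offsets):
    """Number of messages every in-sync replica has applied (hypothetical helper).

    A message at offset o is committed once o < this value, i.e. all ISR members have it.
    """
    return min(log_end_offsets[replica_id] for replica_id in isr)


def visible_to_consumers(leader_log, isr, log_end_offsets):
    """Only committed messages are handed out to consumers."""
    return leader_log[:committed_offset(isr, log_end_offsets)]
```

With log end offsets `{1: 5, 2: 4, 3: 2}` and all three replicas in sync, only the first two messages are committed. If replica 3 is removed from the ISR, the same function returns 4, so the commit point moves forward without waiting for the lagging replica.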
@@ -191,42 +191,42 @@ Kafka will remain available in the prese
 
 At its heart, a Kafka partition is a replicated log. The replicated log is one of the most
basic primitives in distributed data systems, and there are many approaches to implementation.
A replicated log can be used by other systems as a primitive for implementing other distributed
systems in the <a href="http://en.wikipedia.org/wiki/State_machine_replication">state-machine
style</a>.
 <p>
-A replicated log models the process of coming into consensus on the order of a series of
values (generally numbering the log entries 0, 1, 2, ...). There are many ways to implement
this, but the simplest and fastest is with a leader who chooses the ordering of values provided
to it. As long as the leader remains alive and all followers need only copy the values and
ordering it chooses.
+A replicated log models the process of coming into consensus on the order of a series of
values (generally numbering the log entries 0, 1, 2, ...). There are many ways to implement
this, but the simplest and fastest is with a leader who chooses the ordering of values provided
to it. As long as the leader remains alive, all followers need only copy the values and
ordering the leader chooses.
 <p>
 Of course if leaders didn't fail we wouldn't need followers! When the leader does die we
need to choose a new leader from among the followers. But followers themselves may fall behind
or crash so we must ensure we choose an up-to-date follower. The fundamental guarantee a log
replication algorithm must provide is that if we tell the client a message is committed, and
the leader fails, the new leader we elect must also have that message. This yields a tradeoff:
if the leader waits for more followers to acknowledge a message before declaring it committed
then there will be more potentially electable leaders.
 <p>
-If you choose the number of acknowledgements required and the number of logs that must be
compared to elect a leader such that there is guaranteed to be an overlap then this is called
a Quorum.
+If you choose the number of acknowledgements required and the number of logs that must be
compared to elect a leader such that there is guaranteed to be an overlap, then this is called
a Quorum.
 <p>
-A common approach to this tradeoff is to use a majority vote for both the commit decision
and the leader election. This is not what Kafka does, but let's explore it anyway to understand
the tradeoffs. Let's say we want to tolerate <i>f</i> failures. If 2<i>f</i>+1
servers must replicate a message prior to a commit being declared by the leader, and if we
elect a new leader by electing the follower with the most complete log from at least 2<i>f</i>+1
replicas, then, with no more than <i>f</i> failures there must be at least one
server in common between those that committed the message and the servers that were available
to take over as leader and hence the message will be preserved.  There are many remaining
details that each algorithm must handle (such as precisely defined what makes a log more complete,
ensuring log consistency during leader failure or changing the set of servers in the replica
set) but we will ignore these for now.
+A common approach to this tradeoff is to use a majority vote for both the commit decision
and the leader election. This is not what Kafka does, but let's explore it anyway to understand
the tradeoffs. Let's say we have 2<i>f</i>+1 replicas. If <i>f</i>+1
replicas must receive a message prior to a commit being declared by the leader, and if we
elect a new leader by electing the follower with the most complete log from at least <i>f</i>+1
replicas, then, with no more than <i>f</i> failures, the leader is guaranteed
to have all committed messages. This is because among any <i>f</i>+1 replicas,
there must be at least one replica that contains all committed messages. That replica's log
will be the most complete and therefore will be selected as the new leader. There are many
remaining details that each algorithm must handle (such as precisely defining what makes a
log more complete, ensuring log consistency during leader failure or changing the set of servers
in the replica set) but we will ignore these for now.
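The overlap argument is a counting one: a commit set and an election set of f+1 replicas each add up to 2f+2 members drawn from only 2f+1 replicas, so they must share at least one. The small brute-force check below (the function name is hypothetical; it uses only `itertools.combinations`) confirms this for a few values of f, and shows that requiring only f replicas for a commit would not be enough.

```python
from itertools import combinations


def quorums_always_overlap(n: int, commit_size: int, election_size: int) -> bool:
    """True if every possible commit set shares a replica with every possible election set."""
    replicas = range(n)
    return all(
        set(commit) & set(election)  # non-empty intersection is truthy
        for commit in combinations(replicas, commit_size)
        for election in combinations(replicas, election_size)
    )
```

For n = 3 (f = 1), commit and election sets of two replicas always intersect, whereas a single-replica commit set such as {0} can miss the election set {1, 2} entirely.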
 <p>
 This majority vote approach has a very nice property: the latency is dependent on only the
fastest servers. That is, if the replication factor is three, the latency is determined by
the faster follower, not the slower one.
 <p>
 There are a rich variety of algorithms in this family including Zookeeper's <a href="http://www.stanford.edu/class/cs347/reading/zab.pdf">Zab</a>,
<a href="https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf">Raft</a>,
and <a href="http://pmg.csail.mit.edu/papers/vr-revisited.pdf">Viewstamped Replication</a>.
The most similar academic publication we are aware of to Kafka's actual implementation is
<a href="http://research.microsoft.com/apps/pubs/default.aspx?id=66814">PacificA</a>
from Microsoft.
 <p>
-The downside of majority vote is that it doesn't take many failures to leave you with no
electable leaders. To tolerate one failure requires three copies of the data, and to tolerate
two failures requires five copies of the data. In our experience having only enough redundancy
to tolerate a single failure is not enough for a practical system, but doing every write five
times, with 5x the disk space requirements and 1/5th the throughput, is not very practical
for large volume data problems. This is likely why quorum algorithms more commonly appear
for shared cluster configuration such as Zookeeper but are less common for primary data storage.
For example in HDFS the namenode's high-availability feature is built on a <a href="http://blog.cloudera.com/blog/2012/10/quorum-based-journaling-in-cdh4-1">majority-vote-based
journal</a> but this more expensive approach is not used for the data itself.
+The downside of majority vote is that it doesn't take many failures to leave you with no
electable leaders. To tolerate one failure requires three copies of the data, and to tolerate
two failures requires five copies of the data. In our experience having only enough redundancy
to tolerate a single failure is not enough for a practical system, but doing every write five
times, with 5x the disk space requirements and 1/5th the throughput, is not very practical
for large volume data problems. This is likely why quorum algorithms more commonly appear
for shared cluster configuration such as Zookeeper but are less common for primary data storage.
For example in HDFS the namenode's high-availability feature is built on a <a href="http://blog.cloudera.com/blog/2012/10/quorum-based-journaling-in-cdh4-1">majority-vote-based
journal</a>, but this more expensive approach is not used for the data itself.
 <p>
-Kafka takes a slightly different approach to choosing its quorum set. Instead of majority
vote Kafka dynamically maintains a set of in-sync replicas that are caught-up to the leader.
Only members of this set are eligible for election as leader. A write to a Kafka partition
is not considered committed until <i>all</i> in-sync replicas have received the
write. This ISR set is persisted to zookeeper whenever it changes. Because of this, any replica
in the in-sync set is eligible to be elected leader. This is an important factor for Kafka's
usage model where there are many partitions and ensuring leadership balance is important.
With this ISR model and <i>N</i> replicas a Kafka topic can tolerate <i>N</i>-1
failures without losing committed messages.
+Kafka takes a slightly different approach to choosing its quorum set. Instead of majority
vote, Kafka dynamically maintains a set of in-sync replicas (ISR) that are caught-up to the
leader. Only members of this set are eligible for election as leader. A write to a Kafka partition
is not considered committed until <i>all</i> in-sync replicas have received the
write. This ISR set is persisted to zookeeper whenever it changes. Because of this, any replica
in the ISR is eligible to be elected leader. This is an important factor for Kafka's usage
model where there are many partitions and ensuring leadership balance is important. With this
ISR model and <i>f+1</i> replicas, a Kafka topic can tolerate <i>f</i>
failures without losing committed messages.
 <p>
-In practice for most use cases we hope to handle we think this tradeoff is a reasonable one.
In practice to tolerate <i>f</i> failures both the majority vote and ISR approach
will wait for the same number of replicas to acknowledge before committing a message (e.g.
to survive one failure a majority quorum needs three replicas and one acknowledgement and
the ISR approach requires two replicas and one acknowledgement). The ability to commit without
the slowest servers is an advantage of the majority vote approach but we think it is ameliorated
by allowing the client to choose whether they block on the message commit or not, and the
additional throughput and disk space due to the lower required replication factor is worth
it.
+For most use cases we hope to handle, we think this tradeoff is a reasonable one. In practice,
to tolerate <i>f</i> failures, both the majority vote and the ISR approach will
wait for the same number of replicas to acknowledge before committing a message (e.g. to survive
one failure a majority quorum needs three replicas and one acknowledgement and the ISR approach
requires two replicas and one acknowledgement). The ability to commit without the slowest
servers is an advantage of the majority vote approach. However, we think it is ameliorated
by allowing the client to choose whether they block on the message commit or not, and the
additional throughput and disk space due to the lower required replication factor is worth
it.
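The comparison in the two paragraphs above is simple arithmetic, sketched here with a hypothetical `replication_cost` helper. It assumes the ISR is full, i.e. every replica is currently in sync.

```python
def replication_cost(f: int) -> dict:
    """Replicas and follower acknowledgements needed to survive f failures.

    Majority vote: 2f+1 replicas; a write commits once f+1 of them (leader + f followers) have it.
    ISR (assuming a full ISR): f+1 replicas; a write commits once all of them have it,
    which is again the leader + f followers.
    """
    return {
        "majority": {"replicas": 2 * f + 1, "follower_acks": f},
        "isr": {"replicas": f + 1, "follower_acks": f},
    }
```

Both schemes wait for the same number of follower acknowledgements; they differ only in how many copies must be stored, which is where the disk space and throughput difference comes from (for f = 2: five copies versus three).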
 <p>
-Another important design distinction is that Kafka does not require that crashed nodes recover
with all their data intact. It is not uncommon for replication algorithms in this space to
depend on the existence of "stable storage" that cannot be lost in any failure-recovery scenario
without potential consistency violations. There are two primary problems with this assumption.
First, disk errors are the most common problem we observe in real operation of persistent
data systems and they often do not leave data intact. Secondly, even if this were not a problem,
we do not want to require the use of fsync on every write for our consistency guarantees as
this can reduce performance by two to three orders of magnitude. Our protocol for allowing
a replica to rejoin the ISR ensures that before rejoining it must fully re-sync again even
if it lost unflushed data in its crash.
+Another important design distinction is that Kafka does not require that crashed nodes recover
with all their data intact. It is not uncommon for replication algorithms in this space to
depend on the existence of "stable storage" that cannot be lost in any failure-recovery scenario
without potential consistency violations. There are two primary problems with this assumption.
First, disk errors are the most common problem we observe in real operation of persistent
data systems and they often do not leave data intact. Secondly, even if this were not a problem,
we do not want to require the use of fsync on every write for our consistency guarantees as
this can reduce performance by two to three orders of magnitude. Our protocol for allowing
a replica to rejoin the ISR ensures that before rejoining, it must fully re-sync again even
if it lost unflushed data in its crash.
 
 <h4>Unclean leader election: What if they all die?</h4>
 
 Note that Kafka's guarantee with respect to data loss is predicated on at least one replica
remaining in sync. If all the nodes replicating a partition die, this guarantee no longer
holds.
 <p>
-However a practical system needs to do something reasonable when all the brokers die. If
you are unlucky enough to have this happen it is important to consider what will happen. There
are two behaviors that could be implemented:
+However, a practical system needs to do something reasonable when all the replicas die. If
you are unlucky enough to have this occur, it is important to consider what will happen. There
are two behaviors that could be implemented:
 <ol>
-	<li>Wait for a broker in the last in-sync set to come back to life and choose this
broker as the leader (hopefully it still has all its data).
-	<li>Choose the first broker to come back to life as the leader.
+	<li>Wait for a replica in the ISR to come back to life and choose this replica as
the leader (hopefully it still has all its data).
+	<li>Choose the first replica (not necessarily in the ISR) that comes back to life
as the leader.
 </ol>
 <p>
-This is a simple tradeoff between availability and consistency. If we wait for a broker in
the ISR then we will remain unavailable as long as this broker is down. If this broker was
destroyed or its data was lost then we are permanently down. If, on the other hand, a non-in-sync
broker comes back to life and we allow it to become leader then it's log becomes the source
of truth even though it is not guaranteed to have every committed message. In our current
release we choose the second strategy and favor choosing a potentially inconsistent broker
when all other machines are dead. In the future we would like to make this configurable to
better support uses where downtime is preferable to inconsistency.
+This is a simple tradeoff between availability and consistency. If we wait for replicas in
the ISR, then we will remain unavailable as long as those replicas are down. If such replicas
were destroyed or their data was lost, then we are permanently down. If, on the other hand,
a non-in-sync replica comes back to life and we allow it to become leader, then its log becomes
the source of truth even though it is not guaranteed to have every committed message. In our
current release we choose the second strategy and favor choosing a potentially inconsistent
replica when all replicas in the ISR are dead. In the future, we would like to make this configurable
to better support use cases where downtime is preferable to inconsistency.
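The two behaviors differ only in which recovered replica is acceptable as leader. The sketch below uses a hypothetical `choose_leader` function and a hypothetical `allow_unclean` flag; per the text, the release described here always behaves as `allow_unclean=True`, and the flag is not a setting of that release.

```python
from typing import Optional, Sequence, Set


def choose_leader(last_isr: Set[int], recovered_in_order: Sequence[int],
                  allow_unclean: bool = True) -> Optional[int]:
    """Pick a leader after every replica of a partition has died (hypothetical helper).

    recovered_in_order: replica ids in the order they came back to life.
    allow_unclean=True  -> first replica back wins, even if it was not in the ISR
                           (availability over consistency).
    allow_unclean=False -> keep waiting (return None) until an ISR member is back
                           (consistency over availability).
    """
    if allow_unclean:
        return recovered_in_order[0] if recovered_in_order else None
    for replica_id in recovered_in_order:
        if replica_id in last_isr:
            return replica_id
    return None
```

If replica 3 (outside the ISR {1, 2}) recovers first, the unclean choice makes it leader immediately, while the clean choice stays unavailable until replica 1 or 2 returns.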
 <p>
-This dilemma is not specific to Kafka, it exists in any quorum-based scheme. For example
in a majority voting scheme if a majority of servers suffer a permanent failure then you must
either choose to lose 100% of your data or violate consistency by taking what remains on an
existing server as your new source of truth.
+This dilemma is not specific to Kafka. It exists in any quorum-based scheme. For example
in a majority voting scheme, if a majority of servers suffer a permanent failure, then you
must either choose to lose 100% of your data or violate consistency by taking what remains
on an existing server as your new source of truth.
 
 <h4>Replica Management</h4>
 
-The above discussion on replicated logs really covers only a single log, i.e. one topic partition.
However a Kafka cluster will manage hundreds or thousands of these. We attempt to balance
partitions within a cluster in a round-robin fashion to avoid clustering all partitions for
high-volume topics on a small number of nodes. Likewise we try to balance leadership so that
each node is the leader for a proportional share of its partitions.
+The above discussion on replicated logs really covers only a single log, i.e. one topic partition.
However a Kafka cluster will manage hundreds or thousands of these partitions. We attempt
to balance partitions within a cluster in a round-robin fashion to avoid clustering all partitions
for high-volume topics on a small number of nodes. Likewise we try to balance leadership so
that each node is the leader for a proportional share of its partitions.
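One simple way to obtain the round-robin placement and leadership balance described above is to place replica i of partition p on broker (p + i) mod n and treat the first replica as the leader. This is an illustrative scheme with a hypothetical `assign_replicas` function, not Kafka's actual assignment code.

```python
def assign_replicas(num_partitions: int, brokers: list, replication_factor: int) -> dict:
    """Round-robin placement: replica i of partition p goes to broker (p + i) mod n.

    The first broker in each list is treated as the leader, so leadership rotates
    across brokers along with the partitions.
    """
    n = len(brokers)
    if replication_factor > n:
        raise ValueError("replication factor cannot exceed the number of brokers")
    return {
        p: [brokers[(p + i) % n] for i in range(replication_factor)]
        for p in range(num_partitions)
    }
```

With six partitions, three brokers and a replication factor of two, every broker holds four replicas and leads exactly two partitions.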
 <p>
-It is also important to optimize the leadership election process as that is the critical
window of unavailability. A naive implementation of leader election would end up running an
election per partition for all partitions a node hosted when that node failed. Instead we
elect a single "controller" that is responsible for leadership assignment decisions. This
controller serves an analogous role to the role of leaders themselves&mdash;we avoid making
a sequence of leadership decisions by choosing a designated process to make all these decisions
and then handling faults in this process. The result is that we are able to batch together
many of the required leadership change notifications which makes the election process far
cheaper and faster for a large number of partitions.
+It is also important to optimize the leadership election process as that is the critical
window of unavailability. A naive implementation of leader election would end up running an
election per partition for all partitions a node hosted when that node failed. Instead, we
elect one of the brokers as the "controller". This controller detects failures at the broker
level and is responsible for changing the leader of all partitions affected by a failed broker.
The result is that we are able to batch together many of the required leadership change notifications
which makes the election process far cheaper and faster for a large number of partitions.
If the controller fails, one of the surviving brokers will become the new controller.
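The controller's batching can be sketched as a single pass over the partitions a failed broker was leading. The function `handle_broker_failure` and its inputs are hypothetical; the sketch picks the first surviving in-sync replica and returns `None` when none is left, rather than modeling the unclean election discussed earlier.

```python
def handle_broker_failure(failed, leaders, isrs):
    """Controller-side sketch: choose new leaders for every partition the failed broker led.

    leaders: partition -> current leader broker id
    isrs:    partition -> list of in-sync broker ids, in preference order
    Returns one batch {partition: new_leader}, so the changes can be sent together
    instead of running a separate election per partition.
    """
    changes = {}
    for partition, leader in leaders.items():
        if leader != failed:
            continue  # partitions led by other brokers keep their leader
        candidates = [b for b in isrs[partition] if b != failed]
        changes[partition] = candidates[0] if candidates else None  # None: no in-sync replica left
    return changes
```

For example, if broker 1 leads partitions `t-0` (ISR [1, 2, 3]) and `t-2` (ISR [1]), one call yields `{"t-0": 2, "t-2": None}` while `t-1`, led by broker 2, is untouched.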


