= ZooKeeper Recipes =

Although ZooKeeper uses asynchronous notifications, it can be used to build synchronous consistency
primitives such as queues and locks. This is possible because ZooKeeper imposes a total order
on updates and has mechanisms for exposing this ordering. To implement these primitives efficiently
we cannot use polling, timers, or anything that would generate the "herd effect" since these
cause large bursts of traffic and limit scalability.

Name service and configuration are two of the primary applications of ZooKeeper. These two
functions are provided directly by the ZooKeeper API. Another function directly provided by
ZooKeeper is group membership. The group is represented by a node. Members of the group create
ephemeral nodes under the group node. Nodes of the members that fail abnormally will be removed
automatically when ZooKeeper detects the failure.
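
As a rough illustration (not part of the original recipe), a minimal group membership sketch using the ZooKeeper Java client; the group path and member name are supplied by the caller and are hypothetical:

{{{
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: joins a group by creating an ephemeral child under the
// group node; ZooKeeper removes the child automatically if the member's
// session ends, which is how abnormal failures are detected.
public class GroupMember {
    public static void join(ZooKeeper zk, String group, String member)
            throws KeeperException, InterruptedException {
        zk.create(group + "/" + member, new byte[0],
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    public static List<String> members(ZooKeeper zk, String group)
            throws KeeperException, InterruptedException {
        return zk.getChildren(group, false); // current live members
    }
}
}}}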

In the following sections we describe conventions that can be used with ZooKeeper to implement
higher-order functions. All of them are conventions implemented at the client and do not require
special support from ZooKeeper. We imagine that eventually these conventions will be captured
in client-side libraries to ease use and encourage standard application of conventions. These
examples are meant to stimulate thought. Many more functions can be imagined, for
example revocable read-write priority locks.

Some of these constructs, locks in particular, are included for illustrative purposes. In general,
applications usually find more directly applicable constructs, such as event handles or queues,
to use.

== Barriers ==

Distributed systems use barriers to block processing of a set of nodes until a condition is
met, at which time all the nodes are allowed to proceed. Barriers are implemented in ZooKeeper
by designating a barrier node. The barrier is in place if the barrier node exists. Distributed
machines call exists() on the barrier node with watch set to true. If exists() returns false,
the barrier is gone and the machines proceed with their processing. Otherwise, if exists() returns
true, the machines wait for a watch event from ZooKeeper for the barrier node. When the watch
event is triggered, the machines reissue the exists() call, waiting until the barrier
node is removed.
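
A rough sketch of this barrier protocol using the ZooKeeper Java client (the barrier path is caller-supplied; error handling is omitted):

{{{
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Sketch only: block until the designated barrier node is removed.
public class Barrier {
    public static void waitForBarrier(ZooKeeper zk, String barrierPath)
            throws KeeperException, InterruptedException {
        while (true) {
            CountDownLatch latch = new CountDownLatch(1);
            // exists() with a watch: the watch fires when the node changes.
            Stat stat = zk.exists(barrierPath, event -> latch.countDown());
            if (stat == null) {
                return; // barrier node is gone: proceed
            }
            latch.await(); // wait for the watch event, then re-check
        }
    }
}
}}}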

== Queue ==

Distributed queues are a rather common data structure. We first designate a ZooKeeper node
to hold the queue, the queue node. A client puts something into the queue by
calling create() with a pathname ending in "queue-" and the sequence and ephemeral flags set
to true. Because the sequence flag is set, the new pathnames will have the form _path-to-queue-node_/queue-XXXX,
where XXXX is a monotonically increasing number. A client that wants to remove from the queue does
a getChildren() with watch set to true on the queue node and starts processing the nodes with the
lowest numbers. The client does not need to issue another getChildren() until it exhausts the
list obtained from the first getChildren(). If there are no children in the queue node,
the reader waits for a watch notification before checking the queue again.
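
A rough sketch of this queue in the Java client; the "/queue" path is hypothetical, and per the recipe both the sequence and ephemeral flags are set on elements:

{{{
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: producer/consumer on a pre-created "/queue" node.
public class DistributedQueue {
    public static String put(ZooKeeper zk, byte[] data)
            throws KeeperException, InterruptedException {
        // Sequence + ephemeral flags, as described above.
        return zk.create("/queue/queue-", data,
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    public static byte[] take(ZooKeeper zk)
            throws KeeperException, InterruptedException {
        while (true) {
            CountDownLatch latch = new CountDownLatch(1);
            // getChildren with a watch so we learn when elements arrive.
            List<String> children =
                    zk.getChildren("/queue", event -> latch.countDown());
            if (children.isEmpty()) {
                latch.await(); // queue empty: wait, then check again
                continue;
            }
            Collections.sort(children); // lowest sequence number first
            for (String child : children) {
                String path = "/queue/" + child;
                try {
                    byte[] data = zk.getData(path, false, null);
                    zk.delete(path, -1); // claim the element
                    return data;
                } catch (KeeperException.NoNodeException e) {
                    // another consumer took it first; try the next child
                }
            }
        }
    }
}
}}}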

== Priority Queue ==

To implement a priority queue we need to make two simple changes to the queuing protocol described
above. First, to add to the queue, the pathname ends with "queue-YY", where YY is the priority
of the element, with lower numbers representing higher priority (just like UNIX). Second, when
removing from the queue, a client uses an up-to-date children list, meaning that the client
will invalidate previously obtained children lists if a watch notification triggers for the
queue node.
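
Only the producer side changes relative to the queue sketch above; a hedged fragment (the priority format and "/queue" path are illustrative):

{{{
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: the priority is embedded in the element name, so a plain
// lexicographic sort of the children orders elements by priority first
// (lower number = higher priority), then by arrival order.
public class PriorityEnqueue {
    public static String put(ZooKeeper zk, int priority, byte[] data)
            throws KeeperException, InterruptedException {
        String prefix = String.format("/queue/queue-%02d-", priority);
        return zk.create(prefix, data,
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }
}
}}}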

== Locks ==

Fully distributed locks that are globally synchronous, meaning that at any snapshot in time no
two clients think they hold the same lock, can be implemented using ZooKeeper. As with priority
queues, we first define a lock node.

Clients wishing to obtain a lock do the following:

 1. call create() with a pathname of "_locknode_/lock-" and the sequence and ephemeral flags
set.
 1. do getChildren on the lock node WITHOUT setting the watch flag (this is important to avoid
the herd effect).
 1. if the pathname created in step 1 has the lowest sequence number suffix, the client has
the lock and the client exits the protocol.
 1. the client does an exists() with the watch flag set on the path in the lock directory
with the next lowest sequence number.
 1. if exists() returns false goto step 2. Otherwise, wait for a notification for the pathname
from the previous step before going to step 2.

Clients wishing to release a lock simply delete the node they created in step 1.
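
A rough sketch of the acquire/release protocol above in the Java client; "/locknode" is hypothetical and assumed to exist:

{{{
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Sketch only: each client watches just the node ahead of it, so a
// release wakes exactly one waiter (no herd effect).
public class DistributedLock {
    // Returns the path of our lock node; pass it to release() when done.
    public static String acquire(ZooKeeper zk)
            throws KeeperException, InterruptedException {
        // Step 1: ephemeral sequential child under the lock node.
        String me = zk.create("/locknode/lock-", new byte[0],
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String myName = me.substring(me.lastIndexOf('/') + 1);
        while (true) {
            // Step 2: getChildren WITHOUT a watch.
            List<String> children = zk.getChildren("/locknode", false);
            Collections.sort(children);
            int myIndex = children.indexOf(myName);
            if (myIndex == 0) {
                return me; // step 3: lowest sequence number, lock is ours
            }
            // Step 4: exists() with a watch on the node just ahead of us.
            String prev = "/locknode/" + children.get(myIndex - 1);
            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(prev, event -> latch.countDown());
            if (stat != null) {
                latch.await(); // step 5: wait, then re-run from step 2
            }
        }
    }

    public static void release(ZooKeeper zk, String lockPath)
            throws KeeperException, InterruptedException {
        zk.delete(lockPath, -1); // delete the node created in step 1
    }
}
}}}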

There are a couple of observations to make:

 1. The removal of a node will only cause one client to wake up since each node is watched
by exactly one client, so we do not have the herd effect.
 1. There is no polling or timeouts.
 1. Because of the way we have implemented locking it is easy to see the amount of lock contention,
break locks, debug locking problems, etc.

Since ZooKeeper is often compared to Chubby, it is interesting to see how we can implement
Chubby in ZooKeeper: //(Note: this is written using only the published information on Chubby.
There are subtleties that are not taken into account, so until Google tests and says otherwise,
this is more a rough comparison than a drop-in replacement.)//

 * Open(), Close(), Poison() - These are NOOPs in ZooKeeper since we don't use file handles.
 * GetContentsAndStat() = getData().
 * GetStat() = exists().
 * ReadDir() = getChildren().
 * SetContents() = setData().
 * Delete() = delete().
 * Acquire() - This is the obtain lock protocol outlined above.
 * TryAcquire() - This is the obtain lock protocol that replaces step 4 with "failed to obtain
lock, delete the node created in step 1 and exit".
 * Release() - This is the release lock protocol.
 * GetSequencer() - This is the pathname and version (0) of the lock file created in step
1 of the obtain lock protocol.
 * SetSequencer() - The client calls exists() on the pathname of the sequencer with watch
set and invalidates the "sequencer" if the version has changed or if it receives a notification
and the version changes.
 * CheckSequencer() - exists() on the pathname of the sequencer, ensuring that the node exists
and the version has not changed.

Although you could implement a Chubby client using ZooKeeper, more powerful APIs could also
be implemented. Read on!

=== Shared Locks ===

To implement shared locks we change the lock protocol slightly:

To obtain a read lock:
 1. call create() with a pathname of "_locknode_/read-" and the sequence and ephemeral flags
set.
 1. do getChildren on the lock node WITHOUT setting the watch flag (this is important to avoid
the herd effect).
 1. if there are no children with pathname that start with "write-" and have a lower sequence
number than the node created in step 1, the client has the lock and the client exits the protocol.
 1. the client does an exists() with the watch flag set on the node in the lock directory whose
pathname starts with "write-" and has the next lowest sequence number.
 1. if exists() returns false, goto step 2. Otherwise, wait for a notification for the pathname
from the previous step before going to step 2.

To obtain a write lock:
 1. call create() with a pathname of "_locknode_/write-" and the sequence and ephemeral flags
set.
 1. do getChildren on the lock node WITHOUT setting the watch flag (this is important to avoid
the herd effect).
 1. if there are no children with a lower sequence number than the node created in step 1,
the client has the lock and the client exits the protocol.
 1. the client does an exists() with the watch flag set on the node with the pathname that has
the next lowest sequence number.
 1. if exists() returns false, goto step 2. Otherwise, wait for a notification for the pathname
from the previous step before going to step 2.
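
As a hedged sketch, here is the read-lock half of the protocol above (the write-lock half is the exclusive lock sketch from the previous section with a "write-" prefix); "/locknode" is again hypothetical:

{{{
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Sketch only: a reader is blocked only by "write-" nodes with lower
// sequence numbers, and watches just the nearest one.
public class ReadLock {
    public static String acquire(ZooKeeper zk)
            throws KeeperException, InterruptedException {
        String me = zk.create("/locknode/read-", new byte[0],
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        int mySeq = sequenceOf(me);
        while (true) {
            List<String> children = zk.getChildren("/locknode", false);
            // Find the "write-" node with the next lowest sequence number.
            String blocker = null;
            int blockerSeq = -1;
            for (String child : children) {
                if (child.startsWith("write-")) {
                    int seq = sequenceOf(child);
                    if (seq < mySeq && seq > blockerSeq) {
                        blocker = child;
                        blockerSeq = seq;
                    }
                }
            }
            if (blocker == null) {
                return me; // no earlier writer: read lock granted
            }
            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists("/locknode/" + blocker,
                    event -> latch.countDown());
            if (stat != null) {
                latch.await(); // wait for that writer, then re-check
            }
        }
    }

    private static int sequenceOf(String node) {
        // Sequential znodes end in a ten-digit sequence number.
        return Integer.parseInt(node.substring(node.length() - 10));
    }
}
}}}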

Note that it may appear that we have a "herd effect" when a number of clients are waiting
for a read lock and are all notified when the "write-" node with the lower sequence number
is deleted. In fact that is valid behavior: all those read clients should be released, since
they now hold the lock. The "herd effect" refers to releasing a "herd" when in fact only a single
machine or a small number of machines can proceed.

=== Revocable Shared Locks ===

We can make shared locks revocable by modifying the shared lock protocol slightly: in step
1 of both obtain lock protocols, after the create() the client does a getData() with watch
set. If the client later receives a notification for the node it created in step 1, it does
another getData() with watch set and looks for the string "unlock", which signals the client
that it must release the lock.

Locks are revoked by doing a setData() on the node of the lock to be revoked, with the string
"unlock" as the data.
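
A hedged sketch of both sides of this handshake; the "unlock" string is from the protocol above, and re-setting the watch on each notification follows the recipe:

{{{
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: cooperative revocation of a lock node.
public class Revocation {
    // Revoker side: ask the current holder to release its lock.
    public static void revoke(ZooKeeper zk, String lockPath)
            throws KeeperException, InterruptedException {
        zk.setData(lockPath, "unlock".getBytes(), -1);
    }

    // Holder side: getData() with watch on our own node; on each
    // notification, re-read (re-setting the watch) and release if asked.
    public static void watchForRevoke(ZooKeeper zk, String myLockPath,
                                      Runnable release)
            throws KeeperException, InterruptedException {
        byte[] data = zk.getData(myLockPath, event -> {
            try {
                watchForRevoke(zk, myLockPath, release);
            } catch (Exception e) {
                // sketch: the node may already be gone
            }
        }, null);
        if ("unlock".equals(new String(data))) {
            release.run(); // holder consents and deletes its lock node
        }
    }
}
}}}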

Note that this protocol requires the lock holder to consent to releasing the lock. Such consent
is important if the lock holder needs to do some processing before releasing the lock. Revocable
Shared Locks with Freaking Laser Beams can be implemented by simply deleting the nodes of the
locks to be revoked if they don't get removed after a revoker-determined period of time.

== Two-Phase Commit ==

We can implement two-phase commit by having a coordinator create a transaction node, say
"/app/Tx", and one child node per participating site, say "/app/Tx/s_i". The content of each
child node is initially undefined. Once each site involved in the transaction receives the
transaction from the coordinator, the site reads each child node and sets a watch. Each site
then processes the query and votes "commit" or "abort" by writing to its respective node. Once
the write completes, the other sites are notified, and as soon as all sites have all the votes,
they can decide either "abort" or "commit". Note that a site can decide "abort" earlier if
some other site votes "abort".

An interesting aspect of this implementation is that the only role of the coordinator is to
decide upon the group of sites, to create the ZooKeeper nodes, and to propagate the transaction
to the corresponding sites. In fact, even propagating the transaction can be done through
ZooKeeper by writing it in the transaction node.
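
A hedged sketch of a participating site, using the "/app/Tx" layout above; the vote strings and error handling are simplified:

{{{
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: each site writes its vote into its own child node and
// watches the others until every vote is in (or any site aborts).
public class TwoPhaseCommitSite {
    public static void vote(ZooKeeper zk, String mySite, boolean commit)
            throws KeeperException, InterruptedException {
        zk.setData("/app/Tx/" + mySite,
                (commit ? "commit" : "abort").getBytes(), -1);
    }

    // Returns true for "commit", false for "abort".
    public static boolean decide(ZooKeeper zk)
            throws KeeperException, InterruptedException {
        while (true) {
            CountDownLatch change = new CountDownLatch(1);
            Watcher w = event -> change.countDown();
            List<String> sites = zk.getChildren("/app/Tx", false);
            boolean allVoted = true;
            for (String site : sites) {
                String v = new String(zk.getData("/app/Tx/" + site, w, null));
                if (v.equals("abort")) {
                    return false; // any abort decides early
                }
                if (!v.equals("commit")) {
                    allVoted = false; // this site has not voted yet
                }
            }
            if (allVoted) {
                return true;
            }
            change.await(); // wait for some vote to be written, re-scan
        }
    }
}
}}}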

There are two important drawbacks to the approach described above. One is the message complexity,
which is O(n²). The second is that site failures cannot be detected through ephemeral
nodes: to detect the failure of a site using an ephemeral node, it is necessary that the site
itself creates the node, whereas here the coordinator creates the children.

To solve the first problem, we can have only the coordinator notified of changes to
the transaction nodes, and then have it notify the sites once it reaches a decision. Note that
this approach, although more scalable, is slower, as it requires all communication to go through
the coordinator. For the second problem, we can have the coordinator propagate the transaction
to the sites and have each site create its own ephemeral node.
