hadoop-common-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Hadoop Wiki] Update of "HedWig/TopicManagement" by ErwinTam
Date Tue, 11 Jan 2011 19:04:28 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hadoop Wiki" for change notification.

The "HedWig/TopicManagement" page has been changed by ErwinTam.
http://wiki.apache.org/hadoop/HedWig/TopicManagement

--------------------------------------------------

New page:
---+ Topic management in Hedwig

%TOC% 

---++ !ZooKeeper data structure

Metadata about topics, subscribers, and hubs will be stored in !ZooKeeper. For a given Hedwig
region, we will store the following structure:

        <img src="%ATTACHURLPATH%/hedwigzk.png" alt="hedwigzk.png"/>

The rectangles in this diagram are znodes; rectangles with dashed borders are ephemeral znodes.
   * <B>hedwig</B> is the root. If the !ZooKeeper instance is shared between regions,
then there would be a region-specific root.
      * <B>regionname</B> is the region this Hedwig cluster lives in.
         * <B>topics</B> is the root for the topics subtree. All topics that have
been created live under this root.
            * <B>T1, T2 ... </B> are nodes for each topic. The nodes are named
by the topic name. The fact that a topic has a node under the <B>topics</B> node
means that the topic exists. If we had any other topic metadata (like permissions) that we
wanted to store, we could store it as the content of the node.
               * <B>Ti.hub</B> is the hub that is currently assigned to the topic.
This is an ephemeral node, so that if the hub fails, another hub can take over. The name of
the node is "Hub" and the content is the hostname of the hub.
               * <B>subscribers</B> is the root of the tree of subscribers to
the topic. 
                  * <B>Sub1, Sub2 ...</B> are the subscribers. Each node is named
with the subscriber ID (which should be descriptive enough for the hub to be able to deliver
messages to the subscriber). The content of the node includes the subscriber's current consume
mark for the topic.
         * <B>hosts</B> is the root of the tree of hubs in the region.
            * <B>S1:port, S2:port ... </B> are the hubs. Each node is named with
the hostname:port of the hub. The fact that a node exists for the hub means that the hub is
in the cluster, and is eligible to be assigned topics.
               * <B>Alive</B> is an ephemeral node that indicates whether the
hub is currently alive. 
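
As a rough illustration of the layout above, the znode paths could be built as follows. This is only a sketch: the helper names are hypothetical, and the exact capitalization of the node names ("Hub", "Alive", "subscribers") is assumed from the descriptions in the list.

```python
# Hypothetical path helpers for the znode layout described above.
# Node names and their capitalization are assumptions taken from the list.

def region_root(region):
    return f"/hedwig/{region}"

def topic_path(region, topic):
    # A topic exists if and only if this node exists.
    return f"{region_root(region)}/topics/{topic}"

def topic_hub_path(region, topic):
    # Ephemeral node; its content is the hostname of the owning hub.
    return f"{topic_path(region, topic)}/Hub"

def subscriber_path(region, topic, subscriber_id):
    # Content includes the subscriber's current consume mark.
    return f"{topic_path(region, topic)}/subscribers/{subscriber_id}"

def host_path(region, host, port):
    # Existence means the hub is part of the cluster.
    return f"{region_root(region)}/hosts/{host}:{port}"

def alive_path(region, host, port):
    # Ephemeral node; present only while the hub is alive.
    return f"{host_path(region, host, port)}/Alive"
```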

---++ Topic creation 

Topic creation and assignment to a hub are both lazy. Topics are created on demand (e.g.,
when there is a subscriber) and assigned to a hub on demand (e.g., when there is a new subscriber
or a published message). When a hub responsible for a topic fails, we reassign the topic on
demand, e.g., when the connected subscribers reconnect.

---+++ Subscription process

The subscribe call takes three parameters: 
   * The subscriber ID (string) 
   * The topic name (string) 
   * A flag (boolean) indicating whether the client has been redirected. 

When a client C subscribes to a topic T, it will contact one of the hubs (say, H1) and send
a <I>subscribe(C,T,false)</I> message. If the client receives a <i>redirect</i>
message from a hub, it will retry its subscription at the hub listed in the message (e.g.
H2) by sending a <I>subscribe(C,T,true)</I> message to H2.
The flow is similar to the "false" case, except that the hub H2 knows that it should try
to become the owner of the topic, instead of choosing a random hub.


Upon receiving a <i>subscribe</i> message for topic T, the hub H1 will follow
these steps:

   * The hub H1 will check in !ZooKeeper to see if the topic T exists as a child of <b>Topics</B>.
If the topic T does not exist:
      * H1 will create the node <B>Topics.T</B>, and the node <B>Topics.T.Subscribers</B>.
   * The hub H1 will read <b>T.Hub</b>, the current hub assigned to the topic
(say, H2).
      * If a hub exists, and is the same hub the client contacted (e.g. H1==H2), then H1 will
add C to the list of subscribers (under <b>T.Subscribers</B>), and set up its
internal bookkeeping to begin delivering messages to C.
      * If a hub exists, and is a different hub than the one the client contacted (e.g. H1!=H2),
then H1 will return to the client a <I>redirect</I> message, requesting that the
client retry its subscription at H2.
      * If no hub exists, H1 will check the flag of the <i>subscribe</I> call
to see if the client was redirected.
         * If false (the client has not yet been redirected) H1 will choose a random hub H3
(possibly itself) to manage the topic. 
            * If H1 chooses itself (H1==H3), then H1 will try to create an ephemeral node
under <B>T</B> called <B>Hub</B> with its own hostname (e.g. H1) as
the content. This creation should be done using test-and-set, so that if a <B>Hub</B>
node already exists, the creation fails.
               * If the ephemeral node creation succeeds, then H1 will set up its internal
bookkeeping to begin delivering messages to C.
               * Otherwise (ephemeral node creation fails) then H1 will read the hostname
(e.g. H4) of the hub assigned to the topic from the <B>T.Hub</B> node, and return
to the client a <I>redirect</I> message, requesting that the client retry its
subscription at H4.
            * Otherwise (H1!=H3), then H1 will return to the client a <I>redirect</I>
message, requesting that the client retry its subscription at H3.
         * If true (the client has been redirected), then H1 will try to become the owner
of the topic.
            * H1 will try to create an ephemeral node under <B>T</B> called <B>Hub</B>
with its own hostname (e.g. H1) as the content. This creation should be done using test-and-set,
so that if a <B>Hub</B> node already exists, the creation fails.
               * If the ephemeral node creation succeeds, then H1 will set up its internal
bookkeeping to begin delivering messages to C.
               * Otherwise (ephemeral node creation fails) then H1 will read the hostname
(e.g. H3) of the hub assigned to the topic from the <B>T.Hub</B> node, and return
to the client a <I>redirect</I> message, requesting that the client retry its
subscription at H3.
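
The hub-side steps above can be sketched as follows. This is a minimal, single-process illustration, not the Hedwig implementation: =FakeZk= is an in-memory stand-in rather than a real !ZooKeeper client, all names are hypothetical, and paths omit the region prefix. Registering C under <B>T.Subscribers</B> after a successful claim is an assumption; the steps above spell that out only for the case where the hub already owns the topic.

```python
import random

class NodeExists(Exception):
    pass

class FakeZk:
    """In-memory stand-in for ZooKeeper (path -> content). Not a real client."""
    def __init__(self):
        self.nodes = {}
    def exists(self, path):
        return path in self.nodes
    def create(self, path, data=""):
        # Fails if the node already exists, which gives test-and-set semantics.
        if path in self.nodes:
            raise NodeExists(path)
        self.nodes[path] = data
    def get(self, path):
        return self.nodes.get(path)

def try_claim(zk, topic, me):
    """Try to create T.Hub; return whoever owns the topic afterwards."""
    hub_path = f"/topics/{topic}/Hub"
    try:
        zk.create(hub_path, me)  # would be an ephemeral node in a real deployment
        return me
    except NodeExists:
        return zk.get(hub_path)

def handle_subscribe(zk, me, hubs, client, topic, redirected, rng=random):
    """Return ("ok", me) or ("redirect", other_hub)."""
    topic_path = f"/topics/{topic}"
    if not zk.exists(topic_path):
        zk.create(topic_path)
        zk.create(topic_path + "/Subscribers")
    owner = zk.get(topic_path + "/Hub")
    if owner is None:
        if not redirected:
            chosen = rng.choice(hubs)  # may pick this hub itself
            if chosen != me:
                return ("redirect", chosen)
        owner = try_claim(zk, topic, me)
    if owner != me:
        return ("redirect", owner)
    sub_path = f"{topic_path}/Subscribers/{client}"
    if not zk.exists(sub_path):
        zk.create(sub_path)
    # Internal delivery bookkeeping for the client would be set up here.
    return ("ok", me)
```

Keeping the claim in one helper means both the "redirected" branch and the "chose itself" branch share the same failure handling: if another hub won the race, the client is redirected to whoever is recorded in <B>T.Hub</B>.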


Notes:
   * Because we want subscribers to be directly connected to the hub responsible for the topic,
we will redirect the client to that hub.
   * Because the <B>T.Hub</B> node is ephemeral, it must be created by the hub
that owns the topic, not by any other hub.
   * To decide which hub to assign a topic to, the deciding hub should use the current list
of alive nodes from <B>hedwig.regionname.hosts</B> in !ZooKeeper.
   * When choosing a random hub to assign a topic to, we can either choose uniformly at random
or weight the random choice based on each hub's current load. To do the latter, each
hub must record in its <B>hosts.Si.Alive</B> node its current load, and then hubs
doing topic assignment use these values to make decisions.
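
One possible form of the load-weighted choice mentioned in the last note is sketched below. The weighting formula (inverse of one plus the load) is purely an assumption for illustration; the notes above do not prescribe one, and =choose_hub= is a hypothetical name.

```python
import random

def choose_hub(loads, rng=random):
    """Pick a hub at random, favoring lightly loaded hubs.

    loads maps hub name -> non-negative load, as each alive hub would
    report it in its Alive node. The 1 / (1 + load) weighting is an
    assumed formula, not something the design specifies.
    """
    if not loads:
        raise ValueError("no alive hubs to choose from")
    hubs = sorted(loads)
    weights = [1.0 / (1.0 + loads[h]) for h in hubs]
    return rng.choices(hubs, weights=weights, k=1)[0]
```

Passing equal loads for every hub reduces this to the uniform choice.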

---+++ Re-subscription process

A client may become disconnected from a hub, for many reasons including:

   * The hub has failed
   * The network experienced a problem
   * The hub has abandoned the topic (see topic redistribution below)

When this happens, the client can just resubscribe to the topic. Using the same subscription
process as above, Hedwig will direct the client to the appropriate hub, either the (old) hub
which still owns the topic, or a (new) hub which has taken over the topic.
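
From the client's side, subscribing and resubscribing can share one loop that follows redirects. The sketch below assumes a hypothetical =send= callable that delivers a subscribe request to a hub and returns either ("ok", hub) or ("redirect", hub); the =max_hops= bound is also an assumption, since the text above does not limit the number of redirects.

```python
def subscribe_with_redirects(send, first_hub, client, topic, max_hops=5):
    """Subscribe via first_hub, following redirect replies.

    send(hub, client, topic, redirected) is a hypothetical transport call
    returning ("ok", hub) or ("redirect", target_hub).
    Returns the hub that accepted the subscription.
    """
    hub, redirected = first_hub, False
    for _ in range(max_hops):
        status, target = send(hub, client, topic, redirected)
        if status == "ok":
            return hub
        # Retry at the hub named in the redirect, with the flag set to true.
        hub, redirected = target, True
    raise RuntimeError("too many redirects")
```

After a disconnect the client simply calls the same function again, starting from any hub it can reach.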

---+++ Publish process

When a client C publishes to a topic T, C contacts a hub (say, H1) and tries to publish. The
publish call takes four parameters: 
   * The publisher ID (string)
   * The topic T
   * The message M
   * A flag (boolean) which indicates whether the client has been redirected. 

When the client C sends a <i>publish</I> call to H1 to publish a message on topic
T, H1 follows these steps:

   * The hub H1 will check in !ZooKeeper to see if the topic T exists as a child of <b>Topics</B>.
If the topic T does not exist:
      * H1 will create the node <B>Topics.T</B>, and the node <B>Topics.T.Subscribers</B>.
   * The hub H1 will read <b>T.Hub</b>, the current hub assigned to the topic
(say, H2).
      * If a hub exists, and is the same hub the client contacted (e.g. H1==H2), then the
hub accepts the publish and writes it into the !BookKeeper log.
      * If a hub exists, but is a different hub than the one the client contacted (e.g. H1!=H2),
then the hub returns a <i>redirect</i> message, requesting that the client retry
its publish at H2.
      * If no hub exists, H1 will check the flag of the <i>publish</I> call to
see if the client was redirected.
         * If false (the client has not yet been redirected) H1 will choose a random hub H3
(possibly itself) to manage the topic. 
            * If H1 chooses itself (H1==H3), then H1 will try to create an ephemeral node
under <B>T</B> called <B>Hub</B> with its own hostname (e.g. H1) as
the content. This creation should be done using test-and-set, so that if a <B>Hub</B>
node already exists, the creation fails.
               * If the ephemeral node creation succeeds, then H1 will set up its internal
bookkeeping to begin publishing messages on T, and will accept and publish C's message.
               * Otherwise (ephemeral node creation fails) then H1 will read the hostname
(e.g. H4) of the hub assigned to the topic from the <B>T.Hub</B> node, and return
to the client a <I>redirect</I> message, requesting that the client retry its
publish at H4.
            * Otherwise (H1!=H3), then H1 will return to the client a <I>redirect</I>
message, requesting that the client retry its publish at H3.
         * If true (the client has been redirected), then H1 will try to become the owner
of the topic.
            * H1 will try to create an ephemeral node under <B>T</B> called <B>Hub</B>
with its own hostname (e.g. H1) as the content. This creation should be done using test-and-set,
so that if a <B>Hub</B> node already exists, the creation fails.
               * If the ephemeral node creation succeeds, then H1 will set up its internal
bookkeeping to begin publishing messages on T, and will accept and publish C's message.
               * Otherwise (ephemeral node creation fails) then H1 will read the hostname
(e.g. H3) of the hub assigned to the topic from the <B>T.Hub</B> node, and return
to the client a <I>redirect</I> message, requesting that the client retry its
publish at H3.
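
The ownership branches for publish mirror those for subscribe, so the decision itself can be written as a small pure function. This is a sketch only: =next_action=, =pick_hub=, and the action labels are hypothetical. "claim" stands for the test-and-set creation of <B>T.Hub</B>; if that creation fails, the hub redirects the client to the hub recorded in <B>T.Hub</B>.

```python
def next_action(owner, me, redirected, pick_hub):
    """Decide what hub `me` does with a request for a topic.

    owner      -- content of T.Hub, or None if no hub is assigned
    redirected -- the flag carried by the publish (or subscribe) call
    pick_hub   -- hypothetical callable returning a randomly chosen hub
    """
    if owner == me:
        return ("accept", me)        # write the message to the BookKeeper log
    if owner is not None:
        return ("redirect", owner)   # another hub owns the topic
    if redirected:
        return ("claim", me)         # redirected clients make this hub try to own T
    chosen = pick_hub()
    if chosen == me:
        return ("claim", me)
    return ("redirect", chosen)
```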

Notes:
 
   * We may want to optimize this procedure so that the hub does not have to contact !ZooKeeper
on every publish. This could be done using leases, or by depending on the disconnect exception.
The proper way to do this is an open question.
   * Instead of redirecting the client, we could forward the published message to the correct
hub. But redirecting seems cleaner, since we would really like the publisher to be directly
connected to the correct hub.

---+++ Topic redistribution

Occasionally, we should shuffle topics between hubs to ensure load balancing. For example,
when a new hub joins, we want topics to be assigned to it. Similarly, if some topics are hotter
than others, the hub should be able to shed load. Since all of the persistent state about
a topic is in !ZooKeeper or !BookKeeper, shuffling a single topic can be easy: the hub just
stops accepting publishes and deletes its ephemeral node. The next time a client tries to
subscribe or publish to the topic, it will get assigned to a random hub.

When should a hub abandon a topic? It should do so at least under the following conditions:

   * If the hub is overloaded compared to the other hubs in the system (determined by load
statistics stored by the hubs in their !ZooKeeper <B>hedwig.regionname.hosts.Si</B>
node).
   * Periodically (e.g. when the hub is closing the !BookKeeper log for the topic)

The constant shuffling of topics should help to keep load evenly balanced across hubs, without
human intervention. Moreover, lazily abandoning topics will help the shuffling to occur in
an incremental fashion spread out over time.
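
A hub's side of redistribution might look like the sketch below. The overload test (load above 1.5 times the mean of the other hubs) is an assumed policy, since the text only says the hub compares itself to the others. A plain dict stands in for !ZooKeeper, and all names are hypothetical.

```python
def is_overloaded(me, loads, factor=1.5):
    """True if this hub's load exceeds factor * mean load of the other hubs.

    loads maps hub name -> load, as read from the hosts subtree.
    The factor of 1.5 is an assumption for illustration.
    """
    others = [load for hub, load in loads.items() if hub != me]
    if not others:
        return False  # nobody to hand topics to
    return loads[me] > factor * (sum(others) / len(others))

def abandon_topic(nodes, owned, topic, me):
    """Give up a topic: stop accepting publishes, then delete the Hub node.

    nodes -- dict standing in for ZooKeeper (path -> content)
    owned -- set of topics this hub currently accepts publishes for
    """
    owned.discard(topic)  # stop accepting publishes first
    hub_path = f"/topics/{topic}/Hub"
    if nodes.get(hub_path) == me:  # only remove our own ownership record
        del nodes[hub_path]
```

Once the <B>Hub</B> node is gone, the next subscribe or publish for the topic goes through the normal assignment steps and lands on a randomly chosen hub.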

---++ Open questions

   * How to handle the lease of the hub's primary status for a topic
   * How to handle access control - who is allowed to create topics, who is allowed to subscribe
to them, who is allowed to publish to them
