hadoop-hdfs-issues mailing list archives

From "Todd Lipcon (Commented) (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HDFS-3077) Quorum-based protocol for reading and writing edit logs
Date Wed, 14 Mar 2012 17:02:45 GMT

    [ https://issues.apache.org/jira/browse/HDFS-3077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13229343#comment-13229343 ]

Todd Lipcon commented on HDFS-3077:

Hi Suresh,

Apologies in advance for the length of this comment - I wanted to cover all the details clearly.

If I understand your proposal correctly from what we discussed yesterday, it is:
1. Configure N journals. Pick some value W < N which is the number of required journals
for the ANN to stay up.
2. On write, try to write to all N, synchronously (adding timeouts for any RPC-based writes)
3. If a replica fails, drop it from future writes until it recovers. If dropping a journal
results in fewer than W active journals, abort the NN.
4. On read, open at least (N - W + 1) replicas. Sort them by their highest txid, and pick
the Wth highest in the list. Synchronize the other edit logs with that one (truncating longer
logs and extending shorter ones).

The logic of this solution is that any set of (N - W + 1) replicas opened at read time must
overlap any set of W replicas that acknowledged a write (since (N - W + 1) + W > N), so we
guarantee that the latest committed edits are indeed seen by the new active.
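
To make the arithmetic concrete, here's a toy example of my own (not part of the proposal): with N = 3 and W = 2, the reader opens N - W + 1 = 2 replicas, and since 2 + 2 > 3 any read set shares at least one replica with any successful write set.

{code:java}
// Toy check of the quorum-intersection property described above
// (illustrative only; not proposing any actual code).
public class QuorumIntersection {
  public static void main(String[] args) {
    int n = 3;                // total journals
    int w = 2;                // journals that must ack a write
    int readSet = n - w + 1;  // replicas the reader must open = 2
    // Any read set of this size overlaps any successful write set,
    // so the latest committed txid is always among the replicas read.
    System.out.println("intersect: " + (readSet + w > n));  // true
  }
}
{code}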

This is mostly great -- so long as you have an external fencing strategy which prevents the
old active from attempting to continue to write after the new active is trying to read. In
the current HA story, the fencing strategy is PDU STONITH or fencing hooks into a NAS device,
but the major goal of this issue is to remove the need for such specialized hardware.

One obvious example of where this fails without fencing is the following:
- Active NN successfully writes to 1 of 3 replicas, then the network connection goes flaky.
The write to the second replica is left in a TCP buffer.
- ZK indicates the NN is dead. The standby notices this and fails over. Since the above write
only hit one replica, it is counted as "uncommitted", and thrown away during recovery.
- Network connection resumes. Another replica receives the write and ACKs it. The old active
then thinks the transaction was committed. Only then does it notice that the ZK lease is gone.
Now we have divergent state, since a client may believe its transaction was committed when in
fact it was not.

So, it follows that we just need to introduce some simple fencing/lease revocation into the
logger daemons, such that before the new standby does its recovery process (step 4 above),
it gets the loggers to promise not to accept edits from the old active. The simplest scheme
I could come up with is the following:

- add an RPC to the loggers called "fence old active and only accept writes from me"
- when a logger receives this, it promises to only accept writes from the new active (each
node can have a UUID at startup)
- thus, in the above scenario, when the second write finally gets through after the network
reconnects, the logger denies it
- before the reader performs recovery (step 4 above), it gets a quorum of loggers to fence
the old active, thus preventing any further commits during/after recovery.
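
To make this concrete, the logger-side check might look something like the rough sketch below (the class and method names are made up for illustration, not a proposed API):

{code:java}
import java.util.UUID;

// Rough sketch of the "fence old active and only accept writes from me" RPC
// on the logger side. All names here are invented for illustration.
public class LoggerDaemon {
  // Writer that most recently fenced this logger; null means not yet fenced.
  private UUID allowedWriter = null;

  /** New active asks us to stop accepting edits from anyone else. */
  public synchronized void fenceAndPromise(UUID newWriter) {
    allowedWriter = newWriter;
  }

  /** Called for every edit; a fenced-out writer gets an error back. */
  public synchronized void acceptEdit(UUID writer, long txid, byte[] edit) {
    if (allowedWriter != null && !allowedWriter.equals(writer)) {
      throw new IllegalStateException("Writer " + writer
          + " has been fenced out; only " + allowedWriter + " may write");
    }
    // ... append the edit with this txid to the local log ...
  }
}
{code}

In the flaky-network scenario above, the delayed write from the old active arrives carrying the old writer's UUID and is simply rejected, so it can never be retroactively counted as committed.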

But, we still have one more problem: for a given txid, multiple actives may each have tried
to write a different transaction under that same ID. Example scenario:

- Active is writing just fine, and then loses network connectivity while writing transaction
101. It manages to send txid 101 to its local replica but nowhere else
- Standby gets a quorum of nodes to agree to fence the old active, and then recovers to transaction
id 100 (since no live node ever saw #101)
- Standby writes transaction 101 correctly to the two live nodes, then all nodes crash
- Now when we come back up, all of the nodes have the same length edit logs (101 edits), but
depending on which replica we read, we might end up using the incorrect (uncommitted) value instead
of the committed one.

You can also engineer such a situation a number of other ways. It's slightly contrived, but
in this area, I want to use a protocol that is _correct_ (not just _usually correct_) in the
face of failures.

So, the above implies we need something to distinguish in-flight transactions for recovery
aside from simply their transaction ID. In stable versions of HDFS we use the "fstime" file
for this case - whenever we do an error recovery, we bump the fstime on the still-alive storage
dirs, and at recovery time, we ignore directories with less than the latest time. Doing the
same here would rely on synchronized clocks, which is a distributed systems faux pas (clock
skew between your NNs shouldn't result in data loss!!).

A nice parallel here is what we do with the HDFS write pipeline recovery -- the generation
stamp of a block is basically like a logical timestamp. Replicas that don't participate in
recovery are left with an older genstamp, which allows us to disregard them in the future
after more data has been appended. So, we need something like a generation stamp for our writers,
such that each writer gets a distinct generation stamp higher than all previous generation
stamps. Assume we have some magic oracle to provide these genstamps. Our protocol is now:

- NN starts up, and gets a genstamp N from the oracle which is higher than all previous ones.
- It gets a quorum of loggers to agree not to accept any actions from NNs with a lower genstamp.
The loggers also respond with the genstamp of the previous writer, if that writer left any uncommitted transactions.
- The NN performs recovery, but ignores transactions from "ancient" replicas (i.e., not from
the newest unrecovered active).
- NN now can take over and start writing new transactions.
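
Here's a rough sketch of how the genstamp changes the logger-side checks (again, the names are invented, and a real logger would persist the promised genstamp durably before responding):

{code:java}
// Sketch extending the earlier fencing idea with generation stamps.
// Names are illustrative only.
public class EpochAwareLogger {
  private long promisedGenstamp = 0;   // highest genstamp we've promised to honor
  private long lastWriterGenstamp = 0; // genstamp of the last writer we accepted edits from

  /** New active asks us to reject any writer with a lower genstamp.
   *  We answer with the genstamp of the previous writer so the caller can
   *  tell which replicas hold edits from the newest unrecovered active. */
  public synchronized long newEpoch(long genstamp) {
    if (genstamp <= promisedGenstamp) {
      throw new IllegalArgumentException("Stale genstamp " + genstamp);
    }
    promisedGenstamp = genstamp;
    return lastWriterGenstamp;
  }

  public synchronized void acceptEdit(long writerGenstamp, long txid, byte[] edit) {
    if (writerGenstamp < promisedGenstamp) {
      throw new IllegalStateException("Writer genstamp " + writerGenstamp
          + " is older than promised " + promisedGenstamp);
    }
    lastWriterGenstamp = writerGenstamp;
    // ... append the edit to the local log, tagged with writerGenstamp ...
  }
}
{code}

At recovery time the reader compares the writer genstamps reported by the loggers and considers only the replicas written by the newest one, which resolves the two-different-transaction-101s problem above.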

If you're familiar with ZAB, you'll notice that we're now getting vanishingly close to an
implementation thereof -- the "genstamp" above is what ZAB calls an "epoch". The only thing
we haven't included is how this genstamp oracle is implemented:

- In ZAB, it uses a distributed protocol to allow a new leader/active to generate a unique
increasing epoch number.
- In an initial implementation of this system, I am planning to bootstrap off of ZK to generate
these numbers. This keeps the system simpler.
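
As a sketch of that bootstrap (the znode path and class name are just placeholders): every setData() on a znode bumps its version by exactly one, and ZK serializes the updates, so the returned version can serve directly as a monotonically increasing genstamp shared by all would-be actives.

{code:java}
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Sketch of a ZK-backed genstamp oracle. The znode path is a placeholder
// and is assumed to already exist.
public class ZkGenstampOracle {
  private static final String EPOCH_ZNODE = "/hdfs-edit-log/epoch";
  private final ZooKeeper zk;

  public ZkGenstampOracle(ZooKeeper zk) {
    this.zk = zk;
  }

  /** Returns a genstamp strictly greater than any previously issued one. */
  public long nextGenstamp() throws Exception {
    // version -1 means "update regardless of current version"; the update
    // bumps the znode version, and that new version is our genstamp.
    Stat stat = zk.setData(EPOCH_ZNODE, new byte[0], -1);
    return stat.getVersion();
  }
}
{code}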

The only remaining difference between your proposal and the above is the actual commit protocol.
Currently, the NN tries to commit to all replicas and only responds after all have succeeded,
or after dropping the "errored" replicas from the list. I think a quorum commit is vastly
superior for HA, especially given we'd like to collocate the log replicas on machines doing
other work. When those machines have latency hiccups, or crash, we don't want the active NN
to have to wait for long timeout periods before continuing. For example, using the current
BN code, if you {{kill -STOP}} the BN, the NN will hang indefinitely.
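
To illustrate the difference, a quorum commit looks roughly like the sketch below (the async logger interface is invented plumbing): the edit goes to all N loggers in parallel and the call returns as soon as W of them ack, so a stalled or stopped replica no longer blocks the active NN.

{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a quorum commit: send the edit to every logger and return once
// W acks arrive, instead of waiting on all N. AsyncLogger here is an
// invented stand-in for an asynchronous RPC client.
public class QuorumWriter {

  public interface AsyncLogger {
    CompletableFuture<Void> sendEdit(long txid, byte[] edit);
  }

  public static void logEdit(List<AsyncLogger> loggers, int w, long txid, byte[] edit) {
    CompletableFuture<Void> quorumReached = new CompletableFuture<>();
    AtomicInteger acks = new AtomicInteger();

    for (AsyncLogger logger : loggers) {
      logger.sendEdit(txid, edit).thenRun(() -> {
        if (acks.incrementAndGet() == w) {
          quorumReached.complete(null);   // W acks: the transaction is committed
        }
      });
    }
    quorumReached.join();  // a slow or kill -STOP'd replica no longer hangs the NN
  }
}
{code}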

My overall philosophy is the same as yours -- it should be as simple as possible -- but like
Einstein said, no simpler! Whenever I start trying to design a protocol that ends up smelling
just like a consensus protocol, I'd rather go "by the book" since I know Ben Reed and folks
are way smarter than me and thus much more likely to get it right than an ad-hoc design.

> Quorum-based protocol for reading and writing edit logs
> -------------------------------------------------------
>                 Key: HDFS-3077
>                 URL: https://issues.apache.org/jira/browse/HDFS-3077
>             Project: Hadoop HDFS
>          Issue Type: New Feature
>          Components: ha, name-node
>            Reporter: Todd Lipcon
>            Assignee: Todd Lipcon
> Currently, one of the weak points of the HA design is that it relies on shared storage
such as an NFS filer for the shared edit log. One alternative that has been proposed is to
depend on BookKeeper, a ZooKeeper subproject which provides a highly available replicated
edit log on commodity hardware. This JIRA is to implement another alternative, based on a
quorum commit protocol, integrated more tightly in HDFS and with the requirements driven only
by HDFS's needs rather than more generic use cases. More details to follow.


