From: jianhe@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 02 Aug 2016 15:21:03 -0000
Subject: [06/40] hadoop git commit: HDFS-10519. Add a configuration option to enable in-progress edit log tailing. Contributed by Jiayi Zhou.

HDFS-10519. Add a configuration option to enable in-progress edit log tailing. Contributed by Jiayi Zhou.
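For readers skimming the patch: the new dfs.ha.tail-edits.in-progress option lets the standby NameNode tail edits from the segment the active NameNode is still writing. An illustrative sketch (not part of this patch) of enabling it through the standard Configuration API, using only keys that exist in DFSConfigKeys:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class EnableInProgressTailing {
  public static Configuration withInProgressTailing() {
    Configuration conf = new Configuration();
    // Let the standby tail edits from in-progress segments instead of
    // waiting for the active NameNode to finalize each segment.
    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
    // The tailing period (in seconds, default 60) is unchanged by this
    // patch; the new flag only widens which segments are eligible.
    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 60);
    return conf;
  }
}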
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/098ec2b1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/098ec2b1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/098ec2b1

Branch: refs/heads/yarn-native-services
Commit: 098ec2b11ff3f677eb823f75b147a1ac8dbf959e
Parents: b43de80
Author: Andrew Wang
Authored: Wed Jul 27 17:55:41 2016 -0700
Committer: Andrew Wang
Committed: Wed Jul 27 17:55:41 2016 -0700

----------------------------------------------------------------------
 .../bkjournal/BookKeeperJournalManager.java     |  10 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   3 +
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |   6 +-
 .../qjournal/client/QuorumJournalManager.java   |  28 +-
 .../qjournal/client/QuorumOutputStream.java     |  13 +-
 .../hadoop/hdfs/qjournal/server/Journal.java    |  20 +-
 .../server/namenode/BackupJournalManager.java   |   2 +-
 .../hadoop/hdfs/server/namenode/FSEditLog.java  |  28 +-
 .../server/namenode/FileJournalManager.java     |   9 +-
 .../hadoop/hdfs/server/namenode/JournalSet.java |  11 +-
 .../hdfs/server/namenode/LogsPurgeable.java     |   6 +-
 .../namenode/NNStorageRetentionManager.java     |   2 +-
 .../hdfs/server/namenode/SecondaryNameNode.java |   2 +-
 .../hdfs/server/namenode/ha/EditLogTailer.java  |  12 +-
 .../server/protocol/RemoteEditLogManifest.java  |  19 +-
 .../hadoop-hdfs/src/main/proto/HdfsServer.proto |   1 +
 .../src/main/resources/hdfs-default.xml         |  10 +
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    |   2 +-
 .../hdfs/qjournal/server/TestJournal.java       |   4 +-
 .../hdfs/server/namenode/TestEditLog.java       |  18 +-
 .../server/namenode/TestFileJournalManager.java |   2 +-
 .../server/namenode/TestGenericJournalConf.java |   2 +-
 .../namenode/TestNNStorageRetentionManager.java |   4 +-
 .../namenode/ha/TestFailureToReadEdits.java     |   7 +-
 .../namenode/ha/TestStandbyInProgressTail.java  | 339 +++++++++++++++++++
 25 files changed, 499 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
index 21fa4f2..8e4d032 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
@@ -533,12 +533,18 @@ public class BookKeeperJournalManager implements JournalManager {
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
       throw new IOException("Error finalising ledger", ie);
-    }
+    }
+
+  public void selectInputStreams(
+      Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk) throws IOException {
+    selectInputStreams(streams, fromTxnId, inProgressOk, false);
   }
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk)
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
       throws IOException {
     List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
         inProgressOk);
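Every JournalManager implementation in this patch evolves selectInputStreams the same way: the old three-argument form survives as a thin wrapper that forwards to the widened method with onlyDurableTxns defaulted to false. A stripped-down sketch of the idiom (hypothetical interface, not the actual Hadoop JournalManager, which uses concrete overloads rather than default methods):

import java.io.IOException;
import java.util.Collection;

/** Hypothetical sketch of the signature-widening idiom used here. */
interface StreamSelectorSketch<S> {

  /** Old three-argument form: forwards with the conservative default. */
  default void selectInputStreams(Collection<S> streams, long fromTxId,
      boolean inProgressOk) throws IOException {
    selectInputStreams(streams, fromTxId, inProgressOk, false);
  }

  /** Widened form carrying the new onlyDurableTxns flag. */
  void selectInputStreams(Collection<S> streams, long fromTxId,
      boolean inProgressOk, boolean onlyDurableTxns) throws IOException;
}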
http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index cfcfd55..231dea7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -728,6 +728,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
   public static final String DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY = "dfs.ha.tail-edits.namenode-retries";
   public static final int DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT = 3;
+  public static final String DFS_HA_TAILEDITS_INPROGRESS_KEY =
+      "dfs.ha.tail-edits.in-progress";
+  public static final boolean DFS_HA_TAILEDITS_INPROGRESS_DEFAULT = false;
   public static final String DFS_HA_LOGROLL_RPC_TIMEOUT_KEY = "dfs.ha.log-roll.rpc.timeout";
   public static final int DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT = 20000; // 20s
   public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 52ac5d8..78371f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -289,7 +289,8 @@ public class PBHelper {
   public static RemoteEditLogManifestProto convert(
       RemoteEditLogManifest manifest) {
     RemoteEditLogManifestProto.Builder builder = RemoteEditLogManifestProto
-        .newBuilder();
+        .newBuilder()
+        .setCommittedTxnId(manifest.getCommittedTxnId());
     for (RemoteEditLog log : manifest.getLogs()) {
       builder.addLogs(convert(log));
     }
@@ -303,7 +304,8 @@ public class PBHelper {
     for (RemoteEditLogProto l : manifest.getLogsList()) {
       logs.add(convert(l));
     }
-    return new RemoteEditLogManifest(logs);
+    return new RemoteEditLogManifest(logs,
+        manifest.getCommittedTxnId());
   }
 
   public static CheckpointCommandProto convert(CheckpointCommand cmd) {
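The protobuf conversion change is symmetric: the committed txid is written into RemoteEditLogManifestProto and read back out, so a round trip preserves it. A small illustrative sketch (essentially what the updated TestPBHelper later in this patch asserts):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;

public class ManifestRoundTripSketch {
  public static void main(String[] args) {
    List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
    logs.add(new RemoteEditLog(1, 10));
    logs.add(new RemoteEditLog(11, 20));
    // Second constructor argument is the new committed txid field:
    // every transaction up to 20 is durable on a quorum.
    RemoteEditLogManifest m = new RemoteEditLogManifest(logs, 20);

    // manifest -> proto -> manifest restores the committed txid
    RemoteEditLogManifest roundTripped =
        PBHelper.convert(PBHelper.convert(m));
    System.out.println(roundTripped.getCommittedTxnId()); // prints 20
  }
}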
http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index 49baa8c..c32b667 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -402,8 +402,11 @@ public class QuorumJournalManager implements JournalManager {
         layoutVersion);
     loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
         "startLogSegment(" + txId + ")");
-    return new QuorumOutputStream(loggers, txId,
-        outputBufferCapacity, writeTxnsTimeoutMs);
+    boolean updateCommittedTxId = conf.getBoolean(
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
+    return new QuorumOutputStream(loggers, txId, outputBufferCapacity,
+        writeTxnsTimeoutMs, updateCommittedTxId);
   }
 
   @Override
@@ -462,9 +465,15 @@ public class QuorumJournalManager implements JournalManager {
     loggers.close();
   }
 
-  @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxnId, boolean inProgressOk) throws IOException {
+    selectInputStreams(streams, fromTxnId, inProgressOk, false);
+  }
+
+  @Override
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk,
+      boolean onlyDurableTxns) throws IOException {
     QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
         loggers.getEditLogManifest(fromTxnId, inProgressOk);
 
@@ -481,13 +490,22 @@ public class QuorumJournalManager implements JournalManager {
     for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
       AsyncLogger logger = e.getKey();
       RemoteEditLogManifest manifest = e.getValue();
-
+      long committedTxnId = manifest.getCommittedTxnId();
+
       for (RemoteEditLog remoteLog : manifest.getLogs()) {
         URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
 
+        long endTxId = remoteLog.getEndTxId();
+
+        // If bounded by durable txns, endTxId must not be larger
+        // than committedTxnId. This ensures consistency.
+        if (onlyDurableTxns && inProgressOk) {
+          endTxId = Math.min(endTxId, committedTxnId);
+        }
+
         EditLogInputStream elis = EditLogFileInputStream.fromUrl(
             connectionFactory, url, remoteLog.getStartTxId(),
-            remoteLog.getEndTxId(), remoteLog.isInProgress());
+            endTxId, remoteLog.isInProgress());
         allStreams.add(elis);
       }
     }
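The clamp added above is the heart of the consistency argument: an individual JournalNode may have written transactions beyond what a quorum has committed, and a standby tailing an in-progress segment must not read those. A standalone restatement of the rule (illustrative helper, not patch code):

public final class DurableBoundSketch {
  private DurableBoundSketch() {
  }

  /**
   * Clamp a segment's advertised end txid to the quorum-committed txid
   * when the caller asked for durable transactions only and in-progress
   * segments are eligible.
   */
  static long boundedEndTxId(long endTxId, long committedTxnId,
      boolean onlyDurableTxns, boolean inProgressOk) {
    if (onlyDurableTxns && inProgressOk) {
      return Math.min(endTxId, committedTxnId);
    }
    return endTxId;
  }

  public static void main(String[] args) {
    // Written through txid 120 on this JournalNode, but committed by a
    // quorum only through 100: the durable read stops at 100.
    System.out.println(boundedEndTxId(120, 100, true, true));  // 100
    // Without the durable bound, the whole written range is exposed.
    System.out.println(boundedEndTxId(120, 100, false, true)); // 120
  }
}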
http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
index e094b21..3ffcd3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
@@ -33,15 +33,17 @@ class QuorumOutputStream extends EditLogOutputStream {
   private EditsDoubleBuffer buf;
   private final long segmentTxId;
   private final int writeTimeoutMs;
+  private final boolean updateCommittedTxId;
 
   public QuorumOutputStream(AsyncLoggerSet loggers,
       long txId, int outputBufferCapacity,
-      int writeTimeoutMs) throws IOException {
+      int writeTimeoutMs, boolean updateCommittedTxId) throws IOException {
     super();
     this.buf = new EditsDoubleBuffer(outputBufferCapacity);
     this.loggers = loggers;
     this.segmentTxId = txId;
     this.writeTimeoutMs = writeTimeoutMs;
+    this.updateCommittedTxId = updateCommittedTxId;
   }
 
   @Override
@@ -110,6 +112,15 @@ class QuorumOutputStream extends EditLogOutputStream {
       // RPCs will thus let the loggers know of the most recent transaction, even
       // if a logger has fallen behind.
       loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
+
+      // If we don't have this dummy send, committed TxId might be one-batch
+      // stale on the Journal Nodes
+      if (updateCommittedTxId) {
+        QuorumCall<AsyncLogger, Void> fakeCall = loggers.sendEdits(
+            segmentTxId, firstTxToFlush,
+            0, new byte[0]);
+        loggers.waitForWriteQuorum(fakeCall, writeTimeoutMs, "sendEdits");
+      }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index de052c6..3760641 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -252,8 +252,8 @@ public class Journal implements Closeable {
     checkFormatted();
     return lastWriterEpoch.get();
   }
-  
-  synchronized long getCommittedTxnIdForTests() throws IOException {
+
+  synchronized long getCommittedTxnId() throws IOException {
     return committedTxnId.get();
   }
 
@@ -357,9 +357,15 @@ public class Journal implements Closeable {
     checkFormatted();
     checkWriteRequest(reqInfo);
 
+    // If numTxns is 0, it's actually a fake send which aims at updating
+    // committedTxId only. So we can return early.
+    if (numTxns == 0) {
+      return;
+    }
+
     checkSync(curSegment != null,
         "Can't write, no segment open");
-    
+
     if (curSegmentTxId != segmentTxId) {
       // Sanity check: it is possible that the writer will fail IPCs
       // on both the finalize() and then the start() of the next segment.
@@ -673,12 +679,12 @@ public class Journal implements Closeable {
       }
     }
     if (log != null && log.isInProgress()) {
-      logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId(),
-          true));
+      logs.add(new RemoteEditLog(log.getStartTxId(),
+          getHighestWrittenTxId(), true));
     }
-  
-    return new RemoteEditLogManifest(logs);
+
+    return new RemoteEditLogManifest(logs, getCommittedTxnId());
   }
 
   /**
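The "dummy send" above and the early return added to Journal#journal are two halves of the same trick: every sendEdits RPC piggybacks the writer's committed txid, so a zero-transaction batch refreshes the JournalNodes' committed watermark without appending anything. A simplified model of the server side (hypothetical class; the real logic lives in Journal#journal, whose committed txid actually arrives via RequestInfo):

/** Hypothetical mini-model of a JournalNode's committed-txid handling. */
class JournalWatermarkSketch {
  private long committedTxnId;

  /** Simplified stand-in for Journal#journal(...). */
  void journal(long committedTxId, long segmentTxId, long firstTxnId,
      int numTxns, byte[] records) {
    // Every request, real or fake, advances the committed watermark...
    committedTxnId = Math.max(committedTxnId, committedTxId);
    // ...and a fake send (numTxns == 0) stops here, mirroring the early
    // return this patch adds, so no segment or sync checks are applied.
    if (numTxns == 0) {
      return;
    }
    // A real implementation would validate the open segment and
    // persist the records here.
  }

  long getCommittedTxnId() {
    return committedTxnId;
  }
}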
http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
index ec8049e..e1ddfb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
@@ -80,7 +80,7 @@ class BackupJournalManager implements JournalManager {
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk) {
+      long fromTxnId, boolean inProgressOk, boolean onlyDurableTxns) {
     // This JournalManager is never used for input. Therefore it cannot
     // return any transactions
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 57229da..ef9eb68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -317,7 +317,7 @@ public class FSEditLog implements LogsPurgeable {
     // Safety check: we should never start a segment if there are
     // newer txids readable.
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    journalSet.selectInputStreams(streams, segmentTxId, true);
+    journalSet.selectInputStreams(streams, segmentTxId, true, false);
     if (!streams.isEmpty()) {
       String error = String.format("Cannot start writing at txid %s " +
         "when there is a stream available for read: %s",
@@ -1575,15 +1575,23 @@ public class FSEditLog implements LogsPurgeable {
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) throws IOException {
-    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
+      throws IOException {
+    journalSet.selectInputStreams(streams, fromTxId,
+        inProgressOk, onlyDurableTxns);
   }
 
   public Collection<EditLogInputStream> selectInputStreams(
       long fromTxId, long toAtLeastTxId) throws IOException {
-    return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
+    return selectInputStreams(fromTxId, toAtLeastTxId, null, true, false);
+  }
+
+  public Collection<EditLogInputStream> selectInputStreams(
+      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
+      boolean inProgressOK) throws IOException {
+    return selectInputStreams(fromTxId, toAtLeastTxId,
+        recovery, inProgressOK, false);
   }
-
   /**
    * Select a list of input streams.
    *
@@ -1591,16 +1599,18 @@ public class FSEditLog implements LogsPurgeable {
    * @param toAtLeastTxId the selected streams must contain this transaction
    * @param recovery recovery context
    * @param inProgressOk set to true if in-progress streams are OK
+   * @param onlyDurableTxns set to true if streams are bounded
+   *                        by the durable TxId
    */
-  public Collection<EditLogInputStream> selectInputStreams(
-      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
-      boolean inProgressOk) throws IOException {
+  public Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId, MetaRecoveryContext recovery, boolean inProgressOk,
+      boolean onlyDurableTxns) throws IOException {
 
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
     synchronized(journalSetLock) {
       Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
           "selectInputStreams() on closed FSEditLog");
-      selectInputStreams(streams, fromTxId, inProgressOk);
+      selectInputStreams(streams, fromTxId, inProgressOk, onlyDurableTxns);
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
index ff6376e..5d62825 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
@@ -333,10 +333,17 @@ public class FileJournalManager implements JournalManager {
     return ret;
   }
 
+  synchronized public void selectInputStreams(
+      Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk) throws IOException {
+    selectInputStreams(streams, fromTxnId, inProgressOk, false);
+  }
+
   @Override
   synchronized public void selectInputStreams(
       Collection<EditLogInputStream> streams, long fromTxId,
-      boolean inProgressOk) throws IOException {
+      boolean inProgressOk, boolean onlyDurableTxns)
+      throws IOException {
     List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
     if (LOG.isDebugEnabled()) {
       LOG.debug(this + ": selecting input streams starting at " + fromTxId +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
index 667b2e0..fde54a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
@@ -261,10 +261,13 @@ public class JournalSet implements JournalManager {
    * may not be sorted-- this is up to the caller.
    * @param fromTxId The transaction ID to start looking for streams at
    * @param inProgressOk Should we consider unfinalized streams?
+   * @param onlyDurableTxns Set to true if streams are bounded by the durable
+   *                        TxId. A durable TxId is the committed txid in QJM
+   *                        or the largest txid written into file in FJM
    */
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) {
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) {
     final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,
             EDIT_LOG_INPUT_STREAM_COMPARATOR);
@@ -274,7 +277,8 @@ public class JournalSet implements JournalManager {
         continue;
       }
       try {
-        jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+        jas.getManager().selectInputStreams(allStreams, fromTxId,
+            inProgressOk, onlyDurableTxns);
       } catch (IOException ioe) {
         LOG.warn("Unable to determine input streams from " + jas.getManager()
             + ". Skipping.", ioe);
@@ -681,7 +685,8 @@ public class JournalSet implements JournalManager {
       // And then start looking from after that point
       curStartTxId = bestLog.getEndTxId() + 1;
     }
-    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
+    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs,
+        curStartTxId - 1);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Generated manifest for logs since " + fromTxId + ":"
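For file journals there is no quorum, so the manifest built by JournalSet#getEditLogManifest treats everything written to disk as durable: after the contiguous-log loop, curStartTxId points one past the end of the last selected log, making curStartTxId - 1 the committed txid. The arithmetic in isolation (illustrative sketch only):

/** Illustrative model of the committedTxnId arithmetic in JournalSet. */
public class ManifestCommittedTxidSketch {
  public static void main(String[] args) {
    // [start, end] txid ranges standing in for RemoteEditLog entries
    long[][] logs = {{1, 100}, {101, 200}};
    long curStartTxId = 1;
    for (long[] log : logs) {
      if (log[0] != curStartTxId) {
        break; // a txid gap ends the manifest
      }
      curStartTxId = log[1] + 1;
    }
    // Largest txid written to file == committed txid for file journals.
    System.out.println(curStartTxId - 1); // prints 200
  }
}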
A durable TxId is the committed txid in QJM + * or the largest txid written into file in FJM * @throws IOException if the underlying storage has an error or is otherwise * inaccessible */ void selectInputStreams(Collection streams, - long fromTxId, boolean inProgressOk) throws IOException; + long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) + throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java index 327f14c..98b7e9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java @@ -134,7 +134,7 @@ public class NNStorageRetentionManager { long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain); ArrayList editLogs = new ArrayList(); - purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false); + purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false); Collections.sort(editLogs, new Comparator() { @Override public int compare(EditLogInputStream a, EditLogInputStream b) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java index acb2c8a..ec73468 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java @@ -904,7 +904,7 @@ public class SecondaryNameNode implements Runnable, @Override public void selectInputStreams(Collection streams, - long fromTxId, boolean inProgressOk) { + long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) { Iterator iter = storage.dirIterator(); while (iter.hasNext()) { StorageDirectory dir = iter.next(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index f7b3c00..1447375 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -117,6 +117,11 @@ public class EditLogTailer { */ private int maxRetries; + /** + * Whether the tailer should tail the in-progress edit log segments. 
+ */ + private final boolean inProgressOk; + public EditLogTailer(FSNamesystem namesystem, Configuration conf) { this.tailerThread = new EditLogTailerThread(); this.conf = conf; @@ -164,6 +169,10 @@ public class EditLogTailer { maxRetries = DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT; } + inProgressOk = conf.getBoolean( + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT); + nnCount = nns.size(); // setup the iterator to endlessly loop the nns this.nnLookup = Iterators.cycle(nns); @@ -236,7 +245,8 @@ public class EditLogTailer { } Collection streams; try { - streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false); + streams = editLog.selectInputStreams(lastTxnId + 1, 0, + null, inProgressOk, true); } catch (IOException ioe) { // This is acceptable. If we try to tail edits in the middle of an edits // log roll, i.e. the last one has been finalized but the new inprogress http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java index 0b3a031..686f7c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java @@ -29,12 +29,15 @@ import com.google.common.base.Preconditions; public class RemoteEditLogManifest { private List logs; - + + private long committedTxnId = -1; + public RemoteEditLogManifest() { } - - public RemoteEditLogManifest(List logs) { + + public RemoteEditLogManifest(List logs, long committedTxnId) { this.logs = logs; + this.committedTxnId = committedTxnId; checkState(); } @@ -46,7 +49,7 @@ public class RemoteEditLogManifest { */ private void checkState() { Preconditions.checkNotNull(logs); - + RemoteEditLog prev = null; for (RemoteEditLog log : logs) { if (prev != null) { @@ -56,7 +59,6 @@ public class RemoteEditLogManifest { + this); } } - prev = log; } } @@ -65,10 +67,13 @@ public class RemoteEditLogManifest { return Collections.unmodifiableList(logs); } + public long getCommittedTxnId() { + return committedTxnId; + } - @Override public String toString() { - return "[" + Joiner.on(", ").join(logs) + "]"; + return "[" + Joiner.on(", ").join(logs) + "]" + " CommittedTxId: " + + committedTxnId; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto index 5d6f3fc..e87dc95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto @@ -88,6 +88,7 @@ message RemoteEditLogProto { */ message RemoteEditLogManifestProto { repeated RemoteEditLogProto logs = 1; + required uint64 committedTxnId = 2; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
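The committed txid now also shows up in the manifest's toString, which is exactly what the updated TestEditLog expectations below assert against. For instance (illustrative snippet built only from constructors that appear in this patch):

import java.util.Arrays;

import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;

public class ManifestToStringDemo {
  public static void main(String[] args) {
    RemoteEditLogManifest m = new RemoteEditLogManifest(
        Arrays.asList(new RemoteEditLog(1, 100), new RemoteEditLog(101, 200)),
        200);
    // Prints: [[1,100], [101,200]] CommittedTxId: 200
    System.out.println(m);
  }
}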
http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 54592ab..b82fa31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2790,6 +2790,16 @@
 </property>
 
+<property>
+  <name>dfs.ha.tail-edits.in-progress</name>
+  <value>false</value>
+  <description>
+    Whether to enable the standby namenode to tail in-progress edit logs.
+    Clients might want to turn it on when they want the standby NN to have
+    more up-to-date data.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.ec.reconstruction.stripedread.timeout.millis</name>
   <value>5000</value>
   <description>Datanode striped read timeout in milliseconds.
   </description>
 </property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index a4f3302..45cd4ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -329,7 +329,7 @@ public class TestPBHelper {
     List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
     logs.add(new RemoteEditLog(1, 10));
     logs.add(new RemoteEditLog(11, 20));
-    RemoteEditLogManifest m = new RemoteEditLogManifest(logs);
+    RemoteEditLogManifest m = new RemoteEditLogManifest(logs, 20);
     RemoteEditLogManifestProto mProto = PBHelper.convert(m);
     RemoteEditLogManifest m1 = PBHelper.convert(mProto);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
index e181fd6..5cdc1a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
@@ -158,12 +158,12 @@ public class TestJournal {
     // Send txids 1-3, with a request indicating only 0 committed
     journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3,
         QJMTestUtil.createTxnData(1, 3));
-    assertEquals(0, journal.getCommittedTxnIdForTests());
+    assertEquals(0, journal.getCommittedTxnId());
 
     // Send 4-6, with request indicating that through 3 is committed.
     journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3,
         QJMTestUtil.createTxnData(4, 6));
-    assertEquals(3, journal.getCommittedTxnIdForTests());
+    assertEquals(3, journal.getCommittedTxnId());
   }
 
   @Test (timeout = 10000)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 1eb377a..dcd7cc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -1034,9 +1034,9 @@ public class TestEditLog {
         "[1,100]|[101,200]|[201,]");
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200]]",
+    assertEquals("[[1,100], [101,200]] CommittedTxId: 200",
         log.getEditLogManifest(1).toString());
-    assertEquals("[[101,200]]",
+    assertEquals("[[101,200]] CommittedTxId: 200",
         log.getEditLogManifest(101).toString());
 
     // Another simple case, different directories have different
@@ -1046,8 +1046,8 @@ public class TestEditLog {
         "[1,100]|[201,300]|[301,400]"); // nothing starting at 101
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200], [201,300], [301,400]]",
-        log.getEditLogManifest(1).toString());
+    assertEquals("[[1,100], [101,200], [201,300], [301,400]]" +
+        " CommittedTxId: 400", log.getEditLogManifest(1).toString());
 
     // Case where one directory has an earlier finalized log, followed
     // by a gap. The returned manifest should start after the gap.
@@ -1056,7 +1056,7 @@ public class TestEditLog {
         "[301,400]|[401,500]");
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[301,400], [401,500]]",
+    assertEquals("[[301,400], [401,500]] CommittedTxId: 500",
         log.getEditLogManifest(1).toString());
 
     // Case where different directories have different length logs
@@ -1066,9 +1066,9 @@ public class TestEditLog {
         "[1,50]|[101,200]"); // short log at 1
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200]]",
+    assertEquals("[[1,100], [101,200]] CommittedTxId: 200",
         log.getEditLogManifest(1).toString());
-    assertEquals("[[101,200]]",
+    assertEquals("[[101,200]] CommittedTxId: 200",
         log.getEditLogManifest(101).toString());
 
     // Case where the first storage has an inprogress while
@@ -1079,9 +1079,9 @@ public class TestEditLog {
         "[1,100]|[101,200]");
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200]]",
+    assertEquals("[[1,100], [101,200]] CommittedTxId: 200",
         log.getEditLogManifest(1).toString());
-    assertEquals("[[101,200]]",
+    assertEquals("[[101,200]] CommittedTxId: 200",
         log.getEditLogManifest(101).toString());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java
index f5172c3..4e4c64b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java
@@ -398,7 +398,7 @@ public class TestFileJournalManager {
     FileJournalManager.matchEditLogs(badDir);
   }
 
-  private static EditLogInputStream getJournalInputStream(JournalManager jm,
+  private static EditLogInputStream getJournalInputStream(FileJournalManager jm,
       long txId, boolean inProgressOk) throws IOException {
     final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
index 4517b0b..020ecb5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
@@ -173,7 +173,7 @@ public class TestGenericJournalConf {
 
     @Override
     public void selectInputStreams(Collection<EditLogInputStream> streams,
-        long fromTxnId, boolean inProgressOk) {
+        long fromTxnId, boolean inProgressOk, boolean onlyDurableTxns) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
index 346d949..b94e5a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
@@ -368,11 +368,11 @@ public class TestNNStorageRetentionManager {
       public Void answer(InvocationOnMock invocation) throws Throwable {
         Object[] args = invocation.getArguments();
         journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
-            (Long)args[1], (Boolean)args[2]);
+            (Long)args[1], (Boolean)args[2], (Boolean)args[3]);
         return null;
       }
     }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
-        Mockito.anyLong(), Mockito.anyBoolean());
+        Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
     return mockLog;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java
index 5221ef9..079038c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java
@@ -187,7 +187,7 @@ public class TestFailureToReadEdits {
 
     // This op should get applied just fine.
    assertTrue(fs.mkdirs(new Path(TEST_DIR2)));
-    
+
     // This is the op the mocking will cause to fail to be read.
     assertTrue(fs.mkdirs(new Path(TEST_DIR3)));
 
@@ -209,7 +209,7 @@ public class TestFailureToReadEdits {
 
     // Null because it hasn't been created yet.
     assertNull(NameNodeAdapter.getFileInfo(nn1, TEST_DIR3, false));
-    
+
     // Now let the standby read ALL the edits.
     answer.setThrowExceptionOnRead(false);
     HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
@@ -329,7 +329,8 @@ public class TestFailureToReadEdits {
     FSEditLog spyEditLog = NameNodeAdapter.spyOnEditLog(nn1);
     LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
     doAnswer(answer).when(spyEditLog).selectInputStreams(
-        anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean());
+        anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean(),
+        anyBoolean());
     return answer;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/098ec2b1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
new file mode 100644
index 0000000..9201cda
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+/**
+ * Test cases for in progress tailing edit logs by
+ * the standby node.
+ */
+public class TestStandbyInProgressTail {
+  private static final Log LOG =
+      LogFactory.getLog(TestStandbyInProgressTail.class);
+  private Configuration conf;
+  private MiniQJMHACluster qjmhaCluster;
+  private MiniDFSCluster cluster;
+  private NameNode nn0;
+  private NameNode nn1;
+
+  @Before
+  public void startUp() throws IOException {
+    conf = new Configuration();
+    // Set period of tail edits to a large value (20 mins) for test purposes
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    HAUtil.setAllowStandbyReads(conf, true);
+    qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+    cluster = qjmhaCluster.getDfsCluster();
+
+    // Get NameNodes from the cluster for future manual control
+    nn0 = cluster.getNameNode(0);
+    nn1 = cluster.getNameNode(1);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testDefault() throws Exception {
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
+    }
+    conf = new Configuration();
+    // Set period of tail edits to a large value (20 mins) for test purposes
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
+    HAUtil.setAllowStandbyReads(conf, true);
+    qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+    cluster = qjmhaCluster.getDfsCluster();
+
+    try {
+      // During HA startup, both nodes should be in
+      // standby and we shouldn't have any edits files
+      // in any edits directory!
+      List<URI> allDirs = Lists.newArrayList();
+      allDirs.addAll(cluster.getNameDirs(0));
+      allDirs.addAll(cluster.getNameDirs(1));
+      assertNoEditFiles(allDirs);
+
+      // Set the first NN to active, make sure it creates edits
+      // in its own dirs and the shared dir. The standby
+      // should still have no edits!
+      cluster.transitionToActive(0);
+
+      assertEditFiles(cluster.getNameDirs(0),
+          NNStorage.getInProgressEditsFileName(1));
+      assertNoEditFiles(cluster.getNameDirs(1));
+
+      cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+          FsPermission.createImmutable((short) 0755), true);
+
+      cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
+
+      // StandbyNameNode should not finish tailing in-progress logs
+      assertNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          "/test", true));
+
+      // Restarting the standby should not finalize any edits files
+      // in the shared directory when it starts up!
+      cluster.restartNameNode(1);
+
+      assertEditFiles(cluster.getNameDirs(0),
+          NNStorage.getInProgressEditsFileName(1));
+      assertNoEditFiles(cluster.getNameDirs(1));
+
+      // Additionally it should not have applied any in-progress logs
+      // at start-up -- otherwise, it would have read half-way into
+      // the current log segment, and on the next roll, it would have to
+      // either replay starting in the middle of the segment (not allowed)
+      // or double-replay the edits (incorrect).
+      assertNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          "/test", true));
+
+      cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+          FsPermission.createImmutable((short) 0755), true);
+
+      // If we restart NN0, it'll come back as standby, and we can
+      // transition NN1 to active and make sure it reads edits correctly.
+      cluster.restartNameNode(0);
+      cluster.transitionToActive(1);
+
+      // NN1 should have both the edits that came before its restart,
+      // and the edits that came after its restart.
+      assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          "/test", true));
+      assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          "/test2", true));
+    } finally {
+      if (qjmhaCluster != null) {
+        qjmhaCluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testSetup() throws Exception {
+    // During HA startup, both nodes should be in
+    // standby and we shouldn't have any edits files
+    // in any edits directory!
+    List<URI> allDirs = Lists.newArrayList();
+    allDirs.addAll(cluster.getNameDirs(0));
+    allDirs.addAll(cluster.getNameDirs(1));
+    assertNoEditFiles(allDirs);
+
+    // Set the first NN to active, make sure it creates edits
+    // in its own dirs and the shared dir. The standby
+    // should still have no edits!
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+        NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+        FsPermission.createImmutable((short) 0755), true);
+
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // After tailing, StandbyNameNode should finish tailing
+    // in-progress logs
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+        "/test", true));
+
+    // Restarting the standby should not finalize any edits files
+    // in the shared directory when it starts up!
+    cluster.restartNameNode(1);
+
+    assertEditFiles(cluster.getNameDirs(0),
+        NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    // Because we're using the in-progress tailer, this should not be null
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+        "/test", true));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+        FsPermission.createImmutable((short) 0755), true);
+
+    // If we restart NN0, it'll come back as standby, and we can
+    // transition NN1 to active and make sure it reads edits correctly.
+    cluster.restartNameNode(0);
+    cluster.transitionToActive(1);
+
+    // NN1 should have both the edits that came before its restart,
+    // and the edits that came after its restart.
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+        "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+        "/test2", true));
+  }
+
+  @Test
+  public void testHalfStartInProgressTail() throws Exception {
+    // Set the first NN to active, make sure it creates edits
+    // in its own dirs and the shared dir. The standby
+    // should still have no edits!
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+        NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+        FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail the in-progress edit
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+
+    // Create a new edit and finalize it
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+        FsPermission.createImmutable((short) 0755), true);
+    nn0.getRpcServer().rollEditLog();
+
+    // StandbyNameNode shouldn't tail the edit since we do not call the method
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+
+    // Create a new in-progress edit and let SBNN do the tail
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
+        FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail the finalized edit and the new in-progress
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+  }
+
+  @Test
+  public void testInitStartInProgressTail() throws Exception {
+    // Set the first NN to active, make sure it creates edits
+    // in its own dirs and the shared dir. The standby
+    // should still have no edits!
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+        NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+        FsPermission.createImmutable((short) 0755), true);
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+        FsPermission.createImmutable((short) 0755), true);
+    nn0.getRpcServer().rollEditLog();
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
+        FsPermission.createImmutable((short) 0755), true);
+
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail the finalized edit and the new in-progress
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+  }
+
+  @Test
+  public void testNewStartInProgressTail() throws Exception {
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+        NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+        FsPermission.createImmutable((short) 0755), true);
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+        FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+    nn0.getRpcServer().rollEditLog();
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
+        FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail the finalized edit and the new in-progress
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+  }
+
+  /**
+   * Check that no edits files are present in the given storage dirs.
+   */
+  private static void assertNoEditFiles(Iterable<URI> dirs) throws IOException {
+    assertEditFiles(dirs);
+  }
+
+  /**
+   * Check that the given list of edits files are present in the given storage
+   * dirs.
+   */
+  private static void assertEditFiles(Iterable<URI> dirs, String... files)
+      throws IOException {
+    for (URI u : dirs) {
+      File editDirRoot = new File(u.getPath());
+      File editDir = new File(editDirRoot, "current");
+      GenericTestUtils.assertExists(editDir);
+      if (files.length == 0) {
+        LOG.info("Checking no edit files exist in " + editDir);
+      } else {
+        LOG.info("Checking for following edit files in " + editDir
+            + ": " + Joiner.on(",").join(files));
+      }
+
+      GenericTestUtils.assertGlobEquals(editDir, "edits_.*", files);
+    }
+  }
+}