From: zjshen@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Mon, 29 Jun 2015 23:30:36 -0000
Message-Id: <9ef0cd97ca94484f97f99c616cb77ef4@git.apache.org>
Subject: [20/50] [abbrv] hadoop git commit: HDFS-6440. Support more than 2 NameNodes. Contributed by Jesse Yates.

HDFS-6440. Support more than 2 NameNodes. Contributed by Jesse Yates.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d5ea7d3e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d5ea7d3e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d5ea7d3e

Branch: refs/heads/YARN-2928
Commit: d5ea7d3e1c664ec16b7909678b4081deb87dc1c0
Parents: 6c71cfe
Author: Aaron T.
Myers Authored: Tue Jun 23 17:26:11 2015 -0700 Committer: Zhijie Shen Committed: Mon Jun 29 10:28:23 2015 -0700 ---------------------------------------------------------------------- .../apache/hadoop/ha/ZKFailoverController.java | 61 ++- .../org/apache/hadoop/ha/MiniZKFCCluster.java | 93 +++- .../hadoop/ha/TestZKFailoverController.java | 32 ++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../bkjournal/TestBookKeeperHACheckpoints.java | 7 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../java/org/apache/hadoop/hdfs/HAUtil.java | 36 +- .../token/block/BlockTokenSecretManager.java | 40 +- .../server/blockmanagement/BlockManager.java | 21 +- .../hdfs/server/namenode/CheckpointConf.java | 14 +- .../hdfs/server/namenode/ImageServlet.java | 88 +++- .../server/namenode/NameNodeHttpServer.java | 7 +- .../hdfs/server/namenode/TransferFsImage.java | 47 +- .../server/namenode/ha/BootstrapStandby.java | 94 ++-- .../hdfs/server/namenode/ha/EditLogTailer.java | 162 +++++-- .../server/namenode/ha/RemoteNameNodeInfo.java | 133 ++++++ .../server/namenode/ha/StandbyCheckpointer.java | 182 +++++--- .../hdfs/tools/DFSZKFailoverController.java | 13 + .../src/main/resources/hdfs-default.xml | 20 + .../org/apache/hadoop/hdfs/MiniDFSCluster.java | 464 ++++++++++++------- .../apache/hadoop/hdfs/MiniDFSNNTopology.java | 18 +- .../hadoop/hdfs/TestDFSUpgradeFromImage.java | 6 +- .../apache/hadoop/hdfs/TestRollingUpgrade.java | 109 +++-- .../hadoop/hdfs/qjournal/MiniQJMHACluster.java | 94 ++-- .../security/token/block/TestBlockToken.java | 10 +- .../hdfs/server/namenode/TestBackupNode.java | 2 +- .../hdfs/server/namenode/TestCheckpoint.java | 3 +- .../server/namenode/ha/HAStressTestHarness.java | 46 +- .../hdfs/server/namenode/ha/HATestUtil.java | 60 ++- .../namenode/ha/TestBootstrapStandby.java | 176 ++++--- .../ha/TestBootstrapStandbyWithQJM.java | 47 +- .../ha/TestDNFencingWithReplication.java | 1 + .../server/namenode/ha/TestEditLogTailer.java | 12 +- .../ha/TestFailoverWithBlockTokensEnabled.java | 55 ++- .../server/namenode/ha/TestHAConfiguration.java | 49 +- .../namenode/ha/TestPipelinesFailover.java | 110 +++-- .../namenode/ha/TestRemoteNameNodeInfo.java | 61 +++ .../namenode/ha/TestSeveralNameNodes.java | 179 +++++++ .../namenode/ha/TestStandbyCheckpoints.java | 106 +++-- .../src/test/resources/hadoop-0.23-reserved.tgz | Bin 4558 -> 5590 bytes .../src/test/resources/hadoop-1-reserved.tgz | Bin 2572 -> 3348 bytes .../src/test/resources/hadoop-2-reserved.tgz | Bin 2838 -> 3465 bytes .../src/test/resources/hadoop-22-dfs-dir.tgz | Bin 318180 -> 413239 bytes .../src/test/resources/hadoop1-bbw.tgz | Bin 40234 -> 43294 bytes .../src/test/resources/log4j.properties | 2 +- 45 files changed, 1926 insertions(+), 740 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java index b1f5920..30ec8f2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java @@ -21,6 +21,7 @@ import java.io.IOException; 
import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Executors; @@ -141,6 +142,7 @@ public abstract class ZKFailoverController { throws AccessControlException, IOException; protected abstract InetSocketAddress getRpcAddressToBindTo(); protected abstract PolicyProvider getPolicyProvider(); + protected abstract List getAllOtherNodes(); /** * Return the name of a znode inside the configured parent znode in which @@ -616,9 +618,11 @@ public abstract class ZKFailoverController { * Coordinate a graceful failover. This proceeds in several phases: * 1) Pre-flight checks: ensure that the local node is healthy, and * thus a candidate for failover. - * 2) Determine the current active node. If it is the local node, no + * 2a) Determine the current active node. If it is the local node, no * need to failover - return success. - * 3) Ask that node to yield from the election for a number of seconds. + * 2b) Get the other nodes + * 3a) Ask the other nodes to yield from election for a number of seconds + * 3b) Ask the active node to yield from the election for a number of seconds. * 4) Allow the normal election path to run in other threads. Wait until * we either become unhealthy or we see an election attempt recorded by * the normal code path. @@ -648,12 +652,27 @@ public abstract class ZKFailoverController { "No need to failover. Returning success."); return; } - - // Phase 3: ask the old active to yield from the election. - LOG.info("Asking " + oldActive + " to cede its active state for " + - timeout + "ms"); - ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout); - oldZkfc.cedeActive(timeout); + + // Phase 2b: get the other nodes + List otherNodes = getAllOtherNodes(); + List otherZkfcs = new ArrayList(otherNodes.size()); + + // Phase 3: ask the other nodes to yield from the election. + HAServiceTarget activeNode = null; + for (HAServiceTarget remote : otherNodes) { + // same location, same node - may not always be == equality + if (remote.getAddress().equals(oldActive.getAddress())) { + activeNode = remote; + continue; + } + otherZkfcs.add(cedeRemoteActive(remote, timeout)); + } + + assert + activeNode != null : "Active node does not match any known remote node"; + + // Phase 3b: ask the old active to yield + otherZkfcs.add(cedeRemoteActive(activeNode, timeout)); // Phase 4: wait for the normal election to make the local node // active. @@ -676,8 +695,10 @@ public abstract class ZKFailoverController { // Phase 5. At this point, we made some attempt to become active. So we // can tell the old active to rejoin if it wants. This allows a quick // fail-back if we immediately crash. - oldZkfc.cedeActive(-1); - + for (ZKFCProtocol zkfc : otherZkfcs) { + zkfc.cedeActive(-1); + } + if (attempt.succeeded) { LOG.info("Successfully became active. " + attempt.status); } else { @@ -688,6 +709,23 @@ public abstract class ZKFailoverController { } /** + * Ask the remote zkfc to cede its active status and wait for the specified + * timeout before attempting to claim leader status. 
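
The phase list above is easier to follow when the new multi-node flow is read in one piece. The sketch below is a condensed, commented restatement of the code in this hunk, not additional code in the commit; oldActive, timeout and conf are the surrounding method's variables.

    // Phases 2b-5 of the graceful failover, generalized from 2 nodes to N.
    List<HAServiceTarget> otherNodes = getAllOtherNodes();          // phase 2b: every NN but the local one
    List<ZKFCProtocol> otherZkfcs = new ArrayList<ZKFCProtocol>(otherNodes.size());

    HAServiceTarget activeNode = null;
    for (HAServiceTarget remote : otherNodes) {
      if (remote.getAddress().equals(oldActive.getAddress())) {
        activeNode = remote;                                        // remember the active, cede it last
        continue;
      }
      otherZkfcs.add(cedeRemoteActive(remote, timeout));            // phase 3a: standbys cede first
    }
    otherZkfcs.add(cedeRemoteActive(activeNode, timeout));          // phase 3b: the old active cedes last

    // phase 4: the local ZKFC now runs the normal election and tries to become active

    for (ZKFCProtocol zkfc : otherZkfcs) {                          // phase 5: everyone may rejoin immediately
      zkfc.cedeActive(-1);
    }

Ceding the standbys before the old active keeps any of them from winning the election in the window between the old active stepping down and the local node claiming the lock.
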
+ * @param remote node to ask + * @param timeout amount of time to cede + * @return the {@link ZKFCProtocol} used to talk to the ndoe + * @throws IOException + */ + private ZKFCProtocol cedeRemoteActive(HAServiceTarget remote, int timeout) + throws IOException { + LOG.info("Asking " + remote + " to cede its active state for " + + timeout + "ms"); + ZKFCProtocol oldZkfc = remote.getZKFCProxy(conf, timeout); + oldZkfc.cedeActive(timeout); + return oldZkfc; + } + + /** * Ensure that the local node is in a healthy state, and thus * eligible for graceful failover. * @throws ServiceFailedException if the node is unhealthy @@ -777,7 +815,8 @@ public abstract class ZKFailoverController { break; default: - throw new IllegalArgumentException("Unhandled state:" + lastHealthState); + throw new IllegalArgumentException("Unhandled state:" + + lastHealthState); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java index 5aee611..b496bf9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,7 +51,7 @@ public class MiniZKFCCluster { private final TestContext ctx; private final ZooKeeperServer zks; - private DummyHAService svcs[]; + private List svcs; private DummyZKFCThread thrs[]; private Configuration conf; @@ -63,38 +65,67 @@ public class MiniZKFCCluster { conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50); conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50); conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50); - svcs = new DummyHAService[2]; - svcs[0] = new DummyHAService(HAServiceState.INITIALIZING, - new InetSocketAddress("svc1", 1234)); - svcs[0].setSharedResource(sharedResource); - svcs[1] = new DummyHAService(HAServiceState.INITIALIZING, - new InetSocketAddress("svc2", 1234)); - svcs[1].setSharedResource(sharedResource); - + svcs = new ArrayList(2); + // remove any existing instances we are keeping track of + DummyHAService.instances.clear(); + + for (int i = 0; i < 2; i++) { + addSvcs(svcs, i); + } + this.ctx = new TestContext(); this.zks = zks; } - + + private void addSvcs(List svcs, int i) { + svcs.add(new DummyHAService(HAServiceState.INITIALIZING, new InetSocketAddress("svc" + (i + 1), + 1234))); + svcs.get(i).setSharedResource(sharedResource); + } + /** * Set up two services and their failover controllers. svc1 is started * first, so that it enters ACTIVE state, and then svc2 is started, * which enters STANDBY */ public void start() throws Exception { + start(2); + } + + /** + * Set up the specified number of services and their failover controllers. svc1 is + * started first, so that it enters ACTIVE state, and then svc2...svcN is started, which enters + * STANDBY. + *

+ * Adds any extra svc needed beyond the first two before starting the rest of the cluster. + * @param count number of zkfcs to start + */ + public void start(int count) throws Exception { + // setup the expected number of zkfcs, if we need to add more. This seemed the least invasive + // way to add the services - otherwise its a large test rewrite or changing a lot of assumptions + if (count > 2) { + for (int i = 2; i < count; i++) { + addSvcs(svcs, i); + } + } + // Format the base dir, should succeed - thrs = new DummyZKFCThread[2]; - thrs[0] = new DummyZKFCThread(ctx, svcs[0]); + thrs = new DummyZKFCThread[count]; + thrs[0] = new DummyZKFCThread(ctx, svcs.get(0)); assertEquals(0, thrs[0].zkfc.run(new String[]{"-formatZK"})); ctx.addThread(thrs[0]); thrs[0].start(); LOG.info("Waiting for svc0 to enter active state"); waitForHAState(0, HAServiceState.ACTIVE); - - LOG.info("Adding svc1"); - thrs[1] = new DummyZKFCThread(ctx, svcs[1]); - thrs[1].start(); - waitForHAState(1, HAServiceState.STANDBY); + + // add the remaining zkfc + for (int i = 1; i < count; i++) { + LOG.info("Adding svc" + i); + thrs[i] = new DummyZKFCThread(ctx, svcs.get(i)); + thrs[i].start(); + waitForHAState(i, HAServiceState.STANDBY); + } } /** @@ -122,7 +153,7 @@ public class MiniZKFCCluster { } public DummyHAService getService(int i) { - return svcs[i]; + return svcs.get(i); } public ActiveStandbyElector getElector(int i) { @@ -134,23 +165,23 @@ public class MiniZKFCCluster { } public void setHealthy(int idx, boolean healthy) { - svcs[idx].isHealthy = healthy; + svcs.get(idx).isHealthy = healthy; } public void setFailToBecomeActive(int idx, boolean doFail) { - svcs[idx].failToBecomeActive = doFail; + svcs.get(idx).failToBecomeActive = doFail; } public void setFailToBecomeStandby(int idx, boolean doFail) { - svcs[idx].failToBecomeStandby = doFail; + svcs.get(idx).failToBecomeStandby = doFail; } public void setFailToFence(int idx, boolean doFail) { - svcs[idx].failToFence = doFail; + svcs.get(idx).failToFence = doFail; } public void setUnreachable(int idx, boolean unreachable) { - svcs[idx].actUnreachable = unreachable; + svcs.get(idx).actUnreachable = unreachable; } /** @@ -204,7 +235,7 @@ public class MiniZKFCCluster { byte[] data = zks.getZKDatabase().getData( DummyZKFC.LOCK_ZNODE, stat, null); - assertArrayEquals(Ints.toByteArray(svcs[idx].index), data); + assertArrayEquals(Ints.toByteArray(svcs.get(idx).index), data); long session = stat.getEphemeralOwner(); LOG.info("Expiring svc " + idx + "'s zookeeper session " + session); zks.closeSession(session); @@ -218,7 +249,7 @@ public class MiniZKFCCluster { */ public void waitForActiveLockHolder(Integer idx) throws Exception { - DummyHAService svc = idx == null ? null : svcs[idx]; + DummyHAService svc = idx == null ? null : svcs.get(idx); ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks, DummyZKFC.SCOPED_PARENT_ZNODE, (idx == null) ? 
null : Ints.toByteArray(svc.index)); @@ -320,5 +351,17 @@ public class MiniZKFCCluster { protected PolicyProvider getPolicyProvider() { return null; } + + @Override + protected List getAllOtherNodes() { + List services = new ArrayList( + DummyHAService.instances.size()); + for (DummyHAService service : DummyHAService.instances) { + if (service != this.localTarget) { + services.add(service); + } + } + return services; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java index d8271c5..b8d9ce4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java @@ -605,6 +605,38 @@ public class TestZKFailoverController extends ClientBaseWithFixes { } } + @Test(timeout = 25000) + public void testGracefulFailoverMultipleZKfcs() throws Exception { + try { + cluster.start(3); + + cluster.waitForActiveLockHolder(0); + + // failover to first + cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover(); + cluster.waitForActiveLockHolder(1); + + // failover to second + cluster.getService(2).getZKFCProxy(conf, 5000).gracefulFailover(); + cluster.waitForActiveLockHolder(2); + + // failover back to original + cluster.getService(0).getZKFCProxy(conf, 5000).gracefulFailover(); + cluster.waitForActiveLockHolder(0); + + Thread.sleep(10000); // allow to quiesce + + assertEquals(0, cluster.getService(0).fenceCount); + assertEquals(0, cluster.getService(1).fenceCount); + assertEquals(0, cluster.getService(2).fenceCount); + assertEquals(2, cluster.getService(0).activeTransitionCount); + assertEquals(1, cluster.getService(1).activeTransitionCount); + assertEquals(1, cluster.getService(2).activeTransitionCount); + } finally { + cluster.stop(); + } + } + private int runFC(DummyHAService target, String ... args) throws Exception { DummyZKFC zkfc = new DummyZKFC(conf, target); return zkfc.run(args); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 981ca55..09a6891 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -38,6 +38,8 @@ Trunk (Unreleased) HDFS-3125. Add JournalService to enable Journal Daemon. (suresh) + HDFS-6440. Support more than 2 NameNodes. (Jesse Yates via atm) + IMPROVEMENTS HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common. 
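
For readers following the test above: testGracefulFailoverMultipleZKfcs drives three failover controllers, which on a real cluster corresponds to listing more than two NameNode IDs for a nameservice. A minimal configuration sketch, with a hypothetical nameservice name and placeholder host:port values (only the keys are real HDFS keys):

    // Hypothetical three-NameNode nameservice "mycluster"; hosts and ports are placeholders.
    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2,nn3");
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "nn1.example.com:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "nn2.example.com:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn3", "nn3.example.com:8020");

    // With this patch, each node can enumerate all of its peers instead of "the other" one:
    // HAUtil.getConfForOtherNodes(conf) returns one Configuration per remote NameNode.

This is the shape HAUtil.getNameNodeIdOfOtherNodes() and getConfForOtherNodes() (changed below) now have to handle: at least two NameNode IDs, no longer exactly two.
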
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java index b74cd7f..ed53512 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java @@ -32,6 +32,10 @@ import org.junit.BeforeClass; * using a bookkeeper journal manager as the shared directory */ public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints { + //overwrite the nn count + static{ + TestStandbyCheckpoints.NUM_NNS = 2; + } private static BKJMUtil bkutil = null; static int numBookies = 3; static int journalCount = 0; @@ -57,8 +61,7 @@ public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints { .build(); cluster.waitActive(); - nn0 = cluster.getNameNode(0); - nn1 = cluster.getNameNode(1); + setNNs(); fs = HATestUtil.configureFailoverFs(cluster, conf); cluster.transitionToActive(0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 30540a9..ebd668f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -132,6 +132,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50090"; public static final String DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY = "dfs.namenode.secondary.https-address"; public static final String DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:50091"; + public static final String DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY = "dfs.namenode.checkpoint.check.quiet-multiplier"; + public static final double DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT = 1.5; public static final String DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY = "dfs.namenode.checkpoint.check.period"; public static final long DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT = 60; public static final String DFS_NAMENODE_CHECKPOINT_PERIOD_KEY = "dfs.namenode.checkpoint.period"; @@ -544,6 +546,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_HA_LOGROLL_PERIOD_DEFAULT = 2 * 60; // 2m public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period"; public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m + public static final String DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY = "dfs.ha.tail-edits.namenode-retries"; + public static final int DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT = 3; public static final String 
DFS_HA_LOGROLL_RPC_TIMEOUT_KEY = "dfs.ha.log-roll.rpc.timeout"; public static final int DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT = 20000; // 20s public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index c967c69..686a0b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -143,7 +143,7 @@ public class HAUtil { * @param conf the configuration of this node * @return the NN ID of the other node in this nameservice */ - public static String getNameNodeIdOfOtherNode(Configuration conf, String nsId) { + public static List getNameNodeIdOfOtherNodes(Configuration conf, String nsId) { Preconditions.checkArgument(nsId != null, "Could not determine namespace id. Please ensure that this " + "machine is one of the machines listed as a NN RPC address, " + @@ -157,20 +157,20 @@ public class HAUtil { DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, nsId), nsId); - Preconditions.checkArgument(nnIds.size() == 2, - "Expected exactly 2 NameNodes in namespace '%s'. " + - "Instead, got only %s (NN ids were '%s'", - nsId, nnIds.size(), Joiner.on("','").join(nnIds)); + Preconditions.checkArgument(nnIds.size() >= 2, + "Expected at least 2 NameNodes in namespace '%s'. " + + "Instead, got only %s (NN ids were '%s')", + nsId, nnIds.size(), Joiner.on("','").join(nnIds)); Preconditions.checkState(myNNId != null && !myNNId.isEmpty(), "Could not determine own NN ID in namespace '%s'. Please " + "ensure that this node is one of the machines listed as an " + "NN RPC address, or configure " + DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, nsId); - ArrayList nnSet = Lists.newArrayList(nnIds); - nnSet.remove(myNNId); - assert nnSet.size() == 1; - return nnSet.get(0); + ArrayList namenodes = Lists.newArrayList(nnIds); + namenodes.remove(myNNId); + assert namenodes.size() >= 1; + return namenodes; } /** @@ -180,16 +180,20 @@ public class HAUtil { * @param myConf the configuration of this node * @return the configuration of the other node in an HA setup */ - public static Configuration getConfForOtherNode( + public static List getConfForOtherNodes( Configuration myConf) { String nsId = DFSUtil.getNamenodeNameServiceId(myConf); - String otherNn = getNameNodeIdOfOtherNode(myConf, nsId); - - // Look up the address of the active NN. 
- Configuration confForOtherNode = new Configuration(myConf); - NameNode.initializeGenericKeys(confForOtherNode, nsId, otherNn); - return confForOtherNode; + List otherNn = getNameNodeIdOfOtherNodes(myConf, nsId); + + // Look up the address of the other NNs + List confs = new ArrayList(otherNn.size()); + for (String nn : otherNn) { + Configuration confForOtherNode = new Configuration(myConf); + NameNode.initializeGenericKeys(confForOtherNode, nsId, nn); + confs.add(confForOtherNode); + } + return confs; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java index b103c1a..53da44c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java @@ -52,17 +52,11 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Private public class BlockTokenSecretManager extends SecretManager { - public static final Log LOG = LogFactory - .getLog(BlockTokenSecretManager.class); - - // We use these in an HA setup to ensure that the pair of NNs produce block - // token serial numbers that are in different ranges. - private static final int LOW_MASK = ~(1 << 31); - + public static final Log LOG = LogFactory.getLog(BlockTokenSecretManager.class); + public static final Token DUMMY_TOKEN = new Token(); private final boolean isMaster; - private int nnIndex; /** * keyUpdateInterval is the interval that NN updates its block keys. It should @@ -77,21 +71,22 @@ public class BlockTokenSecretManager extends private final Map allKeys; private String blockPoolId; private final String encryptionAlgorithm; - + + private final int intRange; + private final int nnRangeStart; + private final SecureRandom nonceGenerator = new SecureRandom(); - ; - /** * Constructor for slaves. 
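
The intRange and nnRangeStart fields added just above replace the old single-bit split (LOW_MASK plus nnIndex << 31), which could only tell two NameNodes apart. A small worked sketch of the new arithmetic with hypothetical numbers; the formula itself appears in setSerialNo() later in this hunk:

    // Spreading block-token serial numbers across N NameNodes instead of 2.
    int numNNs = 3;                                    // NameNodes in the nameservice
    int nnIndex = 1;                                   // this NameNode's position in the configured list
    int intRange = Integer.MAX_VALUE / numNNs;         // width of each NameNode's slice of the int space
    int nnRangeStart = intRange * nnIndex;             // where this NameNode's slice begins

    int seed = new java.security.SecureRandom().nextInt();
    int serialNo = (seed % intRange) + nnRangeStart;   // same formula as the patched setSerialNo()

This carries over the goal stated in the deleted comment: NameNodes in an HA group should produce block token serial numbers in different ranges, now scaled from a fixed pair to N masters.
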
- * + * * @param keyUpdateInterval how often a new key will be generated * @param tokenLifetime how long an individual token is valid */ public BlockTokenSecretManager(long keyUpdateInterval, long tokenLifetime, String blockPoolId, String encryptionAlgorithm) { this(false, keyUpdateInterval, tokenLifetime, blockPoolId, - encryptionAlgorithm); + encryptionAlgorithm, 0, 1); } /** @@ -99,23 +94,25 @@ public class BlockTokenSecretManager extends * * @param keyUpdateInterval how often a new key will be generated * @param tokenLifetime how long an individual token is valid - * @param nnIndex namenode index + * @param nnIndex namenode index of the namenode for which we are creating the manager * @param blockPoolId block pool ID * @param encryptionAlgorithm encryption algorithm to use + * @param numNNs number of namenodes possible */ public BlockTokenSecretManager(long keyUpdateInterval, - long tokenLifetime, int nnIndex, String blockPoolId, + long tokenLifetime, int nnIndex, int numNNs, String blockPoolId, String encryptionAlgorithm) { - this(true, keyUpdateInterval, tokenLifetime, blockPoolId, - encryptionAlgorithm); - Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1); - this.nnIndex = nnIndex; + this(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm, nnIndex, numNNs); + Preconditions.checkArgument(nnIndex >= 0); + Preconditions.checkArgument(numNNs > 0); setSerialNo(new SecureRandom().nextInt()); generateKeys(); } private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval, - long tokenLifetime, String blockPoolId, String encryptionAlgorithm) { + long tokenLifetime, String blockPoolId, String encryptionAlgorithm, int nnIndex, int numNNs) { + this.intRange = Integer.MAX_VALUE / numNNs; + this.nnRangeStart = intRange * nnIndex; this.isMaster = isMaster; this.keyUpdateInterval = keyUpdateInterval; this.tokenLifetime = tokenLifetime; @@ -127,7 +124,8 @@ public class BlockTokenSecretManager extends @VisibleForTesting public synchronized void setSerialNo(int serialNo) { - this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31); + // we mod the serial number by the range and then add that times the index + this.serialNo = (serialNo % intRange) + (nnRangeStart); } public void setBlockPoolId(String blockPoolId) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 824801f..7d3a678 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.util.ExitUtil.terminate; import java.io.IOException; @@ -42,6 +43,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSUtilClient; import 
org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; @@ -399,14 +401,21 @@ public class BlockManager { boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId); if (isHaEnabled) { - String thisNnId = HAUtil.getNameNodeId(conf, nsId); - String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId); - return new BlockTokenSecretManager(updateMin*60*1000L, - lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1, null, - encryptionAlgorithm); + // figure out which index we are of the nns + Collection nnIds = DFSUtilClient.getNameNodeIds(conf, nsId); + String nnId = HAUtil.getNameNodeId(conf, nsId); + int nnIndex = 0; + for (String id : nnIds) { + if (id.equals(nnId)) { + break; + } + nnIndex++; + } + return new BlockTokenSecretManager(updateMin * 60 * 1000L, + lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, encryptionAlgorithm); } else { return new BlockTokenSecretManager(updateMin*60*1000L, - lifetimeMin*60*1000L, 0, null, encryptionAlgorithm); + lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java index b1636bc..c30730b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java @@ -44,7 +44,13 @@ public class CheckpointConf { /** The output dir for legacy OIV image */ private final String legacyOivImageDir; - + + /** + * multiplier on the checkpoint period to allow other nodes to do the checkpointing, when not the + * 'primary' checkpoint node + */ + private double quietMultiplier; + public CheckpointConf(Configuration conf) { checkpointCheckPeriod = conf.getLong( DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, @@ -57,6 +63,8 @@ public class CheckpointConf { maxRetriesOnMergeError = conf.getInt(DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY, DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT); legacyOivImageDir = conf.get(DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY); + quietMultiplier = conf.getDouble(DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY, + DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT); warnForDeprecatedConfigs(conf); } @@ -91,4 +99,8 @@ public class CheckpointConf { public String getLegacyOivImageDir() { return legacyOivImageDir; } + + public double getQuietPeriod() { + return this.checkpointPeriod * this.quietMultiplier; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java index c565eb5..9dc20b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java @@ -30,6 +30,7 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.security.SecurityUtil; import org.apache.commons.logging.Log; @@ -81,6 +82,9 @@ public class ImageServlet extends HttpServlet { private static final String LATEST_FSIMAGE_VALUE = "latest"; private static final String IMAGE_FILE_TYPE = "imageFile"; + private SortedSet currentlyDownloadingCheckpoints = Collections + . synchronizedSortedSet(new TreeSet()); + @Override public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { @@ -253,10 +257,12 @@ public class ImageServlet extends HttpServlet { } if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) { - Configuration otherNnConf = HAUtil.getConfForOtherNode(conf); - validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf - .get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY), - NameNode.getAddress(otherNnConf).getHostName())); + List otherNnConfs = HAUtil.getConfForOtherNodes(conf); + for (Configuration otherNnConf : otherNnConfs) { + validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf + .get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY), + NameNode.getAddress(otherNnConf).getHostName())); + } } for (String v : validRequestors) { @@ -420,7 +426,6 @@ public class ImageServlet extends HttpServlet { /** * Set the required parameters for uploading image * - * @param httpMethod instance of method to set the parameters * @param storage colon separated storageInfo string * @param txid txid of the image * @param imageFileSize size of the imagefile to be uploaded @@ -459,12 +464,37 @@ public class ImageServlet extends HttpServlet { @Override public Void run() throws Exception { + // if its not the active NN, then we need to notify the caller it was was the wrong + // target (regardless of the fact that we got the image) + HAServiceProtocol.HAServiceState state = NameNodeHttpServer + .getNameNodeStateFromContext(getServletContext()); + if (state != HAServiceProtocol.HAServiceState.ACTIVE) { + // we need a different response type here so the client can differentiate this + // from the failure to upload due to (1) security, or (2) other checkpoints already + // present + response.sendError(HttpServletResponse.SC_EXPECTATION_FAILED, + "Nameode "+request.getLocalAddr()+" is currently not in a state which can " + + "accept uploads of new fsimages. 
State: "+state); + return null; + } final long txid = parsedParams.getTxId(); + String remoteAddr = request.getRemoteAddr(); + ImageUploadRequest imageRequest = new ImageUploadRequest(txid, remoteAddr); final NameNodeFile nnf = parsedParams.getNameNodeFile(); - if (!nnImage.addToCheckpointing(txid)) { + // if the node is attempting to upload an older transaction, we ignore it + SortedSet larger = currentlyDownloadingCheckpoints.tailSet(imageRequest); + if (larger.size() > 0) { + response.sendError(HttpServletResponse.SC_CONFLICT, + "Another checkpointer is already in the process of uploading a" + + " checkpoint made up to transaction ID " + larger.last()); + return null; + } + + //make sure no one else has started uploading one + if (!currentlyDownloadingCheckpoints.add(imageRequest)) { response.sendError(HttpServletResponse.SC_CONFLICT, "Either current namenode is checkpointing or another" + " checkpointer is already in the process of " @@ -499,6 +529,10 @@ public class ImageServlet extends HttpServlet { // remove some old ones. nnImage.purgeOldStorage(nnf); } finally { + // remove the request once we've processed it, or it threw an error, so we + // aren't using it either + currentlyDownloadingCheckpoints.remove(imageRequest); + stream.close(); } } finally { @@ -555,4 +589,46 @@ public class ImageServlet extends HttpServlet { return nnf; } } + + private static class ImageUploadRequest implements Comparable { + + private final long txId; + private final String address; + + public ImageUploadRequest(long txid, String remoteAddr) { + this.txId = txid; + this.address = remoteAddr; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ImageUploadRequest that = (ImageUploadRequest) o; + + if (txId != that.txId) return false; + if (!address.equals(that.address)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = (int) (txId ^ (txId >>> 32)); + result = 31 * result + address.hashCode(); + return result; + } + + @Override public int compareTo(ImageUploadRequest other) { + return Long.compare(txId, other.txId); + } + + @Override public String toString() { + return "ImageRequest{" + + "txId=" + txId + + ", address='" + address + '\'' + + '}'; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java index 09b6b80..6bd9868 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java @@ -27,6 +27,7 @@ import javax.servlet.ServletContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -272,4 +273,8 @@ public class NameNodeHttpServer { ServletContext context) { return 
(StartupProgress)context.getAttribute(STARTUP_PROGRESS_ATTRIBUTE_KEY); } -} + + public static HAServiceProtocol.HAServiceState getNameNodeStateFromContext(ServletContext context) { + return getNameNodeFromContext(context).getServiceState(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java index 9783cca..afecf99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java @@ -70,7 +70,33 @@ import org.mortbay.jetty.EofException; */ @InterfaceAudience.Private public class TransferFsImage { - + + public enum TransferResult{ + SUCCESS(HttpServletResponse.SC_OK, false), + AUTHENTICATION_FAILURE(HttpServletResponse.SC_FORBIDDEN, true), + NOT_ACTIVE_NAMENODE_FAILURE(HttpServletResponse.SC_EXPECTATION_FAILED, false), + OLD_TRANSACTION_ID_FAILURE(HttpServletResponse.SC_CONFLICT, false), + UNEXPECTED_FAILURE(-1, true); + + private final int response; + private final boolean shouldReThrowException; + + private TransferResult(int response, boolean rethrow) { + this.response = response; + this.shouldReThrowException = rethrow; + } + + public static TransferResult getResultForCode(int code){ + TransferResult ret = UNEXPECTED_FAILURE; + for(TransferResult result:TransferResult.values()){ + if(result.response == code){ + return result; + } + } + return ret; + } + } + public final static String CONTENT_LENGTH = "Content-Length"; public final static String FILE_LENGTH = "File-Length"; public final static String MD5_HEADER = "X-MD5-Digest"; @@ -198,9 +224,9 @@ public class TransferFsImage { * @param txid the transaction ID of the image to be uploaded * @throws IOException if there is an I/O error */ - public static void uploadImageFromStorage(URL fsName, Configuration conf, + public static TransferResult uploadImageFromStorage(URL fsName, Configuration conf, NNStorage storage, NameNodeFile nnf, long txid) throws IOException { - uploadImageFromStorage(fsName, conf, storage, nnf, txid, null); + return uploadImageFromStorage(fsName, conf, storage, nnf, txid, null); } /** @@ -215,7 +241,7 @@ public class TransferFsImage { * @param canceler optional canceler to check for abort of upload * @throws IOException if there is an I/O error or cancellation */ - public static void uploadImageFromStorage(URL fsName, Configuration conf, + public static TransferResult uploadImageFromStorage(URL fsName, Configuration conf, NNStorage storage, NameNodeFile nnf, long txid, Canceler canceler) throws IOException { URL url = new URL(fsName, ImageServlet.PATH_SPEC); @@ -223,21 +249,18 @@ public class TransferFsImage { try { uploadImage(url, conf, storage, nnf, txid, canceler); } catch (HttpPutFailedException e) { - if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) { - // this is OK - this means that a previous attempt to upload - // this checkpoint succeeded even though we thought it failed. - LOG.info("Image upload with txid " + txid + - " conflicted with a previous image upload to the " + - "same NameNode. 
Continuing...", e); - return; - } else { + // translate the error code to a result, which is a bit more obvious in usage + TransferResult result = TransferResult.getResultForCode(e.getResponseCode()); + if (result.shouldReThrowException) { throw e; } + return result; } double xferSec = Math.max( ((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001); LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName + " in " + xferSec + " seconds"); + return TransferResult.SUCCESS; } /* http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java index 88d9a6a..c22d7f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java @@ -23,8 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; -import java.net.URL; import java.security.PrivilegedAction; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -77,10 +77,8 @@ public class BootstrapStandby implements Tool, Configurable { private static final Log LOG = LogFactory.getLog(BootstrapStandby.class); private String nsId; private String nnId; - private String otherNNId; + private List remoteNNs; - private URL otherHttpAddr; - private InetSocketAddress otherIpcAddr; private Collection dirsToFormat; private List editUrisToFormat; private List sharedEditsUris; @@ -139,8 +137,8 @@ public class BootstrapStandby implements Tool, Configurable { System.err.println("Usage: " + this.getClass().getSimpleName() + " [-force] [-nonInteractive] [-skipSharedEditsCheck]"); } - - private NamenodeProtocol createNNProtocolProxy() + + private NamenodeProtocol createNNProtocolProxy(InetSocketAddress otherIpcAddr) throws IOException { return NameNodeProxies.createNonHAProxy(getConf(), otherIpcAddr, NamenodeProtocol.class, @@ -149,18 +147,36 @@ public class BootstrapStandby implements Tool, Configurable { } private int doRun() throws IOException { - NamenodeProtocol proxy = createNNProtocolProxy(); - NamespaceInfo nsInfo; - boolean isUpgradeFinalized; - try { - nsInfo = proxy.versionRequest(); - isUpgradeFinalized = proxy.isUpgradeFinalized(); - } catch (IOException ioe) { - LOG.fatal("Unable to fetch namespace information from active NN at " + - otherIpcAddr + ": " + ioe.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug("Full exception trace", ioe); + // find the active NN + NamenodeProtocol proxy = null; + NamespaceInfo nsInfo = null; + boolean isUpgradeFinalized = false; + RemoteNameNodeInfo proxyInfo = null; + for (int i = 0; i < remoteNNs.size(); i++) { + proxyInfo = remoteNNs.get(i); + InetSocketAddress otherIpcAddress = proxyInfo.getIpcAddress(); + proxy = createNNProtocolProxy(otherIpcAddress); + try { + // Get the namespace from any active NN. If you just formatted the primary NN and are + // bootstrapping the other NNs from that layout, it will only contact the single NN. 
+ // However, if there cluster is already running and you are adding a NN later (e.g. + // replacing a failed NN), then this will bootstrap from any node in the cluster. + nsInfo = proxy.versionRequest(); + isUpgradeFinalized = proxy.isUpgradeFinalized(); + break; + } catch (IOException ioe) { + LOG.warn("Unable to fetch namespace information from remote NN at " + otherIpcAddress + + ": " + ioe.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug("Full exception trace", ioe); + } } + } + + if (nsInfo == null) { + LOG.fatal( + "Unable to fetch namespace information from any remote NN. Possible NameNodes: " + + remoteNNs); return ERR_CODE_FAILED_CONNECT; } @@ -175,9 +191,9 @@ public class BootstrapStandby implements Tool, Configurable { "=====================================================\n" + "About to bootstrap Standby ID " + nnId + " from:\n" + " Nameservice ID: " + nsId + "\n" + - " Other Namenode ID: " + otherNNId + "\n" + - " Other NN's HTTP address: " + otherHttpAddr + "\n" + - " Other NN's IPC address: " + otherIpcAddr + "\n" + + " Other Namenode ID: " + proxyInfo.getNameNodeID() + "\n" + + " Other NN's HTTP address: " + proxyInfo.getHttpAddress() + "\n" + + " Other NN's IPC address: " + proxyInfo.getIpcAddress() + "\n" + " Namespace ID: " + nsInfo.getNamespaceID() + "\n" + " Block pool ID: " + nsInfo.getBlockPoolID() + "\n" + " Cluster ID: " + nsInfo.getClusterID() + "\n" + @@ -201,7 +217,7 @@ public class BootstrapStandby implements Tool, Configurable { } // download the fsimage from active namenode - int download = downloadImage(storage, proxy); + int download = downloadImage(storage, proxy, proxyInfo); if (download != 0) { return download; } @@ -292,7 +308,7 @@ public class BootstrapStandby implements Tool, Configurable { } } - private int downloadImage(NNStorage storage, NamenodeProtocol proxy) + private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo) throws IOException { // Load the newly formatted image, using all of the directories // (including shared edits) @@ -316,7 +332,7 @@ public class BootstrapStandby implements Tool, Configurable { // Download that checkpoint into our storage directories. 
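
The downloadImage() call that follows uses proxyInfo.getHttpAddress(), i.e. whichever remote NameNode answered the probe loop above. At a usage level this is what lets a third (or later) NameNode be added to a running cluster and seeded from any live peer. A hedged sketch, assuming a multi-NN configuration like the one shown earlier; on a real host this is normally just the existing 'hdfs namenode -bootstrapStandby' command:

    // Hypothetical helper: programmatically bootstrap a newly added standby NameNode.
    public static void bootstrapNewStandby(Configuration conf) throws IOException {
      // BootstrapStandby now walks all configured remote NameNodes and uses whichever
      // one responds, instead of assuming there is exactly one "other" NN.
      int rc = BootstrapStandby.run(new String[] { "-nonInteractive" }, conf);
      if (rc != 0) {
        throw new IOException("bootstrapStandby failed with exit code " + rc);
      }
    }
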
MD5Hash hash = TransferFsImage.downloadImageToStorage( - otherHttpAddr, imageTxId, storage, true); + proxyInfo.getHttpAddress(), imageTxId, storage, true); image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId, hash); } catch (IOException ioe) { @@ -385,18 +401,26 @@ public class BootstrapStandby implements Tool, Configurable { throw new HadoopIllegalArgumentException( "Shared edits storage is not enabled for this namenode."); } - - Configuration otherNode = HAUtil.getConfForOtherNode(conf); - otherNNId = HAUtil.getNameNodeId(otherNode, nsId); - otherIpcAddr = NameNode.getServiceAddress(otherNode, true); - Preconditions.checkArgument(otherIpcAddr.getPort() != 0 && - !otherIpcAddr.getAddress().isAnyLocalAddress(), - "Could not determine valid IPC address for other NameNode (%s)" + - ", got: %s", otherNNId, otherIpcAddr); - - final String scheme = DFSUtil.getHttpClientScheme(conf); - otherHttpAddr = DFSUtil.getInfoServerWithDefaultHost( - otherIpcAddr.getHostName(), otherNode, scheme).toURL(); + + + remoteNNs = RemoteNameNodeInfo.getRemoteNameNodes(conf, nsId); + // validate the configured NNs + List remove = new ArrayList(remoteNNs.size()); + for (RemoteNameNodeInfo info : remoteNNs) { + InetSocketAddress address = info.getIpcAddress(); + LOG.info("Found nn: " + info.getNameNodeID() + ", ipc: " + info.getIpcAddress()); + if (address.getPort() == 0 || address.getAddress().isAnyLocalAddress()) { + LOG.error("Could not determine valid IPC address for other NameNode (" + + info.getNameNodeID() + ") , got: " + address); + remove.add(info); + } + } + + // remove any invalid nns + remoteNNs.removeAll(remove); + + // make sure we have at least one left to read + Preconditions.checkArgument(!remoteNNs.isEmpty(), "Could not find any valid namenodes!"); dirsToFormat = FSNamesystem.getNamespaceDirs(conf); editUrisToFormat = FSNamesystem.getNamespaceEditsDirs( http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 38aa358..cfca77c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -23,7 +23,13 @@ import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -67,10 +73,10 @@ public class EditLogTailer { private final Configuration conf; private final FSNamesystem namesystem; + private final Iterator nnLookup; private FSEditLog editLog; - private InetSocketAddress activeAddr; - private NamenodeProtocol cachedActiveProxy = null; + private RemoteNameNodeInfo currentNN; /** * The last transaction ID at which an edit log roll was initiated. 
@@ -100,7 +106,17 @@ public class EditLogTailer { * available to be read from. */ private final long sleepTimeMs; - + + private final int nnCount; + private NamenodeProtocol cachedActiveProxy = null; + // count of the number of NNs we have attempted in the current lookup loop + private int nnLoopCount = 0; + + /** + * maximum number of retries we should give each of the remote namenodes before giving up + */ + private int maxRetries; + public EditLogTailer(FSNamesystem namesystem, Configuration conf) { this.tailerThread = new EditLogTailerThread(); this.conf = conf; @@ -111,12 +127,24 @@ public class EditLogTailer { logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000; + List nns = Collections.emptyList(); if (logRollPeriodMs >= 0) { - this.activeAddr = getActiveNodeAddress(); - Preconditions.checkArgument(activeAddr.getPort() > 0, - "Active NameNode must have an IPC port configured. " + - "Got address '%s'", activeAddr); - LOG.info("Will roll logs on active node at " + activeAddr + " every " + + try { + nns = RemoteNameNodeInfo.getRemoteNameNodes(conf); + } catch (IOException e) { + throw new IllegalArgumentException("Remote NameNodes not correctly configured!", e); + } + + for (RemoteNameNodeInfo info : nns) { + // overwrite the socket address, if we need to + InetSocketAddress ipc = NameNode.getServiceAddress(info.getConfiguration(), true); + // sanity check the ipc address + Preconditions.checkArgument(ipc.getPort() > 0, + "Active NameNode must have an IPC port configured. " + "Got address '%s'", ipc); + info.setIpcAddress(ipc); + } + + LOG.info("Will roll logs on active node every " + (logRollPeriodMs / 1000) + " seconds."); } else { LOG.info("Not going to trigger log rolls on active node because " + @@ -125,29 +153,24 @@ public class EditLogTailer { sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000; - + + maxRetries = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, + DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT); + if (maxRetries <= 0) { + LOG.error("Specified a non-positive number of retries for the number of retries for the " + + "namenode connection when manipulating the edit log (" + + DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY + "), setting to default: " + + DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT); + maxRetries = DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT; + } + + nnCount = nns.size(); + // setup the iterator to endlessly loop the nns + this.nnLookup = Iterators.cycle(nns); + LOG.debug("logRollPeriodMs=" + logRollPeriodMs + " sleepTime=" + sleepTimeMs); } - - private InetSocketAddress getActiveNodeAddress() { - Configuration activeConf = HAUtil.getConfForOtherNode(conf); - return NameNode.getServiceAddress(activeConf, true); - } - - private NamenodeProtocol getActiveNodeProxy() throws IOException { - if (cachedActiveProxy == null) { - int rpcTimeout = conf.getInt( - DFSConfigKeys.DFS_HA_LOGROLL_RPC_TIMEOUT_KEY, - DFSConfigKeys.DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT); - NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class, - RPC.getProtocolVersion(NamenodeProtocolPB.class), activeAddr, conf, - rpcTimeout, Long.MAX_VALUE); - cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy); - } - assert cachedActiveProxy != null; - return cachedActiveProxy; - } public void start() { tailerThread.start(); @@ -270,9 +293,15 @@ public class 
EditLogTailer { * Trigger the active node to roll its logs. */ private void triggerActiveLogRoll() { - LOG.info("Triggering log roll on remote NameNode " + activeAddr); + LOG.info("Triggering log roll on remote NameNode"); try { - getActiveNodeProxy().rollEditLog(); + new MultipleNameNodeProxy<Void>() { + @Override + protected Void doWork() throws IOException { + cachedActiveProxy.rollEditLog(); + return null; + } + }.call(); lastRollTriggerTxId = lastLoadedTxnId; } catch (IOException ioe) { if (ioe instanceof RemoteException) { @@ -362,5 +391,76 @@ public class EditLogTailer { } } } + /** + * Manage the 'active namenode proxy'. This cannot just be a single proxy since we could + * failover across a number of NameNodes, rather than just between an active and a standby. + *
<p>
+ * We - lazily - get a proxy to one of the configured namenodes and attempt to make the request + * against it. If it doesn't succeed, either because the proxy failed to be created or the request + * failed, we try the next NN in the list. We try this up to the configuration maximum number of + * retries before throwing up our hands. A working proxy is retained across attempts since we + * expect the active NameNode to switch rarely. + *
<p>
+ * This mechanism is very bad for cases where we care about being fast; it just + * blindly goes and tries namenodes. + */ + private abstract class MultipleNameNodeProxy implements Callable { + + /** + * Do the actual work to the remote namenode via the {@link #cachedActiveProxy}. + * @return the result of the work, if there is one + * @throws IOException if the actions done to the proxy throw an exception. + */ + protected abstract T doWork() throws IOException; + + public T call() throws IOException { + while ((cachedActiveProxy = getActiveNodeProxy()) != null) { + try { + T ret = doWork(); + // reset the loop count on success + nnLoopCount = 0; + return ret; + } catch (RemoteException e) { + Throwable cause = e.unwrapRemoteException(StandbyException.class); + // if its not a standby exception, then we need to re-throw it, something bad has happened + if (cause == e) { + throw e; + } else { + // it is a standby exception, so we try the other NN + LOG.warn("Failed to reach remote node: " + currentNN + + ", retrying with remaining remote NNs"); + cachedActiveProxy = null; + // this NN isn't responding to requests, try the next one + nnLoopCount++; + } + } + } + throw new IOException("Cannot find any valid remote NN to service request!"); + } -} + private NamenodeProtocol getActiveNodeProxy() throws IOException { + if (cachedActiveProxy == null) { + while (true) { + // if we have reached the max loop count, quit by returning null + if ((nnLoopCount / nnCount) >= maxRetries) { + return null; + } + + currentNN = nnLookup.next(); + try { + NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class, + RPC.getProtocolVersion(NamenodeProtocolPB.class), currentNN.getIpcAddress(), conf); + cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy); + break; + } catch (IOException e) { + LOG.info("Failed to reach " + currentNN, e); + // couldn't even reach this NN, try the next one + nnLoopCount++; + } + } + } + assert cachedActiveProxy != null; + return cachedActiveProxy; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java new file mode 100644 index 0000000..9a51190 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.google.common.base.Objects; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +import com.google.common.base.Preconditions; + +/** + * Information about a single remote NameNode + */ +public class RemoteNameNodeInfo { + + public static List getRemoteNameNodes(Configuration conf) throws IOException { + String nsId = DFSUtil.getNamenodeNameServiceId(conf); + return getRemoteNameNodes(conf, nsId); + } + + public static List getRemoteNameNodes(Configuration conf, String nsId) + throws IOException { + // there is only a single NN configured (and no federation) so we don't have any more NNs + if (nsId == null) { + return Collections.emptyList(); + } + List otherNodes = HAUtil.getConfForOtherNodes(conf); + List nns = new ArrayList(); + + for (Configuration otherNode : otherNodes) { + String otherNNId = HAUtil.getNameNodeId(otherNode, nsId); + // don't do any validation here as in some cases, it can be overwritten later + InetSocketAddress otherIpcAddr = NameNode.getServiceAddress(otherNode, true); + + + final String scheme = DFSUtil.getHttpClientScheme(conf); + URL otherHttpAddr = DFSUtil.getInfoServerWithDefaultHost(otherIpcAddr.getHostName(), + otherNode, scheme).toURL(); + + nns.add(new RemoteNameNodeInfo(otherNode, otherNNId, otherIpcAddr, otherHttpAddr)); + } + return nns; + } + + private final Configuration conf; + private final String nnId; + private InetSocketAddress ipcAddress; + private final URL httpAddress; + + private RemoteNameNodeInfo(Configuration conf, String nnId, InetSocketAddress ipcAddress, + URL httpAddress) { + this.conf = conf; + this.nnId = nnId; + this.ipcAddress = ipcAddress; + this.httpAddress = httpAddress; + } + + public InetSocketAddress getIpcAddress() { + return this.ipcAddress; + } + + public String getNameNodeID() { + return this.nnId; + } + + public URL getHttpAddress() { + return this.httpAddress; + } + + public Configuration getConfiguration() { + return this.conf; + } + + public void setIpcAddress(InetSocketAddress ipc) { + this.ipcAddress = ipc; + } + + @Override + public String toString() { + return "RemoteNameNodeInfo [nnId=" + nnId + ", ipcAddress=" + ipcAddress + + ", httpAddress=" + httpAddress + "]"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RemoteNameNodeInfo that = (RemoteNameNodeInfo) o; + + if (!nnId.equals(that.nnId)) return false; + if (!ipcAddress.equals(that.ipcAddress)) return false; + // convert to the standard strings since URL.equals does address resolution, which is a + // blocking call and a a FindBugs issue. + String httpString = httpAddress.toString(); + String thatHttpString = that.httpAddress.toString(); + return httpString.equals(thatHttpString); + + } + + @Override + public int hashCode() { + int result = nnId.hashCode(); + result = 31 * result + ipcAddress.hashCode(); + // toString rather than hashCode b/c Url.hashCode is a blocking call. 
+ result = 31 * result + httpAddress.toString().hashCode(); + return result; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java index 1e40368..f5ecbec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java @@ -23,12 +23,10 @@ import java.io.IOException; import java.net.URI; import java.net.URL; import java.security.PrivilegedAction; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,6 +43,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException; import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.hdfs.util.Canceler; +import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -68,12 +67,13 @@ public class StandbyCheckpointer { private long lastCheckpointTime; private final CheckpointerThread thread; private final ThreadFactory uploadThreadFactory; - private URL activeNNAddress; + private List activeNNAddresses; private URL myNNAddress; private final Object cancelLock = new Object(); private Canceler canceler; - + private boolean isPrimaryCheckPointer = true; + // Keep track of how many checkpoints were canceled. // This is for use in tests. private static int canceledCount = 0; @@ -100,14 +100,21 @@ public class StandbyCheckpointer { myNNAddress = getHttpAddress(conf); // Look up the active node's address - Configuration confForActive = HAUtil.getConfForOtherNode(conf); - activeNNAddress = getHttpAddress(confForActive); - + List confForActive = HAUtil.getConfForOtherNodes(conf); + activeNNAddresses = new ArrayList(confForActive.size()); + for (Configuration activeConf : confForActive) { + URL activeNNAddress = getHttpAddress(activeConf); + + // sanity check each possible active NN + Preconditions.checkArgument(checkAddress(activeNNAddress), + "Bad address for active NN: %s", activeNNAddress); + + activeNNAddresses.add(activeNNAddress); + } + // Sanity-check. 
- Preconditions.checkArgument(checkAddress(activeNNAddress), - "Bad address for active NN: %s", activeNNAddress); - Preconditions.checkArgument(checkAddress(myNNAddress), - "Bad address for standby NN: %s", myNNAddress); + Preconditions.checkArgument(checkAddress(myNNAddress), "Bad address for standby NN: %s", + myNNAddress); } private URL getHttpAddress(Configuration conf) throws IOException { @@ -127,7 +134,7 @@ public class StandbyCheckpointer { public void start() { LOG.info("Starting standby checkpoint thread...\n" + - "Checkpointing active NN at " + activeNNAddress + "\n" + + "Checkpointing active NN to possible NNs: " + activeNNAddresses + "\n" + "Serving checkpoints at " + myNNAddress); thread.start(); } @@ -148,11 +155,10 @@ public class StandbyCheckpointer { thread.interrupt(); } - private void doCheckpoint() throws InterruptedException, IOException { + private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException { assert canceler != null; final long txid; final NameNodeFile imageType; - // Acquire cpLock to make sure no one is modifying the name system. // It does not need the full namesystem write lock, since the only thing // that modifies namesystem on standby node is edit log replaying. @@ -161,9 +167,9 @@ public class StandbyCheckpointer { assert namesystem.getEditLog().isOpenForRead() : "Standby Checkpointer should only attempt a checkpoint when " + "NN is in standby mode, but the edit logs are in an unexpected state"; - + FSImage img = namesystem.getFSImage(); - + long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId(); long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId(); assert thisCheckpointTxId >= prevCheckpointTxId; @@ -185,7 +191,7 @@ public class StandbyCheckpointer { img.saveNamespace(namesystem, imageType, canceler); txid = img.getStorage().getMostRecentCheckpointTxId(); assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" + - thisCheckpointTxId + " but instead saved at txid=" + txid; + thisCheckpointTxId + " but instead saved at txid=" + txid; // Save the legacy OIV image, if the output dir is defined. String outputDir = checkpointConf.getLegacyOivImageDir(); @@ -195,31 +201,85 @@ public class StandbyCheckpointer { } finally { namesystem.cpUnlock(); } - + + //early exit if we shouldn't actually send the checkpoint to the ANN + if(!sendCheckpoint){ + return; + } + // Upload the saved checkpoint back to the active - // Do this in a separate thread to avoid blocking transition to active + // Do this in a separate thread to avoid blocking transition to active, but don't allow more + // than the expected number of tasks to run or queue up // See HDFS-4816 - ExecutorService executor = - Executors.newSingleThreadExecutor(uploadThreadFactory); - Future upload = executor.submit(new Callable() { - @Override - public Void call() throws IOException { - TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, - namesystem.getFSImage().getStorage(), imageType, txid, canceler); - return null; + ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue(activeNNAddresses.size()), + uploadThreadFactory); + // for right now, just match the upload to the nn address by convention. There is no need to + // directly tie them together by adding a pair class. 
+ List> uploads = + new ArrayList>(); + for (final URL activeNNAddress : activeNNAddresses) { + Future upload = + executor.submit(new Callable() { + @Override + public TransferFsImage.TransferResult call() throws IOException { + return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem + .getFSImage().getStorage(), imageType, txid, canceler); + } + }); + uploads.add(upload); + } + InterruptedException ie = null; + IOException ioe= null; + int i = 0; + boolean success = false; + for (; i < uploads.size(); i++) { + Future upload = uploads.get(i); + try { + // TODO should there be some smarts here about retries nodes that are not the active NN? + if (upload.get() == TransferFsImage.TransferResult.SUCCESS) { + success = true; + //avoid getting the rest of the results - we don't care since we had a successful upload + break; + } + + } catch (ExecutionException e) { + ioe = new IOException("Exception during image upload: " + e.getMessage(), + e.getCause()); + break; + } catch (InterruptedException e) { + ie = e; + break; + } + } + + // we are primary if we successfully updated the ANN + this.isPrimaryCheckPointer = success; + + // cleaner than copying code for multiple catch statements and better than catching all + // exceptions, so we just handle the ones we expect. + if (ie != null || ioe != null) { + + // cancel the rest of the tasks, and close the pool + for (; i < uploads.size(); i++) { + Future upload = uploads.get(i); + // The background thread may be blocked waiting in the throttler, so + // interrupt it. + upload.cancel(true); + } + + // shutdown so we interrupt anything running and don't start anything new + executor.shutdownNow(); + // this is a good bit longer than the thread timeout, just to make sure all the threads + // that are not doing any work also stop + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + + // re-throw the exception we got, since one of these two must be non-null + if (ie != null) { + throw ie; + } else if (ioe != null) { + throw ioe; } - }); - executor.shutdown(); - try { - upload.get(); - } catch (InterruptedException e) { - // The background thread may be blocked waiting in the throttler, so - // interrupt it. 
- upload.cancel(true); - throw e; - } catch (ExecutionException e) { - throw new IOException("Exception during image upload: " + e.getMessage(), - e.getCause()); } } @@ -322,8 +382,10 @@ public class StandbyCheckpointer { final long now = monotonicNow(); final long uncheckpointed = countUncheckpointedTxns(); final long secsSinceLast = (now - lastCheckpointTime) / 1000; - + + // if we need a rollback checkpoint, always attempt to checkpoint boolean needCheckpoint = needRollbackCheckpoint; + if (needCheckpoint) { LOG.info("Triggering a rollback fsimage for rolling upgrade."); } else if (uncheckpointed >= checkpointConf.getTxnCount()) { @@ -338,19 +400,23 @@ public class StandbyCheckpointer { "exceeds the configured interval " + checkpointConf.getPeriod()); needCheckpoint = true; } - - synchronized (cancelLock) { - if (now < preventCheckpointsUntil) { - LOG.info("But skipping this checkpoint since we are about to failover!"); - canceledCount++; - continue; - } - assert canceler == null; - canceler = new Canceler(); - } - + if (needCheckpoint) { - doCheckpoint(); + synchronized (cancelLock) { + if (now < preventCheckpointsUntil) { + LOG.info("But skipping this checkpoint since we are about to failover!"); + canceledCount++; + continue; + } + assert canceler == null; + canceler = new Canceler(); + } + + // on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a + // rollback request, are the checkpointer, are outside the quiet period. + boolean sendRequest = isPrimaryCheckPointer || secsSinceLast >= checkpointConf.getQuietPeriod(); + doCheckpoint(sendRequest); + // reset needRollbackCheckpoint to false only when we finish a ckpt // for rollback image if (needRollbackCheckpoint @@ -379,7 +445,7 @@ public class StandbyCheckpointer { } @VisibleForTesting - URL getActiveNNAddress() { - return activeNNAddress; + List getActiveNNAddresses() { + return activeNNAddresses; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java index f125a27..24e5bef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java @@ -25,6 +25,8 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.URL; +import java.util.ArrayList; +import java.util.List; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; @@ -261,4 +263,15 @@ public class DFSZKFailoverController extends ZKFailoverController { return isThreadDumpCaptured; } + @Override + public List getAllOtherNodes() { + String nsId = DFSUtil.getNamenodeNameServiceId(conf); + List otherNn = HAUtil.getNameNodeIdOfOtherNodes(conf, nsId); + + List targets = new ArrayList(otherNn.size()); + for (String nnId : otherNn) { + targets.add(new NNHAServiceTarget(conf, nsId, nnId)); + } + return targets; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ea7d3e/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index aaa1c2f..76161a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -914,6 +914,18 @@ + dfs.namenode.checkpoint.check.quiet-multiplier + 1.5 + + Used to calculate the amount of time between retries when in the 'quiet' period + for creating checkpoints (active namenode already has an up-to-date image from another + checkpointer), so we wait a multiplier of the dfs.namenode.checkpoint.check.period before + retrying the checkpoint because another node likely is already managing the checkpoints, + allowing us to save bandwidth to transfer checkpoints that don't need to be used. + + + + dfs.namenode.num.checkpoints.retained 2 The number of image checkpoint files (fsimage_*) that will be retained by @@ -1288,6 +1300,14 @@ + dfs.ha.tail-edits.namenode-retries + 3 + + Number of retries to use when contacting the namenode when tailing the log. + + + + dfs.ha.automatic-failover.enabled false
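
For illustration only (not part of this commit): a minimal hdfs-site.xml sketch showing how a nameservice with three NameNodes might be configured alongside the two settings added above. The nameservice ID "mycluster", the NameNode IDs nn1/nn2/nn3, and the hostnames are assumed values, not taken from the patch.

  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>
  <property>
    <!-- more than two NameNode IDs are now allowed in one nameservice -->
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2,nn3</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>nn1.example.com:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>nn2.example.com:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn3</name>
    <value>nn3.example.com:8020</value>
  </property>
  <property>
    <!-- new in this patch: retries across all remote NameNodes when tailing edits -->
    <name>dfs.ha.tail-edits.namenode-retries</name>
    <value>3</value>
  </property>
  <property>
    <!-- new in this patch: quiet-period multiplier for non-primary checkpointers -->
    <name>dfs.namenode.checkpoint.check.quiet-multiplier</name>
    <value>1.5</value>
  </property>

With more than two NameNodes configured this way, the edit log tailer and standby checkpointer changed in this patch iterate over all of the other NameNodes rather than assuming a single remote peer.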