From: sjlee@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 14 Aug 2015 07:18:03 -0000
Subject: [17/43] hadoop git commit: HDFS-7009. Active NN and standby NN have different live nodes. Contributed by Ming Ma.

HDFS-7009. Active NN and standby NN have different live nodes. Contributed by Ming Ma.

(cherry picked from commit 769507bd7a501929d9a2fd56c72c3f50673488a4)
(cherry picked from commit 657a6e389b3f6eae43efb11deb6253c3b1255a51)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d5ddc345
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d5ddc345
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d5ddc345

Branch: refs/heads/sjlee/hdfs-merge
Commit: d5ddc3450f2f49ea411de590ff3de15b5ec4e17c
Parents: 1faa44d
Author: cnauroth <cnauroth@apache.org>
Authored: Mon Feb 23 15:12:27 2015 -0800
Committer: Sangjin Lee <sjlee@apache.org>
Committed: Wed Aug 12 23:19:33 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Client.java |   3 +-
 .../TestDatanodeProtocolRetryPolicy.java        | 231 +++++++++++++++++++
 2 files changed, 233 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ddc345/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 96da01c..8a98eb0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -25,6 +25,7 @@ import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -279,7 +280,7 @@ public class Client {
   /** Check the rpc response header. */
   void checkResponse(RpcResponseHeaderProto header) throws IOException {
     if (header == null) {
-      throw new IOException("Response is null.");
+      throw new EOFException("Response is null.");
     }
     if (header.hasClientId()) {
       // check client IDs
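
----------------------------------------------------------------------

Note on the Client.java change: a null response header means the server
closed the connection before sending a reply, and EOFException (a subclass
of IOException) is the conventional signal for exactly that. Surfacing it
lets retry logic distinguish a dropped connection, which is safe to retry,
from an arbitrary IOException, which is not. A minimal sketch of that
distinction (a hypothetical helper, not Hadoop's actual retry code):

    import java.io.EOFException;
    import java.util.concurrent.Callable;

    public class RetryOnEof {
      // Retries only when the failure looks like a dropped connection
      // (EOFException); any other exception propagates immediately.
      static <T> T callWithRetry(Callable<T> call, int maxRetries)
          throws Exception {
        for (int attempt = 0; ; attempt++) {
          try {
            return call.call();
          } catch (EOFException e) {
            if (attempt >= maxRetries) {
              throw e;  // retries exhausted; surface the failure
            }
            // connection died mid-call; loop around and try again
          }
        }
      }
    }

Before this change, a NameNode restart or failover that killed an in-flight
registration surfaced as a plain IOException that the DataNode treated as
fatal, so the active and standby NameNodes could end up with different
live-node lists. The new test below exercises exactly that retry path.
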
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ddc345/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
new file mode 100644
index 0000000..c7ed5b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * This tests DatanodeProtocol retry policy
+ */
+public class TestDatanodeProtocolRetryPolicy {
+  private static final Log LOG = LogFactory.getLog(
+      TestDatanodeProtocolRetryPolicy.class);
+  private static final String DATA_DIR =
+      MiniDFSCluster.getBaseDirectory() + "data";
+  private DataNode dn;
+  private Configuration conf;
+  private boolean tearDownDone;
+  ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
+  private final static String CLUSTER_ID = "testClusterID";
+  private final static String POOL_ID = "BP-TEST";
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
+  private static DatanodeRegistration datanodeRegistration =
+      DFSTestUtil.getLocalDatanodeRegistration();
+
+  static {
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /**
+   * Starts an instance of DataNode
+   * @throws IOException
+   */
+  @Before
+  public void startUp() throws IOException, URISyntaxException {
+    tearDownDone = false;
+    conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
+    conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    FileSystem.setDefaultUri(conf,
+        "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
+    File dataDir = new File(DATA_DIR);
+    FileUtil.fullyDelete(dataDir);
+    dataDir.mkdirs();
+    StorageLocation location = StorageLocation.parse(dataDir.getPath());
+    locations.add(location);
+  }
+
+  /**
+   * Cleans the resources and closes the instance of datanode
+   * @throws IOException if an error occurred
+   */
+  @After
+  public void tearDown() throws IOException {
+    if (!tearDownDone && dn != null) {
+      try {
+        dn.shutdown();
+      } catch(Exception e) {
+        LOG.error("Cannot close: ", e);
+      } finally {
+        File dir = new File(DATA_DIR);
+        if (dir.exists())
+          Assert.assertTrue(
+              "Cannot delete data-node dirs", FileUtil.fullyDelete(dir));
+      }
+      tearDownDone = true;
+    }
+  }
+
+  private void waitForBlockReport(
+      final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          Mockito.verify(mockNN).blockReport(
+              Mockito.eq(datanodeRegistration),
+              Mockito.eq(POOL_ID),
+              Mockito.<StorageBlockReport[]>anyObject());
+          return true;
+        } catch (Throwable t) {
+          LOG.info("waiting on block report: " + t.getMessage());
+          return false;
+        }
+      }
+    }, 500, 100000);
+  }
+
+  /**
+   * Verify the following scenario.
+   * 1. The initial DatanodeProtocol.registerDatanode succeeds.
+   * 2. DN starts heartbeat process.
+   * 3. In the first heartbeat, NN asks DN to reregister.
+   * 4. DN calls DatanodeProtocol.registerDatanode.
+   * 5. DatanodeProtocol.registerDatanode throws EOFException.
+   * 6. DN retries.
+   * 7. DatanodeProtocol.registerDatanode succeeds.
+   */
+  @Test(timeout = 60000)
+  public void testDatanodeRegistrationRetry() throws Exception {
+    final DatanodeProtocolClientSideTranslatorPB namenode =
+        mock(DatanodeProtocolClientSideTranslatorPB.class);
+
+    Mockito.doAnswer(new Answer<DatanodeRegistration>() {
+      int i = 0;
+      @Override
+      public DatanodeRegistration answer(InvocationOnMock invocation)
+          throws Throwable {
+        i++;
+        if ( i > 1 && i < 5) {
+          LOG.info("mockito exception " + i);
+          throw new EOFException("TestDatanodeProtocolRetryPolicy");
+        } else {
+          DatanodeRegistration dr =
+              (DatanodeRegistration) invocation.getArguments()[0];
+          datanodeRegistration.setDatanodeUuidForTesting(dr.getDatanodeUuid());
+          LOG.info("mockito succeeded " + datanodeRegistration);
+          return datanodeRegistration;
+        }
+      }
+    }).when(namenode).registerDatanode(
+        Mockito.any(DatanodeRegistration.class));
+
+    when(namenode.versionRequest()).thenReturn(
+        new NamespaceInfo(1, CLUSTER_ID, POOL_ID, 1L));
+
+    Mockito.doAnswer(new Answer<HeartbeatResponse>() {
+      int i = 0;
+      @Override
+      public HeartbeatResponse answer(InvocationOnMock invocation)
+          throws Throwable {
+        i++;
+        HeartbeatResponse heartbeatResponse;
+        if ( i == 1 ) {
+          LOG.info("mockito heartbeatResponse registration " + i);
+          heartbeatResponse = new HeartbeatResponse(
+              new DatanodeCommand[]{RegisterCommand.REGISTER},
+              new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
+              null);
+        } else {
+          LOG.info("mockito heartbeatResponse " + i);
+          heartbeatResponse = new HeartbeatResponse(
+              new DatanodeCommand[0],
+              new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
+              null);
+        }
+        return heartbeatResponse;
+      }
+    }).when(namenode).sendHeartbeat(
+        Mockito.any(DatanodeRegistration.class),
+        Mockito.any(StorageReport[].class),
+        Mockito.anyLong(),
+        Mockito.anyLong(),
+        Mockito.anyInt(),
+        Mockito.anyInt(),
+        Mockito.anyInt());
+
+    dn = new DataNode(conf, locations, null) {
+      @Override
+      DatanodeProtocolClientSideTranslatorPB connectToNN(
+          InetSocketAddress nnAddr) throws IOException {
+        Assert.assertEquals(NN_ADDR, nnAddr);
+        return namenode;
+      }
+    };
+
+    // Trigger a heartbeat so that it acknowledges the NN as active.
+    dn.getAllBpOs()[0].triggerHeartbeatForTests();
+
+    waitForBlockReport(namenode);
+  }
+}
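
----------------------------------------------------------------------

A side note on the verification idiom in waitForBlockReport(): rather than
sleeping for a fixed interval, the test polls Mockito.verify() inside
GenericTestUtils.waitFor() until the expected blockReport call has occurred
or the timeout expires. The same polling-assert pattern works for any
asynchronous test; here is a self-contained sketch of the idea (plain Java,
hypothetical class names, no Hadoop or Guava dependencies assumed):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.BooleanSupplier;

    public class PollingAssert {
      /** Re-evaluates check every intervalMs until it passes or timeoutMs elapses. */
      static void waitFor(BooleanSupplier check, long intervalMs, long timeoutMs)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!check.getAsBoolean()) {
          if (System.currentTimeMillis() > deadline) {
            throw new AssertionError("timed out waiting for condition");
          }
          Thread.sleep(intervalMs);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        AtomicBoolean done = new AtomicBoolean();
        new Thread(() -> {
          try { Thread.sleep(200); } catch (InterruptedException ignored) { }
          done.set(true);  // the asynchronous event under test
        }).start();
        waitFor(done::get, 50, 5000);  // passes once the worker flips the flag
        System.out.println("condition met");
      }
    }

Wrapping Mockito.verify() in such a check, as the test does, turns a
one-shot assertion into a bounded wait, which keeps the test deterministic
without hard-coding how long the DataNode takes to retry and re-register.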