From: iwasakims@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 03 Jan 2020 16:57:27 +0000
Subject: [hadoop] branch trunk updated: HDFS-15068. DataNode could meet deadlock if invoke refreshVolumes when register. Contributed by Aiphago.
Message-ID: <157807064687.23682.6413503681107013171@gitbox.apache.org>
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
X-Git-Host: gitbox.apache.org
X-Git-Repo: hadoop
X-Git-Refname: refs/heads/trunk
X-Git-Reftype: branch
X-Git-Oldrev: b19d87c2b76e2fa0fc10474fefc616bd9626024d
X-Git-Newrev: 037ec8cfb1406ea3a8225a1b6306c2e78440353b
X-Git-Rev: 037ec8cfb1406ea3a8225a1b6306c2e78440353b
X-Git-NotificationType: ref_changed_plus_diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated

This is an automated email from the ASF dual-hosted git repository.

iwasakims pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 037ec8c  HDFS-15068. DataNode could meet deadlock if invoke refreshVolumes when register. Contributed by Aiphago.
037ec8c is described below

commit 037ec8cfb1406ea3a8225a1b6306c2e78440353b
Author:     Masatake Iwasaki <iwasakims@apache.org>
AuthorDate: Sat Jan 4 01:55:27 2020 +0900

    HDFS-15068. DataNode could meet deadlock if invoke refreshVolumes when register. Contributed by Aiphago.

    Signed-off-by: Masatake Iwasaki <iwasakims@apache.org>
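The deadlock this patch addresses is a lock-order inversion between registration
and volume refresh: registrationSucceeded() holds the BPOfferService lock and
then calls dn.bpRegistrationSucceeded(), which needs the DataNode lock, while
the old refreshVolumes() was synchronized on the DataNode and called
bpos.getNamespaceInfo(), which needs the BPOfferService lock. A minimal
standalone sketch of the two shapes (the lock objects and method bodies are
illustrative stand-ins, not the real Hadoop code):

    // Thread A: registration path. Lock order: offerServiceLock -> dataNodeLock.
    // Thread B (before the fix): refresh path. Lock order: dataNodeLock -> offerServiceLock.
    // Each thread can end up holding the lock the other needs next: classic ABBA deadlock.
    public class AbbaSketch {
      private final Object offerServiceLock = new Object(); // stand-in for BPOfferService's lock
      private final Object dataNodeLock = new Object();     // stand-in for the DataNode monitor

      void registrationSucceeded() {            // thread A
        synchronized (offerServiceLock) {
          synchronized (dataNodeLock) {         // blocks if B holds dataNodeLock
            // dn.bpRegistrationSucceeded(...)
          }
        }
      }

      void refreshVolumesBeforeFix() {          // thread B, pre-patch shape
        synchronized (dataNodeLock) {
          synchronized (offerServiceLock) {     // blocks if A holds offerServiceLock
            // bpos.getNamespaceInfo()
          }
        }
      }

      void refreshVolumesAfterFix() {           // thread B, post-patch shape
        Object nsInfoSnapshot;
        synchronized (offerServiceLock) {       // taken alone, never nested inside dataNodeLock
          nsInfoSnapshot = new Object();        // snapshot of getNamespaceInfo() results
        }
        synchronized (dataNodeLock) {
          // do the volume work with the snapshot; no BPOfferService lock needed
        }
      }
    }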
---
 .../hdfs/server/datanode/BPOfferService.java       |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java      | 147 +++++++++++----------
 .../server/datanode/DataNodeFaultInjector.java     |   5 +
 .../server/datanode/TestDataNodeVolumeFailure.java |  46 +++++++
 4 files changed, 127 insertions(+), 73 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 3233e2c..df793c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -421,7 +421,7 @@ class BPOfferService {
           reg.getStorageInfo().getClusterID(), "cluster ID");
     }
     bpRegistration = reg;
-
+    DataNodeFaultInjector.get().delayWhenOfferServiceHoldLock();
     dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
     // Add the initial block token secret keys to the DN's secret manager.
     if (dn.isBlockTokenEnabled) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 1360303..3b7b13d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -765,90 +765,93 @@ public class DataNode extends ReconfigurableBase
    * @throws IOException on error. If an IOException is thrown, some new volumes
    * may have been successfully added and removed.
    */
-  private synchronized void refreshVolumes(String newVolumes) throws IOException {
-    Configuration conf = getConf();
-    conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
-    ExecutorService service = null;
-    int numOldDataDirs = dataDirs.size();
-    ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
-    StringBuilder errorMessageBuilder = new StringBuilder();
-    List<String> effectiveVolumes = Lists.newArrayList();
-    for (StorageLocation sl : changedVolumes.unchangedLocations) {
-      effectiveVolumes.add(sl.toString());
+  private void refreshVolumes(String newVolumes) throws IOException {
+    // Add volumes for each Namespace
+    final List<NamespaceInfo> nsInfos = Lists.newArrayList();
+    for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
+      nsInfos.add(bpos.getNamespaceInfo());
     }
-
-    try {
-      if (numOldDataDirs + getFSDataset().getNumFailedVolumes()
-          + changedVolumes.newLocations.size()
-          - changedVolumes.deactivateLocations.size() <= 0) {
-        throw new IOException("Attempt to remove all volumes.");
+    synchronized(this) {
+      Configuration conf = getConf();
+      conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
+      ExecutorService service = null;
+      int numOldDataDirs = dataDirs.size();
+      ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
+      StringBuilder errorMessageBuilder = new StringBuilder();
+      List<String> effectiveVolumes = Lists.newArrayList();
+      for (StorageLocation sl : changedVolumes.unchangedLocations) {
+        effectiveVolumes.add(sl.toString());
       }
-      if (!changedVolumes.newLocations.isEmpty()) {
-        LOG.info("Adding new volumes: {}",
-            Joiner.on(",").join(changedVolumes.newLocations));
-
-        // Add volumes for each Namespace
-        final List<NamespaceInfo> nsInfos = Lists.newArrayList();
-        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
-          nsInfos.add(bpos.getNamespaceInfo());
+
+      try {
+        if (numOldDataDirs + getFSDataset().getNumFailedVolumes()
+            + changedVolumes.newLocations.size()
+            - changedVolumes.deactivateLocations.size() <= 0) {
+          throw new IOException("Attempt to remove all volumes.");
        }
-        service = Executors
-            .newFixedThreadPool(changedVolumes.newLocations.size());
-        List<Future<IOException>> exceptions = Lists.newArrayList();
-        for (final StorageLocation location : changedVolumes.newLocations) {
-          exceptions.add(service.submit(new Callable<IOException>() {
-            @Override
-            public IOException call() {
-              try {
-                data.addVolume(location, nsInfos);
-              } catch (IOException e) {
-                return e;
+        if (!changedVolumes.newLocations.isEmpty()) {
+          LOG.info("Adding new volumes: {}",
+              Joiner.on(",").join(changedVolumes.newLocations));
+
+          service = Executors
+              .newFixedThreadPool(changedVolumes.newLocations.size());
+          List<Future<IOException>> exceptions = Lists.newArrayList();
+
+          for (final StorageLocation location : changedVolumes.newLocations) {
+            exceptions.add(service.submit(new Callable<IOException>() {
+              @Override
+              public IOException call() {
+                try {
+                  data.addVolume(location, nsInfos);
+                } catch (IOException e) {
+                  return e;
+                }
+                return null;
              }
-              return null;
-            }
-          }));
-        }
+            }));
+          }
 
-        for (int i = 0; i < changedVolumes.newLocations.size(); i++) {
-          StorageLocation volume = changedVolumes.newLocations.get(i);
-          Future<IOException> ioExceptionFuture = exceptions.get(i);
-          try {
-            IOException ioe = ioExceptionFuture.get();
-            if (ioe != null) {
+          for (int i = 0; i < changedVolumes.newLocations.size(); i++) {
+            StorageLocation volume = changedVolumes.newLocations.get(i);
+            Future<IOException> ioExceptionFuture = exceptions.get(i);
+            try {
+              IOException ioe = ioExceptionFuture.get();
+              if (ioe != null) {
+                errorMessageBuilder.append(
+                    String.format("FAILED TO ADD: %s: %s%n",
+                        volume, ioe.getMessage()));
+                LOG.error("Failed to add volume: {}", volume, ioe);
+              } else {
+                effectiveVolumes.add(volume.toString());
+                LOG.info("Successfully added volume: {}", volume);
+              }
+            } catch (Exception e) {
              errorMessageBuilder.append(
-                  String.format("FAILED TO ADD: %s: %s%n",
-                      volume, ioe.getMessage()));
-              LOG.error("Failed to add volume: {}", volume, ioe);
-            } else {
-              effectiveVolumes.add(volume.toString());
-              LOG.info("Successfully added volume: {}", volume);
+                  String.format("FAILED to ADD: %s: %s%n", volume,
+                      e.toString()));
+              LOG.error("Failed to add volume: {}", volume, e);
            }
-          } catch (Exception e) {
-            errorMessageBuilder.append(
-                String.format("FAILED to ADD: %s: %s%n", volume,
-                    e.toString()));
-            LOG.error("Failed to add volume: {}", volume, e);
          }
        }
-      }
 
-      try {
-        removeVolumes(changedVolumes.deactivateLocations);
-      } catch (IOException e) {
-        errorMessageBuilder.append(e.getMessage());
-        LOG.error("Failed to remove volume", e);
-      }
+        try {
+          removeVolumes(changedVolumes.deactivateLocations);
+        } catch (IOException e) {
+          errorMessageBuilder.append(e.getMessage());
+          LOG.error("Failed to remove volume", e);
+        }
 
-      if (errorMessageBuilder.length() > 0) {
-        throw new IOException(errorMessageBuilder.toString());
-      }
-    } finally {
-      if (service != null) {
-        service.shutdown();
+        if (errorMessageBuilder.length() > 0) {
+          throw new IOException(errorMessageBuilder.toString());
+        }
+      } finally {
+        if (service != null) {
+          service.shutdown();
+        }
+        conf.set(DFS_DATANODE_DATA_DIR_KEY,
+            Joiner.on(",").join(effectiveVolumes));
+        dataDirs = getStorageLocations(conf);
      }
-      conf.set(DFS_DATANODE_DATA_DIR_KEY,
-          Joiner.on(",").join(effectiveVolumes));
-      dataDirs = getStorageLocations(conf);
    }
  }
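The core of the DataNode.java change above: refreshVolumes() is no longer
declared synchronized; it snapshots the NamespaceInfo list from every
BPOfferService first, and only then enters a synchronized(this) block, so the
DataNode lock is never held while the BPOfferService lock is being acquired.
A compact sketch of this "snapshot, then lock" idiom in isolation (class and
field names are illustrative, not from Hadoop):

    import java.util.ArrayList;
    import java.util.List;

    class SnapshotThenLock {
      private final List<String> sources = new ArrayList<>(); // guarded by its own monitor

      void refresh() {
        // 1. Read the other component's state under its lock only.
        List<String> snapshot;
        synchronized (sources) {
          snapshot = new ArrayList<>(sources);
        }
        // 2. Mutate our own state under our lock, consuming the snapshot;
        //    the two locks are never held at the same time.
        synchronized (this) {
          for (String s : snapshot) {
            // ... apply changes derived from the snapshot ...
          }
        }
      }
    }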
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 1dd779e..7e66111 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -95,4 +95,9 @@ public class DataNodeFaultInjector {
    * process.
    */
   public void stripedBlockReconstruction() throws IOException {}
+
+  /**
+   * Used as a hook to inject a delay while BPOfferService holds its lock.
+   */
+  public void delayWhenOfferServiceHoldLock() {}
 }
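DataNodeFaultInjector follows the usual injectable test-hook pattern:
production code calls a no-op method on a process-wide injector, and a test
swaps in a subclass that parks the calling thread at exactly that point to
widen the race window (the new test below does this with a CountDownLatch).
A minimal sketch of the pattern (names are illustrative, not the Hadoop
implementation):

    import java.util.concurrent.CountDownLatch;

    class FaultInjector {
      private static FaultInjector instance = new FaultInjector();
      static FaultInjector get() { return instance; }
      static void set(FaultInjector injector) { instance = injector; }
      // Called at the critical point in production code; no-op by default.
      void delayAtCriticalSection() {}
    }

    class InjectorDemo {
      public static void main(String[] args) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        FaultInjector.set(new FaultInjector() {
          @Override
          void delayAtCriticalSection() {
            try {
              release.await();       // hold the thread (and any locks it owns) here
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
        });
        Thread worker = new Thread(() -> FaultInjector.get().delayAtCriticalSection());
        worker.start();
        release.countDown();         // let the hooked thread proceed
        worker.join();
      }
    }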
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 13e2255..7ad012b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -36,6 +36,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -72,6 +73,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
@@ -406,6 +408,50 @@ public class TestDataNodeVolumeFailure {
   }
 
   /**
+   * Test that {@link DataNode#refreshVolumes(String)} does not deadlock with
+   * {@link BPOfferService#registrationSucceeded(BPServiceActor,
+   * DatanodeRegistration)}.
+   */
+  @Test(timeout=10000)
+  public void testRefreshDeadLock() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+      public void delayWhenOfferServiceHoldLock() {
+        try {
+          latch.await();
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    File volume = cluster.getInstanceStorageDir(0, 0);
+    String dataDirs = volume.getPath();
+    List<BPOfferService> allBpOs = dn.getAllBpOs();
+    BPOfferService service = allBpOs.get(0);
+    BPServiceActor actor = service.getBPServiceActors().get(0);
+    DatanodeRegistration bpRegistration = actor.getBpRegistration();
+
+    Thread register = new Thread(() -> {
+      try {
+        service.registrationSucceeded(actor, bpRegistration);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    register.start();
+    String newdir = dataDirs + "tmp";
+    // Make sure the registration thread has acquired the write lock.
+    latch.countDown();
+    String result = dn.reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newdir);
+    assertNotNull(result);
+  }
+
+  /**
    * Test changing the number of volumes does not impact the disk failure
    * tolerance.
    */

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org