hadoop-common-commits mailing list archives

From: a..@apache.org
Subject: [1/2] hadoop git commit: HDFS-11119. Support for parallel checking of StorageLocations on DataNode startup.
Date: Fri, 06 Jan 2017 04:58:59 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 d1aa844dc -> fd3b1ca26


HDFS-11119. Support for parallel checking of StorageLocations on DataNode startup.

Change-Id: Iddedbeefd056af165be3557ccad199fe2878b591


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/76f1ab52
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/76f1ab52
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/76f1ab52

Branch: refs/heads/branch-2
Commit: 76f1ab524c6335d4bb9366f86f32492e4ef53a80
Parents: d1aa844
Author: Arpit Agarwal <arp@apache.org>
Authored: Fri Nov 11 15:02:52 2016 -0800
Committer: Arpit Agarwal <arp@apache.org>
Committed: Thu Jan 5 20:49:30 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  11 +
 .../hdfs/server/datanode/StorageLocation.java   |  36 ++-
 .../checker/StorageLocationChecker.java         | 207 ++++++++++++++++++
 .../datanode/checker/VolumeCheckResult.java     |  43 ++++
 .../src/main/resources/hdfs-default.xml         |  24 ++
 .../checker/TestStorageLocationChecker.java     | 217 +++++++++++++++++++
 6 files changed, 537 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
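
The change, in short: instead of probing each configured data directory serially, DataNode startup can fan the disk checks out to a thread pool and collect the results under one time budget. A minimal sketch of the intended call pattern follows; hedged, since only StorageLocationChecker's constructor, check() and shutdownAndWait() come from this patch, while StorageLocation.parse(...) predates it and the paths and class name are illustrative:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.util.Timer;

public class ParallelVolumeCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // StorageLocation.parse(...) is upstream code, not part of this diff;
    // the paths are illustrative.
    List<StorageLocation> dataDirs = Arrays.asList(
        StorageLocation.parse("/data/1/dn"),
        StorageLocation.parse("/data/2/dn"));

    StorageLocationChecker checker =
        new StorageLocationChecker(conf, new Timer());
    try {
      // Blocks until every volume reports or the shared timeout expires;
      // throws IOException if too many volumes fail or none survive.
      List<StorageLocation> good = checker.check(conf, dataDirs);
      System.out.println("Healthy volumes: " + good);
    } finally {
      checker.shutdownAndWait(10, TimeUnit.SECONDS);
    }
  }
}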


http://git-wip-us.apache.org/repos/asf/hadoop/blob/76f1ab52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c6057a5..de15ef3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -500,6 +500,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
   public static final String  DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
   public static final int     DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
+
+  public static final String DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY =
+      "dfs.datanode.disk.check.min.gap";
+  public static final long DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT =
+      900000; // 15 minutes.
+
+  public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY =
+      "dfs.datanode.disk.check.timeout";
+  public static final long DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT =
+      600000; // 10 minutes.
+
   public static final String
       DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
       "dfs.datanode.directoryscan.throttle.limit.ms.per.sec";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76f1ab52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
index 42e443b..7a91a82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
@@ -26,8 +26,18 @@ import java.net.URI;
 import java.util.regex.Matcher;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -37,7 +47,8 @@ import org.apache.hadoop.util.StringUtils;
  *
  */
 @InterfaceAudience.Private
-public class StorageLocation {
+public class StorageLocation
+    implements Checkable<StorageLocation.CheckContext, VolumeCheckResult> {
   final StorageType storageType;
   final File file;
 
@@ -116,4 +127,27 @@ public class StorageLocation {
   public int hashCode() {
     return toString().hashCode();
   }
+
+  @Override  // Checkable
+  public VolumeCheckResult check(CheckContext context) throws IOException {
+    DiskChecker.checkDir(
+        context.localFileSystem,
+        new Path(file.toURI()),
+        context.expectedPermission);
+    return VolumeCheckResult.HEALTHY;
+  }
+
+  /**
+   * Class to hold the parameters for running a {@link #check}.
+   */
+  public static final class CheckContext {
+    private final LocalFileSystem localFileSystem;
+    private final FsPermission expectedPermission;
+
+    public CheckContext(LocalFileSystem localFileSystem,
+                        FsPermission expectedPermission) {
+      this.localFileSystem = localFileSystem;
+      this.expectedPermission = expectedPermission;
+    }
+  }
 }
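
Taken alone, the new Checkable hook on StorageLocation can be exercised directly. A hedged sketch (again, StorageLocation.parse(...) and the permission value are illustrative): note that check() only ever returns HEALTHY, so an unhealthy directory surfaces as an IOException thrown by DiskChecker.checkDir rather than as a FAILED result.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;

public class CheckOneVolume {
  public static void main(String[] args) throws IOException {
    HdfsConfiguration conf = new HdfsConfiguration();
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    StorageLocation location = StorageLocation.parse("/data/1/dn");
    StorageLocation.CheckContext context =
        new StorageLocation.CheckContext(localFS, new FsPermission("700"));
    try {
      VolumeCheckResult result = location.check(context);  // HEALTHY, or throws
      System.out.println(location + ": " + result);
    } catch (IOException e) {
      System.out.println(location + ": failed - " + e.getMessage());
    }
  }
}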

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76f1ab52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
new file mode 100644
index 0000000..4209737
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation.CheckContext;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A utility class that encapsulates checking storage locations during DataNode
+ * startup.
+ *
+ * Some of this code was extracted from the DataNode class.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class StorageLocationChecker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      StorageLocationChecker.class);
+  private final AsyncChecker<CheckContext, VolumeCheckResult> delegateChecker;
+  private final Timer timer;
+
+  /**
+   * Max allowed time for a disk check in milliseconds. If the check
+   * doesn't complete within this time we declare the disk as dead.
+   */
+  private final long maxAllowedTimeForCheckMs;
+
+
+  /**
+   * Expected filesystem permissions on the storage directory.
+   */
+  private final FsPermission expectedPermission;
+
+  /**
+   * Maximum number of volume failures that can be tolerated without
+   * declaring a fatal error.
+   */
+  private final int maxVolumeFailuresTolerated;
+
+  public StorageLocationChecker(Configuration conf, Timer timer) {
+    maxAllowedTimeForCheckMs = conf.getTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    expectedPermission = new FsPermission(
+        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
+            DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
+
+    maxVolumeFailuresTolerated = conf.getInt(
+        DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
+        DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
+
+    this.timer = timer;
+
+    delegateChecker = new ThrottledAsyncChecker<>(
+        timer,
+        conf.getTimeDuration(
+            DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+            DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
+            TimeUnit.MILLISECONDS),
+        Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+                .setNameFormat("StorageLocationChecker thread %d")
+                .setDaemon(true)
+                .build()));
+  }
+
+  /**
+   * Initiate a check of the supplied storage volumes and return
+   * the list of volumes that passed the check.
+   *
+   * @param conf HDFS configuration.
+   * @param dataDirs list of volumes to check.
+   * @return the list of volumes that passed the check. Never an empty
+   *         list; if no volume is healthy an IOException is thrown.
+   *
+   * @throws InterruptedException if the check was interrupted.
+   * @throws IOException if the number of failed volumes exceeds the
+   *                     maximum allowed or if there are no good
+   *                     volumes.
+   */
+  public List<StorageLocation> check(
+      final Configuration conf,
+      final Collection<StorageLocation> dataDirs)
+      throws InterruptedException, IOException {
+
+    final ArrayList<StorageLocation> goodLocations = new ArrayList<>();
+    final Set<StorageLocation> failedLocations = new HashSet<>();
+    final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures =
+        Maps.newHashMap();
+    final LocalFileSystem localFS = FileSystem.getLocal(conf);
+    final CheckContext context = new CheckContext(localFS, expectedPermission);
+
+    // Start parallel disk check operations on all StorageLocations.
+    for (StorageLocation location : dataDirs) {
+      futures.put(location,
+          delegateChecker.schedule(location, context));
+    }
+
+    final long checkStartTimeMs = timer.monotonicNow();
+
+    // Retrieve the results of the disk checks.
+    for (Map.Entry<StorageLocation,
+             ListenableFuture<VolumeCheckResult>> entry : futures.entrySet()) {
+
+      // Determine how much time we can allow for this check to complete.
+      // The cumulative wait time cannot exceed maxAllowedTimeForCheck.
+      final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs);
+      final long timeLeftMs = Math.max(0,
+          maxAllowedTimeForCheckMs - waitSoFarMs);
+      final StorageLocation location = entry.getKey();
+
+      try {
+        final VolumeCheckResult result =
+            entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);
+        switch (result) {
+        case HEALTHY:
+          goodLocations.add(entry.getKey());
+          break;
+        case DEGRADED:
+          LOG.warn("StorageLocation {} appears to be degraded.", location);
+          break;
+        case FAILED:
+          LOG.warn("StorageLocation {} detected as failed.", location);
+          failedLocations.add(location);
+          break;
+        default:
+          LOG.error("Unexpected health check result {} for StorageLocation {}",
+              result, location);
+          goodLocations.add(entry.getKey());
+        }
+      } catch (ExecutionException|TimeoutException e) {
+        LOG.warn("Exception checking StorageLocation " + location,
+            e.getCause());
+        failedLocations.add(location);
+      }
+    }
+
+    if (failedLocations.size() > maxVolumeFailuresTolerated) {
+      throw new IOException(
+          "Too many failed volumes: " + failedLocations.size() +
+          ". The configuration allows for a maximum of " +
+          maxVolumeFailuresTolerated + " failed volumes.");
+    }
+
+    if (goodLocations.size() == 0) {
+      throw new IOException("All directories in "
+          + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
+          + failedLocations);
+    }
+
+    return goodLocations;
+  }
+
+  public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
+    try {
+      delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
+    } catch (InterruptedException e) {
+      LOG.warn("StorageLocationChecker interrupted during shutdown.");
+      Thread.currentThread().interrupt();
+    }
+  }
+}
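
Note the timeout arithmetic in check(): all futures share a single budget, so each get() waits only for whatever remains of maxAllowedTimeForCheckMs and one slow volume cannot stretch the total startup wait past the configured ceiling. The same idiom in isolation, with plain JDK futures (a self-contained sketch, not code from this patch):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SharedDeadlineDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    List<Future<String>> futures = Arrays.asList(
        pool.submit(() -> probe("disk1", 100)),
        pool.submit(() -> probe("disk2", 400)),
        pool.submit(() -> probe("disk3", 2000)));   // exceeds the budget

    final long budgetMs = 1000;                     // one budget for all checks
    final long startMs = System.nanoTime() / 1_000_000;
    for (Future<String> f : futures) {
      long waitedMs = System.nanoTime() / 1_000_000 - startMs;
      long timeLeftMs = Math.max(0, budgetMs - waitedMs);
      try {
        System.out.println(f.get(timeLeftMs, TimeUnit.MILLISECONDS) + ": ok");
      } catch (TimeoutException e) {
        System.out.println("timed out");            // counted as failed
      }
    }
    pool.shutdownNow();
  }

  private static String probe(String name, long sleepMs)
      throws InterruptedException {
    Thread.sleep(sleepMs);
    return name;
  }
}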

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76f1ab52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/VolumeCheckResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/VolumeCheckResult.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/VolumeCheckResult.java
new file mode 100644
index 0000000..65b14da
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/VolumeCheckResult.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Defines the outcomes of running a disk check operation against a
+ * volume.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public enum VolumeCheckResult {
+  HEALTHY(1),
+  DEGRADED(2),
+  FAILED(3);
+
+  private final int value;
+
+  VolumeCheckResult(int value) {
+    this.value = value;
+  }
+
+  int getValue() {
+    return value;
+  }
+}
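
Any other volume probe can feed the same machinery by implementing Checkable with this enum as its result type. A hypothetical example, mirroring the check(context) signature StorageLocation implements above (the Checkable interface itself arrived in an earlier change, not this diff, and FreeSpaceChecker is invented purely for illustration):

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;

/** Hypothetical probe: flags a nearly-full volume as DEGRADED. */
class FreeSpaceChecker implements Checkable<File, VolumeCheckResult> {
  @Override
  public VolumeCheckResult check(File volume) throws IOException {
    long total = volume.getTotalSpace();
    if (total == 0) {
      return VolumeCheckResult.FAILED;          // cannot stat the volume
    }
    long pctFree = volume.getUsableSpace() * 100 / total;
    return pctFree < 5 ? VolumeCheckResult.DEGRADED   // under 5% free
                       : VolumeCheckResult.HEALTHY;
  }
}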

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76f1ab52/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 84d7ca5..30f9e2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4170,4 +4170,28 @@
     The size buffer to be used when creating or opening httpfs filesystem IO stream.
     </description>
   </property>
+
+  <property>
+    <name>dfs.datanode.disk.check.min.gap</name>
+    <value>15m</value>
+    <description>
+      The minimum gap between two successive checks of the same DataNode
+      volume. This setting supports multiple time unit suffixes as described
+      in dfs.heartbeat.interval. If no suffix is specified then milliseconds
+      is assumed.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.datanode.disk.check.timeout</name>
+    <value>10m</value>
+    <description>
+      Maximum allowed time for a disk check to complete. If the check does
+      not complete within this time interval then the disk is declared as
+      failed. This setting supports multiple time unit suffixes as described
+      in dfs.heartbeat.interval. If no suffix is specified then milliseconds
+      is assumed.
+    </description>
+  </property>
+
 </configuration>
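
Both new properties take the usual Hadoop duration suffixes (ns, us, ms, s, m, h, d), and a bare number is read as milliseconds, which is how the plain-millisecond defaults in DFSConfigKeys line up with the 15m/10m values above. A hedged illustration of the round trip, using getTimeDuration exactly as the patch itself does:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class DiskCheckConfDemo {
  public static void main(String[] args) {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, "30s");
    long timeoutMs = conf.getTimeDuration(
        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
        TimeUnit.MILLISECONDS);
    System.out.println(timeoutMs);  // 30000; left unset it would be 600000
  }
}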

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76f1ab52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestStorageLocationChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestStorageLocationChecker.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestStorageLocationChecker.java
new file mode 100644
index 0000000..bf885be
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestStorageLocationChecker.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
+import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for the {@link StorageLocationChecker} class.
+ */
+public class TestStorageLocationChecker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestStorageLocationChecker.class);
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Verify that all healthy locations are correctly handled and that the
+   * check routine is invoked as expected.
+   * @throws Exception
+   */
+  @Test(timeout=30000)
+  public void testAllLocationsHealthy() throws Exception {
+    final List<StorageLocation> locations =
+        makeMockLocations(HEALTHY, HEALTHY, HEALTHY);
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
+    StorageLocationChecker checker =
+        new StorageLocationChecker(conf, new FakeTimer());
+    List<StorageLocation> filteredLocations = checker.check(conf, locations);
+
+    // All locations should be healthy.
+    assertThat(filteredLocations.size(), is(3));
+
+    // Ensure that the check method was invoked for each location.
+    for (StorageLocation location : locations) {
+      verify(location).check(any(StorageLocation.CheckContext.class));
+    }
+  }
+
+  /**
+   * Test handling when the number of failed locations is below the
+   * max volume failure threshold.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=30000)
+  public void testFailedLocationsBelowThreshold() throws Exception {
+    final List<StorageLocation> locations =
+        makeMockLocations(HEALTHY, HEALTHY, FAILED); // 2 healthy, 1 failed.
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    StorageLocationChecker checker =
+        new StorageLocationChecker(conf, new FakeTimer());
+    List<StorageLocation> filteredLocations = checker.check(conf, locations);
+    assertThat(filteredLocations.size(), is(2));
+  }
+
+  /**
+   * Test handling when the number of failed locations is above the
+   * max volume failure threshold.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=30000)
+  public void testFailedLocationsAboveThreshold() throws Exception {
+    final List<StorageLocation> locations =
+        makeMockLocations(HEALTHY, FAILED, FAILED); // 1 healthy, 2 failed.
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Too many failed volumes");
+    StorageLocationChecker checker =
+        new StorageLocationChecker(conf, new FakeTimer());
+    checker.check(conf, locations);
+  }
+
+  /**
+   * Test handling when all storage locations have failed.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=30000)
+  public void testAllFailedLocations() throws Exception {
+    final List<StorageLocation> locations =
+        makeMockLocations(FAILED, FAILED, FAILED);
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 3);
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage("All directories in " + DFS_DATANODE_DATA_DIR_KEY +
+        " are invalid");
+    StorageLocationChecker checker =
+        new StorageLocationChecker(conf, new FakeTimer());
+    checker.check(conf, locations);
+  }
+
+  /**
+   * Verify that a {@link StorageLocation#check} timeout is correctly detected
+   * as a failure.
+   *
+   * This is hard to test without a {@link Thread#sleep} call.
+   *
+   * @throws Exception
+   */
+  @Test (timeout=300000)
+  public void testTimeoutInCheck() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setTimeDuration(DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+        1, TimeUnit.SECONDS);
+    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    final FakeTimer timer = new FakeTimer();
+
+    // Generate a list of storage locations the first of which sleeps
+    // for 2 seconds in its check() routine.
+    final List<StorageLocation> locations = makeSlowLocations(2000, 1);
+    StorageLocationChecker checker =
+        new StorageLocationChecker(conf, timer);
+
+    try {
+      // Check the two locations and ensure that only one of them
+      // was filtered out.
+      List<StorageLocation> filteredList = checker.check(conf, locations);
+      assertThat(filteredList.size(), is(1));
+    } finally {
+      checker.shutdownAndWait(10, TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Return a list of storage locations - one per argument - which return
+   * health check results corresponding to the supplied arguments.
+   */
+  private List<StorageLocation> makeMockLocations(VolumeCheckResult... args)
+      throws IOException {
+    final List<StorageLocation> locations = new ArrayList<>(args.length);
+    final AtomicInteger index = new AtomicInteger(0);
+
+    for (VolumeCheckResult result : args) {
+      final StorageLocation location = mock(StorageLocation.class);
+      when(location.toString()).thenReturn("/" + index.incrementAndGet());
+      when(location.check(any(StorageLocation.CheckContext.class)))
+          .thenReturn(result);
+      locations.add(location);
+    }
+
+    return locations;
+  }
+
+  /**
+   * Return a list of storage locations - one per argument - whose check()
+   * method takes at least the specified number of milliseconds to complete.
+   */
+  private List<StorageLocation> makeSlowLocations(long... args)
+      throws IOException {
+    final List<StorageLocation> locations = new ArrayList<>(args.length);
+    final AtomicInteger index = new AtomicInteger(0);
+
+    for (final long checkDelayMs: args) {
+      final StorageLocation location = mock(StorageLocation.class);
+      when(location.toString()).thenReturn("/" + index.incrementAndGet());
+      when(location.check(any(StorageLocation.CheckContext.class)))
+          .thenAnswer(new Answer<VolumeCheckResult>() {
+            @Override
+            public VolumeCheckResult answer(InvocationOnMock invocation)
+                throws Throwable {
+              Thread.sleep(checkDelayMs);
+              return VolumeCheckResult.HEALTHY;
+            }
+          });
+
+      locations.add(location);
+    }
+    return locations;
+  }
+}

