hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rake...@apache.org
Subject [10/50] [abbrv] hadoop git commit: HDFS-11029. [SPS]:Provide retry mechanism for the blocks which were failed while moving its storage at DNs. Contributed by Uma Maheswara Rao G
Date Wed, 08 Nov 2017 13:18:36 GMT
HDFS-11029. [SPS]:Provide retry mechanism for the blocks which were failed while moving its
storage at DNs. Contributed by Uma Maheswara Rao G


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/afbadb2f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/afbadb2f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/afbadb2f

Branch: refs/heads/HDFS-10285
Commit: afbadb2f90cdda4c051b058d91131acc9e620b9a
Parents: 8b3b54e
Author: Rakesh Radhakrishnan <rakeshr@apache.org>
Authored: Thu Nov 10 10:09:45 2016 +0530
Committer: Rakesh Radhakrishnan <rakeshr@apache.org>
Committed: Wed Nov 8 14:04:16 2017 +0530

----------------------------------------------------------------------
 .../BlockStorageMovementAttemptedItems.java     | 221 +++++++++++++++++++
 .../server/namenode/StoragePolicySatisfier.java |  24 +-
 .../TestBlockStorageMovementAttemptedItems.java | 101 +++++++++
 .../namenode/TestStoragePolicySatisfier.java    |   8 +-
 4 files changed, 343 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/afbadb2f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
new file mode 100644
index 0000000..580d0d6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A monitor class for checking whether block storage movements finished or not.
+ * If block storage movement results from datanode indicates about the movement
+ * success, then it will just remove the entries from tracking. If it reports
+ * failure, then it will add back to needed block storage movements list. If no
+ * DN reports about movement for longer time, then such items will be retries
+ * automatically after timeout. The default timeout would be 30mins.
+ */
+public class BlockStorageMovementAttemptedItems {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
+  // A map holds the items which are already taken for blocks movements
+  // processing and sent to DNs.
+  private final Map<Long, Long> storageMovementAttemptedItems;
+  private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
+  private volatile boolean spsRunning = true;
+  private Daemon timerThread = null;
+  //
+  // It might take anywhere between 30 to 60 minutes before
+  // a request is timed out.
+  //
+  private long selfRetryTimeout = 30 * 60 * 1000;
+
+  //
+  // It might take anywhere between 5 to 10 minutes before
+  // a request is timed out.
+  //
+  private long checkTimeout = 5 * 60 * 1000; // minimum value
+  private BlockStorageMovementNeeded blockStorageMovementNeeded;
+
+  public BlockStorageMovementAttemptedItems(long timeoutPeriod,
+      long selfRetryTimeout,
+      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
+    if (timeoutPeriod > 0) {
+      this.checkTimeout = Math.min(checkTimeout, timeoutPeriod);
+    }
+
+    this.selfRetryTimeout = selfRetryTimeout;
+    this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
+    storageMovementAttemptedItems = new HashMap<>();
+    storageMovementAttemptedResults = new ArrayList<>();
+  }
+
+  /**
+   * Add item to block storage movement attempted items map which holds the
+   * tracking/blockCollection id versus time stamp.
+   *
+   * @param blockCollectionID
+   *          - tracking id / block collection id
+   */
+  public void add(Long blockCollectionID) {
+    synchronized (storageMovementAttemptedItems) {
+      storageMovementAttemptedItems.put(blockCollectionID, monotonicNow());
+    }
+  }
+
+  /**
+   * Add the trackIDBlocksStorageMovementResults to
+   * storageMovementAttemptedResults.
+   *
+   * @param blksMovementResults
+   */
+  public void addResults(BlocksStorageMovementResult[] blksMovementResults) {
+    if (blksMovementResults.length == 0) {
+      return;
+    }
+    synchronized (storageMovementAttemptedResults) {
+      storageMovementAttemptedResults
+          .addAll(Arrays.asList(blksMovementResults));
+    }
+  }
+
+  /**
+   * Starts the monitor thread.
+   */
+  void start() {
+    timerThread = new Daemon(new BlocksStorageMovementAttemptResultMonitor());
+    timerThread.setName("BlocksStorageMovementAttemptResultMonitor");
+    timerThread.start();
+  }
+
+  /**
+   * Stops the monitor thread.
+   */
+  public void stop() {
+    spsRunning = false;
+  }
+
+  /**
+   * A monitor class for checking block storage movement result and long waiting
+   * items periodically.
+   */
+  private class BlocksStorageMovementAttemptResultMonitor implements Runnable {
+    @Override
+    public void run() {
+      while (spsRunning) {
+        try {
+          blockStorageMovementResultCheck();
+          blocksStorageMovementUnReportedItemsCheck();
+          Thread.sleep(checkTimeout);
+        } catch (InterruptedException ie) {
+          LOG.debug("BlocksStorageMovementAttemptResultMonitor thread "
+              + "is interrupted.", ie);
+        }
+      }
+    }
+
+    private void blocksStorageMovementUnReportedItemsCheck() {
+      synchronized (storageMovementAttemptedItems) {
+        Iterator<Entry<Long, Long>> iter =
+            storageMovementAttemptedItems.entrySet().iterator();
+        long now = monotonicNow();
+        while (iter.hasNext()) {
+          Entry<Long, Long> entry = iter.next();
+          if (now > entry.getValue() + selfRetryTimeout) {
+            Long blockCollectionID = entry.getKey();
+            synchronized (storageMovementAttemptedResults) {
+              boolean exist = isExistInResult(blockCollectionID);
+              if (!exist) {
+                blockStorageMovementNeeded.add(blockCollectionID);
+              } else {
+                LOG.info("Blocks storage movement results for the"
+                    + " tracking id : " + blockCollectionID
+                    + " is reported from one of the co-ordinating datanode."
+                    + " So, the result will be processed soon.");
+              }
+              iter.remove();
+            }
+          }
+        }
+
+      }
+    }
+
+    private boolean isExistInResult(Long blockCollectionID) {
+      Iterator<BlocksStorageMovementResult> iter =
+          storageMovementAttemptedResults.iterator();
+      while (iter.hasNext()) {
+        BlocksStorageMovementResult storageMovementAttemptedResult =
+            iter.next();
+        if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    private void blockStorageMovementResultCheck() {
+      synchronized (storageMovementAttemptedResults) {
+        Iterator<BlocksStorageMovementResult> iter =
+            storageMovementAttemptedResults.iterator();
+        while (iter.hasNext()) {
+          BlocksStorageMovementResult storageMovementAttemptedResult =
+              iter.next();
+          if (storageMovementAttemptedResult
+              .getStatus() == BlocksStorageMovementResult.Status.FAILURE) {
+            blockStorageMovementNeeded
+                .add(storageMovementAttemptedResult.getTrackId());
+            LOG.warn("Blocks storage movement results for the tracking id : "
+                + storageMovementAttemptedResult.getTrackId()
+                + " is reported from co-ordinating datanode, but result"
+                + " status is FAILURE. So, added for retry");
+          } else {
+            synchronized (storageMovementAttemptedItems) {
+              storageMovementAttemptedItems
+                  .remove(storageMovementAttemptedResult.getTrackId());
+            }
+            LOG.info("Blocks storage movement results for the tracking id : "
+                + storageMovementAttemptedResult.getTrackId()
+                + " is reported from co-ordinating datanode. "
+                + "The result status is SUCCESS.");
+          }
+          iter.remove(); // remove from results as processed above
+        }
+      }
+
+    }
+  }
+
+  @VisibleForTesting
+  public int resultsCount() {
+    return storageMovementAttemptedResults.size();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afbadb2f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index fbe686a..6fa9302 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -69,6 +69,7 @@ public class StoragePolicySatisfier implements Runnable {
   private final Namesystem namesystem;
   private final BlockManager blockManager;
   private final BlockStorageMovementNeeded storageMovementNeeded;
+  private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
 
   public StoragePolicySatisfier(final Namesystem namesystem,
       final BlockStorageMovementNeeded storageMovementNeeded,
@@ -76,15 +77,22 @@ public class StoragePolicySatisfier implements Runnable {
     this.namesystem = namesystem;
     this.storageMovementNeeded = storageMovementNeeded;
     this.blockManager = blkManager;
+    // TODO: below selfRetryTimeout and checkTimeout can be configurable later
+    // Now, the default values of selfRetryTimeout and checkTimeout are 30mins
+    // and 5mins respectively
+    this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
+        5 * 60 * 1000, 30 * 60 * 1000, storageMovementNeeded);
   }
 
   /**
-   * Start storage policy satisfier demon thread.
+   * Start storage policy satisfier demon thread. Also start block storage
+   * movements monitor for retry the attempts if needed.
    */
   public void start() {
     storagePolicySatisfierThread = new Daemon(this);
     storagePolicySatisfierThread.setName("StoragePolicySatisfier");
     storagePolicySatisfierThread.start();
+    this.storageMovementsMonitor.start();
   }
 
   /**
@@ -99,6 +107,7 @@ public class StoragePolicySatisfier implements Runnable {
       storagePolicySatisfierThread.join(3000);
     } catch (InterruptedException ie) {
     }
+    this.storageMovementsMonitor.stop();
   }
 
   @Override
@@ -108,6 +117,7 @@ public class StoragePolicySatisfier implements Runnable {
         Long blockCollectionID = storageMovementNeeded.get();
         if (blockCollectionID != null) {
           computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID);
+          this.storageMovementsMonitor.add(blockCollectionID);
         }
         // TODO: We can think to make this as configurable later, how frequently
         // we want to check block movements.
@@ -398,11 +408,6 @@ public class StoragePolicySatisfier implements Runnable {
     }
   }
 
-  // TODO: Temporarily keeping the results for assertion. This has to be
-  // revisited as part of HDFS-11029.
-  @VisibleForTesting
-  List<BlocksStorageMovementResult> results = new ArrayList<>();
-
   /**
    * Receives the movement results of collection of blocks associated to a
    * trackId.
@@ -415,6 +420,11 @@ public class StoragePolicySatisfier implements Runnable {
     if (blksMovementResults.length <= 0) {
       return;
     }
-    results.addAll(Arrays.asList(blksMovementResults));
+    storageMovementsMonitor.addResults(blksMovementResults);
+  }
+
+  @VisibleForTesting
+  BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
+    return storageMovementsMonitor;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afbadb2f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
new file mode 100644
index 0000000..8c70d99
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that block storage movement attempt failures are reported from DN and
+ * processed them correctly or not.
+ */
+public class TestBlockStorageMovementAttemptedItems {
+
+  private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
+  private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
+
+  @Before
+  public void setup() {
+    unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
+    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, 500,
+        unsatisfiedStorageMovementFiles);
+    bsmAttemptedItems.start();
+  }
+
+  @After
+  public void teardown() {
+    if (bsmAttemptedItems != null) {
+      bsmAttemptedItems.stop();
+    }
+  }
+
+  private boolean checkItemMovedForRetry(Long item, long retryTimeout)
+      throws InterruptedException {
+    long stopTime = monotonicNow() + (retryTimeout * 2);
+    boolean isItemFound = false;
+    while (monotonicNow() < (stopTime)) {
+      Long ele = null;
+      while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
+        if (item.longValue() == ele.longValue()) {
+          isItemFound = true;
+          break;
+        }
+      }
+      if (!isItemFound) {
+        Thread.sleep(100);
+      } else {
+        break;
+      }
+    }
+    return isItemFound;
+  }
+
+  @Test(timeout = 30000)
+  public void testAddResultWithFailureResult() throws Exception {
+    Long item = new Long(1234);
+    bsmAttemptedItems.add(item);
+    bsmAttemptedItems.addResults(
+        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
+    assertTrue(checkItemMovedForRetry(item, 200));
+  }
+
+  @Test(timeout = 30000)
+  public void testAddResultWithSucessResult() throws Exception {
+    Long item = new Long(1234);
+    bsmAttemptedItems.add(item);
+    bsmAttemptedItems.addResults(
+        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+    assertFalse(checkItemMovedForRetry(item, 200));
+  }
+
+  @Test(timeout = 30000)
+  public void testNoResultAdded() throws Exception {
+    Long item = new Long(1234);
+    bsmAttemptedItems.add(item);
+    // After selfretry timeout, it should be added back for retry
+    assertTrue(checkItemMovedForRetry(item, 600));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/afbadb2f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index cbfdfc6..6f5c717 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -174,8 +174,6 @@ public class TestStoragePolicySatisfier {
       waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
       waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
 
-      // TODO: Temporarily using the results from StoragePolicySatisfier class.
-      // This has to be revisited as part of HDFS-11029.
       waitForBlocksMovementResult(1, 30000);
     } finally {
       hdfsCluster.shutdown();
@@ -190,8 +188,10 @@ public class TestStoragePolicySatisfier {
       @Override
       public Boolean get() {
         LOG.info("expectedResultsCount={} actualResultsCount={}",
-            expectedResultsCount, sps.results.size());
-        return expectedResultsCount == sps.results.size();
+            expectedResultsCount,
+            sps.getAttemptedItemsMonitor().resultsCount());
+        return expectedResultsCount == sps.getAttemptedItemsMonitor()
+            .resultsCount();
       }
     }, 100, timeout);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message