hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject git commit: HDFS-6036. Forcibly timeout misbehaving DFSClients that try to do no-checksum reads that extend too long. (cmccabe) (cherry picked from commit cad14aa9168112ef1ceae80b94d9aae3ba293578)
Date Mon, 08 Sep 2014 20:03:10 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 d20047edd -> bdcf5e940


HDFS-6036. Forcibly timeout misbehaving DFSClients that try to do no-checksum reads that extend
too long.  (cmccabe)
(cherry picked from commit cad14aa9168112ef1ceae80b94d9aae3ba293578)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bdcf5e94
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bdcf5e94
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bdcf5e94

Branch: refs/heads/branch-2
Commit: bdcf5e940fa9e0ffd99c98031e9459f9554584ad
Parents: d20047e
Author: Colin Patrick Mccabe <cmccabe@cloudera.com>
Authored: Mon Sep 8 12:51:22 2014 -0700
Committer: Colin Patrick Mccabe <cmccabe@cloudera.com>
Committed: Mon Sep 8 13:02:54 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   6 +
 .../server/datanode/ShortCircuitRegistry.java   |  22 ++-
 .../datanode/fsdataset/impl/FsDatasetCache.java | 109 +++++++++--
 .../src/main/resources/hdfs-default.xml         |  21 +++
 .../server/datanode/TestFsDatasetCache.java     |   2 +-
 .../datanode/TestFsDatasetCacheRevocation.java  | 187 +++++++++++++++++++
 7 files changed, 334 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdcf5e94/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0999929..d5e496d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -193,6 +193,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6943. Improve NN allocateBlock log to include replicas' datanode IPs.
     (Ming Ma via wheat9)
 
+    HDFS-6036. Forcibly timeout misbehaving DFSClients that try to do
+    no-checksum reads that extend too long (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdcf5e94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5299d8e..7f823d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -249,6 +249,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_OOB_TIMEOUT_KEY = "dfs.datanode.oob.timeout-ms";
   public static final String  DFS_DATANODE_OOB_TIMEOUT_DEFAULT = "1500,0,0,0"; // OOB_TYPE1,
OOB_TYPE2, OOB_TYPE3, OOB_TYPE4
 
+  public static final String DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS = "dfs.datanode.cache.revocation.timeout.ms";
+  public static final long DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT = 900000L;
+
+  public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = "dfs.datanode.cache.revocation.polling.ms";
+  public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 500L;
+
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT
= true;
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdcf5e94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
index a252a17..ddde22d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
@@ -26,6 +26,7 @@ import java.io.Closeable;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
@@ -43,6 +44,7 @@ import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.DomainSocketWatcher;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 
@@ -83,11 +85,13 @@ public class ShortCircuitRegistry {
 
   private static class RegisteredShm extends ShortCircuitShm
       implements DomainSocketWatcher.Handler {
+    private final String clientName;
     private final ShortCircuitRegistry registry;
 
-    RegisteredShm(ShmId shmId, FileInputStream stream,
+    RegisteredShm(String clientName, ShmId shmId, FileInputStream stream,
         ShortCircuitRegistry registry) throws IOException {
       super(shmId, stream);
+      this.clientName = clientName;
       this.registry = registry;
     }
 
@@ -100,6 +104,10 @@ public class ShortCircuitRegistry {
       }
       return true;
     }
+
+    String getClientName() {
+      return clientName;
+    }
   }
 
   public synchronized void removeShm(ShortCircuitShm shm) {
@@ -243,6 +251,16 @@ public class ShortCircuitRegistry {
     }
   }
 
+  public synchronized String getClientNames(ExtendedBlockId blockId) {
+    if (!enabled) return "";
+    final HashSet<String> clientNames = new HashSet<String>();
+    final Set<Slot> affectedSlots = slots.get(blockId);
+    for (Slot slot : affectedSlots) {
+      clientNames.add(((RegisteredShm)slot.getShm()).getClientName());
+    }
+    return Joiner.on(",").join(clientNames);
+  }
+
   public static class NewShmInfo implements Closeable {
     public final ShmId shmId;
     public final FileInputStream stream;
@@ -290,7 +308,7 @@ public class ShortCircuitRegistry {
           shmId = ShmId.createRandom();
         } while (segments.containsKey(shmId));
         fis = shmFactory.createDescriptor(clientName, SHM_LENGTH);
-        shm = new RegisteredShm(shmId, fis, this);
+        shm = new RegisteredShm(clientName, shmId, fis, this);
       } finally {
         if (shm == null) {
           IOUtils.closeQuietly(fis);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdcf5e94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 060aed4..4acfc8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT;
+
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -33,10 +38,12 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.time.DurationFormatUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
@@ -45,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -116,6 +124,12 @@ public class FsDatasetCache {
 
   private final ThreadPoolExecutor uncachingExecutor;
 
+  private final ScheduledThreadPoolExecutor deferredUncachingExecutor;
+
+  private final long revocationMs;
+
+  private final long revocationPollingMs;
+
   /**
    * The approximate amount of cache space in use.
    *
@@ -217,6 +231,24 @@ public class FsDatasetCache {
             new LinkedBlockingQueue<Runnable>(),
             workerFactory);
     this.uncachingExecutor.allowCoreThreadTimeOut(true);
+    this.deferredUncachingExecutor = new ScheduledThreadPoolExecutor(
+            1, workerFactory);
+    this.revocationMs = dataset.datanode.getConf().getLong(
+        DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
+        DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS_DEFAULT);
+    long confRevocationPollingMs = dataset.datanode.getConf().getLong(
+        DFS_DATANODE_CACHE_REVOCATION_POLLING_MS,
+        DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT);
+    long minRevocationPollingMs = revocationMs / 2;
+    if (minRevocationPollingMs < confRevocationPollingMs) {
+      throw new RuntimeException("configured value " +
+              confRevocationPollingMs + "for " +
+              DFS_DATANODE_CACHE_REVOCATION_POLLING_MS +
+              " is too high.  It must not be more than half of the " +
+              "value of " +  DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS +
+              ".  Reconfigure this to " + minRevocationPollingMs);
+    }
+    this.revocationPollingMs = confRevocationPollingMs;
   }
 
   /**
@@ -262,13 +294,11 @@ public class FsDatasetCache {
   synchronized void uncacheBlock(String bpid, long blockId) {
     ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
+    boolean deferred = false;
 
     if (!dataset.datanode.getShortCircuitRegistry().
             processBlockMunlockRequest(key)) {
-      // TODO: we probably want to forcibly uncache the block (and close the 
-      // shm) after a certain timeout has elapsed.
-      LOG.debug("{} is anchored, and can't be uncached now.", key);
-      return;
+      deferred = true;
     }
     if (prevValue == null) {
       LOG.debug("Block with id {}, pool {} does not need to be uncached, "
@@ -285,12 +315,19 @@ public class FsDatasetCache {
           new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
       break;
     case CACHED:
-      LOG.debug(
-          "Block with id {}, pool {} has been scheduled for uncaching" + ".",
-          blockId, bpid);
       mappableBlockMap.put(key,
           new Value(prevValue.mappableBlock, State.UNCACHING));
-      uncachingExecutor.execute(new UncachingTask(key));
+      if (deferred) {
+        LOG.debug("{} is anchored, and can't be uncached now.  Scheduling it " +
+            "for uncaching in {} ",
+            key, DurationFormatUtils.formatDurationHMS(revocationPollingMs));
+        deferredUncachingExecutor.schedule(
+            new UncachingTask(key, revocationMs),
+            revocationPollingMs, TimeUnit.MILLISECONDS);
+      } else {
+        LOG.debug("{} has been scheduled for immediate uncaching.", key);
+        uncachingExecutor.execute(new UncachingTask(key, 0));
+      }
       break;
     default:
       LOG.debug("Block with id {}, pool {} does not need to be uncached, "
@@ -403,22 +440,62 @@ public class FsDatasetCache {
 
   private class UncachingTask implements Runnable {
     private final ExtendedBlockId key; 
+    private final long revocationTimeMs;
 
-    UncachingTask(ExtendedBlockId key) {
+    UncachingTask(ExtendedBlockId key, long revocationDelayMs) {
       this.key = key;
+      if (revocationDelayMs == 0) {
+        this.revocationTimeMs = 0;
+      } else {
+        this.revocationTimeMs = revocationDelayMs + Time.monotonicNow();
+      }
+    }
+
+    private boolean shouldDefer() {
+      /* If revocationTimeMs == 0, this is an immediate uncache request.
+       * No clients were anchored at the time we made the request. */
+      if (revocationTimeMs == 0) {
+        return false;
+      }
+      /* Let's check if any clients still have this block anchored. */
+      boolean anchored =
+        !dataset.datanode.getShortCircuitRegistry().
+            processBlockMunlockRequest(key);
+      if (!anchored) {
+        LOG.debug("Uncaching {} now that it is no longer in use " +
+            "by any clients.", key);
+        return false;
+      }
+      long delta = revocationTimeMs - Time.monotonicNow();
+      if (delta < 0) {
+        LOG.warn("Forcibly uncaching {} after {} " +
+            "because client(s) {} refused to stop using it.", key,
+            DurationFormatUtils.formatDurationHMS(revocationTimeMs),
+            dataset.datanode.getShortCircuitRegistry().getClientNames(key));
+        return false;
+      }
+      LOG.info("Replica {} still can't be uncached because some " +
+          "clients continue to use it.  Will wait for {}", key,
+          DurationFormatUtils.formatDurationHMS(delta));
+      return true;
     }
 
     @Override
     public void run() {
       Value value;
-      
+
+      if (shouldDefer()) {
+        deferredUncachingExecutor.schedule(
+            this, revocationPollingMs, TimeUnit.MILLISECONDS);
+        return;
+      }
+
       synchronized (FsDatasetCache.this) {
         value = mappableBlockMap.get(key);
       }
       Preconditions.checkNotNull(value);
       Preconditions.checkArgument(value.state == State.UNCACHING);
-      // TODO: we will eventually need to do revocation here if any clients
-      // are reading via mmap with checksums enabled.  See HDFS-5182.
+
       IOUtils.closeQuietly(value.mappableBlock);
       synchronized (FsDatasetCache.this) {
         mappableBlockMap.remove(key);
@@ -427,7 +504,13 @@ public class FsDatasetCache {
           usedBytesCount.release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
-      LOG.debug("Uncaching of {} completed. usedBytes = {}", key, newUsedBytes);
+      if (revocationTimeMs != 0) {
+        LOG.debug("Uncaching of {} completed. usedBytes = {}",
+            key, newUsedBytes);
+      } else {
+        LOG.debug("Deferred uncaching of {} completed. usedBytes = {}",
+            key, newUsedBytes);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdcf5e94/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index fe41541..cff519a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2108,4 +2108,25 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.cache.revocation.timeout.ms</name>
+  <value>900000</value>
+  <description>When the DFSClient reads from a block file which the DataNode is
+    caching, the DFSClient can skip verifying checksums.  The DataNode will
+    keep the block file in cache until the client is done.  If the client takes
+    an unusually long time, though, the DataNode may need to evict the block
+    file from the cache anyway.  This value controls how long the DataNode will
+    wait for the client to release a replica that it is reading without
+    checksums.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.cache.revocation.polling.ms</name>
+  <value>500</value>
+  <description>How often the DataNode should poll to see if the clients have
+    stopped using a replica that the DataNode wants to uncache.
+  </description>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdcf5e94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index d6e70d8..c049d81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -89,7 +89,7 @@ public class TestFsDatasetCache {
   private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
 
   // Most Linux installs allow a default of 64KB locked memory
-  private static final long CACHE_CAPACITY = 64 * 1024;
+  static final long CACHE_CAPACITY = 64 * 1024;
   // mlock always locks the entire page. So we don't need to deal with this
   // rounding, use the OS page size for the block size.
   private static final long PAGE_SIZE =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdcf5e94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
new file mode 100644
index 0000000..af28ed7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFsDatasetCacheRevocation {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestFsDatasetCacheRevocation.class);
+
+  private static CacheManipulator prevCacheManipulator;
+
+  private static TemporarySocketDirectory sockDir;
+
+  private static final int BLOCK_SIZE = 4096;
+
+  @Before
+  public void setUp() throws Exception {
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+    DomainSocket.disableBindPathValidation();
+    sockDir = new TemporarySocketDirectory();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Restore the original CacheManipulator
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
+    sockDir.close();
+  }
+
+  private static Configuration getDefaultConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 50);
+    conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 250);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        TestFsDatasetCache.CACHE_CAPACITY);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+      new File(sockDir.getDir(), "sock").getAbsolutePath());
+    return conf;
+  }
+
+  /**
+   * Test that when a client has a replica mmapped, we will not un-mlock that
+   * replica for a reasonable amount of time, even if an uncache request
+   * occurs.
+   */
+  @Test(timeout=120000)
+  public void testPinning() throws Exception {
+    Configuration conf = getDefaultConf();
+    // Set a really long revocation timeout, so that we won't reach it during
+    // this test.
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
+        1800000L);
+    // Poll very often
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
+    MiniDFSCluster cluster = null;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+
+    // Create and cache a file.
+    final String TEST_FILE = "/test_file";
+    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
+        BLOCK_SIZE, (short)1, 0xcafe);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    long cacheDirectiveId =
+      dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
+        setPool("pool").setPath(new Path(TEST_FILE)).
+          setReplication((short) 1).build());
+    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+    DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
+
+    // Mmap the file.
+    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
+    ByteBuffer buf =
+        in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
+
+    // Attempt to uncache file.  The file should still be cached.
+    dfs.removeCacheDirective(cacheDirectiveId);
+    Thread.sleep(500);
+    DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
+
+    // Un-mmap the file.  The file should be uncached after this.
+    in.releaseBuffer(buf);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
+
+    // Cleanup
+    in.close();
+    cluster.shutdown();
+  }
+
+  /**
+   * Test that when we have an uncache request, and the client refuses to release
+   * the replica for a long time, we will un-mlock it.
+   */
+  @Test(timeout=120000)
+  public void testRevocation() throws Exception {
+    BlockReaderTestUtil.enableHdfsCachingTracing();
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    Configuration conf = getDefaultConf();
+    // Set a really short revocation timeout.
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 250L);
+    // Poll very often
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
+    MiniDFSCluster cluster = null;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+
+    // Create and cache a file.
+    final String TEST_FILE = "/test_file2";
+    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
+        BLOCK_SIZE, (short)1, 0xcafe);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    long cacheDirectiveId =
+        dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
+            setPool("pool").setPath(new Path(TEST_FILE)).
+            setReplication((short) 1).build());
+    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+    DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
+
+    // Mmap the file.
+    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
+    ByteBuffer buf =
+        in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
+
+    // Attempt to uncache file.  The file should get uncached.
+    LOG.info("removing cache directive {}", cacheDirectiveId);
+    dfs.removeCacheDirective(cacheDirectiveId);
+    LOG.info("finished removing cache directive {}", cacheDirectiveId);
+    Thread.sleep(1000);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
+
+    // Cleanup
+    in.releaseBuffer(buf);
+    in.close();
+    cluster.shutdown();
+  }
+}


Mime
View raw message