hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1573821 [2/2] - in /hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/datatr...
Date: Mon, 03 Mar 2014 23:52:00 GMT
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Mon Mar  3 23:51:58 2014
@@ -266,6 +266,15 @@ public class FsDatasetCache {
     ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
 
+    if (!dataset.datanode.getShortCircuitRegistry().
+            processBlockMunlockRequest(key)) {
+      // TODO: we probably want to forcibly uncache the block (and close the 
+      // shm) after a certain timeout has elapsed.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(key + " is anchored, and can't be uncached now.");
+      }
+      return;
+    }
     if (prevValue == null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
@@ -380,6 +389,7 @@ public class FsDatasetCache {
           LOG.debug("Successfully cached " + key + ".  We are now caching " +
               newUsedBytes + " bytes in total.");
         }
+        dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
         numBlocksCached.addAndGet(1);
         success = true;
       } finally {
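
The two FsDatasetCache hunks above are a matched pair: processBlockMlockEvent tells the DataNode's ShortCircuitRegistry that a block has been successfully mlocked into the cache, while processBlockMunlockRequest is consulted before uncaching and returns false as long as a short-circuit client still holds the block anchored, in which case the uncache attempt is simply skipped for now (the TODO notes that a timeout-based forced uncache and shm close may come later).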

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Mon Mar  3 23:51:58 2014
@@ -47,8 +47,6 @@ import com.google.common.base.Preconditi
  */
 @InterfaceAudience.Private
 public final class CachePool {
-  public static final Log LOG = LogFactory.getLog(CachePool.class);
-
   @Nonnull
   private final String poolName;
 

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java Mon Mar  3 23:51:58 2014
@@ -62,6 +62,7 @@ import com.google.common.io.LimitInputSt
 final class FileDistributionCalculator {
   private final static long MAX_SIZE_DEFAULT = 0x2000000000L; // 1/8 TB = 2^37
   private final static int INTERVAL_DEFAULT = 0x200000; // 2 MB = 2^21
+  private final static int MAX_INTERVALS = 0x8000000; // 128 M = 2^27
 
   private final Configuration conf;
   private final long maxSize;
@@ -82,9 +83,11 @@ final class FileDistributionCalculator {
     this.steps = steps == 0 ? INTERVAL_DEFAULT : steps;
     this.out = out;
     long numIntervals = this.maxSize / this.steps;
+    // avoid OutOfMemoryError when allocating an array
+    Preconditions.checkState(numIntervals <= MAX_INTERVALS,
+        "Too many distribution intervals (maxSize/step): " + numIntervals +
+        ", should be less than " + (MAX_INTERVALS+1) + ".");
     this.distribution = new int[1 + (int) (numIntervals)];
-    Preconditions.checkState(numIntervals < Integer.MAX_VALUE,
-        "Too many distribution intervals");
   }
 
   void visit(RandomAccessFile file) throws IOException {
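
With the defaults above, numIntervals = MAX_SIZE_DEFAULT / INTERVAL_DEFAULT = 2^37 / 2^21 = 65,536, far below MAX_INTERVALS (2^27 = 134,217,728). The change also moves the Preconditions check ahead of the int[] allocation and tightens the bound from Integer.MAX_VALUE to MAX_INTERVALS, so an oversized maxSize/step combination now fails with the descriptive message instead of an OutOfMemoryError while allocating the distribution array.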

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java Mon Mar  3 23:51:58 2014
@@ -101,9 +101,8 @@ public class OfflineImageViewerPB {
 
     options.addOption("p", "processor", true, "");
     options.addOption("h", "help", false, "");
-    options.addOption("skipBlocks", false, "");
-    options.addOption("printToScreen", false, "");
-    options.addOption("delimiter", true, "");
+    options.addOption("maxSize", true, "");
+    options.addOption("step", true, "");
 
     return options;
   }
@@ -118,10 +117,15 @@ public class OfflineImageViewerPB {
    * @throws IOException
    */
   public static void main(String[] args) throws IOException {
+    int status = run(args);
+    System.exit(status);
+  }
+
+  public static int run(String[] args) throws IOException {
     Options options = buildOptions();
     if (args.length == 0) {
       printUsage();
-      return;
+      return 0;
     }
 
     CommandLineParser parser = new PosixParser();
@@ -132,12 +136,12 @@ public class OfflineImageViewerPB {
     } catch (ParseException e) {
       System.out.println("Error parsing command-line options: ");
       printUsage();
-      return;
+      return -1;
     }
 
     if (cmd.hasOption("h")) { // print help and exit
       printUsage();
-      return;
+      return 0;
     }
 
     String inputFile = cmd.getOptionValue("i");
@@ -160,6 +164,7 @@ public class OfflineImageViewerPB {
       } else {
         new LsrPBImage(conf, out).visit(new RandomAccessFile(inputFile, "r"));
       }
+      return 0;
     } catch (EOFException e) {
       System.err.println("Input file ended unexpectedly. Exiting");
     } catch (IOException e) {
@@ -167,7 +172,7 @@ public class OfflineImageViewerPB {
     } finally {
       IOUtils.cleanup(null, out);
     }
-
+    return -1;
   }
 
   /**
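
Splitting main() into a run() method that returns a status code, instead of calling System.exit directly, makes the viewer callable from tests. A minimal sketch, grounded only in the help path shown in this hunk and assumed to sit inside a test method that may throw IOException:

    // "-h" takes the printUsage() branch above and returns 0; an option
    // parse error or I/O failure would return -1 instead.
    int status = OfflineImageViewerPB.run(new String[] { "-h" });
    Assert.assertEquals(0, status);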

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Mon Mar  3 23:51:58 2014
@@ -128,6 +128,22 @@ message OpBlockChecksumProto { 
   required BaseHeaderProto header = 1;
 }
 
+/**
+ * An ID uniquely identifying a shared memory segment.
+ */
+message ShortCircuitShmIdProto { 
+  required int64 hi = 1;
+  required int64 lo = 2;
+}
+
+/**
+ * An ID uniquely identifying a slot within a shared memory segment.
+ */
+message ShortCircuitShmSlotProto {
+  required ShortCircuitShmIdProto shmId = 1;
+  required int32 slotIdx = 2; 
+}
+
 message OpRequestShortCircuitAccessProto { 
   required BaseHeaderProto header = 1;
 
@@ -137,6 +153,32 @@ message OpRequestShortCircuitAccessProto
    * if the on-disk format changes.
    */
   required uint32 maxVersion = 2;
+
+  /**
+   * The shared memory slot to use, if we are using one.
+   */
+  optional ShortCircuitShmSlotProto slotId = 3;
+}
+
+message ReleaseShortCircuitAccessRequestProto {
+  required ShortCircuitShmSlotProto slotId = 1;
+}
+
+message ReleaseShortCircuitAccessResponseProto {
+  required Status status = 1;
+  optional string error = 2;
+}
+
+message ShortCircuitShmRequestProto { 
+  // The name of the client requesting the shared memory segment.  This is
+  // purely for logging / debugging purposes.
+  required string clientName = 1;
+}
+
+message ShortCircuitShmResponseProto { 
+  required Status status = 1;
+  optional string error = 2;
+  optional ShortCircuitShmIdProto id = 3;
 }
 
 message PacketHeaderProto {
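
For orientation, a minimal sketch of how a client could populate the new messages, assuming the standard protobuf Java codegen for datatransfer.proto (the generated package and outer class names below are an assumption, not something this commit shows):

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;

    public class ShmSlotProtoSketch {
      public static void main(String[] args) {
        // A shared memory segment is identified by a 128-bit ID, split into
        // two 64-bit halves.
        ShortCircuitShmIdProto shmId = ShortCircuitShmIdProto.newBuilder()
            .setHi(0x0123456789abcdefL)
            .setLo(0x0fedcba987654321L)
            .build();

        // A slot is the segment ID plus the slot's index within the segment.
        ShortCircuitShmSlotProto slotId = ShortCircuitShmSlotProto.newBuilder()
            .setShmId(shmId)
            .setSlotIdx(3)
            .build();

        // This is the value a client would attach via
        // OpRequestShortCircuitAccessProto.Builder.setSlotId(...) when using
        // a shared-memory slot, or send in a
        // ReleaseShortCircuitAccessRequestProto when done with the slot.
        System.out.println(slotId);
      }
    }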

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Mon Mar  3 23:51:58 2014
@@ -1150,6 +1150,27 @@
 </property>
 
 <property>
+  <name>dfs.datanode.shared.file.descriptor.path</name>
+  <value>/dev/shm</value>
+  <description>
+    The path to use when creating file descriptors that will be shared
+    between the DataNode and the DFSClient.  Typically we use /dev/shm, so
+    that the file descriptors will not be written to disk.  Systems that
+    don't have /dev/shm should use /tmp.
+  </description>
+</property>
+
+<property>
+  <name>dfs.short.circuit.shared.memory.watcher.interrupt.check.ms</name>
+  <value>60000</value>
+  <description>
+    The length of time in milliseconds that the short-circuit shared memory
+    watcher will go between checking for java interruptions sent from other
+    threads.  This is provided mainly for unit tests.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.kerberos.internal.spnego.principal</name>
   <value>${dfs.web.authentication.kerberos.principal}</value>
 </property>
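
As the first description notes, hosts without a /dev/shm tmpfs can point the shared descriptor path at /tmp instead. A small sketch of overriding the two new keys programmatically (key strings are taken verbatim from the properties above; the values are illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class ShortCircuitShmConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fall back to /tmp on systems without /dev/shm; the shared file
        // descriptors will then be backed by ordinary disk files.
        conf.set("dfs.datanode.shared.file.descriptor.path", "/tmp");
        // Check for interrupts more often than the 60-second default; as the
        // description says, this is mainly useful for unit tests.
        conf.setInt(
            "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms", 1000);
        System.out.println(conf.get("dfs.datanode.shared.file.descriptor.path"));
      }
    }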

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Mon Mar  3 23:51:58 2014
@@ -17,9 +17,15 @@
  */
 package org.apache.hadoop.fs;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeoutException;
@@ -34,6 +40,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -42,18 +49,24 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.BeforeClass;
@@ -71,12 +84,28 @@ public class TestEnhancedByteBufferAcces
   private static final Log LOG =
       LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());
 
-  static TemporarySocketDirectory sockDir;
+  static private TemporarySocketDirectory sockDir;
+
+  static private CacheManipulator prevCacheManipulator;
 
   @BeforeClass
   public static void init() {
     sockDir = new TemporarySocketDirectory();
     DomainSocket.disableBindPathValidation();
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new CacheManipulator() {
+      @Override
+      public void mlock(String identifier,
+          ByteBuffer mmap, long length) throws IOException {
+        LOG.info("mlocking " + identifier);
+      }
+    });
+  }
+
+  @AfterClass
+  public static void teardown() {
+    // Restore the original CacheManipulator
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
   }
 
   private static byte[] byteBufferToArray(ByteBuffer buf) {
@@ -86,12 +115,14 @@ public class TestEnhancedByteBufferAcces
     return resultArray;
   }
   
+  private static int BLOCK_SIZE = 4096;
+  
   public static HdfsConfiguration initZeroCopyTest() {
     Assume.assumeTrue(NativeIO.isAvailable());
     Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
     conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
     conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
@@ -99,6 +130,9 @@ public class TestEnhancedByteBufferAcces
           "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
     conf.setBoolean(DFSConfigKeys.
         DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
     return conf;
   }
 
@@ -549,4 +583,119 @@ public class TestEnhancedByteBufferAcces
       new File(TEST_PATH).delete();
     }
   }
+
+  /**
+   * Test that we can zero-copy read cached data even without disabling
+   * checksums.
+   */
+  @Test(timeout=120000)
+  public void testZeroCopyReadOfCachedData() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
+    BlockReaderTestUtil.enableHdfsCachingTracing();
+
+    final int TEST_FILE_LENGTH = 16385;
+    final Path TEST_PATH = new Path("/a");
+    final int RANDOM_SEED = 23453;
+    HdfsConfiguration conf = initZeroCopyTest();
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    final String CONTEXT = "testZeroCopyReadOfCachedData";
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH, 4096));
+    MiniDFSCluster cluster = null;
+    ByteBuffer result = null;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, TEST_PATH,
+        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+    DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+    byte original[] = DFSTestUtil.
+        calculateFileContentsFromSeed(RANDOM_SEED, TEST_FILE_LENGTH);
+
+    // Prior to caching, the file can't be read via zero-copy
+    FSDataInputStream fsIn = fs.open(TEST_PATH);
+    try {
+      result = fsIn.read(null, TEST_FILE_LENGTH / 2,
+          EnumSet.noneOf(ReadOption.class));
+      Assert.fail("expected UnsupportedOperationException");
+    } catch (UnsupportedOperationException e) {
+      // expected
+    }
+    // Cache the file
+    fs.addCachePool(new CachePoolInfo("pool1"));
+    long directiveId = fs.addCacheDirective(new CacheDirectiveInfo.Builder().
+        setPath(TEST_PATH).
+        setReplication((short)1).
+        setPool("pool1").
+        build());
+    int numBlocks = (int)Math.ceil((double)TEST_FILE_LENGTH / BLOCK_SIZE);
+    DFSTestUtil.verifyExpectedCacheUsage(
+        DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH, BLOCK_SIZE),
+        numBlocks, cluster.getDataNodes().get(0).getFSDataset());
+    try {
+      result = fsIn.read(null, TEST_FILE_LENGTH,
+          EnumSet.noneOf(ReadOption.class));
+    } catch (UnsupportedOperationException e) {
+      Assert.fail("expected to be able to read cached file via zero-copy");
+    }
+    // Verify result
+    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0,
+        BLOCK_SIZE), byteBufferToArray(result));
+    // check that the replica is anchored 
+    final ExtendedBlock firstBlock =
+        DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+    final ShortCircuitCache cache = ClientContext.get(
+        CONTEXT, new DFSClient.Conf(conf)). getShortCircuitCache();
+    waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
+    // Uncache the replica
+    fs.removeCacheDirective(directiveId);
+    waitForReplicaAnchorStatus(cache, firstBlock, false, true, 1);
+    fsIn.releaseBuffer(result);
+    waitForReplicaAnchorStatus(cache, firstBlock, false, false, 1);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
+
+    fsIn.close();
+    fs.close();
+    cluster.shutdown();
+  }
+  
+  private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
+      final ExtendedBlock block, final boolean expectedIsAnchorable,
+        final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
+          throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        final MutableBoolean result = new MutableBoolean(false);
+        cache.accept(new CacheVisitor() {
+          @Override
+          public void visit(int numOutstandingMmaps,
+              Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+              Map<ExtendedBlockId, InvalidToken> failedLoads,
+              Map<Long, ShortCircuitReplica> evictable,
+              Map<Long, ShortCircuitReplica> evictableMmapped) {
+            Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
+            ShortCircuitReplica replica =
+                replicas.get(ExtendedBlockId.fromExtendedBlock(block));
+            Assert.assertNotNull(replica);
+            Slot slot = replica.getSlot();
+            if ((expectedIsAnchorable != slot.isAnchorable()) ||
+                (expectedIsAnchored != slot.isAnchored())) {
+              LOG.info("replica " + replica + " has isAnchorable = " +
+                slot.isAnchorable() + ", isAnchored = " + slot.isAnchored() + 
+                ".  Waiting for isAnchorable = " + expectedIsAnchorable + 
+                ", isAnchored = " + expectedIsAnchored);
+              return;
+            }
+            result.setValue(true);
+          }
+        });
+        return result.toBoolean();
+      }
+    }, 10, 60000);
+  }
 }
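
The new test walks the whole anchor lifecycle: while the block is cached, its slot reports isAnchorable and isAnchored (the zero-copy read holds the mmap); removing the cache directive clears isAnchorable, but the client keeps the anchor until releaseBuffer is called, after which the DataNode can finally drop the block from its cache (verifyExpectedCacheUsage(0, 0, fsd)). The no-op mlock installed via CacheManipulator in init() lets this caching path run in unit tests without actually pinning memory.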

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Mon Mar  3 23:51:58 2014
@@ -31,6 +31,7 @@ import java.util.Random;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -38,9 +39,13 @@ import org.apache.hadoop.hdfs.net.TcpPee
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
+import org.apache.hadoop.hdfs.server.namenode.CacheManager;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -206,6 +211,15 @@ public class BlockReaderTestUtil {
     return cluster.getDataNode(ipcport);
   }
   
+  public static void enableHdfsCachingTracing() {
+    LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(CacheManager.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(FsDatasetCache.class.getName()).setLevel(
+        Level.TRACE);
+  }
+
   public static void enableBlockReaderFactoryTracing() {
     LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel(
         Level.TRACE);
@@ -213,5 +227,18 @@ public class BlockReaderTestUtil {
         Level.TRACE);
     LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel(
         Level.TRACE);
+    LogManager.getLogger(BlockReaderLocal.class.getName()).setLevel(
+        Level.TRACE);
+  }
+
+  public static void enableShortCircuitShmTracing() {
+    LogManager.getLogger(DfsClientShmManager.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitRegistry.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitShm.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(DataNode.class.getName()).setLevel(
+        Level.TRACE);
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Mon Mar  3 23:51:58 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
 import org.apache.commons.io.FileUtils;
@@ -49,15 +50,18 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.VersionInfo;
 
 import java.io.*;
@@ -1091,4 +1095,47 @@ public class DFSTestUtil {
     buf.duplicate().get(arr);
     return arr;
   }
+
+  /**
+   * Blocks until cache usage hits the expected new value.
+   */
+  public static long verifyExpectedCacheUsage(final long expectedCacheUsed,
+      final long expectedBlocks, final FsDatasetSpi<?> fsd) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      private int tries = 0;
+      
+      @Override
+      public Boolean get() {
+        long curCacheUsed = fsd.getCacheUsed();
+        long curBlocks = fsd.getNumBlocksCached();
+        if ((curCacheUsed != expectedCacheUsed) ||
+            (curBlocks != expectedBlocks)) {
+          if (tries++ > 10) {
+            LOG.info("verifyExpectedCacheUsage: have " +
+                curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
+                curBlocks + "/" + expectedBlocks + " blocks cached. " +
+                "memlock limit = " +
+                NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
+                ".  Waiting...");
+          }
+          return false;
+        }
+        return true;
+      }
+    }, 100, 60000);
+    return expectedCacheUsed;
+  }
+
+  /**
+   * Round a long value up to a multiple of a factor.
+   *
+   * @param val    The value.
+   * @param factor The factor to round up to.  Must be > 1.
+   * @return       The rounded value.
+   */
+  public static long roundUpToMultiple(long val, int factor) {
+    assert (factor > 1);
+    long c = (val + factor - 1) / factor;
+    return c * factor;
+  }
 }
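
For example, roundUpToMultiple(16385, 4096) computes ((16385 + 4095) / 4096) * 4096 = 5 * 4096 = 20480, which is exactly how testZeroCopyReadOfCachedData above sizes dfs.datanode.max.locked.memory to cover its 16385-byte test file.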

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java Mon Mar  3 23:51:58 2014
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,8 +31,11 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager.PerDatanodeVisitorInfo;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager.Visitor;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@@ -47,6 +52,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
 import static org.hamcrest.CoreMatchers.equalTo;
 
@@ -56,10 +62,6 @@ public class TestBlockReaderFactory {
   @Before
   public void init() {
     DomainSocket.disableBindPathValidation();
-  }
-
-  @Before
-  public void before() {
     Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
   }
 
@@ -69,7 +71,7 @@ public class TestBlockReaderFactory {
     BlockReaderFactory.createShortCircuitReplicaInfoCallback = null;
   }
 
-  private static Configuration createShortCircuitConf(String testName,
+  public static Configuration createShortCircuitConf(String testName,
       TemporarySocketDirectory sockDir) {
     Configuration conf = new Configuration();
     conf.set(DFS_CLIENT_CONTEXT, testName);
@@ -99,6 +101,8 @@ public class TestBlockReaderFactory {
     // the client is.  Both support UNIX domain reads.
     Configuration clientConf = createShortCircuitConf(
         "testFallbackFromShortCircuitToUnixDomainTraffic", sockDir);
+    clientConf.set(DFS_CLIENT_CONTEXT,
+        "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext");
     clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
     Configuration serverConf = new Configuration(clientConf);
     serverConf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
@@ -289,4 +293,87 @@ public class TestBlockReaderFactory {
     sockDir.close();
     Assert.assertFalse(testFailed.get());
   }
+
+   /**
+   * Test that a client which supports short-circuit reads using
+   * shared memory can fall back to not using shared memory when
+   * the server doesn't support it.
+   */
+  @Test
+  public void testShortCircuitReadFromServerWithoutShm() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration clientConf = createShortCircuitConf(
+        "testShortCircuitReadFromServerWithoutShm", sockDir);
+    Configuration serverConf = new Configuration(clientConf);
+    serverConf.setInt(
+        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+    cluster.waitActive();
+    clientConf.set(DFS_CLIENT_CONTEXT,
+        "testShortCircuitReadFromServerWithoutShm_clientContext");
+    final DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
+    final String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 4000;
+    final int SEED = 0xFADEC;
+    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
+    byte expected[] = DFSTestUtil.
+        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+    Assert.assertTrue(Arrays.equals(contents, expected));
+    final ShortCircuitCache cache =
+        fs.dfs.getClientContext().getShortCircuitCache();
+    final DatanodeInfo datanode =
+        new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
+    cache.getDfsClientShmManager().visit(new Visitor() {
+      @Override
+      public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+          throws IOException {
+        Assert.assertEquals(1,  info.size());
+        PerDatanodeVisitorInfo vinfo = info.get(datanode);
+        Assert.assertTrue(vinfo.disabled);
+        Assert.assertEquals(0, vinfo.full.size());
+        Assert.assertEquals(0, vinfo.notFull.size());
+      }
+    });
+    cluster.shutdown();
+  }
+ 
+  /**
+   * Test that a client which does not support short-circuit reads using
+   * shared memory can talk with a server which supports it.
+   */
+  @Test
+  public void testShortCircuitReadFromClientWithoutShm() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration clientConf = createShortCircuitConf(
+        "testShortCircuitReadWithoutShm", sockDir);
+    Configuration serverConf = new Configuration(clientConf);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+    cluster.waitActive();
+    clientConf.setInt(
+        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
+    clientConf.set(DFS_CLIENT_CONTEXT,
+        "testShortCircuitReadFromClientWithoutShm_clientContext");
+    final DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
+    final String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 4000;
+    final int SEED = 0xFADEC;
+    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
+    byte expected[] = DFSTestUtil.
+        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+    Assert.assertTrue(Arrays.equals(contents, expected));
+    final ShortCircuitCache cache =
+        fs.dfs.getClientContext().getShortCircuitCache();
+    Assert.assertEquals(null, cache.getDfsClientShmManager());
+    cluster.shutdown();
+  }
 }
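
The two new tests cover both directions of shared-memory compatibility: when the server sets dfs.short.circuit.shared.memory.watcher.interrupt.check.ms to 0 (apparently disabling shm support on the DataNode), the client's DfsClientShmManager records that DataNode as disabled with no segments, yet the short-circuit read still returns the right data; when the client sets the same key to 0, it simply runs without a DfsClientShmManager at all (getDfsClientShmManager() returns null) and still reads correctly.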

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Mon Mar  3 23:51:58 2014
@@ -23,19 +23,21 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.util.Time;
@@ -132,6 +134,8 @@ public class TestBlockReaderLocal {
     byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
     
     FileSystem fs = null;
+    ShortCircuitShm shm = null;
+    RandomAccessFile raf = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
@@ -156,7 +160,6 @@ public class TestBlockReaderLocal {
       File dataFile = MiniDFSCluster.getBlockFile(0, block);
       File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
 
-      DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
       ShortCircuitCache shortCircuitCache =
           ClientContext.getFromConf(conf).getShortCircuitCache();
       cluster.shutdown();
@@ -168,15 +171,23 @@ public class TestBlockReaderLocal {
       };
       dataIn = streams[0];
       metaIn = streams[1];
-      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
-      ShortCircuitReplica replica = new ShortCircuitReplica(
-          key, dataIn, metaIn, shortCircuitCache, Time.now());
+      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
+          block.getBlockPoolId());
+      raf = new RandomAccessFile(
+          new File(sockDir.getDir().getAbsolutePath(),
+            UUID.randomUUID().toString()), "rw");
+      raf.setLength(8192);
+      FileInputStream shmStream = new FileInputStream(raf.getFD());
+      shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
+      ShortCircuitReplica replica = 
+          new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
+              Time.now(), shm.allocAndRegisterSlot(
+                  ExtendedBlockId.fromExtendedBlock(block)));
       blockReaderLocal = new BlockReaderLocal.Builder(
               new DFSClient.Conf(conf)).
           setFilename(TEST_PATH.getName()).
           setBlock(block).
           setShortCircuitReplica(replica).
-          setDatanodeID(datanodeID).
           setCachingStrategy(new CachingStrategy(false, readahead)).
           setVerifyChecksum(checksum).
           build();
@@ -193,6 +204,8 @@ public class TestBlockReaderLocal {
       if (dataIn != null) dataIn.close();
       if (metaIn != null) metaIn.close();
       if (blockReaderLocal != null) blockReaderLocal.close();
+      if (shm != null) shm.free();
+      if (raf != null) raf.close();
     }
   }
   
@@ -369,13 +382,13 @@ public class TestBlockReaderLocal {
       assertArrayRegionsEqual(original, 6657,
           DFSTestUtil.asArray(buf), 0,
           1);
-      reader.setMlocked(true);
+      reader.forceAnchorable();
       readFully(reader, buf, 0, 5120);
       buf.flip();
       assertArrayRegionsEqual(original, 6658,
           DFSTestUtil.asArray(buf), 0,
           5120);
-      reader.setMlocked(false);
+      reader.forceUnanchorable();
       readFully(reader, buf, 0, 513);
       buf.flip();
       assertArrayRegionsEqual(original, 11778,
@@ -544,10 +557,10 @@ public class TestBlockReaderLocal {
       assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
       readFully(reader, buf, 10, 100);
       assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
-      reader.setMlocked(true);
+      reader.forceAnchorable();
       readFully(reader, buf, 110, 700);
       assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
-      reader.setMlocked(false);
+      reader.forceUnanchorable();
       reader.skip(1); // skip from offset 810 to offset 811
       readFully(reader, buf, 811, 5);
       assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
@@ -599,10 +612,10 @@ public class TestBlockReaderLocal {
       assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
       readFully(reader, buf, 10, 100);
       assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
-      reader.setMlocked(true);
+      reader.forceAnchorable();
       readFully(reader, buf, 110, 700);
       assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
-      reader.setMlocked(false);
+      reader.forceUnanchorable();
       reader.skip(1); // skip from offset 810 to offset 811
       readFully(reader, buf, 811, 5);
       assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java Mon Mar  3 23:51:58 2014
@@ -20,26 +20,50 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager.PerDatanodeVisitorInfo;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager.Visitor;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
+import static org.hamcrest.CoreMatchers.equalTo;
 
 public class TestShortCircuitCache {
   static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
@@ -104,7 +128,7 @@ public class TestShortCircuitCache {
         return new ShortCircuitReplicaInfo(
             new ShortCircuitReplica(key,
                 pair.getFileInputStreams()[0], pair.getFileInputStreams()[1],
-                cache, Time.monotonicNow()));
+                cache, Time.monotonicNow(), null));
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -114,14 +138,14 @@ public class TestShortCircuitCache {
   @Test(timeout=60000)
   public void testCreateAndDestroy() throws Exception {
     ShortCircuitCache cache =
-        new ShortCircuitCache(10, 1, 10, 1, 1, 10000);
+        new ShortCircuitCache(10, 1, 10, 1, 1, 10000, 0);
     cache.close();
   }
   
   @Test(timeout=60000)
   public void testAddAndRetrieve() throws Exception {
     final ShortCircuitCache cache =
-        new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000);
+        new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000, 0);
     final TestFileDescriptorPair pair = new TestFileDescriptorPair();
     ShortCircuitReplicaInfo replicaInfo1 =
       cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
@@ -170,7 +194,7 @@ public class TestShortCircuitCache {
   @Test(timeout=60000)
   public void testExpiry() throws Exception {
     final ShortCircuitCache cache =
-        new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000);
+        new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000, 0);
     final TestFileDescriptorPair pair = new TestFileDescriptorPair();
     ShortCircuitReplicaInfo replicaInfo1 =
       cache.fetchOrCreate(
@@ -203,7 +227,7 @@ public class TestShortCircuitCache {
   @Test(timeout=60000)
   public void testEviction() throws Exception {
     final ShortCircuitCache cache =
-        new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000);
+        new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000, 0);
     final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] {
       new TestFileDescriptorPair(),
       new TestFileDescriptorPair(),
@@ -269,10 +293,10 @@ public class TestShortCircuitCache {
   }
   
   @Test(timeout=60000)
-  public void testStaleness() throws Exception {
+  public void testTimeBasedStaleness() throws Exception {
     // Set up the cache with a short staleness time.
     final ShortCircuitCache cache =
-        new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10);
+        new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10, 0);
     final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] {
       new TestFileDescriptorPair(),
       new TestFileDescriptorPair(),
@@ -294,7 +318,7 @@ public class TestShortCircuitCache {
                 new ShortCircuitReplica(key,
                     pairs[iVal].getFileInputStreams()[0],
                     pairs[iVal].getFileInputStreams()[1],
-                    cache, Time.monotonicNow() + (iVal * HOUR_IN_MS)));
+                    cache, Time.monotonicNow() + (iVal * HOUR_IN_MS), null));
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
@@ -343,4 +367,149 @@ public class TestShortCircuitCache {
     }
     cache.close();
   }
+
+  private static Configuration createShortCircuitConf(String testName,
+      TemporarySocketDirectory sockDir) {
+    Configuration conf = new Configuration();
+    conf.set(DFS_CLIENT_CONTEXT, testName);
+    conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
+    conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+        testName).getAbsolutePath());
+    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        false);
+    conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    DomainSocket.disableBindPathValidation();
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    return conf;
+  }
+  
+  private static DomainPeer getDomainPeerToDn(Configuration conf)
+      throws IOException {
+    DomainSocket sock =
+        DomainSocket.connect(conf.get(DFS_DOMAIN_SOCKET_PATH_KEY));
+    return new DomainPeer(sock);
+  }
+  
+  @Test(timeout=60000)
+  public void testAllocShm() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf("testAllocShm", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.dfs.getClientContext().getShortCircuitCache();
+    cache.getDfsClientShmManager().visit(new Visitor() {
+      @Override
+      public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+          throws IOException {
+        // The ClientShmManager starts off empty
+        Assert.assertEquals(0,  info.size());
+      }
+    });
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode =
+        new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot = cache.allocShmSlot(datanode, peer, usedPeer,
+                    blockId, "testAllocShm_client");
+    Assert.assertNotNull(slot);
+    Assert.assertTrue(usedPeer.booleanValue());
+    cache.getDfsClientShmManager().visit(new Visitor() {
+      @Override
+      public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+          throws IOException {
+        // The ClientShmManager starts off empty
+        Assert.assertEquals(1,  info.size());
+        PerDatanodeVisitorInfo vinfo = info.get(datanode);
+        Assert.assertFalse(vinfo.disabled);
+        Assert.assertEquals(0, vinfo.full.size());
+        Assert.assertEquals(1, vinfo.notFull.size());
+      }
+    });
+    cache.scheduleSlotReleaser(slot);
+    // Wait for the slot to be released, and the shared memory area to be
+    // closed.  Since we didn't register this shared memory segment on the
+    // server, it will also be a test of how well the server deals with
+    // bogus client behavior.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        final MutableBoolean done = new MutableBoolean(false);
+        try {
+          cache.getDfsClientShmManager().visit(new Visitor() {
+            @Override
+            public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+                throws IOException {
+              done.setValue(info.get(datanode).full.isEmpty() &&
+                  info.get(datanode).notFull.isEmpty());
+            }
+          });
+        } catch (IOException e) {
+          LOG.error("error running visitor", e);
+        }
+        return done.booleanValue();
+      }
+    }, 10, 60000);
+    cluster.shutdown();
+  }
+
+  @Test(timeout=60000)
+  public void testShmBasedStaleness() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf("testShmBasedStaleness", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.dfs.getClientContext().getShortCircuitCache();
+    String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 8193;
+    final int SEED = 0xFADED;
+    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    FSDataInputStream fis = fs.open(new Path(TEST_FILE));
+    int first = fis.read();
+    final ExtendedBlock block =
+        DFSTestUtil.getFirstBlock(fs, new Path(TEST_FILE));
+    Assert.assertTrue(first != -1);
+    cache.accept(new CacheVisitor() {
+      @Override
+      public void visit(int numOutstandingMmaps,
+          Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+          Map<ExtendedBlockId, InvalidToken> failedLoads,
+          Map<Long, ShortCircuitReplica> evictable,
+          Map<Long, ShortCircuitReplica> evictableMmapped) {
+        ShortCircuitReplica replica = replicas.get(
+            ExtendedBlockId.fromExtendedBlock(block));
+        Assert.assertNotNull(replica);
+        Assert.assertTrue(replica.getSlot().isValid());
+      }
+    });
+    // Stop the Namenode.  This will close the socket keeping the client's
+    // shared memory segment alive, and make it stale.
+    cluster.getDataNodes().get(0).shutdown();
+    cache.accept(new CacheVisitor() {
+      @Override
+      public void visit(int numOutstandingMmaps,
+          Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+          Map<ExtendedBlockId, InvalidToken> failedLoads,
+          Map<Long, ShortCircuitReplica> evictable,
+          Map<Long, ShortCircuitReplica> evictableMmapped) {
+        ShortCircuitReplica replica = replicas.get(
+            ExtendedBlockId.fromExtendedBlock(block));
+        Assert.assertNotNull(replica);
+        Assert.assertFalse(replica.getSlot().isValid());
+      }
+    });
+    cluster.shutdown();
+  }
 }
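
Two signature changes run through this test: ShortCircuitCache now takes a seventh constructor argument (passed as 0 here), presumably the new shared-memory watcher interrupt-check interval, which these direct-construction tests leave disabled; and ShortCircuitReplica takes an extra final argument for its shared-memory Slot, passed as null in tests that build replicas without a real segment (TestBlockReaderLocal above shows the non-null case via shm.allocAndRegisterSlot).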

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Mon Mar  3 23:51:58 2014
@@ -420,7 +420,7 @@ public class TestShortCircuitLocalRead {
     }
   }
 
-  @Test
+  @Test(timeout=120000)
   public void testHandleTruncatedBlockFile() throws IOException {
     MiniDFSCluster cluster = null;
     HdfsConfiguration conf = new HdfsConfiguration();

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Mon Mar  3 23:51:58 2014
@@ -208,41 +208,11 @@ public class TestFsDatasetCache {
     return sizes;
   }
 
-  /**
-   * Blocks until cache usage hits the expected new value.
-   */
-  private long verifyExpectedCacheUsage(final long expectedCacheUsed,
-      final long expectedBlocks) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      private int tries = 0;
-      
-      @Override
-      public Boolean get() {
-        long curCacheUsed = fsd.getCacheUsed();
-        long curBlocks = fsd.getNumBlocksCached();
-        if ((curCacheUsed != expectedCacheUsed) ||
-            (curBlocks != expectedBlocks)) {
-          if (tries++ > 10) {
-            LOG.info("verifyExpectedCacheUsage: have " +
-                curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
-                curBlocks + "/" + expectedBlocks + " blocks cached. " +
-                "memlock limit = " +
-                NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
-                ".  Waiting...");
-          }
-          return false;
-        }
-        return true;
-      }
-    }, 100, 60000);
-    return expectedCacheUsed;
-  }
-
   private void testCacheAndUncacheBlock() throws Exception {
     LOG.info("beginning testCacheAndUncacheBlock");
     final int NUM_BLOCKS = 5;
 
-    verifyExpectedCacheUsage(0, 0);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
     assertEquals(0, fsd.getNumBlocksCached());
 
     // Write a test file
@@ -270,7 +240,8 @@ public class TestFsDatasetCache {
     // Cache each block in succession, checking each time
     for (int i=0; i<NUM_BLOCKS; i++) {
       setHeartbeatResponse(cacheBlock(locs[i]));
-      current = verifyExpectedCacheUsage(current + blockSizes[i], i + 1);
+      current = DFSTestUtil.verifyExpectedCacheUsage(
+          current + blockSizes[i], i + 1, fsd);
       dnMetrics = getMetrics(dn.getMetrics().name());
       long cmds = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
       assertTrue("Expected more cache requests from the NN ("
@@ -282,8 +253,9 @@ public class TestFsDatasetCache {
     // Uncache each block in succession, again checking each time
     for (int i=0; i<NUM_BLOCKS; i++) {
       setHeartbeatResponse(uncacheBlock(locs[i]));
-      current = verifyExpectedCacheUsage(current - blockSizes[i],
-          NUM_BLOCKS - 1 - i);
+      current = DFSTestUtil.
+          verifyExpectedCacheUsage(current - blockSizes[i],
+              NUM_BLOCKS - 1 - i, fsd);
       dnMetrics = getMetrics(dn.getMetrics().name());
       long cmds = MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
       assertTrue("Expected more uncache requests from the NN",
@@ -350,11 +322,11 @@ public class TestFsDatasetCache {
 
     // Cache the first n-1 files
     long total = 0;
-    verifyExpectedCacheUsage(0, 0);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
     for (int i=0; i<numFiles-1; i++) {
       setHeartbeatResponse(cacheBlocks(fileLocs[i]));
-      total = verifyExpectedCacheUsage(
-          rounder.round(total + fileSizes[i]), 4 * (i + 1));
+      total = DFSTestUtil.verifyExpectedCacheUsage(
+          rounder.round(total + fileSizes[i]), 4 * (i + 1), fsd);
     }
 
     // nth file should hit a capacity exception
@@ -380,7 +352,7 @@ public class TestFsDatasetCache {
     for (int i=0; i<numFiles-1; i++) {
       setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
       total -= rounder.round(fileSizes[i]);
-      verifyExpectedCacheUsage(total, 4 * (numFiles - 2 - i));
+      DFSTestUtil.verifyExpectedCacheUsage(total, 4 * (numFiles - 2 - i), fsd);
     }
     LOG.info("finishing testFilesExceedMaxLockedMemory");
   }
@@ -390,7 +362,7 @@ public class TestFsDatasetCache {
     LOG.info("beginning testUncachingBlocksBeforeCachingFinishes");
     final int NUM_BLOCKS = 5;
 
-    verifyExpectedCacheUsage(0, 0);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
 
     // Write a test file
     final Path testFile = new Path("/testCacheBlock");
@@ -426,7 +398,8 @@ public class TestFsDatasetCache {
     // should increase, even though caching doesn't complete on any of them.
     for (int i=0; i<NUM_BLOCKS; i++) {
       setHeartbeatResponse(cacheBlock(locs[i]));
-      current = verifyExpectedCacheUsage(current + blockSizes[i], i + 1);
+      current = DFSTestUtil.verifyExpectedCacheUsage(
+          current + blockSizes[i], i + 1, fsd);
     }
     
     setHeartbeatResponse(new DatanodeCommand[] {
@@ -434,7 +407,7 @@ public class TestFsDatasetCache {
     });
 
     // wait until all caching jobs are finished cancelling.
-    current = verifyExpectedCacheUsage(0, 0);
+    current = DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
     LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
   }
 
@@ -475,10 +448,10 @@ public class TestFsDatasetCache {
         fileName, 0, fileLen);
     // Cache the file and check the sizes match the page size
     setHeartbeatResponse(cacheBlocks(locs));
-    verifyExpectedCacheUsage(PAGE_SIZE * numBlocks, numBlocks);
+    DFSTestUtil.verifyExpectedCacheUsage(PAGE_SIZE * numBlocks, numBlocks, fsd);
     // Uncache and check that it decrements by the page size too
     setHeartbeatResponse(uncacheBlocks(locs));
-    verifyExpectedCacheUsage(0, 0);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
   }
 
   @Test(timeout=60000)
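Note: the verifyExpectedCacheUsage helper removed above is moved into
DFSTestUtil; the added side of the move is not shown in this part of the
diff. A minimal sketch of how the relocated method presumably looks, based
only on the body removed here, assuming it takes the dataset as an
FsDatasetSpi<?> parameter in place of the test's fsd field and uses
DFSTestUtil's own LOG (imports assumed: GenericTestUtils, Supplier, NativeIO,
FsDatasetSpi):

    // Blocks until the cached byte count and cached block count on the given
    // dataset reach the expected values, logging progress after repeated misses.
    public static long verifyExpectedCacheUsage(final long expectedCacheUsed,
        final long expectedBlocks, final FsDatasetSpi<?> fsd) throws Exception {
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        private int tries = 0;

        @Override
        public Boolean get() {
          long curCacheUsed = fsd.getCacheUsed();
          long curBlocks = fsd.getNumBlocksCached();
          if ((curCacheUsed != expectedCacheUsed) ||
              (curBlocks != expectedBlocks)) {
            if (tries++ > 10) {
              LOG.info("verifyExpectedCacheUsage: have " +
                  curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
                  curBlocks + "/" + expectedBlocks + " blocks cached. " +
                  "memlock limit = " +
                  NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
                  ".  Waiting...");
            }
            return false;
          }
          return true;
        }
      }, 100, 60000);
      return expectedCacheUsed;
    }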

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Mon Mar  3 23:51:58 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.fs.InvalidReque
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -141,10 +142,7 @@ public class TestCacheDirectives {
     namenode = cluster.getNameNode();
     prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
     NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
-    LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(CacheManager.class.getName()).setLevel(
-        Level.TRACE);
+    BlockReaderTestUtil.enableHdfsCachingTracing();
   }
 
   @After
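Note: the per-test log configuration removed above is replaced by a shared
helper in BlockReaderTestUtil. A sketch of what enableHdfsCachingTracing()
presumably does, based only on the lines it replaces here; the actual helper
may raise additional caching-related loggers to TRACE:

    public static void enableHdfsCachingTracing() {
      // Raise the cache-management loggers to TRACE for test diagnostics.
      LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
          Level.TRACE);
      LogManager.getLogger(CacheManager.class.getName()).setLevel(
          Level.TRACE);
    }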

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java Mon Mar  3 23:51:58 2014
@@ -278,6 +278,14 @@ public class TestOfflineImageViewer {
   }
 
   @Test
+  public void testFileDistributionCalculatorWithOptions() throws IOException {
+    int status = OfflineImageViewerPB.run(new String[] {"-i",
+        originalFsimage.getAbsolutePath(), "-o", "-", "-p", "FileDistribution",
+        "-maxSize", "512", "-step", "8"});
+    assertEquals(0, status);
+  }
+
+  @Test
   public void testPBImageXmlWriter() throws IOException, SAXException,
       ParserConfigurationException {
     StringWriter output = new StringWriter();
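Note: the new testFileDistributionCalculatorWithOptions case exercises the
same path as an offline image viewer invocation of roughly the form
hdfs oiv -i <fsimage> -o - -p FileDistribution -maxSize 512 -step 8,
assuming the standard oiv command-line wiring to OfflineImageViewerPB.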


