hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [1/2] hadoop git commit: HDFS-8951. Move the shortcircuit package to hdfs-client. Contributed by Mingliang Liu.
Date Wed, 26 Aug 2015 21:22:43 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk a4d9acc51 -> c992bcf9c


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
deleted file mode 100644
index 4977fd7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.DataChecksum;
-
-import com.google.common.annotations.VisibleForTesting;
-
-
-
-/**
- * BlockMetadataHeader manages metadata for data blocks on Datanodes.
- * This is not related to the Block related functionality in Namenode.
- * The biggest part of data block metadata is CRC for the block.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class BlockMetadataHeader {
-  private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class);
-
-  public static final short VERSION = 1;
-  
-  /**
-   * Header includes everything except the checksum(s) themselves.
-   * Version is two bytes. Following it is the DataChecksum
-   * that occupies 5 bytes. 
-   */
-  private final short version;
-  private DataChecksum checksum = null;
-
-  private static final HdfsConfiguration conf = new HdfsConfiguration();
-    
-  @VisibleForTesting
-  public BlockMetadataHeader(short version, DataChecksum checksum) {
-    this.checksum = checksum;
-    this.version = version;
-  }
-  
-  /** Get the version */
-  public short getVersion() {
-    return version;
-  }
-
-  /** Get the checksum */
-  public DataChecksum getChecksum() {
-    return checksum;
-  }
-
-  /**
-   * Read the checksum header from the meta file.
-   * @return the data checksum obtained from the header.
-   */
-  public static DataChecksum readDataChecksum(File metaFile) throws IOException {
-    DataInputStream in = null;
-    try {
-      in = new DataInputStream(new BufferedInputStream(
-        new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf)));
-      return readDataChecksum(in, metaFile);
-    } finally {
-      IOUtils.closeStream(in);
-    }
-  }
-
-  /**
-   * Read the checksum header from the meta input stream.
-   * @return the data checksum obtained from the header.
-   */
-  public static DataChecksum readDataChecksum(final DataInputStream metaIn,
-      final Object name) throws IOException {
-    // read and handle the common header here. For now just a version
-    final BlockMetadataHeader header = readHeader(metaIn);
-    if (header.getVersion() != VERSION) {
-      LOG.warn("Unexpected meta-file version for " + name
-          + ": version in file is " + header.getVersion()
-          + " but expected version is " + VERSION);
-    }
-    return header.getChecksum();
-  }
-
-  /**
-   * Read the header without changing the position of the FileChannel.
-   *
-   * @param fc The FileChannel to read.
-   * @return the Metadata Header.
-   * @throws IOException on error.
-   */
-  public static BlockMetadataHeader preadHeader(FileChannel fc)
-      throws IOException {
-    final byte arr[] = new byte[getHeaderSize()];
-    ByteBuffer buf = ByteBuffer.wrap(arr);
-
-    while (buf.hasRemaining()) {
-      if (fc.read(buf, 0) <= 0) {
-        throw new EOFException("unexpected EOF while reading " +
-            "metadata file header");
-      }
-    }
-    short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
-    DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2);
-    return new BlockMetadataHeader(version, dataChecksum);
-  }
-
-  /**
-   * This reads all the fields till the beginning of checksum.
-   * @return Metadata Header
-   * @throws IOException
-   */
-  public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
-    return readHeader(in.readShort(), in);
-  }
-  
-  /**
-   * Reads header at the top of metadata file and returns the header.
-   * 
-   * @return metadata header for the block
-   * @throws IOException
-   */
-  public static BlockMetadataHeader readHeader(File file) throws IOException {
-    DataInputStream in = null;
-    try {
-      in = new DataInputStream(new BufferedInputStream(
-                               new FileInputStream(file)));
-      return readHeader(in);
-    } finally {
-      IOUtils.closeStream(in);
-    }
-  }
-  
-  /**
-   * Read the header at the beginning of the given block meta file.
-   * The current file position will be altered by this method.
-   * If an error occurs, the file is <em>not</em> closed.
-   */
-  public static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException {
-    byte[] buf = new byte[getHeaderSize()];
-    raf.seek(0);
-    raf.readFully(buf, 0, buf.length);
-    return readHeader(new DataInputStream(new ByteArrayInputStream(buf)));
-  }
-  
-  // Version is already read.
-  private static BlockMetadataHeader readHeader(short version, DataInputStream in) 
-                                   throws IOException {
-    DataChecksum checksum = DataChecksum.newDataChecksum(in);
-    return new BlockMetadataHeader(version, checksum);
-  }
-  
-  /**
-   * This writes all the fields till the beginning of checksum.
-   * @param out DataOutputStream
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public static void writeHeader(DataOutputStream out, 
-                                  BlockMetadataHeader header) 
-                                  throws IOException {
-    out.writeShort(header.getVersion());
-    header.getChecksum().writeHeader(out);
-  }
-  
-  /**
-   * Writes all the fields till the beginning of checksum.
-   * @throws IOException on error
-   */
-  public static void writeHeader(DataOutputStream out, DataChecksum checksum)
-                         throws IOException {
-    writeHeader(out, new BlockMetadataHeader(VERSION, checksum));
-  }
-
-  /**
-   * Returns the size of the header
-   */
-  public static int getHeaderSize() {
-    return Short.SIZE/Byte.SIZE + DataChecksum.getChecksumHeaderSize();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 74d39f6..e981ccb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -305,7 +305,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
         blockChooserImpl);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
-    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
+    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
     deletingBlock = new HashMap<String, Set<Long>>();
 
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
@@ -847,20 +847,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
       File srcFile, File destRoot, boolean calculateChecksum,
-      int smallBufferSize) throws IOException {
+      int smallBufferSize, final Configuration conf) throws IOException {
     final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
     final File dstFile = new File(destDir, srcFile.getName());
     final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
     return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum,
-        smallBufferSize);
+        smallBufferSize, conf);
   }
 
   static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta,
                                File dstFile, boolean calculateChecksum,
-                               int smallBufferSize)
+                               int smallBufferSize, final Configuration conf)
       throws IOException {
     if (calculateChecksum) {
-      computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize);
+      computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize, conf);
     } else {
       try {
         Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
@@ -924,7 +924,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       File[] blockFiles = copyBlockFiles(block.getBlockId(),
           block.getGenerationStamp(), oldMetaFile, oldBlockFile,
           targetVolume.getTmpDir(block.getBlockPoolId()),
-          replicaInfo.isOnTransientStorage(), smallBufferSize);
+          replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
 
       ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
           replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
@@ -953,9 +953,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @throws IOException
    */
   private static void computeChecksum(File srcMeta, File dstMeta,
-      File blockFile, int smallBufferSize)
+      File blockFile, int smallBufferSize, final Configuration conf)
       throws IOException {
-    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
+    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
+        DFSUtil.getIoFileBufferSize(conf));
     final byte[] data = new byte[1 << 16];
     final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
 
@@ -2513,7 +2514,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
       return copyBlockFiles(replicaInfo.getMetaFile(),
           replicaInfo.getBlockFile(),
-          dstMetaFile, dstBlockFile, true, smallBufferSize);
+          dstMetaFile, dstBlockFile, true, smallBufferSize, conf);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 884df2e..2a4c191 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -53,6 +54,8 @@ class RamDiskAsyncLazyPersistService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
 
   private final DataNode datanode;
+  private final Configuration conf;
+
   private final ThreadGroup threadGroup;
   private Map<File, ThreadPoolExecutor> executors
       = new HashMap<File, ThreadPoolExecutor>();
@@ -65,8 +68,9 @@ class RamDiskAsyncLazyPersistService {
    * The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async
    * disk operations.
    */
-  RamDiskAsyncLazyPersistService(DataNode datanode) {
+  RamDiskAsyncLazyPersistService(DataNode datanode, Configuration conf) {
     this.datanode = datanode;
+    this.conf = conf;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
   }
 
@@ -240,7 +244,7 @@ class RamDiskAsyncLazyPersistService {
         // No FsDatasetImpl lock for the file copy
         File targetFiles[] = FsDatasetImpl.copyBlockFiles(
             blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
-            smallBufferSize);
+            smallBufferSize, conf);
 
         // Lock FsDataSetImpl during onCompleteLazyPersist callback
         dataset.onCompleteLazyPersist(bpId, blockId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
deleted file mode 100644
index 8184fdf..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.io.Closeable;
-import java.nio.MappedByteBuffer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * A reference to a memory-mapped region used by an HDFS client.
- */
-@InterfaceAudience.Private
-public class ClientMmap implements Closeable {
-  static final Log LOG = LogFactory.getLog(ClientMmap.class);
-  
-  /**
-   * A reference to the block replica which this mmap relates to.
-   */
-  private ShortCircuitReplica replica;
-  
-  /**
-   * The java ByteBuffer object.
-   */
-  private final MappedByteBuffer map;
-
-  /**
-   * Whether or not this ClientMmap anchors the replica into memory while
-   * it exists.  Closing an anchored ClientMmap unanchors the replica.
-   */
-  private final boolean anchored;
-
-  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map,
-      boolean anchored) {
-    this.replica = replica;
-    this.map = map;
-    this.anchored = anchored;
-  }
-
-  /**
-   * Close the ClientMmap object.
-   */
-  @Override
-  public void close() {
-    if (replica != null) {
-      if (anchored) {
-        replica.removeNoChecksumAnchor();
-      }
-      replica.unref();
-    }
-    replica = null;
-  }
-
-  public MappedByteBuffer getMappedByteBuffer() {
-    return map;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
deleted file mode 100644
index 992d8b4..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.util.PerformanceAdvisory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
-public class DomainSocketFactory {
-  private static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
-
-  public enum PathState {
-    UNUSABLE(false, false),
-    SHORT_CIRCUIT_DISABLED(true, false),
-    VALID(true, true);
-
-    PathState(boolean usableForDataTransfer, boolean usableForShortCircuit) {
-      this.usableForDataTransfer = usableForDataTransfer;
-      this.usableForShortCircuit = usableForShortCircuit;
-    }
-
-    public boolean getUsableForDataTransfer() {
-      return usableForDataTransfer;
-    }
-
-    public boolean getUsableForShortCircuit() {
-      return usableForShortCircuit;
-    }
-
-    private final boolean usableForDataTransfer;
-    private final boolean usableForShortCircuit;
-  }
-
-  public static class PathInfo {
-    private final static PathInfo NOT_CONFIGURED =
-          new PathInfo("", PathState.UNUSABLE);
-
-    final private String path;
-    final private PathState state;
-
-    PathInfo(String path, PathState state) {
-      this.path = path;
-      this.state = state;
-    }
-
-    public String getPath() {
-      return path;
-    }
-
-    public PathState getPathState() {
-      return state;
-    }
-    
-    @Override
-    public String toString() {
-      return new StringBuilder().append("PathInfo{path=").append(path).
-          append(", state=").append(state).append("}").toString();
-    }
-  }
-
-  /**
-   * Information about domain socket paths.
-   */
-  final Cache<String, PathState> pathMap =
-      CacheBuilder.newBuilder()
-      .expireAfterWrite(10, TimeUnit.MINUTES)
-      .build();
-
-  public DomainSocketFactory(ShortCircuitConf conf) {
-    final String feature;
-    if (conf.isShortCircuitLocalReads() && (!conf.isUseLegacyBlockReaderLocal())) {
-      feature = "The short-circuit local reads feature";
-    } else if (conf.isDomainSocketDataTraffic()) {
-      feature = "UNIX domain socket data traffic";
-    } else {
-      feature = null;
-    }
-
-    if (feature == null) {
-      PerformanceAdvisory.LOG.debug(
-          "Both short-circuit local reads and UNIX domain socket are disabled.");
-    } else {
-      if (conf.getDomainSocketPath().isEmpty()) {
-        throw new HadoopIllegalArgumentException(feature + " is enabled but "
-            + HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set.");
-      } else if (DomainSocket.getLoadingFailureReason() != null) {
-        LOG.warn(feature + " cannot be used because "
-            + DomainSocket.getLoadingFailureReason());
-      } else {
-        LOG.debug(feature + " is enabled.");
-      }
-    }
-  }
-
-  /**
-   * Get information about a domain socket path.
-   *
-   * @param addr         The inet address to use.
-   * @param conf         The client configuration.
-   *
-   * @return             Information about the socket path.
-   */
-  public PathInfo getPathInfo(InetSocketAddress addr, ShortCircuitConf conf) {
-    // If there is no domain socket path configured, we can't use domain
-    // sockets.
-    if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED;
-    // If we can't do anything with the domain socket, don't create it.
-    if (!conf.isDomainSocketDataTraffic() &&
-        (!conf.isShortCircuitLocalReads() || conf.isUseLegacyBlockReaderLocal())) {
-      return PathInfo.NOT_CONFIGURED;
-    }
-    // If the DomainSocket code is not loaded, we can't create
-    // DomainSocket objects.
-    if (DomainSocket.getLoadingFailureReason() != null) {
-      return PathInfo.NOT_CONFIGURED;
-    }
-    // UNIX domain sockets can only be used to talk to local peers
-    if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
-    String escapedPath = DomainSocket.getEffectivePath(
-        conf.getDomainSocketPath(), addr.getPort());
-    PathState status = pathMap.getIfPresent(escapedPath);
-    if (status == null) {
-      return new PathInfo(escapedPath, PathState.VALID);
-    } else {
-      return new PathInfo(escapedPath, status);
-    }
-  }
-
-  public DomainSocket createSocket(PathInfo info, int socketTimeout) {
-    Preconditions.checkArgument(info.getPathState() != PathState.UNUSABLE);
-    boolean success = false;
-    DomainSocket sock = null;
-    try {
-      sock = DomainSocket.connect(info.getPath());
-      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, socketTimeout);
-      success = true;
-    } catch (IOException e) {
-      LOG.warn("error creating DomainSocket", e);
-      // fall through
-    } finally {
-      if (!success) {
-        if (sock != null) {
-          IOUtils.closeQuietly(sock);
-        }
-        pathMap.put(info.getPath(), PathState.UNUSABLE);
-        sock = null;
-      }
-    }
-    return sock;
-  }
-
-  public void disableShortCircuitForPath(String path) {
-    pathMap.put(path, PathState.SHORT_CIRCUIT_DISABLED);
-  }
-
-  public void disableDomainSocketPath(String path) {
-    pathMap.put(path, PathState.UNUSABLE);
-  }
-
-  @VisibleForTesting
-  public void clearPathMap() {
-    pathMap.invalidateAll();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
deleted file mode 100644
index 15b8dea..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
+++ /dev/null
@@ -1,1068 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.MappedByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.RetriableException;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.net.unix.DomainSocketWatcher;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.Waitable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * The ShortCircuitCache tracks things which the client needs to access
- * HDFS block files via short-circuit.
- *
- * These things include: memory-mapped regions, file descriptors, and shared
- * memory areas for communicating with the DataNode.
- */
-@InterfaceAudience.Private
-public class ShortCircuitCache implements Closeable {
-  public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
-
-  /**
-   * Expiry thread which makes sure that the file descriptors get closed
-   * after a while.
-   */
-  private class CacheCleaner implements Runnable, Closeable {
-    private ScheduledFuture<?> future;
-
-    /**
-     * Run the CacheCleaner thread.
-     *
-     * Whenever a thread requests a ShortCircuitReplica object, we will make
-     * sure it gets one.  That ShortCircuitReplica object can then be re-used
-     * when another thread requests a ShortCircuitReplica object for the same
-     * block.  So in that sense, there is no maximum size to the cache.
-     *
-     * However, when a ShortCircuitReplica object is unreferenced by the
-     * thread(s) that are using it, it becomes evictable.  There are two
-     * separate eviction lists-- one for mmaped objects, and another for
-     * non-mmaped objects.  We do this in order to avoid having the regular
-     * files kick the mmaped files out of the cache too quickly.  Reusing
-     * an already-existing mmap gives a huge performance boost, since the
-     * page table entries don't have to be re-populated.  Both the mmap
-     * and non-mmap evictable lists have maximum sizes and maximum lifespans.
-     */
-    @Override
-    public void run() {
-      ShortCircuitCache.this.lock.lock();
-      try {
-        if (ShortCircuitCache.this.closed) return;
-        long curMs = Time.monotonicNow();
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(this + ": cache cleaner running at " + curMs);
-        }
-
-        int numDemoted = demoteOldEvictableMmaped(curMs);
-        int numPurged = 0;
-        Long evictionTimeNs = Long.valueOf(0);
-        while (true) {
-          Entry<Long, ShortCircuitReplica> entry = 
-              evictable.ceilingEntry(evictionTimeNs);
-          if (entry == null) break;
-          evictionTimeNs = entry.getKey();
-          long evictionTimeMs = 
-              TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
-          if (evictionTimeMs + maxNonMmappedEvictableLifespanMs >= curMs) break;
-          ShortCircuitReplica replica = entry.getValue();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("CacheCleaner: purging " + replica + ": " + 
-                  StringUtils.getStackTrace(Thread.currentThread()));
-          }
-          purge(replica);
-          numPurged++;
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(this + ": finishing cache cleaner run started at " +
-            curMs + ".  Demoted " + numDemoted + " mmapped replicas; " +
-            "purged " + numPurged + " replicas.");
-        }
-      } finally {
-        ShortCircuitCache.this.lock.unlock();
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (future != null) {
-        future.cancel(false);
-      }
-    }
-
-    public void setFuture(ScheduledFuture<?> future) {
-      this.future = future;
-    }
-
-    /**
-     * Get the rate at which this cleaner thread should be scheduled.
-     *
-     * We do this by taking the minimum expiration time and dividing by 4.
-     *
-     * @return the rate in milliseconds at which this thread should be
-     *         scheduled.
-     */
-    public long getRateInMs() {
-      long minLifespanMs =
-          Math.min(maxNonMmappedEvictableLifespanMs,
-              maxEvictableMmapedLifespanMs);
-      long sampleTimeMs = minLifespanMs / 4;
-      return (sampleTimeMs < 1) ? 1 : sampleTimeMs;
-    }
-  }
-
-  /**
-   * A task which asks the DataNode to release a short-circuit shared memory
-   * slot.  If successful, this will tell the DataNode to stop monitoring
-   * changes to the mlock status of the replica associated with the slot.
-   * It will also allow us (the client) to re-use this slot for another
-   * replica.  If we can't communicate with the DataNode for some reason,
-   * we tear down the shared memory segment to avoid being in an inconsistent
-   * state.
-   */
-  private class SlotReleaser implements Runnable {
-    /**
-     * The slot that we need to release.
-     */
-    private final Slot slot;
-
-    SlotReleaser(Slot slot) {
-      this.slot = slot;
-    }
-
-    @Override
-    public void run() {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(ShortCircuitCache.this + ": about to release " + slot);
-      }
-      final DfsClientShm shm = (DfsClientShm)slot.getShm();
-      final DomainSocket shmSock = shm.getPeer().getDomainSocket();
-      DomainSocket sock = null;
-      DataOutputStream out = null;
-      final String path = shmSock.getPath();
-      boolean success = false;
-      try {
-        sock = DomainSocket.connect(path);
-        out = new DataOutputStream(
-            new BufferedOutputStream(sock.getOutputStream()));
-        new Sender(out).releaseShortCircuitFds(slot.getSlotId());
-        DataInputStream in = new DataInputStream(sock.getInputStream());
-        ReleaseShortCircuitAccessResponseProto resp =
-            ReleaseShortCircuitAccessResponseProto.parseFrom(
-                PBHelperClient.vintPrefixed(in));
-        if (resp.getStatus() != Status.SUCCESS) {
-          String error = resp.hasError() ? resp.getError() : "(unknown)";
-          throw new IOException(resp.getStatus().toString() + ": " + error);
-        }
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(ShortCircuitCache.this + ": released " + slot);
-        }
-        success = true;
-      } catch (IOException e) {
-        LOG.error(ShortCircuitCache.this + ": failed to release " +
-            "short-circuit shared memory slot " + slot + " by sending " +
-            "ReleaseShortCircuitAccessRequestProto to " + path +
-            ".  Closing shared memory segment.", e);
-      } finally {
-        if (success) {
-          shmManager.freeSlot(slot);
-        } else {
-          shm.getEndpointShmManager().shutdown(shm);
-        }
-        IOUtils.cleanup(LOG, sock, out);
-      }
-    }
-  }
-
-  public interface ShortCircuitReplicaCreator {
-    /**
-     * Attempt to create a ShortCircuitReplica object.
-     *
-     * This callback will be made without holding any locks.
-     *
-     * @return a non-null ShortCircuitReplicaInfo object.
-     */
-    ShortCircuitReplicaInfo createShortCircuitReplicaInfo();
-  }
-
-  /**
-   * Lock protecting the cache.
-   */
-  private final ReentrantLock lock = new ReentrantLock();
-
-  /**
-   * The executor service that runs the cacheCleaner.
-   */
-  private final ScheduledThreadPoolExecutor cleanerExecutor
-  = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
-          setDaemon(true).setNameFormat("ShortCircuitCache_Cleaner").
-          build());
-
-  /**
-   * The executor service that runs the cacheCleaner.
-   */
-  private final ScheduledThreadPoolExecutor releaserExecutor
-      = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
-          setDaemon(true).setNameFormat("ShortCircuitCache_SlotReleaser").
-          build());
-
-  /**
-   * A map containing all ShortCircuitReplicaInfo objects, organized by Key.
-   * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken
-   * exception.
-   */
-  private final HashMap<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> 
-      replicaInfoMap = new HashMap<ExtendedBlockId,
-          Waitable<ShortCircuitReplicaInfo>>();
-
-  /**
-   * The CacheCleaner.  We don't create this and schedule it until it becomes
-   * necessary.
-   */
-  private CacheCleaner cacheCleaner;
-
-  /**
-   * Tree of evictable elements.
-   *
-   * Maps (unique) insertion time in nanoseconds to the element.
-   */
-  private final TreeMap<Long, ShortCircuitReplica> evictable =
-      new TreeMap<Long, ShortCircuitReplica>();
-
-  /**
-   * Maximum total size of the cache, including both mmapped and
-   * no$-mmapped elements.
-   */
-  private final int maxTotalSize;
-
-  /**
-   * Non-mmaped elements older than this will be closed.
-   */
-  private long maxNonMmappedEvictableLifespanMs;
-
-  /**
-   * Tree of mmaped evictable elements.
-   *
-   * Maps (unique) insertion time in nanoseconds to the element.
-   */
-  private final TreeMap<Long, ShortCircuitReplica> evictableMmapped =
-      new TreeMap<Long, ShortCircuitReplica>();
-
-  /**
-   * Maximum number of mmaped evictable elements.
-   */
-  private int maxEvictableMmapedSize;
-
-  /**
-   * Mmaped elements older than this will be closed.
-   */
-  private final long maxEvictableMmapedLifespanMs;
-
-  /**
-   * The minimum number of milliseconds we'll wait after an unsuccessful
-   * mmap attempt before trying again.
-   */
-  private final long mmapRetryTimeoutMs;
-
-  /**
-   * How long we will keep replicas in the cache before declaring them
-   * to be stale.
-   */
-  private final long staleThresholdMs;
-
-  /**
-   * True if the ShortCircuitCache is closed.
-   */
-  private boolean closed = false;
-
-  /**
-   * Number of existing mmaps associated with this cache.
-   */
-  private int outstandingMmapCount = 0;
-
-  /**
-   * Manages short-circuit shared memory segments for the client.
-   */
-  private final DfsClientShmManager shmManager;
-
-  public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
-    return new ShortCircuitCache(
-        conf.getShortCircuitStreamsCacheSize(),
-        conf.getShortCircuitStreamsCacheExpiryMs(),
-        conf.getShortCircuitMmapCacheSize(),
-        conf.getShortCircuitMmapCacheExpiryMs(),
-        conf.getShortCircuitMmapCacheRetryTimeout(),
-        conf.getShortCircuitCacheStaleThresholdMs(),
-        conf.getShortCircuitSharedMemoryWatcherInterruptCheckMs());
-  }
-
-  public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs,
-      int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs,
-      long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) {
-    Preconditions.checkArgument(maxTotalSize >= 0);
-    this.maxTotalSize = maxTotalSize;
-    Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0);
-    this.maxNonMmappedEvictableLifespanMs = maxNonMmappedEvictableLifespanMs;
-    Preconditions.checkArgument(maxEvictableMmapedSize >= 0);
-    this.maxEvictableMmapedSize = maxEvictableMmapedSize;
-    Preconditions.checkArgument(maxEvictableMmapedLifespanMs >= 0);
-    this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs;
-    this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
-    this.staleThresholdMs = staleThresholdMs;
-    DfsClientShmManager shmManager = null;
-    if ((shmInterruptCheckMs > 0) &&
-        (DomainSocketWatcher.getLoadingFailureReason() == null)) {
-      try {
-        shmManager = new DfsClientShmManager(shmInterruptCheckMs);
-      } catch (IOException e) {
-        LOG.error("failed to create ShortCircuitShmManager", e);
-      }
-    }
-    this.shmManager = shmManager;
-  }
-
-  public long getStaleThresholdMs() {
-    return staleThresholdMs;
-  }
-
-  /**
-   * Increment the reference count of a replica, and remove it from any free
-   * list it may be in.
-   *
-   * You must hold the cache lock while calling this function.
-   *
-   * @param replica      The replica we're removing.
-   */
-  private void ref(ShortCircuitReplica replica) {
-    lock.lock();
-    try {
-      Preconditions.checkArgument(replica.refCount > 0,
-          "can't ref %s because its refCount reached %d", replica,
-          replica.refCount);
-      Long evictableTimeNs = replica.getEvictableTimeNs();
-      replica.refCount++;
-      if (evictableTimeNs != null) {
-        String removedFrom = removeEvictable(replica);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": " + removedFrom +
-              " no longer contains " + replica + ".  refCount " +
-              (replica.refCount - 1) + " -> " + replica.refCount +
-              StringUtils.getStackTrace(Thread.currentThread()));
-
-        }
-      } else if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": replica  refCount " +
-            (replica.refCount - 1) + " -> " + replica.refCount +
-            StringUtils.getStackTrace(Thread.currentThread()));
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Unreference a replica.
-   *
-   * You must hold the cache lock while calling this function.
-   *
-   * @param replica   The replica being unreferenced.
-   */
-  void unref(ShortCircuitReplica replica) {
-    lock.lock();
-    try {
-      // If the replica is stale or unusable, but we haven't purged it yet,
-      // let's do that.  It would be a shame to evict a non-stale replica so
-      // that we could put a stale or unusable one into the cache.
-      if (!replica.purged) {
-        String purgeReason = null;
-        if (!replica.getDataStream().getChannel().isOpen()) {
-          purgeReason = "purging replica because its data channel is closed.";
-        } else if (!replica.getMetaStream().getChannel().isOpen()) {
-          purgeReason = "purging replica because its meta channel is closed.";
-        } else if (replica.isStale()) {
-          purgeReason = "purging replica because it is stale.";
-        }
-        if (purgeReason != null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + ": " + purgeReason);
-          }
-          purge(replica);
-        }
-      }
-      String addedString = "";
-      boolean shouldTrimEvictionMaps = false;
-      int newRefCount = --replica.refCount;
-      if (newRefCount == 0) {
-        // Close replica, since there are no remaining references to it.
-        Preconditions.checkArgument(replica.purged,
-          "Replica %s reached a refCount of 0 without being purged", replica);
-        replica.close();
-      } else if (newRefCount == 1) {
-        Preconditions.checkState(null == replica.getEvictableTimeNs(),
-            "Replica %s had a refCount higher than 1, " +
-              "but was still evictable (evictableTimeNs = %d)",
-              replica, replica.getEvictableTimeNs());
-        if (!replica.purged) {
-          // Add the replica to the end of an eviction list.
-          // Eviction lists are sorted by time.
-          if (replica.hasMmap()) {
-            insertEvictable(System.nanoTime(), replica, evictableMmapped);
-            addedString = "added to evictableMmapped, ";
-          } else {
-            insertEvictable(System.nanoTime(), replica, evictable);
-            addedString = "added to evictable, ";
-          }
-          shouldTrimEvictionMaps = true;
-        }
-      } else {
-        Preconditions.checkArgument(replica.refCount >= 0,
-            "replica's refCount went negative (refCount = %d" +
-            " for %s)", replica.refCount, replica);
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": unref replica " + replica +
-            ": " + addedString + " refCount " +
-            (newRefCount + 1) + " -> " + newRefCount +
-            StringUtils.getStackTrace(Thread.currentThread()));
-      }
-      if (shouldTrimEvictionMaps) {
-        trimEvictionMaps();
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Demote old evictable mmaps into the regular eviction map.
-   *
-   * You must hold the cache lock while calling this function.
-   *
-   * @param now   Current time in monotonic milliseconds.
-   * @return      Number of replicas demoted.
-   */
-  private int demoteOldEvictableMmaped(long now) {
-    int numDemoted = 0;
-    boolean needMoreSpace = false;
-    Long evictionTimeNs = Long.valueOf(0);
-
-    while (true) {
-      Entry<Long, ShortCircuitReplica> entry = 
-          evictableMmapped.ceilingEntry(evictionTimeNs);
-      if (entry == null) break;
-      evictionTimeNs = entry.getKey();
-      long evictionTimeMs = 
-          TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
-      if (evictionTimeMs + maxEvictableMmapedLifespanMs >= now) {
-        if (evictableMmapped.size() < maxEvictableMmapedSize) {
-          break;
-        }
-        needMoreSpace = true;
-      }
-      ShortCircuitReplica replica = entry.getValue();
-      if (LOG.isTraceEnabled()) {
-        String rationale = needMoreSpace ? "because we need more space" : 
-            "because it's too old";
-        LOG.trace("demoteOldEvictable: demoting " + replica + ": " +
-            rationale + ": " +
-            StringUtils.getStackTrace(Thread.currentThread()));
-      }
-      removeEvictable(replica, evictableMmapped);
-      munmap(replica);
-      insertEvictable(evictionTimeNs, replica, evictable);
-      numDemoted++;
-    }
-    return numDemoted;
-  }
-
-  /**
-   * Trim the eviction lists.
-   */
-  private void trimEvictionMaps() {
-    long now = Time.monotonicNow();
-    demoteOldEvictableMmaped(now);
-
-    while (true) {
-      long evictableSize = evictable.size();
-      long evictableMmappedSize = evictableMmapped.size();
-      if (evictableSize + evictableMmappedSize <= maxTotalSize) {
-        return;
-      }
-      ShortCircuitReplica replica;
-      if (evictableSize == 0) {
-       replica = evictableMmapped.firstEntry().getValue();
-      } else {
-       replica = evictable.firstEntry().getValue();
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": trimEvictionMaps is purging " + replica +
-          StringUtils.getStackTrace(Thread.currentThread()));
-      }
-      purge(replica);
-    }
-  }
-
-  /**
-   * Munmap a replica, updating outstandingMmapCount.
-   *
-   * @param replica  The replica to munmap.
-   */
-  private void munmap(ShortCircuitReplica replica) {
-    replica.munmap();
-    outstandingMmapCount--;
-  }
-
-  /**
-   * Remove a replica from an evictable map.
-   *
-   * @param replica   The replica to remove.
-   * @return          The map it was removed from.
-   */
-  private String removeEvictable(ShortCircuitReplica replica) {
-    if (replica.hasMmap()) {
-      removeEvictable(replica, evictableMmapped);
-      return "evictableMmapped";
-    } else {
-      removeEvictable(replica, evictable);
-      return "evictable";
-    }
-  }
-
-  /**
-   * Remove a replica from an evictable map.
-   *
-   * @param replica   The replica to remove.
-   * @param map       The map to remove it from.
-   */
-  private void removeEvictable(ShortCircuitReplica replica,
-      TreeMap<Long, ShortCircuitReplica> map) {
-    Long evictableTimeNs = replica.getEvictableTimeNs();
-    Preconditions.checkNotNull(evictableTimeNs);
-    ShortCircuitReplica removed = map.remove(evictableTimeNs);
-    Preconditions.checkState(removed == replica,
-        "failed to make %s unevictable", replica);
-    replica.setEvictableTimeNs(null);
-  }
-
-  /**
-   * Insert a replica into an evictable map.
-   *
-   * If an element already exists with this eviction time, we add a nanosecond
-   * to it until we find an unused key.
-   *
-   * @param evictionTimeNs   The eviction time in absolute nanoseconds.
-   * @param replica          The replica to insert.
-   * @param map              The map to insert it into.
-   */
-  private void insertEvictable(Long evictionTimeNs,
-      ShortCircuitReplica replica, TreeMap<Long, ShortCircuitReplica> map) {
-    while (map.containsKey(evictionTimeNs)) {
-      evictionTimeNs++;
-    }
-    Preconditions.checkState(null == replica.getEvictableTimeNs());
-    replica.setEvictableTimeNs(evictionTimeNs);
-    map.put(evictionTimeNs, replica);
-  }
-
-  /**
-   * Purge a replica from the cache.
-   *
-   * This doesn't necessarily close the replica, since there may be
-   * outstanding references to it.  However, it does mean the cache won't
-   * hand it out to anyone after this.
-   *
-   * You must hold the cache lock while calling this function.
-   *
-   * @param replica   The replica being removed.
-   */
-  private void purge(ShortCircuitReplica replica) {
-    boolean removedFromInfoMap = false;
-    String evictionMapName = null;
-    Preconditions.checkArgument(!replica.purged);
-    replica.purged = true;
-    Waitable<ShortCircuitReplicaInfo> val = replicaInfoMap.get(replica.key);
-    if (val != null) {
-      ShortCircuitReplicaInfo info = val.getVal();
-      if ((info != null) && (info.getReplica() == replica)) {
-        replicaInfoMap.remove(replica.key);
-        removedFromInfoMap = true;
-      }
-    }
-    Long evictableTimeNs = replica.getEvictableTimeNs();
-    if (evictableTimeNs != null) {
-      evictionMapName = removeEvictable(replica);
-    }
-    if (LOG.isTraceEnabled()) {
-      StringBuilder builder = new StringBuilder();
-      builder.append(this).append(": ").append(": purged ").
-          append(replica).append(" from the cache.");
-      if (removedFromInfoMap) {
-        builder.append("  Removed from the replicaInfoMap.");
-      }
-      if (evictionMapName != null) {
-        builder.append("  Removed from ").append(evictionMapName);
-      }
-      LOG.trace(builder.toString());
-    }
-    unref(replica);
-  }
-
-  /**
-   * Fetch or create a replica.
-   *
-   * You must hold the cache lock while calling this function.
-   *
-   * @param key          Key to use for lookup.
-   * @param creator      Replica creator callback.  Will be called without
-   *                     the cache lock being held.
-   *
-   * @return             Null if no replica could be found or created.
-   *                     The replica, otherwise.
-   */
-  public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
-      ShortCircuitReplicaCreator creator) {
-    Waitable<ShortCircuitReplicaInfo> newWaitable = null;
-    lock.lock();
-    try {
-      ShortCircuitReplicaInfo info = null;
-      do {
-        if (closed) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": can't fetchOrCreate " + key +
-                " because the cache is closed.");
-          }
-          return null;
-        }
-        Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key);
-        if (waitable != null) {
-          try {
-            info = fetch(key, waitable);
-          } catch (RetriableException e) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(this + ": retrying " + e.getMessage());
-            }
-            continue;
-          }
-        }
-      } while (false);
-      if (info != null) return info;
-      // We need to load the replica ourselves.
-      newWaitable = new Waitable<ShortCircuitReplicaInfo>(lock.newCondition());
-      replicaInfoMap.put(key, newWaitable);
-    } finally {
-      lock.unlock();
-    }
-    return create(key, creator, newWaitable);
-  }
-
-  /**
-   * Fetch an existing ReplicaInfo object.
-   *
-   * @param key       The key that we're using.
-   * @param waitable  The waitable object to wait on.
-   * @return          The existing ReplicaInfo object, or null if there is
-   *                  none.
-   *
-   * @throws RetriableException   If the caller needs to retry.
-   */
-  private ShortCircuitReplicaInfo fetch(ExtendedBlockId key,
-      Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
-    // Another thread is already in the process of loading this
-    // ShortCircuitReplica.  So we simply wait for it to complete.
-    ShortCircuitReplicaInfo info;
-    try {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": found waitable for " + key);
-      }
-      info = waitable.await();
-    } catch (InterruptedException e) {
-      LOG.info(this + ": interrupted while waiting for " + key);
-      Thread.currentThread().interrupt();
-      throw new RetriableException("interrupted");
-    }
-    if (info.getInvalidTokenException() != null) {
-      LOG.info(this + ": could not get " + key + " due to InvalidToken " +
-            "exception.", info.getInvalidTokenException());
-      return info;
-    }
-    ShortCircuitReplica replica = info.getReplica();
-    if (replica == null) {
-      LOG.warn(this + ": failed to get " + key);
-      return info;
-    }
-    if (replica.purged) {
-      // Ignore replicas that have already been purged from the cache.
-      throw new RetriableException("Ignoring purged replica " +
-          replica + ".  Retrying.");
-    }
-    // Check if the replica is stale before using it.
-    // If it is, purge it and retry.
-    if (replica.isStale()) {
-      LOG.info(this + ": got stale replica " + replica + ".  Removing " +
-          "this replica from the replicaInfoMap and retrying.");
-      // Remove the cache's reference to the replica.  This may or may not
-      // trigger a close.
-      purge(replica);
-      throw new RetriableException("ignoring stale replica " + replica);
-    }
-    ref(replica);
-    return info;
-  }
-
-  private ShortCircuitReplicaInfo create(ExtendedBlockId key,
-      ShortCircuitReplicaCreator creator,
-      Waitable<ShortCircuitReplicaInfo> newWaitable) {
-    // Handle loading a new replica.
-    ShortCircuitReplicaInfo info = null;
-    try {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": loading " + key);
-      }
-      info = creator.createShortCircuitReplicaInfo();
-    } catch (RuntimeException e) {
-      LOG.warn(this + ": failed to load " + key, e);
-    }
-    if (info == null) info = new ShortCircuitReplicaInfo();
-    lock.lock();
-    try {
-      if (info.getReplica() != null) {
-        // On success, make sure the cache cleaner thread is running.
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + ": successfully loaded " + info.getReplica());
-        }
-        startCacheCleanerThreadIfNeeded();
-        // Note: new ShortCircuitReplicas start with a refCount of 2,
-        // indicating that both this cache and whoever requested the 
-        // creation of the replica hold a reference.  So we don't need
-        // to increment the reference count here.
-      } else {
-        // On failure, remove the waitable from the replicaInfoMap.
-        Waitable<ShortCircuitReplicaInfo> waitableInMap = replicaInfoMap.get(key);
-        if (waitableInMap == newWaitable) replicaInfoMap.remove(key);
-        if (info.getInvalidTokenException() != null) {
-          LOG.info(this + ": could not load " + key + " due to InvalidToken " +
-              "exception.", info.getInvalidTokenException());
-        } else {
-          LOG.warn(this + ": failed to load " + key);
-        }
-      }
-      newWaitable.provide(info);
-    } finally {
-      lock.unlock();
-    }
-    return info;
-  }
-
-  private void startCacheCleanerThreadIfNeeded() {
-    if (cacheCleaner == null) {
-      cacheCleaner = new CacheCleaner();
-      long rateMs = cacheCleaner.getRateInMs();
-      ScheduledFuture<?> future =
-          cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
-              TimeUnit.MILLISECONDS);
-      cacheCleaner.setFuture(future);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(this + ": starting cache cleaner thread which will run " +
-          "every " + rateMs + " ms");
-      }
-    }
-  }
-
-  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica,
-      boolean anchored) {
-    Condition newCond;
-    lock.lock();
-    try {
-      while (replica.mmapData != null) {
-        if (replica.mmapData instanceof MappedByteBuffer) {
-          ref(replica);
-          MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData;
-          return new ClientMmap(replica, mmap, anchored);
-        } else if (replica.mmapData instanceof Long) {
-          long lastAttemptTimeMs = (Long)replica.mmapData;
-          long delta = Time.monotonicNow() - lastAttemptTimeMs;
-          if (delta < mmapRetryTimeoutMs) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(this + ": can't create client mmap for " +
-                  replica + " because we failed to " +
-                  "create one just " + delta + "ms ago.");
-            }
-            return null;
-          }
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(this + ": retrying client mmap for " + replica +
-                ", " + delta + " ms after the previous failure.");
-          }
-        } else if (replica.mmapData instanceof Condition) {
-          Condition cond = (Condition)replica.mmapData;
-          cond.awaitUninterruptibly();
-        } else {
-          Preconditions.checkState(false, "invalid mmapData type %s",
-              replica.mmapData.getClass().getName());
-        }
-      }
-      newCond = lock.newCondition();
-      replica.mmapData = newCond;
-    } finally {
-      lock.unlock();
-    }
-    MappedByteBuffer map = replica.loadMmapInternal();
-    lock.lock();
-    try {
-      if (map == null) {
-        replica.mmapData = Long.valueOf(Time.monotonicNow());
-        newCond.signalAll();
-        return null;
-      } else {
-        outstandingMmapCount++;
-        replica.mmapData = map;
-        ref(replica);
-        newCond.signalAll();
-        return new ClientMmap(replica, map, anchored);
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Close the cache and free all associated resources.
-   */
-  @Override
-  public void close() {
-    try {
-      lock.lock();
-      if (closed) return;
-      closed = true;
-      LOG.info(this + ": closing");
-      maxNonMmappedEvictableLifespanMs = 0;
-      maxEvictableMmapedSize = 0;
-      // Close and join cacheCleaner thread.
-      IOUtils.cleanup(LOG, cacheCleaner);
-      // Purge all replicas.
-      while (true) {
-        Entry<Long, ShortCircuitReplica> entry = evictable.firstEntry();
-        if (entry == null) break;
-        purge(entry.getValue());
-      }
-      while (true) {
-        Entry<Long, ShortCircuitReplica> entry = evictableMmapped.firstEntry();
-        if (entry == null) break;
-        purge(entry.getValue());
-      }
-    } finally {
-      lock.unlock();
-    }
-
-    releaserExecutor.shutdown();
-    cleanerExecutor.shutdown();
-    // wait for existing tasks to terminate
-    try {
-      if (!releaserExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
-        LOG.error("Forcing SlotReleaserThreadPool to shutdown!");
-        releaserExecutor.shutdownNow();
-      }
-    } catch (InterruptedException e) {
-      releaserExecutor.shutdownNow();
-      Thread.currentThread().interrupt();
-      LOG.error("Interrupted while waiting for SlotReleaserThreadPool "
-          + "to terminate", e);
-    }
-
-    // wait for existing tasks to terminate
-    try {
-      if (!cleanerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
-        LOG.error("Forcing CleanerThreadPool to shutdown!");
-        cleanerExecutor.shutdownNow();
-      }
-    } catch (InterruptedException e) {
-      cleanerExecutor.shutdownNow();
-      Thread.currentThread().interrupt();
-      LOG.error("Interrupted while waiting for CleanerThreadPool "
-          + "to terminate", e);
-    }
-    IOUtils.cleanup(LOG, shmManager);
-  }
-
-  @VisibleForTesting // ONLY for testing
-  public interface CacheVisitor {
-    void visit(int numOutstandingMmaps,
-        Map<ExtendedBlockId, ShortCircuitReplica> replicas,
-        Map<ExtendedBlockId, InvalidToken> failedLoads,
-        Map<Long, ShortCircuitReplica> evictable,
-        Map<Long, ShortCircuitReplica> evictableMmapped);
-  }
-
-  @VisibleForTesting // ONLY for testing
-  public void accept(CacheVisitor visitor) {
-    lock.lock();
-    try {
-      Map<ExtendedBlockId, ShortCircuitReplica> replicas =
-          new HashMap<ExtendedBlockId, ShortCircuitReplica>();
-      Map<ExtendedBlockId, InvalidToken> failedLoads =
-          new HashMap<ExtendedBlockId, InvalidToken>();
-      for (Entry<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> entry :
-            replicaInfoMap.entrySet()) {
-        Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue();
-        if (waitable.hasVal()) {
-          if (waitable.getVal().getReplica() != null) {
-            replicas.put(entry.getKey(), waitable.getVal().getReplica());
-          } else {
-            // The exception may be null here, indicating a failed load that
-            // isn't the result of an invalid block token.
-            failedLoads.put(entry.getKey(),
-                waitable.getVal().getInvalidTokenException());
-          }
-        }
-      }
-      if (LOG.isDebugEnabled()) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("visiting ").append(visitor.getClass().getName()).
-            append("with outstandingMmapCount=").append(outstandingMmapCount).
-            append(", replicas=");
-        String prefix = "";
-        for (Entry<ExtendedBlockId, ShortCircuitReplica> entry : replicas.entrySet()) {
-          builder.append(prefix).append(entry.getValue());
-          prefix = ",";
-        }
-        prefix = "";
-        builder.append(", failedLoads=");
-        for (Entry<ExtendedBlockId, InvalidToken> entry : failedLoads.entrySet()) {
-          builder.append(prefix).append(entry.getValue());
-          prefix = ",";
-        }
-        prefix = "";
-        builder.append(", evictable=");
-        for (Entry<Long, ShortCircuitReplica> entry : evictable.entrySet()) {
-          builder.append(prefix).append(entry.getKey()).
-              append(":").append(entry.getValue());
-          prefix = ",";
-        }
-        prefix = "";
-        builder.append(", evictableMmapped=");
-        for (Entry<Long, ShortCircuitReplica> entry : evictableMmapped.entrySet()) {
-          builder.append(prefix).append(entry.getKey()).
-              append(":").append(entry.getValue());
-          prefix = ",";
-        }
-        LOG.debug(builder.toString());
-      }
-      visitor.visit(outstandingMmapCount, replicas, failedLoads,
-            evictable, evictableMmapped);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "ShortCircuitCache(0x" +
-        Integer.toHexString(System.identityHashCode(this)) + ")";
-  }
-
-  /**
-   * Allocate a new shared memory slot.
-   *
-   * @param datanode       The datanode to allocate a shm slot with.
-   * @param peer           A peer connected to the datanode.
-   * @param usedPeer       Will be set to true if we use up the provided peer.
-   * @param blockId        The block id and block pool id of the block we're 
-   *                         allocating this slot for.
-   * @param clientName     The name of the DFSClient allocating the shared
-   *                         memory.
-   * @return               Null if short-circuit shared memory is disabled;
-   *                         a short-circuit memory slot otherwise.
-   * @throws IOException   An exception if there was an error talking to 
-   *                         the datanode.
-   */
-  public Slot allocShmSlot(DatanodeInfo datanode,
-        DomainPeer peer, MutableBoolean usedPeer,
-        ExtendedBlockId blockId, String clientName) throws IOException {
-    if (shmManager != null) {
-      return shmManager.allocSlot(datanode, peer, usedPeer,
-          blockId, clientName);
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Free a slot immediately.
-   *
-   * ONLY use this if the DataNode is not yet aware of the slot.
-   * 
-   * @param slot           The slot to free.
-   */
-  public void freeSlot(Slot slot) {
-    Preconditions.checkState(shmManager != null);
-    slot.makeInvalid();
-    shmManager.freeSlot(slot);
-  }
-  
-  /**
-   * Schedule a shared memory slot to be released.
-   *
-   * @param slot           The slot to release.
-   */
-  public void scheduleSlotReleaser(Slot slot) {
-    Preconditions.checkState(shmManager != null);
-    releaserExecutor.execute(new SlotReleaser(slot));
-  }
-
-  @VisibleForTesting
-  public DfsClientShmManager getDfsClientShmManager() {
-    return shmManager;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
deleted file mode 100644
index 1390cf3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * A ShortCircuitReplica object contains file descriptors for a block that
- * we are reading via short-circuit local reads.
- *
- * The file descriptors can be shared between multiple threads because
- * all the operations we perform are stateless-- i.e., we use pread
- * instead of read, to avoid using the shared position state.
- */
-@InterfaceAudience.Private
-public class ShortCircuitReplica {
-  public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
-
-  /**
-   * Identifies this ShortCircuitReplica object.
-   */
-  final ExtendedBlockId key;
-
-  /**
-   * The block data input stream.
-   */
-  private final FileInputStream dataStream;
-
-  /**
-   * The block metadata input stream.
-   *
-   * TODO: make this nullable if the file has no checksums on disk.
-   */
-  private final FileInputStream metaStream;
-
-  /**
-   * Block metadata header.
-   */
-  private final BlockMetadataHeader metaHeader;
-
-  /**
-   * The cache we belong to.
-   */
-  private final ShortCircuitCache cache;
-
-  /**
-   * Monotonic time at which the replica was created.
-   */
-  private final long creationTimeMs;
-
-  /**
-   * If non-null, the shared memory slot associated with this replica.
-   */
-  private final Slot slot;
-  
-  /**
-   * Current mmap state.
-   *
-   * Protected by the cache lock.
-   */
-  Object mmapData;
-
-  /**
-   * True if this replica has been purged from the cache; false otherwise.
-   *
-   * Protected by the cache lock.
-   */
-  boolean purged = false;
-
-  /**
-   * Number of external references to this replica.  Replicas are referenced
-   * by the cache, BlockReaderLocal instances, and by ClientMmap instances.
-   * The number starts at 2 because when we create a replica, it is referenced
-   * by both the cache and the requester.
-   *
-   * Protected by the cache lock.
-   */
-  int refCount = 2;
-
-  /**
-   * The monotonic time in nanoseconds at which the replica became evictable, or
-   * null if it is not evictable.
-   *
-   * Protected by the cache lock.
-   */
-  private Long evictableTimeNs = null;
-
-  public ShortCircuitReplica(ExtendedBlockId key,
-      FileInputStream dataStream, FileInputStream metaStream,
-      ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
-    this.key = key;
-    this.dataStream = dataStream;
-    this.metaStream = metaStream;
-    this.metaHeader =
-          BlockMetadataHeader.preadHeader(metaStream.getChannel());
-    if (metaHeader.getVersion() != 1) {
-      throw new IOException("invalid metadata header version " +
-          metaHeader.getVersion() + ".  Can only handle version 1.");
-    }
-    this.cache = cache;
-    this.creationTimeMs = creationTimeMs;
-    this.slot = slot;
-  }
-
-  /**
-   * Decrement the reference count.
-   */
-  public void unref() {
-    cache.unref(this);
-  }
-
-  /**
-   * Check if the replica is stale.
-   *
-   * Must be called with the cache lock held.
-   */
-  boolean isStale() {
-    if (slot != null) {
-      // Check staleness by looking at the shared memory area we use to
-      // communicate with the DataNode.
-      boolean stale = !slot.isValid();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": checked shared memory segment.  isStale=" + stale);
-      }
-      return stale;
-    } else {
-      // Fall back to old, time-based staleness method.
-      long deltaMs = Time.monotonicNow() - creationTimeMs;
-      long staleThresholdMs = cache.getStaleThresholdMs();
-      if (deltaMs > staleThresholdMs) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + " is stale because it's " + deltaMs +
-              " ms old, and staleThresholdMs = " + staleThresholdMs);
-        }
-        return true;
-      } else {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + " is not stale because it's only " + deltaMs +
-              " ms old, and staleThresholdMs = " + staleThresholdMs);
-        }
-        return false;
-      }
-    }
-  }
-  
-  /**
-   * Try to add a no-checksum anchor to our shared memory slot.
-   *
-   * It is only possible to add this anchor when the block is mlocked on the Datanode.
-   * The DataNode will not munlock the block until the number of no-checksum anchors
-   * for the block reaches zero.
-   * 
-   * This method does not require any synchronization.
-   *
-   * @return     True if we successfully added a no-checksum anchor.
-   */
-  public boolean addNoChecksumAnchor() {
-    if (slot == null) {
-      return false;
-    }
-    boolean result = slot.addAnchor();
-    if (LOG.isTraceEnabled()) {
-      if (result) {
-        LOG.trace(this + ": added no-checksum anchor to slot " + slot);
-      } else {
-        LOG.trace(this + ": could not add no-checksum anchor to slot " + slot);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Remove a no-checksum anchor for our shared memory slot.
-   *
-   * This method does not require any synchronization.
-   */
-  public void removeNoChecksumAnchor() {
-    if (slot != null) {
-      slot.removeAnchor();
-    }
-  }
-
-  /**
-   * Check if the replica has an associated mmap that has been fully loaded.
-   *
-   * Must be called with the cache lock held.
-   */
-  @VisibleForTesting
-  public boolean hasMmap() {
-    return ((mmapData != null) && (mmapData instanceof MappedByteBuffer));
-  }
-
-  /**
-   * Free the mmap associated with this replica.
-   *
-   * Must be called with the cache lock held.
-   */
-  void munmap() {
-    MappedByteBuffer mmap = (MappedByteBuffer)mmapData;
-    NativeIO.POSIX.munmap(mmap);
-    mmapData = null;
-  }
-
-  /**
-   * Close the replica.
-   *
-   * Must be called after there are no more references to the replica in the
-   * cache or elsewhere.
-   */
-  void close() {
-    String suffix = "";
-    
-    Preconditions.checkState(refCount == 0,
-        "tried to close replica with refCount %d: %s", refCount, this);
-    refCount = -1;
-    Preconditions.checkState(purged,
-        "tried to close unpurged replica %s", this);
-    if (hasMmap()) {
-      munmap();
-      if (LOG.isTraceEnabled()) {
-        suffix += "  munmapped.";
-      }
-    }
-    IOUtils.cleanup(LOG, dataStream, metaStream);
-    if (slot != null) {
-      cache.scheduleSlotReleaser(slot);
-      if (LOG.isTraceEnabled()) {
-        suffix += "  scheduling " + slot + " for later release.";
-      }
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("closed " + this + suffix);
-    }
-  }
-
-  public FileInputStream getDataStream() {
-    return dataStream;
-  }
-
-  public FileInputStream getMetaStream() {
-    return metaStream;
-  }
-
-  public BlockMetadataHeader getMetaHeader() {
-    return metaHeader;
-  }
-
-  public ExtendedBlockId getKey() {
-    return key;
-  }
-
-  public ClientMmap getOrCreateClientMmap(boolean anchor) {
-    return cache.getOrCreateClientMmap(this, anchor);
-  }
-
-  MappedByteBuffer loadMmapInternal() {
-    try {
-      FileChannel channel = dataStream.getChannel();
-      MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, 
-          Math.min(Integer.MAX_VALUE, channel.size()));
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": created mmap of size " + channel.size());
-      }
-      return mmap;
-    } catch (IOException e) {
-      LOG.warn(this + ": mmap error", e);
-      return null;
-    } catch (RuntimeException e) {
-      LOG.warn(this + ": mmap error", e);
-      return null;
-    }
-  }
-
-  /**
-   * Get the evictable time in nanoseconds.
-   *
-   * Note: you must hold the cache lock to call this function.
-   *
-   * @return the evictable time in nanoseconds.
-   */
-  public Long getEvictableTimeNs() {
-    return evictableTimeNs;
-  }
-
-  /**
-   * Set the evictable time in nanoseconds.
-   *
-   * Note: you must hold the cache lock to call this function.
-   *
-   * @param evictableTimeNs   The evictable time in nanoseconds, or null
-   *                          to set no evictable time.
-   */
-  void setEvictableTimeNs(Long evictableTimeNs) {
-    this.evictableTimeNs = evictableTimeNs;
-  }
-
-  @VisibleForTesting
-  public Slot getSlot() {
-    return slot;
-  }
-
-  /**
-   * Convert the replica to a string for debugging purposes.
-   * Note that we can't take the lock here.
-   */
-  @Override
-  public String toString() {
-    return new StringBuilder().append("ShortCircuitReplica{").
-        append("key=").append(key).
-        append(", metaHeader.version=").append(metaHeader.getVersion()).
-        append(", metaHeader.checksum=").append(metaHeader.getChecksum()).
-        append(", ident=").append("0x").
-          append(Integer.toHexString(System.identityHashCode(this))).
-        append(", creationTimeMs=").append(creationTimeMs).
-        append("}").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
deleted file mode 100644
index ef0019f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-
-public final class ShortCircuitReplicaInfo {
-  private final ShortCircuitReplica replica;
-  private final InvalidToken exc; 
-
-  public ShortCircuitReplicaInfo() {
-    this.replica = null;
-    this.exc = null;
-  }
-
-  public ShortCircuitReplicaInfo(ShortCircuitReplica replica) {
-    this.replica = replica;
-    this.exc = null;
-  }
-
-  public ShortCircuitReplicaInfo(InvalidToken exc) {
-    this.replica = null;
-    this.exc = exc;
-  }
-
-  public ShortCircuitReplica getReplica() {
-    return replica;
-  }
-
-  public InvalidToken getInvalidTokenException() {
-    return exc; 
-  }
-  
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    String prefix = "";
-    builder.append("ShortCircuitReplicaInfo{");
-    if (replica != null) {
-      builder.append(prefix).append(replica);
-      prefix = ", ";
-    }
-    if (exc != null) {
-      builder.append(prefix).append(exc);
-      prefix = ", ";
-    }
-    builder.append("}");
-    return builder.toString();
-  }
-}
\ No newline at end of file


Mime
View raw message