From: zhz@apache.org
To: common-commits@hadoop.apache.org
Date: Wed, 02 Sep 2015 05:58:17 -0000
Message-Id: <97d4fed74a874332bb8dc8fa688976ec@git.apache.org>
Subject: [05/50] [abbrv] hadoop git commit: HDFS-8951. Move the shortcircuit package to hdfs-client. Contributed by Mingliang Liu.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
deleted file mode 100644
index 4977fd7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.hadoop.hdfs.server.datanode; - -import java.io.BufferedInputStream; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.DataChecksum; - -import com.google.common.annotations.VisibleForTesting; - - - -/** - * BlockMetadataHeader manages metadata for data blocks on Datanodes. - * This is not related to the Block related functionality in Namenode. - * The biggest part of data block metadata is CRC for the block. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class BlockMetadataHeader { - private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class); - - public static final short VERSION = 1; - - /** - * Header includes everything except the checksum(s) themselves. - * Version is two bytes. Following it is the DataChecksum - * that occupies 5 bytes. - */ - private final short version; - private DataChecksum checksum = null; - - private static final HdfsConfiguration conf = new HdfsConfiguration(); - - @VisibleForTesting - public BlockMetadataHeader(short version, DataChecksum checksum) { - this.checksum = checksum; - this.version = version; - } - - /** Get the version */ - public short getVersion() { - return version; - } - - /** Get the checksum */ - public DataChecksum getChecksum() { - return checksum; - } - - /** - * Read the checksum header from the meta file. - * @return the data checksum obtained from the header. - */ - public static DataChecksum readDataChecksum(File metaFile) throws IOException { - DataInputStream in = null; - try { - in = new DataInputStream(new BufferedInputStream( - new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf))); - return readDataChecksum(in, metaFile); - } finally { - IOUtils.closeStream(in); - } - } - - /** - * Read the checksum header from the meta input stream. - * @return the data checksum obtained from the header. - */ - public static DataChecksum readDataChecksum(final DataInputStream metaIn, - final Object name) throws IOException { - // read and handle the common header here. For now just a version - final BlockMetadataHeader header = readHeader(metaIn); - if (header.getVersion() != VERSION) { - LOG.warn("Unexpected meta-file version for " + name - + ": version in file is " + header.getVersion() - + " but expected version is " + VERSION); - } - return header.getChecksum(); - } - - /** - * Read the header without changing the position of the FileChannel. - * - * @param fc The FileChannel to read. - * @return the Metadata Header. - * @throws IOException on error. 
- */ - public static BlockMetadataHeader preadHeader(FileChannel fc) - throws IOException { - final byte arr[] = new byte[getHeaderSize()]; - ByteBuffer buf = ByteBuffer.wrap(arr); - - while (buf.hasRemaining()) { - if (fc.read(buf, 0) <= 0) { - throw new EOFException("unexpected EOF while reading " + - "metadata file header"); - } - } - short version = (short)((arr[0] << 8) | (arr[1] & 0xff)); - DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2); - return new BlockMetadataHeader(version, dataChecksum); - } - - /** - * This reads all the fields till the beginning of checksum. - * @return Metadata Header - * @throws IOException - */ - public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException { - return readHeader(in.readShort(), in); - } - - /** - * Reads header at the top of metadata file and returns the header. - * - * @return metadata header for the block - * @throws IOException - */ - public static BlockMetadataHeader readHeader(File file) throws IOException { - DataInputStream in = null; - try { - in = new DataInputStream(new BufferedInputStream( - new FileInputStream(file))); - return readHeader(in); - } finally { - IOUtils.closeStream(in); - } - } - - /** - * Read the header at the beginning of the given block meta file. - * The current file position will be altered by this method. - * If an error occurs, the file is not closed. - */ - public static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException { - byte[] buf = new byte[getHeaderSize()]; - raf.seek(0); - raf.readFully(buf, 0, buf.length); - return readHeader(new DataInputStream(new ByteArrayInputStream(buf))); - } - - // Version is already read. - private static BlockMetadataHeader readHeader(short version, DataInputStream in) - throws IOException { - DataChecksum checksum = DataChecksum.newDataChecksum(in); - return new BlockMetadataHeader(version, checksum); - } - - /** - * This writes all the fields till the beginning of checksum. - * @param out DataOutputStream - * @throws IOException - */ - @VisibleForTesting - public static void writeHeader(DataOutputStream out, - BlockMetadataHeader header) - throws IOException { - out.writeShort(header.getVersion()); - header.getChecksum().writeHeader(out); - } - - /** - * Writes all the fields till the beginning of checksum. 
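[The header layout described above (a two-byte version followed by the five-byte DataChecksum header) can be exercised round-trip with the static helpers in this class. A minimal sketch; the class name HeaderRoundTrip and the choice of CRC32C over 512-byte chunks are illustrative, not taken from this patch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
    import org.apache.hadoop.util.DataChecksum;

    public class HeaderRoundTrip {
      public static void main(String[] args) throws Exception {
        // Checksum descriptor; CRC32C over 512-byte chunks is an arbitrary choice.
        DataChecksum checksum =
            DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);

        // Write the header: a 2-byte version followed by the checksum header.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BlockMetadataHeader.writeHeader(new DataOutputStream(out), checksum);
        byte[] bytes = out.toByteArray();
        assert bytes.length == BlockMetadataHeader.getHeaderSize(); // 2 + 5 = 7

        // Read it back and check the version field survived the round trip.
        BlockMetadataHeader header = BlockMetadataHeader.readHeader(
            new DataInputStream(new ByteArrayInputStream(bytes)));
        assert header.getVersion() == BlockMetadataHeader.VERSION;
      }
    }
]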
- * @throws IOException on error - */ - public static void writeHeader(DataOutputStream out, DataChecksum checksum) - throws IOException { - writeHeader(out, new BlockMetadataHeader(VERSION, checksum)); - } - - /** - * Returns the size of the header - */ - public static int getHeaderSize() { - return Short.SIZE/Byte.SIZE + DataChecksum.getChecksumHeaderSize(); - } -} - http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 74d39f6..e981ccb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -305,7 +305,7 @@ class FsDatasetImpl implements FsDatasetSpi { volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(), blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode, this); - asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); + asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf); deletingBlock = new HashMap>(); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { @@ -847,20 +847,20 @@ class FsDatasetImpl implements FsDatasetSpi { */ static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta, File srcFile, File destRoot, boolean calculateChecksum, - int smallBufferSize) throws IOException { + int smallBufferSize, final Configuration conf) throws IOException { final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId); final File dstFile = new File(destDir, srcFile.getName()); final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp); return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum, - smallBufferSize); + smallBufferSize, conf); } static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta, File dstFile, boolean calculateChecksum, - int smallBufferSize) + int smallBufferSize, final Configuration conf) throws IOException { if (calculateChecksum) { - computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize); + computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize, conf); } else { try { Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true); @@ -924,7 +924,7 @@ class FsDatasetImpl implements FsDatasetSpi { File[] blockFiles = copyBlockFiles(block.getBlockId(), block.getGenerationStamp(), oldMetaFile, oldBlockFile, targetVolume.getTmpDir(block.getBlockPoolId()), - replicaInfo.isOnTransientStorage(), smallBufferSize); + replicaInfo.isOnTransientStorage(), smallBufferSize, conf); ReplicaInfo newReplicaInfo = new ReplicaInPipeline( replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(), @@ -953,9 +953,10 @@ class FsDatasetImpl implements FsDatasetSpi { * @throws IOException */ private static void computeChecksum(File srcMeta, File dstMeta, - File blockFile, int smallBufferSize) + File blockFile, int smallBufferSize, final Configuration conf) throws IOException { - final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta); + final DataChecksum checksum = 
BlockMetadataHeader.readDataChecksum(srcMeta, + DFSUtil.getIoFileBufferSize(conf)); final byte[] data = new byte[1 << 16]; final byte[] crcs = new byte[checksum.getChecksumSize(data.length)]; @@ -2513,7 +2514,7 @@ class FsDatasetImpl implements FsDatasetSpi { final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS); return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), - dstMetaFile, dstBlockFile, true, smallBufferSize); + dstMetaFile, dstBlockFile, true, smallBufferSize, conf); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java index 884df2e..2a4c191 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -53,6 +54,8 @@ class RamDiskAsyncLazyPersistService { private static final long THREADS_KEEP_ALIVE_SECONDS = 60; private final DataNode datanode; + private final Configuration conf; + private final ThreadGroup threadGroup; private Map executors = new HashMap(); @@ -65,8 +68,9 @@ class RamDiskAsyncLazyPersistService { * The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async * disk operations. */ - RamDiskAsyncLazyPersistService(DataNode datanode) { + RamDiskAsyncLazyPersistService(DataNode datanode, Configuration conf) { this.datanode = datanode; + this.conf = conf; this.threadGroup = new ThreadGroup(getClass().getSimpleName()); } @@ -240,7 +244,7 @@ class RamDiskAsyncLazyPersistService { // No FsDatasetImpl lock for the file copy File targetFiles[] = FsDatasetImpl.copyBlockFiles( blockId, genStamp, metaFile, blockFile, lazyPersistDir, true, - smallBufferSize); + smallBufferSize, conf); // Lock FsDataSetImpl during onCompleteLazyPersist callback dataset.onCompleteLazyPersist(bpId, blockId, http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java deleted file mode 100644 index 8184fdf..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.shortcircuit; - -import org.apache.hadoop.classification.InterfaceAudience; - -import java.io.Closeable; -import java.nio.MappedByteBuffer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * A reference to a memory-mapped region used by an HDFS client. - */ -@InterfaceAudience.Private -public class ClientMmap implements Closeable { - static final Log LOG = LogFactory.getLog(ClientMmap.class); - - /** - * A reference to the block replica which this mmap relates to. - */ - private ShortCircuitReplica replica; - - /** - * The java ByteBuffer object. - */ - private final MappedByteBuffer map; - - /** - * Whether or not this ClientMmap anchors the replica into memory while - * it exists. Closing an anchored ClientMmap unanchors the replica. - */ - private final boolean anchored; - - ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map, - boolean anchored) { - this.replica = replica; - this.map = map; - this.anchored = anchored; - } - - /** - * Close the ClientMmap object. - */ - @Override - public void close() { - if (replica != null) { - if (anchored) { - replica.removeNoChecksumAnchor(); - } - replica.unref(); - } - replica = null; - } - - public MappedByteBuffer getMappedByteBuffer() { - return map; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java deleted file mode 100644 index 992d8b4..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.shortcircuit; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.util.PerformanceAdvisory; - -import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; - -public class DomainSocketFactory { - private static final Log LOG = LogFactory.getLog(DomainSocketFactory.class); - - public enum PathState { - UNUSABLE(false, false), - SHORT_CIRCUIT_DISABLED(true, false), - VALID(true, true); - - PathState(boolean usableForDataTransfer, boolean usableForShortCircuit) { - this.usableForDataTransfer = usableForDataTransfer; - this.usableForShortCircuit = usableForShortCircuit; - } - - public boolean getUsableForDataTransfer() { - return usableForDataTransfer; - } - - public boolean getUsableForShortCircuit() { - return usableForShortCircuit; - } - - private final boolean usableForDataTransfer; - private final boolean usableForShortCircuit; - } - - public static class PathInfo { - private final static PathInfo NOT_CONFIGURED = - new PathInfo("", PathState.UNUSABLE); - - final private String path; - final private PathState state; - - PathInfo(String path, PathState state) { - this.path = path; - this.state = state; - } - - public String getPath() { - return path; - } - - public PathState getPathState() { - return state; - } - - @Override - public String toString() { - return new StringBuilder().append("PathInfo{path=").append(path). - append(", state=").append(state).append("}").toString(); - } - } - - /** - * Information about domain socket paths. - */ - final Cache pathMap = - CacheBuilder.newBuilder() - .expireAfterWrite(10, TimeUnit.MINUTES) - .build(); - - public DomainSocketFactory(ShortCircuitConf conf) { - final String feature; - if (conf.isShortCircuitLocalReads() && (!conf.isUseLegacyBlockReaderLocal())) { - feature = "The short-circuit local reads feature"; - } else if (conf.isDomainSocketDataTraffic()) { - feature = "UNIX domain socket data traffic"; - } else { - feature = null; - } - - if (feature == null) { - PerformanceAdvisory.LOG.debug( - "Both short-circuit local reads and UNIX domain socket are disabled."); - } else { - if (conf.getDomainSocketPath().isEmpty()) { - throw new HadoopIllegalArgumentException(feature + " is enabled but " - + HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set."); - } else if (DomainSocket.getLoadingFailureReason() != null) { - LOG.warn(feature + " cannot be used because " - + DomainSocket.getLoadingFailureReason()); - } else { - LOG.debug(feature + " is enabled."); - } - } - } - - /** - * Get information about a domain socket path. - * - * @param addr The inet address to use. - * @param conf The client configuration. - * - * @return Information about the socket path. - */ - public PathInfo getPathInfo(InetSocketAddress addr, ShortCircuitConf conf) { - // If there is no domain socket path configured, we can't use domain - // sockets. 
- if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED; - // If we can't do anything with the domain socket, don't create it. - if (!conf.isDomainSocketDataTraffic() && - (!conf.isShortCircuitLocalReads() || conf.isUseLegacyBlockReaderLocal())) { - return PathInfo.NOT_CONFIGURED; - } - // If the DomainSocket code is not loaded, we can't create - // DomainSocket objects. - if (DomainSocket.getLoadingFailureReason() != null) { - return PathInfo.NOT_CONFIGURED; - } - // UNIX domain sockets can only be used to talk to local peers - if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED; - String escapedPath = DomainSocket.getEffectivePath( - conf.getDomainSocketPath(), addr.getPort()); - PathState status = pathMap.getIfPresent(escapedPath); - if (status == null) { - return new PathInfo(escapedPath, PathState.VALID); - } else { - return new PathInfo(escapedPath, status); - } - } - - public DomainSocket createSocket(PathInfo info, int socketTimeout) { - Preconditions.checkArgument(info.getPathState() != PathState.UNUSABLE); - boolean success = false; - DomainSocket sock = null; - try { - sock = DomainSocket.connect(info.getPath()); - sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, socketTimeout); - success = true; - } catch (IOException e) { - LOG.warn("error creating DomainSocket", e); - // fall through - } finally { - if (!success) { - if (sock != null) { - IOUtils.closeQuietly(sock); - } - pathMap.put(info.getPath(), PathState.UNUSABLE); - sock = null; - } - } - return sock; - } - - public void disableShortCircuitForPath(String path) { - pathMap.put(path, PathState.SHORT_CIRCUIT_DISABLED); - } - - public void disableDomainSocketPath(String path) { - pathMap.put(path, PathState.UNUSABLE); - } - - @VisibleForTesting - public void clearPathMap() { - pathMap.invalidateAll(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java deleted file mode 100644 index 15b8dea..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java +++ /dev/null @@ -1,1068 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.shortcircuit; - -import java.io.BufferedOutputStream; -import java.io.Closeable; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.MappedByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.TreeMap; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.commons.lang.mutable.MutableBoolean; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.ExtendedBlockId; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; -import org.apache.hadoop.hdfs.net.DomainPeer; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.RetriableException; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.DomainSocketWatcher; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.Waitable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * The ShortCircuitCache tracks things which the client needs to access - * HDFS block files via short-circuit. - * - * These things include: memory-mapped regions, file descriptors, and shared - * memory areas for communicating with the DataNode. - */ -@InterfaceAudience.Private -public class ShortCircuitCache implements Closeable { - public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class); - - /** - * Expiry thread which makes sure that the file descriptors get closed - * after a while. - */ - private class CacheCleaner implements Runnable, Closeable { - private ScheduledFuture future; - - /** - * Run the CacheCleaner thread. - * - * Whenever a thread requests a ShortCircuitReplica object, we will make - * sure it gets one. That ShortCircuitReplica object can then be re-used - * when another thread requests a ShortCircuitReplica object for the same - * block. So in that sense, there is no maximum size to the cache. - * - * However, when a ShortCircuitReplica object is unreferenced by the - * thread(s) that are using it, it becomes evictable. There are two - * separate eviction lists-- one for mmaped objects, and another for - * non-mmaped objects. We do this in order to avoid having the regular - * files kick the mmaped files out of the cache too quickly. Reusing - * an already-existing mmap gives a huge performance boost, since the - * page table entries don't have to be re-populated. Both the mmap - * and non-mmap evictable lists have maximum sizes and maximum lifespans. 
- */ - @Override - public void run() { - ShortCircuitCache.this.lock.lock(); - try { - if (ShortCircuitCache.this.closed) return; - long curMs = Time.monotonicNow(); - - if (LOG.isDebugEnabled()) { - LOG.debug(this + ": cache cleaner running at " + curMs); - } - - int numDemoted = demoteOldEvictableMmaped(curMs); - int numPurged = 0; - Long evictionTimeNs = Long.valueOf(0); - while (true) { - Entry entry = - evictable.ceilingEntry(evictionTimeNs); - if (entry == null) break; - evictionTimeNs = entry.getKey(); - long evictionTimeMs = - TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS); - if (evictionTimeMs + maxNonMmappedEvictableLifespanMs >= curMs) break; - ShortCircuitReplica replica = entry.getValue(); - if (LOG.isTraceEnabled()) { - LOG.trace("CacheCleaner: purging " + replica + ": " + - StringUtils.getStackTrace(Thread.currentThread())); - } - purge(replica); - numPurged++; - } - - if (LOG.isDebugEnabled()) { - LOG.debug(this + ": finishing cache cleaner run started at " + - curMs + ". Demoted " + numDemoted + " mmapped replicas; " + - "purged " + numPurged + " replicas."); - } - } finally { - ShortCircuitCache.this.lock.unlock(); - } - } - - @Override - public void close() throws IOException { - if (future != null) { - future.cancel(false); - } - } - - public void setFuture(ScheduledFuture future) { - this.future = future; - } - - /** - * Get the rate at which this cleaner thread should be scheduled. - * - * We do this by taking the minimum expiration time and dividing by 4. - * - * @return the rate in milliseconds at which this thread should be - * scheduled. - */ - public long getRateInMs() { - long minLifespanMs = - Math.min(maxNonMmappedEvictableLifespanMs, - maxEvictableMmapedLifespanMs); - long sampleTimeMs = minLifespanMs / 4; - return (sampleTimeMs < 1) ? 1 : sampleTimeMs; - } - } - - /** - * A task which asks the DataNode to release a short-circuit shared memory - * slot. If successful, this will tell the DataNode to stop monitoring - * changes to the mlock status of the replica associated with the slot. - * It will also allow us (the client) to re-use this slot for another - * replica. If we can't communicate with the DataNode for some reason, - * we tear down the shared memory segment to avoid being in an inconsistent - * state. - */ - private class SlotReleaser implements Runnable { - /** - * The slot that we need to release. - */ - private final Slot slot; - - SlotReleaser(Slot slot) { - this.slot = slot; - } - - @Override - public void run() { - if (LOG.isTraceEnabled()) { - LOG.trace(ShortCircuitCache.this + ": about to release " + slot); - } - final DfsClientShm shm = (DfsClientShm)slot.getShm(); - final DomainSocket shmSock = shm.getPeer().getDomainSocket(); - DomainSocket sock = null; - DataOutputStream out = null; - final String path = shmSock.getPath(); - boolean success = false; - try { - sock = DomainSocket.connect(path); - out = new DataOutputStream( - new BufferedOutputStream(sock.getOutputStream())); - new Sender(out).releaseShortCircuitFds(slot.getSlotId()); - DataInputStream in = new DataInputStream(sock.getInputStream()); - ReleaseShortCircuitAccessResponseProto resp = - ReleaseShortCircuitAccessResponseProto.parseFrom( - PBHelperClient.vintPrefixed(in)); - if (resp.getStatus() != Status.SUCCESS) { - String error = resp.hasError() ? 
resp.getError() : "(unknown)"; - throw new IOException(resp.getStatus().toString() + ": " + error); - } - if (LOG.isTraceEnabled()) { - LOG.trace(ShortCircuitCache.this + ": released " + slot); - } - success = true; - } catch (IOException e) { - LOG.error(ShortCircuitCache.this + ": failed to release " + - "short-circuit shared memory slot " + slot + " by sending " + - "ReleaseShortCircuitAccessRequestProto to " + path + - ". Closing shared memory segment.", e); - } finally { - if (success) { - shmManager.freeSlot(slot); - } else { - shm.getEndpointShmManager().shutdown(shm); - } - IOUtils.cleanup(LOG, sock, out); - } - } - } - - public interface ShortCircuitReplicaCreator { - /** - * Attempt to create a ShortCircuitReplica object. - * - * This callback will be made without holding any locks. - * - * @return a non-null ShortCircuitReplicaInfo object. - */ - ShortCircuitReplicaInfo createShortCircuitReplicaInfo(); - } - - /** - * Lock protecting the cache. - */ - private final ReentrantLock lock = new ReentrantLock(); - - /** - * The executor service that runs the cacheCleaner. - */ - private final ScheduledThreadPoolExecutor cleanerExecutor - = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder(). - setDaemon(true).setNameFormat("ShortCircuitCache_Cleaner"). - build()); - - /** - * The executor service that runs the cacheCleaner. - */ - private final ScheduledThreadPoolExecutor releaserExecutor - = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder(). - setDaemon(true).setNameFormat("ShortCircuitCache_SlotReleaser"). - build()); - - /** - * A map containing all ShortCircuitReplicaInfo objects, organized by Key. - * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken - * exception. - */ - private final HashMap> - replicaInfoMap = new HashMap>(); - - /** - * The CacheCleaner. We don't create this and schedule it until it becomes - * necessary. - */ - private CacheCleaner cacheCleaner; - - /** - * Tree of evictable elements. - * - * Maps (unique) insertion time in nanoseconds to the element. - */ - private final TreeMap evictable = - new TreeMap(); - - /** - * Maximum total size of the cache, including both mmapped and - * no$-mmapped elements. - */ - private final int maxTotalSize; - - /** - * Non-mmaped elements older than this will be closed. - */ - private long maxNonMmappedEvictableLifespanMs; - - /** - * Tree of mmaped evictable elements. - * - * Maps (unique) insertion time in nanoseconds to the element. - */ - private final TreeMap evictableMmapped = - new TreeMap(); - - /** - * Maximum number of mmaped evictable elements. - */ - private int maxEvictableMmapedSize; - - /** - * Mmaped elements older than this will be closed. - */ - private final long maxEvictableMmapedLifespanMs; - - /** - * The minimum number of milliseconds we'll wait after an unsuccessful - * mmap attempt before trying again. - */ - private final long mmapRetryTimeoutMs; - - /** - * How long we will keep replicas in the cache before declaring them - * to be stale. - */ - private final long staleThresholdMs; - - /** - * True if the ShortCircuitCache is closed. - */ - private boolean closed = false; - - /** - * Number of existing mmaps associated with this cache. - */ - private int outstandingMmapCount = 0; - - /** - * Manages short-circuit shared memory segments for the client. 
- */ - private final DfsClientShmManager shmManager; - - public static ShortCircuitCache fromConf(ShortCircuitConf conf) { - return new ShortCircuitCache( - conf.getShortCircuitStreamsCacheSize(), - conf.getShortCircuitStreamsCacheExpiryMs(), - conf.getShortCircuitMmapCacheSize(), - conf.getShortCircuitMmapCacheExpiryMs(), - conf.getShortCircuitMmapCacheRetryTimeout(), - conf.getShortCircuitCacheStaleThresholdMs(), - conf.getShortCircuitSharedMemoryWatcherInterruptCheckMs()); - } - - public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs, - int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs, - long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) { - Preconditions.checkArgument(maxTotalSize >= 0); - this.maxTotalSize = maxTotalSize; - Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0); - this.maxNonMmappedEvictableLifespanMs = maxNonMmappedEvictableLifespanMs; - Preconditions.checkArgument(maxEvictableMmapedSize >= 0); - this.maxEvictableMmapedSize = maxEvictableMmapedSize; - Preconditions.checkArgument(maxEvictableMmapedLifespanMs >= 0); - this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs; - this.mmapRetryTimeoutMs = mmapRetryTimeoutMs; - this.staleThresholdMs = staleThresholdMs; - DfsClientShmManager shmManager = null; - if ((shmInterruptCheckMs > 0) && - (DomainSocketWatcher.getLoadingFailureReason() == null)) { - try { - shmManager = new DfsClientShmManager(shmInterruptCheckMs); - } catch (IOException e) { - LOG.error("failed to create ShortCircuitShmManager", e); - } - } - this.shmManager = shmManager; - } - - public long getStaleThresholdMs() { - return staleThresholdMs; - } - - /** - * Increment the reference count of a replica, and remove it from any free - * list it may be in. - * - * You must hold the cache lock while calling this function. - * - * @param replica The replica we're removing. - */ - private void ref(ShortCircuitReplica replica) { - lock.lock(); - try { - Preconditions.checkArgument(replica.refCount > 0, - "can't ref %s because its refCount reached %d", replica, - replica.refCount); - Long evictableTimeNs = replica.getEvictableTimeNs(); - replica.refCount++; - if (evictableTimeNs != null) { - String removedFrom = removeEvictable(replica); - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": " + removedFrom + - " no longer contains " + replica + ". refCount " + - (replica.refCount - 1) + " -> " + replica.refCount + - StringUtils.getStackTrace(Thread.currentThread())); - - } - } else if (LOG.isTraceEnabled()) { - LOG.trace(this + ": replica refCount " + - (replica.refCount - 1) + " -> " + replica.refCount + - StringUtils.getStackTrace(Thread.currentThread())); - } - } finally { - lock.unlock(); - } - } - - /** - * Unreference a replica. - * - * You must hold the cache lock while calling this function. - * - * @param replica The replica being unreferenced. - */ - void unref(ShortCircuitReplica replica) { - lock.lock(); - try { - // If the replica is stale or unusable, but we haven't purged it yet, - // let's do that. It would be a shame to evict a non-stale replica so - // that we could put a stale or unusable one into the cache. 
- if (!replica.purged) { - String purgeReason = null; - if (!replica.getDataStream().getChannel().isOpen()) { - purgeReason = "purging replica because its data channel is closed."; - } else if (!replica.getMetaStream().getChannel().isOpen()) { - purgeReason = "purging replica because its meta channel is closed."; - } else if (replica.isStale()) { - purgeReason = "purging replica because it is stale."; - } - if (purgeReason != null) { - if (LOG.isDebugEnabled()) { - LOG.debug(this + ": " + purgeReason); - } - purge(replica); - } - } - String addedString = ""; - boolean shouldTrimEvictionMaps = false; - int newRefCount = --replica.refCount; - if (newRefCount == 0) { - // Close replica, since there are no remaining references to it. - Preconditions.checkArgument(replica.purged, - "Replica %s reached a refCount of 0 without being purged", replica); - replica.close(); - } else if (newRefCount == 1) { - Preconditions.checkState(null == replica.getEvictableTimeNs(), - "Replica %s had a refCount higher than 1, " + - "but was still evictable (evictableTimeNs = %d)", - replica, replica.getEvictableTimeNs()); - if (!replica.purged) { - // Add the replica to the end of an eviction list. - // Eviction lists are sorted by time. - if (replica.hasMmap()) { - insertEvictable(System.nanoTime(), replica, evictableMmapped); - addedString = "added to evictableMmapped, "; - } else { - insertEvictable(System.nanoTime(), replica, evictable); - addedString = "added to evictable, "; - } - shouldTrimEvictionMaps = true; - } - } else { - Preconditions.checkArgument(replica.refCount >= 0, - "replica's refCount went negative (refCount = %d" + - " for %s)", replica.refCount, replica); - } - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": unref replica " + replica + - ": " + addedString + " refCount " + - (newRefCount + 1) + " -> " + newRefCount + - StringUtils.getStackTrace(Thread.currentThread())); - } - if (shouldTrimEvictionMaps) { - trimEvictionMaps(); - } - } finally { - lock.unlock(); - } - } - - /** - * Demote old evictable mmaps into the regular eviction map. - * - * You must hold the cache lock while calling this function. - * - * @param now Current time in monotonic milliseconds. - * @return Number of replicas demoted. - */ - private int demoteOldEvictableMmaped(long now) { - int numDemoted = 0; - boolean needMoreSpace = false; - Long evictionTimeNs = Long.valueOf(0); - - while (true) { - Entry entry = - evictableMmapped.ceilingEntry(evictionTimeNs); - if (entry == null) break; - evictionTimeNs = entry.getKey(); - long evictionTimeMs = - TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS); - if (evictionTimeMs + maxEvictableMmapedLifespanMs >= now) { - if (evictableMmapped.size() < maxEvictableMmapedSize) { - break; - } - needMoreSpace = true; - } - ShortCircuitReplica replica = entry.getValue(); - if (LOG.isTraceEnabled()) { - String rationale = needMoreSpace ? "because we need more space" : - "because it's too old"; - LOG.trace("demoteOldEvictable: demoting " + replica + ": " + - rationale + ": " + - StringUtils.getStackTrace(Thread.currentThread())); - } - removeEvictable(replica, evictableMmapped); - munmap(replica); - insertEvictable(evictionTimeNs, replica, evictable); - numDemoted++; - } - return numDemoted; - } - - /** - * Trim the eviction lists. 
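[The eviction bookkeeping around this method reduces to TreeMaps keyed by unique insertion timestamps, trimmed oldest-first; see trimEvictionMaps() just below. A self-contained sketch of that pattern, using hypothetical names rather than the HDFS types:

    import java.util.TreeMap;

    class EvictionSketch {
      // Maps a unique insertion time in nanoseconds to a cached entry,
      // mirroring the evictable / evictableMmapped trees in this class.
      private final TreeMap<Long, String> evictable = new TreeMap<Long, String>();

      // Insert with collision avoidance: bump the key by one nanosecond
      // until it is unused, so every entry keeps a distinct timestamp.
      void insertEvictable(long evictionTimeNs, String entry) {
        while (evictable.containsKey(evictionTimeNs)) {
          evictionTimeNs++;
        }
        evictable.put(evictionTimeNs, entry);
      }

      // Trim oldest-first until the map fits under the cap, the same
      // shape as trimEvictionMaps() in ShortCircuitCache.
      void trim(int maxTotalSize) {
        while (evictable.size() > maxTotalSize) {
          evictable.remove(evictable.firstKey()); // purge the oldest entry
        }
      }
    }
]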
- */ - private void trimEvictionMaps() { - long now = Time.monotonicNow(); - demoteOldEvictableMmaped(now); - - while (true) { - long evictableSize = evictable.size(); - long evictableMmappedSize = evictableMmapped.size(); - if (evictableSize + evictableMmappedSize <= maxTotalSize) { - return; - } - ShortCircuitReplica replica; - if (evictableSize == 0) { - replica = evictableMmapped.firstEntry().getValue(); - } else { - replica = evictable.firstEntry().getValue(); - } - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": trimEvictionMaps is purging " + replica + - StringUtils.getStackTrace(Thread.currentThread())); - } - purge(replica); - } - } - - /** - * Munmap a replica, updating outstandingMmapCount. - * - * @param replica The replica to munmap. - */ - private void munmap(ShortCircuitReplica replica) { - replica.munmap(); - outstandingMmapCount--; - } - - /** - * Remove a replica from an evictable map. - * - * @param replica The replica to remove. - * @return The map it was removed from. - */ - private String removeEvictable(ShortCircuitReplica replica) { - if (replica.hasMmap()) { - removeEvictable(replica, evictableMmapped); - return "evictableMmapped"; - } else { - removeEvictable(replica, evictable); - return "evictable"; - } - } - - /** - * Remove a replica from an evictable map. - * - * @param replica The replica to remove. - * @param map The map to remove it from. - */ - private void removeEvictable(ShortCircuitReplica replica, - TreeMap map) { - Long evictableTimeNs = replica.getEvictableTimeNs(); - Preconditions.checkNotNull(evictableTimeNs); - ShortCircuitReplica removed = map.remove(evictableTimeNs); - Preconditions.checkState(removed == replica, - "failed to make %s unevictable", replica); - replica.setEvictableTimeNs(null); - } - - /** - * Insert a replica into an evictable map. - * - * If an element already exists with this eviction time, we add a nanosecond - * to it until we find an unused key. - * - * @param evictionTimeNs The eviction time in absolute nanoseconds. - * @param replica The replica to insert. - * @param map The map to insert it into. - */ - private void insertEvictable(Long evictionTimeNs, - ShortCircuitReplica replica, TreeMap map) { - while (map.containsKey(evictionTimeNs)) { - evictionTimeNs++; - } - Preconditions.checkState(null == replica.getEvictableTimeNs()); - replica.setEvictableTimeNs(evictionTimeNs); - map.put(evictionTimeNs, replica); - } - - /** - * Purge a replica from the cache. - * - * This doesn't necessarily close the replica, since there may be - * outstanding references to it. However, it does mean the cache won't - * hand it out to anyone after this. - * - * You must hold the cache lock while calling this function. - * - * @param replica The replica being removed. - */ - private void purge(ShortCircuitReplica replica) { - boolean removedFromInfoMap = false; - String evictionMapName = null; - Preconditions.checkArgument(!replica.purged); - replica.purged = true; - Waitable val = replicaInfoMap.get(replica.key); - if (val != null) { - ShortCircuitReplicaInfo info = val.getVal(); - if ((info != null) && (info.getReplica() == replica)) { - replicaInfoMap.remove(replica.key); - removedFromInfoMap = true; - } - } - Long evictableTimeNs = replica.getEvictableTimeNs(); - if (evictableTimeNs != null) { - evictionMapName = removeEvictable(replica); - } - if (LOG.isTraceEnabled()) { - StringBuilder builder = new StringBuilder(); - builder.append(this).append(": ").append(": purged "). 
- append(replica).append(" from the cache."); - if (removedFromInfoMap) { - builder.append(" Removed from the replicaInfoMap."); - } - if (evictionMapName != null) { - builder.append(" Removed from ").append(evictionMapName); - } - LOG.trace(builder.toString()); - } - unref(replica); - } - - /** - * Fetch or create a replica. - * - * You must hold the cache lock while calling this function. - * - * @param key Key to use for lookup. - * @param creator Replica creator callback. Will be called without - * the cache lock being held. - * - * @return Null if no replica could be found or created. - * The replica, otherwise. - */ - public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key, - ShortCircuitReplicaCreator creator) { - Waitable newWaitable = null; - lock.lock(); - try { - ShortCircuitReplicaInfo info = null; - do { - if (closed) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": can't fetchOrCreate " + key + - " because the cache is closed."); - } - return null; - } - Waitable waitable = replicaInfoMap.get(key); - if (waitable != null) { - try { - info = fetch(key, waitable); - } catch (RetriableException e) { - if (LOG.isDebugEnabled()) { - LOG.debug(this + ": retrying " + e.getMessage()); - } - continue; - } - } - } while (false); - if (info != null) return info; - // We need to load the replica ourselves. - newWaitable = new Waitable(lock.newCondition()); - replicaInfoMap.put(key, newWaitable); - } finally { - lock.unlock(); - } - return create(key, creator, newWaitable); - } - - /** - * Fetch an existing ReplicaInfo object. - * - * @param key The key that we're using. - * @param waitable The waitable object to wait on. - * @return The existing ReplicaInfo object, or null if there is - * none. - * - * @throws RetriableException If the caller needs to retry. - */ - private ShortCircuitReplicaInfo fetch(ExtendedBlockId key, - Waitable waitable) throws RetriableException { - // Another thread is already in the process of loading this - // ShortCircuitReplica. So we simply wait for it to complete. - ShortCircuitReplicaInfo info; - try { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": found waitable for " + key); - } - info = waitable.await(); - } catch (InterruptedException e) { - LOG.info(this + ": interrupted while waiting for " + key); - Thread.currentThread().interrupt(); - throw new RetriableException("interrupted"); - } - if (info.getInvalidTokenException() != null) { - LOG.info(this + ": could not get " + key + " due to InvalidToken " + - "exception.", info.getInvalidTokenException()); - return info; - } - ShortCircuitReplica replica = info.getReplica(); - if (replica == null) { - LOG.warn(this + ": failed to get " + key); - return info; - } - if (replica.purged) { - // Ignore replicas that have already been purged from the cache. - throw new RetriableException("Ignoring purged replica " + - replica + ". Retrying."); - } - // Check if the replica is stale before using it. - // If it is, purge it and retry. - if (replica.isStale()) { - LOG.info(this + ": got stale replica " + replica + ". Removing " + - "this replica from the replicaInfoMap and retrying."); - // Remove the cache's reference to the replica. This may or may not - // trigger a close. - purge(replica); - throw new RetriableException("ignoring stale replica " + replica); - } - ref(replica); - return info; - } - - private ShortCircuitReplicaInfo create(ExtendedBlockId key, - ShortCircuitReplicaCreator creator, - Waitable newWaitable) { - // Handle loading a new replica. 
- ShortCircuitReplicaInfo info = null; - try { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": loading " + key); - } - info = creator.createShortCircuitReplicaInfo(); - } catch (RuntimeException e) { - LOG.warn(this + ": failed to load " + key, e); - } - if (info == null) info = new ShortCircuitReplicaInfo(); - lock.lock(); - try { - if (info.getReplica() != null) { - // On success, make sure the cache cleaner thread is running. - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": successfully loaded " + info.getReplica()); - } - startCacheCleanerThreadIfNeeded(); - // Note: new ShortCircuitReplicas start with a refCount of 2, - // indicating that both this cache and whoever requested the - // creation of the replica hold a reference. So we don't need - // to increment the reference count here. - } else { - // On failure, remove the waitable from the replicaInfoMap. - Waitable waitableInMap = replicaInfoMap.get(key); - if (waitableInMap == newWaitable) replicaInfoMap.remove(key); - if (info.getInvalidTokenException() != null) { - LOG.info(this + ": could not load " + key + " due to InvalidToken " + - "exception.", info.getInvalidTokenException()); - } else { - LOG.warn(this + ": failed to load " + key); - } - } - newWaitable.provide(info); - } finally { - lock.unlock(); - } - return info; - } - - private void startCacheCleanerThreadIfNeeded() { - if (cacheCleaner == null) { - cacheCleaner = new CacheCleaner(); - long rateMs = cacheCleaner.getRateInMs(); - ScheduledFuture future = - cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs, - TimeUnit.MILLISECONDS); - cacheCleaner.setFuture(future); - if (LOG.isDebugEnabled()) { - LOG.debug(this + ": starting cache cleaner thread which will run " + - "every " + rateMs + " ms"); - } - } - } - - ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica, - boolean anchored) { - Condition newCond; - lock.lock(); - try { - while (replica.mmapData != null) { - if (replica.mmapData instanceof MappedByteBuffer) { - ref(replica); - MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData; - return new ClientMmap(replica, mmap, anchored); - } else if (replica.mmapData instanceof Long) { - long lastAttemptTimeMs = (Long)replica.mmapData; - long delta = Time.monotonicNow() - lastAttemptTimeMs; - if (delta < mmapRetryTimeoutMs) { - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": can't create client mmap for " + - replica + " because we failed to " + - "create one just " + delta + "ms ago."); - } - return null; - } - if (LOG.isTraceEnabled()) { - LOG.trace(this + ": retrying client mmap for " + replica + - ", " + delta + " ms after the previous failure."); - } - } else if (replica.mmapData instanceof Condition) { - Condition cond = (Condition)replica.mmapData; - cond.awaitUninterruptibly(); - } else { - Preconditions.checkState(false, "invalid mmapData type %s", - replica.mmapData.getClass().getName()); - } - } - newCond = lock.newCondition(); - replica.mmapData = newCond; - } finally { - lock.unlock(); - } - MappedByteBuffer map = replica.loadMmapInternal(); - lock.lock(); - try { - if (map == null) { - replica.mmapData = Long.valueOf(Time.monotonicNow()); - newCond.signalAll(); - return null; - } else { - outstandingMmapCount++; - replica.mmapData = map; - ref(replica); - newCond.signalAll(); - return new ClientMmap(replica, map, anchored); - } - } finally { - lock.unlock(); - } - } - - /** - * Close the cache and free all associated resources. 
- */ - @Override - public void close() { - try { - lock.lock(); - if (closed) return; - closed = true; - LOG.info(this + ": closing"); - maxNonMmappedEvictableLifespanMs = 0; - maxEvictableMmapedSize = 0; - // Close and join cacheCleaner thread. - IOUtils.cleanup(LOG, cacheCleaner); - // Purge all replicas. - while (true) { - Entry entry = evictable.firstEntry(); - if (entry == null) break; - purge(entry.getValue()); - } - while (true) { - Entry entry = evictableMmapped.firstEntry(); - if (entry == null) break; - purge(entry.getValue()); - } - } finally { - lock.unlock(); - } - - releaserExecutor.shutdown(); - cleanerExecutor.shutdown(); - // wait for existing tasks to terminate - try { - if (!releaserExecutor.awaitTermination(30, TimeUnit.SECONDS)) { - LOG.error("Forcing SlotReleaserThreadPool to shutdown!"); - releaserExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - releaserExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - LOG.error("Interrupted while waiting for SlotReleaserThreadPool " - + "to terminate", e); - } - - // wait for existing tasks to terminate - try { - if (!cleanerExecutor.awaitTermination(30, TimeUnit.SECONDS)) { - LOG.error("Forcing CleanerThreadPool to shutdown!"); - cleanerExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - cleanerExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - LOG.error("Interrupted while waiting for CleanerThreadPool " - + "to terminate", e); - } - IOUtils.cleanup(LOG, shmManager); - } - - @VisibleForTesting // ONLY for testing - public interface CacheVisitor { - void visit(int numOutstandingMmaps, - Map replicas, - Map failedLoads, - Map evictable, - Map evictableMmapped); - } - - @VisibleForTesting // ONLY for testing - public void accept(CacheVisitor visitor) { - lock.lock(); - try { - Map replicas = - new HashMap(); - Map failedLoads = - new HashMap(); - for (Entry> entry : - replicaInfoMap.entrySet()) { - Waitable waitable = entry.getValue(); - if (waitable.hasVal()) { - if (waitable.getVal().getReplica() != null) { - replicas.put(entry.getKey(), waitable.getVal().getReplica()); - } else { - // The exception may be null here, indicating a failed load that - // isn't the result of an invalid block token. - failedLoads.put(entry.getKey(), - waitable.getVal().getInvalidTokenException()); - } - } - } - if (LOG.isDebugEnabled()) { - StringBuilder builder = new StringBuilder(); - builder.append("visiting ").append(visitor.getClass().getName()). - append("with outstandingMmapCount=").append(outstandingMmapCount). - append(", replicas="); - String prefix = ""; - for (Entry entry : replicas.entrySet()) { - builder.append(prefix).append(entry.getValue()); - prefix = ","; - } - prefix = ""; - builder.append(", failedLoads="); - for (Entry entry : failedLoads.entrySet()) { - builder.append(prefix).append(entry.getValue()); - prefix = ","; - } - prefix = ""; - builder.append(", evictable="); - for (Entry entry : evictable.entrySet()) { - builder.append(prefix).append(entry.getKey()). - append(":").append(entry.getValue()); - prefix = ","; - } - prefix = ""; - builder.append(", evictableMmapped="); - for (Entry entry : evictableMmapped.entrySet()) { - builder.append(prefix).append(entry.getKey()). 
- append(":").append(entry.getValue()); - prefix = ","; - } - LOG.debug(builder.toString()); - } - visitor.visit(outstandingMmapCount, replicas, failedLoads, - evictable, evictableMmapped); - } finally { - lock.unlock(); - } - } - - @Override - public String toString() { - return "ShortCircuitCache(0x" + - Integer.toHexString(System.identityHashCode(this)) + ")"; - } - - /** - * Allocate a new shared memory slot. - * - * @param datanode The datanode to allocate a shm slot with. - * @param peer A peer connected to the datanode. - * @param usedPeer Will be set to true if we use up the provided peer. - * @param blockId The block id and block pool id of the block we're - * allocating this slot for. - * @param clientName The name of the DFSClient allocating the shared - * memory. - * @return Null if short-circuit shared memory is disabled; - * a short-circuit memory slot otherwise. - * @throws IOException An exception if there was an error talking to - * the datanode. - */ - public Slot allocShmSlot(DatanodeInfo datanode, - DomainPeer peer, MutableBoolean usedPeer, - ExtendedBlockId blockId, String clientName) throws IOException { - if (shmManager != null) { - return shmManager.allocSlot(datanode, peer, usedPeer, - blockId, clientName); - } else { - return null; - } - } - - /** - * Free a slot immediately. - * - * ONLY use this if the DataNode is not yet aware of the slot. - * - * @param slot The slot to free. - */ - public void freeSlot(Slot slot) { - Preconditions.checkState(shmManager != null); - slot.makeInvalid(); - shmManager.freeSlot(slot); - } - - /** - * Schedule a shared memory slot to be released. - * - * @param slot The slot to release. - */ - public void scheduleSlotReleaser(Slot slot) { - Preconditions.checkState(shmManager != null); - releaserExecutor.execute(new SlotReleaser(slot)); - } - - @VisibleForTesting - public DfsClientShmManager getDfsClientShmManager() { - return shmManager; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java deleted file mode 100644 index 1390cf3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * A ShortCircuitReplica object contains file descriptors for a block that
- * we are reading via short-circuit local reads.
- *
- * The file descriptors can be shared between multiple threads because
- * all the operations we perform are stateless-- i.e., we use pread
- * instead of read, to avoid using the shared position state.
- */
-@InterfaceAudience.Private
-public class ShortCircuitReplica {
-  public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
-
-  /**
-   * Identifies this ShortCircuitReplica object.
-   */
-  final ExtendedBlockId key;
-
-  /**
-   * The block data input stream.
-   */
-  private final FileInputStream dataStream;
-
-  /**
-   * The block metadata input stream.
-   *
-   * TODO: make this nullable if the file has no checksums on disk.
-   */
-  private final FileInputStream metaStream;
-
-  /**
-   * Block metadata header.
-   */
-  private final BlockMetadataHeader metaHeader;
-
-  /**
-   * The cache we belong to.
-   */
-  private final ShortCircuitCache cache;
-
-  /**
-   * Monotonic time at which the replica was created.
-   */
-  private final long creationTimeMs;
-
-  /**
-   * If non-null, the shared memory slot associated with this replica.
-   */
-  private final Slot slot;
-
-  /**
-   * Current mmap state.
-   *
-   * Protected by the cache lock.
-   */
-  Object mmapData;
-
-  /**
-   * True if this replica has been purged from the cache; false otherwise.
-   *
-   * Protected by the cache lock.
-   */
-  boolean purged = false;
-
-  /**
-   * Number of external references to this replica.  Replicas are referenced
-   * by the cache, BlockReaderLocal instances, and by ClientMmap instances.
-   * The number starts at 2 because when we create a replica, it is referenced
-   * by both the cache and the requester.
-   *
-   * Protected by the cache lock.
-   */
-  int refCount = 2;
-
-  /**
-   * The monotonic time in nanoseconds at which the replica became evictable, or
-   * null if it is not evictable.
-   *
-   * Protected by the cache lock.
-   */
-  private Long evictableTimeNs = null;
-
-  public ShortCircuitReplica(ExtendedBlockId key,
-      FileInputStream dataStream, FileInputStream metaStream,
-      ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
-    this.key = key;
-    this.dataStream = dataStream;
-    this.metaStream = metaStream;
-    this.metaHeader =
-        BlockMetadataHeader.preadHeader(metaStream.getChannel());
-    if (metaHeader.getVersion() != 1) {
-      throw new IOException("invalid metadata header version " +
-          metaHeader.getVersion() + ".  Can only handle version 1.");
-    }
-    this.cache = cache;
-    this.creationTimeMs = creationTimeMs;
-    this.slot = slot;
-  }
-
-  /**
-   * Decrement the reference count.
-   */
-  public void unref() {
-    cache.unref(this);
-  }
-
-  /**
-   * Check if the replica is stale.
-   *
-   * Must be called with the cache lock held.
-   */
-  boolean isStale() {
-    if (slot != null) {
-      // Check staleness by looking at the shared memory area we use to
-      // communicate with the DataNode.
-      boolean stale = !slot.isValid();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": checked shared memory segment.  isStale=" + stale);
-      }
-      return stale;
-    } else {
-      // Fall back to old, time-based staleness method.
-      long deltaMs = Time.monotonicNow() - creationTimeMs;
-      long staleThresholdMs = cache.getStaleThresholdMs();
-      if (deltaMs > staleThresholdMs) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + " is stale because it's " + deltaMs +
-              " ms old, and staleThresholdMs = " + staleThresholdMs);
-        }
-        return true;
-      } else {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(this + " is not stale because it's only " + deltaMs +
-              " ms old, and staleThresholdMs = " + staleThresholdMs);
-        }
-        return false;
-      }
-    }
-  }
-
-  /**
-   * Try to add a no-checksum anchor to our shared memory slot.
-   *
-   * It is only possible to add this anchor when the block is mlocked on the Datanode.
-   * The DataNode will not munlock the block until the number of no-checksum anchors
-   * for the block reaches zero.
-   *
-   * This method does not require any synchronization.
-   *
-   * @return True if we successfully added a no-checksum anchor.
-   */
-  public boolean addNoChecksumAnchor() {
-    if (slot == null) {
-      return false;
-    }
-    boolean result = slot.addAnchor();
-    if (LOG.isTraceEnabled()) {
-      if (result) {
-        LOG.trace(this + ": added no-checksum anchor to slot " + slot);
-      } else {
-        LOG.trace(this + ": could not add no-checksum anchor to slot " + slot);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Remove a no-checksum anchor for our shared memory slot.
-   *
-   * This method does not require any synchronization.
-   */
-  public void removeNoChecksumAnchor() {
-    if (slot != null) {
-      slot.removeAnchor();
-    }
-  }
-
-  /**
-   * Check if the replica has an associated mmap that has been fully loaded.
-   *
-   * Must be called with the cache lock held.
-   */
-  @VisibleForTesting
-  public boolean hasMmap() {
-    return ((mmapData != null) && (mmapData instanceof MappedByteBuffer));
-  }
-
-  /**
-   * Free the mmap associated with this replica.
-   *
-   * Must be called with the cache lock held.
-   */
-  void munmap() {
-    MappedByteBuffer mmap = (MappedByteBuffer)mmapData;
-    NativeIO.POSIX.munmap(mmap);
-    mmapData = null;
-  }
-
-  /**
-   * Close the replica.
-   *
-   * Must be called after there are no more references to the replica in the
-   * cache or elsewhere.
-   */
-  void close() {
-    String suffix = "";
-
-    Preconditions.checkState(refCount == 0,
-        "tried to close replica with refCount %d: %s", refCount, this);
-    refCount = -1;
-    Preconditions.checkState(purged,
-        "tried to close unpurged replica %s", this);
-    if (hasMmap()) {
-      munmap();
-      if (LOG.isTraceEnabled()) {
-        suffix += "  munmapped.";
-      }
-    }
-    IOUtils.cleanup(LOG, dataStream, metaStream);
-    if (slot != null) {
-      cache.scheduleSlotReleaser(slot);
-      if (LOG.isTraceEnabled()) {
-        suffix += "  scheduling " + slot + " for later release.";
-      }
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("closed " + this + suffix);
-    }
-  }
-
-  public FileInputStream getDataStream() {
-    return dataStream;
-  }
-
-  public FileInputStream getMetaStream() {
-    return metaStream;
-  }
-
-  public BlockMetadataHeader getMetaHeader() {
-    return metaHeader;
-  }
-
-  public ExtendedBlockId getKey() {
-    return key;
-  }
-
-  public ClientMmap getOrCreateClientMmap(boolean anchor) {
-    return cache.getOrCreateClientMmap(this, anchor);
-  }
-
-  MappedByteBuffer loadMmapInternal() {
-    try {
-      FileChannel channel = dataStream.getChannel();
-      MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0,
-          Math.min(Integer.MAX_VALUE, channel.size()));
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": created mmap of size " + channel.size());
-      }
-      return mmap;
-    } catch (IOException e) {
-      LOG.warn(this + ": mmap error", e);
-      return null;
-    } catch (RuntimeException e) {
-      LOG.warn(this + ": mmap error", e);
-      return null;
-    }
-  }
-
-  /**
-   * Get the evictable time in nanoseconds.
-   *
-   * Note: you must hold the cache lock to call this function.
-   *
-   * @return the evictable time in nanoseconds.
-   */
-  public Long getEvictableTimeNs() {
-    return evictableTimeNs;
-  }
-
-  /**
-   * Set the evictable time in nanoseconds.
-   *
-   * Note: you must hold the cache lock to call this function.
-   *
-   * @param evictableTimeNs The evictable time in nanoseconds, or null
-   *                        to set no evictable time.
-   */
-  void setEvictableTimeNs(Long evictableTimeNs) {
-    this.evictableTimeNs = evictableTimeNs;
-  }
-
-  @VisibleForTesting
-  public Slot getSlot() {
-    return slot;
-  }
-
-  /**
-   * Convert the replica to a string for debugging purposes.
-   * Note that we can't take the lock here.
-   */
-  @Override
-  public String toString() {
-    return new StringBuilder().append("ShortCircuitReplica{").
-        append("key=").append(key).
-        append(", metaHeader.version=").append(metaHeader.getVersion()).
-        append(", metaHeader.checksum=").append(metaHeader.getChecksum()).
-        append(", ident=").append("0x").
-        append(Integer.toHexString(System.identityHashCode(this))).
-        append(", creationTimeMs=").append(creationTimeMs).
-        append("}").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
deleted file mode 100644
index ef0019f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.shortcircuit;
-
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-
-public final class ShortCircuitReplicaInfo {
-  private final ShortCircuitReplica replica;
-  private final InvalidToken exc;
-
-  public ShortCircuitReplicaInfo() {
-    this.replica = null;
-    this.exc = null;
-  }
-
-  public ShortCircuitReplicaInfo(ShortCircuitReplica replica) {
-    this.replica = replica;
-    this.exc = null;
-  }
-
-  public ShortCircuitReplicaInfo(InvalidToken exc) {
-    this.replica = null;
-    this.exc = exc;
-  }
-
-  public ShortCircuitReplica getReplica() {
-    return replica;
-  }
-
-  public InvalidToken getInvalidTokenException() {
-    return exc;
-  }
-
-  public String toString() {
-    StringBuilder builder = new StringBuilder();
-    String prefix = "";
-    builder.append("ShortCircuitReplicaInfo{");
-    if (replica != null) {
-      builder.append(prefix).append(replica);
-      prefix = ", ";
-    }
-    if (exc != null) {
-      builder.append(prefix).append(exc);
-      prefix = ", ";
-    }
-    builder.append("}");
-    return builder.toString();
-  }
-}
\ No newline at end of file
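
Note for readers following this move: the deletions above are one half of HDFS-8951; the same classes are re-created under hadoop-hdfs-client in the same commit, so the APIs shown here remain available to client code. The CacheVisitor/accept() pair deleted above is the testing hook for inspecting ShortCircuitCache state; accept() takes the cache lock and hands the visitor a consistent snapshot. A minimal sketch of how a test helper might use it is below. The ShortCircuitCacheInspector class name and the dump method are illustrative only, and how the cache handle is obtained is left to the test harness; they are not part of this commit.

  import java.util.Map;

  import org.apache.hadoop.hdfs.ExtendedBlockId;
  import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
  import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
  import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
  import org.apache.hadoop.security.token.SecretManager.InvalidToken;

  public class ShortCircuitCacheInspector {
    // Dump a consistent snapshot of the cache's state. The visitor runs
    // while accept() holds the cache lock, so the four maps and the mmap
    // count cannot change out from under us mid-visit.
    public static void dump(ShortCircuitCache cache) {
      cache.accept(new CacheVisitor() {
        @Override
        public void visit(int numOutstandingMmaps,
            Map<ExtendedBlockId, ShortCircuitReplica> replicas,
            Map<ExtendedBlockId, InvalidToken> failedLoads,
            Map<Long, ShortCircuitReplica> evictable,
            Map<Long, ShortCircuitReplica> evictableMmapped) {
          System.out.println("outstanding mmaps: " + numOutstandingMmaps);
          System.out.println("cached replicas:   " + replicas.size());
          System.out.println("failed loads:      " + failedLoads.size());
          System.out.println("evictable:         " + evictable.size()
              + " (+" + evictableMmapped.size() + " mmapped)");
        }
      });
    }
  }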
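
ShortCircuitReplicaInfo, the last file deleted above, is a small either/or holder: at most one of the replica or the InvalidToken is non-null, and both may be null for a load that failed for some reason other than a bad block token (as the comment in accept() notes). A hedged sketch of how a caller might unpack it; the ReplicaInfoHandler class and the handle method are placeholders, not APIs from this commit:

  import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
  import org.apache.hadoop.security.token.SecretManager.InvalidToken;

  public class ReplicaInfoHandler {
    static void handle(ShortCircuitReplicaInfo info) throws InvalidToken {
      if (info.getReplica() != null) {
        // Success: read through the short-circuit replica, then drop the
        // reference the cache handed us (refCount starts at 2 for this reason).
        info.getReplica().unref();
      } else if (info.getInvalidTokenException() != null) {
        // The DataNode rejected our block token; rethrow so the caller can
        // refetch the token and retry.
        throw info.getInvalidTokenException();
      } else {
        // Failed load, but not a token problem: fall back to a normal
        // (non-short-circuit) read path.
      }
    }
  }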