Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 200A118D3D for ; Fri, 28 Aug 2015 21:38:41 +0000 (UTC) Received: (qmail 57254 invoked by uid 500); 28 Aug 2015 21:38:40 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 57034 invoked by uid 500); 28 Aug 2015 21:38:40 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 56840 invoked by uid 99); 28 Aug 2015 21:38:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Aug 2015 21:38:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 18E6EE0F7C; Fri, 28 Aug 2015 21:38:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wheat9@apache.org To: common-commits@hadoop.apache.org Date: Fri, 28 Aug 2015 21:38:42 -0000 Message-Id: <0483a5cda2744dbb82ef143453ab79fe@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/4] hadoop git commit: HDFS-8925. Move BlockReaderLocal to hdfs-client. Contributed by Mingliang Liu. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java new file mode 100644 index 0000000..55aa741 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.LinkedListMultimap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.util.IOUtilsClient; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A cache of input stream sockets to Data Node. + */ +@InterfaceStability.Unstable +@InterfaceAudience.Private +@VisibleForTesting +public class PeerCache { + private static final Logger LOG = LoggerFactory.getLogger(PeerCache.class); + + private static class Key { + final DatanodeID dnID; + final boolean isDomain; + + Key(DatanodeID dnID, boolean isDomain) { + this.dnID = dnID; + this.isDomain = isDomain; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Key)) { + return false; + } + Key other = (Key)o; + return dnID.equals(other.dnID) && isDomain == other.isDomain; + } + + @Override + public int hashCode() { + return dnID.hashCode() ^ (isDomain ? 1 : 0); + } + } + + private static class Value { + private final Peer peer; + private final long time; + + Value(Peer peer, long time) { + this.peer = peer; + this.time = time; + } + + Peer getPeer() { + return peer; + } + + long getTime() { + return time; + } + } + + private Daemon daemon; + /** A map for per user per datanode. 
*/ + private final LinkedListMultimap multimap = + LinkedListMultimap.create(); + private final int capacity; + private final long expiryPeriod; + + public PeerCache(int c, long e) { + this.capacity = c; + this.expiryPeriod = e; + + if (capacity == 0 ) { + LOG.info("SocketCache disabled."); + } else if (expiryPeriod == 0) { + throw new IllegalStateException("Cannot initialize expiryPeriod to " + + expiryPeriod + " when cache is enabled."); + } + } + + private boolean isDaemonStarted() { + return (daemon == null)? false: true; + } + + private synchronized void startExpiryDaemon() { + // start daemon only if not already started + if (isDaemonStarted() == true) { + return; + } + + daemon = new Daemon(new Runnable() { + @Override + public void run() { + try { + PeerCache.this.run(); + } catch(InterruptedException e) { + //noop + } finally { + PeerCache.this.clear(); + } + } + + @Override + public String toString() { + return String.valueOf(PeerCache.this); + } + }); + daemon.start(); + } + + /** + * Get a cached peer connected to the given DataNode. + * @param dnId The DataNode to get a Peer for. + * @param isDomain Whether to retrieve a DomainPeer or not. + * + * @return An open Peer connected to the DN, or null if none + * was found. 
+ */ + public Peer get(DatanodeID dnId, boolean isDomain) { + + if (capacity <= 0) { // disabled + return null; + } + return getInternal(dnId, isDomain); + } + + private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) { + List sockStreamList = multimap.get(new Key(dnId, isDomain)); + if (sockStreamList == null) { + return null; + } + + Iterator iter = sockStreamList.iterator(); + while (iter.hasNext()) { + Value candidate = iter.next(); + iter.remove(); + long ageMs = Time.monotonicNow() - candidate.getTime(); + Peer peer = candidate.getPeer(); + if (ageMs >= expiryPeriod) { + try { + peer.close(); + } catch (IOException e) { + LOG.warn("got IOException closing stale peer " + peer + + ", which is " + ageMs + " ms old"); + } + } else if (!peer.isClosed()) { + return peer; + } + } + return null; + } + + /** + * Give an unused socket to the cache. + */ + public void put(DatanodeID dnId, Peer peer) { + Preconditions.checkNotNull(dnId); + Preconditions.checkNotNull(peer); + if (peer.isClosed()) return; + if (capacity <= 0) { + // Cache disabled. + IOUtilsClient.cleanup(LOG, peer); + return; + } + putInternal(dnId, peer); + } + + private synchronized void putInternal(DatanodeID dnId, Peer peer) { + startExpiryDaemon(); + + if (capacity == multimap.size()) { + evictOldest(); + } + multimap.put(new Key(dnId, peer.getDomainSocket() != null), + new Value(peer, Time.monotonicNow())); + } + + public synchronized int size() { + return multimap.size(); + } + + /** + * Evict and close sockets older than expiry period from the cache. 
+ */ + private synchronized void evictExpired(long expiryPeriod) { + while (multimap.size() != 0) { + Iterator> iter = + multimap.entries().iterator(); + Entry entry = iter.next(); + // if oldest socket expired, remove it + if (entry == null || + Time.monotonicNow() - entry.getValue().getTime() < + expiryPeriod) { + break; + } + IOUtilsClient.cleanup(LOG, entry.getValue().getPeer()); + iter.remove(); + } + } + + /** + * Evict the oldest entry in the cache. + */ + private synchronized void evictOldest() { + // We can get the oldest element immediately, because of an interesting + // property of LinkedListMultimap: its iterator traverses entries in the + // order that they were added. + Iterator> iter = + multimap.entries().iterator(); + if (!iter.hasNext()) { + throw new IllegalStateException("Cannot evict from empty cache! " + + "capacity: " + capacity); + } + Entry entry = iter.next(); + IOUtilsClient.cleanup(LOG, entry.getValue().getPeer()); + iter.remove(); + } + + /** + * Periodically check in the cache and expire the entries + * older than expiryPeriod minutes + */ + private void run() throws InterruptedException { + for(long lastExpiryTime = Time.monotonicNow(); + !Thread.interrupted(); + Thread.sleep(expiryPeriod)) { + final long elapsed = Time.monotonicNow() - lastExpiryTime; + if (elapsed >= expiryPeriod) { + evictExpired(expiryPeriod); + lastExpiryTime = Time.monotonicNow(); + } + } + clear(); + throw new InterruptedException("Daemon Interrupted"); + } + + /** + * Empty the cache, and close all sockets. 
+ */ + @VisibleForTesting + synchronized void clear() { + for (Value value : multimap.values()) { + IOUtilsClient.cleanup(LOG, value.getPeer()); + } + multimap.clear(); + } + + @VisibleForTesting + void close() { + clear(); + if (daemon != null) { + daemon.interrupt(); + try { + daemon.join(); + } catch (InterruptedException e) { + throw new RuntimeException("failed to join thread"); + } + } + daemon = null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java new file mode 100644 index 0000000..07f4836 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Options that can be specified when manually triggering a block report. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class BlockReportOptions { + private final boolean incremental; + + private BlockReportOptions(boolean incremental) { + this.incremental = incremental; + } + + public boolean isIncremental() { + return incremental; + } + + public static class Factory { + private boolean incremental = false; + + public Factory() { + } + + public Factory setIncremental(boolean incremental) { + this.incremental = incremental; + return this; + } + + public BlockReportOptions build() { + return new BlockReportOptions(incremental); + } + } + + @Override + public String toString() { + return "BlockReportOptions{incremental=" + incremental + "}"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 3b492ae..7b1e438 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -113,6 +113,11 @@ public interface HdfsClientConfigKeys { "dfs.datanode.hdfs-blocks-metadata.enabled"; boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false; + String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal"; + String DFS_DATANODE_READAHEAD_BYTES_KEY = 
"dfs.datanode.readahead.bytes"; + long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB + String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri"; + String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY = PREFIX + "replica.accessor.builder.classes"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java new file mode 100644 index 0000000..69fa52d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A block and the full path information to the block data file and + * the metadata file stored on the local file system. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockLocalPathInfo { + private final ExtendedBlock block; + private String localBlockPath = ""; // local file storing the data + private String localMetaPath = ""; // local file storing the checksum + + /** + * Constructs BlockLocalPathInfo. + * @param b The block corresponding to this lock path info. + * @param file Block data file. + * @param metafile Metadata file for the block. + */ + public BlockLocalPathInfo(ExtendedBlock b, String file, String metafile) { + block = b; + localBlockPath = file; + localMetaPath = metafile; + } + + /** + * Get the Block data file. + * @return Block data file. + */ + public String getBlockPath() {return localBlockPath;} + + /** + * @return the Block + */ + public ExtendedBlock getBlock() { return block;} + + /** + * Get the Block metadata file. + * @return Block metadata file. + */ + public String getMetaPath() {return localMetaPath;} + + /** + * Get number of bytes in the block. + * @return Number of bytes in the block. 
+ */ + public long getNumBytes() { + return block.getNumBytes(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java new file mode 100644 index 0000000..3374868 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.ReconfigurationTaskStatus; +import org.apache.hadoop.hdfs.client.BlockReportOptions; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenInfo; + +/** An client-datanode protocol for block recovery + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +@KerberosInfo( + serverPrincipal = HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY) +@TokenInfo(BlockTokenSelector.class) +public interface ClientDatanodeProtocol { + /** + * Until version 9, this class ClientDatanodeProtocol served as both + * the client interface to the DN AND the RPC protocol used to + * communicate with the NN. + * + * This class is used by both the DFSClient and the + * DN server side to insulate from the protocol serialization. + * + * If you are adding/changing DN's interface then you need to + * change both this class and ALSO related protocol buffer + * wire protocol definition in ClientDatanodeProtocol.proto. + * + * For more details on protocol buffer wire protocol, please see + * .../org/apache/hadoop/hdfs/protocolPB/overview.html + * + * The log of historical changes can be retrieved from the svn). + * 9: Added deleteBlockPool method + * + * 9 is the last version id when this class was used for protocols + * serialization. DO not update this version any further. + */ + public static final long versionID = 9L; + + /** Return the visible length of a replica. 
*/ + long getReplicaVisibleLength(ExtendedBlock b) throws IOException; + + /** + * Refresh the list of federated namenodes from updated configuration + * Adds new namenodes and stops the deleted namenodes. + * + * @throws IOException on error + **/ + void refreshNamenodes() throws IOException; + + /** + * Delete the block pool directory. If force is false it is deleted only if + * it is empty, otherwise it is deleted along with its contents. + * + * @param bpid Blockpool id to be deleted. + * @param force If false blockpool directory is deleted only if it is empty + * i.e. if it doesn't contain any block files, otherwise it is + * deleted along with its contents. + * @throws IOException + */ + void deleteBlockPool(String bpid, boolean force) throws IOException; + + /** + * Retrieves the path names of the block file and metadata file stored on the + * local file system. + * + * In order for this method to work, one of the following should be satisfied: + *
<ul> + * <li>
+ * The client user must be configured at the datanode to be able to use this + * method.</li>
+ * <li>
+ * When security is enabled, kerberos authentication must be used to connect + * to the datanode.</li>
+ * </ul>
+ * + * @param block + * the specified block on the local datanode + * @param token + * the block access token. + * @return the BlockLocalPathInfo of a block + * @throws IOException + * on error + */ + BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, + Token token) throws IOException; + + /** + * Shuts down a datanode. + * + * @param forUpgrade If true, data node does extra prep work before shutting + * down. The work includes advising clients to wait and saving + * certain states for quick restart. This should only be used when + * the stored data will remain the same during upgrade/restart. + * @throws IOException + */ + void shutdownDatanode(boolean forUpgrade) throws IOException; + + /** + * Obtains datanode info + * + * @return software/config version and uptime of the datanode + */ + DatanodeLocalInfo getDatanodeInfo() throws IOException; + + /** + * Asynchronously reload configuration on disk and apply changes. + */ + void startReconfiguration() throws IOException; + + /** + * Get the status of the previously issued reconfig task. + * @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}. + */ + ReconfigurationTaskStatus getReconfigurationStatus() throws IOException; + + /** + * Get a list of allowed properties for reconfiguration. + */ + List listReconfigurableProperties() throws IOException; + + /** + * Trigger a new block report. 
+ */ + void triggerBlockReport(BlockReportOptions options) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java new file mode 100644 index 0000000..170467e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Encryption key verification failed. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class InvalidEncryptionKeyException extends IOException { + private static final long serialVersionUID = 0l; + + public InvalidEncryptionKeyException() { + super(); + } + + public InvalidEncryptionKeyException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java new file mode 100644 index 0000000..7e3f66b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.token.TokenInfo; + +@KerberosInfo( + serverPrincipal = HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY) +@TokenInfo(BlockTokenSelector.class) +@ProtocolInfo(protocolName = + "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface ClientDatanodeProtocolPB extends + ClientDatanodeProtocolService.BlockingInterface { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java new file mode 100644 index 0000000..311fcea --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +import javax.net.SocketFactory; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationTaskStatus; +import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.client.BlockReportOptions; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; +import 
org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtocolMetaInterface; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RpcClientUtil; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is the 
client side translator to translate the requests made on + * {@link ClientDatanodeProtocol} interfaces to the RPC server implementing + * {@link ClientDatanodeProtocolPB}. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ClientDatanodeProtocolTranslatorPB implements + ProtocolMetaInterface, ClientDatanodeProtocol, + ProtocolTranslator, Closeable { + public static final Logger LOG = LoggerFactory + .getLogger(ClientDatanodeProtocolTranslatorPB.class); + + /** RpcController is not used and hence is set to null */ + private final static RpcController NULL_CONTROLLER = null; + private final ClientDatanodeProtocolPB rpcProxy; + private final static RefreshNamenodesRequestProto VOID_REFRESH_NAMENODES = + RefreshNamenodesRequestProto.newBuilder().build(); + private final static GetDatanodeInfoRequestProto VOID_GET_DATANODE_INFO = + GetDatanodeInfoRequestProto.newBuilder().build(); + private final static GetReconfigurationStatusRequestProto VOID_GET_RECONFIG_STATUS = + GetReconfigurationStatusRequestProto.newBuilder().build(); + private final static StartReconfigurationRequestProto VOID_START_RECONFIG = + StartReconfigurationRequestProto.newBuilder().build(); + private static final ListReconfigurablePropertiesRequestProto + VOID_LIST_RECONFIGURABLE_PROPERTIES = + ListReconfigurablePropertiesRequestProto.newBuilder().build(); + + public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, + Configuration conf, int socketTimeout, boolean connectToDnViaHostname, + LocatedBlock locatedBlock) throws IOException { + rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf, + socketTimeout, connectToDnViaHostname, locatedBlock); + } + + public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr, + UserGroupInformation ticket, Configuration conf, SocketFactory factory) + throws IOException { + rpcProxy = createClientDatanodeProtocolProxy(addr, ticket, conf, factory, 0); + } + + /** + * Constructor. 
+ * @param datanodeid Datanode to connect to. + * @param conf Configuration. + * @param socketTimeout Socket timeout to use. + * @param connectToDnViaHostname connect to the Datanode using its hostname + * @throws IOException + */ + public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, + Configuration conf, int socketTimeout, boolean connectToDnViaHostname) + throws IOException { + final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname); + InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr); + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr); + } + rpcProxy = createClientDatanodeProtocolProxy(addr, + UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), socketTimeout); + } + + static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy( + DatanodeID datanodeid, Configuration conf, int socketTimeout, + boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException { + final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname); + InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr); + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr); + } + + // Since we're creating a new UserGroupInformation here, we know that no + // future RPC proxies will be able to re-use the same connection. And + // usages of this proxy tend to be one-off calls. + // + // This is a temporary fix: callers should really achieve this by using + // RPC.stopProxy() on the resulting object, but this is currently not + // working in trunk. See the discussion on HDFS-1965. 
+ Configuration confWithNoIpcIdle = new Configuration(conf); + confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic + .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); + + UserGroupInformation ticket = UserGroupInformation + .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString()); + ticket.addToken(locatedBlock.getBlockToken()); + return createClientDatanodeProtocolProxy(addr, ticket, confWithNoIpcIdle, + NetUtils.getDefaultSocketFactory(conf), socketTimeout); + } + + static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy( + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int socketTimeout) throws IOException { + RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + return RPC.getProxy(ClientDatanodeProtocolPB.class, + RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), addr, ticket, + conf, factory, socketTimeout); + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } + + @Override + public long getReplicaVisibleLength(ExtendedBlock b) throws IOException { + GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto + .newBuilder().setBlock(PBHelperClient.convert(b)).build(); + try { + return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void refreshNamenodes() throws IOException { + try { + rpcProxy.refreshNamenodes(NULL_CONTROLLER, VOID_REFRESH_NAMENODES); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void deleteBlockPool(String bpid, boolean force) throws IOException { + DeleteBlockPoolRequestProto req = DeleteBlockPoolRequestProto.newBuilder() + .setBlockPool(bpid).setForce(force).build(); + try { + rpcProxy.deleteBlockPool(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw 
ProtobufHelper.getRemoteException(e); + } + } + + @Override + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, + Token<BlockTokenIdentifier> token) throws IOException { + GetBlockLocalPathInfoRequestProto req = + GetBlockLocalPathInfoRequestProto.newBuilder() + .setBlock(PBHelperClient.convert(block)) + .setToken(PBHelperClient.convert(token)).build(); + GetBlockLocalPathInfoResponseProto resp; + try { + resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return new BlockLocalPathInfo(PBHelperClient.convert(resp.getBlock()), + resp.getLocalPath(), resp.getLocalMetaPath()); + } + + @Override + public boolean isMethodSupported(String methodName) throws IOException { + return RpcClientUtil.isMethodSupported(rpcProxy, + ClientDatanodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), methodName); + } + + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + @Override + public void shutdownDatanode(boolean forUpgrade) throws IOException { + ShutdownDatanodeRequestProto request = ShutdownDatanodeRequestProto + .newBuilder().setForUpgrade(forUpgrade).build(); + try { + rpcProxy.shutdownDatanode(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public DatanodeLocalInfo getDatanodeInfo() throws IOException { + GetDatanodeInfoResponseProto response; + try { + response = rpcProxy.getDatanodeInfo(NULL_CONTROLLER, VOID_GET_DATANODE_INFO); + return PBHelperClient.convert(response.getLocalInfo()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void startReconfiguration() throws IOException { + try { + rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + +
@Override + public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException { + GetReconfigurationStatusResponseProto response; + Map<PropertyChange, Optional<String>> statusMap = null; + long startTime; + long endTime = 0; + try { + response = rpcProxy.getReconfigurationStatus(NULL_CONTROLLER, + VOID_GET_RECONFIG_STATUS); + startTime = response.getStartTime(); + if (response.hasEndTime()) { + endTime = response.getEndTime(); + } + if (response.getChangesCount() > 0) { + statusMap = Maps.newHashMap(); + for (GetReconfigurationStatusConfigChangeProto change : + response.getChangesList()) { + PropertyChange pc = new PropertyChange( + change.getName(), change.getNewValue(), change.getOldValue()); + String errorMessage = null; + if (change.hasErrorMessage()) { + errorMessage = change.getErrorMessage(); + } + statusMap.put(pc, Optional.fromNullable(errorMessage)); + } + } + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return new ReconfigurationTaskStatus(startTime, endTime, statusMap); + } + + @Override + public List<String> listReconfigurableProperties() + throws IOException { + ListReconfigurablePropertiesResponseProto response; + try { + response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER, + VOID_LIST_RECONFIGURABLE_PROPERTIES); + return response.getNameList(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void triggerBlockReport(BlockReportOptions options) + throws IOException { + try { + rpcProxy.triggerBlockReport(NULL_CONTROLLER, + TriggerBlockReportRequestProto.newBuilder(). + setIncremental(options.isIncremental()).
+ build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index edf658a..d921507 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -23,12 +23,14 @@ import com.google.protobuf.CodedInputStream; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeLocalInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; @@ -185,6 +187,17 @@ public class PBHelperClient { return pinnings; } + public static ExtendedBlock convert(ExtendedBlockProto eb) { + if (eb == null) return null; + 
return new ExtendedBlock( eb.getPoolId(), eb.getBlockId(), eb.getNumBytes(), + eb.getGenerationStamp()); + } + + public static DatanodeLocalInfo convert(DatanodeLocalInfoProto proto) { + return new DatanodeLocalInfo(proto.getSoftwareVersion(), + proto.getConfigVersion(), proto.getUptime()); + } + static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) { if (di == null) return null; return convert(di); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java new file mode 100644 index 0000000..f67ca00 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.security.token.block; + +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenSelector; + +/** + * A block token selector for HDFS + */ +@InterfaceAudience.Private +public class BlockTokenSelector implements TokenSelector<BlockTokenIdentifier> { + + @Override + @SuppressWarnings("unchecked") + public Token<BlockTokenIdentifier> selectToken(Text service, + Collection<Token<? extends TokenIdentifier>> tokens) { + if (service == null) { + return null; + } + for (Token<? extends TokenIdentifier> token : tokens) { + if (BlockTokenIdentifier.KIND_NAME.equals(token.getKind())) { + return (Token<BlockTokenIdentifier>) token; + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9f77e85..a561909 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -858,6 +858,9 @@ Release 2.8.0 - UNRELEASED HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. (Mingliang Liu via wheat9) + HDFS-8925. Move BlockReaderLocal to hdfs-client. + (Mingliang Liu via wheat9) + OPTIMIZATIONS HDFS-8026.
Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java deleted file mode 100644 index aa3e8ba..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.util.EnumSet; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ByteBufferReadable; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; - -/** - * A BlockReader is responsible for reading a single block - * from a single datanode. 
- */ -@InterfaceAudience.Private -public interface BlockReader extends ByteBufferReadable { - - - /* same interface as inputStream java.io.InputStream#read() - * used by DFSInputStream#read() - * This violates one rule when there is a checksum error: - * "Read should not modify user buffer before successful read" - * because it first reads the data to user buffer and then checks - * the checksum. - * Note: this must return -1 on EOF, even in the case of a 0-byte read. - * See HDFS-5762 for details. - */ - int read(byte[] buf, int off, int len) throws IOException; - - /** - * Skip the given number of bytes - */ - long skip(long n) throws IOException; - - /** - * Returns an estimate of the number of bytes that can be read - * (or skipped over) from this input stream without performing - * network I/O. - * This may return more than what is actually present in the block. - */ - int available() throws IOException; - - /** - * Close the block reader. - * - * @throws IOException - */ - void close() throws IOException; - - /** - * Read exactly the given amount of data, throwing an exception - * if EOF is reached before that amount - */ - void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException; - - /** - * Similar to {@link #readFully(byte[], int, int)} except that it will - * not throw an exception on EOF. However, it differs from the simple - * {@link #read(byte[], int, int)} call in that it is guaranteed to - * read the data if it is available. In other words, if this call - * does not throw an exception, then either the buffer has been - * filled or the next call will return EOF. - */ - int readAll(byte[] buf, int offset, int len) throws IOException; - - /** - * @return true only if this is a local read. - */ - boolean isLocal(); - - /** - * @return true only if this is a short-circuit read. - * All short-circuit reads are also local. - */ - boolean isShortCircuit(); - - /** - * Get a ClientMmap object for this BlockReader. 
- * - * @param opts The read options to use. - * @return The ClientMmap object, or null if mmap is not - * supported. - */ - ClientMmap getClientMmap(EnumSet<ReadOption> opts); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java deleted file mode 100644 index d913f3a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ /dev/null @@ -1,741 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.util.EnumSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; -import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.DirectBufferPool; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -/** - * BlockReaderLocal enables local short circuited reads. If the DFS client is on - * the same machine as the datanode, then the client can read files directly - * from the local file system rather than going through the datanode for better - * performance.
- * {@link BlockReaderLocal} works as follows:
- * <ul>
- * <li>The client performing short circuit reads must be configured at the
- * datanode.</li>
- * <li>The client gets the file descriptors for the metadata file and the data
- * file for the block using
- * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
- * </li>
- * <li>The client reads the file descriptors.</li>
- * </ul>
- */ -@InterfaceAudience.Private -class BlockReaderLocal implements BlockReader { - static final Log LOG = LogFactory.getLog(BlockReaderLocal.class); - - private static final DirectBufferPool bufferPool = new DirectBufferPool(); - - public static class Builder { - private final int bufferSize; - private boolean verifyChecksum; - private int maxReadahead; - private String filename; - private ShortCircuitReplica replica; - private long dataPos; - private ExtendedBlock block; - private StorageType storageType; - - public Builder(ShortCircuitConf conf) { - this.maxReadahead = Integer.MAX_VALUE; - this.verifyChecksum = !conf.isSkipShortCircuitChecksums(); - this.bufferSize = conf.getShortCircuitBufferSize(); - } - - public Builder setVerifyChecksum(boolean verifyChecksum) { - this.verifyChecksum = verifyChecksum; - return this; - } - - public Builder setCachingStrategy(CachingStrategy cachingStrategy) { - long readahead = cachingStrategy.getReadahead() != null ? - cachingStrategy.getReadahead() : - DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; - this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead); - return this; - } - - public Builder setFilename(String filename) { - this.filename = filename; - return this; - } - - public Builder setShortCircuitReplica(ShortCircuitReplica replica) { - this.replica = replica; - return this; - } - - public Builder setStartOffset(long startOffset) { - this.dataPos = Math.max(0, startOffset); - return this; - } - - public Builder setBlock(ExtendedBlock block) { - this.block = block; - return this; - } - - public Builder setStorageType(StorageType storageType) { - this.storageType = storageType; - return this; - } - - public BlockReaderLocal build() { - Preconditions.checkNotNull(replica); - return new BlockReaderLocal(this); - } - } - - private boolean closed = false; - - /** - * Pair of streams for this block. - */ - private final ShortCircuitReplica replica; - - /** - * The data FileChannel. 
- */ - private final FileChannel dataIn; - - /** - * The next place we'll read from in the block data FileChannel. - * - * If data is buffered in dataBuf, this offset will be larger than the - * offset of the next byte which a read() operation will give us. - */ - private long dataPos; - - /** - * The Checksum FileChannel. - */ - private final FileChannel checksumIn; - - /** - * Checksum type and size. - */ - private final DataChecksum checksum; - - /** - * If false, we will always skip the checksum. - */ - private final boolean verifyChecksum; - - /** - * Name of the block, for logging purposes. - */ - private final String filename; - - /** - * Block ID and Block Pool ID. - */ - private final ExtendedBlock block; - - /** - * Cache of Checksum#bytesPerChecksum. - */ - private final int bytesPerChecksum; - - /** - * Cache of Checksum#checksumSize. - */ - private final int checksumSize; - - /** - * Maximum number of chunks to allocate. - * - * This is used to allocate dataBuf and checksumBuf, in the event that - * we need them. - */ - private final int maxAllocatedChunks; - - /** - * True if zero readahead was requested. - */ - private final boolean zeroReadaheadRequested; - - /** - * Maximum amount of readahead we'll do. This will always be at least the, - * size of a single chunk, even if {@link #zeroReadaheadRequested} is true. - * The reason is because we need to do a certain amount of buffering in order - * to do checksumming. - * - * This determines how many bytes we'll use out of dataBuf and checksumBuf. - * Why do we allocate buffers, and then (potentially) only use part of them? - * The rationale is that allocating a lot of buffers of different sizes would - * make it very difficult for the DirectBufferPool to re-use buffers. - */ - private final int maxReadaheadLength; - - /** - * Buffers data starting at the current dataPos and extending on - * for dataBuf.limit(). - * - * This may be null if we don't need it. 
- */ - private ByteBuffer dataBuf; - - /** - * Buffers checksums starting at the current checksumPos and extending on - * for checksumBuf.limit(). - * - * This may be null if we don't need it. - */ - private ByteBuffer checksumBuf; - - /** - * StorageType of replica on DataNode. - */ - private StorageType storageType; - - private BlockReaderLocal(Builder builder) { - this.replica = builder.replica; - this.dataIn = replica.getDataStream().getChannel(); - this.dataPos = builder.dataPos; - this.checksumIn = replica.getMetaStream().getChannel(); - BlockMetadataHeader header = builder.replica.getMetaHeader(); - this.checksum = header.getChecksum(); - this.verifyChecksum = builder.verifyChecksum && - (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL); - this.filename = builder.filename; - this.block = builder.block; - this.bytesPerChecksum = checksum.getBytesPerChecksum(); - this.checksumSize = checksum.getChecksumSize(); - - this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 : - ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum); - // Calculate the effective maximum readahead. - // We can't do more readahead than there is space in the buffer. - int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 : - ((Math.min(builder.bufferSize, builder.maxReadahead) + - bytesPerChecksum - 1) / bytesPerChecksum); - if (maxReadaheadChunks == 0) { - this.zeroReadaheadRequested = true; - maxReadaheadChunks = 1; - } else { - this.zeroReadaheadRequested = false; - } - this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; - this.storageType = builder.storageType; - } - - private synchronized void createDataBufIfNeeded() { - if (dataBuf == null) { - dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum); - dataBuf.position(0); - dataBuf.limit(0); - } - } - - private synchronized void freeDataBufIfExists() { - if (dataBuf != null) { - // When disposing of a dataBuf, we have to move our stored file index - // backwards. 
- dataPos -= dataBuf.remaining(); - dataBuf.clear(); - bufferPool.returnBuffer(dataBuf); - dataBuf = null; - } - } - - private synchronized void createChecksumBufIfNeeded() { - if (checksumBuf == null) { - checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize); - checksumBuf.position(0); - checksumBuf.limit(0); - } - } - - private synchronized void freeChecksumBufIfExists() { - if (checksumBuf != null) { - checksumBuf.clear(); - bufferPool.returnBuffer(checksumBuf); - checksumBuf = null; - } - } - - private synchronized int drainDataBuf(ByteBuffer buf) { - if (dataBuf == null) return -1; - int oldLimit = dataBuf.limit(); - int nRead = Math.min(dataBuf.remaining(), buf.remaining()); - if (nRead == 0) { - return (dataBuf.remaining() == 0) ? -1 : 0; - } - try { - dataBuf.limit(dataBuf.position() + nRead); - buf.put(dataBuf); - } finally { - dataBuf.limit(oldLimit); - } - return nRead; - } - - /** - * Read from the block file into a buffer. - * - * This function overwrites checksumBuf. It will increment dataPos. - * - * @param buf The buffer to read into. May be dataBuf. - * The position and limit of this buffer should be set to - * multiples of the checksum size. - * @param canSkipChecksum True if we can skip checksumming. - * - * @return Total bytes read. 0 on EOF. 
- */ - private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum) - throws IOException { - TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" + - block.getBlockId() + ")", Sampler.NEVER); - try { - int total = 0; - long startDataPos = dataPos; - int startBufPos = buf.position(); - while (buf.hasRemaining()) { - int nRead = dataIn.read(buf, dataPos); - if (nRead < 0) { - break; - } - dataPos += nRead; - total += nRead; - } - if (canSkipChecksum) { - freeChecksumBufIfExists(); - return total; - } - if (total > 0) { - try { - buf.limit(buf.position()); - buf.position(startBufPos); - createChecksumBufIfNeeded(); - int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum; - checksumBuf.clear(); - checksumBuf.limit(checksumsNeeded * checksumSize); - long checksumPos = BlockMetadataHeader.getHeaderSize() - + ((startDataPos / bytesPerChecksum) * checksumSize); - while (checksumBuf.hasRemaining()) { - int nRead = checksumIn.read(checksumBuf, checksumPos); - if (nRead < 0) { - throw new IOException("Got unexpected checksum file EOF at " + - checksumPos + ", block file position " + startDataPos + " for " + - "block " + block + " of file " + filename); - } - checksumPos += nRead; - } - checksumBuf.flip(); - - checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos); - } finally { - buf.position(buf.limit()); - } - } - return total; - } finally { - scope.close(); - } - } - - private boolean createNoChecksumContext() { - if (verifyChecksum) { - if (storageType != null && storageType.isTransient()) { - // Checksums are not stored for replicas on transient storage. We do not - // anchor, because we do not intend for client activity to block eviction - // from transient storage on the DataNode side. 
- return true; - } else { - return replica.addNoChecksumAnchor(); - } - } else { - return true; - } - } - - private void releaseNoChecksumContext() { - if (verifyChecksum) { - if (storageType == null || !storageType.isTransient()) { - replica.removeNoChecksumAnchor(); - } - } - } - - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - boolean canSkipChecksum = createNoChecksumContext(); - try { - String traceString = null; - if (LOG.isTraceEnabled()) { - traceString = new StringBuilder(). - append("read("). - append("buf.remaining=").append(buf.remaining()). - append(", block=").append(block). - append(", filename=").append(filename). - append(", canSkipChecksum=").append(canSkipChecksum). - append(")").toString(); - LOG.info(traceString + ": starting"); - } - int nRead; - try { - if (canSkipChecksum && zeroReadaheadRequested) { - nRead = readWithoutBounceBuffer(buf); - } else { - nRead = readWithBounceBuffer(buf, canSkipChecksum); - } - } catch (IOException e) { - if (LOG.isTraceEnabled()) { - LOG.info(traceString + ": I/O error", e); - } - throw e; - } - if (LOG.isTraceEnabled()) { - LOG.info(traceString + ": returning " + nRead); - } - return nRead; - } finally { - if (canSkipChecksum) releaseNoChecksumContext(); - } - } - - private synchronized int readWithoutBounceBuffer(ByteBuffer buf) - throws IOException { - freeDataBufIfExists(); - freeChecksumBufIfExists(); - int total = 0; - while (buf.hasRemaining()) { - int nRead = dataIn.read(buf, dataPos); - if (nRead <= 0) break; - dataPos += nRead; - total += nRead; - } - return (total == 0 && (dataPos == dataIn.size())) ? -1 : total; - } - - /** - * Fill the data buffer. If necessary, validate the data against the - * checksums. - * - * We always want the offsets of the data contained in dataBuf to be - * aligned to the chunk boundary. If we are validating checksums, we - * accomplish this by seeking backwards in the file until we're on a - * chunk boundary. 
(This is necessary because we can't checksum a - * partial chunk.) If we are not validating checksums, we simply only - * fill the latter part of dataBuf. - * - * @param canSkipChecksum true if we can skip checksumming. - * @return true if we hit EOF. - * @throws IOException - */ - private synchronized boolean fillDataBuf(boolean canSkipChecksum) - throws IOException { - createDataBufIfNeeded(); - final int slop = (int)(dataPos % bytesPerChecksum); - final long oldDataPos = dataPos; - dataBuf.limit(maxReadaheadLength); - if (canSkipChecksum) { - dataBuf.position(slop); - fillBuffer(dataBuf, canSkipChecksum); - } else { - dataPos -= slop; - dataBuf.position(0); - fillBuffer(dataBuf, canSkipChecksum); - } - dataBuf.limit(dataBuf.position()); - dataBuf.position(Math.min(dataBuf.position(), slop)); - if (LOG.isTraceEnabled()) { - LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " + - "buffer from offset " + oldDataPos + " of " + block); - } - return dataBuf.limit() != maxReadaheadLength; - } - - /** - * Read using the bounce buffer. - * - * A 'direct' read actually has three phases. The first drains any - * remaining bytes from the slow read buffer. After this the read is - * guaranteed to be on a checksum chunk boundary. If there are still bytes - * to read, the fast direct path is used for as many remaining bytes as - * possible, up to a multiple of the checksum chunk size. Finally, any - * 'odd' bytes remaining at the end of the read cause another slow read to - * be issued, which involves an extra copy. - * - * Every 'slow' read tries to fill the slow read buffer in one go for - * efficiency's sake. As described above, all non-checksum-chunk-aligned - * reads will be served from the slower read path. - * - * @param buf The buffer to read into. - * @param canSkipChecksum True if we can skip checksums. 
-   * @return the total number of bytes placed in buf, or -1 if no bytes were
-   *         transferred and no read attempt produced data.
-   * @throws IOException if filling either buffer fails.
-   */
-  private synchronized int readWithBounceBuffer(ByteBuffer buf,
-        boolean canSkipChecksum) throws IOException {
-    int total = 0;
-    int bb = drainDataBuf(buf); // drain bounce buffer if possible
-    // NOTE(review): a negative result appears to mean "nothing to drain";
-    // drainDataBuf is not visible here -- confirm.
-    if (bb >= 0) {
-      total += bb;
-      if (buf.remaining() == 0) return total;
-    }
-    // eof stays true until some read attempt yields data; done is set once
-    // a fill comes back short, which ends the loop.
-    boolean eof = true, done = false;
-    do {
-      if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
-            && ((dataPos % bytesPerChecksum) == 0)) {
-        // Fast lane: try to read directly into user-supplied buffer, bypassing
-        // bounce buffer.
-        int oldLimit = buf.limit();
-        int nRead;
-        try {
-          // Temporarily cap the caller's buffer at one readahead window; the
-          // original limit is restored even if the fill throws.
-          buf.limit(buf.position() + maxReadaheadLength);
-          nRead = fillBuffer(buf, canSkipChecksum);
-        } finally {
-          buf.limit(oldLimit);
-        }
-        if (nRead < maxReadaheadLength) {
-          done = true;
-        }
-        if (nRead > 0) {
-          eof = false;
-        }
-        total += nRead;
-      } else {
-        // Slow lane: refill bounce buffer.
-        if (fillDataBuf(canSkipChecksum)) {
-          done = true;
-        }
-        bb = drainDataBuf(buf); // drain bounce buffer if possible
-        if (bb >= 0) {
-          eof = false;
-          total += bb;
-        }
-      }
-    } while ((!done) && (buf.remaining() > 0));
-    return (eof && total == 0) ? -1 : total;
-  }
-
-  /**
-   * Read from the block into a byte array.
-   *
-   * Uses the direct path when checksums can be skipped and zero readahead
-   * was requested, and the bounce-buffer path otherwise. The no-checksum
-   * context obtained at the start is released before returning, including
-   * when the read throws.
-   *
-   * @param arr The array to read into.
-   * @param off The offset in arr at which to start storing bytes.
-   * @param len The maximum number of bytes to read.
-   * @return the value produced by the selected read path: the number of
-   *         bytes read, or -1 when that path reports end of data.
-   * @throws IOException if the selected read path fails; it is logged at
-   *         trace level and rethrown unchanged.
-   */
-  @Override
-  public synchronized int read(byte[] arr, int off, int len)
-        throws IOException {
-    boolean canSkipChecksum = createNoChecksumContext();
-    int nRead;
-    try {
-      // The trace prefix is only built when trace logging is enabled.
-      String traceString = null;
-      if (LOG.isTraceEnabled()) {
-        traceString = new StringBuilder().
-            append("read(arr.length=").append(arr.length).
-            append(", off=").append(off).
-            append(", len=").append(len).
-            append(", filename=").append(filename).
-            append(", block=").append(block).
-            append(", canSkipChecksum=").append(canSkipChecksum).
-            append(")").toString();
-        LOG.trace(traceString + ": starting");
-      }
-      try {
-        if (canSkipChecksum && zeroReadaheadRequested) {
-          nRead = readWithoutBounceBuffer(arr, off, len);
-        } else {
-          nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(traceString + ": I/O error", e);
-        }
-        throw e;
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(traceString + ": returning " + nRead);
-      }
-    } finally {
-      if (canSkipChecksum) releaseNoChecksumContext();
-    }
-    return nRead;
-  }
-
-  /**
-   * Read directly from dataIn into a byte array with a single read call.
-   *
-   * The data and checksum buffers are released first. dataPos is advanced
-   * only when bytes were actually read.
-   *
-   * @return the number of bytes read; -1 if zero bytes were read and
-   *         dataPos equals the size of dataIn; otherwise whatever
-   *         dataIn.read returned.
-   * @throws IOException if reading from dataIn fails.
-   */
-  private synchronized int readWithoutBounceBuffer(byte arr[], int off,
-        int len) throws IOException {
-    freeDataBufIfExists();
-    freeChecksumBufIfExists();
-    int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
-    if (nRead > 0) {
-      dataPos += nRead;
-    } else if ((nRead == 0) && (dataPos == dataIn.size())) {
-      return -1;
-    }
-    return nRead;
-  }
-
-  /**
-   * Read into a byte array from the bounce buffer, refilling it once if it
-   * is empty.
-   *
-   * At most one buffer's worth of data is returned per call, so fewer than
-   * len bytes may be copied even when more data exists.
-   *
-   * @return the number of bytes copied into arr, or -1 if the bounce buffer
-   *         is still empty after the refill attempt.
-   * @throws IOException if refilling the bounce buffer fails.
-   */
-  private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
-        boolean canSkipChecksum) throws IOException {
-    createDataBufIfNeeded();
-    if (!dataBuf.hasRemaining()) {
-      dataBuf.position(0);
-      dataBuf.limit(maxReadaheadLength);
-      fillDataBuf(canSkipChecksum);
-    }
-    if (dataBuf.remaining() == 0) return -1;
-    int toRead = Math.min(dataBuf.remaining(), len);
-    dataBuf.get(arr, off, toRead);
-    return toRead;
-  }
-
-  /**
-   * Skip over n bytes.
-   *
-   * Bytes still held in dataBuf are discarded first; whatever is left of n
-   * is then added to dataPos. No bounds check against the end of the data
-   * is performed in this method.
-   *
-   * @param n The number of bytes to skip.
-   * @return always n.
-   */
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    int discardedFromBuf = 0;
-    long remaining = n;
-    if ((dataBuf != null) && dataBuf.hasRemaining()) {
-      // The cast is safe for non-negative n: the minimum is bounded by
-      // dataBuf.remaining(), which is an int.
-      discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
-      dataBuf.position(dataBuf.position() + discardedFromBuf);
-      remaining -= discardedFromBuf;
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" +
-        filename + "): discarded " + discardedFromBuf + " bytes from " +
-        "dataBuf and advanced dataPos by " + remaining);
-    }
-    dataPos += remaining;
-    return n;
-  }
-
-  /**
-   * @return always Integer.MAX_VALUE.
-   */
-  @Override
-  public int available() throws IOException {
-    // We never do network I/O in BlockReaderLocal.
-    return Integer.MAX_VALUE;
-  }
-
-  /**
-   * Close this reader: drop the reference on the replica and free the data
-   * and checksum buffers. Calls after the first return immediately.
-   */
-  @Override
-  public synchronized void close() throws IOException {
-    if (closed) return;
-    closed = true;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("close(filename=" + filename + ", block=" + block + ")");
-    }
-    replica.unref();
-    freeDataBufIfExists();
-    freeChecksumBufIfExists();
-  }
-
-  /**
-   * Delegates to BlockReaderUtil.readFully with this reader.
-   */
-  @Override
-  public synchronized void readFully(byte[] arr, int off, int len)
-      throws IOException {
-    BlockReaderUtil.readFully(this, arr, off, len);
-  }
-
-  /**
-   * Delegates to BlockReaderUtil.readAll with this reader.
-   *
-   * @return the value returned by BlockReaderUtil.readAll.
-   */
-  @Override
-  public synchronized int readAll(byte[] buf, int off, int len)
-      throws IOException {
-    return BlockReaderUtil.readAll(this, buf, off, len);
-  }
-
-  /**
-   * @return always true.
-   */
-  @Override
-  public boolean isLocal() {
-    return true;
-  }
-
-  /**
-   * @return always true.
-   */
-  @Override
-  public boolean isShortCircuit() {
-    return true;
-  }
-
-  /**
-   * Get or create a memory map for this replica.
-   *
-   * There are two kinds of ClientMmap objects we could fetch here: one that
-   * will always read pre-checksummed data, and one that may read data that
-   * hasn't been checksummed.
-   *
-   * If we fetch the former, "safe" kind of ClientMmap, we have to increment
-   * the anchor count on the shared memory slot. This will tell the DataNode
-   * not to munlock the block until this ClientMmap is closed.
-   * If we fetch the latter, we don't bother with anchoring.
-   *
-   * @param opts The options to use, such as SKIP_CHECKSUMS.
-   *
-   * @return null on failure; the ClientMmap otherwise.
- */ - @Override - public ClientMmap getClientMmap(EnumSet opts) { - boolean anchor = verifyChecksum && - (opts.contains(ReadOption.SKIP_CHECKSUMS) == false); - if (anchor) { - if (!createNoChecksumContext()) { - if (LOG.isTraceEnabled()) { - LOG.trace("can't get an mmap for " + block + " of " + filename + - " since SKIP_CHECKSUMS was not given, " + - "we aren't skipping checksums, and the block is not mlocked."); - } - return null; - } - } - ClientMmap clientMmap = null; - try { - clientMmap = replica.getOrCreateClientMmap(anchor); - } finally { - if ((clientMmap == null) && anchor) { - releaseNoChecksumContext(); - } - } - return clientMmap; - } - - @VisibleForTesting - boolean getVerifyChecksum() { - return this.verifyChecksum; - } - - @VisibleForTesting - int getMaxReadaheadLength() { - return this.maxReadaheadLength; - } - - /** - * Make the replica anchorable. Normally this can only be done by the - * DataNode. This method is only for testing. - */ - @VisibleForTesting - void forceAnchorable() { - replica.getSlot().makeAnchorable(); - } - - /** - * Make the replica unanchorable. Normally this can only be done by the - * DataNode. This method is only for testing. - */ - @VisibleForTesting - void forceUnanchorable() { - replica.getSlot().makeUnanchorable(); - } -}