From: aengineer@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 25 Aug 2015 22:22:30 -0000
Subject: [36/50] [abbrv] hadoop git commit: HDFS-8934. Move ShortCircuitShm to hdfs-client. Contributed by Mingliang Liu.

HDFS-8934. Move ShortCircuitShm to hdfs-client. Contributed by Mingliang Liu.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/490bb5eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/490bb5eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/490bb5eb

Branch: refs/heads/HDFS-7240
Commit: 490bb5ebd6c6d6f9c08fcad167f976687fc3aa42
Parents: 61bf9ca
Author: Haohui Mai
Authored: Sat Aug 22 13:30:19 2015 -0700
Committer: Haohui Mai
Committed: Sat Aug 22 13:31:03 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/ExtendedBlockId.java | 82 +++
 .../org/apache/hadoop/hdfs/net/DomainPeer.java | 132 ++++
 .../java/org/apache/hadoop/hdfs/net/Peer.java | 123 ++++
 .../datatransfer/BlockConstructionStage.java | 62 ++
 .../datatransfer/DataTransferProtoUtil.java | 146 +++++
 .../datatransfer/DataTransferProtocol.java | 202 ++++++
 .../hadoop/hdfs/protocol/datatransfer/Op.java | 66 ++
 .../hdfs/protocol/datatransfer/Sender.java | 261 ++++++++
 .../hadoop/hdfs/protocolPB/PBHelperClient.java | 254 ++++++++
 .../token/block/InvalidBlockTokenException.java | 41 ++
 .../hdfs/server/datanode/CachingStrategy.java | 76 +++
 .../hadoop/hdfs/shortcircuit/DfsClientShm.java | 119 ++++
 .../hdfs/shortcircuit/DfsClientShmManager.java | 522 +++++++++++++++
 .../hdfs/shortcircuit/ShortCircuitShm.java | 647 +++++++++++++++++++
 .../hadoop/hdfs/util/ExactSizeInputStream.java | 125 ++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
 .../apache/hadoop/hdfs/BlockReaderFactory.java | 4 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java | 10 +-
 .../org/apache/hadoop/hdfs/DataStreamer.java | 6 +-
 .../org/apache/hadoop/hdfs/ExtendedBlockId.java | 82 ---
 .../apache/hadoop/hdfs/RemoteBlockReader.java | 4 +-
 .../apache/hadoop/hdfs/RemoteBlockReader2.java | 4 +-
 .../org/apache/hadoop/hdfs/net/DomainPeer.java | 132 ----
 .../java/org/apache/hadoop/hdfs/net/Peer.java | 123 ----
 .../datatransfer/BlockConstructionStage.java | 62 --
 .../datatransfer/DataTransferProtoUtil.java | 148 -----
 .../datatransfer/DataTransferProtocol.java | 201 ------
 .../hadoop/hdfs/protocol/datatransfer/Op.java | 66 --
 .../hdfs/protocol/datatransfer/PipelineAck.java | 2 +-
 .../hdfs/protocol/datatransfer/Receiver.java | 7 +-
 .../hdfs/protocol/datatransfer/Sender.java | 261 --------
 .../datatransfer/sasl/DataTransferSaslUtil.java | 2 +-
 ...tDatanodeProtocolServerSideTranslatorPB.java | 2 +-
 .../ClientDatanodeProtocolTranslatorPB.java | 6 +-
 ...tNamenodeProtocolServerSideTranslatorPB.java | 6 +-
 .../ClientNamenodeProtocolTranslatorPB.java | 28 +-
 .../DatanodeProtocolClientSideTranslatorPB.java | 4 +-
 .../InterDatanodeProtocolTranslatorPB.java | 2 +-
 .../NamenodeProtocolTranslatorPB.java | 2 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 228 +------
 .../token/block/InvalidBlockTokenException.java | 41 --
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 2 +-
 .../hdfs/server/datanode/CachingStrategy.java | 76 ---
 .../hadoop/hdfs/server/datanode/DataNode.java | 4 +-
 .../hdfs/server/datanode/DataXceiver.java | 14 +-
 .../server/namenode/FSImageFormatPBINode.java | 5 +-
 .../hadoop/hdfs/shortcircuit/DfsClientShm.java | 119 ----
 .../hdfs/shortcircuit/DfsClientShmManager.java | 514 ---------------
 .../hdfs/shortcircuit/ShortCircuitCache.java | 4 +-
 .../hdfs/shortcircuit/ShortCircuitShm.java | 646 ------------------
 .../hadoop/hdfs/util/ExactSizeInputStream.java | 125 ----
 .../hadoop/hdfs/protocolPB/TestPBHelper.java | 20 +-
 52 files changed, 2949 insertions(+), 2873 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
new file mode 100644
index 0000000..7b9e8e3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+/**
+ * An immutable key which identifies a block.
+ */ +@InterfaceAudience.Private +final public class ExtendedBlockId { + /** + * The block ID for this block. + */ + private final long blockId; + + /** + * The block pool ID for this block. + */ + private final String bpId; + + public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) { + return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); + } + + public ExtendedBlockId(long blockId, String bpId) { + this.blockId = blockId; + this.bpId = bpId; + } + + public long getBlockId() { + return this.blockId; + } + + public String getBlockPoolId() { + return this.bpId; + } + + @Override + public boolean equals(Object o) { + if ((o == null) || (o.getClass() != this.getClass())) { + return false; + } + ExtendedBlockId other = (ExtendedBlockId)o; + return new EqualsBuilder(). + append(blockId, other.blockId). + append(bpId, other.bpId). + isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(). + append(this.blockId). + append(this.bpId). + toHashCode(); + } + + @Override + public String toString() { + return new StringBuilder().append(blockId). + append("_").append(bpId).toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java new file mode 100644 index 0000000..4792b0e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ReadableByteChannel; + +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Represents a peer that we communicate with by using blocking I/O + * on a UNIX domain socket. 
+ */ +@InterfaceAudience.Private +public class DomainPeer implements Peer { + private final DomainSocket socket; + private final OutputStream out; + private final InputStream in; + private final ReadableByteChannel channel; + + public DomainPeer(DomainSocket socket) { + this.socket = socket; + this.out = socket.getOutputStream(); + this.in = socket.getInputStream(); + this.channel = socket.getChannel(); + } + + @Override + public ReadableByteChannel getInputStreamChannel() { + return channel; + } + + @Override + public void setReadTimeout(int timeoutMs) throws IOException { + socket.setAttribute(DomainSocket.RECEIVE_TIMEOUT, timeoutMs); + } + + @Override + public int getReceiveBufferSize() throws IOException { + return socket.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); + } + + @Override + public boolean getTcpNoDelay() throws IOException { + /* No TCP, no TCP_NODELAY. */ + return false; + } + + @Override + public void setWriteTimeout(int timeoutMs) throws IOException { + socket.setAttribute(DomainSocket.SEND_TIMEOUT, timeoutMs); + } + + @Override + public boolean isClosed() { + return !socket.isOpen(); + } + + @Override + public void close() throws IOException { + socket.close(); + } + + @Override + public String getRemoteAddressString() { + return "unix:" + socket.getPath(); + } + + @Override + public String getLocalAddressString() { + return ""; + } + + @Override + public InputStream getInputStream() throws IOException { + return in; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return out; + } + + @Override + public boolean isLocal() { + /* UNIX domain sockets can only be used for local communication. */ + return true; + } + + @Override + public String toString() { + return "DomainPeer(" + getRemoteAddressString() + ")"; + } + + @Override + public DomainSocket getDomainSocket() { + return socket; + } + + @Override + public boolean hasSecureChannel() { + // + // Communication over domain sockets is assumed to be secure, since it + // doesn't pass over any network. We also carefully control the privileges + // that can be used on the domain socket inode and its parent directories. + // See #{java.org.apache.hadoop.net.unix.DomainSocket#validateSocketPathSecurity0} + // for details. + // + // So unless you are running as root or the hdfs superuser, you cannot + // launch a man-in-the-middle attach on UNIX domain socket traffic. + // + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java new file mode 100644 index 0000000..42cf287 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ReadableByteChannel; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.net.unix.DomainSocket; + +/** + * Represents a connection to a peer. + */ +@InterfaceAudience.Private +public interface Peer extends Closeable { + /** + * @return The input stream channel associated with this + * peer, or null if it has none. + */ + public ReadableByteChannel getInputStreamChannel(); + + /** + * Set the read timeout on this peer. + * + * @param timeoutMs The timeout in milliseconds. + */ + public void setReadTimeout(int timeoutMs) throws IOException; + + /** + * @return The receive buffer size. + */ + public int getReceiveBufferSize() throws IOException; + + /** + * @return True if TCP_NODELAY is turned on. + */ + public boolean getTcpNoDelay() throws IOException; + + /** + * Set the write timeout on this peer. + * + * Note: this is not honored for BasicInetPeer. + * See {@link BasicSocketPeer#setWriteTimeout} for details. + * + * @param timeoutMs The timeout in milliseconds. + */ + public void setWriteTimeout(int timeoutMs) throws IOException; + + /** + * @return true only if the peer is closed. + */ + public boolean isClosed(); + + /** + * Close the peer. + * + * It's safe to re-close a Peer that is already closed. + */ + public void close() throws IOException; + + /** + * @return A string representing the remote end of our + * connection to the peer. + */ + public String getRemoteAddressString(); + + /** + * @return A string representing the local end of our + * connection to the peer. + */ + public String getLocalAddressString(); + + /** + * @return An InputStream associated with the Peer. + * This InputStream will be valid until you close + * this peer with Peer#close. + */ + public InputStream getInputStream() throws IOException; + + /** + * @return An OutputStream associated with the Peer. + * This OutputStream will be valid until you close + * this peer with Peer#close. + */ + public OutputStream getOutputStream() throws IOException; + + /** + * @return True if the peer resides on the same + * computer as we. + */ + public boolean isLocal(); + + /** + * @return The DomainSocket associated with the current + * peer, or null if there is none. + */ + public DomainSocket getDomainSocket(); + + /** + * Return true if the channel is secure. + * + * @return True if our channel to this peer is not + * susceptible to man-in-the-middle attacks. 
+ */ + public boolean hasSecureChannel(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java new file mode 100644 index 0000000..5f86e52 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** Block Construction Stage */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public enum BlockConstructionStage { + /** The enumerates are always listed as regular stage followed by the + * recovery stage. + * Changing this order will make getRecoveryStage not working. 
+ */ + // pipeline set up for block append + PIPELINE_SETUP_APPEND, + // pipeline set up for failed PIPELINE_SETUP_APPEND recovery + PIPELINE_SETUP_APPEND_RECOVERY, + // data streaming + DATA_STREAMING, + // pipeline setup for failed data streaming recovery + PIPELINE_SETUP_STREAMING_RECOVERY, + // close the block and pipeline + PIPELINE_CLOSE, + // Recover a failed PIPELINE_CLOSE + PIPELINE_CLOSE_RECOVERY, + // pipeline set up for block creation + PIPELINE_SETUP_CREATE, + // transfer RBW for adding datanodes + TRANSFER_RBW, + // transfer Finalized for adding datanodes + TRANSFER_FINALIZED; + + final static private byte RECOVERY_BIT = (byte)1; + + /** + * get the recovery stage of this stage + */ + public BlockConstructionStage getRecoveryStage() { + if (this == PIPELINE_SETUP_CREATE) { + throw new IllegalArgumentException( "Unexpected blockStage " + this); + } else { + return values()[ordinal()|RECOVERY_BIT]; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java new file mode 100644 index 0000000..28097ab --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceInfo; +import org.apache.htrace.TraceScope; + +/** + * Static utilities for dealing with the protocol buffers used by the + * Data Transfer Protocol. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class DataTransferProtoUtil { + static BlockConstructionStage fromProto( + OpWriteBlockProto.BlockConstructionStage stage) { + return BlockConstructionStage.valueOf(stage.name()); + } + + static OpWriteBlockProto.BlockConstructionStage toProto( + BlockConstructionStage stage) { + return OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()); + } + + public static ChecksumProto toProto(DataChecksum checksum) { + ChecksumTypeProto type = PBHelperClient.convert(checksum.getChecksumType()); + // ChecksumType#valueOf never returns null + return ChecksumProto.newBuilder() + .setBytesPerChecksum(checksum.getBytesPerChecksum()) + .setType(type) + .build(); + } + + public static DataChecksum fromProto(ChecksumProto proto) { + if (proto == null) return null; + + int bytesPerChecksum = proto.getBytesPerChecksum(); + DataChecksum.Type type = PBHelperClient.convert(proto.getType()); + return DataChecksum.newDataChecksum(type, bytesPerChecksum); + } + + static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk, + String client, Token blockToken) { + ClientOperationHeaderProto header = + ClientOperationHeaderProto.newBuilder() + .setBaseHeader(buildBaseHeader(blk, blockToken)) + .setClientName(client) + .build(); + return header; + } + + static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, + Token blockToken) { + BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder() + .setBlock(PBHelperClient.convert(blk)) + .setToken(PBHelperClient.convert(blockToken)); + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() + .setTraceId(s.getTraceId()) + .setParentId(s.getSpanId())); + } + return builder.build(); + } + + public static TraceInfo fromProto(DataTransferTraceInfoProto proto) { + if (proto == null) return null; + if (!proto.hasTraceId()) return null; + return new TraceInfo(proto.getTraceId(), proto.getParentId()); + } + + public static TraceScope 
continueTraceSpan(ClientOperationHeaderProto header, + String description) { + return continueTraceSpan(header.getBaseHeader(), description); + } + + public static TraceScope continueTraceSpan(BaseHeaderProto header, + String description) { + return continueTraceSpan(header.getTraceInfo(), description); + } + + public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto, + String description) { + TraceScope scope = null; + TraceInfo info = fromProto(proto); + if (info != null) { + scope = Trace.startSpan(description, info); + } + return scope; + } + + public static void checkBlockOpStatus( + BlockOpResponseProto response, + String logInfo) throws IOException { + if (response.getStatus() != Status.SUCCESS) { + if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) { + throw new InvalidBlockTokenException( + "Got access token error" + + ", status message " + response.getMessage() + + ", " + logInfo + ); + } else { + throw new IOException( + "Got error" + + ", status message " + response.getMessage() + + ", " + logInfo + ); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java new file mode 100644 index 0000000..1f7e378 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Transfer data to/from datanode using a streaming protocol. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface DataTransferProtocol { + public static final Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class); + + /** Version for data transfers between clients and datanodes + * This should change when serialization of DatanodeInfo, not just + * when protocol changes. It is not very obvious. + */ + /* + * Version 28: + * Declare methods in DataTransferProtocol interface. + */ + public static final int DATA_TRANSFER_VERSION = 28; + + /** + * Read a block. + * + * @param blk the block being read. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param blockOffset offset of the block. + * @param length maximum number of bytes for this read. + * @param sendChecksum if false, the DN should skip reading and sending + * checksums + * @param cachingStrategy The caching strategy to use. + */ + public void readBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final long blockOffset, + final long length, + final boolean sendChecksum, + final CachingStrategy cachingStrategy) throws IOException; + + /** + * Write a block to a datanode pipeline. + * The receiver datanode of this call is the next datanode in the pipeline. + * The other downstream datanodes are specified by the targets parameter. + * Note that the receiver {@link DatanodeInfo} is not required in the + * parameter list since the receiver datanode knows its info. However, the + * {@link StorageType} for storing the replica in the receiver datanode is a + * parameter since the receiver datanode may support multiple storage types. + * + * @param blk the block being written. + * @param storageType for storing the replica in the receiver datanode. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param targets other downstream datanodes in the pipeline. + * @param targetStorageTypes target {@link StorageType}s corresponding + * to the target datanodes. + * @param source source datanode. + * @param stage pipeline stage. + * @param pipelineSize the size of the pipeline. + * @param minBytesRcvd minimum number of bytes received. + * @param maxBytesRcvd maximum number of bytes received. + * @param latestGenerationStamp the latest generation stamp of the block. + * @param pinning whether to pin the block, so Balancer won't move it. + * @param targetPinnings whether to pin the block on target datanode + */ + public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, + final Token blockToken, + final String clientName, + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, + final DatanodeInfo source, + final BlockConstructionStage stage, + final int pipelineSize, + final long minBytesRcvd, + final long maxBytesRcvd, + final long latestGenerationStamp, + final DataChecksum requestedChecksum, + final CachingStrategy cachingStrategy, + final boolean allowLazyPersist, + final boolean pinning, + final boolean[] targetPinnings) throws IOException; + /** + * Transfer a block to another datanode. + * The block stage must be + * either {@link BlockConstructionStage#TRANSFER_RBW} + * or {@link BlockConstructionStage#TRANSFER_FINALIZED}. + * + * @param blk the block being transferred. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param targets target datanodes. 
+ */ + public void transferBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException; + + /** + * Request short circuit access file descriptors from a DataNode. + * + * @param blk The block to get file descriptors for. + * @param blockToken Security token for accessing the block. + * @param slotId The shared memory slot id to use, or null + * to use no slot id. + * @param maxVersion Maximum version of the block data the client + * can understand. + * @param supportsReceiptVerification True if the client supports + * receipt verification. + */ + public void requestShortCircuitFds(final ExtendedBlock blk, + final Token blockToken, + SlotId slotId, int maxVersion, boolean supportsReceiptVerification) + throws IOException; + + /** + * Release a pair of short-circuit FDs requested earlier. + * + * @param slotId SlotID used by the earlier file descriptors. + */ + public void releaseShortCircuitFds(final SlotId slotId) throws IOException; + + /** + * Request a short circuit shared memory area from a DataNode. + * + * @param clientName The name of the client. + */ + public void requestShortCircuitShm(String clientName) throws IOException; + + /** + * Receive a block from a source datanode + * and then notifies the namenode + * to remove the copy from the original datanode. + * Note that the source datanode and the original datanode can be different. + * It is used for balancing purpose. + * + * @param blk the block being replaced. + * @param storageType the {@link StorageType} for storing the block. + * @param blockToken security token for accessing the block. + * @param delHint the hint for deleting the block in the original datanode. + * @param source the source datanode for receiving the block. + */ + public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, + final Token blockToken, + final String delHint, + final DatanodeInfo source) throws IOException; + + /** + * Copy a block. + * It is used for balancing purpose. + * + * @param blk the block being copied. + * @param blockToken security token for accessing the block. + */ + public void copyBlock(final ExtendedBlock blk, + final Token blockToken) throws IOException; + + /** + * Get block checksum (MD5 of CRC32). + * + * @param blk a block. + * @param blockToken security token for accessing the block. + * @throws IOException + */ + public void blockChecksum(final ExtendedBlock blk, + final Token blockToken) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java new file mode 100644 index 0000000..3077498 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** Operation */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public enum Op { + WRITE_BLOCK((byte)80), + READ_BLOCK((byte)81), + READ_METADATA((byte)82), + REPLACE_BLOCK((byte)83), + COPY_BLOCK((byte)84), + BLOCK_CHECKSUM((byte)85), + TRANSFER_BLOCK((byte)86), + REQUEST_SHORT_CIRCUIT_FDS((byte)87), + RELEASE_SHORT_CIRCUIT_FDS((byte)88), + REQUEST_SHORT_CIRCUIT_SHM((byte)89), + CUSTOM((byte)127); + + /** The code for this operation. */ + public final byte code; + + private Op(byte code) { + this.code = code; + } + + private static final int FIRST_CODE = values()[0].code; + /** Return the object represented by the code. */ + private static Op valueOf(byte code) { + final int i = (code & 0xff) - FIRST_CODE; + return i < 0 || i >= values().length? null: values()[i]; + } + + /** Read from in */ + public static Op read(DataInput in) throws IOException { + return valueOf(in.readByte()); + } + + /** Write to out */ + public void write(DataOutput out) throws IOException { + out.write(code); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java new file mode 100644 index 0000000..2d11dc2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; + +import org.apache.htrace.Trace; +import org.apache.htrace.Span; + +import com.google.protobuf.Message; + +/** Sender */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class Sender implements DataTransferProtocol { + private final DataOutputStream out; + + /** Create a sender for DataTransferProtocol with a output stream. */ + public Sender(final DataOutputStream out) { + this.out = out; + } + + /** Initialize a operation. 
*/ + private static void op(final DataOutput out, final Op op + ) throws IOException { + out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); + op.write(out); + } + + private static void send(final DataOutputStream out, final Op opcode, + final Message proto) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName() + + ": " + proto); + } + op(out, opcode); + proto.writeDelimitedTo(out); + out.flush(); + } + + static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) { + CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder(); + if (cachingStrategy.getReadahead() != null) { + builder.setReadahead(cachingStrategy.getReadahead().longValue()); + } + if (cachingStrategy.getDropBehind() != null) { + builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue()); + } + return builder.build(); + } + + @Override + public void readBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final long blockOffset, + final long length, + final boolean sendChecksum, + final CachingStrategy cachingStrategy) throws IOException { + + OpReadBlockProto proto = OpReadBlockProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken)) + .setOffset(blockOffset) + .setLen(length) + .setSendChecksums(sendChecksum) + .setCachingStrategy(getCachingStrategy(cachingStrategy)) + .build(); + + send(out, Op.READ_BLOCK, proto); + } + + + @Override + public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, + final Token blockToken, + final String clientName, + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, + final DatanodeInfo source, + final BlockConstructionStage stage, + final int pipelineSize, + final long minBytesRcvd, + final long maxBytesRcvd, + final long latestGenerationStamp, + DataChecksum requestedChecksum, + final CachingStrategy cachingStrategy, + final boolean allowLazyPersist, + final boolean pinning, + final boolean[] targetPinnings) throws IOException { + ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( + blk, clientName, blockToken); + + ChecksumProto checksumProto = + DataTransferProtoUtil.toProto(requestedChecksum); + + OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() + .setHeader(header) + .setStorageType(PBHelperClient.convertStorageType(storageType)) + .addAllTargets(PBHelperClient.convert(targets, 1)) + .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes, 1)) + .setStage(toProto(stage)) + .setPipelineSize(pipelineSize) + .setMinBytesRcvd(minBytesRcvd) + .setMaxBytesRcvd(maxBytesRcvd) + .setLatestGenerationStamp(latestGenerationStamp) + .setRequestedChecksum(checksumProto) + .setCachingStrategy(getCachingStrategy(cachingStrategy)) + .setAllowLazyPersist(allowLazyPersist) + .setPinning(pinning) + .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1)); + + if (source != null) { + proto.setSource(PBHelperClient.convertDatanodeInfo(source)); + } + + send(out, Op.WRITE_BLOCK, proto.build()); + } + + @Override + public void transferBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { + + OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildClientHeader( + blk, clientName, blockToken)) + 
.addAllTargets(PBHelperClient.convert(targets)) + .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes)) + .build(); + + send(out, Op.TRANSFER_BLOCK, proto); + } + + @Override + public void requestShortCircuitFds(final ExtendedBlock blk, + final Token blockToken, + SlotId slotId, int maxVersion, boolean supportsReceiptVerification) + throws IOException { + OpRequestShortCircuitAccessProto.Builder builder = + OpRequestShortCircuitAccessProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader( + blk, blockToken)).setMaxVersion(maxVersion); + if (slotId != null) { + builder.setSlotId(PBHelperClient.convert(slotId)); + } + builder.setSupportsReceiptVerification(supportsReceiptVerification); + OpRequestShortCircuitAccessProto proto = builder.build(); + send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); + } + + @Override + public void releaseShortCircuitFds(SlotId slotId) throws IOException { + ReleaseShortCircuitAccessRequestProto.Builder builder = + ReleaseShortCircuitAccessRequestProto.newBuilder(). + setSlotId(PBHelperClient.convert(slotId)); + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() + .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); + } + ReleaseShortCircuitAccessRequestProto proto = builder.build(); + send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto); + } + + @Override + public void requestShortCircuitShm(String clientName) throws IOException { + ShortCircuitShmRequestProto.Builder builder = + ShortCircuitShmRequestProto.newBuilder(). + setClientName(clientName); + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() + .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); + } + ShortCircuitShmRequestProto proto = builder.build(); + send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto); + } + + @Override + public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, + final Token blockToken, + final String delHint, + final DatanodeInfo source) throws IOException { + OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .setStorageType(PBHelperClient.convertStorageType(storageType)) + .setDelHint(delHint) + .setSource(PBHelperClient.convertDatanodeInfo(source)) + .build(); + + send(out, Op.REPLACE_BLOCK, proto); + } + + @Override + public void copyBlock(final ExtendedBlock blk, + final Token blockToken) throws IOException { + OpCopyBlockProto proto = OpCopyBlockProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .build(); + + send(out, Op.COPY_BLOCK, proto); + } + + @Override + public void blockChecksum(final ExtendedBlock blk, + final Token blockToken) throws IOException { + OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .build(); + + send(out, Op.BLOCK_CHECKSUM, proto); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java new file mode 100644 index 0000000..edf658a --- 
/dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.apache.hadoop.hdfs.util.ExactSizeInputStream; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Utilities for converting protobuf classes to and from implementation classes + * and other helper utilities to help in dealing with protobuf. + * + * Note that when converting from an internal type to protobuf type, the + * converter never return null for protobuf type. The check for internal type + * being null must be done before calling the convert() method. + */ +public class PBHelperClient { + private PBHelperClient() { + /** Hidden constructor */ + } + + public static ByteString getByteString(byte[] bytes) { + return ByteString.copyFrom(bytes); + } + + public static ShmId convert(ShortCircuitShmIdProto shmId) { + return new ShmId(shmId.getHi(), shmId.getLo()); + } + + public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) { + return DataChecksum.Type.valueOf(type.getNumber()); + } + + public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) { + return HdfsProtos.ChecksumTypeProto.valueOf(type.id); + } + + public static ExtendedBlockProto convert(final ExtendedBlock b) { + if (b == null) return null; + return ExtendedBlockProto.newBuilder(). + setPoolId(b.getBlockPoolId()). + setBlockId(b.getBlockId()). 
+ setNumBytes(b.getNumBytes()). + setGenerationStamp(b.getGenerationStamp()). + build(); + } + + public static TokenProto convert(Token tok) { + return TokenProto.newBuilder(). + setIdentifier(ByteString.copyFrom(tok.getIdentifier())). + setPassword(ByteString.copyFrom(tok.getPassword())). + setKind(tok.getKind().toString()). + setService(tok.getService().toString()).build(); + } + + public static ShortCircuitShmIdProto convert(ShmId shmId) { + return ShortCircuitShmIdProto.newBuilder(). + setHi(shmId.getHi()). + setLo(shmId.getLo()). + build(); + + } + + public static ShortCircuitShmSlotProto convert(SlotId slotId) { + return ShortCircuitShmSlotProto.newBuilder(). + setShmId(convert(slotId.getShmId())). + setSlotIdx(slotId.getSlotIdx()). + build(); + } + + public static DatanodeIDProto convert(DatanodeID dn) { + // For wire compatibility with older versions we transmit the StorageID + // which is the same as the DatanodeUuid. Since StorageID is a required + // field we pass the empty string if the DatanodeUuid is not yet known. + return DatanodeIDProto.newBuilder() + .setIpAddr(dn.getIpAddr()) + .setHostName(dn.getHostName()) + .setXferPort(dn.getXferPort()) + .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "") + .setInfoPort(dn.getInfoPort()) + .setInfoSecurePort(dn.getInfoSecurePort()) + .setIpcPort(dn.getIpcPort()).build(); + } + + public static DatanodeInfoProto.AdminState convert( + final DatanodeInfo.AdminStates inAs) { + switch (inAs) { + case NORMAL: return DatanodeInfoProto.AdminState.NORMAL; + case DECOMMISSION_INPROGRESS: + return DatanodeInfoProto.AdminState.DECOMMISSION_INPROGRESS; + case DECOMMISSIONED: return DatanodeInfoProto.AdminState.DECOMMISSIONED; + default: return DatanodeInfoProto.AdminState.NORMAL; + } + } + + public static DatanodeInfoProto convert(DatanodeInfo info) { + DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder(); + if (info.getNetworkLocation() != null) { + builder.setLocation(info.getNetworkLocation()); + } + builder + .setId(convert((DatanodeID) info)) + .setCapacity(info.getCapacity()) + .setDfsUsed(info.getDfsUsed()) + .setRemaining(info.getRemaining()) + .setBlockPoolUsed(info.getBlockPoolUsed()) + .setCacheCapacity(info.getCacheCapacity()) + .setCacheUsed(info.getCacheUsed()) + .setLastUpdate(info.getLastUpdate()) + .setLastUpdateMonotonic(info.getLastUpdateMonotonic()) + .setXceiverCount(info.getXceiverCount()) + .setAdminState(convert(info.getAdminState())) + .build(); + return builder.build(); + } + + public static List convert( + DatanodeInfo[] dnInfos) { + return convert(dnInfos, 0); + } + + /** + * Copy from {@code dnInfos} to a target of list of same size starting at + * {@code startIdx}. 
+ */ + public static List convert( + DatanodeInfo[] dnInfos, int startIdx) { + if (dnInfos == null) + return null; + ArrayList protos = Lists + .newArrayListWithCapacity(dnInfos.length); + for (int i = startIdx; i < dnInfos.length; i++) { + protos.add(convert(dnInfos[i])); + } + return protos; + } + + public static List convert(boolean[] targetPinnings, int idx) { + List pinnings = new ArrayList<>(); + if (targetPinnings == null) { + pinnings.add(Boolean.FALSE); + } else { + for (; idx < targetPinnings.length; ++idx) { + pinnings.add(targetPinnings[idx]); + } + } + return pinnings; + } + + static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) { + if (di == null) return null; + return convert(di); + } + + public static StorageTypeProto convertStorageType(StorageType type) { + switch(type) { + case DISK: + return StorageTypeProto.DISK; + case SSD: + return StorageTypeProto.SSD; + case ARCHIVE: + return StorageTypeProto.ARCHIVE; + case RAM_DISK: + return StorageTypeProto.RAM_DISK; + default: + throw new IllegalStateException( + "BUG: StorageType not found, type=" + type); + } + } + + public static StorageType convertStorageType(StorageTypeProto type) { + switch(type) { + case DISK: + return StorageType.DISK; + case SSD: + return StorageType.SSD; + case ARCHIVE: + return StorageType.ARCHIVE; + case RAM_DISK: + return StorageType.RAM_DISK; + default: + throw new IllegalStateException( + "BUG: StorageTypeProto not found, type=" + type); + } + } + + public static List convertStorageTypes( + StorageType[] types) { + return convertStorageTypes(types, 0); + } + + public static List convertStorageTypes( + StorageType[] types, int startIdx) { + if (types == null) { + return null; + } + final List protos = new ArrayList<>( + types.length); + for (int i = startIdx; i < types.length; ++i) { + protos.add(PBHelperClient.convertStorageType(types[i])); + } + return protos; + } + + public static InputStream vintPrefixed(final InputStream input) + throws IOException { + final int firstByte = input.read(); + if (firstByte == -1) { + throw new EOFException("Premature EOF: no length prefix available"); + } + + int size = CodedInputStream.readRawVarint32(firstByte, input); + assert size >= 0; + return new ExactSizeInputStream(input, size); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java new file mode 100644 index 0000000..2fa86fa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/InvalidBlockTokenException.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Access token verification failed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InvalidBlockTokenException extends IOException {
+  private static final long serialVersionUID = 168L;
+
+  public InvalidBlockTokenException() {
+    super();
+  }
+
+  public InvalidBlockTokenException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
new file mode 100644
index 0000000..215df13
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+/**
+ * The caching strategy we should use for an HDFS read or write operation.
+ */
+public class CachingStrategy {
+  private final Boolean dropBehind; // null = use server defaults
+  private final Long readahead; // null = use server defaults
+
+  public static CachingStrategy newDefaultStrategy() {
+    return new CachingStrategy(null, null);
+  }
+
+  public static CachingStrategy newDropBehind() {
+    return new CachingStrategy(true, null);
+  }
+
+  public static class Builder {
+    private Boolean dropBehind;
+    private Long readahead;
+
+    public Builder(CachingStrategy prev) {
+      this.dropBehind = prev.dropBehind;
+      this.readahead = prev.readahead;
+    }
+
+    public Builder setDropBehind(Boolean dropBehind) {
+      this.dropBehind = dropBehind;
+      return this;
+    }
+
+    public Builder setReadahead(Long readahead) {
+      this.readahead = readahead;
+      return this;
+    }
+
+    public CachingStrategy build() {
+      return new CachingStrategy(dropBehind, readahead);
+    }
+  }
+
+  public CachingStrategy(Boolean dropBehind, Long readahead) {
+    this.dropBehind = dropBehind;
+    this.readahead = readahead;
+  }
+
+  public Boolean getDropBehind() {
+    return dropBehind;
+  }
+
+  public Long getReadahead() {
+    return readahead;
+  }
+
+  public String toString() {
+    return "CachingStrategy(dropBehind=" + dropBehind +
+        ", readahead=" + readahead + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
new file mode 100644
index 0000000..81cc68d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.EndpointShmManager;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * DfsClientShm is a subclass of ShortCircuitShm which is used by the
+ * DfsClient.
+ * When the UNIX domain socket associated with this shared memory segment
+ * closes unexpectedly, we mark the slots inside this segment as disconnected.
+ * ShortCircuitReplica objects that contain disconnected slots are stale,
+ * and will not be used to service new reads or mmap operations.
+ * However, in-progress read or mmap operations will continue to proceed.
+ * Once the last slot is deallocated, the segment can be safely munmapped.
+ *
+ * Slots may also become stale because the associated replica has been deleted
+ * on the DataNode. In this case, the DataNode will clear the 'valid' bit.
+ * The client will then see these slots as stale (see
+ * #{ShortCircuitReplica#isStale}).
+ */
+public class DfsClientShm extends ShortCircuitShm
+    implements DomainSocketWatcher.Handler {
+  /**
+   * The EndpointShmManager associated with this shared memory segment.
+   */
+  private final EndpointShmManager manager;
+
+  /**
+   * The UNIX domain socket associated with this DfsClientShm.
+   * We rely on the DomainSocketWatcher to close the socket associated with
+   * this DomainPeer when necessary.
+   */
+  private final DomainPeer peer;
+
+  /**
+   * True if this shared memory segment has lost its connection to the
+   * DataNode.
+   *
+   * {@link DfsClientShm#handle} sets this to true.
+   */
+  private boolean disconnected = false;
+
+  DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
+      DomainPeer peer) throws IOException {
+    super(shmId, stream);
+    this.manager = manager;
+    this.peer = peer;
+  }
+
+  public EndpointShmManager getEndpointShmManager() {
+    return manager;
+  }
+
+  public DomainPeer getPeer() {
+    return peer;
+  }
+
+  /**
+   * Determine if the shared memory segment is disconnected from the DataNode.
+   *
+   * This must be called with the DfsClientShmManager lock held.
+   *
+   * @return True if the shared memory segment is stale.
+   */
+  public synchronized boolean isDisconnected() {
+    return disconnected;
+  }
+
+  /**
+   * Handle the closure of the UNIX domain socket associated with this shared
+   * memory segment by marking this segment as stale.
+   *
+   * If there are no slots associated with this shared memory segment, it will
+   * be freed immediately in this function.
+   */
+  @Override
+  public boolean handle(DomainSocket sock) {
+    manager.unregisterShm(getShmId());
+    synchronized (this) {
+      Preconditions.checkState(!disconnected);
+      disconnected = true;
+      boolean hadSlots = false;
+      for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
+        Slot slot = iter.next();
+        slot.makeInvalid();
+        hadSlots = true;
+      }
+      if (!hadSlots) {
+        free();
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/490bb5eb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
new file mode 100644
index 0000000..f70398a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages short-circuit memory segments for an HDFS client.
+ *
+ * Clients are responsible for requesting and releasing shared memory segments used
+ * for communicating with the DataNode. The client will try to allocate new slots
+ * in the set of existing segments, falling back to getting a new segment from the
+ * DataNode via {@link DataTransferProtocol#requestShortCircuitFds}.
+ *
+ * The counterpart to this class on the DataNode is {@link ShortCircuitRegistry}.
+ * See {@link ShortCircuitRegistry} for more information on the communication protocol.
+ */
+@InterfaceAudience.Private
+public class DfsClientShmManager implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DfsClientShmManager.class);
+
+  /**
+   * Manages short-circuit memory segments that pertain to a given DataNode.
+   */
+  class EndpointShmManager {
+    /**
+     * The datanode we're managing.
+     */
+    private final DatanodeInfo datanode;
+
+    /**
+     * Shared memory segments which have no empty slots.
+     *
+     * Protected by the manager lock.
+     */
+    private final TreeMap<ShmId, DfsClientShm> full =
+        new TreeMap<ShmId, DfsClientShm>();
+
+    /**
+     * Shared memory segments which have at least one empty slot.
+     *
+     * Protected by the manager lock.
+     */
+    private final TreeMap<ShmId, DfsClientShm> notFull =
+        new TreeMap<ShmId, DfsClientShm>();
+
+    /**
+     * True if this datanode doesn't support short-circuit shared memory
+     * segments.
+     *
+     * Protected by the manager lock.
+     */
+    private boolean disabled = false;
+
+    /**
+     * True if we're in the process of loading a shared memory segment from
+     * this DataNode.
+     *
+     * Protected by the manager lock.
+     */
+    private boolean loading = false;
+
+    EndpointShmManager(DatanodeInfo datanode) {
+      this.datanode = datanode;
+    }
+
+    /**
+     * Pull a slot out of a preexisting shared memory segment.
+     *
+     * Must be called with the manager lock held.
+     *
+     * @param blockId The blockId to put inside the Slot object.
+     *
+     * @return null if none of our shared memory segments contain a
+     *         free slot; the slot object otherwise.
+     */
+    private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
+      if (notFull.isEmpty()) {
+        return null;
+      }
+      Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
+      DfsClientShm shm = entry.getValue();
+      ShmId shmId = shm.getShmId();
+      Slot slot = shm.allocAndRegisterSlot(blockId);
+      if (shm.isFull()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
+              " out of " + shm);
+        }
+        DfsClientShm removedShm = notFull.remove(shmId);
+        Preconditions.checkState(removedShm == shm);
+        full.put(shmId, shm);
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
+              " out of " + shm);
+        }
+      }
+      return slot;
+    }
+
+    /**
+     * Ask the DataNode for a new shared memory segment. This function must be
+     * called with the manager lock held. We will release the lock while
+     * communicating with the DataNode.
+     *
+     * @param clientName The current client name.
+     * @param peer The peer to use to talk to the DataNode.
+     *
+     * @return Null if the DataNode does not support shared memory
+     *         segments, or experienced an error creating the
+     *         shm. The shared memory segment itself on success.
+     * @throws IOException If there was an error communicating over the socket.
+     *         We will not throw an IOException unless the socket
+     *         itself (or the network) is the problem.
+     */
+    private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
+        throws IOException {
+      final DataOutputStream out =
+          new DataOutputStream(
+              new BufferedOutputStream(peer.getOutputStream()));
+      new Sender(out).requestShortCircuitShm(clientName);
+      ShortCircuitShmResponseProto resp =
+          ShortCircuitShmResponseProto.parseFrom(
+              PBHelperClient.vintPrefixed(peer.getInputStream()));
+      String error = resp.hasError() ? resp.getError() : "(unknown)";
+      switch (resp.getStatus()) {
+      case SUCCESS:
+        DomainSocket sock = peer.getDomainSocket();
+        byte buf[] = new byte[1];
+        FileInputStream fis[] = new FileInputStream[1];
+        if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
+          throw new EOFException("got EOF while trying to transfer the " +
+              "file descriptor for the shared memory segment.");
+        }
+        if (fis[0] == null) {
+          throw new IOException("the datanode " + datanode + " failed to " +
+              "pass a file descriptor for the shared memory segment.");
+        }
+        try {
+          DfsClientShm shm =
+              new DfsClientShm(PBHelperClient.convert(resp.getId()),
+                  fis[0], this, peer);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": createNewShm: created " + shm);
+          }
+          return shm;
+        } finally {
+          try {
+            fis[0].close();
+          } catch (Throwable e) {
+            LOG.debug("Exception in closing " + fis[0], e);
+          }
+        }
+      case ERROR_UNSUPPORTED:
+        // The DataNode just does not support short-circuit shared memory
+        // access, and we should stop asking.
+        LOG.info(this + ": datanode does not support short-circuit " +
+            "shared memory access: " + error);
+        disabled = true;
+        return null;
+      default:
+        // The datanode experienced some kind of unexpected error when trying to
+        // create the short-circuit shared memory segment.
+ LOG.warn(this + ": error requesting short-circuit shared memory " + + "access: " + error); + return null; + } + } + + /** + * Allocate a new shared memory slot connected to this datanode. + * + * Must be called with the EndpointShmManager lock held. + * + * @param peer The peer to use to talk to the DataNode. + * @param usedPeer (out param) Will be set to true if we used the peer. + * When a peer is used + * + * @param clientName The client name. + * @param blockId The block ID to use. + * @return null if the DataNode does not support shared memory + * segments, or experienced an error creating the + * shm. The shared memory segment itself on success. + * @throws IOException If there was an error communicating over the socket. + */ + Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer, + String clientName, ExtendedBlockId blockId) throws IOException { + while (true) { + if (closed) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": the DfsClientShmManager has been closed."); + } + return null; + } + if (disabled) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": shared memory segment access is disabled."); + } + return null; + } + // Try to use an existing slot. + Slot slot = allocSlotFromExistingShm(blockId); + if (slot != null) { + return slot; + } + // There are no free slots. If someone is loading more slots, wait + // for that to finish. + if (loading) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": waiting for loading to finish..."); + } + finishedLoading.awaitUninterruptibly(); + } else { + // Otherwise, load the slot ourselves. + loading = true; + lock.unlock(); + DfsClientShm shm; + try { + shm = requestNewShm(clientName, peer); + if (shm == null) continue; + // See #{DfsClientShmManager#domainSocketWatcher} for details + // about why we do this before retaking the manager lock. + domainSocketWatcher.add(peer.getDomainSocket(), shm); + // The DomainPeer is now our responsibility, and should not be + // closed by the caller. + usedPeer.setValue(true); + } finally { + lock.lock(); + loading = false; + finishedLoading.signalAll(); + } + if (shm.isDisconnected()) { + // If the peer closed immediately after the shared memory segment + // was created, the DomainSocketWatcher callback might already have + // fired and marked the shm as disconnected. In this case, we + // obviously don't want to add the SharedMemorySegment to our list + // of valid not-full segments. + if (LOG.isDebugEnabled()) { + LOG.debug(this + ": the UNIX domain socket associated with " + + "this short-circuit memory closed before we could make " + + "use of the shm."); + } + } else { + notFull.put(shm.getShmId(), shm); + } + } + } + } + + /** + * Stop tracking a slot. + * + * Must be called with the EndpointShmManager lock held. + * + * @param slot The slot to release. + */ + void freeSlot(Slot slot) { + DfsClientShm shm = (DfsClientShm)slot.getShm(); + shm.unregisterSlot(slot.getSlotIdx()); + if (shm.isDisconnected()) { + // Stale shared memory segments should not be tracked here. + Preconditions.checkState(!full.containsKey(shm.getShmId())); + Preconditions.checkState(!notFull.containsKey(shm.getShmId())); + if (shm.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": freeing empty stale " + shm); + } + shm.free(); + } + } else { + ShmId shmId = shm.getShmId(); + full.remove(shmId); // The shm can't be full if we just freed a slot. 
+        if (shm.isEmpty()) {
+          notFull.remove(shmId);
+
+          // If the shared memory segment is now empty, we call shutdown(2) on
+          // the UNIX domain socket associated with it. The DomainSocketWatcher,
+          // which is watching this socket, will call DfsClientShm#handle,
+          // cleaning up this shared memory segment.
+          //
+          // See #{DfsClientShmManager#domainSocketWatcher} for details about why
+          // we don't want to call DomainSocketWatcher#remove directly here.
+          //
+          // Note that we could experience 'fragmentation' here, where the
+          // DFSClient allocates a bunch of slots in different shared memory
+          // segments, and then frees most of them, but never fully empties out
+          // any segment. We make some attempt to avoid this fragmentation by
+          // always allocating new slots out of the shared memory segment with the
+          // lowest ID, but it could still occur. In most workloads,
+          // fragmentation should not be a major concern, since it doesn't impact
+          // peak file descriptor usage or the speed of allocation.
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": shutting down UNIX domain socket for " +
+                "empty " + shm);
+          }
+          shutdown(shm);
+        } else {
+          notFull.put(shmId, shm);
+        }
+      }
+    }
+
+    /**
+     * Unregister a shared memory segment.
+     *
+     * Once a segment is unregistered, we will not allocate any more slots
+     * inside that segment.
+     *
+     * The DomainSocketWatcher calls this while holding the DomainSocketWatcher
+     * lock.
+     *
+     * @param shmId The ID of the shared memory segment to unregister.
+     */
+    void unregisterShm(ShmId shmId) {
+      lock.lock();
+      try {
+        full.remove(shmId);
+        notFull.remove(shmId);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return String.format("EndpointShmManager(%s, parent=%s)",
+          datanode, DfsClientShmManager.this);
+    }
+
+    PerDatanodeVisitorInfo getVisitorInfo() {
+      return new PerDatanodeVisitorInfo(full, notFull, disabled);
+    }
+
+    final void shutdown(DfsClientShm shm) {
+      try {
+        shm.getPeer().getDomainSocket().shutdown();
+      } catch (IOException e) {
+        LOG.warn(this + ": error shutting down shm: got IOException calling " +
+            "shutdown(SHUT_RDWR)", e);
+      }
+    }
+  }
+
+  private boolean closed = false;
+
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * A condition variable which is signalled when we finish loading a segment
+   * from the Datanode.
+   */
+  private final Condition finishedLoading = lock.newCondition();
+
+  /**
+   * Information about each Datanode.
+   */
+  private final HashMap<DatanodeInfo, EndpointShmManager> datanodes =
+      new HashMap<DatanodeInfo, EndpointShmManager>(1);
+
+  /**
+   * The DomainSocketWatcher which keeps track of the UNIX domain socket
+   * associated with each shared memory segment.
+   *
+   * Note: because the DomainSocketWatcher makes callbacks into this
+   * DfsClientShmManager object, you MUST NOT attempt to take the
+   * DomainSocketWatcher lock while holding the DfsClientShmManager lock,
+   * or else deadlock might result. This means that most DomainSocketWatcher
+   * methods are off-limits unless you release the manager lock first.
+   */
+  private final DomainSocketWatcher domainSocketWatcher;
+
+  DfsClientShmManager(int interruptCheckPeriodMs) throws IOException {
+    this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs,
+        "client");
+  }
+
+  public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
+      MutableBoolean usedPeer, ExtendedBlockId blockId,
+      String clientName) throws IOException {
+    lock.lock();
+    try {
+      if (closed) {
+        LOG.trace(this + ": the DfsClientShmManager is closed.");
+        return null;
+      }
+      EndpointShmManager shmManager = datanodes.get(datanode);
+      if (shmManager == null) {
+        shmManager = new EndpointShmManager(datanode);
+        datanodes.put(datanode, shmManager);
+      }
+      return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void freeSlot(Slot slot) {
+    lock.lock();
+    try {
+      DfsClientShm shm = (DfsClientShm)slot.getShm();
+      shm.getEndpointShmManager().freeSlot(slot);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public static class PerDatanodeVisitorInfo {
+    public final TreeMap<ShmId, DfsClientShm> full;
+    public final TreeMap<ShmId, DfsClientShm> notFull;
+    public final boolean disabled;
+
+    PerDatanodeVisitorInfo(TreeMap<ShmId, DfsClientShm> full,
+        TreeMap<ShmId, DfsClientShm> notFull, boolean disabled) {
+      this.full = full;
+      this.notFull = notFull;
+      this.disabled = disabled;
+    }
+  }
+
+  @VisibleForTesting
+  public interface Visitor {
+    void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+        throws IOException;
+  }
+
+  @VisibleForTesting
+  public void visit(Visitor visitor) throws IOException {
+    lock.lock();
+    try {
+      HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info =
+          new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>();
+      for (Entry<DatanodeInfo, EndpointShmManager> entry :
+          datanodes.entrySet()) {
+        info.put(entry.getKey(), entry.getValue().getVisitorInfo());
+      }
+      visitor.visit(info);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Close the DfsClientShmManager.
+   */
+  @Override
+  public void close() throws IOException {
+    lock.lock();
+    try {
+      if (closed) return;
+      closed = true;
+    } finally {
+      lock.unlock();
+    }
+    // When closed, the domainSocketWatcher will issue callbacks that mark
+    // all the outstanding DfsClientShm segments as stale.
+    try {
+      domainSocketWatcher.close();
+    } catch (Throwable e) {
+      LOG.debug("Exception in closing " + domainSocketWatcher, e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("ShortCircuitShmManager(%08x)",
+        System.identityHashCode(this));
+  }
+
+  @VisibleForTesting
+  public DomainSocketWatcher getDomainSocketWatcher() {
+    return domainSocketWatcher;
+  }
+}
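
For context, a minimal usage sketch of the allocate/release cycle implemented by the DfsClientShmManager above. This sketch is illustrative only and not part of the patch: the wrapper class and method names are hypothetical, and the DfsClientShmManager, DatanodeInfo, DomainPeer and ExtendedBlockId instances are assumed to be supplied by an existing short-circuit read setup rather than constructed here.

import java.io.IOException;

import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;

/** Hypothetical sketch of allocating and releasing a short-circuit shm slot. */
public class ShmSlotUsageSketch {
  static void readWithSlot(DfsClientShmManager shmManager, DatanodeInfo datanode,
      DomainPeer peer, ExtendedBlockId blockId, String clientName) throws IOException {
    MutableBoolean usedPeer = new MutableBoolean(false);
    // allocSlot() first tries to reuse a free slot in an existing segment for this
    // DataNode; only when none is available does it request a new segment over 'peer'.
    Slot slot = shmManager.allocSlot(datanode, peer, usedPeer, blockId, clientName);
    if (slot == null) {
      // The DataNode does not support shared memory segments, or the manager is
      // closed; the caller would fall back to a read that is not tracked by a slot.
      return;
    }
    // If usedPeer is now true, the manager has taken ownership of 'peer' and the
    // caller must not close it.
    try {
      // ... perform the short-circuit read tracked by this slot ...
    } finally {
      // Releasing the last slot of a segment shuts down the segment's UNIX domain
      // socket, which lets the DomainSocketWatcher reclaim the segment.
      shmManager.freeSlot(slot);
    }
  }
}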