HDFS-9012. Move o.a.h.hdfs.protocol.datatransfer.PipelineAck class to
hadoop-hdfs-client module. Contributed by Mingliang Liu.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d16c4eee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d16c4eee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d16c4eee

Branch: refs/heads/YARN-3926
Commit: d16c4eee186492608ffeb1c2e83f437000cc64f6
Parents: 6eaca2e
Author: Haohui Mai
Authored: Fri Sep 4 10:41:09 2015 -0700
Committer: Haohui Mai
Committed: Fri Sep 4 10:41:09 2015 -0700

----------------------------------------------------------------------
 .../hdfs/protocol/datatransfer/PipelineAck.java | 243 ++++++++++++++++
 .../apache/hadoop/hdfs/util/LongBitFormat.java  |  71 +++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/protocol/datatransfer/PipelineAck.java | 274 -------------------
 .../hdfs/server/datanode/BlockReceiver.java     |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  38 +++
 .../apache/hadoop/hdfs/util/LongBitFormat.java  |  71 -----
 7 files changed, 356 insertions(+), 346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
new file mode 100644
index 0000000..3836606
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.hdfs.util.LongBitFormat;
+
+/** Pipeline Acknowledgment **/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PipelineAck {
+  PipelineAckProto proto;
+  public final static long UNKOWN_SEQNO = -2;
+  final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
+  final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
+
+  public enum ECN {
+    DISABLED(0),
+    SUPPORTED(1),
+    SUPPORTED2(2),
+    CONGESTED(3);
+
+    private final int value;
+    private static final ECN[] VALUES = values();
+    static ECN valueOf(int value) {
+      return VALUES[value];
+    }
+
+    ECN(int value) {
+      this.value = value;
+    }
+
+    public int getValue() {
+      return value;
+    }
+  }
+
+  private enum StatusFormat {
+    STATUS(null, 4),
+    RESERVED(STATUS.BITS, 1),
+    ECN_BITS(RESERVED.BITS, 2);
+
+    private final LongBitFormat BITS;
+
+    StatusFormat(LongBitFormat prev, int bits) {
+      BITS = new LongBitFormat(name(), prev, bits, 0);
+    }
+
+    static Status getStatus(int header) {
+      return Status.valueOf((int) STATUS.BITS.retrieve(header));
+    }
+
+    static ECN getECN(int header) {
+      return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
+    }
+
+    public static int setStatus(int old, Status status) {
+      return (int) STATUS.BITS.combine(status.getNumber(), old);
+    }
+
+    public static int setECN(int old, ECN ecn) {
+      return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
+    }
+  }
+
+  /** default constructor **/
+  public PipelineAck() {
+  }
+
+  /**
+   * Constructor assuming no next DN in pipeline
+   * @param seqno sequence number
+   * @param replies an array of replies
+   */
+  public PipelineAck(long seqno, int[] replies) {
+    this(seqno, replies, 0L);
+  }
+
+  /**
+   * Constructor
+   * @param seqno sequence number
+   * @param replies an array of replies
+   * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
+   */
+  public PipelineAck(long seqno, int[] replies,
+      long downstreamAckTimeNanos) {
+    ArrayList<Status> statusList = Lists.newArrayList();
+    ArrayList<Integer> flagList = Lists.newArrayList();
+    for (int r : replies) {
+      statusList.add(StatusFormat.getStatus(r));
+      flagList.add(r);
+    }
+    proto = PipelineAckProto.newBuilder()
+      .setSeqno(seqno)
+      .addAllReply(statusList)
+      .addAllFlag(flagList)
+      .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
+      .build();
+  }
+
+  /**
+   * Get the sequence number
+   * @return the sequence number
+   */
+  public long getSeqno() {
+    return proto.getSeqno();
+  }
+
+  /**
+   * Get the number of replies
+   * @return the number of replies
+   */
+  public short getNumOfReplies() {
+    return (short)proto.getReplyCount();
+  }
+
+  /**
+   * get the header flag of ith reply
+   */
+  public int getHeaderFlag(int i) {
+    if (proto.getFlagCount() > 0) {
+      return proto.getFlag(i);
+    } else {
+      return combineHeader(ECN.DISABLED, proto.getReply(i));
+    }
+  }
+
+  public int getFlag(int i) {
+    return proto.getFlag(i);
+  }
+
+  /**
+   * Get the time elapsed for downstream ack RTT in nanoseconds
+   * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
+   */
+  public long getDownstreamAckTimeNanos() {
+    return proto.getDownstreamAckTimeNanos();
+  }
+
+  /**
+   * Check if this ack contains error status
+   * @return true if all statuses are SUCCESS
+   */
+  public boolean isSuccess() {
+    for (Status s : proto.getReplyList()) {
+      if (s != Status.SUCCESS) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the OOB status if this ack contains one.
+   * @return null if it is not an OOB ack.
+   */
+  public Status getOOBStatus() {
+    // Normal data transfer acks will have a valid sequence number, so
+    // this will return right away in most cases.
+    if (getSeqno() != UNKOWN_SEQNO) {
+      return null;
+    }
+    for (Status s : proto.getReplyList()) {
+      // The following check is valid because protobuf guarantees to
+      // preserve the ordering of enum elements.
+      if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
+        return s;
+      }
+    }
+    return null;
+  }
+
+  /** Get the Restart OOB ack status */
+  public static Status getRestartOOBStatus() {
+    return Status.OOB_RESTART;
+  }
+
+  /** return true if it is the restart OOB status code */
+  public static boolean isRestartOOBStatus(Status st) {
+    return st.equals(Status.OOB_RESTART);
+  }
+
+  /**** Writable interface ****/
+  public void readFields(InputStream in) throws IOException {
+    proto = PipelineAckProto.parseFrom(vintPrefixed(in));
+  }
+
+  public void write(OutputStream out) throws IOException {
+    proto.writeDelimitedTo(out);
+  }
+
+  @Override //Object
+  public String toString() {
+    return TextFormat.shortDebugString(proto);
+  }
+
+  public static Status getStatusFromHeader(int header) {
+    return StatusFormat.getStatus(header);
+  }
+
+  public static ECN getECNFromHeader(int header) {
+    return StatusFormat.getECN(header);
+  }
+
+  public static int setStatusForHeader(int old, Status status) {
+    return StatusFormat.setStatus(old, status);
+  }
+
+  public static int combineHeader(ECN ecn, Status status) {
+    int header = 0;
+    header = StatusFormat.setStatus(header, status);
+    header = StatusFormat.setECN(header, ecn);
+    return header;
+  }
+}
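
For reference, a minimal sketch of how the bit-packed ack header above can be
built and decoded through the public helpers of the relocated class. This is
illustrative only and not part of the commit; it assumes the hadoop-hdfs-client
artifact and its protobuf-generated Status enum are on the classpath.

import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

public class PipelineAckHeaderDemo {
  public static void main(String[] args) {
    // Pack a SUCCESS status and a CONGESTED ECN mark into one int header.
    // Per StatusFormat: bits 0-3 carry the status, bit 4 is reserved,
    // bits 5-6 carry the ECN value.
    int header = PipelineAck.combineHeader(ECN.CONGESTED, Status.SUCCESS);

    // Unpack the same header; both values round-trip through StatusFormat.
    Status status = PipelineAck.getStatusFromHeader(header); // SUCCESS
    ECN ecn = PipelineAck.getECNFromHeader(header);          // CONGESTED

    // Build an ack carrying this header for a single downstream reply.
    PipelineAck ack = new PipelineAck(1000L, new int[] { header });
    System.out.println(ack.isSuccess() + " " + status + " " + ecn);
  }
}

The same helpers back getHeaderFlag(int) above, which falls back to an
ECN.DISABLED header when an ack carries no explicit flags.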

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
new file mode 100644
index 0000000..9399d84
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.Serializable;
+
+
+/**
+ * Bit format in a long.
+ */
+public class LongBitFormat implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final String NAME;
+  /** Bit offset */
+  private final int OFFSET;
+  /** Bit length */
+  private final int LENGTH;
+  /** Minimum value */
+  private final long MIN;
+  /** Maximum value */
+  private final long MAX;
+  /** Bit mask */
+  private final long MASK;
+
+  public LongBitFormat(String name, LongBitFormat previous, int length, long min) {
+    NAME = name;
+    OFFSET = previous == null? 0: previous.OFFSET + previous.LENGTH;
+    LENGTH = length;
+    MIN = min;
+    MAX = ((-1L) >>> (64 - LENGTH));
+    MASK = MAX << OFFSET;
+  }
+
+  /** Retrieve the value from the record. */
+  public long retrieve(long record) {
+    return (record & MASK) >>> OFFSET;
+  }
+
+  /** Combine the value to the record. */
+  public long combine(long value, long record) {
+    if (value < MIN) {
+      throw new IllegalArgumentException(
+          "Illegal value: " + NAME + " = " + value + " < MIN = " + MIN);
+    }
+    if (value > MAX) {
+      throw new IllegalArgumentException(
+          "Illegal value: " + NAME + " = " + value + " > MAX = " + MAX);
+    }
+    return (record & ~MASK) | (value << OFFSET);
+  }
+
+  public long getMin() {
+    return MIN;
+  }
+}
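
A short standalone sketch of the chaining pattern LongBitFormat supports and
StatusFormat relies on: each format starts at the offset where the previous one
ends. The field names below are illustrative, not part of the commit.

import org.apache.hadoop.hdfs.util.LongBitFormat;

public class LongBitFormatDemo {
  public static void main(String[] args) {
    // STATUS occupies bits 0-3, RESERVED bit 4, ECN bits 5-6,
    // mirroring PipelineAck.StatusFormat.
    LongBitFormat status = new LongBitFormat("STATUS", null, 4, 0);
    LongBitFormat reserved = new LongBitFormat("RESERVED", status, 1, 0);
    LongBitFormat ecn = new LongBitFormat("ECN", reserved, 2, 0);

    long record = 0;
    record = status.combine(5, record); // any value that fits in 4 bits
    record = ecn.combine(3, record);    // any value that fits in 2 bits

    System.out.println(status.retrieve(record)); // 5
    System.out.println(ecn.retrieve(record));    // 3

    // Out-of-range values are rejected: 16 does not fit in 4 bits.
    try {
      status.combine(16, record);
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage());
    }
  }
}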

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b9b89aa..e67c9d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -888,6 +888,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9021. Use a yellow elephant rather than a blue one in diagram.
     (wang)
 
+    HDFS-9012. Move o.a.h.hdfs.protocol.datatransfer.PipelineAck class to
+    hadoop-hdfs-client module. (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
deleted file mode 100644
index 44f38c6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
-
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import com.google.protobuf.TextFormat;
-import org.apache.hadoop.hdfs.util.LongBitFormat;
-
-/** Pipeline Acknowledgment **/
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class PipelineAck {
-  PipelineAckProto proto;
-  public final static long UNKOWN_SEQNO = -2;
-  final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
-  final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
-  final static int NUM_OOB_TYPES = OOB_END - OOB_START + 1;
-  // place holder for timeout value of each OOB type
-  final static long[] OOB_TIMEOUT;
-
-  public enum ECN {
-    DISABLED(0),
-    SUPPORTED(1),
-    SUPPORTED2(2),
-    CONGESTED(3);
-
-    private final int value;
-    private static final ECN[] VALUES = values();
-    static ECN valueOf(int value) {
-      return VALUES[value];
-    }
-
-    ECN(int value) {
-      this.value = value;
-    }
-
-    public int getValue() {
-      return value;
-    }
-  }
-
-  private enum StatusFormat {
-    STATUS(null, 4),
-    RESERVED(STATUS.BITS, 1),
-    ECN_BITS(RESERVED.BITS, 2);
-
-    private final LongBitFormat BITS;
-
-    StatusFormat(LongBitFormat prev, int bits) {
-      BITS = new LongBitFormat(name(), prev, bits, 0);
-    }
-
-    static Status getStatus(int header) {
-      return Status.valueOf((int) STATUS.BITS.retrieve(header));
-    }
-
-    static ECN getECN(int header) {
-      return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
-    }
-
-    public static int setStatus(int old, Status status) {
-      return (int) STATUS.BITS.combine(status.getNumber(), old);
-    }
-
-    public static int setECN(int old, ECN ecn) {
-      return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
-    }
-  }
-
-  static {
-    OOB_TIMEOUT = new long[NUM_OOB_TYPES];
-    HdfsConfiguration conf = new HdfsConfiguration();
-    String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
-        DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
-    for (int i = 0; i < NUM_OOB_TYPES; i++) {
-      OOB_TIMEOUT[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
-    }
-  }
-
-  /** default constructor **/
-  public PipelineAck() {
-  }
-
-  /**
-   * Constructor assuming no next DN in pipeline
-   * @param seqno sequence number
-   * @param replies an array of replies
-   */
-  public PipelineAck(long seqno, int[] replies) {
-    this(seqno, replies, 0L);
-  }
-
-  /**
-   * Constructor
-   * @param seqno sequence number
-   * @param replies an array of replies
-   * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
-   */
-  public PipelineAck(long seqno, int[] replies,
-      long downstreamAckTimeNanos) {
-    ArrayList<Status> statusList = Lists.newArrayList();
-    ArrayList<Integer> flagList = Lists.newArrayList();
-    for (int r : replies) {
-      statusList.add(StatusFormat.getStatus(r));
-      flagList.add(r);
-    }
-    proto = PipelineAckProto.newBuilder()
-      .setSeqno(seqno)
-      .addAllReply(statusList)
-      .addAllFlag(flagList)
-      .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
-      .build();
-  }
-
-  /**
-   * Get the sequence number
-   * @return the sequence number
-   */
-  public long getSeqno() {
-    return proto.getSeqno();
-  }
-
-  /**
-   * Get the number of replies
-   * @return the number of replies
-   */
-  public short getNumOfReplies() {
-    return (short)proto.getReplyCount();
-  }
-
-  /**
-   * get the header flag of ith reply
-   */
-  public int getHeaderFlag(int i) {
-    if (proto.getFlagCount() > 0) {
-      return proto.getFlag(i);
-    } else {
-      return combineHeader(ECN.DISABLED, proto.getReply(i));
-    }
-  }
-
-  public int getFlag(int i) {
-    return proto.getFlag(i);
-  }
-
-  /**
-   * Get the time elapsed for downstream ack RTT in nanoseconds
-   * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
-   */
-  public long getDownstreamAckTimeNanos() {
-    return proto.getDownstreamAckTimeNanos();
-  }
-
-  /**
-   * Check if this ack contains error status
-   * @return true if all statuses are SUCCESS
-   */
-  public boolean isSuccess() {
-    for (Status s : proto.getReplyList()) {
-      if (s != Status.SUCCESS) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Returns the OOB status if this ack contains one.
-   * @return null if it is not an OOB ack.
-   */
-  public Status getOOBStatus() {
-    // Normal data transfer acks will have a valid sequence number, so
-    // this will return right away in most cases.
-    if (getSeqno() != UNKOWN_SEQNO) {
-      return null;
-    }
-    for (Status s : proto.getReplyList()) {
-      // The following check is valid because protobuf guarantees to
-      // preserve the ordering of enum elements.
-      if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
-        return s;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Get the timeout to be used for transmitting the OOB type
-   * @return the timeout in milliseconds
-   */
-  public static long getOOBTimeout(Status status) throws IOException {
-    int index = status.getNumber() - OOB_START;
-    if (index >= 0 && index < NUM_OOB_TYPES) {
-      return OOB_TIMEOUT[index];
-    }
-    // Not an OOB.
-    throw new IOException("Not an OOB status: " + status);
-  }
-
-  /** Get the Restart OOB ack status */
-  public static Status getRestartOOBStatus() {
-    return Status.OOB_RESTART;
-  }
-
-  /** return true if it is the restart OOB status code */
-  public static boolean isRestartOOBStatus(Status st) {
-    return st.equals(Status.OOB_RESTART);
-  }
-
-  /**** Writable interface ****/
-  public void readFields(InputStream in) throws IOException {
-    proto = PipelineAckProto.parseFrom(vintPrefixed(in));
-  }
-
-  public void write(OutputStream out) throws IOException {
-    proto.writeDelimitedTo(out);
-  }
-
-  @Override //Object
-  public String toString() {
-    return TextFormat.shortDebugString(proto);
-  }
-
-  public static Status getStatusFromHeader(int header) {
-    return StatusFormat.getStatus(header);
-  }
-
-  public static ECN getECNFromHeader(int header) {
-    return StatusFormat.getECN(header);
-  }
-
-  public static int setStatusForHeader(int old, Status status) {
-    return StatusFormat.setStatus(old, status);
-  }
-
-  public static int combineHeader(ECN ecn, Status status) {
-    int header = 0;
-    header = StatusFormat.setStatus(header, status);
-    header = StatusFormat.setECN(header, ecn);
-    return header;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 1cb308f..bc5396f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1153,7 +1153,7 @@ class BlockReceiver implements Closeable {
 
       synchronized(this) {
         if (sending) {
-          wait(PipelineAck.getOOBTimeout(ackStatus));
+          wait(datanode.getOOBTimeout(ackStatus));
           // Didn't get my turn in time. Give up.
           if (sending) {
             throw new IOException("Could not send OOB response in time: "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 42cbd96..e0adc6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -40,6 +40,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
@@ -359,6 +361,8 @@ public class DataNode extends ReconfigurableBase
       .availableProcessors();
   private static final double CONGESTION_RATIO = 1.5;
 
+  private long[] oobTimeouts; /** timeout value of each OOB type */
+
   /**
    * Creates a dummy DataNode for testing purpose.
    */
@@ -373,6 +377,7 @@
     this.connectToDnViaHostname = false;
     this.blockScanner = new BlockScanner(this, conf);
     this.pipelineSupportECN = false;
+    initOOBTimeout();
   }
 
   /**
@@ -446,6 +451,8 @@
         return ret;
       }
     });
+
+    initOOBTimeout();
   }
 
   @Override // ReconfigurableBase
@@ -3226,4 +3233,35 @@
     checkSuperuserPrivilege();
     spanReceiverHost.removeSpanReceiver(id);
   }
+
+  /**
+   * Get timeout value of each OOB type from configuration
+   */
+  private void initOOBTimeout() {
+    final int oobStart = Status.OOB_RESTART_VALUE; // the first OOB type
+    final int oobEnd = Status.OOB_RESERVED3_VALUE; // the last OOB type
+    final int numOobTypes = oobEnd - oobStart + 1;
+    oobTimeouts = new long[numOobTypes];
+
+    final String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
+        DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
+    for (int i = 0; i < numOobTypes; i++) {
+      oobTimeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
+    }
+  }
+
+  /**
+   * Get the timeout to be used for transmitting the OOB type
+   * @return the timeout in milliseconds
+   */
+  public long getOOBTimeout(Status status)
+      throws IOException {
+    if (status.getNumber() < Status.OOB_RESTART_VALUE ||
+        status.getNumber() > Status.OOB_RESERVED3_VALUE) {
+      // Not an OOB.
+      throw new IOException("Not an OOB status: " + status);
+    }
+
+    return oobTimeouts[status.getNumber() - Status.OOB_RESTART_VALUE];
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
deleted file mode 100644
index 9399d84..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.util;
-
-import java.io.Serializable;
-
-
-/**
- * Bit format in a long.
- */
-public class LongBitFormat implements Serializable {
-  private static final long serialVersionUID = 1L;
-
-  private final String NAME;
-  /** Bit offset */
-  private final int OFFSET;
-  /** Bit length */
-  private final int LENGTH;
-  /** Minimum value */
-  private final long MIN;
-  /** Maximum value */
-  private final long MAX;
-  /** Bit mask */
-  private final long MASK;
-
-  public LongBitFormat(String name, LongBitFormat previous, int length, long min) {
-    NAME = name;
-    OFFSET = previous == null? 0: previous.OFFSET + previous.LENGTH;
-    LENGTH = length;
-    MIN = min;
-    MAX = ((-1L) >>> (64 - LENGTH));
-    MASK = MAX << OFFSET;
-  }
-
-  /** Retrieve the value from the record. */
-  public long retrieve(long record) {
-    return (record & MASK) >>> OFFSET;
-  }
-
-  /** Combine the value to the record. */
-  public long combine(long value, long record) {
-    if (value < MIN) {
-      throw new IllegalArgumentException(
-          "Illegal value: " + NAME + " = " + value + " < MIN = " + MIN);
-    }
-    if (value > MAX) {
-      throw new IllegalArgumentException(
-          "Illegal value: " + NAME + " = " + value + " > MAX = " + MAX);
-    }
-    return (record & ~MASK) | (value << OFFSET);
-  }
-
-  public long getMin() {
-    return MIN;
-  }
-}
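
Finally, the comma-separated timeout parsing that initOOBTimeout() moves into
the DataNode can be exercised in isolation. The sketch below re-implements the
same rule (one timeout per OOB type, in enum order, with missing entries
defaulting to 0) without any Hadoop dependencies; the names are illustrative,
and it assumes the four OOB values OOB_RESTART through OOB_RESERVED3 are
consecutive, as the range check above implies.

import java.util.Arrays;

public class OobTimeoutParseDemo {
  // Mirrors DataNode#initOOBTimeout(): split the configured value on commas
  // and assign one timeout per OOB type, zero-filling the tail.
  static long[] parseOobTimeouts(String value, int numOobTypes) {
    final String[] ele = value.split(",");
    final long[] timeouts = new long[numOobTypes];
    for (int i = 0; i < numOobTypes; i++) {
      timeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
    }
    return timeouts;
  }

  public static void main(String[] args) {
    // Four OOB types but only two values configured: the remaining
    // slots fall back to 0.
    long[] t = parseOobTimeouts("1500,0", 4);
    System.out.println(Arrays.toString(t)); // [1500, 0, 0, 0]
  }
}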