From: jing9@apache.org
To: common-commits@hadoop.apache.org
Subject: hadoop git commit: HADOOP-12060. Fix ByteBuffer usage for raw erasure coders. Contributed by Kai Zheng.
Date: Mon, 20 Jul 2015 17:16:49 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 06394e376 -> 29495cb8f


HADOOP-12060. Fix ByteBuffer usage for raw erasure coders. Contributed by Kai Zheng.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/29495cb8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/29495cb8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/29495cb8

Branch: refs/heads/HDFS-7285
Commit: 29495cb8f6b940caa9964c39a290ef233ce1ec7c
Parents: 06394e3
Author: Jing Zhao
Authored: Mon Jul 20 10:15:14 2015 -0700
Committer: Jing Zhao
Committed: Mon Jul 20 10:15:14 2015 -0700

----------------------------------------------------------------------
 .../hadoop-common/CHANGES-HDFS-EC-7285.txt      |  5 +-
 .../apache/hadoop/io/erasurecode/CodecUtil.java | 38 ++++----
 .../apache/hadoop/io/erasurecode/ECBlock.java   | 14 +--
 .../hadoop/io/erasurecode/ECBlockGroup.java     |  6 +-
 .../apache/hadoop/io/erasurecode/ECChunk.java   |  6 +-
 .../apache/hadoop/io/erasurecode/ECSchema.java  | 18 ++--
 .../hadoop/io/erasurecode/SchemaLoader.java     |  3 +-
 .../rawcoder/AbstractRawErasureCoder.java       | 42 +++++----
 .../rawcoder/AbstractRawErasureDecoder.java     | 50 +++++------
 .../rawcoder/AbstractRawErasureEncoder.java     | 27 +++---
 .../rawcoder/RawErasureCoderFactory.java        |  8 +-
 .../erasurecode/rawcoder/RawErasureDecoder.java | 25 ++++--
 .../erasurecode/rawcoder/RawErasureEncoder.java | 24 ++++-
 .../hadoop/io/erasurecode/BufferAllocator.java  | 91 ++++++++++++++++++++
 .../hadoop/io/erasurecode/TestCoderBase.java    | 17 +++-
 .../erasurecode/coder/TestErasureCoderBase.java | 10 +--
 .../erasurecode/rawcoder/TestRawCoderBase.java  | 13 +--
 17 files changed, 268 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
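The core of this change is twofold: a single encode/decode call may no longer mix on-heap and direct ByteBuffers, and the on-heap fallback path now indexes the backing array with arrayOffset() + position() instead of position() alone. The distinction matters whenever a buffer is a slice of a larger array-backed buffer, exactly what the new SlicedBufferAllocator test helper produces. A standalone illustration of the old failure mode (not part of the patch; the class name is invented):

import java.nio.ByteBuffer;

public class ArrayOffsetDemo {
  public static void main(String[] args) {
    ByteBuffer big = ByteBuffer.allocate(16); // all zeros
    big.position(4);                          // carve a chunk out of it
    ByteBuffer chunk = big.slice();           // arrayOffset() == 4, position() == 0
    chunk.put((byte) 42);
    chunk.flip();

    // Wrong: ignores the slice's offset into the shared backing array,
    // so it reads big's byte 0 rather than the chunk's first byte.
    byte wrong = chunk.array()[chunk.position()];
    // Right: what the patched AbstractRawErasureDecoder/Encoder compute.
    byte right = chunk.array()[chunk.arrayOffset() + chunk.position()];

    System.out.println(wrong + " vs " + right); // prints "0 vs 42"
  }
}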
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
index 9ccd3a7..1f3006e 100644
--- a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
@@ -68,4 +68,7 @@
 
     HADOOP-12065. Using more meaningful keys in EC schema. (Kai Zheng)
 
-    HDFS-8557. Allow to configure RS and XOR raw coders (Kai Zheng)
\ No newline at end of file
+    HDFS-8557. Allow to configure RS and XOR raw coders (Kai Zheng)
+
+    HADOOP-12060. Fix ByteBuffer usage for raw erasure coders. (Kai Zheng via
+    jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
index 5d22624..027d58b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/CodecUtil.java
@@ -22,17 +22,17 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.erasurecode.rawcoder.*;
 
 /**
- * A codec utility.
+ * A codec & coder utility to help create raw coders conveniently.
  */
 public final class CodecUtil {
 
-  private CodecUtil() {}
+  private CodecUtil() { }
 
   /**
    * Create RS raw encoder according to configuration.
-   * @param conf
-   * @param numDataUnits
-   * @param numParityUnits
+   * @param conf configuration possibly with some items to configure the coder
+   * @param numDataUnits number of data units in a coding group
+   * @param numParityUnits number of parity units in a coding group
    * @return raw encoder
    */
   public static RawErasureEncoder createRSRawEncoder(
@@ -49,9 +49,9 @@ public final class CodecUtil {
 
   /**
    * Create RS raw decoder according to configuration.
-   * @param conf
-   * @param numDataUnits
-   * @param numParityUnits
+   * @param conf configuration possibly with some items to configure the coder
+   * @param numDataUnits number of data units in a coding group
+   * @param numParityUnits number of parity units in a coding group
    * @return raw decoder
    */
   public static RawErasureDecoder createRSRawDecoder(
@@ -68,9 +68,9 @@ public final class CodecUtil {
 
   /**
    * Create XOR raw encoder according to configuration.
-   * @param conf
-   * @param numDataUnits
-   * @param numParityUnits
+   * @param conf configuration possibly with some items to configure the coder
+   * @param numDataUnits number of data units in a coding group
+   * @param numParityUnits number of parity units in a coding group
    * @return raw encoder
    */
   public static RawErasureEncoder createXORRawEncoder(
@@ -87,9 +87,9 @@ public final class CodecUtil {
 
   /**
    * Create XOR raw decoder according to configuration.
-   * @param conf
-   * @param numDataUnits
-   * @param numParityUnits
+   * @param conf configuration possibly with some items to configure the coder
+   * @param numDataUnits number of data units in a coding group
+   * @param numParityUnits number of parity units in a coding group
    * @return raw decoder
    */
   public static RawErasureDecoder createXORRawDecoder(
@@ -106,11 +106,11 @@ public final class CodecUtil {
 
   /**
    * Create raw coder using specified conf and raw coder factory key.
-   * @param conf
-   * @param rawCoderFactoryKey
-   * @param isEncoder
-   * @param numDataUnits
-   * @param numParityUnits
+   * @param conf configuration possibly with some items to configure the coder
+   * @param rawCoderFactoryKey configuration key to find the raw coder factory
+   * @param isEncoder whether an encoder (as opposed to a decoder) is wanted
+   * @param numDataUnits number of data units in a coding group
+   * @param numParityUnits number of parity units in a coding group
    * @return raw coder
    */
   public static RawErasureCoder createRawCoder(Configuration conf,
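As the new javadoc spells out, callers obtain coders from configuration rather than instantiating implementations directly; a minimal usage sketch (the 6 data + 3 parity layout is only an example, not mandated by the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

public class CodecUtilExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // 6 data units + 3 parity units per coding group; the implementation
    // actually used can be swapped via the raw coder factory keys that
    // createRawCoder consults.
    RawErasureEncoder encoder = CodecUtil.createRSRawEncoder(conf, 6, 3);
    RawErasureDecoder decoder = CodecUtil.createRSRawDecoder(conf, 6, 3);
  }
}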
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlock.java
index 956954a..5c0a160 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlock.java
@@ -37,8 +37,8 @@ public class ECBlock {
 
   /**
    * A constructor specifying isParity and isErased.
-   * @param isParity
-   * @param isErased
+   * @param isParity is a parity block
+   * @param isErased is erased or not
    */
   public ECBlock(boolean isParity, boolean isErased) {
     this.isParity = isParity;
@@ -47,7 +47,7 @@ public class ECBlock {
 
   /**
    * Set true if it's for a parity block.
-   * @param isParity
+   * @param isParity is parity or not
    */
   public void setParity(boolean isParity) {
     this.isParity = isParity;
@@ -55,10 +55,10 @@ public class ECBlock {
 
   /**
    * Set true if the block is missing.
-   * @param isMissing
+   * @param isErased is erased or not
    */
-  public void setErased(boolean isMissing) {
-    this.isErased = isMissing;
+  public void setErased(boolean isErased) {
+    this.isErased = isErased;
   }
 
   /**
@@ -71,7 +71,7 @@ public class ECBlock {
 
   /**
    *
-   * @return true if it's missing or corrupt due to erasure, otherwise false
+   * @return true if it's erased due to erasure, otherwise false
    */
   public boolean isErased() {
     return isErased;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlockGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlockGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlockGroup.java
index 0a86907..91e4fb8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlockGroup.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECBlockGroup.java
@@ -27,8 +27,8 @@ public class ECBlockGroup {
 
   /**
    * A constructor specifying data blocks and parity blocks.
-   * @param dataBlocks
-   * @param parityBlocks
+   * @param dataBlocks data blocks in the group
+   * @param parityBlocks parity blocks in the group
    */
   public ECBlockGroup(ECBlock[] dataBlocks, ECBlock[] parityBlocks) {
     this.dataBlocks = dataBlocks;
@@ -81,7 +81,7 @@ public class ECBlockGroup {
 
   /**
    * Get erased blocks count
-   * @return
+   * @return erased count of blocks
    */
   public int getErasedCount() {
     int erasedCount = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
index 310c738..d0120d8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECChunk.java
@@ -28,7 +28,7 @@ public class ECChunk {
 
   /**
    * Wrapping a ByteBuffer
-   * @param buffer
+   * @param buffer buffer to be wrapped by the chunk
    */
   public ECChunk(ByteBuffer buffer) {
     this.chunkBuffer = buffer;
@@ -36,7 +36,7 @@ public class ECChunk {
 
   /**
    * Wrapping a bytes array
-   * @param buffer
+   * @param buffer buffer to be wrapped by the chunk
    */
   public ECChunk(byte[] buffer) {
     this.chunkBuffer = ByteBuffer.wrap(buffer);
@@ -52,7 +52,7 @@ public class ECChunk {
 
   /**
    * Convert an array of this chunks to an array of ByteBuffers
-   * @param chunks
+   * @param chunks chunks to convert into buffers
    * @return an array of ByteBuffers
    */
   public static ByteBuffer[] toBuffers(ECChunk[] chunks) {
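ECChunk is a thin wrapper either way, so byte arrays and ByteBuffers feed the same coder API; a small sketch of the wrapping and unwrapping (buffer sizes are arbitrary):

import java.nio.ByteBuffer;
import org.apache.hadoop.io.erasurecode.ECChunk;

public class ECChunkExample {
  public static void main(String[] args) {
    // Both chunks wrap on-heap storage; under the new rule, on-heap and
    // direct buffers must not be mixed within a single encode/decode call.
    ECChunk a = new ECChunk(new byte[1024]);
    ECChunk b = new ECChunk(ByteBuffer.wrap(new byte[1024]));
    // Chunks are unwrapped to ByteBuffers before the raw coder is invoked.
    ByteBuffer[] buffers = ECChunk.toBuffers(new ECChunk[] { a, b });
    System.out.println(buffers[0].remaining()); // 1024
  }
}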
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
index 1e07d3d..fb02476 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ECSchema.java
@@ -94,10 +94,10 @@ public final class ECSchema {
 
   /**
    * Constructor with key parameters provided.
-   * @param schemaName
-   * @param codecName
-   * @param numDataUnits
-   * @param numParityUnits
+   * @param schemaName schema name
+   * @param codecName codec name
+   * @param numDataUnits number of data units used in the schema
+   * @param numParityUnits number of parity units used in the schema
    */
   public ECSchema(String schemaName, String codecName, int numDataUnits,
                   int numParityUnits) {
@@ -107,11 +107,11 @@ public final class ECSchema {
   /**
    * Constructor with key parameters provided. Note the extraOptions may contain
    * additional information for the erasure codec to interpret further.
-   * @param schemaName
-   * @param codecName
-   * @param numDataUnits
-   * @param numParityUnits
-   * @param extraOptions
+   * @param schemaName schema name
+   * @param codecName codec name
+   * @param numDataUnits number of data units used in the schema
+   * @param numParityUnits number of parity units used in the schema
+   * @param extraOptions extra options to configure the codec
    */
   public ECSchema(String schemaName, String codecName, int numDataUnits,
                   int numParityUnits, Map<String, String> extraOptions) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
index 9b10c78..fce46f8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/SchemaLoader.java
@@ -43,7 +43,8 @@ import org.xml.sax.SAXException;
  * A EC schema loading utility that loads predefined EC schemas from XML file
  */
 public class SchemaLoader {
-  private static final Logger LOG = LoggerFactory.getLogger(SchemaLoader.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SchemaLoader.class.getName());
 
   /**
    * Load predefined ec schemas from configuration file. This file is
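A schema ties a codec name to a group layout; a hedged construction example (schema and codec names and the option key are illustrative, and the String-to-String option map is assumed from the surrounding code):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.erasurecode.ECSchema;

public class ECSchemaExample {
  public static void main(String[] args) {
    // A plain RS schema with 6 data units and 3 parity units.
    ECSchema rs63 = new ECSchema("RS-6-3", "rs", 6, 3);

    // The extraOptions variant carries codec-specific settings; the key
    // below is hypothetical.
    Map<String, String> options = new HashMap<String, String>();
    options.put("some.codec.option", "value");
    ECSchema tuned = new ECSchema("RS-6-3-x", "rs", 6, 3, options);
  }
}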
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
index e6a1542..4b7461e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
@@ -91,37 +91,45 @@ public abstract class AbstractRawErasureCoder
   }
 
   /**
-   * Check and ensure the buffers are of the length specified by dataLen.
-   * @param buffers
-   * @param allowNull
-   * @param dataLen
+   * Check and ensure the buffers are of the length specified by dataLen, also
+   * ensure the buffers are direct buffers or not according to isDirectBuffer.
+   * @param buffers the buffers to check
+   * @param allowNull whether to allow any element to be null or not
+   * @param dataLen the length of data available in the buffer to ensure with
+   * @param isDirectBuffer is direct buffer or not to ensure with
    */
-  protected void ensureLength(ByteBuffer[] buffers,
-      boolean allowNull, int dataLen) {
-    for (int i = 0; i < buffers.length; ++i) {
-      if (buffers[i] == null && !allowNull) {
+  protected void ensureLengthAndType(ByteBuffer[] buffers, boolean allowNull,
+      int dataLen, boolean isDirectBuffer) {
+    for (ByteBuffer buffer : buffers) {
+      if (buffer == null && !allowNull) {
         throw new HadoopIllegalArgumentException(
             "Invalid buffer found, not allowing null");
-      } else if (buffers[i] != null && buffers[i].remaining() != dataLen) {
-        throw new HadoopIllegalArgumentException(
-            "Invalid buffer, not of length " + dataLen);
+      } else if (buffer != null) {
+        if (buffer.remaining() != dataLen) {
+          throw new HadoopIllegalArgumentException(
+              "Invalid buffer, not of length " + dataLen);
+        }
+        if (buffer.isDirect() != isDirectBuffer) {
+          throw new HadoopIllegalArgumentException(
+              "Invalid buffer, isDirect should be " + isDirectBuffer);
+        }
       }
     }
   }
 
   /**
    * Check and ensure the buffers are of the length specified by dataLen.
-   * @param buffers
-   * @param allowNull
-   * @param dataLen
+   * @param buffers the buffers to check
+   * @param allowNull whether to allow any element to be null or not
+   * @param dataLen the length of data available in the buffer to ensure with
    */
   protected void ensureLength(byte[][] buffers,
       boolean allowNull, int dataLen) {
-    for (int i = 0; i < buffers.length; ++i) {
-      if (buffers[i] == null && !allowNull) {
+    for (byte[] buffer : buffers) {
+      if (buffer == null && !allowNull) {
         throw new HadoopIllegalArgumentException(
             "Invalid buffer found, not allowing null");
-      } else if (buffers[i] != null && buffers[i].length != dataLen) {
+      } else if (buffer != null && buffer.length != dataLen) {
         throw new HadoopIllegalArgumentException(
             "Invalid buffer not of length " + dataLen);
       }
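The new type check means a single call must be all-direct or all-on-heap. A sketch of what now fails fast, assuming an encoder obtained from CodecUtil and a matching outputs array (fragment, not runnable on its own):

ByteBuffer[] inputs = new ByteBuffer[6];
for (int i = 0; i < inputs.length; i++) {
  inputs[i] = ByteBuffer.allocateDirect(1024);
}
inputs[5] = ByteBuffer.allocate(1024); // one on-heap buffer slips in
// encoder.encode(inputs, outputs) now throws
// HadoopIllegalArgumentException("Invalid buffer, isDirect should be true")
// instead of silently taking the wrong code path.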
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
index c6105b0..931cda1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
@@ -41,14 +41,14 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
     checkParameters(inputs, erasedIndexes, outputs);
 
     ByteBuffer validInput = findFirstValidInput(inputs);
+    boolean usingDirectBuffer = validInput.isDirect();
     int dataLen = validInput.remaining();
     if (dataLen == 0) {
       return;
     }
-    ensureLength(inputs, true, dataLen);
-    ensureLength(outputs, false, dataLen);
+    ensureLengthAndType(inputs, true, dataLen, usingDirectBuffer);
+    ensureLengthAndType(outputs, false, dataLen, usingDirectBuffer);
 
-    boolean usingDirectBuffer = validInput.isDirect();
     if (usingDirectBuffer) {
       doDecode(inputs, erasedIndexes, outputs);
       return;
@@ -63,14 +63,14 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
     for (int i = 0; i < inputs.length; ++i) {
       buffer = inputs[i];
       if (buffer != null) {
-        inputOffsets[i] = buffer.position();
+        inputOffsets[i] = buffer.arrayOffset() + buffer.position();
         newInputs[i] = buffer.array();
       }
     }
 
     for (int i = 0; i < outputs.length; ++i) {
       buffer = outputs[i];
-      outputOffsets[i] = buffer.position();
+      outputOffsets[i] = buffer.arrayOffset() + buffer.position();
       newOutputs[i] = buffer.array();
     }
 
@@ -81,7 +81,7 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
       buffer = inputs[i];
       if (buffer != null) {
         // dataLen bytes consumed
-        buffer.position(inputOffsets[i] + dataLen);
+        buffer.position(buffer.position() + dataLen);
       }
     }
   }
@@ -89,7 +89,7 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
   /**
    * Perform the real decoding using Direct ByteBuffer.
    * @param inputs Direct ByteBuffers expected
-   * @param erasedIndexes
+   * @param erasedIndexes indexes of erased units in the inputs array
    * @param outputs Direct ByteBuffers expected
    */
   protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
@@ -117,12 +117,12 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
   /**
    * Perform the real decoding using bytes array, supporting offsets and
    * lengths.
-   * @param inputs
-   * @param inputOffsets
-   * @param dataLen
-   * @param erasedIndexes
-   * @param outputs
-   * @param outputOffsets
+   * @param inputs the input byte arrays to read data from
+   * @param inputOffsets offsets for the input byte arrays to read data from
+   * @param dataLen how much data is to be read
+   * @param erasedIndexes indexes of erased units in the inputs array
+   * @param outputs the output byte arrays to write resultant data into
+   * @param outputOffsets offsets at which to write the resultant data
    */
   protected abstract void doDecode(byte[][] inputs, int[] inputOffsets,
                                    int dataLen, int[] erasedIndexes,
@@ -139,12 +139,12 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
   /**
    * Check and validate decoding parameters, throw exception accordingly. The
    * checking assumes it's a MDS code. Other code can override this.
-   * @param inputs
-   * @param erasedIndexes
-   * @param outputs
+   * @param inputs input buffers to check
+   * @param erasedIndexes indexes of erased units in the inputs array
+   * @param outputs output buffers to check
    */
-  protected void checkParameters(Object[] inputs, int[] erasedIndexes,
-      Object[] outputs) {
+  protected <T> void checkParameters(T[] inputs, int[] erasedIndexes,
+      T[] outputs) {
     if (inputs.length != getNumParityUnits() + getNumDataUnits()) {
       throw new IllegalArgumentException("Invalid inputs length");
     }
@@ -160,8 +160,8 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
     }
 
     int validInputs = 0;
-    for (int i = 0; i < inputs.length; ++i) {
-      if (inputs[i] != null) {
+    for (T input : inputs) {
+      if (input != null) {
        validInputs += 1;
      }
    }
@@ -177,7 +177,7 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
    * not to read.
    * @return indexes into inputs array
    */
-  protected int[] getErasedOrNotToReadIndexes(Object[] inputs) {
+  protected <T> int[] getErasedOrNotToReadIndexes(T[] inputs) {
     int[] invalidIndexes = new int[inputs.length];
     int idx = 0;
     for (int i = 0; i < inputs.length; i++) {
@@ -191,13 +191,13 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
 
   /**
    * Find the valid input from all the inputs.
-   * @param inputs
+   * @param inputs input buffers to look for valid input
    * @return the first valid input
    */
   protected static <T> T findFirstValidInput(T[] inputs) {
-    for (int i = 0; i < inputs.length; i++) {
-      if (inputs[i] != null) {
-        return inputs[i];
+    for (T input : inputs) {
+      if (input != null) {
+        return input;
       }
     }
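Note also the repositioning hunk above: on the array-backed path nothing reads through the buffers themselves, so decode() advances each input's position afterwards to keep the two paths behaviorally identical. The caller-visible contract, sketched assuming a prepared decoder and 1024-byte input buffers (fragment):

// Before the call: every non-null input has position() == 0, remaining() == 1024.
decoder.decode(inputs, erasedIndexes, outputs);
// After the call: every non-null input has position() == 1024 -- dataLen
// consumed -- matching what doDecode() on direct buffers does by reading
// through them.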
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
index d1faa8c..a0b3cfe 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
@@ -37,14 +37,15 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
   @Override
   public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
     checkParameters(inputs, outputs);
+
+    boolean usingDirectBuffer = inputs[0].isDirect();
     int dataLen = inputs[0].remaining();
     if (dataLen == 0) {
       return;
     }
-    ensureLength(inputs, false, dataLen);
-    ensureLength(outputs, false, dataLen);
+    ensureLengthAndType(inputs, false, dataLen, usingDirectBuffer);
+    ensureLengthAndType(outputs, false, dataLen, usingDirectBuffer);
 
-    boolean usingDirectBuffer = inputs[0].isDirect();
     if (usingDirectBuffer) {
       doEncode(inputs, outputs);
       return;
@@ -58,13 +59,13 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
     ByteBuffer buffer;
     for (int i = 0; i < inputs.length; ++i) {
       buffer = inputs[i];
-      inputOffsets[i] = buffer.position();
+      inputOffsets[i] = buffer.arrayOffset() + buffer.position();
       newInputs[i] = buffer.array();
     }
 
     for (int i = 0; i < outputs.length; ++i) {
       buffer = outputs[i];
-      outputOffsets[i] = buffer.position();
+      outputOffsets[i] = buffer.arrayOffset() + buffer.position();
       newOutputs[i] = buffer.array();
     }
 
@@ -102,11 +103,11 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
   /**
    * Perform the real encoding work using bytes array, supporting offsets
    * and lengths.
-   * @param inputs
-   * @param inputOffsets
-   * @param dataLen
-   * @param outputs
-   * @param outputOffsets
+   * @param inputs the input byte arrays to read data from
+   * @param inputOffsets offsets for the input byte arrays to read data from
+   * @param dataLen how much data is to be read
+   * @param outputs the output byte arrays to write resultant data into
+   * @param outputOffsets offsets at which to write the resultant data
    */
   protected abstract void doEncode(byte[][] inputs, int[] inputOffsets,
                                    int dataLen, byte[][] outputs,
@@ -121,10 +122,10 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
 
   /**
    * Check and validate decoding parameters, throw exception accordingly.
-   * @param inputs
-   * @param outputs
+   * @param inputs input buffers to check
+   * @param outputs output buffers to check
    */
-  protected void checkParameters(Object[] inputs, Object[] outputs) {
+  protected <T> void checkParameters(T[] inputs, T[] outputs) {
     if (inputs.length != getNumDataUnits()) {
       throw new HadoopIllegalArgumentException("Invalid inputs length");
     }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
index 26eddfc..280daf3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
@@ -26,16 +26,16 @@ public interface RawErasureCoderFactory {
 
   /**
    * Create raw erasure encoder.
-   * @param numDataUnits
-   * @param numParityUnits
+   * @param numDataUnits number of data units in a coding group
+   * @param numParityUnits number of parity units in a coding group
    * @return raw erasure encoder
    */
   public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits);
 
   /**
    * Create raw erasure decoder.
-   * @param numDataUnits
-   * @param numParityUnits
+   * @param numDataUnits number of data units in a coding group
+   * @param numParityUnits number of parity units in a coding group
    * @return raw erasure decoder
    */
   public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits);
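CodecUtil looks this factory up by configuration key, so plugging in an alternative coder pair amounts to implementing these two methods; a minimal hypothetical sketch (MyRSRawEncoder and MyRSRawDecoder are invented names standing in for concrete coder classes):

import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

public class MyRawCoderFactory implements RawErasureCoderFactory {
  @Override
  public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
    return new MyRSRawEncoder(numDataUnits, numParityUnits); // hypothetical
  }

  @Override
  public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
    return new MyRSRawDecoder(numDataUnits, numParityUnits); // hypothetical
  }
}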
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
index ad7f32d..e2d01d9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
@@ -33,7 +33,8 @@ public interface RawErasureDecoder extends RawErasureCoder {
   /**
    * Decode with inputs and erasedIndexes, generates outputs.
    * How to prepare for inputs:
-   * 1. Create an array containing parity units + data units;
+   * 1. Create an array containing parity units + data units. Please note the
+   *    parity units should be first or before the data units.
    * 2. Set null in the array locations specified via erasedIndexes to indicate
    *    they're erased and no data are to read from;
    * 3. Set null in the array locations for extra redundant items, as they're
@@ -48,29 +49,39 @@ public interface RawErasureDecoder extends RawErasureCoder {
    * erasedIndexes = [5] // index of d2 into inputs array
    * outputs = [a-writable-buffer]
    *
-   * @param inputs inputs to read data from
+   * Note, for both inputs and outputs, no mixing of on-heap buffers and direct
+   * buffers is allowed.
+   *
+   * @param inputs inputs to read data from, contents may change after the call
    * @param erasedIndexes indexes of erased units in the inputs array
    * @param outputs outputs to write into for data generated according to
-   *                erasedIndexes
+   *                erasedIndexes, ready for reading the result data from after
+   *                the call
    */
   public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
                      ByteBuffer[] outputs);
 
   /**
    * Decode with inputs and erasedIndexes, generates outputs. More see above.
-   * @param inputs inputs to read data from
+   * @param inputs inputs to read data from, contents may change after the call
    * @param erasedIndexes indexes of erased units in the inputs array
    * @param outputs outputs to write into for data generated according to
-   *                erasedIndexes
+   *                erasedIndexes, ready for reading the result data from after
+   *                the call
    */
   public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs);
 
   /**
    * Decode with inputs and erasedIndexes, generates outputs. More see above.
-   * @param inputs inputs to read data from
+   *
+   * Note, for both input and output ECChunks, no mixing of on-heap buffers and
+   * direct buffers is allowed.
+   *
+   * @param inputs inputs to read data from, contents may change after the call
    * @param erasedIndexes indexes of erased units in the inputs array
    * @param outputs outputs to write into for data generated according to
-   *                erasedIndexes
+   *                erasedIndexes, ready for reading the result data from after
+   *                the call
    */
   public void decode(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs);
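Putting the javadoc's input-preparation rules together for a 6+3 RS group with data unit d2 lost (a fragment following the example above; p0..p2, d0..d5, decoder, and chunkSize are assumed to exist, and all buffers must be of the same kind, on-heap or direct, with the same remaining length):

// Layout: parity units first, then data units; nulls mark what is erased
// or deliberately not read.
ByteBuffer[] inputs = {
    p0, p1, p2,        // parity units come before the data units
    d0, d1, null,      // d2 is erased -> index 5 into inputs
    d3, d4, d5
};
int[] erasedIndexes = { 5 };
ByteBuffer[] outputs = { ByteBuffer.allocate(chunkSize) }; // writable buffer
decoder.decode(inputs, erasedIndexes, outputs);
// outputs[0] now holds the reconstructed d2, ready for reading.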
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
index 974f86c..7571f09 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java
@@ -31,23 +31,33 @@ import java.nio.ByteBuffer;
 public interface RawErasureEncoder extends RawErasureCoder {
 
   /**
-   * Encode with inputs and generates outputs
-   * @param inputs
+   * Encode with inputs and generates outputs.
+   *
+   * Note, for both inputs and outputs, no mixing of on-heap buffers and direct
+   * buffers is allowed.
+   *
+   * @param inputs inputs to read data from, contents may change after the call
    * @param outputs
    */
   public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs);
 
   /**
    * Encode with inputs and generates outputs
-   * @param inputs
-   * @param outputs
+   * @param inputs inputs to read data from, contents may change after the call
+   * @param outputs outputs to write into for data generated, ready for reading
+   *                the result data from after the call
    */
   public void encode(byte[][] inputs, byte[][] outputs);
 
   /**
-   * Encode with inputs and generates outputs
-   * @param inputs
-   * @param outputs
+   * Encode with inputs and generates outputs.
+   *
+   * Note, for both input and output ECChunks, no mixing of on-heap buffers and
+   * direct buffers is allowed.
+   *
+   * @param inputs inputs to read data from, contents may change after the call
+   * @param outputs outputs to write into for data generated, ready for reading
+   *                the result data from after the call
    */
   public void encode(ECChunk[] inputs, ECChunk[] outputs);
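The encode contract mirrors decode but takes exactly numDataUnits inputs and numParityUnits outputs; a fragment for the byte[] variant, assuming a 6+3 RS encoder from CodecUtil and a chunkSize variable:

byte[][] dataUnits = new byte[6][chunkSize];   // filled by the caller
byte[][] parityUnits = new byte[3][chunkSize]; // written by the encoder
// ... fill dataUnits with application data ...
encoder.encode(dataUnits, parityUnits);
// parityUnits now hold the generated parity, ready for reading.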
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/BufferAllocator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/BufferAllocator.java
new file mode 100644
index 0000000..8f552b7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/BufferAllocator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode;
+
+
+import java.nio.ByteBuffer;
+
+/**
+ * An abstract buffer allocator used for test.
+ */
+public abstract class BufferAllocator {
+  private boolean usingDirect = false;
+
+  public BufferAllocator(boolean usingDirect) {
+    this.usingDirect = usingDirect;
+  }
+
+  protected boolean isUsingDirect() {
+    return usingDirect;
+  }
+
+  /**
+   * Allocate and return a ByteBuffer of specified length.
+   * @param bufferLen length of the buffer to allocate
+   * @return the allocated buffer
+   */
+  public abstract ByteBuffer allocate(int bufferLen);
+
+  /**
+   * A simple buffer allocator that just uses ByteBuffer's
+   * allocate/allocateDirect API.
+   */
+  public static class SimpleBufferAllocator extends BufferAllocator {
+
+    public SimpleBufferAllocator(boolean usingDirect) {
+      super(usingDirect);
+    }
+
+    @Override
+    public ByteBuffer allocate(int bufferLen) {
+      return isUsingDirect() ? ByteBuffer.allocateDirect(bufferLen) :
+          ByteBuffer.allocate(bufferLen);
+    }
+  }
+
+  /**
+   * A buffer allocator that slices buffers out of an existing large buffer;
+   * if no space is left it degrades to SimpleBufferAllocator behavior, so
+   * please ensure enough space for it.
+   */
+  public static class SlicedBufferAllocator extends BufferAllocator {
+    private ByteBuffer overallBuffer;
+
+    public SlicedBufferAllocator(boolean usingDirect, int totalBufferLen) {
+      super(usingDirect);
+      overallBuffer = isUsingDirect() ?
+          ByteBuffer.allocateDirect(totalBufferLen) :
+          ByteBuffer.allocate(totalBufferLen);
+    }
+
+    @Override
+    public ByteBuffer allocate(int bufferLen) {
+      if (bufferLen > overallBuffer.capacity() - overallBuffer.position()) {
+        // If no available space for the requested length, then allocate new
+        return isUsingDirect() ? ByteBuffer.allocateDirect(bufferLen) :
+            ByteBuffer.allocate(bufferLen);
+      }
+
+      overallBuffer.limit(overallBuffer.position() + bufferLen);
+      ByteBuffer result = overallBuffer.slice();
+      overallBuffer.position(overallBuffer.position() + bufferLen);
+      return result;
+    }
+  }
+
+}
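This sliced allocator is what gives the tests buffers with a nonzero arrayOffset(), exercising exactly the code path fixed above. A small demonstration of that property (class name invented; sizes arbitrary):

import java.nio.ByteBuffer;
import org.apache.hadoop.io.erasurecode.BufferAllocator.SlicedBufferAllocator;

public class SlicedAllocatorDemo {
  public static void main(String[] args) {
    SlicedBufferAllocator alloc =
        new SlicedBufferAllocator(false /* on-heap */, 4096);
    ByteBuffer first = alloc.allocate(1024);
    ByteBuffer second = alloc.allocate(1024);
    // Both views share one backing array; only their offsets differ.
    System.out.println(first.arrayOffset());  // 0
    System.out.println(second.arrayOffset()); // 1024
  }
}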
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
index 10edae8..8f277f4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.io.erasurecode;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.erasurecode.BufferAllocator.SimpleBufferAllocator;
+import org.apache.hadoop.io.erasurecode.BufferAllocator.SlicedBufferAllocator;
 import org.apache.hadoop.io.erasurecode.rawcoder.util.DumpUtil;
 
 import java.nio.ByteBuffer;
@@ -40,6 +42,7 @@ public abstract class TestCoderBase {
   protected int numParityUnits;
   protected int baseChunkSize = 513;
   private int chunkSize = baseChunkSize;
+  private BufferAllocator allocator;
 
   private byte[] zeroChunkBytes;
 
@@ -70,6 +73,17 @@ public abstract class TestCoderBase {
     this.zeroChunkBytes = new byte[chunkSize]; // With ZERO by default
   }
 
+  protected void prepareBufferAllocator(boolean usingSlicedBuffer) {
+    if (usingSlicedBuffer) {
+      int roughEstimationSpace =
+          chunkSize * (numDataUnits + numParityUnits) * 10;
+      allocator = new SlicedBufferAllocator(usingDirectBuffer,
+          roughEstimationSpace);
+    } else {
+      allocator = new SimpleBufferAllocator(usingDirectBuffer);
+    }
+  }
+
   /**
    * Set true during setup if want to dump test settings and coding data,
    * useful in debugging.
@@ -299,8 +313,7 @@ public abstract class TestCoderBase {
      */
     int startOffset = startBufferWithZero ? 0 : 11; // 11 is arbitrary
     int allocLen = startOffset + bufferLen + startOffset;
-    ByteBuffer buffer = usingDirectBuffer ?
-        ByteBuffer.allocateDirect(allocLen) : ByteBuffer.allocate(allocLen);
+    ByteBuffer buffer = allocator.allocate(allocLen);
     buffer.limit(startOffset + bufferLen);
     fillDummyData(buffer, startOffset);
     startBufferWithZero = !startBufferWithZero;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java
index 98fa956..738d28e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java
@@ -65,14 +65,14 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
      * The following runs will use 3 different chunkSize for inputs and outputs,
      * to verify the same encoder/decoder can process variable width of data.
      */
-    performTestCoding(baseChunkSize);
-    performTestCoding(baseChunkSize - 17);
-    performTestCoding(baseChunkSize + 16);
+    performTestCoding(baseChunkSize, true);
+    performTestCoding(baseChunkSize - 17, false);
+    performTestCoding(baseChunkSize + 16, true);
   }
 
-  private void performTestCoding(int chunkSize) {
+  private void performTestCoding(int chunkSize, boolean usingSlicedBuffer) {
     setChunkSize(chunkSize);
-
+    prepareBufferAllocator(usingSlicedBuffer);
     // Generate data and encode
     ECBlockGroup blockGroup = prepareBlockGroupForEncoding();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29495cb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
index 587ce96..2b7a3c4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java
@@ -68,9 +68,9 @@ public abstract class TestRawCoderBase extends TestCoderBase {
      * The following runs will use 3 different chunkSize for inputs and outputs,
      * to verify the same encoder/decoder can process variable width of data.
      */
-    performTestCoding(baseChunkSize, false, false);
-    performTestCoding(baseChunkSize - 17, false, false);
-    performTestCoding(baseChunkSize + 16, false, false);
+    performTestCoding(baseChunkSize, true, false, false);
+    performTestCoding(baseChunkSize - 17, false, false, false);
+    performTestCoding(baseChunkSize + 16, true, false, false);
   }
 
   /**
@@ -82,7 +82,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
     prepareCoders();
 
     try {
-      performTestCoding(baseChunkSize, true, false);
+      performTestCoding(baseChunkSize, false, true, false);
       Assert.fail("Encoding test with bad input should fail");
     } catch (Exception e) {
       // Expected
@@ -98,7 +98,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
     prepareCoders();
 
     try {
-      performTestCoding(baseChunkSize, false, true);
+      performTestCoding(baseChunkSize, false, false, true);
       Assert.fail("Decoding test with bad output should fail");
     } catch (Exception e) {
       // Expected
@@ -122,9 +122,10 @@ public abstract class TestRawCoderBase extends TestCoderBase {
     }
   }
 
-  private void performTestCoding(int chunkSize,
+  private void performTestCoding(int chunkSize, boolean usingSlicedBuffer,
       boolean useBadInput, boolean useBadOutput) {
     setChunkSize(chunkSize);
+    prepareBufferAllocator(usingSlicedBuffer);
 
     dumpSetting();