From: zhz@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 10 Aug 2015 21:04:04 -0000
Subject: [17/18] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
new file mode 100644
index 0000000..3ed3e20
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/RSErasureEncoder.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.coder;
+
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECBlock;
+import org.apache.hadoop.io.erasurecode.ECBlockGroup;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+
+/**
+ * Reed-Solomon erasure encoder that encodes a block group.
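+ * The underlying raw encoder is created lazily on the first encoding step and
+ * reused afterwards (see checkCreateRSRawEncoder below).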
+ * + * It implements {@link ErasureCoder}. + */ +public class RSErasureEncoder extends AbstractErasureEncoder { + private RawErasureEncoder rawEncoder; + + public RSErasureEncoder(int numDataUnits, int numParityUnits) { + super(numDataUnits, numParityUnits); + } + + public RSErasureEncoder(ECSchema schema) { + super(schema); + } + + @Override + protected ErasureCodingStep prepareEncodingStep(final ECBlockGroup blockGroup) { + + RawErasureEncoder rawEncoder = checkCreateRSRawEncoder(); + + ECBlock[] inputBlocks = getInputBlocks(blockGroup); + + return new ErasureEncodingStep(inputBlocks, + getOutputBlocks(blockGroup), rawEncoder); + } + + private RawErasureEncoder checkCreateRSRawEncoder() { + if (rawEncoder == null) { + rawEncoder = CodecUtil.createRSRawEncoder(getConf(), + getNumDataUnits(), getNumParityUnits()); + } + return rawEncoder; + } + + @Override + public void release() { + if (rawEncoder != null) { + rawEncoder.release(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java new file mode 100644 index 0000000..a847418 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureDecoder.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.coder; + +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.ECBlock; +import org.apache.hadoop.io.erasurecode.ECBlockGroup; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; + +/** + * Xor erasure decoder that decodes a block group. + * + * It implements {@link ErasureCoder}. 
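+ * Note that plain XOR can recover at most one erased block per group; the
+ * comment in getOutputBlocks below explains why that check itself lives at
+ * the upper layer.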
+ */
+public class XORErasureDecoder extends AbstractErasureDecoder {
+
+  public XORErasureDecoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
+  public XORErasureDecoder(ECSchema schema) {
+    super(schema);
+  }
+
+  @Override
+  protected ErasureCodingStep prepareDecodingStep(
+      final ECBlockGroup blockGroup) {
+    RawErasureDecoder rawDecoder = CodecUtil.createXORRawDecoder(getConf(),
+        getNumDataUnits(), getNumParityUnits());
+
+    ECBlock[] inputBlocks = getInputBlocks(blockGroup);
+
+    return new ErasureDecodingStep(inputBlocks,
+        getErasedIndexes(inputBlocks),
+        getOutputBlocks(blockGroup), rawDecoder);
+  }
+
+  /**
+   * Which blocks were erased? For XOR it's simple: we only allow and return
+   * one erased block, either data or parity.
+   * @param blockGroup the block group to check
+   * @return output blocks to recover
+   */
+  @Override
+  protected ECBlock[] getOutputBlocks(ECBlockGroup blockGroup) {
+    /*
+     * If more than one block (either data or parity) is erased, it's not
+     * feasible to recover. We don't have the check here since it will be done
+     * at the upper level: the ErasureCoder call can be avoided if recovery is
+     * not possible at all.
+     */
+    int erasedNum = getNumErasedBlocks(blockGroup);
+    ECBlock[] outputBlocks = new ECBlock[erasedNum];
+
+    int idx = 0;
+    for (int i = 0; i < getNumParityUnits(); i++) {
+      if (blockGroup.getParityBlocks()[i].isErased()) {
+        outputBlocks[idx++] = blockGroup.getParityBlocks()[i];
+      }
+    }
+
+    for (int i = 0; i < getNumDataUnits(); i++) {
+      if (blockGroup.getDataBlocks()[i].isErased()) {
+        outputBlocks[idx++] = blockGroup.getDataBlocks()[i];
+      }
+    }
+
+    return outputBlocks;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
new file mode 100644
index 0000000..5c4bcdd
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/coder/XORErasureEncoder.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.coder;
+
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECBlock;
+import org.apache.hadoop.io.erasurecode.ECBlockGroup;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+
+/**
+ * XOR erasure encoder that encodes a block group.
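+ * The parity rule is plain XOR; as a worked example of the math (not part of
+ * this patch), with three data units the parity is
+ *   p0 = d0 ^ d1 ^ d2
+ * and any single erased unit is recovered by XOR-ing the survivors, e.g.
+ *   d1 = p0 ^ d0 ^ d2.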
+ * + * It implements {@link ErasureCoder}. + */ +public class XORErasureEncoder extends AbstractErasureEncoder { + + public XORErasureEncoder(int numDataUnits, int numParityUnits) { + super(numDataUnits, numParityUnits); + } + + public XORErasureEncoder(ECSchema schema) { + super(schema); + } + + @Override + protected ErasureCodingStep prepareEncodingStep( + final ECBlockGroup blockGroup) { + RawErasureEncoder rawEncoder = CodecUtil.createXORRawEncoder(getConf(), + getNumDataUnits(), getNumParityUnits()); + + ECBlock[] inputBlocks = getInputBlocks(blockGroup); + + return new ErasureEncodingStep(inputBlocks, + getOutputBlocks(blockGroup), rawEncoder); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/grouper/BlockGrouper.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/grouper/BlockGrouper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/grouper/BlockGrouper.java new file mode 100644 index 0000000..bdc1624 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/grouper/BlockGrouper.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.grouper; + +import org.apache.hadoop.io.erasurecode.ECBlock; +import org.apache.hadoop.io.erasurecode.ECBlockGroup; +import org.apache.hadoop.io.erasurecode.ECSchema; + +/** + * As part of a codec, to handle how to form a block group for encoding + * and provide instructions on how to recover erased blocks from a block group + */ +public class BlockGrouper { + + private ECSchema schema; + + /** + * Set EC schema. + * @param schema + */ + public void setSchema(ECSchema schema) { + this.schema = schema; + } + + /** + * Get EC schema. + * @return + */ + protected ECSchema getSchema() { + return schema; + } + + /** + * Get required data blocks count in a BlockGroup. + * @return count of required data blocks + */ + public int getRequiredNumDataBlocks() { + return schema.getNumDataUnits(); + } + + /** + * Get required parity blocks count in a BlockGroup. 
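+   * (For example, an RS-6-3 schema reports 3 parity blocks here, alongside
+   * 6 data blocks.)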
+   * @return count of required parity blocks
+   */
+  public int getRequiredNumParityBlocks() {
+    return schema.getNumParityUnits();
+  }
+
+  /**
+   * Calculate and organize a BlockGroup; to be called by ECManager.
+   * @param dataBlocks Data blocks to compute parity blocks against
+   * @param parityBlocks To be computed parity blocks
+   * @return the resulting block group
+   */
+  public ECBlockGroup makeBlockGroup(ECBlock[] dataBlocks,
+                                     ECBlock[] parityBlocks) {
+
+    ECBlockGroup blockGroup = new ECBlockGroup(dataBlocks, parityBlocks);
+    return blockGroup;
+  }
+
+  /**
+   * Given a BlockGroup, tell if any of the missing blocks can be recovered;
+   * to be called by ECManager.
+   * @param blockGroup a block group that may contain erased blocks, without
+   *                   knowing whether they are recoverable
+   * @return true if any erased block is recoverable, false otherwise
+   */
+  public boolean anyRecoverable(ECBlockGroup blockGroup) {
+    int erasedCount = blockGroup.getErasedCount();
+
+    return erasedCount > 0 && erasedCount <= getRequiredNumParityBlocks();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
new file mode 100644
index 0000000..4b7461e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.conf.Configured;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A common class of basic facilities to be shared by encoder and decoder.
+ *
+ * It implements the {@link RawErasureCoder} interface.
+ */
+public abstract class AbstractRawErasureCoder
+    extends Configured implements RawErasureCoder {
+
+  private final int numDataUnits;
+  private final int numParityUnits;
+
+  public AbstractRawErasureCoder(int numDataUnits, int numParityUnits) {
+    this.numDataUnits = numDataUnits;
+    this.numParityUnits = numParityUnits;
+  }
+
+  @Override
+  public int getNumDataUnits() {
+    return numDataUnits;
+  }
+
+  @Override
+  public int getNumParityUnits() {
+    return numParityUnits;
+  }
+
+  @Override
+  public boolean preferDirectBuffer() {
+    return false;
+  }
+
+  @Override
+  public void release() {
+    // Nothing to do by default
+  }
+
+  /**
+   * Fill a buffer with ZERO bytes from its current position up to its limit.
+   * @param buffer a buffer ready to read / write a certain number of bytes
+   * @return the buffer itself, with ZERO bytes written; the position and limit
+   *         are not changed after the call
+   */
+  protected ByteBuffer resetBuffer(ByteBuffer buffer) {
+    int pos = buffer.position();
+    for (int i = pos; i < buffer.limit(); ++i) {
+      buffer.put((byte) 0);
+    }
+    buffer.position(pos);
+
+    return buffer;
+  }
+
+  /**
+   * Fill the range [offset, offset + len) of a buffer (either input or
+   * output) with ZERO bytes.
+   * @param buffer bytes array buffer
+   * @param offset the position to start filling from
+   * @param len how many bytes to fill
+   * @return the buffer itself
+   */
+  protected byte[] resetBuffer(byte[] buffer, int offset, int len) {
+    // Bound the loop by offset + len, not len alone, so that a non-zero
+    // offset still zeroes the full intended range.
+    for (int i = offset; i < offset + len; ++i) {
+      buffer[i] = (byte) 0;
+    }
+
+    return buffer;
+  }
+
+  /**
+   * Check and ensure the buffers are of the length specified by dataLen, and
+   * also ensure the buffers are (or are not) direct buffers, according to
+   * isDirectBuffer.
+   * @param buffers the buffers to check
+   * @param allowNull whether to allow any element to be null or not
+   * @param dataLen the length of data available in the buffer to ensure with
+   * @param isDirectBuffer is direct buffer or not to ensure with
+   */
+  protected void ensureLengthAndType(ByteBuffer[] buffers, boolean allowNull,
+                                     int dataLen, boolean isDirectBuffer) {
+    for (ByteBuffer buffer : buffers) {
+      if (buffer == null && !allowNull) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer found, not allowing null");
+      } else if (buffer != null) {
+        if (buffer.remaining() != dataLen) {
+          throw new HadoopIllegalArgumentException(
+              "Invalid buffer, not of length " + dataLen);
+        }
+        if (buffer.isDirect() != isDirectBuffer) {
+          throw new HadoopIllegalArgumentException(
+              "Invalid buffer, isDirect should be " + isDirectBuffer);
+        }
+      }
+    }
+  }
+
+  /**
+   * Check and ensure the buffers are of the length specified by dataLen.
+ * @param buffers the buffers to check + * @param allowNull whether to allow any element to be null or not + * @param dataLen the length of data available in the buffer to ensure with + */ + protected void ensureLength(byte[][] buffers, + boolean allowNull, int dataLen) { + for (byte[] buffer : buffers) { + if (buffer == null && !allowNull) { + throw new HadoopIllegalArgumentException( + "Invalid buffer found, not allowing null"); + } else if (buffer != null && buffer.length != dataLen) { + throw new HadoopIllegalArgumentException( + "Invalid buffer not of length " + dataLen); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java new file mode 100644 index 0000000..931cda1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.io.erasurecode.ECChunk; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * An abstract raw erasure decoder that's to be inherited by new decoders. + * + * It implements the {@link RawErasureDecoder} interface. 
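+ * Both a direct-ByteBuffer decode path and a byte-array decode path are
+ * provided; on-heap ByteBuffers are unwrapped to their backing arrays plus
+ * offsets before decoding, and input positions are advanced by the bytes
+ * consumed, as the code below shows.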
+ */ +public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder + implements RawErasureDecoder { + + public AbstractRawErasureDecoder(int numDataUnits, int numParityUnits) { + super(numDataUnits, numParityUnits); + } + + @Override + public void decode(ByteBuffer[] inputs, int[] erasedIndexes, + ByteBuffer[] outputs) { + checkParameters(inputs, erasedIndexes, outputs); + + ByteBuffer validInput = findFirstValidInput(inputs); + boolean usingDirectBuffer = validInput.isDirect(); + int dataLen = validInput.remaining(); + if (dataLen == 0) { + return; + } + ensureLengthAndType(inputs, true, dataLen, usingDirectBuffer); + ensureLengthAndType(outputs, false, dataLen, usingDirectBuffer); + + if (usingDirectBuffer) { + doDecode(inputs, erasedIndexes, outputs); + return; + } + + int[] inputOffsets = new int[inputs.length]; + int[] outputOffsets = new int[outputs.length]; + byte[][] newInputs = new byte[inputs.length][]; + byte[][] newOutputs = new byte[outputs.length][]; + + ByteBuffer buffer; + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + if (buffer != null) { + inputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newInputs[i] = buffer.array(); + } + } + + for (int i = 0; i < outputs.length; ++i) { + buffer = outputs[i]; + outputOffsets[i] = buffer.arrayOffset() + buffer.position(); + newOutputs[i] = buffer.array(); + } + + doDecode(newInputs, inputOffsets, dataLen, + erasedIndexes, newOutputs, outputOffsets); + + for (int i = 0; i < inputs.length; ++i) { + buffer = inputs[i]; + if (buffer != null) { + // dataLen bytes consumed + buffer.position(buffer.position() + dataLen); + } + } + } + + /** + * Perform the real decoding using Direct ByteBuffer. + * @param inputs Direct ByteBuffers expected + * @param erasedIndexes indexes of erased units in the inputs array + * @param outputs Direct ByteBuffers expected + */ + protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, + ByteBuffer[] outputs); + + @Override + public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) { + checkParameters(inputs, erasedIndexes, outputs); + + byte[] validInput = findFirstValidInput(inputs); + int dataLen = validInput.length; + if (dataLen == 0) { + return; + } + ensureLength(inputs, true, dataLen); + ensureLength(outputs, false, dataLen); + + int[] inputOffsets = new int[inputs.length]; // ALL ZERO + int[] outputOffsets = new int[outputs.length]; // ALL ZERO + + doDecode(inputs, inputOffsets, dataLen, erasedIndexes, outputs, + outputOffsets); + } + + /** + * Perform the real decoding using bytes array, supporting offsets and + * lengths. + * @param inputs the input byte arrays to read data from + * @param inputOffsets offsets for the input byte arrays to read data from + * @param dataLen how much data are to be read from + * @param erasedIndexes indexes of erased units in the inputs array + * @param outputs the output byte arrays to write resultant data into + * @param outputOffsets offsets from which to write resultant data into + */ + protected abstract void doDecode(byte[][] inputs, int[] inputOffsets, + int dataLen, int[] erasedIndexes, + byte[][] outputs, int[] outputOffsets); + + @Override + public void decode(ECChunk[] inputs, int[] erasedIndexes, + ECChunk[] outputs) { + ByteBuffer[] newInputs = ECChunk.toBuffers(inputs); + ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs); + decode(newInputs, erasedIndexes, newOutputs); + } + + /** + * Check and validate decoding parameters, throw exception accordingly. 
The checking assumes it's an MDS code; other codes can override this.
+   * @param inputs input buffers to check
+   * @param erasedIndexes indexes of erased units in the inputs array
+   * @param outputs output buffers to check
+   */
+  protected <T> void checkParameters(T[] inputs, int[] erasedIndexes,
+                                     T[] outputs) {
+    if (inputs.length != getNumParityUnits() + getNumDataUnits()) {
+      throw new IllegalArgumentException("Invalid inputs length");
+    }
+
+    if (erasedIndexes.length != outputs.length) {
+      throw new HadoopIllegalArgumentException(
+          "erasedIndexes and outputs mismatch in length");
+    }
+
+    if (erasedIndexes.length > getNumParityUnits()) {
+      throw new HadoopIllegalArgumentException(
+          "Too many erased, not recoverable");
+    }
+
+    int validInputs = 0;
+    for (T input : inputs) {
+      if (input != null) {
+        validInputs += 1;
+      }
+    }
+
+    if (validInputs < getNumDataUnits()) {
+      throw new HadoopIllegalArgumentException(
+          "Not enough valid inputs are provided, not recoverable");
+    }
+  }
+
+  /**
+   * Get indexes into the inputs array for items marked as null, either erased
+   * or not to read.
+   * @return indexes into the inputs array
+   */
+  protected <T> int[] getErasedOrNotToReadIndexes(T[] inputs) {
+    int[] invalidIndexes = new int[inputs.length];
+    int idx = 0;
+    for (int i = 0; i < inputs.length; i++) {
+      if (inputs[i] == null) {
+        invalidIndexes[idx++] = i;
+      }
+    }
+
+    return Arrays.copyOf(invalidIndexes, idx);
+  }
+
+  /**
+   * Find the first valid input from all the inputs.
+   * @param inputs input buffers to look for valid input
+   * @return the first valid input
+   */
+  protected static <T> T findFirstValidInput(T[] inputs) {
+    for (T input : inputs) {
+      if (input != null) {
+        return input;
+      }
+    }
+
+    throw new HadoopIllegalArgumentException(
+        "Invalid inputs are found, all being null");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
new file mode 100644
index 0000000..a0b3cfe
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.io.erasurecode.ECChunk;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An abstract raw erasure encoder that's to be inherited by new encoders.
+ *
+ * It implements the {@link RawErasureEncoder} interface.
+ */
+public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
+    implements RawErasureEncoder {
+
+  public AbstractRawErasureEncoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
+  @Override
+  public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
+    checkParameters(inputs, outputs);
+
+    boolean usingDirectBuffer = inputs[0].isDirect();
+    int dataLen = inputs[0].remaining();
+    if (dataLen == 0) {
+      return;
+    }
+    ensureLengthAndType(inputs, false, dataLen, usingDirectBuffer);
+    ensureLengthAndType(outputs, false, dataLen, usingDirectBuffer);
+
+    if (usingDirectBuffer) {
+      doEncode(inputs, outputs);
+      return;
+    }
+
+    int[] inputOffsets = new int[inputs.length];
+    int[] outputOffsets = new int[outputs.length];
+    byte[][] newInputs = new byte[inputs.length][];
+    byte[][] newOutputs = new byte[outputs.length][];
+
+    ByteBuffer buffer;
+    for (int i = 0; i < inputs.length; ++i) {
+      buffer = inputs[i];
+      inputOffsets[i] = buffer.arrayOffset() + buffer.position();
+      newInputs[i] = buffer.array();
+    }
+
+    for (int i = 0; i < outputs.length; ++i) {
+      buffer = outputs[i];
+      outputOffsets[i] = buffer.arrayOffset() + buffer.position();
+      newOutputs[i] = buffer.array();
+    }
+
+    doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets);
+
+    for (int i = 0; i < inputs.length; ++i) {
+      buffer = inputs[i];
+      buffer.position(buffer.position() + dataLen); // dataLen bytes consumed
+    }
+  }
+
+  /**
+   * Perform the real encoding work using direct ByteBuffers.
+   * @param inputs Direct ByteBuffers expected
+   * @param outputs Direct ByteBuffers expected
+   */
+  protected abstract void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs);
+
+  @Override
+  public void encode(byte[][] inputs, byte[][] outputs) {
+    checkParameters(inputs, outputs);
+    int dataLen = inputs[0].length;
+    if (dataLen == 0) {
+      return;
+    }
+    ensureLength(inputs, false, dataLen);
+    ensureLength(outputs, false, dataLen);
+
+    int[] inputOffsets = new int[inputs.length]; // ALL ZERO
+    int[] outputOffsets = new int[outputs.length]; // ALL ZERO
+
+    doEncode(inputs, inputOffsets, dataLen, outputs, outputOffsets);
+  }
+
+  /**
+   * Perform the real encoding work using bytes arrays, supporting offsets
+   * and lengths.
+   * @param inputs the input byte arrays to read data from
+   * @param inputOffsets offsets for the input byte arrays to read data from
+   * @param dataLen how much data is to be read
+   * @param outputs the output byte arrays to write resultant data into
+   * @param outputOffsets offsets from which to write resultant data into
+   */
+  protected abstract void doEncode(byte[][] inputs, int[] inputOffsets,
+                                   int dataLen, byte[][] outputs,
+                                   int[] outputOffsets);
+
+  @Override
+  public void encode(ECChunk[] inputs, ECChunk[] outputs) {
+    ByteBuffer[] newInputs = ECChunk.toBuffers(inputs);
+    ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs);
+    encode(newInputs, newOutputs);
+  }
+
+  /**
+   * Check and validate encoding parameters, throw exception accordingly.
+   * @param inputs input buffers to check
+   * @param outputs output buffers to check
+   */
+  protected <T> void checkParameters(T[] inputs, T[] outputs) {
+    if (inputs.length != getNumDataUnits()) {
+      throw new HadoopIllegalArgumentException("Invalid inputs length");
+    }
+    if (outputs.length != getNumParityUnits()) {
+      throw new HadoopIllegalArgumentException("Invalid outputs length");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java
new file mode 100644
index 0000000..57e6957
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A raw erasure decoder in the RS code scheme, in pure Java, in case the
+ * native one isn't available in some environment. Please always use native
+ * implementations when possible.
+ *
+ * Currently this implementation unnecessarily computes and decodes units that
+ * are not requested to be read, due to a limitation in the underlying GF
+ * implementation. This will be addressed in HADOOP-11871.
+ */
+public class RSRawDecoder extends AbstractRawErasureDecoder {
+  // To describe and calculate the needed Vandermonde matrix
+  private int[] errSignature;
+  private int[] primitivePower;
+
+  /**
+   * We need a set of reusable buffers, either for the bytes-array decoding
+   * version or for the direct-buffer decoding version; normally not both.
+   *
+   * For output, in addition to the valid buffers passed in by the caller,
+   * we need extra buffers for the internal decoding implementation. The
+   * caller should provide no more than numParityUnits output buffers, but at
+   * least one; the remaining buffers are borrowed from bytesArrayBuffers for
+   * the bytes-array version, or from directBuffers for the direct-buffer
+   * version.
+ * + */ + // Reused buffers for decoding with bytes arrays + private byte[][] bytesArrayBuffers = new byte[getNumParityUnits()][]; + private byte[][] adjustedByteArrayOutputsParameter = + new byte[getNumParityUnits()][]; + private int[] adjustedOutputOffsets = new int[getNumParityUnits()]; + + // Reused buffers for decoding with direct ByteBuffers + private ByteBuffer[] directBuffers = new ByteBuffer[getNumParityUnits()]; + private ByteBuffer[] adjustedDirectBufferOutputsParameter = + new ByteBuffer[getNumParityUnits()]; + + public RSRawDecoder(int numDataUnits, int numParityUnits) { + super(numDataUnits, numParityUnits); + if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) { + throw new HadoopIllegalArgumentException( + "Invalid numDataUnits and numParityUnits"); + } + + this.errSignature = new int[numParityUnits]; + this.primitivePower = RSUtil.getPrimitivePower(numDataUnits, + numParityUnits); + } + + private void doDecodeImpl(ByteBuffer[] inputs, int[] erasedIndexes, + ByteBuffer[] outputs) { + ByteBuffer valid = findFirstValidInput(inputs); + int dataLen = valid.remaining(); + for (int i = 0; i < erasedIndexes.length; i++) { + errSignature[i] = primitivePower[erasedIndexes[i]]; + RSUtil.GF.substitute(inputs, dataLen, outputs[i], primitivePower[i]); + } + + RSUtil.GF.solveVandermondeSystem(errSignature, + outputs, erasedIndexes.length); + } + + private void doDecodeImpl(byte[][] inputs, int[] inputOffsets, + int dataLen, int[] erasedIndexes, + byte[][] outputs, int[] outputOffsets) { + for (int i = 0; i < erasedIndexes.length; i++) { + errSignature[i] = primitivePower[erasedIndexes[i]]; + RSUtil.GF.substitute(inputs, inputOffsets, dataLen, outputs[i], + outputOffsets[i], primitivePower[i]); + } + + RSUtil.GF.solveVandermondeSystem(errSignature, outputs, outputOffsets, + erasedIndexes.length, dataLen); + } + + @Override + protected void doDecode(byte[][] inputs, int[] inputOffsets, + int dataLen, int[] erasedIndexes, + byte[][] outputs, int[] outputOffsets) { + /** + * As passed parameters are friendly to callers but not to the underlying + * implementations, so we have to adjust them before calling doDecodeImpl. + */ + + int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs); + + // Prepare for adjustedOutputsParameter + + // First reset the positions needed this time + for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) { + adjustedByteArrayOutputsParameter[i] = null; + adjustedOutputOffsets[i] = 0; + } + // Use the caller passed buffers in erasedIndexes positions + for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) { + boolean found = false; + for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) { + // If this index is one requested by the caller via erasedIndexes, then + // we use the passed output buffer to avoid copying data thereafter. 
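+          // (The match below lines each caller-provided output up with the
+          // slot expected by doDecodeImpl, which works in terms of
+          // erasedOrNotToReadIndexes.)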
+ if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) { + found = true; + adjustedByteArrayOutputsParameter[j] = resetBuffer( + outputs[outputIdx], outputOffsets[outputIdx], dataLen); + adjustedOutputOffsets[j] = outputOffsets[outputIdx]; + outputIdx++; + } + } + if (!found) { + throw new HadoopIllegalArgumentException( + "Inputs not fully corresponding to erasedIndexes in null places"); + } + } + // Use shared buffers for other positions (not set yet) + for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) { + if (adjustedByteArrayOutputsParameter[i] == null) { + adjustedByteArrayOutputsParameter[i] = resetBuffer( + checkGetBytesArrayBuffer(bufferIdx, dataLen), 0, dataLen); + adjustedOutputOffsets[i] = 0; // Always 0 for such temp output + bufferIdx++; + } + } + + doDecodeImpl(inputs, inputOffsets, dataLen, erasedOrNotToReadIndexes, + adjustedByteArrayOutputsParameter, adjustedOutputOffsets); + } + + @Override + protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, + ByteBuffer[] outputs) { + ByteBuffer validInput = findFirstValidInput(inputs); + int dataLen = validInput.remaining(); + + /** + * As passed parameters are friendly to callers but not to the underlying + * implementations, so we have to adjust them before calling doDecodeImpl. + */ + + int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs); + + // Prepare for adjustedDirectBufferOutputsParameter + + // First reset the positions needed this time + for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) { + adjustedDirectBufferOutputsParameter[i] = null; + } + // Use the caller passed buffers in erasedIndexes positions + for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) { + boolean found = false; + for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) { + // If this index is one requested by the caller via erasedIndexes, then + // we use the passed output buffer to avoid copying data thereafter. 
+ if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) { + found = true; + adjustedDirectBufferOutputsParameter[j] = + resetBuffer(outputs[outputIdx++]); + } + } + if (!found) { + throw new HadoopIllegalArgumentException( + "Inputs not fully corresponding to erasedIndexes in null places"); + } + } + // Use shared buffers for other positions (not set yet) + for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) { + if (adjustedDirectBufferOutputsParameter[i] == null) { + ByteBuffer buffer = checkGetDirectBuffer(bufferIdx, dataLen); + buffer.position(0); + buffer.limit(dataLen); + adjustedDirectBufferOutputsParameter[i] = resetBuffer(buffer); + bufferIdx++; + } + } + + doDecodeImpl(inputs, erasedOrNotToReadIndexes, + adjustedDirectBufferOutputsParameter); + } + + private byte[] checkGetBytesArrayBuffer(int idx, int bufferLen) { + if (bytesArrayBuffers[idx] == null || + bytesArrayBuffers[idx].length < bufferLen) { + bytesArrayBuffers[idx] = new byte[bufferLen]; + } + return bytesArrayBuffers[idx]; + } + + private ByteBuffer checkGetDirectBuffer(int idx, int bufferLen) { + if (directBuffers[idx] == null || + directBuffers[idx].capacity() < bufferLen) { + directBuffers[idx] = ByteBuffer.allocateDirect(bufferLen); + } + return directBuffers[idx]; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java new file mode 100644 index 0000000..efeee90 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawEncoder.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil; + +import java.nio.ByteBuffer; + +/** + * A raw erasure encoder in RS code scheme in pure Java in case native one + * isn't available in some environment. Please always use native implementations + * when possible. 
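+ *
+ * Encoding here is systematic: parity and data units are laid out as a single
+ * codeword, and the parity is computed as the remainder of dividing that
+ * codeword by the generating polynomial over the Galois field (see the
+ * doEncode methods below).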
+ */ +public class RSRawEncoder extends AbstractRawErasureEncoder { + private int[] generatingPolynomial; + + public RSRawEncoder(int numDataUnits, int numParityUnits) { + super(numDataUnits, numParityUnits); + + assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize()); + + int[] primitivePower = RSUtil.getPrimitivePower(numDataUnits, + numParityUnits); + // compute generating polynomial + int[] gen = {1}; + int[] poly = new int[2]; + for (int i = 0; i < numParityUnits; i++) { + poly[0] = primitivePower[i]; + poly[1] = 1; + gen = RSUtil.GF.multiply(gen, poly); + } + // generating polynomial has all generating roots + generatingPolynomial = gen; + } + + @Override + protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) { + // parity units + data units + ByteBuffer[] all = new ByteBuffer[outputs.length + inputs.length]; + System.arraycopy(outputs, 0, all, 0, outputs.length); + System.arraycopy(inputs, 0, all, outputs.length, inputs.length); + + // Compute the remainder + RSUtil.GF.remainder(all, generatingPolynomial); + } + + @Override + protected void doEncode(byte[][] inputs, int[] inputOffsets, + int dataLen, byte[][] outputs, + int[] outputOffsets) { + // parity units + data units + byte[][] all = new byte[outputs.length + inputs.length][]; + System.arraycopy(outputs, 0, all, 0, outputs.length); + System.arraycopy(inputs, 0, all, outputs.length, inputs.length); + + int[] offsets = new int[inputOffsets.length + outputOffsets.length]; + System.arraycopy(outputOffsets, 0, offsets, 0, outputOffsets.length); + System.arraycopy(inputOffsets, 0, offsets, + outputOffsets.length, inputOffsets.length); + + // Compute the remainder + RSUtil.GF.remainder(all, offsets, dataLen, generatingPolynomial); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactory.java new file mode 100644 index 0000000..5db49e3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawErasureCoderFactory.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +/** + * A raw coder factory for raw Reed-Solomon coder in Java. 
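+ *
+ * A minimal usage sketch (a 6+3 layout is assumed here purely for
+ * illustration; any valid pair works):
+ *
+ *   RawErasureCoderFactory factory = new RSRawErasureCoderFactory();
+ *   RawErasureEncoder encoder = factory.createEncoder(6, 3);
+ *   RawErasureDecoder decoder = factory.createDecoder(6, 3);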
+ */ +public class RSRawErasureCoderFactory implements RawErasureCoderFactory { + + @Override + public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) { + return new RSRawEncoder(numDataUnits, numParityUnits); + } + + @Override + public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) { + return new RSRawDecoder(numDataUnits, numParityUnits); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoder.java new file mode 100644 index 0000000..0ddb460 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoder.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.conf.Configurable; + +/** + * RawErasureCoder is a common interface for {@link RawErasureEncoder} and + * {@link RawErasureDecoder} as both encoder and decoder share some properties. + * + * RawErasureCoder is part of ErasureCodec framework, where ErasureCoder is + * used to encode/decode a group of blocks (BlockGroup) according to the codec + * specific BlockGroup layout and logic. An ErasureCoder extracts chunks of + * data from the blocks and can employ various low level RawErasureCoders to + * perform encoding/decoding against the chunks. + * + * To distinguish from ErasureCoder, here RawErasureCoder is used to mean the + * low level constructs, since it only takes care of the math calculation with + * a group of byte buffers. + */ +public interface RawErasureCoder extends Configurable { + + /** + * The number of data input units for the coding. A unit can be a byte, + * chunk or buffer or even a block. + * @return count of data input units + */ + public int getNumDataUnits(); + + /** + * The number of parity output units for the coding. A unit can be a byte, + * chunk, buffer or even a block. + * @return count of parity output units + */ + public int getNumParityUnits(); + + /** + * Tell if direct buffer is preferred or not. It's for callers to + * decide how to allocate coding chunk buffers, using DirectByteBuffer or + * bytes array. It will return false by default. + * @return true if native buffer is preferred for performance consideration, + * otherwise false. 
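+   *
+   * A hedged caller-side sketch of the intended allocation pattern, with
+   * chunkSize as an assumed buffer size:
+   *
+   *   ByteBuffer buf = coder.preferDirectBuffer()
+   *       ? ByteBuffer.allocateDirect(chunkSize)
+   *       : ByteBuffer.allocate(chunkSize);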
+   */
+  public boolean preferDirectBuffer();
+
+  /**
+   * Should be called when releasing this coder. A good chance to release
+   * encoding or decoding buffers.
+   */
+  public void release();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
new file mode 100644
index 0000000..280daf3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+/**
+ * Raw erasure coder factory that can be used to create raw encoders and
+ * decoders. It helps with configuration, since only one factory class needs
+ * to be configured.
+ */
+public interface RawErasureCoderFactory {
+
+  /**
+   * Create a raw erasure encoder.
+   * @param numDataUnits number of data units in a coding group
+   * @param numParityUnits number of parity units in a coding group
+   * @return raw erasure encoder
+   */
+  public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits);
+
+  /**
+   * Create a raw erasure decoder.
+   * @param numDataUnits number of data units in a coding group
+   * @param numParityUnits number of parity units in a coding group
+   * @return raw erasure decoder
+   */
+  public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
new file mode 100644
index 0000000..e2d01d9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.io.erasurecode.ECChunk; + +import java.nio.ByteBuffer; + +/** + * RawErasureDecoder performs decoding given chunks of input data and generates + * missing data that corresponds to an erasure code scheme, like XOR and + * Reed-Solomon. + * + * It extends the {@link RawErasureCoder} interface. + */ +public interface RawErasureDecoder extends RawErasureCoder { + + /** + * Decode with inputs and erasedIndexes, generates outputs. + * How to prepare for inputs: + * 1. Create an array containing parity units + data units. Please note the + * parity units should be first or before the data units. + * 2. Set null in the array locations specified via erasedIndexes to indicate + * they're erased and no data are to read from; + * 3. Set null in the array locations for extra redundant items, as they're + * not necessary to read when decoding. For example in RS-6-3, if only 1 + * unit is really erased, then we have 2 extra items as redundant. They can + * be set as null to indicate no data will be used from them. + * + * For an example using RS (6, 3), assuming sources (d0, d1, d2, d3, d4, d5) + * and parities (p0, p1, p2), d2 being erased. We can and may want to use only + * 6 units like (d1, d3, d4, d5, p0, p2) to recover d2. We will have: + * inputs = [p0, null(p1), p2, null(d0), d1, null(d2), d3, d4, d5] + * erasedIndexes = [5] // index of d2 into inputs array + * outputs = [a-writable-buffer] + * + * Note, for both inputs and outputs, no mixing of on-heap buffers and direct + * buffers are allowed. + * + * @param inputs inputs to read data from, contents may change after the call + * @param erasedIndexes indexes of erased units in the inputs array + * @param outputs outputs to write into for data generated according to + * erasedIndexes, ready for reading the result data from after + * the call + */ + public void decode(ByteBuffer[] inputs, int[] erasedIndexes, + ByteBuffer[] outputs); + + /** + * Decode with inputs and erasedIndexes, generates outputs. More see above. + * @param inputs inputs to read data from, contents may change after the call + * @param erasedIndexes indexes of erased units in the inputs array + * @param outputs outputs to write into for data generated according to + * erasedIndexes, ready for reading the result data from after + * the call + */ + public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs); + + /** + * Decode with inputs and erasedIndexes, generates outputs. More see above. + * + * Note, for both input and output ECChunks, no mixing of on-heap buffers and + * direct buffers are allowed. 
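+ *
+ * A sketch of the RS-6-3 example above using the byte[][] variant; the unit
+ * arrays (p0, p2, d1, d3, d4, d5) and chunkSize are assumed for illustration:
+ *
+ *   byte[][] inputs = {p0, null, p2, null, d1, null, d3, d4, d5};
+ *   int[] erasedIndexes = {5};                 // recover d2 only
+ *   byte[][] outputs = {new byte[chunkSize]};
+ *   decoder.decode(inputs, erasedIndexes, outputs);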
+ * + * @param inputs inputs to read data from, contents may change after the call + * @param erasedIndexes indexes of erased units in the inputs array + * @param outputs outputs to write into for data generated according to + * erasedIndexes, ready for reading the result data from after + * the call + */ + public void decode(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java new file mode 100644 index 0000000..7571f09 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureEncoder.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.io.erasurecode.ECChunk; + +import java.nio.ByteBuffer; + +/** + * RawErasureEncoder performs encoding given chunks of input data and generates + * parity outputs that corresponds to an erasure code scheme, like XOR and + * Reed-Solomon. + * + * It extends the {@link RawErasureCoder} interface. + */ +public interface RawErasureEncoder extends RawErasureCoder { + + /** + * Encode with inputs and generates outputs. + * + * Note, for both inputs and outputs, no mixing of on-heap buffers and direct + * buffers are allowed. + * + * @param inputs inputs to read data from, contents may change after the call + * @param outputs + */ + public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs); + + /** + * Encode with inputs and generates outputs + * @param inputs inputs to read data from, contents may change after the call + * @param outputs outputs to write into for data generated, ready for reading + * the result data from after the call + */ + public void encode(byte[][] inputs, byte[][] outputs); + + /** + * Encode with inputs and generates outputs. + * + * Note, for both input and output ECChunks, no mixing of on-heap buffers and + * direct buffers are allowed. 
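+ *
+ * A sketch with the byte[][] variant; an RS-6-3 layout and the unit arrays
+ * (d0..d5, each chunkSize bytes, all non-null) are assumed for illustration:
+ *
+ *   byte[][] inputs = {d0, d1, d2, d3, d4, d5};
+ *   byte[][] outputs = {new byte[chunkSize], new byte[chunkSize],
+ *       new byte[chunkSize]};
+ *   encoder.encode(inputs, outputs);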
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java
new file mode 100644
index 0000000..e20e543
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A raw decoder in the XOR code scheme, in pure Java, adapted from
+ * HDFS-RAID.
+ *
+ * XOR is an important primitive code scheme in erasure coding and is often
+ * used in advanced codes such as HitchHiker and LRC, though it is rarely
+ * deployed on its own.
+ */
+public class XORRawDecoder extends AbstractRawErasureDecoder {
+
+  public XORRawDecoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
+  @Override
+  protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
+                          ByteBuffer[] outputs) {
+    ByteBuffer output = outputs[0];
+    resetBuffer(output);
+
+    int erasedIdx = erasedIndexes[0];
+
+    // XOR all surviving inputs into the output; the erased unit equals the
+    // XOR of all the others.
+    int iIdx, oIdx;
+    for (int i = 0; i < inputs.length; i++) {
+      // Skip the erased location.
+      if (i == erasedIdx) {
+        continue;
+      }
+
+      for (iIdx = inputs[i].position(), oIdx = output.position();
+           iIdx < inputs[i].limit();
+           iIdx++, oIdx++) {
+        output.put(oIdx, (byte) (output.get(oIdx) ^ inputs[i].get(iIdx)));
+      }
+    }
+  }
+
+  @Override
+  protected void doDecode(byte[][] inputs, int[] inputOffsets, int dataLen,
+                          int[] erasedIndexes, byte[][] outputs,
+                          int[] outputOffsets) {
+    byte[] output = outputs[0];
+    resetBuffer(output, outputOffsets[0], dataLen);
+
+    int erasedIdx = erasedIndexes[0];
+
+    // XOR all surviving inputs into the output, as above.
+    int iIdx, oIdx;
+    for (int i = 0; i < inputs.length; i++) {
+      // Skip the erased location.
+      if (i == erasedIdx) {
+        continue;
+      }
+
+      for (iIdx = inputOffsets[i], oIdx = outputOffsets[0];
+           iIdx < inputOffsets[i] + dataLen; iIdx++, oIdx++) {
+        output[oIdx] ^= inputs[i][iIdx];
+      }
+    }
+  }
+
+}
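The decoder works because XOR parity satisfies p = d0 ^ d1 ^ ... ^ d(k-1), so any single erased unit equals the XOR of the parity with all surviving data units; XOR-ing every non-erased input into the output, as both doDecode overloads do, is exactly that computation. A one-byte check of the identity:

    // XOR parity identity, checked on single bytes: with p = d0 ^ d1 ^ d2,
    // an erased d1 is recovered as p ^ d0 ^ d2.
    byte d0 = 0x0F, d1 = 0x33, d2 = 0x55;
    byte p = (byte) (d0 ^ d1 ^ d2);           // encode
    byte recovered = (byte) (p ^ d0 ^ d2);    // decode with d1 erased
    assert recovered == d1;                   // holds for every byte value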
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java
new file mode 100644
index 0000000..f4d242e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A raw encoder in the XOR code scheme, in pure Java, adapted from
+ * HDFS-RAID.
+ *
+ * XOR is an important primitive code scheme in erasure coding and is often
+ * used in advanced codes such as HitchHiker and LRC, though it is rarely
+ * deployed on its own.
+ */
+public class XORRawEncoder extends AbstractRawErasureEncoder {
+
+  public XORRawEncoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
+  @Override
+  protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
+    ByteBuffer output = outputs[0];
+    resetBuffer(output);
+
+    // Copy the first input into the output.
+    int iIdx, oIdx;
+    for (iIdx = inputs[0].position(), oIdx = output.position();
+         iIdx < inputs[0].limit(); iIdx++, oIdx++) {
+      output.put(oIdx, inputs[0].get(iIdx));
+    }
+
+    // XOR with everything else.
+    for (int i = 1; i < inputs.length; i++) {
+      for (iIdx = inputs[i].position(), oIdx = output.position();
+           iIdx < inputs[i].limit();
+           iIdx++, oIdx++) {
+        output.put(oIdx, (byte) (output.get(oIdx) ^ inputs[i].get(iIdx)));
+      }
+    }
+  }
+
+  @Override
+  protected void doEncode(byte[][] inputs, int[] inputOffsets, int dataLen,
+                          byte[][] outputs, int[] outputOffsets) {
+    byte[] output = outputs[0];
+    resetBuffer(output, outputOffsets[0], dataLen);
+
+    // Copy the first input into the output.
+    int iIdx, oIdx;
+    for (iIdx = inputOffsets[0], oIdx = outputOffsets[0];
+         iIdx < inputOffsets[0] + dataLen; iIdx++, oIdx++) {
+      output[oIdx] = inputs[0][iIdx];
+    }
+
+    // XOR with everything else.
+    for (int i = 1; i < inputs.length; i++) {
+      for (iIdx = inputOffsets[i], oIdx = outputOffsets[0];
+           iIdx < inputOffsets[i] + dataLen; iIdx++, oIdx++) {
+        output[oIdx] ^= inputs[i][iIdx];
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawErasureCoderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawErasureCoderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawErasureCoderFactory.java
new file mode 100644
index 0000000..de20c95
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawErasureCoderFactory.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder;
+
+/**
+ * A raw coder factory for the raw XOR coders.
+ */
+public class XORRawErasureCoderFactory implements RawErasureCoderFactory {
+
+  @Override
+  public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
+    return new XORRawEncoder(numDataUnits, numParityUnits);
+  }
+
+  @Override
+  public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
+    return new XORRawDecoder(numDataUnits, numParityUnits);
+  }
+}
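A minimal sketch of wiring coders through the factory, useful where the concrete scheme is selected by configuration rather than hard-coded. The release() calls mirror the release() usage visible elsewhere in this patch; that the decoder exposes the same method is an assumption here.

    RawErasureCoderFactory factory = new XORRawErasureCoderFactory();
    RawErasureEncoder encoder = factory.createEncoder(3, 1); // 3 data, 1 parity
    RawErasureDecoder decoder = factory.createDecoder(3, 1);
    try {
      // ... encode new block groups, decode damaged ones ...
    } finally {
      // Release any buffers or native resources the coders may hold.
      encoder.release();
      decoder.release();
    }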
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/DumpUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/DumpUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/DumpUtil.java
new file mode 100644
index 0000000..c8f133f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/DumpUtil.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.erasurecode.rawcoder.util;
+
+import org.apache.hadoop.io.erasurecode.ECChunk;
+
+/**
+ * A dump utility class for debugging erasure coding/decoding issues. It is
+ * not intended for use in production code.
+ */
+public final class DumpUtil {
+  private static final String HEX_CHARS_STR = "0123456789ABCDEF";
+  private static final char[] HEX_CHARS = HEX_CHARS_STR.toCharArray();
+
+  private DumpUtil() {
+    // No instantiation
+  }
+
+  /**
+   * Convert bytes into a format like 0x02 02 00 80.
+   */
+  public static String bytesToHex(byte[] bytes, int limit) {
+    if (limit > bytes.length) {
+      limit = bytes.length;
+    }
+    int len = limit * 2;
+    len += limit; // for the ' ' appended after each byte
+    len += 2;     // for the '0x' prefix
+    char[] hexChars = new char[len];
+    hexChars[0] = '0';
+    hexChars[1] = 'x';
+    for (int j = 0; j < limit; j++) {
+      int v = bytes[j] & 0xFF;
+      hexChars[j * 3 + 2] = HEX_CHARS[v >>> 4];
+      hexChars[j * 3 + 3] = HEX_CHARS[v & 0x0F];
+      hexChars[j * 3 + 4] = ' ';
+    }
+
+    return new String(hexChars);
+  }
+
+  /**
+   * Print data in hex format for an array of chunks.
+   * @param header a header message to print before the chunks
+   * @param chunks the chunks to dump
+   */
+  public static void dumpChunks(String header, ECChunk[] chunks) {
+    System.out.println();
+    System.out.println(header);
+    for (int i = 0; i < chunks.length; i++) {
+      dumpChunk(chunks[i]);
+    }
+    System.out.println();
+  }
+
+  /**
+   * Print data in hex format for a chunk.
+   * @param chunk the chunk to dump
+   */
+  public static void dumpChunk(ECChunk chunk) {
+    String str;
+    if (chunk == null) {
+      str = "<EMPTY>";
+    } else {
+      byte[] bytes = chunk.toBytesArray();
+      str = DumpUtil.bytesToHex(bytes, 16);
+    }
+    System.out.println(str);
  }
+}
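For reference, the exact formatting bytesToHex produces: after a single "0x" prefix, each byte becomes two hex digits plus a trailing space, and limit caps how many bytes are shown.

    byte[] sample = {0x02, 0x02, 0x00, (byte) 0x80};
    // A limit of 16 is clamped to the 4 bytes available.
    System.out.println(DumpUtil.bytesToHex(sample, 16)); // "0x02 02 00 80 "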