Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 747A46A7D for ; Wed, 13 Jul 2011 16:13:09 +0000 (UTC) Received: (qmail 44235 invoked by uid 500); 13 Jul 2011 16:13:09 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 44141 invoked by uid 500); 13 Jul 2011 16:13:08 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 43974 invoked by uid 99); 13 Jul 2011 16:13:07 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Jul 2011 16:13:07 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Jul 2011 16:13:04 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9755C23888C2; Wed, 13 Jul 2011 16:12:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1146111 - in /hadoop/common/trunk/common: CHANGES.txt src/java/org/apache/hadoop/util/DataChecksum.java src/test/core/org/apache/hadoop/util/TestDataChecksum.java Date: Wed, 13 Jul 2011 16:12:42 -0000 To: common-commits@hadoop.apache.org From: todd@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110713161242.9755C23888C2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: todd Date: Wed Jul 13 16:12:42 2011 New Revision: 1146111 URL: http://svn.apache.org/viewvc?rev=1146111&view=rev Log: 
HADOOP-7444. Add Checksum API to verify and calculate checksums "in bulk". Contributed by Todd Lipcon. Added: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestDataChecksum.java Modified: hadoop/common/trunk/common/CHANGES.txt hadoop/common/trunk/common/src/java/org/apache/hadoop/util/DataChecksum.java Modified: hadoop/common/trunk/common/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/CHANGES.txt?rev=1146111&r1=1146110&r2=1146111&view=diff ============================================================================== --- hadoop/common/trunk/common/CHANGES.txt (original) +++ hadoop/common/trunk/common/CHANGES.txt Wed Jul 13 16:12:42 2011 @@ -259,6 +259,9 @@ Trunk (unreleased changes) HADOOP-7457. Remove out-of-date Chinese language documentation. (Jakob Homan via eli) + HADOOP-7444. Add Checksum API to verify and calculate checksums "in bulk" + (todd) + OPTIMIZATIONS HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole Modified: hadoop/common/trunk/common/src/java/org/apache/hadoop/util/DataChecksum.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/util/DataChecksum.java?rev=1146111&r1=1146110&r2=1146111&view=diff ============================================================================== --- hadoop/common/trunk/common/src/java/org/apache/hadoop/util/DataChecksum.java (original) +++ hadoop/common/trunk/common/src/java/org/apache/hadoop/util/DataChecksum.java Wed Jul 13 16:12:42 2011 @@ -21,10 +21,12 @@ package org.apache.hadoop.util; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.zip.Checksum; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.ChecksumException; /** * This class provides inteface and utilities for processing checksums for @@ -234,6 +236,157 @@ public class 
DataChecksum implements Che } /** + * Verify that the given checksums match the given data. + * + * The 'mark' of the ByteBuffer parameters may be modified by this function, + * but the position is maintained. + * + * @param data the DirectByteBuffer pointing to the data to verify. + * @param checksums the DirectByteBuffer pointing to a series of stored + * checksums + * @param fileName the name of the file being read, for error-reporting + * @param basePos the file position to which the start of 'data' corresponds + * @throws ChecksumException if the checksums do not match + */ + public void verifyChunkedSums(ByteBuffer data, ByteBuffer checksums, + String fileName, long basePos) + throws ChecksumException { + if (size == 0) return; + + if (data.hasArray() && checksums.hasArray()) { + verifyChunkedSums( + data.array(), data.arrayOffset() + data.position(), data.remaining(), + checksums.array(), checksums.arrayOffset() + checksums.position(), + fileName, basePos); + return; + } + + int startDataPos = data.position(); + data.mark(); + checksums.mark(); + try { + byte[] buf = new byte[bytesPerChecksum]; + byte[] sum = new byte[size]; + while (data.remaining() > 0) { + int n = Math.min(data.remaining(), bytesPerChecksum); + checksums.get(sum); + data.get(buf, 0, n); + summer.reset(); + summer.update(buf, 0, n); + int calculated = (int)summer.getValue(); + int stored = (sum[0] << 24 & 0xff000000) | + (sum[1] << 16 & 0xff0000) | + (sum[2] << 8 & 0xff00) | + sum[3] & 0xff; + if (calculated != stored) { + long errPos = basePos + data.position() - startDataPos - n; + throw new ChecksumException( + "Checksum error: "+ fileName + " at "+ errPos + + " exp: " + stored + " got: " + calculated, errPos); + } + } + } finally { + data.reset(); + checksums.reset(); + } + } + + /** + * Implementation of chunked verification specifically on byte arrays. This + * is to avoid the copy when dealing with ByteBuffers that have array backing. 
+ */ + private void verifyChunkedSums( + byte[] data, int dataOff, int dataLen, + byte[] checksums, int checksumsOff, String fileName, + long basePos) throws ChecksumException { + + int remaining = dataLen; + int dataPos = 0; + while (remaining > 0) { + int n = Math.min(remaining, bytesPerChecksum); + + summer.reset(); + summer.update(data, dataOff + dataPos, n); + dataPos += n; + remaining -= n; + + int calculated = (int)summer.getValue(); + int stored = (checksums[checksumsOff] << 24 & 0xff000000) | + (checksums[checksumsOff + 1] << 16 & 0xff0000) | + (checksums[checksumsOff + 2] << 8 & 0xff00) | + checksums[checksumsOff + 3] & 0xff; + checksumsOff += 4; + if (calculated != stored) { + long errPos = basePos + dataPos - n; + throw new ChecksumException( + "Checksum error: "+ fileName + " at "+ errPos + + " exp: " + stored + " got: " + calculated, errPos); + } + } + } + + /** + * Calculate checksums for the given data. + * + * The 'mark' of the ByteBuffer parameters may be modified by this function, + * but the position is maintained. + * + * @param data the DirectByteBuffer pointing to the data to checksum. + * @param checksums the DirectByteBuffer into which checksums will be + * stored. Enough space must be available in this + * buffer to put the checksums. 
+ */ + public void calculateChunkedSums(ByteBuffer data, ByteBuffer checksums) { + if (size == 0) return; + + if (data.hasArray() && checksums.hasArray()) { + calculateChunkedSums(data.array(), data.arrayOffset() + data.position(), data.remaining(), + checksums.array(), checksums.arrayOffset() + checksums.position()); + return; + } + + data.mark(); + checksums.mark(); + try { + byte[] buf = new byte[bytesPerChecksum]; + while (data.remaining() > 0) { + int n = Math.min(data.remaining(), bytesPerChecksum); + data.get(buf, 0, n); + summer.reset(); + summer.update(buf, 0, n); + checksums.putInt((int)summer.getValue()); + } + } finally { + data.reset(); + checksums.reset(); + } + } + + /** + * Implementation of chunked calculation specifically on byte arrays. This + * is to avoid the copy when dealing with ByteBuffers that have array backing. + */ + private void calculateChunkedSums( + byte[] data, int dataOffset, int dataLength, + byte[] sums, int sumsOffset) { + + int remaining = dataLength; + while (remaining > 0) { + int n = Math.min(remaining, bytesPerChecksum); + summer.reset(); + summer.update(data, dataOffset, n); + dataOffset += n; + remaining -= n; + long calculated = summer.getValue(); + sums[sumsOffset++] = (byte) (calculated >> 24); + sums[sumsOffset++] = (byte) (calculated >> 16); + sums[sumsOffset++] = (byte) (calculated >> 8); + sums[sumsOffset++] = (byte) (calculated); + } + } + + + /** * This just provides a dummy implimentation for Checksum class * This is used when there is no checksum available or required for * data Added: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestDataChecksum.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestDataChecksum.java?rev=1146111&view=auto ============================================================================== --- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestDataChecksum.java (added) +++ 
hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestDataChecksum.java Wed Jul 13 16:12:42 2011 @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.nio.ByteBuffer; +import java.util.Random; + +import org.apache.hadoop.fs.ChecksumException; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestDataChecksum { + + // Set up buffers that have some header and trailer before the + // actual data or checksums, to make sure the code handles + // buffer.position(), limit, etc correctly. 
+ private static final int SUMS_OFFSET_IN_BUFFER = 3; + private static final int DATA_OFFSET_IN_BUFFER = 3; + private static final int DATA_TRAILER_IN_BUFFER = 3; + + private static final int BYTES_PER_CHUNK = 512; + private static final DataChecksum checksum = + DataChecksum.newDataChecksum( + DataChecksum.CHECKSUM_CRC32, BYTES_PER_CHUNK); + + @Test + public void testBulkOps() throws Exception { + for (boolean useDirect : new boolean[]{false, true}) { + doBulkTest(1023, useDirect); + doBulkTest(1024, useDirect); + doBulkTest(1025, useDirect); + } + } + + private void doBulkTest(int dataLength, boolean useDirect) + throws Exception { + System.err.println("Testing bulk checksums of length " + + dataLength + " with " + + (useDirect ? "direct" : "array-backed") + " buffers"); + int numSums = (dataLength - 1)/checksum.getBytesPerChecksum() + 1; + int sumsLength = numSums * checksum.getChecksumSize(); + + byte data[] = new byte[dataLength + + DATA_OFFSET_IN_BUFFER + + DATA_TRAILER_IN_BUFFER]; + new Random().nextBytes(data); + ByteBuffer dataBuf = ByteBuffer.wrap( + data, DATA_OFFSET_IN_BUFFER, dataLength); + + byte checksums[] = new byte[SUMS_OFFSET_IN_BUFFER + sumsLength]; + ByteBuffer checksumBuf = ByteBuffer.wrap( + checksums, SUMS_OFFSET_IN_BUFFER, sumsLength); + + // Swap out for direct buffers if requested. 
+ if (useDirect) { + dataBuf = directify(dataBuf); + checksumBuf = directify(checksumBuf); + } + + // calculate real checksum, make sure it passes + checksum.calculateChunkedSums(dataBuf, checksumBuf); + checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0); + + // Change a byte in the header and in the trailer, make sure + // it doesn't affect checksum result + corruptBufferOffset(checksumBuf, 0); + checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0); + corruptBufferOffset(dataBuf, 0); + dataBuf.limit(dataBuf.limit() + 1); + corruptBufferOffset(dataBuf, dataLength + DATA_OFFSET_IN_BUFFER); + dataBuf.limit(dataBuf.limit() - 1); + checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0); + + // Make sure bad checksums fail - error at beginning of array + corruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER); + try { + checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0); + fail("Did not throw on bad checksums"); + } catch (ChecksumException ce) { + assertEquals(0, ce.getPos()); + } + + // Make sure bad checksums fail - error at end of array + uncorruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER); + corruptBufferOffset(checksumBuf, SUMS_OFFSET_IN_BUFFER + sumsLength - 1); + try { + checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0); + fail("Did not throw on bad checksums"); + } catch (ChecksumException ce) { + int expectedPos = checksum.getBytesPerChecksum() * (numSums - 1); + assertEquals(expectedPos, ce.getPos()); + } + } + + private static void corruptBufferOffset(ByteBuffer buf, int offset) { + buf.put(offset, (byte)(buf.get(offset) + 1)); + } + + private static void uncorruptBufferOffset(ByteBuffer buf, int offset) { + buf.put(offset, (byte)(buf.get(offset) - 1)); + } + + private static ByteBuffer directify(ByteBuffer dataBuf) { + ByteBuffer newBuf = ByteBuffer.allocateDirect(dataBuf.capacity()); + newBuf.position(dataBuf.position()); + newBuf.mark(); + newBuf.put(dataBuf); + newBuf.reset(); + 
newBuf.limit(dataBuf.limit()); + return newBuf; + } +}