Subject: svn commit: r1431743 - in /hadoop/common/branches/branch-1: CHANGES.txt src/core/org/apache/hadoop/io/compress/GzipCodec.java src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
Date: Fri, 11 Jan 2013 00:02:12 -0000
To: common-commits@hadoop.apache.org
From: eyang@apache.org
Reply-To: common-dev@hadoop.apache.org
Message-Id: <20130111000213.141DB2388847@eris.apache.org>

Author: eyang
Date: Fri Jan 11 00:02:12 2013
New Revision: 1431743

URL: http://svn.apache.org/viewvc?rev=1431743&view=rev
Log:
HADOOP-8419. Fixed GzipCodec NPE on reset for IBM JDK. (Yu Li via eyang)

Added:
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/compress/GzipCodec.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1431743&r1=1431742&r2=1431743&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Jan 11 00:02:12 2013
@@ -167,6 +167,8 @@ Release 1.2.0 - unreleased

   BUG FIXES

+    HADOOP-8419. Fixed GzipCodec NPE on reset for IBM JDK. (Yu Li via eyang)
+
     MAPREDUCE-4904. OTHER_LOCAL_MAPS counter is not correct.
     (Junping Du via llu)
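A note on the GzipCodec.java change that follows: the workaround is gated on the JVM vendor and version, since only the IBM JDK 1.6.0 GZIPOutputStream.finish() ends the underlying Deflater. A minimal stand-alone sketch of that probe (the class name is illustrative, not part of the patch):

    // Sketch of the HAS_BROKEN_FINISH probe added by the patch: the
    // finish() workaround is applied only on IBM JDK 1.6.0, detected
    // from standard JVM system properties.
    public class VendorProbeSketch {
      public static void main(String[] args) {
        String vendor = System.getProperty("java.vendor");
        String version = System.getProperty("java.version");
        boolean hasBrokenFinish =
            vendor.contains("IBM") && version.contains("1.6.0");
        System.out.println("apply finish() workaround: " + hasBrokenFinish);
      }
    }

On any other JVM the overridden finish() simply delegates to super.finish(), so behavior is unchanged.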
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/compress/GzipCodec.java?rev=1431743&r1=1431742&r2=1431743&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/compress/GzipCodec.java Fri Jan 11 00:02:12 2013
@@ -39,14 +39,74 @@ public class GzipCodec extends DefaultCo
   protected static class GzipOutputStream extends CompressorStream {

     private static class ResetableGZIPOutputStream extends GZIPOutputStream {
-
+      private static final int TRAILER_SIZE = 8;
+      public static final String JVMVendor = System.getProperty("java.vendor");
+      public static final String JVMVersion = System.getProperty("java.version");
+      private static final boolean HAS_BROKEN_FINISH =
+          (JVMVendor.contains("IBM") && JVMVersion.contains("1.6.0"));
+
       public ResetableGZIPOutputStream(OutputStream out) throws IOException {
         super(out);
       }
-
+
       public void resetState() throws IOException {
         def.reset();
       }
+
+      /**
+       * Overridden for HADOOP-8419: the IBM implementation calls def.end(),
+       * which causes problems when resetting the stream for reuse.
+       */
+      @Override
+      public void finish() throws IOException {
+        if (HAS_BROKEN_FINISH) {
+          if (!def.finished()) {
+            def.finish();
+            while (!def.finished()) {
+              int i = def.deflate(this.buf, 0, this.buf.length);
+              if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) {
+                writeTrailer(this.buf, i);
+                i += TRAILER_SIZE;
+                out.write(this.buf, 0, i);
+                return;
+              }
+              if (i > 0) {
+                out.write(this.buf, 0, i);
+              }
+            }
+            byte[] arrayOfByte = new byte[TRAILER_SIZE];
+            writeTrailer(arrayOfByte, 0);
+            out.write(arrayOfByte);
+          }
+        } else {
+          super.finish();
+        }
+      }
+
+      /** Re-implemented for HADOOP-8419 because the corresponding JDK method is not visible. */
+      private void writeTrailer(byte[] paramArrayOfByte, int paramInt)
+          throws IOException {
+        writeInt((int) this.crc.getValue(), paramArrayOfByte, paramInt);
+        writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
+      }
+
+      /** Re-implemented for HADOOP-8419 because the corresponding JDK method is not visible. */
+      private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
+          throws IOException {
+        writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
+        writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
+      }
+
+      /** Re-implemented for HADOOP-8419 because the corresponding JDK method is not visible. */
+      private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
+          throws IOException {
+        paramArrayOfByte[paramInt2] = (byte) (paramInt1 & 0xFF);
+        paramArrayOfByte[(paramInt2 + 1)] = (byte) (paramInt1 >> 8 & 0xFF);
+      }
     }

     public GzipOutputStream(OutputStream out) throws IOException {
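For context: writeTrailer(), writeInt() and writeShort() above reproduce the 8-byte gzip trailer defined in RFC 1952, namely the CRC-32 of the uncompressed data followed by ISIZE (total input size mod 2^32), both stored least-significant byte first. A minimal stand-alone sketch (class and method names are illustrative, not part of the patch):

    // Builds the 8-byte gzip trailer the patch writes by hand:
    // bytes 0-3 = CRC-32 of the uncompressed data (little-endian),
    // bytes 4-7 = ISIZE, the uncompressed length mod 2^32 (little-endian).
    import java.util.zip.CRC32;

    public class GzipTrailerSketch {
      static void writeLittleEndianInt(int value, byte[] buf, int off) {
        buf[off]     = (byte) (value & 0xFF);
        buf[off + 1] = (byte) ((value >> 8) & 0xFF);
        buf[off + 2] = (byte) ((value >> 16) & 0xFF);
        buf[off + 3] = (byte) ((value >> 24) & 0xFF);
      }

      public static void main(String[] args) {
        byte[] uncompressed = "hello gzip".getBytes();
        CRC32 crc = new CRC32();
        crc.update(uncompressed, 0, uncompressed.length);

        byte[] trailer = new byte[8];                            // TRAILER_SIZE in the patch
        writeLittleEndianInt((int) crc.getValue(), trailer, 0);  // CRC-32
        writeLittleEndianInt(uncompressed.length, trailer, 4);   // ISIZE
        for (byte b : trailer) {
          System.out.printf("%02x ", b);
        }
        System.out.println();
      }
    }

The patch splits the same byte-packing across writeInt()/writeShort() only because it mirrors the (package-private) structure of the JDK's own GZIPOutputStream.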
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java?rev=1431743&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java Fri Jan 11 00:02:12 2013
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import junit.framework.TestCase;
+
+public class TestCompressionStreamReuse extends TestCase {
+  private static final Log LOG = LogFactory
+      .getLog(TestCompressionStreamReuse.class);
+
+  private Configuration conf = new Configuration();
+  private int count = 10000;
+  private int seed = new Random().nextInt();
+
+  public void testBZip2Codec() throws IOException {
+    resetStateTest(conf, seed, count,
+        "org.apache.hadoop.io.compress.BZip2Codec");
+  }
+
+  public void testGzipCompressStreamReuse() throws IOException {
+    resetStateTest(conf, seed, count,
+        "org.apache.hadoop.io.compress.GzipCodec");
+  }
+
+  public void testGzipCompressStreamReuseWithParam() throws IOException {
+    Configuration conf = new Configuration(this.conf);
+    ZlibFactory
+        .setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
+    ZlibFactory.setCompressionStrategy(conf,
+        CompressionStrategy.HUFFMAN_ONLY);
+    resetStateTest(conf, seed, count,
+        "org.apache.hadoop.io.compress.GzipCodec");
+  }
+
+  private static void resetStateTest(Configuration conf, int seed, int count,
+      String codecClass) throws IOException {
+    // Create the codec
+    CompressionCodec codec = null;
+    try {
+      codec = (CompressionCodec) ReflectionUtils.newInstance(conf
+          .getClassByName(codecClass), conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Illegal codec!");
+    }
+    LOG.info("Created a Codec object of type: " + codecClass);
+
+    // Generate data
+    DataOutputBuffer data = new DataOutputBuffer();
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+    for (int i = 0; i < count; ++i) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+
+      key.write(data);
+      value.write(data);
+    }
+    LOG.info("Generated " + count + " records");
+
+    // Compress data
+    DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+    DataOutputStream deflateOut = new DataOutputStream(
+        new BufferedOutputStream(compressedDataBuffer));
+    CompressionOutputStream deflateFilter = codec
+        .createOutputStream(deflateOut);
+    deflateFilter.write(data.getData(), 0, data.getLength());
+    deflateFilter.finish();
+    deflateFilter.flush();
+    LOG.info("Finished compressing data");
+
+    // Reset the deflater
+    deflateFilter.resetState();
+    LOG.info("Finished resetting deflater");
+
+    // Re-generate data
+    data.reset();
+    generator = new RandomDatum.Generator(seed);
+    for (int i = 0; i < count; ++i) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+
+      key.write(data);
+      value.write(data);
+    }
+    DataInputBuffer originalData = new DataInputBuffer();
+    DataInputStream originalIn = new DataInputStream(
+        new BufferedInputStream(originalData));
+    originalData.reset(data.getData(), 0, data.getLength());
+
+    // Re-compress data
+    compressedDataBuffer.reset();
+    deflateOut = new DataOutputStream(new BufferedOutputStream(
+        compressedDataBuffer));
+    deflateFilter = codec.createOutputStream(deflateOut);
+
+    deflateFilter.write(data.getData(), 0, data.getLength());
+    deflateFilter.finish();
+    deflateFilter.flush();
+    LOG.info("Finished re-compressing data");
+
+    // Decompress data
+    DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+    deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
+        compressedDataBuffer.getLength());
+    CompressionInputStream inflateFilter = codec
+        .createInputStream(deCompressedDataBuffer);
+    DataInputStream inflateIn = new DataInputStream(
+        new BufferedInputStream(inflateFilter));
+
+    // Check
+    for (int i = 0; i < count; ++i) {
+      RandomDatum k1 = new RandomDatum();
+      RandomDatum v1 = new RandomDatum();
+      k1.readFields(originalIn);
+      v1.readFields(originalIn);
+
+      RandomDatum k2 = new RandomDatum();
+      RandomDatum v2 = new RandomDatum();
+      k2.readFields(inflateIn);
+      v2.readFields(inflateIn);
+      assertTrue(
+          "original and compressed-then-decompressed output not equal",
+          k1.equals(k2) && v1.equals(v2));
+    }
+    LOG.info("SUCCESS! Completed checking " + count + " records");
+  }
+}
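The core sequence the test exercises is compress, finish(), then resetState(): before this fix, resetState() threw an NPE on IBM JDK 1.6.0 because IBM's GZIPOutputStream.finish() had already ended the underlying Deflater. A minimal sketch of that pattern (class name and payloads are illustrative, not from the commit; assumes Hadoop branch-1 on the classpath):

    // Compress, reset, and compress again with the same codec, mirroring
    // the structure of resetStateTest() above in miniature.
    import java.io.ByteArrayOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class GzipResetSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        GzipCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);

        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        CompressionOutputStream out = codec.createOutputStream(sink);

        out.write("first payload".getBytes());
        out.finish();     // IBM JDK 6's finish() called Deflater.end() here...
        out.resetState(); // ...so this def.reset() hit an NPE before the fix

        // After a successful reset, compress a second batch, as the test
        // does with a fresh stream over the cleared buffer.
        sink.reset();
        CompressionOutputStream out2 = codec.createOutputStream(sink);
        out2.write("second payload".getBytes());
        out2.finish();
        System.out.println("re-compressed to " + sink.size() + " bytes");
      }
    }

Note that the test only verifies reset-then-recompress round-trips correctly; it deliberately decompresses the second output and compares it record-for-record against the regenerated input.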