Return-Path:
X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org
Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
    by minotaur.apache.org (Postfix) with SMTP id AAC49186C2
    for ; Thu, 3 Sep 2015 22:37:43 +0000 (UTC)
Received: (qmail 93788 invoked by uid 500); 3 Sep 2015 22:37:43 -0000
Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org
Received: (qmail 93625 invoked by uid 500); 3 Sep 2015 22:37:43 -0000
Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: common-dev@hadoop.apache.org
Delivered-To: mailing list common-commits@hadoop.apache.org
Received: (qmail 93320 invoked by uid 99); 3 Sep 2015 22:37:42 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23)
    by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Sep 2015 22:37:42 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33)
    id BF07AE7EB6; Thu, 3 Sep 2015 22:37:42 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: vinodkv@apache.org
To: common-commits@hadoop.apache.org
Date: Thu, 03 Sep 2015 22:37:44 -0000
Message-Id: <57f6559c4f5547999bfcf614e2f0489c@git.apache.org>
In-Reply-To: <388cb2efe7a04c9aa0ad4d6c559bdb2b@git.apache.org>
References: <388cb2efe7a04c9aa0ad4d6c559bdb2b@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [3/9] hadoop git commit: HADOOP-7139. Allow appending to existing SequenceFiles (Contributed by kanaka kumar avvaru)

HADOOP-7139. Allow appending to existing SequenceFiles
(Contributed by kanaka kumar avvaru)

(cherry picked from commit 295d678be8853a52c3ec3da43d9265478d6632b3)
(cherry picked from commit 80697e4f324948ec32b4cad3faccba55287be652)
(cherry picked from commit b3546b60340e085c5abd8f8f0990d45c7445fe07)

Conflicts:
	hadoop-common-project/hadoop-common/CHANGES.txt

(cherry picked from commit e9c8d8c58516aa64589cd44e9e5dd0a23ba72a17)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4f53c98c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4f53c98c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4f53c98c

Branch: refs/heads/branch-2.6.1
Commit: 4f53c98ca4b6fa4b75935e743df7aae6b54366ce
Parents: 193d8d3
Author: Vinayakumar B
Authored: Thu Jun 18 14:39:00 2015 +0530
Committer: Vinod Kumar Vavilapalli
Committed: Thu Sep 3 14:15:23 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../java/org/apache/hadoop/io/SequenceFile.java |  85 ++++-
 .../hadoop/io/TestSequenceFileAppend.java       | 311 +++++++++++++++++++
 3 files changed, 394 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
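For context, the patch adds a new Writer option, appendIfExists. A minimal
usage sketch (not part of the patch; the class name, local path, and the
Long/String record types are illustrative, mirroring the new test below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.Writer;

    public class SequenceFileAppendExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Java serialization is required for plain Long/String records,
        // and the raw local FS is needed because the checksummed local FS
        // does not support append (same setup as TestSequenceFileAppend).
        conf.set("io.serializations",
            "org.apache.hadoop.io.serializer.JavaSerialization");
        conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
        Path file = new Path("/tmp/example.seq"); // illustrative path

        // Create the file and write two records.
        Writer writer = SequenceFile.createWriter(conf,
            Writer.file(file),
            Writer.keyClass(Long.class),
            Writer.valueClass(String.class));
        writer.append(1L, "one");
        writer.append(2L, "two");
        writer.close();

        // Reopen with appendIfExists(true): new records are appended after
        // the existing data instead of the file being overwritten.
        writer = SequenceFile.createWriter(conf,
            Writer.file(file),
            Writer.keyClass(Long.class),
            Writer.valueClass(String.class),
            Writer.appendIfExists(true));
        writer.append(3L, "three");
        writer.close();
      }
    }
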
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 107a95a..c3d18a1 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -8,6 +8,9 @@ Release 2.6.1 - UNRELEASED
 
   IMPROVEMENTS
 
+    HADOOP-7139. Allow appending to existing SequenceFiles
+    (kanaka kumar avvaru via vinayakumarb)
+
   OPTIMIZATIONS
 
   HADOOP-11238. Update the NameNode's Group Cache in the background when

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index 35fc130..153856d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -835,7 +835,9 @@ public class SequenceFile {
     DataOutputStream deflateOut = null;
     Metadata metadata = null;
     Compressor compressor = null;
-    
+
+    private boolean appendMode = false;
+
     protected Serializer keySerializer;
     protected Serializer uncompressedValSerializer;
     protected Serializer compressedValSerializer;
@@ -907,6 +909,13 @@ public class SequenceFile {
       }
     }
 
+    static class AppendIfExistsOption extends Options.BooleanOption implements
+        Option {
+      AppendIfExistsOption(boolean value) {
+        super(value);
+      }
+    }
+
     static class KeyClassOption extends Options.ClassOption implements Option {
       KeyClassOption(Class<?> value) {
         super(value);
       }
     }
@@ -956,7 +965,7 @@ public class SequenceFile {
         return codec;
       }
     }
-    
+
     public static Option file(Path value) {
       return new FileOption(value);
     }
@@ -982,6 +991,10 @@ public class SequenceFile {
       return new ReplicationOption(value);
     }
 
+    public static Option appendIfExists(boolean value) {
+      return new AppendIfExistsOption(value);
+    }
+
     public static Option blockSize(long value) {
       return new BlockSizeOption(value);
     }
@@ -1028,6 +1041,8 @@ public class SequenceFile {
       ProgressableOption progressOption =
          Options.getOption(ProgressableOption.class, opts);
       FileOption fileOption = Options.getOption(FileOption.class, opts);
+      AppendIfExistsOption appendIfExistsOption = Options.getOption(
+          AppendIfExistsOption.class, opts);
       FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
       StreamOption streamOption = Options.getOption(StreamOption.class, opts);
       KeyClassOption keyClassOption =
@@ -1069,7 +1084,54 @@ public class SequenceFile {
            blockSizeOption.getValue();
         Progressable progress = progressOption == null ?
            null : progressOption.getValue();
-        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
+
+        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
+            && fs.exists(p)) {
+
+          // Read the file and verify header details
+          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
+          try {
+
+            if (keyClassOption.getValue() != reader.getKeyClass()
+                || valueClassOption.getValue() != reader.getValueClass()) {
+              throw new IllegalArgumentException(
+                  "Key/value class provided does not match the file");
+            }
+
+            if (reader.getVersion() != VERSION[3]) {
+              throw new VersionMismatchException(VERSION[3],
+                  reader.getVersion());
+            }
+
+            if (metadataOption != null) {
+              LOG.info("MetaData Option is ignored during append");
+            }
+            metadataOption = (MetadataOption) SequenceFile.Writer
+                .metadata(reader.getMetadata());
+
+            CompressionOption readerCompressionOption = new CompressionOption(
+                reader.getCompressionType(), reader.getCompressionCodec());
+
+            if (readerCompressionOption.value != compressionTypeOption.value
+                || !readerCompressionOption.codec.getClass().getName()
+                    .equals(compressionTypeOption.codec.getClass().getName())) {
+              throw new IllegalArgumentException(
+                  "Compression option provided does not match the file");
+            }
+
+            sync = reader.getSync();
+
+          } finally {
+            reader.close();
+          }
+
+          out = fs.append(p, bufferSize, progress);
+          this.appendMode = true;
+        } else {
+          out = fs
+              .create(p, true, bufferSize, replication, blockSize, progress);
+        }
       } else {
         out = streamOption.getValue();
       }
@@ -1157,7 +1219,7 @@ public class SequenceFile {
       out.write(sync);                       // write the sync bytes
       out.flush();                           // flush header
     }
-    
+
     /** Initialize. */
     @SuppressWarnings("unchecked")
     void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
@@ -1212,7 +1274,12 @@ public class SequenceFile {
         }
         this.compressedValSerializer.open(deflateOut);
       }
-      writeFileHeader();
+
+      if (appendMode) {
+        sync();
+      } else {
+        writeFileHeader();
+      }
     }
 
     /** Returns the class of keys in this file. */
@@ -2043,6 +2110,14 @@ public class SequenceFile {
     /** Returns the compression codec of data in this file. */
     public CompressionCodec getCompressionCodec() { return codec; }
 
+    private byte[] getSync() {
+      return sync;
+    }
+
+    private byte getVersion() {
+      return version;
+    }
+
     /**
      * Get the compression type for this file.
      * @return the compression type
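A note on the Writer change above: with appendIfExists set, the constructor
re-reads the existing file's header (key/value classes, version byte, sync
marker, compression type and codec) and refuses to append on any mismatch.
A self-contained sketch of that failure mode (class name and path are
illustrative; the flow mirrors the new tests below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.SequenceFile.Writer;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class SequenceFileAppendMismatch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("io.serializations",
            "org.apache.hadoop.io.serializer.JavaSerialization");
        conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
        Path file = new Path("/tmp/existing.seq"); // illustrative path

        // Create a BLOCK-compressed file first.
        Writer writer = SequenceFile.createWriter(conf,
            Writer.file(file),
            Writer.keyClass(Long.class),
            Writer.valueClass(String.class),
            Writer.compression(CompressionType.BLOCK, new GzipCodec()));
        writer.append(1L, "one");
        writer.close();

        try {
          // RECORD does not match the existing BLOCK header, so the header
          // check added above rejects the append before any data is written.
          writer = SequenceFile.createWriter(conf,
              Writer.file(file),
              Writer.keyClass(Long.class),
              Writer.valueClass(String.class),
              Writer.appendIfExists(true),
              Writer.compression(CompressionType.RECORD, new GzipCodec()));
          writer.close();
        } catch (IllegalArgumentException expected) {
          // "Compression option provided does not match the file"
        }
      }
    }
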
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
new file mode 100644
index 0000000..4576642
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSequenceFileAppend {
+
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static Path ROOT_PATH = new Path(System.getProperty(
+      "test.build.data", "build/test/data"));
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.JavaSerialization");
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
+    fs = FileSystem.get(conf);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    fs.close();
+  }
+
+  @Test(timeout = 30000)
+  public void testAppend() throws Exception {
+
+    Path file = new Path(ROOT_PATH, "testseqappend.seq");
+    fs.delete(file, true);
+
+    Text key1 = new Text("Key1");
+    Text value1 = new Text("Value1");
+    Text value2 = new Text("Updated");
+
+    SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+    metadata.set(key1, value1);
+    Writer.Option metadataOption = Writer.metadata(metadata);
+
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), metadataOption);
+
+    writer.append(1L, "one");
+    writer.append(2L, "two");
+    writer.close();
+
+    verify2Values(file);
+
+    metadata.set(key1, value2);
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), metadataOption);
+
+    // Verify the Meta data is not changed
+    assertEquals(value1, writer.metadata.get(key1));
+
+    writer.append(3L, "three");
+    writer.append(4L, "four");
+
+    writer.close();
+
+    verifyAll4Values(file);
+
+    // Verify the Meta data readable after append
+    Reader reader = new Reader(conf, Reader.file(file));
+    assertEquals(value1, reader.getMetadata().get(key1));
+    reader.close();
+
+    // Verify failure if the compression details are different
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.RECORD,
+          new GzipCodec());
+
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException IAE) {
+      // Expected exception. Ignore it
+    }
+
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.BLOCK,
+          new DefaultCodec());
+
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException IAE) {
+      // Expected exception. Ignore it
+    }
+
+    fs.deleteOnExit(file);
+  }
+
+  @Test(timeout = 30000)
+  public void testAppendRecordCompression() throws Exception {
+
+    Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
+    fs.delete(file, true);
+
+    Option compressOption = Writer.compression(CompressionType.RECORD,
+        new GzipCodec());
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), compressOption);
+
+    writer.append(1L, "one");
+    writer.append(2L, "two");
+    writer.close();
+
+    verify2Values(file);
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), compressOption);
+
+    writer.append(3L, "three");
+    writer.append(4L, "four");
+    writer.close();
+
+    verifyAll4Values(file);
+
+    fs.deleteOnExit(file);
+  }
+
+  @Test(timeout = 30000)
+  public void testAppendBlockCompression() throws Exception {
+
+    Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
+    fs.delete(file, true);
+
+    Option compressOption = Writer.compression(CompressionType.BLOCK,
+        new GzipCodec());
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), compressOption);
+
+    writer.append(1L, "one");
+    writer.append(2L, "two");
+    writer.close();
+
+    verify2Values(file);
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), compressOption);
+
+    writer.append(3L, "three");
+    writer.append(4L, "four");
+    writer.close();
+
+    verifyAll4Values(file);
+
+    // Verify failure if the compression details are different or not Provided
+    try {
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true));
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException IAE) {
+      // Expected exception. Ignore it
+    }
+
+    // Verify failure if the compression details are different
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.RECORD,
+          new GzipCodec());
+
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException IAE) {
+      // Expected exception. Ignore it
+    }
+
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.BLOCK,
+          new DefaultCodec());
+
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException IAE) {
+      // Expected exception. Ignore it
+    }
+
+    fs.deleteOnExit(file);
+  }
+
+  @Test(timeout = 30000)
+  public void testAppendSort() throws Exception {
+    Path file = new Path(ROOT_PATH, "testseqappendSort.seq");
+    fs.delete(file, true);
+
+    Path sortedFile = new Path(ROOT_PATH, "testseqappendSort.seq.sort");
+    fs.delete(sortedFile, true);
+
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+        new JavaSerializationComparator<Long>(), Long.class, String.class, conf);
+
+    Option compressOption = Writer.compression(CompressionType.BLOCK,
+        new GzipCodec());
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), compressOption);
+
+    writer.append(2L, "two");
+    writer.append(1L, "one");
+
+    writer.close();
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), compressOption);
+
+    writer.append(4L, "four");
+    writer.append(3L, "three");
+    writer.close();
+
+    // Sort file after append
+    sorter.sort(file, sortedFile);
+    verifyAll4Values(sortedFile);
+
+    fs.deleteOnExit(file);
+    fs.deleteOnExit(sortedFile);
+  }
+
+  private void verify2Values(Path file) throws IOException {
+    Reader reader = new Reader(conf, Reader.file(file));
+    assertEquals(1L, reader.next((Object) null));
+    assertEquals("one", reader.getCurrentValue((Object) null));
+    assertEquals(2L, reader.next((Object) null));
+    assertEquals("two", reader.getCurrentValue((Object) null));
+    assertNull(reader.next((Object) null));
+    reader.close();
+  }
+
+  private void verifyAll4Values(Path file) throws IOException {
+    Reader reader = new Reader(conf, Reader.file(file));
+    assertEquals(1L, reader.next((Object) null));
+    assertEquals("one", reader.getCurrentValue((Object) null));
+    assertEquals(2L, reader.next((Object) null));
+    assertEquals("two", reader.getCurrentValue((Object) null));
+    assertEquals(3L, reader.next((Object) null));
+    assertEquals("three", reader.getCurrentValue((Object) null));
+    assertEquals(4L, reader.next((Object) null));
+    assertEquals("four", reader.getCurrentValue((Object) null));
+    assertNull(reader.next((Object) null));
+    reader.close();
+  }
+}