From: rvs@apache.org
To: commits@geode.incubator.apache.org
Date: Fri, 03 Jul 2015 19:21:25 -0000
Subject: [24/51] [partial] incubator-geode git commit: SGA #2

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java
new file mode 100644
index 0000000..0a5145e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java
@@ -0,0 +1,3717 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws.
Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io; + +import java.io.*; +import java.util.*; +import java.rmi.server.UID; +import java.security.MessageDigest; +import org.apache.commons.logging.*; +import org.apache.hadoop.util.Options; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.Options.CreateOpts; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.zlib.ZlibFactory; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.io.serializer.SerializationFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.NativeCodeLoader; +import org.apache.hadoop.util.MergeSort; +import org.apache.hadoop.util.PriorityQueue; +import org.apache.hadoop.util.Time; +// ** Pivotal Changes Begin +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.VersionMismatchException; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableName; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; +//** Pivotal Changes End + +/** + * SequenceFiles are flat files consisting of binary key/value + * pairs. + * + *
+ * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
+ * {@link Sorter} classes for writing, reading and sorting respectively.</p>
+ *
+ * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
+ * {@link CompressionType} used to compress key/value pairs:
+ * <ol>
+ *   <li>
+ *   <code>Writer</code> : Uncompressed records.
+ *   </li>
+ *   <li>
+ *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
+ *                                       values.
+ *   </li>
+ *   <li>
+ *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &
+ *                                      values are collected in 'blocks'
+ *                                      separately and compressed. The size of
+ *                                      the 'block' is configurable.
+ *   </li>
+ * </ol>
+ *
+ * <p>The actual compression algorithm used to compress key and/or values can be
+ * specified by using the appropriate {@link CompressionCodec}.</p>
+ *
+ * <p>The recommended way is to use the static <tt>createWriter</tt> methods
+ * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
+ *
+ * <p>The {@link Reader} acts as the bridge and can read any of the above
+ * <code>SequenceFile</code> formats.</p>
+ *
+ * <h4>SequenceFile Formats</h4>
+ *
+ * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
+ * depending on the <code>CompressionType</code> specified. All of them share a
+ * common header described below.
+ *
+ * <h5>SequenceFile Header</h5>
+ * <ul>
+ *   <li>
+ *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
+ *             version number (e.g. SEQ4 or SEQ6)
+ *   </li>
+ *   <li>
+ *   keyClassName - key class
+ *   </li>
+ *   <li>
+ *   valueClassName - value class
+ *   </li>
+ *   <li>
+ *   compression - A boolean which specifies if compression is turned on for
+ *                 keys/values in this file.
+ *   </li>
+ *   <li>
+ *   blockCompression - A boolean which specifies if block-compression is
+ *                      turned on for keys/values in this file.
+ *   </li>
+ *   <li>
+ *   compression codec - <code>CompressionCodec</code> class which is used for
+ *                       compression of keys and/or values (if compression is
+ *                       enabled).
+ *   </li>
+ *   <li>
+ *   metadata - {@link Metadata} for this file.
+ *   </li>
+ *   <li>
+ *   sync - A sync marker to denote end of the header.
+ *   </li>
+ * </ul>
+ *
+ * <h5>Uncompressed SequenceFile Format</h5>
+ * <ul>
+ *   <li>
+ *   Header
+ *   </li>
+ *   <li>
+ *   Record
+ *   <ul>
+ *     <li>Record length</li>
+ *     <li>Key length</li>
+ *     <li>Key</li>
+ *     <li>Value</li>
+ *   </ul>
+ *   </li>
+ *   <li>
+ *   A sync-marker every few <code>100</code> bytes or so.
+ *   </li>
+ * </ul>
+ *
+ * <h5>Record-Compressed SequenceFile Format</h5>
+ * <ul>
+ *   <li>
+ *   Header
+ *   </li>
+ *   <li>
+ *   Record
+ *   <ul>
+ *     <li>Record length</li>
+ *     <li>Key length</li>
+ *     <li>Key</li>
+ *     <li><i>Compressed</i> Value</li>
+ *   </ul>
+ *   </li>
+ *   <li>
+ *   A sync-marker every few <code>100</code> bytes or so.
+ *   </li>
+ * </ul>
+ *
+ * <h5>Block-Compressed SequenceFile Format</h5>
+ * <ul>
+ *   <li>
+ *   Header
+ *   </li>
+ *   <li>
+ *   Record <i>Block</i>
+ *   <ul>
+ *     <li>Uncompressed number of records in the block</li>
+ *     <li>Compressed key-lengths block-size</li>
+ *     <li>Compressed key-lengths block</li>
+ *     <li>Compressed keys block-size</li>
+ *     <li>Compressed keys block</li>
+ *     <li>Compressed value-lengths block-size</li>
+ *     <li>Compressed value-lengths block</li>
+ *     <li>Compressed values block-size</li>
+ *     <li>Compressed values block</li>
+ *   </ul>
+ *   </li>
+ *   <li>
+ *   A sync-marker every block.
+ *   </li>
+ * </ul>
+ *
+ * <p>The compressed blocks of key lengths and value lengths consist of the
+ * actual lengths of individual keys/values encoded in ZeroCompressedInteger
+ * format.</p>
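+ *
+ * <p>As an illustrative sketch only (not part of the upstream file), the
+ * option-based API declared below might be used as follows to write and then
+ * read a file; the path, key type and value type here are assumptions made
+ * for the example:
+ * <pre>
+ *   Configuration conf = new Configuration();
+ *   Path path = new Path("/tmp/example.seq");   // hypothetical path
+ *
+ *   // Write a record-compressed file of Text keys and IntWritable values.
+ *   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
+ *       SequenceFile.Writer.file(path),
+ *       SequenceFile.Writer.keyClass(Text.class),
+ *       SequenceFile.Writer.valueClass(IntWritable.class),
+ *       SequenceFile.Writer.compression(CompressionType.RECORD));
+ *   try {
+ *     writer.append(new Text("key"), new IntWritable(1));
+ *   } finally {
+ *     writer.close();
+ *   }
+ *
+ *   // Read the records back; the Reader detects the format from the header.
+ *   SequenceFile.Reader reader =
+ *       new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
+ *   try {
+ *     Text key = new Text();
+ *     IntWritable value = new IntWritable();
+ *     while (reader.next(key, value)) {
+ *       // process one key/value pair
+ *     }
+ *   } finally {
+ *     reader.close();
+ *   }
+ * </pre>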
+ * + * @see CompressionCodec + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class SequenceFile { + private static final Log LOG = LogFactory.getLog(SequenceFile.class); + + private SequenceFile() {} // no public ctor + + private static final byte BLOCK_COMPRESS_VERSION = (byte)4; + private static final byte CUSTOM_COMPRESS_VERSION = (byte)5; + private static final byte VERSION_WITH_METADATA = (byte)6; + private static byte[] VERSION = new byte[] { + (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA + }; + + private static final int SYNC_ESCAPE = -1; // "length" of sync entries + private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash + private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash + + /** The number of bytes between sync points.*/ + public static final int SYNC_INTERVAL = 100*SYNC_SIZE; + + /** + * The compression type used to compress key/value pairs in the + * {@link SequenceFile}. + * + * @see SequenceFile.Writer + */ + public static enum CompressionType { + /** Do not compress records. */ + NONE, + /** Compress values only, each separately. */ + RECORD, + /** Compress sequences of records together in blocks. */ + BLOCK + } + + /** + * Get the compression type for the reduce outputs + * @param job the job config to look in + * @return the kind of compression to use + */ + static public CompressionType getDefaultCompressionType(Configuration job) { + String name = job.get("io.seqfile.compression.type"); + return name == null ? CompressionType.RECORD : + CompressionType.valueOf(name); + } + + /** + * Set the default compression type for sequence files. + * @param job the configuration to modify + * @param val the new compression type (none, block, record) + */ + static public void setDefaultCompressionType(Configuration job, + CompressionType val) { + job.set("io.seqfile.compression.type", val.toString()); + } + + /** + * Create a new Writer with the given options. + * @param conf the configuration to use + * @param opts the options to create the file with + * @return a new Writer + * @throws IOException + */ + public static Writer createWriter(Configuration conf, Writer.Option... opts + ) throws IOException { + Writer.CompressionOption compressionOption = + Options.getOption(Writer.CompressionOption.class, opts); + CompressionType kind; + if (compressionOption != null) { + kind = compressionOption.getValue(); + } else { + kind = getDefaultCompressionType(conf); + opts = Options.prependOptions(opts, Writer.compression(kind)); + } + switch (kind) { + default: + case NONE: + return new Writer(conf, opts); + case RECORD: + return new RecordCompressWriter(conf, opts); + case BLOCK: + return new BlockCompressWriter(conf, opts); + } + } + + /** + * Construct the preferred type of SequenceFile Writer. + * @param fs The configured filesystem. + * @param conf The configuration. + * @param name The name of the file. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public static Writer + createWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass) throws IOException { + return createWriter(conf, Writer.filesystem(fs), + Writer.file(name), Writer.keyClass(keyClass), + Writer.valueClass(valClass)); + } + + /** + * Construct the preferred type of SequenceFile Writer. 
+ * @param fs The configured filesystem. + * @param conf The configuration. + * @param name The name of the file. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compressionType The compression type. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public static Writer + createWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, + CompressionType compressionType) throws IOException { + return createWriter(conf, Writer.filesystem(fs), + Writer.file(name), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compression(compressionType)); + } + + /** + * Construct the preferred type of SequenceFile Writer. + * @param fs The configured filesystem. + * @param conf The configuration. + * @param name The name of the file. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compressionType The compression type. + * @param progress The Progressable object to track progress. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public static Writer + createWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, CompressionType compressionType, + Progressable progress) throws IOException { + return createWriter(conf, Writer.file(name), + Writer.filesystem(fs), + Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compression(compressionType), + Writer.progressable(progress)); + } + + /** + * Construct the preferred type of SequenceFile Writer. + * @param fs The configured filesystem. + * @param conf The configuration. + * @param name The name of the file. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compressionType The compression type. + * @param codec The compression codec. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public static Writer + createWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, CompressionType compressionType, + CompressionCodec codec) throws IOException { + return createWriter(conf, Writer.file(name), + Writer.filesystem(fs), + Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compression(compressionType, codec)); + } + + /** + * Construct the preferred type of SequenceFile Writer. + * @param fs The configured filesystem. + * @param conf The configuration. + * @param name The name of the file. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compressionType The compression type. + * @param codec The compression codec. + * @param progress The Progressable object to track progress. + * @param metadata The metadata of the file. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. 
+ */ + @Deprecated + public static Writer + createWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, + CompressionType compressionType, CompressionCodec codec, + Progressable progress, Metadata metadata) throws IOException { + return createWriter(conf, Writer.file(name), + Writer.filesystem(fs), + Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compression(compressionType, codec), + Writer.progressable(progress), + Writer.metadata(metadata)); + } + + /** + * Construct the preferred type of SequenceFile Writer. + * @param fs The configured filesystem. + * @param conf The configuration. + * @param name The name of the file. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param bufferSize buffer size for the underlaying outputstream. + * @param replication replication factor for the file. + * @param blockSize block size for the file. + * @param compressionType The compression type. + * @param codec The compression codec. + * @param progress The Progressable object to track progress. + * @param metadata The metadata of the file. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public static Writer + createWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, int bufferSize, + short replication, long blockSize, + CompressionType compressionType, CompressionCodec codec, + Progressable progress, Metadata metadata) throws IOException { + return createWriter(conf, Writer.file(name), + Writer.filesystem(fs), + Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.bufferSize(bufferSize), + Writer.replication(replication), + Writer.blockSize(blockSize), + Writer.compression(compressionType, codec), + Writer.progressable(progress), + Writer.metadata(metadata)); + } + + /** + * Construct the preferred type of SequenceFile Writer. + * @param fs The configured filesystem. + * @param conf The configuration. + * @param name The name of the file. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param bufferSize buffer size for the underlaying outputstream. + * @param replication replication factor for the file. + * @param blockSize block size for the file. + * @param createParent create parent directory if non-existent + * @param compressionType The compression type. + * @param codec The compression codec. + * @param metadata The metadata of the file. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + */ + @Deprecated + public static Writer + createWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, int bufferSize, + short replication, long blockSize, boolean createParent, + CompressionType compressionType, CompressionCodec codec, + Metadata metadata) throws IOException { + return createWriter(FileContext.getFileContext(fs.getUri(), conf), + conf, name, keyClass, valClass, compressionType, codec, + metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE), + CreateOpts.bufferSize(bufferSize), + createParent ? CreateOpts.createParent() + : CreateOpts.donotCreateParent(), + CreateOpts.repFac(replication), + CreateOpts.blockSize(blockSize) + ); + } + + /** + * Construct the preferred type of SequenceFile Writer. + * @param fc The context for the specified file. + * @param conf The configuration. 
+ * @param name The name of the file. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compressionType The compression type. + * @param codec The compression codec. + * @param metadata The metadata of the file. + * @param createFlag gives the semantics of create: overwrite, append etc. + * @param opts file creation options; see {@link CreateOpts}. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + */ + public static Writer + createWriter(FileContext fc, Configuration conf, Path name, + Class keyClass, Class valClass, + CompressionType compressionType, CompressionCodec codec, + Metadata metadata, + final EnumSet createFlag, CreateOpts... opts) + throws IOException { + return createWriter(conf, fc.create(name, createFlag, opts), + keyClass, valClass, compressionType, codec, metadata).ownStream(); + } + + /** + * Construct the preferred type of SequenceFile Writer. + * @param fs The configured filesystem. + * @param conf The configuration. + * @param name The name of the file. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compressionType The compression type. + * @param codec The compression codec. + * @param progress The Progressable object to track progress. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public static Writer + createWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, + CompressionType compressionType, CompressionCodec codec, + Progressable progress) throws IOException { + return createWriter(conf, Writer.file(name), + Writer.filesystem(fs), + Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compression(compressionType, codec), + Writer.progressable(progress)); + } + + /** + * Construct the preferred type of 'raw' SequenceFile Writer. + * @param conf The configuration. + * @param out The stream on top which the writer is to be constructed. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compressionType The compression type. + * @param codec The compression codec. + * @param metadata The metadata of the file. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public static Writer + createWriter(Configuration conf, FSDataOutputStream out, + Class keyClass, Class valClass, + CompressionType compressionType, + CompressionCodec codec, Metadata metadata) throws IOException { + return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compression(compressionType, codec), + Writer.metadata(metadata)); + } + + /** + * Construct the preferred type of 'raw' SequenceFile Writer. + * @param conf The configuration. + * @param out The stream on top which the writer is to be constructed. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compressionType The compression type. + * @param codec The compression codec. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. 
+ */ + @Deprecated + public static Writer + createWriter(Configuration conf, FSDataOutputStream out, + Class keyClass, Class valClass, CompressionType compressionType, + CompressionCodec codec) throws IOException { + return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compression(compressionType, codec)); + } + + + /** The interface to 'raw' values of SequenceFiles. */ + public static interface ValueBytes { + + /** Writes the uncompressed bytes to the outStream. + * @param outStream : Stream to write uncompressed bytes into. + * @throws IOException + */ + public void writeUncompressedBytes(DataOutputStream outStream) + throws IOException; + + /** Write compressed bytes to outStream. + * Note: that it will NOT compress the bytes if they are not compressed. + * @param outStream : Stream to write compressed bytes into. + */ + public void writeCompressedBytes(DataOutputStream outStream) + throws IllegalArgumentException, IOException; + + /** + * Size of stored data. + */ + public int getSize(); + } + + private static class UncompressedBytes implements ValueBytes { + private int dataSize; + private byte[] data; + + private UncompressedBytes() { + data = null; + dataSize = 0; + } + + private void reset(DataInputStream in, int length) throws IOException { + if (data == null) { + data = new byte[length]; + } else if (length > data.length) { + data = new byte[Math.max(length, data.length * 2)]; + } + dataSize = -1; + in.readFully(data, 0, length); + dataSize = length; + } + + @Override + public int getSize() { + return dataSize; + } + + @Override + public void writeUncompressedBytes(DataOutputStream outStream) + throws IOException { + outStream.write(data, 0, dataSize); + } + + @Override + public void writeCompressedBytes(DataOutputStream outStream) + throws IllegalArgumentException, IOException { + throw + new IllegalArgumentException("UncompressedBytes cannot be compressed!"); + } + + } // UncompressedBytes + + private static class CompressedBytes implements ValueBytes { + private int dataSize; + private byte[] data; + DataInputBuffer rawData = null; + CompressionCodec codec = null; + CompressionInputStream decompressedStream = null; + + private CompressedBytes(CompressionCodec codec) { + data = null; + dataSize = 0; + this.codec = codec; + } + + private void reset(DataInputStream in, int length) throws IOException { + if (data == null) { + data = new byte[length]; + } else if (length > data.length) { + data = new byte[Math.max(length, data.length * 2)]; + } + dataSize = -1; + in.readFully(data, 0, length); + dataSize = length; + } + + @Override + public int getSize() { + return dataSize; + } + + @Override + public void writeUncompressedBytes(DataOutputStream outStream) + throws IOException { + if (decompressedStream == null) { + rawData = new DataInputBuffer(); + decompressedStream = codec.createInputStream(rawData); + } else { + decompressedStream.resetState(); + } + rawData.reset(data, 0, dataSize); + + byte[] buffer = new byte[8192]; + int bytesRead = 0; + while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) { + outStream.write(buffer, 0, bytesRead); + } + } + + @Override + public void writeCompressedBytes(DataOutputStream outStream) + throws IllegalArgumentException, IOException { + outStream.write(data, 0, dataSize); + } + + } // CompressedBytes + + /** + * The class encapsulating with the metadata of a file. + * The metadata of a file is a list of attribute name/value + * pairs of Text type. 
+ * + */ + public static class Metadata implements Writable { + + private TreeMap theMetadata; + + public Metadata() { + this(new TreeMap()); + } + + public Metadata(TreeMap arg) { + if (arg == null) { + this.theMetadata = new TreeMap(); + } else { + this.theMetadata = arg; + } + } + + public Text get(Text name) { + return this.theMetadata.get(name); + } + + public void set(Text name, Text value) { + this.theMetadata.put(name, value); + } + + public TreeMap getMetadata() { + return new TreeMap(this.theMetadata); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(this.theMetadata.size()); + Iterator> iter = + this.theMetadata.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry en = iter.next(); + en.getKey().write(out); + en.getValue().write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + int sz = in.readInt(); + if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object"); + this.theMetadata = new TreeMap(); + for (int i = 0; i < sz; i++) { + Text key = new Text(); + Text val = new Text(); + key.readFields(in); + val.readFields(in); + this.theMetadata.put(key, val); + } + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass() != this.getClass()) { + return false; + } else { + return equals((Metadata)other); + } + } + + public boolean equals(Metadata other) { + if (other == null) return false; + if (this.theMetadata.size() != other.theMetadata.size()) { + return false; + } + Iterator> iter1 = + this.theMetadata.entrySet().iterator(); + Iterator> iter2 = + other.theMetadata.entrySet().iterator(); + while (iter1.hasNext() && iter2.hasNext()) { + Map.Entry en1 = iter1.next(); + Map.Entry en2 = iter2.next(); + if (!en1.getKey().equals(en2.getKey())) { + return false; + } + if (!en1.getValue().equals(en2.getValue())) { + return false; + } + } + if (iter1.hasNext() || iter2.hasNext()) { + return false; + } + return true; + } + + @Override + public int hashCode() { + assert false : "hashCode not designed"; + return 42; // any arbitrary constant will do + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("size: ").append(this.theMetadata.size()).append("\n"); + Iterator> iter = + this.theMetadata.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry en = iter.next(); + sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString()); + sb.append("\n"); + } + return sb.toString(); + } + } + + /** Write key/value pairs to a sequence-format file. */ + public static class Writer implements java.io.Closeable, Syncable { + private Configuration conf; + FSDataOutputStream out; + boolean ownOutputStream = true; + DataOutputBuffer buffer = new DataOutputBuffer(); + + Class keyClass; + Class valClass; + + private final CompressionType compress; + CompressionCodec codec = null; + CompressionOutputStream deflateFilter = null; + DataOutputStream deflateOut = null; + Metadata metadata = null; + Compressor compressor = null; + + protected Serializer keySerializer; + protected Serializer uncompressedValSerializer; + protected Serializer compressedValSerializer; + + // Insert a globally unique 16-byte value every few entries, so that one + // can seek into the middle of a file and then synchronize with record + // starts and ends by scanning for this value. 
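+ // A hedged sketch of how these markers are consumed on the read side
+ // (assuming the sync(long) and getPosition() methods of the upstream
+ // Hadoop SequenceFile.Reader API): a caller starting at an arbitrary byte
+ // offset asks the reader to skip forward to the next marker, then iterates
+ // whole records, e.g.:
+ //
+ //   reader.sync(arbitraryOffset);   // arbitraryOffset: assumed example value
+ //   Text k = new Text();            // example key/value types
+ //   Text v = new Text();
+ //   while (reader.next(k, v) && reader.getPosition() < endOfSplit) {
+ //     // process one complete record
+ //   }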
+ long lastSyncPos; // position of last sync + byte[] sync; // 16 random bytes + { + try { + MessageDigest digester = MessageDigest.getInstance("MD5"); + long time = Time.now(); + digester.update((new UID()+"@"+time).getBytes()); + sync = digester.digest(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static interface Option {} + + static class FileOption extends Options.PathOption + implements Option { + FileOption(Path path) { + super(path); + } + } + + /** + * @deprecated only used for backwards-compatibility in the createWriter methods + * that take FileSystem. + */ + @Deprecated + private static class FileSystemOption implements Option { + private final FileSystem value; + protected FileSystemOption(FileSystem value) { + this.value = value; + } + public FileSystem getValue() { + return value; + } + } + + static class StreamOption extends Options.FSDataOutputStreamOption + implements Option { + StreamOption(FSDataOutputStream stream) { + super(stream); + } + } + + static class BufferSizeOption extends Options.IntegerOption + implements Option { + BufferSizeOption(int value) { + super(value); + } + } + + static class BlockSizeOption extends Options.LongOption implements Option { + BlockSizeOption(long value) { + super(value); + } + } + + static class ReplicationOption extends Options.IntegerOption + implements Option { + ReplicationOption(int value) { + super(value); + } + } + + static class KeyClassOption extends Options.ClassOption implements Option { + KeyClassOption(Class value) { + super(value); + } + } + + static class ValueClassOption extends Options.ClassOption + implements Option { + ValueClassOption(Class value) { + super(value); + } + } + + static class MetadataOption implements Option { + private final Metadata value; + MetadataOption(Metadata value) { + this.value = value; + } + Metadata getValue() { + return value; + } + } + + static class ProgressableOption extends Options.ProgressableOption + implements Option { + ProgressableOption(Progressable value) { + super(value); + } + } + + private static class CompressionOption implements Option { + private final CompressionType value; + private final CompressionCodec codec; + CompressionOption(CompressionType value) { + this(value, null); + } + CompressionOption(CompressionType value, CompressionCodec codec) { + this.value = value; + this.codec = (CompressionType.NONE != value && null == codec) + ? new DefaultCodec() + : codec; + } + CompressionType getValue() { + return value; + } + CompressionCodec getCodec() { + return codec; + } + } + + public static Option file(Path value) { + return new FileOption(value); + } + + /** + * @deprecated only used for backwards-compatibility in the createWriter methods + * that take FileSystem. 
+ */ + @Deprecated + private static Option filesystem(FileSystem fs) { + return new SequenceFile.Writer.FileSystemOption(fs); + } + + public static Option bufferSize(int value) { + return new BufferSizeOption(value); + } + + public static Option stream(FSDataOutputStream value) { + return new StreamOption(value); + } + + public static Option replication(short value) { + return new ReplicationOption(value); + } + + public static Option blockSize(long value) { + return new BlockSizeOption(value); + } + + public static Option progressable(Progressable value) { + return new ProgressableOption(value); + } + + public static Option keyClass(Class value) { + return new KeyClassOption(value); + } + + public static Option valueClass(Class value) { + return new ValueClassOption(value); + } + + public static Option metadata(Metadata value) { + return new MetadataOption(value); + } + + public static Option compression(CompressionType value) { + return new CompressionOption(value); + } + + public static Option compression(CompressionType value, + CompressionCodec codec) { + return new CompressionOption(value, codec); + } + + /** + * Construct a uncompressed writer from a set of options. + * @param conf the configuration to use + * @param opts the options used when creating the writer + * @throws IOException if it fails + */ + Writer(Configuration conf, + Option... opts) throws IOException { + BlockSizeOption blockSizeOption = + Options.getOption(BlockSizeOption.class, opts); + BufferSizeOption bufferSizeOption = + Options.getOption(BufferSizeOption.class, opts); + ReplicationOption replicationOption = + Options.getOption(ReplicationOption.class, opts); + ProgressableOption progressOption = + Options.getOption(ProgressableOption.class, opts); + FileOption fileOption = Options.getOption(FileOption.class, opts); + FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts); + StreamOption streamOption = Options.getOption(StreamOption.class, opts); + KeyClassOption keyClassOption = + Options.getOption(KeyClassOption.class, opts); + ValueClassOption valueClassOption = + Options.getOption(ValueClassOption.class, opts); + MetadataOption metadataOption = + Options.getOption(MetadataOption.class, opts); + CompressionOption compressionTypeOption = + Options.getOption(CompressionOption.class, opts); + // check consistency of options + if ((fileOption == null) == (streamOption == null)) { + throw new IllegalArgumentException("file or stream must be specified"); + } + if (fileOption == null && (blockSizeOption != null || + bufferSizeOption != null || + replicationOption != null || + progressOption != null)) { + throw new IllegalArgumentException("file modifier options not " + + "compatible with stream"); + } + + FSDataOutputStream out; + boolean ownStream = fileOption != null; + if (ownStream) { + Path p = fileOption.getValue(); + FileSystem fs; + if (fsOption != null) { + fs = fsOption.getValue(); + } else { + fs = p.getFileSystem(conf); + } + int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : + bufferSizeOption.getValue(); + short replication = replicationOption == null ? + fs.getDefaultReplication(p) : + (short) replicationOption.getValue(); + long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) : + blockSizeOption.getValue(); + Progressable progress = progressOption == null ? 
null : + progressOption.getValue(); + out = fs.create(p, true, bufferSize, replication, blockSize, progress); + } else { + out = streamOption.getValue(); + } + Class keyClass = keyClassOption == null ? + Object.class : keyClassOption.getValue(); + Class valueClass = valueClassOption == null ? + Object.class : valueClassOption.getValue(); + Metadata metadata = metadataOption == null ? + new Metadata() : metadataOption.getValue(); + this.compress = compressionTypeOption.getValue(); + final CompressionCodec codec = compressionTypeOption.getCodec(); + if (codec != null && + (codec instanceof GzipCodec) && + !NativeCodeLoader.isNativeCodeLoaded() && + !ZlibFactory.isNativeZlibLoaded(conf)) { + throw new IllegalArgumentException("SequenceFile doesn't work with " + + "GzipCodec without native-hadoop " + + "code!"); + } + init(conf, out, ownStream, keyClass, valueClass, codec, metadata); + } + + /** Create the named file. + * @deprecated Use + * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public Writer(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass) throws IOException { + this.compress = CompressionType.NONE; + init(conf, fs.create(name), true, keyClass, valClass, null, + new Metadata()); + } + + /** Create the named file with write-progress reporter. + * @deprecated Use + * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public Writer(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, + Progressable progress, Metadata metadata) throws IOException { + this.compress = CompressionType.NONE; + init(conf, fs.create(name, progress), true, keyClass, valClass, + null, metadata); + } + + /** Create the named file with write-progress reporter. + * @deprecated Use + * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public Writer(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, + int bufferSize, short replication, long blockSize, + Progressable progress, Metadata metadata) throws IOException { + this.compress = CompressionType.NONE; + init(conf, + fs.create(name, true, bufferSize, replication, blockSize, progress), + true, keyClass, valClass, null, metadata); + } + + boolean isCompressed() { return compress != CompressionType.NONE; } + boolean isBlockCompressed() { return compress == CompressionType.BLOCK; } + + Writer ownStream() { this.ownOutputStream = true; return this; } + + /** Write and flush the file header. */ + private void writeFileHeader() + throws IOException { + out.write(VERSION); + Text.writeString(out, keyClass.getName()); + Text.writeString(out, valClass.getName()); + + out.writeBoolean(this.isCompressed()); + out.writeBoolean(this.isBlockCompressed()); + + if (this.isCompressed()) { + Text.writeString(out, (codec.getClass()).getName()); + } + this.metadata.write(out); + out.write(sync); // write the sync bytes + out.flush(); // flush header + } + + /** Initialize. 
*/ + @SuppressWarnings("unchecked") + void init(Configuration conf, FSDataOutputStream out, boolean ownStream, + Class keyClass, Class valClass, + CompressionCodec codec, Metadata metadata) + throws IOException { + this.conf = conf; + this.out = out; + this.ownOutputStream = ownStream; + this.keyClass = keyClass; + this.valClass = valClass; + this.codec = codec; + this.metadata = metadata; + SerializationFactory serializationFactory = new SerializationFactory(conf); + this.keySerializer = serializationFactory.getSerializer(keyClass); + if (this.keySerializer == null) { + throw new IOException( + "Could not find a serializer for the Key class: '" + + keyClass.getCanonicalName() + "'. " + + "Please ensure that the configuration '" + + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + + "properly configured, if you're using" + + "custom serialization."); + } + this.keySerializer.open(buffer); + this.uncompressedValSerializer = serializationFactory.getSerializer(valClass); + if (this.uncompressedValSerializer == null) { + throw new IOException( + "Could not find a serializer for the Value class: '" + + valClass.getCanonicalName() + "'. " + + "Please ensure that the configuration '" + + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + + "properly configured, if you're using" + + "custom serialization."); + } + this.uncompressedValSerializer.open(buffer); + if (this.codec != null) { + ReflectionUtils.setConf(this.codec, this.conf); + this.compressor = CodecPool.getCompressor(this.codec); + this.deflateFilter = this.codec.createOutputStream(buffer, compressor); + this.deflateOut = + new DataOutputStream(new BufferedOutputStream(deflateFilter)); + this.compressedValSerializer = serializationFactory.getSerializer(valClass); + if (this.compressedValSerializer == null) { + throw new IOException( + "Could not find a serializer for the Value class: '" + + valClass.getCanonicalName() + "'. " + + "Please ensure that the configuration '" + + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + + "properly configured, if you're using" + + "custom serialization."); + } + this.compressedValSerializer.open(deflateOut); + } + writeFileHeader(); + } + + /** Returns the class of keys in this file. */ + public Class getKeyClass() { return keyClass; } + + /** Returns the class of values in this file. */ + public Class getValueClass() { return valClass; } + + /** Returns the compression codec of data in this file. 
*/ + public CompressionCodec getCompressionCodec() { return codec; } + + /** create a sync point */ + public void sync() throws IOException { + if (sync != null && lastSyncPos != out.getPos()) { + out.writeInt(SYNC_ESCAPE); // mark the start of the sync + out.write(sync); // write sync + lastSyncPos = out.getPos(); // update lastSyncPos + } + } + + /** + * flush all currently written data to the file system + * @deprecated Use {@link #hsync()} or {@link #hflush()} instead + */ + @Deprecated + public void syncFs() throws IOException { + if (out != null) { + out.sync(); // flush contents to file system + } + } + + @Override + public void hsync() throws IOException { + if (out != null) { + out.hsync(); + } + } + // Pivotal changes begin + public void hsyncWithSizeUpdate() throws IOException { + if (out != null) { + if (out instanceof HdfsDataOutputStream) { + try { + ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); + } catch (NoSuchMethodError e){ + // We are probably working with an older version of hadoop jars which does not have the + // hsync function with SyncFlag. Use the hsync version that does not update the size. + out.hsync(); + } + } + else { + out.hsync(); + } + } + } + // Pivotal changes end + @Override + public void hflush() throws IOException { + if (out != null) { + out.hflush(); + } + } + + /** Returns the configuration of this file. */ + Configuration getConf() { return conf; } + + /** Close the file. */ + @Override + public synchronized void close() throws IOException { + keySerializer.close(); + uncompressedValSerializer.close(); + if (compressedValSerializer != null) { + compressedValSerializer.close(); + } + + CodecPool.returnCompressor(compressor); + compressor = null; + + if (out != null) { + + // Close the underlying stream iff we own it... + if (ownOutputStream) { + out.close(); + } else { + out.flush(); + } + out = null; + } + } + + synchronized void checkAndWriteSync() throws IOException { + if (sync != null && + out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync + sync(); + } + } + + /** Append a key/value pair. */ + public void append(Writable key, Writable val) + throws IOException { + append((Object) key, (Object) val); + } + + /** Append a key/value pair. 
*/ + @SuppressWarnings("unchecked") + public synchronized void append(Object key, Object val) + throws IOException { + if (key.getClass() != keyClass) + throw new IOException("wrong key class: "+key.getClass().getName() + +" is not "+keyClass); + if (val.getClass() != valClass) + throw new IOException("wrong value class: "+val.getClass().getName() + +" is not "+valClass); + + buffer.reset(); + + // Append the 'key' + keySerializer.serialize(key); + int keyLength = buffer.getLength(); + if (keyLength < 0) + throw new IOException("negative length keys not allowed: " + key); + + // Append the 'value' + if (compress == CompressionType.RECORD) { + deflateFilter.resetState(); + compressedValSerializer.serialize(val); + deflateOut.flush(); + deflateFilter.finish(); + } else { + uncompressedValSerializer.serialize(val); + } + + // Write the record out + checkAndWriteSync(); // sync + out.writeInt(buffer.getLength()); // total record length + out.writeInt(keyLength); // key portion length + out.write(buffer.getData(), 0, buffer.getLength()); // data + } + + public synchronized void appendRaw(byte[] keyData, int keyOffset, + int keyLength, ValueBytes val) throws IOException { + if (keyLength < 0) + throw new IOException("negative length keys not allowed: " + keyLength); + + int valLength = val.getSize(); + + checkAndWriteSync(); + + out.writeInt(keyLength+valLength); // total record length + out.writeInt(keyLength); // key portion length + out.write(keyData, keyOffset, keyLength); // key + val.writeUncompressedBytes(out); // value + } + + /** Returns the current length of the output file. + * + *
This always returns a synchronized position. In other words, + * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position + * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called. However + * the key may be earlier in the file than key last written when this + * method was called (e.g., with block-compression, it may be the first key + * in the block that was being written when this method was called). + */ + public synchronized long getLength() throws IOException { + return out.getPos(); + } + + } // class Writer + + /** Write key/compressed-value pairs to a sequence-format file. */ + static class RecordCompressWriter extends Writer { + + RecordCompressWriter(Configuration conf, + Option... options) throws IOException { + super(conf, options); + } + + /** Append a key/value pair. */ + @Override + @SuppressWarnings("unchecked") + public synchronized void append(Object key, Object val) + throws IOException { + if (key.getClass() != keyClass) + throw new IOException("wrong key class: "+key.getClass().getName() + +" is not "+keyClass); + if (val.getClass() != valClass) + throw new IOException("wrong value class: "+val.getClass().getName() + +" is not "+valClass); + + buffer.reset(); + + // Append the 'key' + keySerializer.serialize(key); + int keyLength = buffer.getLength(); + if (keyLength < 0) + throw new IOException("negative length keys not allowed: " + key); + + // Compress 'value' and append it + deflateFilter.resetState(); + compressedValSerializer.serialize(val); + deflateOut.flush(); + deflateFilter.finish(); + + // Write the record out + checkAndWriteSync(); // sync + out.writeInt(buffer.getLength()); // total record length + out.writeInt(keyLength); // key portion length + out.write(buffer.getData(), 0, buffer.getLength()); // data + } + + /** Append a key/value pair. */ + @Override + public synchronized void appendRaw(byte[] keyData, int keyOffset, + int keyLength, ValueBytes val) throws IOException { + + if (keyLength < 0) + throw new IOException("negative length keys not allowed: " + keyLength); + + int valLength = val.getSize(); + + checkAndWriteSync(); // sync + out.writeInt(keyLength+valLength); // total record length + out.writeInt(keyLength); // key portion length + out.write(keyData, keyOffset, keyLength); // 'key' data + val.writeCompressedBytes(out); // 'value' data + } + + } // RecordCompressionWriter + + /** Write compressed key/value blocks to a sequence-format file. */ + static class BlockCompressWriter extends Writer { + + private int noBufferedRecords = 0; + + private DataOutputBuffer keyLenBuffer = new DataOutputBuffer(); + private DataOutputBuffer keyBuffer = new DataOutputBuffer(); + + private DataOutputBuffer valLenBuffer = new DataOutputBuffer(); + private DataOutputBuffer valBuffer = new DataOutputBuffer(); + + private final int compressionBlockSize; + + BlockCompressWriter(Configuration conf, + Option... 
options) throws IOException { + super(conf, options); + compressionBlockSize = + conf.getInt("io.seqfile.compress.blocksize", 1000000); + keySerializer.close(); + keySerializer.open(keyBuffer); + uncompressedValSerializer.close(); + uncompressedValSerializer.open(valBuffer); + } + + /** Workhorse to check and write out compressed data/lengths */ + private synchronized + void writeBuffer(DataOutputBuffer uncompressedDataBuffer) + throws IOException { + deflateFilter.resetState(); + buffer.reset(); + deflateOut.write(uncompressedDataBuffer.getData(), 0, + uncompressedDataBuffer.getLength()); + deflateOut.flush(); + deflateFilter.finish(); + + WritableUtils.writeVInt(out, buffer.getLength()); + out.write(buffer.getData(), 0, buffer.getLength()); + } + + /** Compress and flush contents to dfs */ + @Override + public synchronized void sync() throws IOException { + if (noBufferedRecords > 0) { + super.sync(); + + // No. of records + WritableUtils.writeVInt(out, noBufferedRecords); + + // Write 'keys' and lengths + writeBuffer(keyLenBuffer); + writeBuffer(keyBuffer); + + // Write 'values' and lengths + writeBuffer(valLenBuffer); + writeBuffer(valBuffer); + + // Flush the file-stream + out.flush(); + + // Reset internal states + keyLenBuffer.reset(); + keyBuffer.reset(); + valLenBuffer.reset(); + valBuffer.reset(); + noBufferedRecords = 0; + } + + } + + /** Close the file. */ + @Override + public synchronized void close() throws IOException { + if (out != null) { + sync(); + } + super.close(); + } + + /** Append a key/value pair. */ + @Override + @SuppressWarnings("unchecked") + public synchronized void append(Object key, Object val) + throws IOException { + if (key.getClass() != keyClass) + throw new IOException("wrong key class: "+key+" is not "+keyClass); + if (val.getClass() != valClass) + throw new IOException("wrong value class: "+val+" is not "+valClass); + + // Save key/value into respective buffers + int oldKeyLength = keyBuffer.getLength(); + keySerializer.serialize(key); + int keyLength = keyBuffer.getLength() - oldKeyLength; + if (keyLength < 0) + throw new IOException("negative length keys not allowed: " + key); + WritableUtils.writeVInt(keyLenBuffer, keyLength); + + int oldValLength = valBuffer.getLength(); + uncompressedValSerializer.serialize(val); + int valLength = valBuffer.getLength() - oldValLength; + WritableUtils.writeVInt(valLenBuffer, valLength); + + // Added another key/value pair + ++noBufferedRecords; + + // Compress and flush? + int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); + if (currentBlockSize >= compressionBlockSize) { + sync(); + } + } + + /** Append a key/value pair. */ + @Override + public synchronized void appendRaw(byte[] keyData, int keyOffset, + int keyLength, ValueBytes val) throws IOException { + + if (keyLength < 0) + throw new IOException("negative length keys not allowed"); + + int valLength = val.getSize(); + + // Save key/value data in relevant buffers + WritableUtils.writeVInt(keyLenBuffer, keyLength); + keyBuffer.write(keyData, keyOffset, keyLength); + WritableUtils.writeVInt(valLenBuffer, valLength); + val.writeUncompressedBytes(valBuffer); + + // Added another key/value pair + ++noBufferedRecords; + + // Compress and flush? 
+ int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); + if (currentBlockSize >= compressionBlockSize) { + sync(); + } + } + + } // BlockCompressionWriter + + /** Get the configured buffer size */ + private static int getBufferSize(Configuration conf) { + return conf.getInt("io.file.buffer.size", 4096); + } + + /** Reads key/value pairs from a sequence-format file. */ + public static class Reader implements java.io.Closeable { + private String filename; + private FSDataInputStream in; + private DataOutputBuffer outBuf = new DataOutputBuffer(); + + private byte version; + + private String keyClassName; + private String valClassName; + private Class keyClass; + private Class valClass; + + private CompressionCodec codec = null; + private Metadata metadata = null; + + private byte[] sync = new byte[SYNC_HASH_SIZE]; + private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; + private boolean syncSeen; + + private long headerEnd; + private long end; + private int keyLength; + private int recordLength; + + private boolean decompress; + private boolean blockCompressed; + + private Configuration conf; + + private int noBufferedRecords = 0; + private boolean lazyDecompress = true; + private boolean valuesDecompressed = true; + + private int noBufferedKeys = 0; + private int noBufferedValues = 0; + + private DataInputBuffer keyLenBuffer = null; + private CompressionInputStream keyLenInFilter = null; + private DataInputStream keyLenIn = null; + private Decompressor keyLenDecompressor = null; + private DataInputBuffer keyBuffer = null; + private CompressionInputStream keyInFilter = null; + private DataInputStream keyIn = null; + private Decompressor keyDecompressor = null; + + private DataInputBuffer valLenBuffer = null; + private CompressionInputStream valLenInFilter = null; + private DataInputStream valLenIn = null; + private Decompressor valLenDecompressor = null; + private DataInputBuffer valBuffer = null; + private CompressionInputStream valInFilter = null; + private DataInputStream valIn = null; + private Decompressor valDecompressor = null; + + private Deserializer keyDeserializer; + private Deserializer valDeserializer; + + /** + * A tag interface for all of the Reader options + */ + public static interface Option {} + + /** + * Create an option to specify the path name of the sequence file. + * @param value the path to read + * @return a new option + */ + public static Option file(Path value) { + return new FileOption(value); + } + + /** + * Create an option to specify the stream with the sequence file. + * @param value the stream to read. + * @return a new option + */ + public static Option stream(FSDataInputStream value) { + return new InputStreamOption(value); + } + + /** + * Create an option to specify the starting byte to read. + * @param value the number of bytes to skip over + * @return a new option + */ + public static Option start(long value) { + return new StartOption(value); + } + + /** + * Create an option to specify the number of bytes to read. + * @param value the number of bytes to read + * @return a new option + */ + public static Option length(long value) { + return new LengthOption(value); + } + + /** + * Create an option with the buffer size for reading the given pathname. 
+ * @param value the number of bytes to buffer + * @return a new option + */ + public static Option bufferSize(int value) { + return new BufferSizeOption(value); + } + + private static class FileOption extends Options.PathOption + implements Option { + private FileOption(Path value) { + super(value); + } + } + + private static class InputStreamOption + extends Options.FSDataInputStreamOption + implements Option { + private InputStreamOption(FSDataInputStream value) { + super(value); + } + } + + private static class StartOption extends Options.LongOption + implements Option { + private StartOption(long value) { + super(value); + } + } + + private static class LengthOption extends Options.LongOption + implements Option { + private LengthOption(long value) { + super(value); + } + } + + private static class BufferSizeOption extends Options.IntegerOption + implements Option { + private BufferSizeOption(int value) { + super(value); + } + } + + // only used directly + private static class OnlyHeaderOption extends Options.BooleanOption + implements Option { + private OnlyHeaderOption() { + super(true); + } + } + + public Reader(Configuration conf, Option... opts) throws IOException { + // Look up the options, these are null if not set + FileOption fileOpt = Options.getOption(FileOption.class, opts); + InputStreamOption streamOpt = + Options.getOption(InputStreamOption.class, opts); + StartOption startOpt = Options.getOption(StartOption.class, opts); + LengthOption lenOpt = Options.getOption(LengthOption.class, opts); + BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts); + OnlyHeaderOption headerOnly = + Options.getOption(OnlyHeaderOption.class, opts); + // check for consistency + if ((fileOpt == null) == (streamOpt == null)) { + throw new + IllegalArgumentException("File or stream option must be specified"); + } + if (fileOpt == null && bufOpt != null) { + throw new IllegalArgumentException("buffer size can only be set when" + + " a file is specified."); + } + // figure out the real values + Path filename = null; + FSDataInputStream file; + final long len; + if (fileOpt != null) { + filename = fileOpt.getValue(); + FileSystem fs = filename.getFileSystem(conf); + int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue(); + len = null == lenOpt + ? fs.getFileStatus(filename).getLen() + : lenOpt.getValue(); + file = openFile(fs, filename, bufSize, len); + } else { + len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); + file = streamOpt.getValue(); + } + long start = startOpt == null ? 0 : startOpt.getValue(); + // really set up + initialize(filename, file, start, len, conf, headerOnly != null); + } + + /** + * Construct a reader by opening a file from the given file system. + * @param fs The file system used to open the file. + * @param file The file being read. + * @param conf Configuration + * @throws IOException + * @deprecated Use Reader(Configuration, Option...) instead. + */ + @Deprecated + public Reader(FileSystem fs, Path file, + Configuration conf) throws IOException { + this(conf, file(file.makeQualified(fs))); + } + + /** + * Construct a reader by the given input stream. + * @param in An input stream. + * @param buffersize unused + * @param start The starting position. + * @param length The length being read. + * @param conf Configuration + * @throws IOException + * @deprecated Use Reader(Configuration, Reader.Option...) instead. 
+ */ + @Deprecated + public Reader(FSDataInputStream in, int buffersize, + long start, long length, Configuration conf) throws IOException { + this(conf, stream(in), start(start), length(length)); + } + + /** Common work of the constructors. */ + private void initialize(Path filename, FSDataInputStream in, + long start, long length, Configuration conf, + boolean tempReader) throws IOException { + if (in == null) { + throw new IllegalArgumentException("in == null"); + } + this.filename = filename == null ? "" : filename.toString(); + this.in = in; + this.conf = conf; + boolean succeeded = false; + try { + seek(start); + this.end = this.in.getPos() + length; + // if it wrapped around, use the max + if (end < length) { + end = Long.MAX_VALUE; + } + init(tempReader); + succeeded = true; + } finally { + if (!succeeded) { + IOUtils.cleanup(LOG, this.in); + } + } + } + + /** + * Override this method to specialize the type of + * {@link FSDataInputStream} returned. + * @param fs The file system used to open the file. + * @param file The file being read. + * @param bufferSize The buffer size used to read the file. + * @param length The length being read if it is >= 0. Otherwise, + * the length is not available. + * @return The opened stream. + * @throws IOException + */ + protected FSDataInputStream openFile(FileSystem fs, Path file, + int bufferSize, long length) throws IOException { + return fs.open(file, bufferSize); + } + + /** + * Initialize the {@link Reader} + * @param tempReader true if we are constructing a temporary + * reader {@link SequenceFile.Sorter#cloneFileAttributes}, + * and hence do not initialize every component; + * false otherwise. + * @throws IOException + */ + private void init(boolean tempReader) throws IOException { + byte[] versionBlock = new byte[VERSION.length]; + in.readFully(versionBlock); + + if ((versionBlock[0] != VERSION[0]) || + (versionBlock[1] != VERSION[1]) || + (versionBlock[2] != VERSION[2])) + throw new IOException(this + " not a SequenceFile"); + + // Set 'version' + version = versionBlock[3]; + if (version > VERSION[3]) + throw new VersionMismatchException(VERSION[3], version); + + if (version < BLOCK_COMPRESS_VERSION) { + UTF8 className = new UTF8(); + + className.readFields(in); + keyClassName = className.toString(); // key class name + + className.readFields(in); + valClassName = className.toString(); // val class name + } else { + keyClassName = Text.readString(in); + valClassName = Text.readString(in); + } + + if (version > 2) { // if version > 2 + this.decompress = in.readBoolean(); // is compressed? + } else { + decompress = false; + } + + if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 + this.blockCompressed = in.readBoolean(); // is block-compressed? 
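+        // (Editorial note) Header layout by version: v3+ stores the
+        // 'is compressed?' flag read above, v4+ (BLOCK_COMPRESS_VERSION)
+        // stores this block-compression flag, v5+ names a custom codec
+        // class, and v6+ appends file metadata; see the reads below.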
+ } else { + blockCompressed = false; + } + + // if version >= 5 + // setup the compression codec + if (decompress) { + if (version >= CUSTOM_COMPRESS_VERSION) { + String codecClassname = Text.readString(in); + try { + Class codecClass + = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); + this.codec = ReflectionUtils.newInstance(codecClass, conf); + } catch (ClassNotFoundException cnfe) { + throw new IllegalArgumentException("Unknown codec: " + + codecClassname, cnfe); + } + } else { + codec = new DefaultCodec(); + ((Configurable)codec).setConf(conf); + } + } + + this.metadata = new Metadata(); + if (version >= VERSION_WITH_METADATA) { // if version >= 6 + this.metadata.readFields(in); + } + + if (version > 1) { // if version > 1 + in.readFully(sync); // read sync bytes + headerEnd = in.getPos(); // record end of header + } + + // Initialize... *not* if this we are constructing a temporary Reader + if (!tempReader) { + valBuffer = new DataInputBuffer(); + if (decompress) { + valDecompressor = CodecPool.getDecompressor(codec); + valInFilter = codec.createInputStream(valBuffer, valDecompressor); + valIn = new DataInputStream(valInFilter); + } else { + valIn = valBuffer; + } + + if (blockCompressed) { + keyLenBuffer = new DataInputBuffer(); + keyBuffer = new DataInputBuffer(); + valLenBuffer = new DataInputBuffer(); + + keyLenDecompressor = CodecPool.getDecompressor(codec); + keyLenInFilter = codec.createInputStream(keyLenBuffer, + keyLenDecompressor); + keyLenIn = new DataInputStream(keyLenInFilter); + + keyDecompressor = CodecPool.getDecompressor(codec); + keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); + keyIn = new DataInputStream(keyInFilter); + + valLenDecompressor = CodecPool.getDecompressor(codec); + valLenInFilter = codec.createInputStream(valLenBuffer, + valLenDecompressor); + valLenIn = new DataInputStream(valLenInFilter); + } + + SerializationFactory serializationFactory = + new SerializationFactory(conf); + this.keyDeserializer = + getDeserializer(serializationFactory, getKeyClass()); + if (this.keyDeserializer == null) { + throw new IOException( + "Could not find a deserializer for the Key class: '" + + getKeyClass().getCanonicalName() + "'. " + + "Please ensure that the configuration '" + + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + + "properly configured, if you're using " + + "custom serialization."); + } + if (!blockCompressed) { + this.keyDeserializer.open(valBuffer); + } else { + this.keyDeserializer.open(keyIn); + } + this.valDeserializer = + getDeserializer(serializationFactory, getValueClass()); + if (this.valDeserializer == null) { + throw new IOException( + "Could not find a deserializer for the Value class: '" + + getValueClass().getCanonicalName() + "'. " + + "Please ensure that the configuration '" + + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + + "properly configured, if you're using " + + "custom serialization."); + } + this.valDeserializer.open(valIn); + } + } + + @SuppressWarnings("unchecked") + private Deserializer getDeserializer(SerializationFactory sf, Class c) { + return sf.getDeserializer(c); + } + + /** Close the file. 
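+     * (Editorial note: this returns the pooled decompressors to the
+     * {@code CodecPool}, closes any open key/value deserializers, and then
+     * closes the underlying input stream.)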
*/ + @Override + public synchronized void close() throws IOException { + // Return the decompressors to the pool + CodecPool.returnDecompressor(keyLenDecompressor); + CodecPool.returnDecompressor(keyDecompressor); + CodecPool.returnDecompressor(valLenDecompressor); + CodecPool.returnDecompressor(valDecompressor); + keyLenDecompressor = keyDecompressor = null; + valLenDecompressor = valDecompressor = null; + + if (keyDeserializer != null) { + keyDeserializer.close(); + } + if (valDeserializer != null) { + valDeserializer.close(); + } + + // Close the input-stream + in.close(); + } + + /** Returns the name of the key class. */ + public String getKeyClassName() { + return keyClassName; + } + + /** Returns the class of keys in this file. */ + public synchronized Class getKeyClass() { + if (null == keyClass) { + try { + keyClass = WritableName.getClass(getKeyClassName(), conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return keyClass; + } + + /** Returns the name of the value class. */ + public String getValueClassName() { + return valClassName; + } + + /** Returns the class of values in this file. */ + public synchronized Class getValueClass() { + if (null == valClass) { + try { + valClass = WritableName.getClass(getValueClassName(), conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return valClass; + } + + /** Returns true if values are compressed. */ + public boolean isCompressed() { return decompress; } + + /** Returns true if records are block-compressed. */ + public boolean isBlockCompressed() { return blockCompressed; } + + /** Returns the compression codec of data in this file. */ + public CompressionCodec getCompressionCodec() { return codec; } + + /** + * Get the compression type for this file. + * @return the compression type + */ + public CompressionType getCompressionType() { + if (decompress) { + return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; + } else { + return CompressionType.NONE; + } + } + + /** Returns the metadata object of the file */ + public Metadata getMetadata() { + return this.metadata; + } + + /** Returns the configuration used for this file. 
*/ + Configuration getConf() { return conf; } + + /** Read a compressed buffer */ + private synchronized void readBuffer(DataInputBuffer buffer, + CompressionInputStream filter) throws IOException { + // Read data into a temporary buffer + DataOutputBuffer dataBuffer = new DataOutputBuffer(); + + try { + int dataBufferLength = WritableUtils.readVInt(in); + dataBuffer.write(in, dataBufferLength); + + // Set up 'buffer' connected to the input-stream + buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); + } finally { + dataBuffer.close(); + } + + // Reset the codec + filter.resetState(); + } + + /** Read the next 'compressed' block */ + private synchronized void readBlock() throws IOException { + // Check if we need to throw away a whole block of + // 'values' due to 'lazy decompression' + if (lazyDecompress && !valuesDecompressed) { + in.seek(WritableUtils.readVInt(in)+in.getPos()); + in.seek(WritableUtils.readVInt(in)+in.getPos()); + } + + // Reset internal states + noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; + valuesDecompressed = false; + + //Process sync + if (sync != null) { + in.readInt(); + in.readFully(syncCheck); // read syncCheck + if (!Arrays.equals(sync, syncCheck)) // check it + throw new IOException("File is corrupt!"); + } + syncSeen = true; + + // Read number of records in this block + noBufferedRecords = WritableUtils.readVInt(in); + + // Read key lengths and keys + readBuffer(keyLenBuffer, keyLenInFilter); + readBuffer(keyBuffer, keyInFilter); + noBufferedKeys = noBufferedRecords; + + // Read value lengths and values + if (!lazyDecompress) { + readBuffer(valLenBuffer, valLenInFilter); + readBuffer(valBuffer, valInFilter); + noBufferedValues = noBufferedRecords; + valuesDecompressed = true; + } + } + + /** + * Position valLenIn/valIn to the 'value' + * corresponding to the 'current' key + */ + private synchronized void seekToCurrentValue() throws IOException { + if (!blockCompressed) { + if (decompress) { + valInFilter.resetState(); + } + valBuffer.reset(); + } else { + // Check if this is the first value in the 'block' to be read + if (lazyDecompress && !valuesDecompressed) { + // Read the value lengths and values + readBuffer(valLenBuffer, valLenInFilter); + readBuffer(valBuffer, valInFilter); + noBufferedValues = noBufferedRecords; + valuesDecompressed = true; + } + + // Calculate the no. of bytes to skip + // Note: 'current' key has already been read! + int skipValBytes = 0; + int currentKey = noBufferedKeys + 1; + for (int i=noBufferedValues; i > currentKey; --i) { + skipValBytes += WritableUtils.readVInt(valLenIn); + --noBufferedValues; + } + + // Skip to the 'val' corresponding to 'current' key + if (skipValBytes > 0) { + if (valIn.skipBytes(skipValBytes) != skipValBytes) { + throw new IOException("Failed to seek to " + currentKey + + "(th) value!"); + } + } + } + } + + /** + * Get the 'value' corresponding to the last read 'key'. + * @param val : The 'value' to be read. 
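+     *              (editorial note: read in place via {@code val.readFields};
+     *              if {@code val} is {@code Configurable}, this reader's
+     *              configuration is applied to it first)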
+ * @throws IOException + */ + public synchronized void getCurrentValue(Writable val) + throws IOException { + if (val instanceof Configurable) { + ((Configurable) val).setConf(this.conf); + } + + // Position stream to 'current' value + seekToCurrentValue(); + + if (!blockCompressed) { + val.readFields(valIn); + + if (valIn.read() > 0) { + LOG.info("available bytes: " + valIn.available()); + throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) + + " bytes, should read " + + (valBuffer.getLength()-keyLength)); + } + } else { + // Get the value + int valLength = WritableUtils.readVInt(valLenIn); + val.readFields(valIn); + + // Read another compressed 'value' + --noBufferedValues; + + // Sanity check + if ((valLength < 0) && LOG.isDebugEnabled()) { + LOG.debug(val + " is a zero-length value"); + } + } + + } + + /** + * Get the 'value' corresponding to the last read 'key'. + * @param val : The 'value' to be read. + * @throws IOException + */ + public synchronized Object getCurrentValue(Object val) + throws IOException { + if (val instanceof Configurable) { + ((Configurable) val).setConf(this.conf); + } + + // Position stream to 'current' value + seekToCurrentValue(); + + if (!blockCompressed) { + val = deserializeValue(val); + + if (valIn.read() > 0) { + LOG.info("available bytes: " + valIn.available()); + throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) + + " bytes, should read " + + (valBuffer.getLength()-keyLength)); + } + } else { + // Get the value + int valLength = WritableUtils.readVInt(valLenIn); + val = deserializeValue(val); + + // Read another compressed 'value' + --noBufferedValues; + + // Sanity check + if ((valLength < 0) && LOG.isDebugEnabled()) { + LOG.debug(val + " is a zero-length value"); + } + } + return val; + + } + + @SuppressWarnings("unchecked") + private Object deserializeValue(Object val) throws IOException { + return valDeserializer.deserialize(val); + } + + /** Read the next key in the file into key, skipping its + * value. True if another entry exists, and false at end of file. */ + public synchronized boolean next(Writable key) throws IOException { + if (key.getClass() != getKeyClass()) + throw new IOException("wrong key class: "+key.getClass().getName() + +" is not "+keyClass); + + if (!blockCompressed) { + outBuf.reset(); + + keyLength = next(outBuf); + if (keyLength < 0) + return false; + + valBuffer.reset(outBuf.getData(), outBuf.getLength()); + + key.readFields(valBuffer); + valBuffer.mark(0); + if (valBuffer.getPosition() != keyLength) + throw new IOException(key + " read " + valBuffer.getPosition() + + " bytes, should read " + keyLength); + } else { + //Reset syncSeen + syncSeen = false; + + if (noBufferedKeys == 0) { + try { + readBlock(); + } catch (EOFException eof) { + return false; + } + } + + int keyLength = WritableUtils.readVInt(keyLenIn); + + // Sanity check + if (keyLength < 0) { + return false; + } + + //Read another compressed 'key' + key.readFields(keyIn); + --noBufferedKeys; + } + + return true; + } + + /** Read the next key/value pair in the file into key and + * val. 
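+     * <p>(Editorial sketch of a typical read loop; {@code reader},
+     * {@code key} and {@code value} are assumed to be an open Reader and
+     * Writable instances of this file's key/value classes.)
+     * <pre>
+     *   while (reader.next(key, value)) {
+     *     // process key and value ...
+     *   }
+     * </pre>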
Returns true if such a pair exists and false when at + * end of file */ + public synchronized boolean next(Writable key, Writable val) + throws IOException { + if (val.getClass() != getValueClass()) + throw new IOException("wrong value class: "+val+" is not "+valClass); + + boolean more = next(key); + + if (more) { + getCurrentValue(val); + } + + return more; + } + + /** + * Read and return the next record length, potentially skipping over + * a sync block. + * @return the length of the next record or -1 if there is no next record + * @throws IOException + */ + private synchronized int readRecordLength() throws IOException { + if (in.getPos() >= end) { + return -1; + } + int length = in.readInt(); + if (version > 1 && sync != null && + length == SYNC_ESCAPE) { // process a sync entry + in.readFully(syncCheck); // read syncCheck + if (!Arrays.equals(sync, syncCheck)) // check it + throw new IOException("File is corrupt!"); + syncSeen = true; + if (in.getPos() >= end) { + return -1; + } + length = in.readInt(); // re-read length + } else { + syncSeen = false; + } + + return length; + } + + /** Read the next key/value pair in the file into buffer. + * Returns the length of the key read, or -1 if at end of file. The length + * of the value may be computed by calling buffer.getLength() before and + * after calls to this method. */ + /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ + @Deprecated + synchronized int next(DataOutputBuffer buffer) throws IOException { + // Unsupported for block-compressed sequence files + if (blockCompressed) { + throw new IOException("Unsupported call for block-compressed" + + " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)"); + } + try { + int length = readRecordLength(); + if (length == -1) { + return -1; + } + int keyLength = in.readInt(); + buffer.write(in, length); + return keyLength; + } catch (ChecksumException e) { // checksum failure + handleChecksumException(e); + return next(buffer); + } + } + + public ValueBytes createValueBytes() { + ValueBytes val = null; + if (!decompress || blockCompressed) { + val = new UncompressedBytes(); + } else { + val = new CompressedBytes(codec); + } + return val; + } + + /** + * Read 'raw' records. 
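+     * <p>(Editorial sketch of a raw record scan; {@code reader} is assumed
+     * to be an open Reader on this file.)
+     * <pre>
+     *   DataOutputBuffer rawKey = new DataOutputBuffer();
+     *   SequenceFile.ValueBytes rawVal = reader.createValueBytes();
+     *   while (reader.nextRaw(rawKey, rawVal) != -1) {
+     *     // consume rawKey and rawVal ...
+     *     rawKey.reset();
+     *   }
+     * </pre>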
+ * @param key - The buffer into which the key is read + * @param val - The 'raw' value + * @return Returns the total record length or -1 for end of file + * @throws IOException + */ + public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) + throws IOException { + if (!blockCompressed) { + int length = readRecordLength(); + if (length == -1) { + return -1; + } + int keyLength = in.readInt(); + int valLength = length - keyLength; + key.write(in, keyLength); + if (decompress) { + CompressedBytes value = (CompressedBytes)val; + value.reset(in, valLength); + } else { + UncompressedBytes value = (UncompressedBytes)val; + value.reset(in, valLength); + } + + return length; + } else { + //Reset syncSeen + syncSeen = false; + + // Read 'key' + if (noBufferedKeys == 0) { + if (in.getPos() >= end) + return -1; + + try { + readBlock(); + } catch (EOFException eof) { + return -1; + } + } + int keyLength = WritableUtils.readVInt(keyLenIn); + if (keyLength < 0) { + throw new IOException("zero length key found!"); + } + key.write(keyIn, keyLength); + --noBufferedKeys; + + // Read raw 'value' + seekToCurrentValue(); + int valLength = WritableUtils.readVInt(valLenIn); + UncompressedBytes rawValue = (UncompressedBytes)val; + rawValue.reset(valIn, valLength); + --noBufferedValues; + + return (keyLength+valLength); + } + + } + + /** + * Read 'raw' keys. + * @param ke