From: sershe@apache.org
To: commits@hive.apache.org
Date: Thu, 08 Dec 2016 19:07:37 -0000
Subject: [1/2] hive git commit: HIVE-14453 : refactor physical writing of ORC data and metadata to FS from the logical writers (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 84b7fc5bd -> 777477f25

HIVE-14453 : refactor physical writing of ORC data and metadata to FS from the logical writers (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/65d8fae0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/65d8fae0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/65d8fae0

Branch: refs/heads/master
Commit: 65d8fae07034e640f13e5b5c88db57204d4e0787
Parents: 84b7fc5
Author: Sergey Shelukhin
Authored: Thu Dec 8 11:04:42 2016 -0800
Committer: Sergey Shelukhin
Committed: Thu Dec 8 11:04:42 2016 -0800

----------------------------------------------------------------------
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   3 +-
 orc/src/java/org/apache/orc/impl/OrcTail.java   |   2 +-
 .../org/apache/orc/impl/PhysicalFsWriter.java   | 529 ++++++++++++++++++
 .../org/apache/orc/impl/PhysicalWriter.java     | 123 +++++
 .../java/org/apache/orc/impl/ReaderImpl.java    |   6 +-
 .../org/apache/orc/impl/RecordReaderUtils.java  |   2 +-
 .../java/org/apache/orc/impl/WriterImpl.java    | 535 ++-----------------
 .../org/apache/orc/impl/TestOrcWideTable.java   |  12 +-
 .../hadoop/hive/ql/io/orc/WriterImpl.java       |   8 +
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  25 +-
 10 files changed, 743 insertions(+), 502 deletions(-)
----------------------------------------------------------------------
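
For orientation, a minimal sketch (not from this patch; the local path, the OrcFile.writerOptions(conf) call and the 3-byte payload are assumptions) of the lifecycle the logical writer now drives against the PhysicalWriter interface introduced below. It illustrates call order only; it does not, by itself, produce a valid ORC file.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.OrcProto;
    import org.apache.orc.impl.OutStream;
    import org.apache.orc.impl.PhysicalFsWriter;
    import org.apache.orc.impl.PhysicalWriter;
    import org.apache.orc.impl.StreamName;

    public class PhysicalWriterLifecycleSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/physical-writer-sketch.orc"); // hypothetical path
        FileSystem fs = path.getFileSystem(conf);
        OrcFile.WriterOptions opts = OrcFile.writerOptions(conf); // assumed default options
        int numColumns = 1;

        // The physical writer now owns the FSDataOutputStream, the compression codec,
        // HDFS block padding, and serialization of stripe footers and the file tail.
        PhysicalWriter physical = new PhysicalFsWriter(fs, path, numColumns, opts);
        physical.initialize(); // opens the file and writes the ORC magic

        // The logical writer requests per-column streams by StreamName and writes
        // encoded column data into them; buffering and compression happen underneath.
        OutStream data = physical.getOrCreatePhysicalStream(
            new StreamName(0, OrcProto.Stream.Kind.DATA));
        data.write(new byte[] {1, 2, 3}); // stand-in for real column encoding
        data.flush();

        // Closing a stripe: the physical writer pads to the block boundary, spills the
        // buffered streams, and appends the stripe footer, filling in the offset and
        // index/data/footer lengths of the directory entry.
        OrcProto.StripeFooter.Builder stripeFooter = OrcProto.StripeFooter.newBuilder();
        OrcProto.StripeInformation.Builder dirEntry =
            OrcProto.StripeInformation.newBuilder().setNumberOfRows(0);
        physical.finalizeStripe(stripeFooter, dirEntry);

        // File tail: metadata, footer, postscript (the physical writer fills in the
        // compression kind and the metadata/footer lengths), then close the raw stream.
        physical.writeFileMetadata(OrcProto.Metadata.newBuilder());
        physical.writeFileFooter(OrcProto.Footer.newBuilder());
        physical.writePostScript(OrcProto.PostScript.newBuilder()
            .setMagic(OrcFile.MAGIC)
            .addVersion(0).addVersion(12) // ORC file version 0.12
            .setWriterVersion(OrcFile.CURRENT_WRITER.getId()));
        physical.close();
      }
    }

Keeping getPhysicalStripeSize(), estimateMemory() and getRawWriterPosition() on the interface is, presumably, what lets WriterImpl keep its MemoryManager callback and writeIntermediateFooter() behaviour unchanged while no longer touching the output stream directly.

http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java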
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 2cb7f79..29f1ba8 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; import org.apache.orc.OrcUtils; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.PhysicalFsWriter; import org.apache.orc.impl.TreeReaderFactory; import org.apache.orc.impl.TreeReaderFactory.StructTreeReader; import org.apache.orc.impl.TreeReaderFactory.TreeReader; @@ -85,7 +86,7 @@ public class OrcEncodedDataConsumer assert fileMetadata == null; fileMetadata = f; stripes = new OrcStripeMetadata[f.getStripes().size()]; - codec = WriterImpl.createCodec(fileMetadata.getCompressionKind()); + codec = PhysicalFsWriter.createCodec(fileMetadata.getCompressionKind()); } public void setStripeMetadata(OrcStripeMetadata m) { http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/OrcTail.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/OrcTail.java b/orc/src/java/org/apache/orc/impl/OrcTail.java index b5f85fb..f095603 100644 --- a/orc/src/java/org/apache/orc/impl/OrcTail.java +++ b/orc/src/java/org/apache/orc/impl/OrcTail.java @@ -87,7 +87,7 @@ public final class OrcTail { } public CompressionCodec getCompressionCodec() { - return WriterImpl.createCodec(getCompressionKind()); + return PhysicalFsWriter.createCodec(getCompressionKind()); } public int getCompressionBufferSize() { http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java new file mode 100644 index 0000000..ba8c13f --- /dev/null +++ b/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -0,0 +1,529 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.impl; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.orc.CompressionCodec; +import org.apache.orc.CompressionCodec.Modifier; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.OrcFile.CompressionStrategy; +import org.apache.orc.OrcProto; +import org.apache.orc.OrcProto.BloomFilterIndex; +import org.apache.orc.OrcProto.Footer; +import org.apache.orc.OrcProto.Metadata; +import org.apache.orc.OrcProto.PostScript; +import org.apache.orc.OrcProto.Stream.Kind; +import org.apache.orc.OrcProto.StripeFooter; +import org.apache.orc.OrcProto.StripeInformation; +import org.apache.orc.OrcProto.RowIndex.Builder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.CodedOutputStream; + +public class PhysicalFsWriter implements PhysicalWriter { + private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class); + + private static final int HDFS_BUFFER_SIZE = 256 * 1024; + + private FSDataOutputStream rawWriter = null; + // the compressed metadata information outStream + private OutStream writer = null; + // a protobuf outStream around streamFactory + private CodedOutputStream protobufWriter = null; + + private final FileSystem fs; + private final Path path; + private final long blockSize; + private final int bufferSize; + private final CompressionCodec codec; + private final double paddingTolerance; + private final long defaultStripeSize; + private final CompressionKind compress; + private final boolean addBlockPadding; + private final CompressionStrategy compressionStrategy; + + // the streams that make up the current stripe + private final Map streams = + new TreeMap(); + + private long adjustedStripeSize; + private long headerLength; + private long stripeStart; + private int metadataLength; + private int footerLength; + + public PhysicalFsWriter(FileSystem fs, Path path, int numColumns, OrcFile.WriterOptions opts) { + this.fs = fs; + this.path = path; + this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize(); + this.addBlockPadding = opts.getBlockPadding(); + if (opts.isEnforceBufferSize()) { + this.bufferSize = opts.getBufferSize(); + } else { + this.bufferSize = getEstimatedBufferSize(defaultStripeSize, numColumns, opts.getBufferSize()); + } + this.compress = opts.getCompress(); + this.compressionStrategy = opts.getCompressionStrategy(); + codec = createCodec(compress); + this.paddingTolerance = opts.getPaddingTolerance(); + this.blockSize = opts.getBlockSize(); + LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" + + " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize, + compress, bufferSize); + } + + @Override + public void initialize() throws IOException { + if (rawWriter != null) return; + rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE, + fs.getDefaultReplication(path), blockSize); + rawWriter.writeBytes(OrcFile.MAGIC); + headerLength = rawWriter.getPos(); + writer = new OutStream("metadata", bufferSize, codec, + new DirectStream(rawWriter)); + protobufWriter = CodedOutputStream.newInstance(writer); + } + + private void padStripe(long 
indexSize, long dataSize, int footerSize) throws IOException { + this.stripeStart = rawWriter.getPos(); + final long currentStripeSize = indexSize + dataSize + footerSize; + final long available = blockSize - (stripeStart % blockSize); + final long overflow = currentStripeSize - adjustedStripeSize; + final float availRatio = (float) available / (float) defaultStripeSize; + + if (availRatio > 0.0f && availRatio < 1.0f + && availRatio > paddingTolerance) { + // adjust default stripe size to fit into remaining space, also adjust + // the next stripe for correction based on the current stripe size + // and user specified padding tolerance. Since stripe size can overflow + // the default stripe size we should apply this correction to avoid + // writing portion of last stripe to next hdfs block. + double correction = overflow > 0 ? (double) overflow + / (double) adjustedStripeSize : 0.0; + + // correction should not be greater than user specified padding + // tolerance + correction = correction > paddingTolerance ? paddingTolerance + : correction; + + // adjust next stripe size based on current stripe estimate correction + adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize)); + } else if (availRatio >= 1.0) { + adjustedStripeSize = defaultStripeSize; + } + + if (availRatio < paddingTolerance && addBlockPadding) { + long padding = blockSize - (stripeStart % blockSize); + byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)]; + LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)", + padding, availRatio, defaultStripeSize)); + stripeStart += padding; + while (padding > 0) { + int writeLen = (int) Math.min(padding, pad.length); + rawWriter.write(pad, 0, writeLen); + padding -= writeLen; + } + adjustedStripeSize = defaultStripeSize; + } else if (currentStripeSize < blockSize + && (stripeStart % blockSize) + currentStripeSize > blockSize) { + // even if you don't pad, reset the default stripe size when crossing a + // block boundary + adjustedStripeSize = defaultStripeSize; + } + } + + /** + * An output receiver that writes the ByteBuffers to the output stream + * as they are received. 
+ */ + private class DirectStream implements OutStream.OutputReceiver { + private final FSDataOutputStream output; + + DirectStream(FSDataOutputStream output) { + this.output = output; + } + + @Override + public void output(ByteBuffer buffer) throws IOException { + output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } + } + + @Override + public long getPhysicalStripeSize() { + return adjustedStripeSize; + } + + @Override + public boolean isCompressed() { + return codec != null; + } + + + public static CompressionCodec createCodec(CompressionKind kind) { + switch (kind) { + case NONE: + return null; + case ZLIB: + return new ZlibCodec(); + case SNAPPY: + return new SnappyCodec(); + case LZO: + try { + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + if (loader == null) { + loader = WriterImpl.class.getClassLoader(); + } + @SuppressWarnings("unchecked") + Class lzo = + (Class) + loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec"); + return lzo.newInstance(); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("LZO is not available.", e); + } catch (InstantiationException e) { + throw new IllegalArgumentException("Problem initializing LZO", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Insufficient access to LZO", e); + } + default: + throw new IllegalArgumentException("Unknown compression codec: " + + kind); + } + } + + private void writeStripeFooter(StripeFooter footer, long dataSize, long indexSize, + StripeInformation.Builder dirEntry) throws IOException { + footer.writeTo(protobufWriter); + protobufWriter.flush(); + writer.flush(); + dirEntry.setOffset(stripeStart); + dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - dataSize - indexSize); + } + + @VisibleForTesting + public static int getEstimatedBufferSize(long stripeSize, int numColumns, + int bs) { + // The worst case is that there are 2 big streams per a column and + // we want to guarantee that each stream gets ~10 buffers. + // This keeps buffers small enough that we don't get really small stripe + // sizes. + int estBufferSize = (int) (stripeSize / (20 * numColumns)); + estBufferSize = getClosestBufferSize(estBufferSize); + return estBufferSize > bs ? 
bs : estBufferSize; + } + + private static int getClosestBufferSize(int estBufferSize) { + final int kb4 = 4 * 1024; + final int kb8 = 8 * 1024; + final int kb16 = 16 * 1024; + final int kb32 = 32 * 1024; + final int kb64 = 64 * 1024; + final int kb128 = 128 * 1024; + final int kb256 = 256 * 1024; + if (estBufferSize <= kb4) { + return kb4; + } else if (estBufferSize > kb4 && estBufferSize <= kb8) { + return kb8; + } else if (estBufferSize > kb8 && estBufferSize <= kb16) { + return kb16; + } else if (estBufferSize > kb16 && estBufferSize <= kb32) { + return kb32; + } else if (estBufferSize > kb32 && estBufferSize <= kb64) { + return kb64; + } else if (estBufferSize > kb64 && estBufferSize <= kb128) { + return kb128; + } else { + return kb256; + } + } + + @Override + public void writeFileMetadata(Metadata.Builder builder) throws IOException { + long startPosn = rawWriter.getPos(); + Metadata metadata = builder.build(); + metadata.writeTo(protobufWriter); + protobufWriter.flush(); + writer.flush(); + this.metadataLength = (int) (rawWriter.getPos() - startPosn); + } + + @Override + public void writeFileFooter(Footer.Builder builder) throws IOException { + long bodyLength = rawWriter.getPos() - metadataLength; + builder.setContentLength(bodyLength); + builder.setHeaderLength(headerLength); + long startPosn = rawWriter.getPos(); + Footer footer = builder.build(); + footer.writeTo(protobufWriter); + protobufWriter.flush(); + writer.flush(); + this.footerLength = (int) (rawWriter.getPos() - startPosn); + } + + @Override + public void writePostScript(PostScript.Builder builder) throws IOException { + builder.setCompression(writeCompressionKind(compress)); + builder.setFooterLength(footerLength); + builder.setMetadataLength(metadataLength); + if (compress != CompressionKind.NONE) { + builder.setCompressionBlockSize(bufferSize); + } + PostScript ps = builder.build(); + // need to write this uncompressed + long startPosn = rawWriter.getPos(); + ps.writeTo(rawWriter); + long length = rawWriter.getPos() - startPosn; + if (length > 255) { + throw new IllegalArgumentException("PostScript too large at " + length); + } + rawWriter.writeByte((int)length); + } + + @Override + public void close() throws IOException { + rawWriter.close(); + } + + private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) { + switch (kind) { + case NONE: return OrcProto.CompressionKind.NONE; + case ZLIB: return OrcProto.CompressionKind.ZLIB; + case SNAPPY: return OrcProto.CompressionKind.SNAPPY; + case LZO: return OrcProto.CompressionKind.LZO; + default: + throw new IllegalArgumentException("Unknown compression " + kind); + } + } + + @Override + public void flush() throws IOException { + rawWriter.hflush(); + // TODO: reset? 
+ } + + @Override + public long getRawWriterPosition() throws IOException { + return rawWriter.getPos(); + } + + @Override + public void appendRawStripe(byte[] stripe, int offset, int length, + StripeInformation.Builder dirEntry) throws IOException { + long start = rawWriter.getPos(); + long availBlockSpace = blockSize - (start % blockSize); + + // see if stripe can fit in the current hdfs block, else pad the remaining + // space in the block + if (length < blockSize && length > availBlockSpace && + addBlockPadding) { + byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; + LOG.info(String.format("Padding ORC by %d bytes while merging..", + availBlockSpace)); + start += availBlockSpace; + while (availBlockSpace > 0) { + int writeLen = (int) Math.min(availBlockSpace, pad.length); + rawWriter.write(pad, 0, writeLen); + availBlockSpace -= writeLen; + } + } + + rawWriter.write(stripe); + dirEntry.setOffset(start); + } + + + /** + * This class is used to hold the contents of streams as they are buffered. + * The TreeWriters write to the outStream and the codec compresses the + * data as buffers fill up and stores them in the output list. When the + * stripe is being written, the whole stream is written to the file. + */ + private class BufferedStream implements OutStream.OutputReceiver { + private final OutStream outStream; + private final List output = new ArrayList(); + + BufferedStream(String name, int bufferSize, + CompressionCodec codec) throws IOException { + outStream = new OutStream(name, bufferSize, codec, this); + } + + /** + * Receive a buffer from the compression codec. + * @param buffer the buffer to save + */ + @Override + public void output(ByteBuffer buffer) { + output.add(buffer); + } + + /** + * @return the number of bytes in buffers that are allocated to this stream. + */ + public long getBufferSize() { + long result = 0; + for (ByteBuffer buf: output) { + result += buf.capacity(); + } + return outStream.getBufferSize() + result; + } + + /** + * Write any saved buffers to the OutputStream if needed, and clears all the buffers. + */ + public void spillToDiskAndClear() throws IOException { + if (!outStream.isSuppressed()) { + for (ByteBuffer buffer: output) { + rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(), + buffer.remaining()); + } + } + outStream.clear(); + output.clear(); + } + + /** + * @return The number of bytes that will be written to the output. Assumes the stream writing + * into this receiver has already been flushed. + */ + public long getOutputSize() { + long result = 0; + for (ByteBuffer buffer: output) { + result += buffer.remaining(); + } + return result; + } + + @Override + public String toString() { + return outStream.toString(); + } + } + + @Override + public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException { + BufferedStream result = streams.get(name); + if (result == null) { + EnumSet modifiers = createCompressionModifiers(name.getKind()); + result = new BufferedStream(name.toString(), bufferSize, + codec == null ? null : codec.modify(modifiers)); + streams.put(name, result); + } + return result.outStream; + } + + private EnumSet createCompressionModifiers(Kind kind) { + switch (kind) { + case BLOOM_FILTER: + case DATA: + case DICTIONARY_DATA: + return EnumSet.of(Modifier.TEXT, + compressionStrategy == CompressionStrategy.SPEED ? 
Modifier.FAST : Modifier.DEFAULT); + case LENGTH: + case DICTIONARY_COUNT: + case PRESENT: + case ROW_INDEX: + case SECONDARY: + // easily compressed using the fastest modes + return EnumSet.of(CompressionCodec.Modifier.FASTEST, CompressionCodec.Modifier.BINARY); + default: + LOG.warn("Missing ORC compression modifiers for " + kind); + return null; + } + } + + @Override + public void finalizeStripe(StripeFooter.Builder footerBuilder, + StripeInformation.Builder dirEntry) throws IOException { + long indexSize = 0; + long dataSize = 0; + for (Map.Entry pair: streams.entrySet()) { + BufferedStream receiver = pair.getValue(); + OutStream outStream = receiver.outStream; + if (!outStream.isSuppressed()) { + outStream.flush(); + long streamSize = receiver.getOutputSize(); + StreamName name = pair.getKey(); + footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn()) + .setKind(name.getKind()).setLength(streamSize)); + if (StreamName.Area.INDEX == name.getArea()) { + indexSize += streamSize; + } else { + dataSize += streamSize; + } + } + } + dirEntry.setIndexLength(indexSize).setDataLength(dataSize); + + OrcProto.StripeFooter footer = footerBuilder.build(); + // Do we need to pad the file so the stripe doesn't straddle a block boundary? + padStripe(indexSize, dataSize, footer.getSerializedSize()); + + // write out the data streams + for (Map.Entry pair : streams.entrySet()) { + pair.getValue().spillToDiskAndClear(); + } + // Write out the footer. + writeStripeFooter(footer, dataSize, indexSize, dirEntry); + } + + @Override + public long estimateMemory() { + long result = 0; + for (BufferedStream stream: streams.values()) { + result += stream.getBufferSize(); + } + return result; + } + + @Override + public void writeIndexStream(StreamName name, Builder rowIndex) throws IOException { + OutStream stream = getOrCreatePhysicalStream(name); + rowIndex.build().writeTo(stream); + stream.flush(); + } + + @Override + public void writeBloomFilterStream( + StreamName name, BloomFilterIndex.Builder bloomFilterIndex) throws IOException { + OutStream stream = getOrCreatePhysicalStream(name); + bloomFilterIndex.build().writeTo(stream); + stream.flush(); + } + + @VisibleForTesting + public OutputStream getStream() throws IOException { + initialize(); + return rawWriter; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/PhysicalWriter.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java new file mode 100644 index 0000000..83742e4 --- /dev/null +++ b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import java.io.IOException; +import java.util.EnumSet; + +import org.apache.orc.OrcProto.BloomFilterIndex; +import org.apache.orc.OrcProto.Footer; +import org.apache.orc.OrcProto.Metadata; +import org.apache.orc.OrcProto.PostScript; +import org.apache.orc.OrcProto.RowIndex; +import org.apache.orc.OrcProto.StripeFooter; +import org.apache.orc.OrcProto.StripeInformation; + +public interface PhysicalWriter { + + /** + * Creates all the streams/connections/etc. necessary to write. + */ + void initialize() throws IOException; + + /** + * Writes out the file metadata. + * @param builder Metadata builder to finalize and write. + */ + void writeFileMetadata(Metadata.Builder builder) throws IOException; + + /** + * Writes out the file footer. + * @param builder Footer builder to finalize and write. + */ + void writeFileFooter(Footer.Builder builder) throws IOException; + + /** + * Writes out the postscript (including the size byte if needed). + * @param builder Postscript builder to finalize and write. + */ + void writePostScript(PostScript.Builder builder) throws IOException; + + /** + * Creates physical stream to write data to. + * @param name Stream name. + * @return The output stream. + */ + OutStream getOrCreatePhysicalStream(StreamName name) throws IOException; + + /** + * Flushes the data in all the streams, spills them to disk, write out stripe footer. + * @param footer Stripe footer to be updated with relevant data and written out. + * @param dirEntry File metadata entry for the stripe, to be updated with relevant data. + */ + void finalizeStripe(StripeFooter.Builder footer, + StripeInformation.Builder dirEntry) throws IOException; + + /** + * Writes out the index for the stripe column. + * @param streamName Stream name. + * @param rowIndex Row index entries to write. + */ + void writeIndexStream(StreamName name, RowIndex.Builder rowIndex) throws IOException; + + /** + * Writes out the index for the stripe column. + * @param streamName Stream name. + * @param bloomFilterIndex Bloom filter index to write. + */ + void writeBloomFilterStream(StreamName streamName, + BloomFilterIndex.Builder bloomFilterIndex) throws IOException; + + /** + * Closes the writer. + */ + void close() throws IOException; + + /** + * Force-flushes the writer. + */ + void flush() throws IOException; + + /** + * @return the physical writer position (e.g. for updater). + */ + long getRawWriterPosition() throws IOException; + + /** @return physical stripe size, taking padding into account. */ + long getPhysicalStripeSize(); + + /** @return whether the writer is compressed. */ + boolean isCompressed(); + + /** + * Appends raw stripe data (e.g. for file merger). + * @param stripe Stripe data buffer. + * @param offset Stripe data buffer offset. + * @param length Stripe data buffer length. + * @param dirEntry File metadata entry for the stripe, to be updated with relevant data. + * @throws IOException + */ + void appendRawStripe(byte[] stripe, int offset, int length, + StripeInformation.Builder dirEntry) throws IOException; + + /** + * @return the estimated memory usage for the stripe. 
+ */ + long estimateMemory(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/ReaderImpl.java b/orc/src/java/org/apache/orc/impl/ReaderImpl.java index 93fc0ce..70fa628 100644 --- a/orc/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/orc/src/java/org/apache/orc/impl/ReaderImpl.java @@ -350,7 +350,7 @@ public class ReaderImpl implements Reader { if (fileMetadata != null) { this.compressionKind = fileMetadata.getCompressionKind(); this.bufferSize = fileMetadata.getCompressionBufferSize(); - this.codec = WriterImpl.createCodec(compressionKind); + this.codec = PhysicalFsWriter.createCodec(compressionKind); this.metadataSize = fileMetadata.getMetadataSize(); this.stripeStats = fileMetadata.getStripeStats(); this.versionList = fileMetadata.getVersionList(); @@ -459,7 +459,7 @@ public class ReaderImpl implements Reader { System.arraycopy(buffer.array(), psOffset, psBuffer, 0, psLen); OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer); int footerSize = (int) ps.getFooterLength(); - CompressionCodec codec = WriterImpl + CompressionCodec codec = PhysicalFsWriter .createCodec(CompressionKind.valueOf(ps.getCompression().name())); OrcProto.Footer footer = extractFooter(buffer, (int) (buffer.position() + ps.getMetadataLength()), @@ -509,7 +509,7 @@ public class ReaderImpl implements Reader { int psOffset = readSize - 1 - psLen; ps = extractPostScript(buffer, path, psLen, psOffset); bufferSize = (int) ps.getCompressionBlockSize(); - codec = WriterImpl.createCodec(CompressionKind.valueOf(ps.getCompression().name())); + codec = PhysicalFsWriter.createCodec(CompressionKind.valueOf(ps.getCompression().name())); fileTailBuilder.setPostscriptLength(psLen).setPostscript(ps); int footerSize = (int) ps.getFooterLength(); http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java b/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java index 1067957..6100d50 100644 --- a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java +++ b/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java @@ -71,7 +71,7 @@ public class RecordReaderUtils { this.fs = properties.getFileSystem(); this.path = properties.getPath(); this.useZeroCopy = properties.getZeroCopy(); - this.codec = WriterImpl.createCodec(properties.getCompression()); + this.codec = PhysicalFsWriter.createCodec(properties.getCompression()); this.bufferSize = properties.getBufferSize(); this.typeCount = properties.getTypeCount(); if (useZeroCopy) { http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/WriterImpl.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java index b2966e0..b17fb41 100644 --- a/orc/src/java/org/apache/orc/impl/WriterImpl.java +++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java @@ -21,12 +21,10 @@ package org.apache.orc.impl; import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; -import java.io.OutputStream; import java.nio.ByteBuffer; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; -import java.util.EnumSet; import 
java.util.List; import java.util.Map; import java.util.TimeZone; @@ -35,11 +33,12 @@ import java.util.TreeMap; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.orc.BinaryColumnStatistics; import org.apache.orc.BloomFilterIO; -import org.apache.orc.CompressionCodec; -import org.apache.orc.CompressionKind; import org.apache.orc.OrcConf; import org.apache.orc.OrcFile; import org.apache.orc.OrcProto; +import org.apache.orc.OrcProto.BloomFilterIndex; +import org.apache.orc.OrcProto.RowIndex; +import org.apache.orc.OrcProto.Stream; import org.apache.orc.OrcUtils; import org.apache.orc.StringColumnStatistics; import org.apache.orc.StripeInformation; @@ -48,7 +47,6 @@ import org.apache.orc.Writer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; @@ -69,7 +67,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; import com.google.protobuf.ByteString; -import com.google.protobuf.CodedOutputStream; /** * An ORC file writer. The file is divided into stripes, which is the natural @@ -94,35 +91,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class); - private static final int HDFS_BUFFER_SIZE = 256 * 1024; private static final int MIN_ROW_INDEX_STRIDE = 1000; - // threshold above which buffer size will be automatically resized - private static final int COLUMN_COUNT_THRESHOLD = 1000; - - private final FileSystem fs; private final Path path; - private final long defaultStripeSize; - private long adjustedStripeSize; private final int rowIndexStride; - private final CompressionKind compress; - private final CompressionCodec codec; - private final boolean addBlockPadding; - private final int bufferSize; - private final long blockSize; - private final double paddingTolerance; private final TypeDescription schema; - // the streams that make up the current stripe - private final Map streams = - new TreeMap(); - - private FSDataOutputStream rawWriter = null; - // the compressed metadata information outStream - private OutStream writer = null; - // a protobuf outStream around streamFactory - private CodedOutputStream protobufWriter = null; - private long headerLength; + @VisibleForTesting + protected final PhysicalWriter physWriter; private int columnCount; private long rowCount = 0; private long rowsInStripe = 0; @@ -142,7 +118,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final OrcFile.WriterCallback callback; private final OrcFile.WriterContext callbackContext; private final OrcFile.EncodingStrategy encodingStrategy; - private final OrcFile.CompressionStrategy compressionStrategy; private final boolean[] bloomFilterColumns; private final double bloomFilterFpp; private boolean writeTimeZone; @@ -150,7 +125,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback { public WriterImpl(FileSystem fs, Path path, OrcFile.WriterOptions opts) throws IOException { - this.fs = fs; this.path = path; this.conf = opts.getConfiguration(); this.callback = opts.getCallback(); @@ -166,26 +140,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } else { callbackContext = null; } - this.adjustedStripeSize = opts.getStripeSize(); 
- this.defaultStripeSize = opts.getStripeSize(); this.version = opts.getVersion(); this.encodingStrategy = opts.getEncodingStrategy(); - this.compressionStrategy = opts.getCompressionStrategy(); - this.addBlockPadding = opts.getBlockPadding(); - this.blockSize = opts.getBlockSize(); - this.paddingTolerance = opts.getPaddingTolerance(); - this.compress = opts.getCompress(); this.rowIndexStride = opts.getRowIndexStride(); this.memoryManager = opts.getMemoryManager(); buildIndex = rowIndexStride > 0; - codec = createCodec(compress); - int numColumns = schema.getMaximumId() + 1; - if (opts.isEnforceBufferSize()) { - this.bufferSize = opts.getBufferSize(); - } else { - this.bufferSize = getEstimatedBufferSize(defaultStripeSize, - numColumns, opts.getBufferSize()); - } if (version == OrcFile.Version.V_0_11) { /* do not write bloom filters for ORC v11 */ this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1]; @@ -194,6 +153,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback { OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema); } this.bloomFilterFpp = opts.getBloomFilterFpp(); + int numColumns = schema.getMaximumId() + 1; + physWriter = new PhysicalFsWriter(fs, path, numColumns, opts); treeWriter = createTreeWriter(schema, streamFactory, false); if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) { throw new IllegalArgumentException("Row stride must be at least " + @@ -202,83 +163,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback { // ensure that we are able to handle callbacks before we register ourselves memoryManager.addWriter(path, opts.getStripeSize(), this); - LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" + - " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize, - compress, bufferSize); - } - - @VisibleForTesting - public static int getEstimatedBufferSize(long stripeSize, int numColumns, - int bs) { - // The worst case is that there are 2 big streams per a column and - // we want to guarantee that each stream gets ~10 buffers. - // This keeps buffers small enough that we don't get really small stripe - // sizes. - int estBufferSize = (int) (stripeSize / (20 * numColumns)); - estBufferSize = getClosestBufferSize(estBufferSize); - return estBufferSize > bs ? 
bs : estBufferSize; - } - - private static int getClosestBufferSize(int estBufferSize) { - final int kb4 = 4 * 1024; - final int kb8 = 8 * 1024; - final int kb16 = 16 * 1024; - final int kb32 = 32 * 1024; - final int kb64 = 64 * 1024; - final int kb128 = 128 * 1024; - final int kb256 = 256 * 1024; - if (estBufferSize <= kb4) { - return kb4; - } else if (estBufferSize > kb4 && estBufferSize <= kb8) { - return kb8; - } else if (estBufferSize > kb8 && estBufferSize <= kb16) { - return kb16; - } else if (estBufferSize > kb16 && estBufferSize <= kb32) { - return kb32; - } else if (estBufferSize > kb32 && estBufferSize <= kb64) { - return kb64; - } else if (estBufferSize > kb64 && estBufferSize <= kb128) { - return kb128; - } else { - return kb256; - } - } - - public static CompressionCodec createCodec(CompressionKind kind) { - switch (kind) { - case NONE: - return null; - case ZLIB: - return new ZlibCodec(); - case SNAPPY: - return new SnappyCodec(); - case LZO: - try { - ClassLoader loader = Thread.currentThread().getContextClassLoader(); - if (loader == null) { - loader = WriterImpl.class.getClassLoader(); - } - @SuppressWarnings("unchecked") - Class lzo = - (Class) - loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec"); - return lzo.newInstance(); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("LZO is not available.", e); - } catch (InstantiationException e) { - throw new IllegalArgumentException("Problem initializing LZO", e); - } catch (IllegalAccessException e) { - throw new IllegalArgumentException("Insufficient access to LZO", e); - } - default: - throw new IllegalArgumentException("Unknown compression codec: " + - kind); - } } @Override public boolean checkMemory(double newScale) throws IOException { - long limit = (long) Math.round(adjustedStripeSize * newScale); + long limit = (long) Math.round(physWriter.getPhysicalStripeSize() * newScale); long size = estimateStripeSize(); if (LOG.isDebugEnabled()) { LOG.debug("ORC writer " + path + " size = " + size + " limit = " + @@ -291,116 +180,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback { return false; } - /** - * This class is used to hold the contents of streams as they are buffered. - * The TreeWriters write to the outStream and the codec compresses the - * data as buffers fill up and stores them in the output list. When the - * stripe is being written, the whole stream is written to the file. - */ - private class BufferedStream implements OutStream.OutputReceiver { - private final OutStream outStream; - private final List output = new ArrayList(); - - BufferedStream(String name, int bufferSize, - CompressionCodec codec) throws IOException { - outStream = new OutStream(name, bufferSize, codec, this); - } - - /** - * Receive a buffer from the compression codec. - * @param buffer the buffer to save - */ - @Override - public void output(ByteBuffer buffer) { - output.add(buffer); - } - - /** - * Get the number of bytes in buffers that are allocated to this stream. - * @return number of bytes in buffers - */ - public long getBufferSize() { - long result = 0; - for(ByteBuffer buf: output) { - result += buf.capacity(); - } - return outStream.getBufferSize() + result; - } - - /** - * Flush the stream to the codec. - * @throws IOException - */ - public void flush() throws IOException { - outStream.flush(); - } - - /** - * Clear all of the buffers. 
- * @throws IOException - */ - public void clear() throws IOException { - outStream.clear(); - output.clear(); - } - - /** - * Check the state of suppress flag in output stream - * @return value of suppress flag - */ - public boolean isSuppressed() { - return outStream.isSuppressed(); - } - - /** - * Get the number of bytes that will be written to the output. Assumes - * the stream has already been flushed. - * @return the number of bytes - */ - public long getOutputSize() { - long result = 0; - for(ByteBuffer buffer: output) { - result += buffer.remaining(); - } - return result; - } - - /** - * Write the saved compressed buffers to the OutputStream. - * @param out the stream to write to - * @throws IOException - */ - void spillTo(OutputStream out) throws IOException { - for(ByteBuffer buffer: output) { - out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), - buffer.remaining()); - } - } - - @Override - public String toString() { - return outStream.toString(); - } - } - - /** - * An output receiver that writes the ByteBuffers to the output stream - * as they are received. - */ - private class DirectStream implements OutStream.OutputReceiver { - private final FSDataOutputStream output; - - DirectStream(FSDataOutputStream output) { - this.output = output; - } - - @Override - public void output(ByteBuffer buffer) throws IOException { - output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), - buffer.remaining()); - } - } - private static class RowIndexPositionRecorder implements PositionRecorder { private final OrcProto.RowIndexEntry.Builder builder; @@ -430,44 +209,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback { OrcProto.Stream.Kind kind ) throws IOException { final StreamName name = new StreamName(column, kind); - final EnumSet modifiers; - - switch (kind) { - case BLOOM_FILTER: - case DATA: - case DICTIONARY_DATA: - if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) { - modifiers = EnumSet.of(CompressionCodec.Modifier.FAST, - CompressionCodec.Modifier.TEXT); - } else { - modifiers = EnumSet.of(CompressionCodec.Modifier.DEFAULT, - CompressionCodec.Modifier.TEXT); - } - break; - case LENGTH: - case DICTIONARY_COUNT: - case PRESENT: - case ROW_INDEX: - case SECONDARY: - // easily compressed using the fastest modes - modifiers = EnumSet.of(CompressionCodec.Modifier.FASTEST, - CompressionCodec.Modifier.BINARY); - break; - default: - LOG.warn("Missing ORC compression modifiers for " + kind); - modifiers = null; - break; - } + return physWriter.getOrCreatePhysicalStream(name); + } - BufferedStream result = streams.get(name); - if (result == null) { - result = new BufferedStream(name.toString(), bufferSize, - codec == null ? codec : codec.modify(modifiers)); - streams.put(name, result); - } - return result.outStream; + public void writeIndex(int column, RowIndex.Builder rowIndex) throws IOException { + physWriter.writeIndexStream(new StreamName(column, Stream.Kind.ROW_INDEX), rowIndex); } + public void writeBloomFilter( + int column, BloomFilterIndex.Builder bloomFilterIndex) throws IOException { + physWriter.writeBloomFilterStream( + new StreamName(column, Stream.Kind.BLOOM_FILTER), bloomFilterIndex); + } /** * Get the next column id. 
* @return a number from 0 to the number of columns - 1 @@ -496,7 +249,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { * @return are the streams compressed */ public boolean isCompressed() { - return codec != null; + return physWriter.isCompressed(); } /** @@ -508,14 +261,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } /** - * Get the compression strategy to use. - * @return compression strategy - */ - public OrcFile.CompressionStrategy getCompressionStrategy() { - return compressionStrategy; - } - - /** * Get the bloom filter columns * @return bloom filter columns */ @@ -572,8 +317,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback { protected final RowIndexPositionRecorder rowIndexPosition; private final OrcProto.RowIndex.Builder rowIndex; private final OrcProto.RowIndexEntry.Builder rowIndexEntry; - private final PositionedOutputStream rowIndexStream; - private final PositionedOutputStream bloomFilterStream; protected final BloomFilterIO bloomFilter; protected final boolean createBloomFilter; private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex; @@ -615,21 +358,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback { rowIndexEntry = OrcProto.RowIndexEntry.newBuilder(); rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry); stripeStatsBuilders = Lists.newArrayList(); - if (streamFactory.buildIndex()) { - rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX); - } else { - rowIndexStream = null; - } if (createBloomFilter) { bloomFilterEntry = OrcProto.BloomFilter.newBuilder(); bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder(); - bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER); bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(), streamFactory.getBloomFilterFPP()); } else { bloomFilterEntry = null; bloomFilterIndex = null; - bloomFilterStream = null; bloomFilter = null; } } @@ -758,11 +494,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback { isPresent.flush(); // if no nulls are found in a stream, then suppress the stream - if(!foundNulls) { + if (!foundNulls) { isPresentOutStream.suppress(); // since isPresent bitstream is suppressed, update the index to // remove the positions of the isPresent stream - if (rowIndexStream != null) { + if (streamFactory.buildIndex()) { removeIsPresentPositions(); } } @@ -781,22 +517,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback { if (streamFactory.hasWriterTimeZone()) { builder.setWriterTimezone(TimeZone.getDefault().getID()); } - if (rowIndexStream != null) { + if (streamFactory.buildIndex()) { if (rowIndex.getEntryCount() != requiredIndexEntries) { throw new IllegalArgumentException("Column has wrong number of " + "index entries found: " + rowIndex.getEntryCount() + " expected: " + requiredIndexEntries); } - rowIndex.build().writeTo(rowIndexStream); - rowIndexStream.flush(); + streamFactory.writeIndex(id, rowIndex); } + rowIndex.clear(); rowIndexEntry.clear(); // write the bloom filter to out stream - if (bloomFilterStream != null) { - bloomFilterIndex.build().writeTo(bloomFilterStream); - bloomFilterStream.flush(); + if (createBloomFilter) { + streamFactory.writeBloomFilter(id, bloomFilterIndex); bloomFilterIndex.clear(); bloomFilterEntry.clear(); } @@ -2463,17 +2198,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @VisibleForTesting - public FSDataOutputStream getStream() throws IOException 
{ - if (rawWriter == null) { - rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE, - fs.getDefaultReplication(path), blockSize); - rawWriter.writeBytes(OrcFile.MAGIC); - headerLength = rawWriter.getPos(); - writer = new OutStream("metadata", bufferSize, codec, - new DirectStream(rawWriter)); - protobufWriter = CodedOutputStream.newInstance(writer); - } - return rawWriter; + public void ensureStream() throws IOException { + physWriter.initialize(); } private void createRowIndexEntry() throws IOException { @@ -2482,7 +2208,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } private void flushStripe() throws IOException { - getStream(); + ensureStream(); if (buildIndex && rowsInIndex != 0) { createRowIndexEntry(); } @@ -2493,98 +2219,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback { // finalize the data for the stripe int requiredIndexEntries = rowIndexStride == 0 ? 0 : (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); - OrcProto.StripeFooter.Builder builder = - OrcProto.StripeFooter.newBuilder(); + OrcProto.StripeFooter.Builder builder = OrcProto.StripeFooter.newBuilder(); + OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation + .newBuilder().setNumberOfRows(rowsInStripe); treeWriter.writeStripe(builder, requiredIndexEntries); - long indexSize = 0; - long dataSize = 0; - for(Map.Entry pair: streams.entrySet()) { - BufferedStream stream = pair.getValue(); - if (!stream.isSuppressed()) { - stream.flush(); - StreamName name = pair.getKey(); - long streamSize = pair.getValue().getOutputSize(); - builder.addStreams(OrcProto.Stream.newBuilder() - .setColumn(name.getColumn()) - .setKind(name.getKind()) - .setLength(streamSize)); - if (StreamName.Area.INDEX == name.getArea()) { - indexSize += streamSize; - } else { - dataSize += streamSize; - } - } - } - OrcProto.StripeFooter footer = builder.build(); - - // Do we need to pad the file so the stripe doesn't straddle a block - // boundary? - long start = rawWriter.getPos(); - final long currentStripeSize = indexSize + dataSize + footer.getSerializedSize(); - final long available = blockSize - (start % blockSize); - final long overflow = currentStripeSize - adjustedStripeSize; - final float availRatio = (float) available / (float) defaultStripeSize; - - if (availRatio > 0.0f && availRatio < 1.0f - && availRatio > paddingTolerance) { - // adjust default stripe size to fit into remaining space, also adjust - // the next stripe for correction based on the current stripe size - // and user specified padding tolerance. Since stripe size can overflow - // the default stripe size we should apply this correction to avoid - // writing portion of last stripe to next hdfs block. - double correction = overflow > 0 ? (double) overflow - / (double) adjustedStripeSize : 0.0; - - // correction should not be greater than user specified padding - // tolerance - correction = correction > paddingTolerance ? 
paddingTolerance - : correction; - - // adjust next stripe size based on current stripe estimate correction - adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize)); - } else if (availRatio >= 1.0) { - adjustedStripeSize = defaultStripeSize; - } - - if (availRatio < paddingTolerance && addBlockPadding) { - long padding = blockSize - (start % blockSize); - byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)]; - LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)", - padding, availRatio, defaultStripeSize)); - start += padding; - while (padding > 0) { - int writeLen = (int) Math.min(padding, pad.length); - rawWriter.write(pad, 0, writeLen); - padding -= writeLen; - } - adjustedStripeSize = defaultStripeSize; - } else if (currentStripeSize < blockSize - && (start % blockSize) + currentStripeSize > blockSize) { - // even if you don't pad, reset the default stripe size when crossing a - // block boundary - adjustedStripeSize = defaultStripeSize; - } - - // write out the data streams - for(Map.Entry pair: streams.entrySet()) { - BufferedStream stream = pair.getValue(); - if (!stream.isSuppressed()) { - stream.spillTo(rawWriter); - } - stream.clear(); - } - footer.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - long footerLength = rawWriter.getPos() - start - dataSize - indexSize; - OrcProto.StripeInformation dirEntry = - OrcProto.StripeInformation.newBuilder() - .setOffset(start) - .setNumberOfRows(rowsInStripe) - .setIndexLength(indexSize) - .setDataLength(dataSize) - .setFooterLength(footerLength).build(); - stripes.add(dirEntry); + physWriter.finalizeStripe(builder, dirEntry); + stripes.add(dirEntry.build()); rowCount += rowsInStripe; rowsInStripe = 0; } @@ -2645,17 +2285,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback { return total; } - private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) { - switch (kind) { - case NONE: return OrcProto.CompressionKind.NONE; - case ZLIB: return OrcProto.CompressionKind.ZLIB; - case SNAPPY: return OrcProto.CompressionKind.SNAPPY; - case LZO: return OrcProto.CompressionKind.LZO; - default: - throw new IllegalArgumentException("Unknown compression " + kind); - } - } - private void writeFileStatistics(OrcProto.Footer.Builder builder, TreeWriter writer) throws IOException { builder.addStatistics(writer.fileStatistics.serialize()); @@ -2664,26 +2293,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } - private int writeMetadata() throws IOException { - getStream(); + private void writeMetadata() throws IOException { + ensureStream(); OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder(); for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) { builder.addStripeStats(ssb.build()); } - long startPosn = rawWriter.getPos(); - OrcProto.Metadata metadata = builder.build(); - metadata.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - return (int) (rawWriter.getPos() - startPosn); + physWriter.writeFileMetadata(builder); } - private int writeFooter(long bodyLength) throws IOException { - getStream(); + private void writeFooter() throws IOException { + ensureStream(); OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder(); - builder.setContentLength(bodyLength); - builder.setHeaderLength(headerLength); builder.setNumberOfRows(rowCount); builder.setRowIndexStride(rowIndexStride); // populate raw data size @@ -2701,45 +2323,21 @@ public class WriterImpl implements 
Writer, MemoryManager.Callback { builder.addMetadata(OrcProto.UserMetadataItem.newBuilder() .setName(entry.getKey()).setValue(entry.getValue())); } - long startPosn = rawWriter.getPos(); - OrcProto.Footer footer = builder.build(); - footer.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - return (int) (rawWriter.getPos() - startPosn); + physWriter.writeFileFooter(builder); } - private int writePostScript(int footerLength, int metadataLength) throws IOException { + private void writePostScript() throws IOException { OrcProto.PostScript.Builder builder = OrcProto.PostScript.newBuilder() - .setCompression(writeCompressionKind(compress)) - .setFooterLength(footerLength) - .setMetadataLength(metadataLength) .setMagic(OrcFile.MAGIC) .addVersion(version.getMajor()) .addVersion(version.getMinor()) .setWriterVersion(OrcFile.CURRENT_WRITER.getId()); - if (compress != CompressionKind.NONE) { - builder.setCompressionBlockSize(bufferSize); - } - OrcProto.PostScript ps = builder.build(); - // need to write this uncompressed - long startPosn = rawWriter.getPos(); - ps.writeTo(rawWriter); - long length = rawWriter.getPos() - startPosn; - if (length > 255) { - throw new IllegalArgumentException("PostScript too large at " + length); - } - return (int) length; + physWriter.writePostScript(builder); } private long estimateStripeSize() { - long result = 0; - for(BufferedStream stream: streams.values()) { - result += stream.getBufferSize(); - } - result += treeWriter.estimateMemory(); - return result; + return physWriter.estimateMemory() + treeWriter.estimateMemory(); } @Override @@ -2785,11 +2383,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { memoryManager.removeWriter(path); // actually close the file flushStripe(); - int metadataLength = writeMetadata(); - int footerLength = writeFooter(rawWriter.getPos() - metadataLength); - rawWriter.writeByte(writePostScript(footerLength, metadataLength)); - rawWriter.close(); - + writeMetadata(); + writeFooter(); + writePostScript(); + physWriter.close(); } /** @@ -2819,13 +2416,13 @@ public class WriterImpl implements Writer, MemoryManager.Callback { if (callback != null) { callback.preFooterWrite(callbackContext); } - int metaLength = writeMetadata(); - int footLength = writeFooter(rawWriter.getPos() - metaLength); - rawWriter.writeByte(writePostScript(footLength, metaLength)); + writeMetadata(); + writeFooter(); + writePostScript(); stripesAtLastFlush = stripes.size(); - rawWriter.hflush(); + physWriter.flush(); } - return rawWriter.getPos(); + return physWriter.getRawWriterPosition(); } @Override @@ -2839,26 +2436,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback { checkArgument(stripeStatistics != null, "Stripe statistics must not be null"); - getStream(); - long start = rawWriter.getPos(); - long availBlockSpace = blockSize - (start % blockSize); - - // see if stripe can fit in the current hdfs block, else pad the remaining - // space in the block - if (length < blockSize && length > availBlockSpace && - addBlockPadding) { - byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; - LOG.info(String.format("Padding ORC by %d bytes while merging..", - availBlockSpace)); - start += availBlockSpace; - while (availBlockSpace > 0) { - int writeLen = (int) Math.min(availBlockSpace, pad.length); - rawWriter.write(pad, 0, writeLen); - availBlockSpace -= writeLen; - } - } + ensureStream(); + OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation.newBuilder(); + 
physWriter.appendRawStripe(stripe, offset, length, dirEntry); - rawWriter.write(stripe); rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues(); rowCount += rowsInStripe; @@ -2869,15 +2450,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback { updateFileStatistics(stripeStatistics); // update stripe information - OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation - .newBuilder() - .setOffset(start) - .setNumberOfRows(rowsInStripe) + stripes.add(dirEntry.setNumberOfRows(rowsInStripe) .setIndexLength(stripeInfo.getIndexLength()) .setDataLength(stripeInfo.getDataLength()) .setFooterLength(stripeInfo.getFooterLength()) - .build(); - stripes.add(dirEntry); + .build()); // reset it after writing the stripe rowsInStripe = 0; http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java ---------------------------------------------------------------------- diff --git a/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java b/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java index 289a86e..efa3ffb 100644 --- a/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java +++ b/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java @@ -28,37 +28,37 @@ public class TestOrcWideTable { @Test public void testBufferSizeFor1Col() throws IOException { - assertEquals(128 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + assertEquals(128 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024, 1, 128*1024)); } @Test public void testBufferSizeFor50Col() throws IOException { - assertEquals(256 * 1024, WriterImpl.getEstimatedBufferSize(256 * 1024 * 1024, + assertEquals(256 * 1024, PhysicalFsWriter.getEstimatedBufferSize(256 * 1024 * 1024, 50, 256*1024)); } @Test public void testBufferSizeFor1000Col() throws IOException { - assertEquals(32 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + assertEquals(32 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024, 1000, 128*1024)); } @Test public void testBufferSizeFor2000Col() throws IOException { - assertEquals(16 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + assertEquals(16 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024, 2000, 256*1024)); } @Test public void testBufferSizeFor4000Col() throws IOException { - assertEquals(8 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + assertEquals(8 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024, 4000, 256*1024)); } @Test public void testBufferSizeFor25000Col() throws IOException { - assertEquals(4 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + assertEquals(4 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024, 25000, 256*1024)); } } http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index 8e52907..075c3b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -60,6 +60,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspe import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; +import 
org.apache.orc.impl.PhysicalWriter; + +import com.google.common.annotations.VisibleForTesting; /** * An ORC file writer. The file is divided into stripes, which is the natural @@ -312,4 +315,9 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer flushInternalBatch(); super.close(); } + + @VisibleForTesting + PhysicalWriter getPhysicalWriter() { + return physWriter; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index d6b48a3..197c1d2 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -122,6 +122,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.orc.*; +import org.apache.orc.impl.PhysicalFsWriter; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -2138,8 +2139,7 @@ public class TestInputOutputFormat { writer.addRow(new MyRow(i, 2*i)); } writer.close(); - ((MockOutputStream) ((WriterImpl) writer).getStream()) - .setBlocks(new MockBlock("host0", "host1")); + getStreamFromWriter(writer).setBlocks(new MockBlock("host0", "host1")); // call getsplits HiveInputFormat inputFormat = @@ -2160,6 +2160,11 @@ public class TestInputOutputFormat { assertEquals(false, reader.next(key, value)); } + private MockOutputStream getStreamFromWriter(Writer writer) throws IOException { + PhysicalFsWriter pfr = (PhysicalFsWriter)((WriterImpl) writer).getPhysicalWriter(); + return (MockOutputStream)pfr.getStream(); + } + /** * Test vectorization, non-acid, non-combine. 
* @throws Exception @@ -2185,8 +2190,7 @@ public class TestInputOutputFormat { writer.addRow(new MyRow(i, 2*i)); } writer.close(); - ((MockOutputStream) ((WriterImpl) writer).getStream()) - .setBlocks(new MockBlock("host0", "host1")); + getStreamFromWriter(writer).setBlocks(new MockBlock("host0", "host1")); // call getsplits conf.setInt(hive_metastoreConstants.BUCKET_COUNT, 3); @@ -2226,8 +2230,7 @@ public class TestInputOutputFormat { } WriterImpl baseWriter = (WriterImpl) writer.getWriter(); writer.close(false); - ((MockOutputStream) baseWriter.getStream()) - .setBlocks(new MockBlock("host0", "host1")); + getStreamFromWriter(baseWriter).setBlocks(new MockBlock("host0", "host1")); // call getsplits HiveInputFormat inputFormat = @@ -2305,7 +2308,7 @@ public class TestInputOutputFormat { writer.addRow(new MyRow(i, 2*i)); } writer.close(); - MockOutputStream outputStream = (MockOutputStream) ((WriterImpl) writer).getStream(); + MockOutputStream outputStream = getStreamFromWriter(writer); outputStream.setBlocks(new MockBlock("host0", "host1")); int length0 = outputStream.file.length; writer = @@ -2316,7 +2319,7 @@ public class TestInputOutputFormat { writer.addRow(new MyRow(i, 2*i)); } writer.close(); - outputStream = (MockOutputStream) ((WriterImpl) writer).getStream(); + outputStream = getStreamFromWriter(writer); outputStream.setBlocks(new MockBlock("host1", "host2")); // call getsplits @@ -2383,7 +2386,7 @@ public class TestInputOutputFormat { WriterImpl baseWriter = (WriterImpl) writer.getWriter(); writer.close(false); - MockOutputStream outputStream = (MockOutputStream) baseWriter.getStream(); + MockOutputStream outputStream = getStreamFromWriter(baseWriter); outputStream.setBlocks(new MockBlock("host1", "host2")); // write a delta file in partition 0 @@ -2394,7 +2397,7 @@ public class TestInputOutputFormat { writer.insert(10, new MyRow(i, 2*i)); } WriterImpl deltaWriter = (WriterImpl) writer.getWriter(); - outputStream = (MockOutputStream) deltaWriter.getStream(); + outputStream = getStreamFromWriter(deltaWriter); writer.close(false); outputStream.setBlocks(new MockBlock("host1", "host2")); @@ -2407,7 +2410,7 @@ public class TestInputOutputFormat { .bufferSize(1024) .inspector(inspector)); orc.addRow(new MyRow(1, 2)); - outputStream = (MockOutputStream) ((WriterImpl) orc).getStream(); + outputStream = getStreamFromWriter(orc); orc.close(); outputStream.setBlocks(new MockBlock("host3", "host4")); }
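A note on the shape of this refactor, for anyone reading the diff above without the new files at hand: the physical concerns that used to live inside the logical WriterImpl (HDFS block padding, raw stream offsets, protobuf serialization to the file, codec creation, buffer-size estimation) now sit behind a PhysicalWriter abstraction, with PhysicalFsWriter as the file-system implementation. The sketch below is only an approximation reconstructed from the physWriter.* calls visible in this diff (finalizeStripe, appendRawStripe, writeFileMetadata, writeFileFooter, writePostScript, estimateMemory, flush, getRawWriterPosition, close); the authoritative signatures are in the new orc/src/java/org/apache/orc/impl/PhysicalWriter.java, which is not shown in this part of the diff.

// Illustrative sketch only -- not the interface added by this commit.
// Method names come from the call sites in the diff; parameter and return
// types are guesses based on how those call sites use them.
import java.io.IOException;
import org.apache.orc.OrcProto;

interface PhysicalWriterSketch {
  // Stripe level: the logical writer hands over the stripe footer and the
  // directory-entry builder; offset, padding and footer length are filled in
  // by the physical writer instead of being computed in WriterImpl.flushStripe().
  void finalizeStripe(OrcProto.StripeFooter.Builder footer,
      OrcProto.StripeInformation.Builder dirEntry) throws IOException;

  // Raw stripe copy used by appendStripe() when merging files; replaces the
  // removed block-padding loop and the direct rawWriter.write(stripe).
  void appendRawStripe(byte[] stripe, int offset, int length,
      OrcProto.StripeInformation.Builder dirEntry) throws IOException;

  // File tail: metadata, footer and postscript builders are serialized by the
  // physical writer, which also tracks the section lengths that WriterImpl
  // previously returned from writeMetadata()/writeFooter()/writePostScript().
  void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException;
  void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException;
  void writePostScript(OrcProto.PostScript.Builder builder) throws IOException;

  // Buffered-stream memory that estimateStripeSize() used to sum directly.
  long estimateMemory();

  // Replacements for the direct rawWriter.hflush()/getPos()/close() calls.
  void flush() throws IOException;
  long getRawWriterPosition() throws IOException;
  void close() throws IOException;
}

The test changes follow the same split: static helpers such as createCodec() and getEstimatedBufferSize() are now resolved against PhysicalFsWriter rather than WriterImpl, and tests that need the underlying MockOutputStream reach it through writer.getPhysicalWriter() (see the getStreamFromWriter() helper above) instead of the old WriterImpl.getStream().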