From: hyunsik@apache.org
To: commits@tajo.apache.org
Reply-To: dev@tajo.apache.org
Date: Wed, 16 Sep 2015 14:39:49 -0000
Subject: [2/6] tajo git commit: TAJO-1465: Add ORCFileAppender to write into ORCFile table.

http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java
new file mode 100644
index 0000000..e520082
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java
@@ -0,0 +1,2265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.thirdparty.orc.CompressionCodec.Modifier;
+import org.apache.tajo.storage.thirdparty.orc.OrcProto.RowIndexEntry;
+import org.apache.tajo.storage.thirdparty.orc.OrcProto.StripeStatistics;
+import org.apache.tajo.storage.thirdparty.orc.OrcProto.Type;
+import org.apache.tajo.storage.thirdparty.orc.OrcProto.UserMetadataItem;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.io.Text;
+import org.apache.tajo.util.datetime.DateTimeUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.*;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * An ORC file writer. The file is divided into stripes, which is the natural
+ * unit of work when reading. Each stripe is buffered in memory until the
+ * memory reaches the stripe size and then it is written out broken down by
+ * columns. Each column is written by a TreeWriter that is specific to that
+ * type of column. TreeWriters may have children TreeWriters that handle the
+ * sub-types. Each of the TreeWriters writes the column's data as a set of
+ * streams.
+ *
+ * This class is unsynchronized like most Stream objects, so from the creation
+ * of an OrcFile, all access to a single instance has to be from a single thread.
+ *
+ * There are no known cases where these happen between different threads today.
+ *
+ * Caveat: the MemoryManager is created during WriterOptions create, and it has
+ * to be confined to a single thread as well.
+ * + */ +public class WriterImpl implements Writer, MemoryManager.Callback { + + private static final Log LOG = LogFactory.getLog(WriterImpl.class); + + private static final int HDFS_BUFFER_SIZE = 256 * 1024; + private static final int MIN_ROW_INDEX_STRIDE = 1000; + + // threshold above which buffer size will be automatically resized + private static final int COLUMN_COUNT_THRESHOLD = 1000; + + private final FileSystem fs; + private final Path path; + private final long defaultStripeSize; + private long adjustedStripeSize; + private final int rowIndexStride; + private final CompressionKind compress; + private final CompressionCodec codec; + private final boolean addBlockPadding; + private final int bufferSize; + private final long blockSize; + private final float paddingTolerance; + // the streams that make up the current stripe + private final Map streams = + new TreeMap(); + + private FSDataOutputStream rawWriter = null; + // the compressed metadata information outStream + private OutStream writer = null; + // a protobuf outStream around streamFactory + private CodedOutputStream protobufWriter = null; + private long headerLength; + private int columnCount; + private long rowCount = 0; + private long rowsInStripe = 0; + private long rawDataSize = 0; + private int rowsInIndex = 0; + private int stripesAtLastFlush = -1; + private final List stripes = + new ArrayList(); + private final Map userMetadata = + new TreeMap(); + private final TreeWriter treeWriter; + private final boolean buildIndex; + private final MemoryManager memoryManager; + private final OrcFile.Version version; + private final Configuration conf; + private final OrcFile.WriterCallback callback; + private final OrcFile.WriterContext callbackContext; + private final OrcFile.EncodingStrategy encodingStrategy; + private final OrcFile.CompressionStrategy compressionStrategy; + private final boolean[] bloomFilterColumns; + private final double bloomFilterFpp; + private boolean writeTimeZone; + private TimeZone timeZone; + + WriterImpl(FileSystem fs, + Path path, + Configuration conf, + ObjectInspector inspector, + long stripeSize, + CompressionKind compress, + int bufferSize, + int rowIndexStride, + MemoryManager memoryManager, + boolean addBlockPadding, + OrcFile.Version version, + OrcFile.WriterCallback callback, + OrcFile.EncodingStrategy encodingStrategy, + OrcFile.CompressionStrategy compressionStrategy, + float paddingTolerance, + long blockSizeValue, + String bloomFilterColumnNames, + double bloomFilterFpp, + TimeZone timeZone) throws IOException { + this.fs = fs; + this.path = path; + this.conf = conf; + this.callback = callback; + if (callback != null) { + callbackContext = new OrcFile.WriterContext(){ + + @Override + public Writer getWriter() { + return WriterImpl.this; + } + }; + } else { + callbackContext = null; + } + this.adjustedStripeSize = stripeSize; + this.defaultStripeSize = stripeSize; + this.version = version; + this.encodingStrategy = encodingStrategy; + this.compressionStrategy = compressionStrategy; + this.addBlockPadding = addBlockPadding; + this.blockSize = blockSizeValue; + this.paddingTolerance = paddingTolerance; + this.compress = compress; + this.rowIndexStride = rowIndexStride; + this.memoryManager = memoryManager; + this.timeZone = timeZone; + buildIndex = rowIndexStride > 0; + codec = createCodec(compress); + String allColumns = conf.get(IOConstants.COLUMNS); + if (allColumns == null) { + allColumns = getColumnNamesFromInspector(inspector); + } + this.bufferSize = 
getEstimatedBufferSize(allColumns, bufferSize); + if (version == OrcFile.Version.V_0_11) { + /* do not write bloom filters for ORC v11 */ + this.bloomFilterColumns = + OrcUtils.includeColumns(null, allColumns, inspector); + } else { + this.bloomFilterColumns = + OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector); + } + this.bloomFilterFpp = bloomFilterFpp; + treeWriter = createTreeWriter(inspector, new StreamFactory(), false); + if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) { + throw new IllegalArgumentException("Row stride must be at least " + + MIN_ROW_INDEX_STRIDE); + } + + // ensure that we are able to handle callbacks before we register ourselves + memoryManager.addWriter(path, stripeSize, this); + } + + private String getColumnNamesFromInspector(ObjectInspector inspector) { + List fieldNames = Lists.newArrayList(); + Joiner joiner = Joiner.on(","); + if (inspector instanceof StructObjectInspector) { + StructObjectInspector soi = (StructObjectInspector) inspector; + List fields = soi.getAllStructFieldRefs(); + for(StructField sf : fields) { + fieldNames.add(sf.getFieldName()); + } + } + return joiner.join(fieldNames); + } + + @VisibleForTesting + int getEstimatedBufferSize(int bs) { + return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs); + } + + int getEstimatedBufferSize(String colNames, int bs) { + long availableMem = getMemoryAvailableForORC(); + if (colNames != null) { + final int numCols = colNames.split(",").length; + if (numCols > COLUMN_COUNT_THRESHOLD) { + // In BufferedStream, there are 3 outstream buffers (compressed, + // uncompressed and overflow) and list of previously compressed buffers. + // Since overflow buffer is rarely used, lets consider only 2 allocation. + // Also, initially, the list of compression buffers will be empty. + final int outStreamBuffers = codec == null ? 1 : 2; + + // max possible streams per column is 5. For string columns, there is + // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams. + final int maxStreams = 5; + + // Lets assume 10% memory for holding dictionary in memory and other + // object allocations + final long miscAllocation = (long) (0.1f * availableMem); + + // compute the available memory + final long remainingMem = availableMem - miscAllocation; + + int estBufferSize = (int) (remainingMem / + (maxStreams * outStreamBuffers * numCols)); + estBufferSize = getClosestBufferSize(estBufferSize, bs); + if (estBufferSize > bs) { + estBufferSize = bs; + } + + LOG.info("WIDE TABLE - Number of columns: " + numCols + + " Chosen compression buffer size: " + estBufferSize); + return estBufferSize; + } + } + return bs; + } + + private int getClosestBufferSize(int estBufferSize, int bs) { + final int kb4 = 4 * 1024; + final int kb8 = 8 * 1024; + final int kb16 = 16 * 1024; + final int kb32 = 32 * 1024; + final int kb64 = 64 * 1024; + final int kb128 = 128 * 1024; + final int kb256 = 256 * 1024; + if (estBufferSize <= kb4) { + return kb4; + } else if (estBufferSize > kb4 && estBufferSize <= kb8) { + return kb8; + } else if (estBufferSize > kb8 && estBufferSize <= kb16) { + return kb16; + } else if (estBufferSize > kb16 && estBufferSize <= kb32) { + return kb32; + } else if (estBufferSize > kb32 && estBufferSize <= kb64) { + return kb64; + } else if (estBufferSize > kb64 && estBufferSize <= kb128) { + return kb128; + } else { + return kb256; + } + } + + // the assumption is only one ORC writer open at a time, which holds true for + // most of the cases. HIVE-6455 forces single writer case. 
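A worked example of the sizing logic above, using assumed numbers (a ~1 GB ORC memory pool and a 2,000-column table; neither figure comes from the patch):

    long availableMem = 1L << 30;                      // assumed ~1 GB pool from getMemoryAvailableForORC()
    long misc         = (long) (0.1f * availableMem);  // ~10% reserved for dictionaries and misc allocations
    long remaining    = availableMem - misc;
    int  numCols = 2000, maxStreams = 5, outStreamBuffers = 2;  // compression enabled
    int  est = (int) (remaining / (maxStreams * outStreamBuffers * numCols));
    // est is roughly 48 KB here; getClosestBufferSize() rounds it up to the 64 KB
    // bucket, and the user-requested buffer size only acts as an upper bound.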
+ private long getMemoryAvailableForORC() { + OrcConf.ConfVars poolVar = OrcConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL; + double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal); + long totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). + getHeapMemoryUsage().getMax() * maxLoad); + return totalMemoryPool; + } + + public static CompressionCodec createCodec(CompressionKind kind) { + switch (kind) { + case NONE: + return null; + case ZLIB: + return new ZlibCodec(); + case SNAPPY: + return new SnappyCodec(); + case LZO: + try { + Class lzo = + (Class) + Class.forName("org.apache.hadoop.hive.ql.io.orc.LzoCodec"); + return lzo.newInstance(); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("LZO is not available.", e); + } catch (InstantiationException e) { + throw new IllegalArgumentException("Problem initializing LZO", e); + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Insufficient access to LZO", e); + } + default: + throw new IllegalArgumentException("Unknown compression codec: " + + kind); + } + } + + @Override + public boolean checkMemory(double newScale) throws IOException { + long limit = (long) Math.round(adjustedStripeSize * newScale); + long size = estimateStripeSize(); + if (LOG.isDebugEnabled()) { + LOG.debug("ORC writer " + path + " size = " + size + " limit = " + + limit); + } + if (size > limit) { + flushStripe(); + return true; + } + return false; + } + + /** + * This class is used to hold the contents of streams as they are buffered. + * The TreeWriters write to the outStream and the codec compresses the + * data as buffers fill up and stores them in the output list. When the + * stripe is being written, the whole stream is written to the file. + */ + private class BufferedStream implements OutStream.OutputReceiver { + private final OutStream outStream; + private final List output = new ArrayList(); + + BufferedStream(String name, int bufferSize, + CompressionCodec codec) throws IOException { + outStream = new OutStream(name, bufferSize, codec, this); + } + + /** + * Receive a buffer from the compression codec. + * @param buffer the buffer to save + * @throws IOException + */ + @Override + public void output(ByteBuffer buffer) { + output.add(buffer); + } + + /** + * Get the number of bytes in buffers that are allocated to this stream. + * @return number of bytes in buffers + */ + public long getBufferSize() { + long result = 0; + for(ByteBuffer buf: output) { + result += buf.capacity(); + } + return outStream.getBufferSize() + result; + } + + /** + * Flush the stream to the codec. + * @throws IOException + */ + public void flush() throws IOException { + outStream.flush(); + } + + /** + * Clear all of the buffers. + * @throws IOException + */ + public void clear() throws IOException { + outStream.clear(); + output.clear(); + } + + /** + * Check the state of suppress flag in output stream + * @return value of suppress flag + */ + public boolean isSuppressed() { + return outStream.isSuppressed(); + } + + /** + * Get the number of bytes that will be written to the output. Assumes + * the stream has already been flushed. + * @return the number of bytes + */ + public long getOutputSize() { + long result = 0; + for(ByteBuffer buffer: output) { + result += buffer.remaining(); + } + return result; + } + + /** + * Write the saved compressed buffers to the OutputStream. 
+ * @param out the stream to write to + * @throws IOException + */ + void spillTo(OutputStream out) throws IOException { + for(ByteBuffer buffer: output) { + out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), + buffer.remaining()); + } + } + + @Override + public String toString() { + return outStream.toString(); + } + } + + /** + * An output receiver that writes the ByteBuffers to the output stream + * as they are received. + */ + private class DirectStream implements OutStream.OutputReceiver { + private final FSDataOutputStream output; + + DirectStream(FSDataOutputStream output) { + this.output = output; + } + + @Override + public void output(ByteBuffer buffer) throws IOException { + output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), + buffer.remaining()); + } + } + + private static class RowIndexPositionRecorder implements PositionRecorder { + private final OrcProto.RowIndexEntry.Builder builder; + + RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) { + this.builder = builder; + } + + @Override + public void addPosition(long position) { + builder.addPositions(position); + } + } + + /** + * Interface from the Writer to the TreeWriters. This limits the visibility + * that the TreeWriters have into the Writer. + */ + private class StreamFactory { + /** + * Create a stream to store part of a column. + * @param column the column id for the stream + * @param kind the kind of stream + * @return The output outStream that the section needs to be written to. + * @throws IOException + */ + public OutStream createStream(int column, + OrcProto.Stream.Kind kind + ) throws IOException { + final StreamName name = new StreamName(column, kind); + final EnumSet modifiers; + + switch (kind) { + case BLOOM_FILTER: + case DATA: + case DICTIONARY_DATA: + if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) { + modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT); + } else { + modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT); + } + break; + case LENGTH: + case DICTIONARY_COUNT: + case PRESENT: + case ROW_INDEX: + case SECONDARY: + // easily compressed using the fastest modes + modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY); + break; + default: + LOG.warn("Missing ORC compression modifiers for " + kind); + modifiers = null; + break; + } + + BufferedStream result = streams.get(name); + if (result == null) { + result = new BufferedStream(name.toString(), bufferSize, + codec == null ? codec : codec.modify(modifiers)); + streams.put(name, result); + } + return result.outStream; + } + + /** + * Get the next column id. + * @return a number from 0 to the number of columns - 1 + */ + public int getNextColumnId() { + return columnCount++; + } + + /** + * Get the current column id. After creating all tree writers this count should tell how many + * columns (including columns within nested complex objects) are created in total. + * @return current column id + */ + public int getCurrentColumnId() { + return columnCount; + } + + /** + * Get the stride rate of the row index. + */ + public int getRowIndexStride() { + return rowIndexStride; + } + + /** + * Should be building the row index. + * @return true if we are building the index + */ + public boolean buildIndex() { + return buildIndex; + } + + /** + * Is the ORC file compressed? + * @return are the streams compressed + */ + public boolean isCompressed() { + return codec != null; + } + + /** + * Get the encoding strategy to use. 
+ * @return encoding strategy + */ + public OrcFile.EncodingStrategy getEncodingStrategy() { + return encodingStrategy; + } + + /** + * Get the compression strategy to use. + * @return compression strategy + */ + public OrcFile.CompressionStrategy getCompressionStrategy() { + return compressionStrategy; + } + + /** + * Get the bloom filter columns + * @return bloom filter columns + */ + public boolean[] getBloomFilterColumns() { + return bloomFilterColumns; + } + + /** + * Get bloom filter false positive percentage. + * @return fpp + */ + public double getBloomFilterFPP() { + return bloomFilterFpp; + } + + /** + * Get the writer's configuration. + * @return configuration + */ + public Configuration getConfiguration() { + return conf; + } + + /** + * Get the version of the file to write. + */ + public OrcFile.Version getVersion() { + return version; + } + + public void useWriterTimeZone(boolean val) { + writeTimeZone = val; + } + + public boolean hasWriterTimeZone() { + return writeTimeZone; + } + + public TimeZone getTimeZone() { + return timeZone; + } + } + + /** + * The parent class of all of the writers for each column. Each column + * is written by an instance of this class. The compound types (struct, + * list, map, and union) have children tree writers that write the children + * types. + */ + private abstract static class TreeWriter { + protected final int id; + protected final ObjectInspector inspector; + private final BitFieldWriter isPresent; + private final boolean isCompressed; + protected final ColumnStatisticsImpl indexStatistics; + protected final ColumnStatisticsImpl stripeColStatistics; + private final ColumnStatisticsImpl fileStatistics; + protected TreeWriter[] childrenWriters; + protected final RowIndexPositionRecorder rowIndexPosition; + private final OrcProto.RowIndex.Builder rowIndex; + private final OrcProto.RowIndexEntry.Builder rowIndexEntry; + private final PositionedOutputStream rowIndexStream; + private final PositionedOutputStream bloomFilterStream; + protected final BloomFilterIO bloomFilter; + protected final boolean createBloomFilter; + private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex; + private final OrcProto.BloomFilter.Builder bloomFilterEntry; + private boolean foundNulls; + private OutStream isPresentOutStream; + private final List stripeStatsBuilders; + private final StreamFactory streamFactory; + + /** + * Create a tree writer. + * @param columnId the column id of the column to write + * @param inspector the object inspector to use + * @param streamFactory limited access to the Writer's data. + * @param nullable can the value be null? 
+ * @throws IOException + */ + TreeWriter(int columnId, ObjectInspector inspector, + StreamFactory streamFactory, + boolean nullable) throws IOException { + this.streamFactory = streamFactory; + this.isCompressed = streamFactory.isCompressed(); + this.id = columnId; + this.inspector = inspector; + if (nullable) { + isPresentOutStream = streamFactory.createStream(id, + OrcProto.Stream.Kind.PRESENT); + isPresent = new BitFieldWriter(isPresentOutStream, 1); + } else { + isPresent = null; + } + this.foundNulls = false; + createBloomFilter = streamFactory.getBloomFilterColumns()[columnId]; + indexStatistics = ColumnStatisticsImpl.create(inspector); + stripeColStatistics = ColumnStatisticsImpl.create(inspector); + fileStatistics = ColumnStatisticsImpl.create(inspector); + childrenWriters = new TreeWriter[0]; + rowIndex = OrcProto.RowIndex.newBuilder(); + rowIndexEntry = OrcProto.RowIndexEntry.newBuilder(); + rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry); + stripeStatsBuilders = Lists.newArrayList(); + if (streamFactory.buildIndex()) { + rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX); + } else { + rowIndexStream = null; + } + if (createBloomFilter) { + bloomFilterEntry = OrcProto.BloomFilter.newBuilder(); + bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder(); + bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER); + bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(), + streamFactory.getBloomFilterFPP()); + } else { + bloomFilterEntry = null; + bloomFilterIndex = null; + bloomFilterStream = null; + bloomFilter = null; + } + } + + protected OrcProto.RowIndex.Builder getRowIndex() { + return rowIndex; + } + + protected ColumnStatisticsImpl getStripeStatistics() { + return stripeColStatistics; + } + + protected ColumnStatisticsImpl getFileStatistics() { + return fileStatistics; + } + + protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() { + return rowIndexEntry; + } + + IntegerWriter createIntegerWriter(PositionedOutputStream output, + boolean signed, boolean isDirectV2, + StreamFactory writer) { + if (isDirectV2) { + boolean alignedBitpacking = false; + if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) { + alignedBitpacking = true; + } + return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking); + } else { + return new RunLengthIntegerWriter(output, signed); + } + } + + boolean isNewWriteFormat(StreamFactory writer) { + return writer.getVersion() != OrcFile.Version.V_0_11; + } + + /** + * Add a new value to the column. 
+ * @param datum + * @throws IOException + */ + void write(Datum datum) throws IOException { + if (datum != null && datum.isNotNull()) { + indexStatistics.increment(); + } else { + indexStatistics.setNull(); + } + if (isPresent != null) { + if(datum == null || datum.isNull()) { + foundNulls = true; + isPresent.write(0); + } + else { + isPresent.write(1); + } + } + } + + void write(Tuple tuple) throws IOException { + if (tuple != null) { + indexStatistics.increment(); + } else { + indexStatistics.setNull(); + } + if (isPresent != null) { + if (tuple == null) { + foundNulls = true; + isPresent.write(0); + } else { + isPresent.write(1); + } + } + } + + private void removeIsPresentPositions() { + for(int i=0; i < rowIndex.getEntryCount(); ++i) { + RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i); + List positions = entry.getPositionsList(); + // bit streams use 3 positions if uncompressed, 4 if compressed + positions = positions.subList(isCompressed ? 4 : 3, positions.size()); + entry.clearPositions(); + entry.addAllPositions(positions); + } + } + + /** + * Write the stripe out to the file. + * @param builder the stripe footer that contains the information about the + * layout of the stripe. The TreeWriter is required to update + * the footer with its information. + * @param requiredIndexEntries the number of index entries that are + * required. this is to check to make sure the + * row index is well formed. + * @throws IOException + */ + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + if (isPresent != null) { + isPresent.flush(); + + // if no nulls are found in a stream, then suppress the stream + if(!foundNulls) { + isPresentOutStream.suppress(); + // since isPresent bitstream is suppressed, update the index to + // remove the positions of the isPresent stream + if (rowIndexStream != null) { + removeIsPresentPositions(); + } + } + } + + // merge stripe-level column statistics to file statistics and write it to + // stripe statistics + OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder(); + writeStripeStatistics(stripeStatsBuilder, this); + stripeStatsBuilders.add(stripeStatsBuilder); + + // reset the flag for next stripe + foundNulls = false; + + builder.addColumns(getEncoding()); + if (streamFactory.hasWriterTimeZone()) { + builder.setWriterTimezone(streamFactory.getTimeZone().getID()); + } + if (rowIndexStream != null) { + if (rowIndex.getEntryCount() != requiredIndexEntries) { + throw new IllegalArgumentException("Column has wrong number of " + + "index entries found: " + rowIndex.getEntryCount() + " expected: " + + requiredIndexEntries); + } + rowIndex.build().writeTo(rowIndexStream); + rowIndexStream.flush(); + } + rowIndex.clear(); + rowIndexEntry.clear(); + + // write the bloom filter to out stream + if (bloomFilterStream != null) { + bloomFilterIndex.build().writeTo(bloomFilterStream); + bloomFilterStream.flush(); + bloomFilterIndex.clear(); + bloomFilterEntry.clear(); + } + } + + private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder, + TreeWriter treeWriter) { + treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics); + builder.addColStats(treeWriter.stripeColStatistics.serialize().build()); + treeWriter.stripeColStatistics.reset(); + for (TreeWriter child : treeWriter.getChildrenWriters()) { + writeStripeStatistics(builder, child); + } + } + + TreeWriter[] getChildrenWriters() { + return childrenWriters; + } + + /** + * Get the encoding for 
this column. + * @return the information about the encoding of this column + */ + OrcProto.ColumnEncoding getEncoding() { + return OrcProto.ColumnEncoding.newBuilder().setKind( + OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + + /** + * Create a row index entry with the previous location and the current + * index statistics. Also merges the index statistics into the file + * statistics before they are cleared. Finally, it records the start of the + * next index and ensures all of the children columns also create an entry. + * @throws IOException + */ + void createRowIndexEntry() throws IOException { + stripeColStatistics.merge(indexStatistics); + rowIndexEntry.setStatistics(indexStatistics.serialize()); + indexStatistics.reset(); + rowIndex.addEntry(rowIndexEntry); + rowIndexEntry.clear(); + addBloomFilterEntry(); + recordPosition(rowIndexPosition); + for(TreeWriter child: childrenWriters) { + child.createRowIndexEntry(); + } + } + + void addBloomFilterEntry() { + if (createBloomFilter) { + bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions()); + bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet())); + bloomFilterIndex.addBloomFilter(bloomFilterEntry.build()); + bloomFilter.reset(); + bloomFilterEntry.clear(); + } + } + + /** + * Record the current position in each of this column's streams. + * @param recorder where should the locations be recorded + * @throws IOException + */ + void recordPosition(PositionRecorder recorder) throws IOException { + if (isPresent != null) { + isPresent.getPosition(recorder); + } + } + + /** + * Estimate how much memory the writer is consuming excluding the streams. + * @return the number of bytes. + */ + long estimateMemory() { + long result = 0; + for (TreeWriter child: childrenWriters) { + result += child.estimateMemory(); + } + return result; + } + } + + private static class BooleanTreeWriter extends TreeWriter { + private final BitFieldWriter writer; + + BooleanTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + PositionedOutputStream out = writer.createStream(id, + OrcProto.Stream.Kind.DATA); + this.writer = new BitFieldWriter(out, 1); + recordPosition(rowIndexPosition); + } + + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + boolean val = datum.asBool(); + indexStatistics.updateBoolean(val); + writer.write(val ? 
1 : 0); + } + } + + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + super.writeStripe(builder, requiredIndexEntries); + writer.flush(); + recordPosition(rowIndexPosition); + } + + @Override + void recordPosition(PositionRecorder recorder) throws IOException { + super.recordPosition(recorder); + writer.getPosition(recorder); + } + } + + private static class ByteTreeWriter extends TreeWriter { + private final RunLengthByteWriter writer; + + ByteTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + this.writer = new RunLengthByteWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA)); + recordPosition(rowIndexPosition); + } + + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + byte val = datum.asByte(); + indexStatistics.updateInteger(val); + if (createBloomFilter) { + bloomFilter.addLong(val); + } + writer.write(val); + } + } + + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + super.writeStripe(builder, requiredIndexEntries); + writer.flush(); + recordPosition(rowIndexPosition); + } + + @Override + void recordPosition(PositionRecorder recorder) throws IOException { + super.recordPosition(recorder); + writer.getPosition(recorder); + } + } + + private static class IntegerTreeWriter extends TreeWriter { + private final IntegerWriter writer; + private boolean isDirectV2 = true; + + IntegerTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + OutStream out = writer.createStream(id, + OrcProto.Stream.Kind.DATA); + this.isDirectV2 = isNewWriteFormat(writer); + this.writer = createIntegerWriter(out, true, isDirectV2, writer); + recordPosition(rowIndexPosition); + } + + @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + long val; + if (datum instanceof Int4Datum || datum instanceof Inet4Datum) { + val = datum.asInt4(); + } else if (datum instanceof Int8Datum) { + val = datum.asInt8(); + } else { + val = datum.asInt2(); + } + indexStatistics.updateInteger(val); + if (createBloomFilter) { + // integers are converted to longs in column statistics and during SARG evaluation + bloomFilter.addLong(val); + } + writer.write(val); + } + } + + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + super.writeStripe(builder, requiredIndexEntries); + writer.flush(); + recordPosition(rowIndexPosition); + } + + @Override + void recordPosition(PositionRecorder recorder) throws IOException { + super.recordPosition(recorder); + writer.getPosition(recorder); + } + } + + private static class FloatTreeWriter extends TreeWriter { + private final PositionedOutputStream stream; + private final SerializationUtils utils; + + FloatTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + 
super(columnId, inspector, writer, nullable); + this.stream = writer.createStream(id, + OrcProto.Stream.Kind.DATA); + this.utils = new SerializationUtils(); + recordPosition(rowIndexPosition); + } + + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + float val = datum.asFloat4(); + indexStatistics.updateDouble(val); + if (createBloomFilter) { + // floats are converted to doubles in column statistics and during SARG evaluation + bloomFilter.addDouble(val); + } + utils.writeFloat(stream, val); + } + } + + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + super.writeStripe(builder, requiredIndexEntries); + stream.flush(); + recordPosition(rowIndexPosition); + } + + @Override + void recordPosition(PositionRecorder recorder) throws IOException { + super.recordPosition(recorder); + stream.getPosition(recorder); + } + } + + private static class DoubleTreeWriter extends TreeWriter { + private final PositionedOutputStream stream; + private final SerializationUtils utils; + + DoubleTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + this.stream = writer.createStream(id, + OrcProto.Stream.Kind.DATA); + this.utils = new SerializationUtils(); + recordPosition(rowIndexPosition); + } + + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + double val = datum.asFloat8(); + indexStatistics.updateDouble(val); + if (createBloomFilter) { + bloomFilter.addDouble(val); + } + utils.writeDouble(stream, val); + } + } + + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + super.writeStripe(builder, requiredIndexEntries); + stream.flush(); + recordPosition(rowIndexPosition); + } + + @Override + void recordPosition(PositionRecorder recorder) throws IOException { + super.recordPosition(recorder); + stream.getPosition(recorder); + } + } + + private static class StringTreeWriter extends TreeWriter { + private static final int INITIAL_DICTIONARY_SIZE = 4096; + private final OutStream stringOutput; + private final IntegerWriter lengthOutput; + private final IntegerWriter rowOutput; + private final StringRedBlackTree dictionary = + new StringRedBlackTree(INITIAL_DICTIONARY_SIZE); + private final DynamicIntArray rows = new DynamicIntArray(); + private final PositionedOutputStream directStreamOutput; + private final IntegerWriter directLengthOutput; + private final List savedRowIndex = + new ArrayList(); + private final boolean buildIndex; + private final List rowIndexValueCount = new ArrayList(); + // If the number of keys in a dictionary is greater than this fraction of + //the total number of non-null rows, turn off dictionary encoding + private final float dictionaryKeySizeThreshold; + private boolean useDictionaryEncoding = true; + private boolean isDirectV2 = true; + private boolean doneDictionaryCheck; + private final boolean strideDictionaryCheck; + + StringTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + this.isDirectV2 = isNewWriteFormat(writer); + stringOutput = writer.createStream(id, + OrcProto.Stream.Kind.DICTIONARY_DATA); + lengthOutput = createIntegerWriter(writer.createStream(id, + 
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); + rowOutput = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA), false, isDirectV2, writer); + recordPosition(rowIndexPosition); + rowIndexValueCount.add(0L); + buildIndex = writer.buildIndex(); + directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA); + directLengthOutput = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); + dictionaryKeySizeThreshold = writer.getConfiguration().getFloat( + OrcConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname, + OrcConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.defaultFloatVal); + strideDictionaryCheck = writer.getConfiguration().getBoolean( + OrcConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.varname, + OrcConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.defaultBoolVal); + doneDictionaryCheck = false; + } + + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + if (useDictionaryEncoding || !strideDictionaryCheck) { + rows.add(dictionary.add(datum.toString())); + } else { + // write data and length + directStreamOutput.write(datum.asByteArray(), 0, datum.size()); + directLengthOutput.write(datum.size()); + } + indexStatistics.updateString(datum.toString()); + if (createBloomFilter) { + bloomFilter.addBytes(datum.asByteArray(), datum.size()); + } + } + } + + private boolean checkDictionaryEncoding() { + if (!doneDictionaryCheck) { + // Set the flag indicating whether or not to use dictionary encoding + // based on whether or not the fraction of distinct keys over number of + // non-null rows is less than the configured threshold + float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f; + useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold; + doneDictionaryCheck = true; + } + return useDictionaryEncoding; + } + + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + // if rows in stripe is less than dictionaryCheckAfterRows, dictionary + // checking would not have happened. So do it again here. + checkDictionaryEncoding(); + + if (useDictionaryEncoding) { + flushDictionary(); + } else { + // flushout any left over entries from dictionary + if (rows.size() > 0) { + flushDictionary(); + } + + // suppress the stream for every stripe if dictionary is disabled + stringOutput.suppress(); + } + + // we need to build the rowindex before calling super, since it + // writes it out. + super.writeStripe(builder, requiredIndexEntries); + stringOutput.flush(); + lengthOutput.flush(); + rowOutput.flush(); + directStreamOutput.flush(); + directLengthOutput.flush(); + // reset all of the fields to be ready for the next stripe. + dictionary.clear(); + savedRowIndex.clear(); + rowIndexValueCount.clear(); + recordPosition(rowIndexPosition); + rowIndexValueCount.add(0L); + + if (!useDictionaryEncoding) { + // record the start positions of first index stride of next stripe i.e + // beginning of the direct streams when dictionary is disabled + recordDirectStreamPosition(); + } + } + + private void flushDictionary() throws IOException { + final int[] dumpOrder = new int[dictionary.size()]; + + if (useDictionaryEncoding) { + // Write the dictionary by traversing the red-black tree writing out + // the bytes and lengths; and creating the map from the original order + // to the final sorted order. 
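      // Illustrative example (values assumed, not taken from the patch): if the
      // stripe saw "banana", "apple", "apple", "cherry", the dictionary assigned
      // original ids banana=0, apple=1, cherry=2. Visiting in sorted order below
      // yields dumpOrder = {0->1, 1->0, 2->2}, so DICTIONARY_DATA holds "apple",
      // "banana", "cherry" and the DATA stream is written as 1, 0, 0, 2.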
+ + dictionary.visit(new StringRedBlackTree.Visitor() { + private int currentId = 0; + @Override + public void visit(StringRedBlackTree.VisitorContext context + ) throws IOException { + context.writeBytes(stringOutput); + lengthOutput.write(context.getLength()); + dumpOrder[context.getOriginalPosition()] = currentId++; + } + }); + } else { + // for direct encoding, we don't want the dictionary data stream + stringOutput.suppress(); + } + int length = rows.size(); + int rowIndexEntry = 0; + OrcProto.RowIndex.Builder rowIndex = getRowIndex(); + Text text = new Text(); + // write the values translated into the dump order. + for(int i = 0; i <= length; ++i) { + // now that we are writing out the row values, we can finalize the + // row index + if (buildIndex) { + while (i == rowIndexValueCount.get(rowIndexEntry) && + rowIndexEntry < savedRowIndex.size()) { + OrcProto.RowIndexEntry.Builder base = + savedRowIndex.get(rowIndexEntry++).toBuilder(); + if (useDictionaryEncoding) { + rowOutput.getPosition(new RowIndexPositionRecorder(base)); + } else { + PositionRecorder posn = new RowIndexPositionRecorder(base); + directStreamOutput.getPosition(posn); + directLengthOutput.getPosition(posn); + } + rowIndex.addEntry(base.build()); + } + } + if (i != length) { + if (useDictionaryEncoding) { + rowOutput.write(dumpOrder[rows.get(i)]); + } else { + dictionary.getText(text, rows.get(i)); + directStreamOutput.write(text.getBytes(), 0, text.getLength()); + directLengthOutput.write(text.getLength()); + } + } + } + rows.clear(); + } + + @Override + OrcProto.ColumnEncoding getEncoding() { + // Returns the encoding used for the last call to writeStripe + if (useDictionaryEncoding) { + if(isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder().setKind( + OrcProto.ColumnEncoding.Kind.DICTIONARY_V2). + setDictionarySize(dictionary.size()).build(); + } + return OrcProto.ColumnEncoding.newBuilder().setKind( + OrcProto.ColumnEncoding.Kind.DICTIONARY). + setDictionarySize(dictionary.size()).build(); + } else { + if(isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder().setKind( + OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder().setKind( + OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + } + + /** + * This method doesn't call the super method, because unlike most of the + * other TreeWriters, this one can't record the position in the streams + * until the stripe is being flushed. Therefore it saves all of the entries + * and augments them with the final information as the stripe is written. 
+ * @throws IOException + */ + @Override + void createRowIndexEntry() throws IOException { + getStripeStatistics().merge(indexStatistics); + OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry(); + rowIndexEntry.setStatistics(indexStatistics.serialize()); + indexStatistics.reset(); + OrcProto.RowIndexEntry base = rowIndexEntry.build(); + savedRowIndex.add(base); + rowIndexEntry.clear(); + addBloomFilterEntry(); + recordPosition(rowIndexPosition); + rowIndexValueCount.add(Long.valueOf(rows.size())); + if (strideDictionaryCheck) { + checkDictionaryEncoding(); + } + if (!useDictionaryEncoding) { + if (rows.size() > 0) { + flushDictionary(); + // just record the start positions of next index stride + recordDirectStreamPosition(); + } else { + // record the start positions of next index stride + recordDirectStreamPosition(); + getRowIndex().addEntry(base); + } + } + } + + private void recordDirectStreamPosition() throws IOException { + directStreamOutput.getPosition(rowIndexPosition); + directLengthOutput.getPosition(rowIndexPosition); + } + + @Override + long estimateMemory() { + return rows.getSizeInBytes() + dictionary.getSizeInBytes(); + } + } + + /** + * Under the covers, char is written to ORC the same way as string. + */ + private static class CharTreeWriter extends StringTreeWriter { + + CharTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + } + } + + /** + * Under the covers, varchar is written to ORC the same way as string. + */ + private static class VarcharTreeWriter extends StringTreeWriter { + + VarcharTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + } + } + + private static class BinaryTreeWriter extends TreeWriter { + private final PositionedOutputStream stream; + private final IntegerWriter length; + private boolean isDirectV2 = true; + + BinaryTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + this.stream = writer.createStream(id, + OrcProto.Stream.Kind.DATA); + this.isDirectV2 = isNewWriteFormat(writer); + this.length = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); + recordPosition(rowIndexPosition); + } + + @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + stream.write(datum.asByteArray(), 0, datum.size()); + length.write(datum.size()); + indexStatistics.updateBinary(datum); + if (createBloomFilter) { + bloomFilter.addBytes(datum.asByteArray(), datum.size()); + } + } + } + + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + super.writeStripe(builder, requiredIndexEntries); + stream.flush(); + length.flush(); + recordPosition(rowIndexPosition); + } + + @Override + void recordPosition(PositionRecorder recorder) throws IOException { + super.recordPosition(recorder); + stream.getPosition(recorder); + length.getPosition(recorder); + 
} + } + + static final int MILLIS_PER_SECOND = 1000; + static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00"; + + private static class TimestampTreeWriter extends TreeWriter { + private final IntegerWriter seconds; + private final IntegerWriter nanos; + private final boolean isDirectV2; + private final long base_timestamp; + private TimeZone timeZone; + + TimestampTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + this.isDirectV2 = isNewWriteFormat(writer); + this.seconds = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.DATA), true, isDirectV2, writer); + this.nanos = createIntegerWriter(writer.createStream(id, + OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer); + recordPosition(rowIndexPosition); + // for unit tests to set different time zones + this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND; + writer.useWriterTimeZone(true); + timeZone = writer.getTimeZone(); + } + + @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + long javaTimestamp = DateTimeUtil.julianTimeToJavaTime(datum.asInt8()); + + // revise timestamp value depends on timezone + javaTimestamp += timeZone.getRawOffset(); + + Timestamp val = new Timestamp(javaTimestamp); + indexStatistics.updateTimestamp(val); + seconds.write((val.getTime() / MILLIS_PER_SECOND) - base_timestamp); + nanos.write(formatNanos(val.getNanos())); + if (createBloomFilter) { + bloomFilter.addLong(val.getTime()); + } + } + } + + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + super.writeStripe(builder, requiredIndexEntries); + seconds.flush(); + nanos.flush(); + recordPosition(rowIndexPosition); + } + + private static long formatNanos(int nanos) { + if (nanos == 0) { + return 0; + } else if (nanos % 100 != 0) { + return ((long) nanos) << 3; + } else { + nanos /= 100; + int trailingZeros = 1; + while (nanos % 10 == 0 && trailingZeros < 7) { + nanos /= 10; + trailingZeros += 1; + } + return ((long) nanos) << 3 | trailingZeros; + } + } + + @Override + void recordPosition(PositionRecorder recorder) throws IOException { + super.recordPosition(recorder); + seconds.getPosition(recorder); + nanos.getPosition(recorder); + } + } + + private static class DateTreeWriter extends TreeWriter { + private final IntegerWriter writer; + private final boolean isDirectV2; + + DateTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + OutStream out = writer.createStream(id, + OrcProto.Stream.Kind.DATA); + this.isDirectV2 = isNewWriteFormat(writer); + this.writer = createIntegerWriter(out, true, isDirectV2, writer); + recordPosition(rowIndexPosition); + } + + @Override + void write(Datum datum) throws IOException { + super.write(datum); + if (datum != null && datum.isNotNull()) { + int daysSinceEpoch = datum.asInt4() - DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH; + // Using the Writable here as it's used directly for writing as well as for 
stats. + indexStatistics.updateDate(daysSinceEpoch); + writer.write(daysSinceEpoch); + if (createBloomFilter) { + bloomFilter.addLong(daysSinceEpoch); + } + } + } + + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + super.writeStripe(builder, requiredIndexEntries); + writer.flush(); + recordPosition(rowIndexPosition); + } + + @Override + void recordPosition(PositionRecorder recorder) throws IOException { + super.recordPosition(recorder); + writer.getPosition(recorder); + } + + @Override + OrcProto.ColumnEncoding getEncoding() { + if (isDirectV2) { + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build(); + } + return OrcProto.ColumnEncoding.newBuilder() + .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); + } + } + + private static class StructTreeWriter extends TreeWriter { + private final List fields; + StructTreeWriter(int columnId, + ObjectInspector inspector, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, writer, nullable); + StructObjectInspector structObjectInspector = + (StructObjectInspector) inspector; + fields = structObjectInspector.getAllStructFieldRefs(); + childrenWriters = new TreeWriter[fields.size()]; + for(int i=0; i < childrenWriters.length; ++i) { + childrenWriters[i] = createTreeWriter( + fields.get(i).getFieldObjectInspector(), writer, true); + } + recordPosition(rowIndexPosition); + } + + @Override + void write(Datum datum) throws IOException { + } + + void writeTuple(Tuple tuple) throws IOException { + super.write(tuple); + if (tuple != null) { + for(int i = 0; i < fields.size(); ++i) { + TreeWriter writer = childrenWriters[i]; + writer.write(tuple.asDatum(i)); + } + } + } + + @Override + void writeStripe(OrcProto.StripeFooter.Builder builder, + int requiredIndexEntries) throws IOException { + super.writeStripe(builder, requiredIndexEntries); + for(TreeWriter child: childrenWriters) { + child.writeStripe(builder, requiredIndexEntries); + } + recordPosition(rowIndexPosition); + } + } + + private static TreeWriter createTreeWriter(ObjectInspector inspector, + StreamFactory streamFactory, + boolean nullable) throws IOException { + switch (inspector.getCategory()) { + case PRIMITIVE: + switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) { + case BOOLEAN: + case VOID: + return new BooleanTreeWriter(streamFactory.getNextColumnId(), + inspector, streamFactory, nullable); + case BYTE: + return new ByteTreeWriter(streamFactory.getNextColumnId(), + inspector, streamFactory, nullable); + case SHORT: + case INT: + case LONG: + return new IntegerTreeWriter(streamFactory.getNextColumnId(), + inspector, streamFactory, nullable); + case FLOAT: + return new FloatTreeWriter(streamFactory.getNextColumnId(), + inspector, streamFactory, nullable); + case DOUBLE: + return new DoubleTreeWriter(streamFactory.getNextColumnId(), + inspector, streamFactory, nullable); + case STRING: + return new StringTreeWriter(streamFactory.getNextColumnId(), + inspector, streamFactory, nullable); + case CHAR: + return new CharTreeWriter(streamFactory.getNextColumnId(), + inspector, streamFactory, nullable); + case VARCHAR: + return new VarcharTreeWriter(streamFactory.getNextColumnId(), + inspector, streamFactory, nullable); + case BINARY: + return new BinaryTreeWriter(streamFactory.getNextColumnId(), + inspector, streamFactory, nullable); + case TIMESTAMP: + return new TimestampTreeWriter(streamFactory.getNextColumnId(), 
+ inspector, streamFactory, nullable); + case DATE: + return new DateTreeWriter(streamFactory.getNextColumnId(), + inspector, streamFactory, nullable); + default: + throw new IllegalArgumentException("Bad primitive category " + + ((PrimitiveObjectInspector) inspector).getPrimitiveCategory()); + } + case STRUCT: + return new StructTreeWriter(streamFactory.getNextColumnId(), inspector, + streamFactory, nullable); + default: + throw new IllegalArgumentException("Bad category: " + + inspector.getCategory()); + } + } + + private static void writeTypes(OrcProto.Footer.Builder builder, + TreeWriter treeWriter) { + OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); + switch (treeWriter.inspector.getCategory()) { + case PRIMITIVE: + switch (((PrimitiveObjectInspector) treeWriter.inspector). + getPrimitiveCategory()) { + case VOID: + case BOOLEAN: + type.setKind(OrcProto.Type.Kind.BOOLEAN); + break; + case BYTE: + type.setKind(OrcProto.Type.Kind.BYTE); + break; + case SHORT: + type.setKind(OrcProto.Type.Kind.SHORT); + break; + case INT: + type.setKind(OrcProto.Type.Kind.INT); + break; + case LONG: + type.setKind(OrcProto.Type.Kind.LONG); + break; + case FLOAT: + type.setKind(OrcProto.Type.Kind.FLOAT); + break; + case DOUBLE: + type.setKind(OrcProto.Type.Kind.DOUBLE); + break; + case STRING: + type.setKind(OrcProto.Type.Kind.STRING); + break; + case CHAR: + // The char length needs to be written to file and should be available + // from the object inspector + CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo(); + type.setKind(Type.Kind.CHAR); + type.setMaximumLength(charTypeInfo.getLength()); + break; + case VARCHAR: + // The varchar length needs to be written to file and should be available + // from the object inspector + VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo(); + type.setKind(Type.Kind.VARCHAR); + type.setMaximumLength(typeInfo.getLength()); + break; + case BINARY: + type.setKind(OrcProto.Type.Kind.BINARY); + break; + case TIMESTAMP: + type.setKind(OrcProto.Type.Kind.TIMESTAMP); + break; + case DATE: + type.setKind(OrcProto.Type.Kind.DATE); + break; + case DECIMAL: + DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)((PrimitiveObjectInspector)treeWriter.inspector).getTypeInfo(); + type.setKind(OrcProto.Type.Kind.DECIMAL); + type.setPrecision(decTypeInfo.precision()); + type.setScale(decTypeInfo.scale()); + break; + default: + throw new IllegalArgumentException("Unknown primitive category: " + + ((PrimitiveObjectInspector) treeWriter.inspector). 
+ getPrimitiveCategory()); + } + break; + case LIST: + type.setKind(OrcProto.Type.Kind.LIST); + type.addSubtypes(treeWriter.childrenWriters[0].id); + break; + case MAP: + type.setKind(OrcProto.Type.Kind.MAP); + type.addSubtypes(treeWriter.childrenWriters[0].id); + type.addSubtypes(treeWriter.childrenWriters[1].id); + break; + case STRUCT: + type.setKind(OrcProto.Type.Kind.STRUCT); + for(TreeWriter child: treeWriter.childrenWriters) { + type.addSubtypes(child.id); + } + for(StructField field: ((StructTreeWriter) treeWriter).fields) { + type.addFieldNames(field.getFieldName()); + } + break; + case UNION: + type.setKind(OrcProto.Type.Kind.UNION); + for(TreeWriter child: treeWriter.childrenWriters) { + type.addSubtypes(child.id); + } + break; + default: + throw new IllegalArgumentException("Unknown category: " + + treeWriter.inspector.getCategory()); + } + builder.addTypes(type); + for(TreeWriter child: treeWriter.childrenWriters) { + writeTypes(builder, child); + } + } + + @VisibleForTesting + FSDataOutputStream getStream() throws IOException { + if (rawWriter == null) { + rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE, + fs.getDefaultReplication(path), blockSize); + rawWriter.writeBytes(OrcFile.MAGIC); + headerLength = rawWriter.getPos(); + writer = new OutStream("metadata", bufferSize, codec, + new DirectStream(rawWriter)); + protobufWriter = CodedOutputStream.newInstance(writer); + } + return rawWriter; + } + + private void createRowIndexEntry() throws IOException { + treeWriter.createRowIndexEntry(); + rowsInIndex = 0; + } + + private void flushStripe() throws IOException { + getStream(); + if (buildIndex && rowsInIndex != 0) { + createRowIndexEntry(); + } + if (rowsInStripe != 0) { + if (callback != null) { + callback.preStripeWrite(callbackContext); + } + // finalize the data for the stripe + int requiredIndexEntries = rowIndexStride == 0 ? 0 : + (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); + OrcProto.StripeFooter.Builder builder = + OrcProto.StripeFooter.newBuilder(); + treeWriter.writeStripe(builder, requiredIndexEntries); + long indexSize = 0; + long dataSize = 0; + for(Map.Entry pair: streams.entrySet()) { + BufferedStream stream = pair.getValue(); + if (!stream.isSuppressed()) { + stream.flush(); + StreamName name = pair.getKey(); + long streamSize = pair.getValue().getOutputSize(); + builder.addStreams(OrcProto.Stream.newBuilder() + .setColumn(name.getColumn()) + .setKind(name.getKind()) + .setLength(streamSize)); + if (StreamName.Area.INDEX == name.getArea()) { + indexSize += streamSize; + } else { + dataSize += streamSize; + } + } + } + OrcProto.StripeFooter footer = builder.build(); + + // Do we need to pad the file so the stripe doesn't straddle a block + // boundary? + long start = rawWriter.getPos(); + final long currentStripeSize = indexSize + dataSize + footer.getSerializedSize(); + final long available = blockSize - (start % blockSize); + final long overflow = currentStripeSize - adjustedStripeSize; + final float availRatio = (float) available / (float) defaultStripeSize; + + if (availRatio > 0.0f && availRatio < 1.0f + && availRatio > paddingTolerance) { + // adjust default stripe size to fit into remaining space, also adjust + // the next stripe for correction based on the current stripe size + // and user specified padding tolerance. Since stripe size can overflow + // the default stripe size we should apply this correction to avoid + // writing portion of last stripe to next hdfs block. + float correction = overflow > 0 ? 
(float) overflow + / (float) adjustedStripeSize : 0.0f; + + // correction should not be greater than user specified padding + // tolerance + correction = correction > paddingTolerance ? paddingTolerance + : correction; + + // adjust next stripe size based on current stripe estimate correction + adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize)); + } else if (availRatio >= 1.0) { + adjustedStripeSize = defaultStripeSize; + } + + if (availRatio < paddingTolerance && addBlockPadding) { + long padding = blockSize - (start % blockSize); + byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)]; + LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)", + padding, availRatio, defaultStripeSize)); + start += padding; + while (padding > 0) { + int writeLen = (int) Math.min(padding, pad.length); + rawWriter.write(pad, 0, writeLen); + padding -= writeLen; + } + adjustedStripeSize = defaultStripeSize; + } else if (currentStripeSize < blockSize + && (start % blockSize) + currentStripeSize > blockSize) { + // even if you don't pad, reset the default stripe size when crossing a + // block boundary + adjustedStripeSize = defaultStripeSize; + } + + // write out the data streams + for(Map.Entry pair: streams.entrySet()) { + BufferedStream stream = pair.getValue(); + if (!stream.isSuppressed()) { + stream.spillTo(rawWriter); + } + stream.clear(); + } + footer.writeTo(protobufWriter); + protobufWriter.flush(); + writer.flush(); + long footerLength = rawWriter.getPos() - start - dataSize - indexSize; + OrcProto.StripeInformation dirEntry = + OrcProto.StripeInformation.newBuilder() + .setOffset(start) + .setNumberOfRows(rowsInStripe) + .setIndexLength(indexSize) + .setDataLength(dataSize) + .setFooterLength(footerLength).build(); + stripes.add(dirEntry); + rowCount += rowsInStripe; + rowsInStripe = 0; + } + } + + private long computeRawDataSize() { + long result = 0; + for (TreeWriter child : treeWriter.getChildrenWriters()) { + result += getRawDataSizeFromInspectors(child, child.inspector); + } + return result; + } + + private long getRawDataSizeFromInspectors(TreeWriter child, ObjectInspector oi) { + long total = 0; + switch (oi.getCategory()) { + case PRIMITIVE: + total += getRawDataSizeFromPrimitives(child, oi); + break; + case LIST: + case MAP: + case UNION: + case STRUCT: + for (TreeWriter tw : child.childrenWriters) { + total += getRawDataSizeFromInspectors(tw, tw.inspector); + } + break; + default: + LOG.debug("Unknown object inspector category."); + break; + } + return total; + } + + private long getRawDataSizeFromPrimitives(TreeWriter child, ObjectInspector oi) { + long result = 0; + long numVals = child.fileStatistics.getNumberOfValues(); + switch (((PrimitiveObjectInspector) oi).getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case FLOAT: + return numVals * JavaDataModel.get().primitive1(); + case LONG: + case DOUBLE: + return numVals * JavaDataModel.get().primitive2(); + case STRING: + case VARCHAR: + case CHAR: + // ORC strings are converted to java Strings. so use JavaDataModel to + // compute the overall size of strings + child = (StringTreeWriter) child; + StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics; + numVals = numVals == 0 ? 
1 : numVals; + int avgStringLen = (int) (scs.getSum() / numVals); + return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen); + case DECIMAL: + return numVals * JavaDataModel.get().lengthOfDecimal(); + case DATE: + return numVals * JavaDataModel.get().lengthOfDate(); + case BINARY: + // get total length of binary blob + BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics; + return bcs.getSum(); + case TIMESTAMP: + return numVals * JavaDataModel.get().lengthOfTimestamp(); + default: + LOG.debug("Unknown primitive category."); + break; + } + + return result; + } + + private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) { + switch (kind) { + case NONE: return OrcProto.CompressionKind.NONE; + case ZLIB: return OrcProto.CompressionKind.ZLIB; + case SNAPPY: return OrcProto.CompressionKind.SNAPPY; + case LZO: return OrcProto.CompressionKind.LZO; + default: + throw new IllegalArgumentException("Unknown compression " + kind); + } + } + + private void writeFileStatistics(OrcProto.Footer.Builder builder, + TreeWriter writer) throws IOException { + builder.addStatistics(writer.fileStatistics.serialize()); + for(TreeWriter child: writer.getChildrenWriters()) { + writeFileStatistics(builder, child); + } + } + + private int writeMetadata(long bodyLength) throws IOException { + getStream(); + OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder(); + for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) { + builder.addStripeStats(ssb.build()); + } + + long startPosn = rawWriter.getPos(); + OrcProto.Metadata metadata = builder.build(); + metadata.writeTo(protobufWriter); + protobufWriter.flush(); + writer.flush(); + return (int) (rawWriter.getPos() - startPosn); + } + + private int writeFooter(long bodyLength) throws IOException { + getStream(); + OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder(); + builder.setContentLength(bodyLength); + builder.setHeaderLength(headerLength); + builder.setNumberOfRows(rowCount); + builder.setRowIndexStride(rowIndexStride); + // populate raw data size + rawDataSize = computeRawDataSize(); + // serialize the types + writeTypes(builder, treeWriter); + // add the stripe information + for(OrcProto.StripeInformation stripe: stripes) { + builder.addStripes(stripe); + } + // add the column statistics + writeFileStatistics(builder, treeWriter); + // add all of the user metadata + for(Map.Entry entry: userMetadata.entrySet()) { + builder.addMetadata(OrcProto.UserMetadataItem.newBuilder() + .setName(entry.getKey()).setValue(entry.getValue())); + } + long startPosn = rawWriter.getPos(); + OrcProto.Footer footer = builder.build(); + footer.writeTo(protobufWriter); + protobufWriter.flush(); + writer.flush(); + return (int) (rawWriter.getPos() - startPosn); + } + + private int writePostScript(int footerLength, int metadataLength) throws IOException { + OrcProto.PostScript.Builder builder = + OrcProto.PostScript.newBuilder() + .setCompression(writeCompressionKind(compress)) + .setFooterLength(footerLength) + .setMetadataLength(metadataLength) + .setMagic(OrcFile.MAGIC) + .addVersion(version.getMajor()) + .addVersion(version.getMinor()) + .setWriterVersion(OrcFile.WriterVersion.HIVE_8732.getId()); + if (compress != CompressionKind.NONE) { + builder.setCompressionBlockSize(bufferSize); + } + OrcProto.PostScript ps = builder.build(); + // need to write this uncompressed + long startPosn = rawWriter.getPos(); + ps.writeTo(rawWriter); + long length = rawWriter.getPos() - startPosn; + 
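// Note: the PostScript length is recorded in the single trailing byte of the file (close() below writes it with rawWriter.writeByte), so it must fit in one unsigned byte; hence the 255-byte check. +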
if (length > 255) { + throw new IllegalArgumentException("PostScript too large at " + length); + } + return (int) length; + } + + private long estimateStripeSize() { + long result = 0; + for(BufferedStream stream: streams.values()) { + result += stream.getBufferSize(); + } + result += treeWriter.estimateMemory(); + return result; + } + + @Override + public void addUserMetadata(String name, ByteBuffer value) { + userMetadata.put(name, ByteString.copyFrom(value)); + } + + public void addTuple(Tuple tuple) throws IOException { + ((StructTreeWriter)treeWriter).writeTuple(tuple); + rowsInStripe += 1; + if (buildIndex) { + rowsInIndex += 1; + + if (rowsInIndex >= rowIndexStride) { + createRowIndexEntry(); + } + } + memoryManager.addedRow(); + } + + @Override + public void close() throws IOException { + if (callback != null) { + callback.preFooterWrite(callbackContext); + } + // remove us from the memory manager so that we don't get any callbacks + memoryManager.removeWriter(path); + // actually close the file + flushStripe(); + int metadataLength = writeMetadata(rawWriter.getPos()); + int footerLength = writeFooter(rawWriter.getPos() - metadataLength); + rawWriter.writeByte(writePostScript(footerLength, metadataLength)); + rawWriter.close(); + } + + /** + * Raw data size will be compute when writing the file footer. Hence raw data + * size value will be available only after closing the writer. + */ + @Override + public long getRawDataSize() { + return rawDataSize; + } + + /** + * Row count gets updated when flushing the stripes. To get accurate row + * count call this method after writer is closed. + */ + @Override + public long getNumberOfRows() { + return rowCount; + } + + @Override + public long writeIntermediateFooter() throws IOException { + // flush any buffered rows + flushStripe(); + // write a footer + if (stripesAtLastFlush != stripes.size()) { + if (callback != null) { + callback.preFooterWrite(callbackContext); + } + int metaLength = writeMetadata(rawWriter.getPos()); + int footLength = writeFooter(rawWriter.getPos() - metaLength); + rawWriter.writeByte(writePostScript(footLength, metaLength)); + stripesAtLastFlush = stripes.size(); + ShimLoader.getHadoopShims().hflush(rawWriter); + } + return rawWriter.getPos(); + } + + @Override + public void appendStripe(byte[] stripe, int offset, int length, + StripeInformation stripeInfo, + OrcProto.StripeStatistics stripeStatistics) throws IOException { + checkArgument(stripe != null, "Stripe must not be null"); + checkArgument(length <= stripe.length, + "Specified length must not be greater specified array length"); + checkArgument(stripeInfo != null, "Stripe information must not be null"); + checkArgument(stripeStatistics != null, + "Stripe statistics must not be null"); + + getStream(); + long start = rawWriter.getPos(); + long stripeLen = length; + long availBlockSpace = blockSize - (start % blockSize); + + // see if stripe can fit in the current hdfs block, else pad the remaining + // space in the block + if (stripeLen < blockSize && stripeLen > availBlockSpace && + addBlockPadding) { + byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; + LOG.info(String.format("Padding ORC by %d bytes while merging..", + availBlockSpace)); + start += availBlockSpace; + while (availBlockSpace > 0) { + int writeLen = (int) Math.min(availBlockSpace, pad.length); + rawWriter.write(pad, 0, writeLen); + availBlockSpace -= writeLen; + } + } + + rawWriter.write(stripe); + rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues(); + 
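// Column 0 carries the root struct statistics, so its value count gives the number of rows in the appended stripe. +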
rowCount += rowsInStripe; + + // since we have already written the stripe, just update stripe statistics + treeWriter.stripeStatsBuilders.add(stripeStatistics.toBuilder()); + + // update file level statistics + updateFileStatistics(stripeStatistics); + + // update stripe information + OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation + .newBuilder() + .setOffset(start) + .setNumberOfRows(rowsInStripe) + .setIndexLength(stripeInfo.getIndexLength()) + .setDataLength(stripeInfo.getDataLength()) + .setFooterLength(stripeInfo.getFooterLength()) + .build(); + stripes.add(dirEntry); + + // reset it after writing the stripe + rowsInStripe = 0; + } + + private void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics) { + List cs = stripeStatistics.getColStatsList(); + List allWriters = getAllColumnTreeWriters(treeWriter); + for (int i = 0; i < allWriters.size(); i++) { + allWriters.get(i).fileStatistics.merge(ColumnStatisticsImpl.deserialize(cs.get(i))); + } + } + + private List getAllColumnTreeWriters(TreeWriter rootTreeWriter) { + List result = Lists.newArrayList(); + getAllColumnTreeWritersImpl(rootTreeWriter, result); + return result; + } + + private void getAllColumnTreeWritersImpl(TreeWriter tw, + List result) { + result.add(tw); + for (TreeWriter child : tw.childrenWriters) { + getAllColumnTreeWritersImpl(child, result); + } + } + + @Override + public void appendUserMetadata(List userMetadata) { + if (userMetadata != null) { + for (UserMetadataItem item : userMetadata) { + this.userMetadata.put(item.getName(), item.getValue()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/c3c78fc2/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java new file mode 100644 index 0000000..d0a8fa7 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/ZlibCodec.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tajo.storage.thirdparty.orc; + +import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType; +import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim; +import org.apache.hadoop.hive.shims.ShimLoader; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.zip.DataFormatException; +import java.util.zip.Deflater; +import java.util.zip.Inflater; + +class ZlibCodec implements CompressionCodec, DirectDecompressionCodec { + + private Boolean direct = null; + + private final int level; + private final int strategy; + + public ZlibCodec() { + level = Deflater.DEFAULT_COMPRESSION; + strategy = Deflater.DEFAULT_STRATEGY; + } + + private ZlibCodec(int level, int strategy) { + this.level = level; + this.strategy = strategy; + } + + @Override + public boolean compress(ByteBuffer in, ByteBuffer out, + ByteBuffer overflow) throws IOException { + Deflater deflater = new Deflater(level, true); + deflater.setStrategy(strategy); + int length = in.remaining(); + deflater.setInput(in.array(), in.arrayOffset() + in.position(), length); + deflater.finish(); + int outSize = 0; + int offset = out.arrayOffset() + out.position(); + while (!deflater.finished() && (length > outSize)) { + int size = deflater.deflate(out.array(), offset, out.remaining()); + out.position(size + out.position()); + outSize += size; + offset += size; + // if we run out of space in the out buffer, use the overflow + if (out.remaining() == 0) { + if (overflow == null) { + deflater.end(); + return false; + } + out = overflow; + offset = out.arrayOffset() + out.position(); + } + } + deflater.end(); + return length > outSize; + } + + @Override + public void decompress(ByteBuffer in, ByteBuffer out) throws IOException { + + if(in.isDirect() && out.isDirect()) { + directDecompress(in, out); + return; + } + + Inflater inflater = new Inflater(true); + inflater.setInput(in.array(), in.arrayOffset() + in.position(), + in.remaining()); + while (!(inflater.finished() || inflater.needsDictionary() || + inflater.needsInput())) { + try { + int count = inflater.inflate(out.array(), + out.arrayOffset() + out.position(), + out.remaining()); + out.position(count + out.position()); + } catch (DataFormatException dfe) { + throw new IOException("Bad compression data", dfe); + } + } + out.flip(); + inflater.end(); + in.position(in.limit()); + } + + @Override + public boolean isAvailable() { + if (direct == null) { + // see nowrap option in new Inflater(boolean) which disables zlib headers + try { + if (ShimLoader.getHadoopShims().getDirectDecompressor( + DirectCompressionType.ZLIB_NOHEADER) != null) { + direct = Boolean.valueOf(true); + } else { + direct = Boolean.valueOf(false); + } + } catch (UnsatisfiedLinkError ule) { + direct = Boolean.valueOf(false); + } + } + return direct.booleanValue(); + } + + @Override + public void directDecompress(ByteBuffer in, ByteBuffer out) + throws IOException { + DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims() + .getDirectDecompressor(DirectCompressionType.ZLIB_NOHEADER); + decompressShim.decompress(in, out); + out.flip(); // flip for read + } + + @Override + public CompressionCodec modify(@Nullable EnumSet modifiers) { + + if (modifiers == null) { + return this; + } + + int l = this.level; + int s = this.strategy; + + for (Modifier m : modifiers) { + switch (m) { + case BINARY: + /* filtered == less LZ77, more huffman */ + s = Deflater.FILTERED; + break; + case TEXT: + s = 
Deflater.DEFAULT_STRATEGY; + break; + case FASTEST: + // deflate_fast looking for 8 byte patterns + l = Deflater.BEST_SPEED; + break; + case FAST: + // deflate_fast looking for 16 byte patterns + l = Deflater.BEST_SPEED + 1; + break; + case DEFAULT: + // deflate_slow looking for 128 byte patterns + l = Deflater.DEFAULT_COMPRESSION; + break; + default: + break; + } + } + return new ZlibCodec(l, s); + } +}
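
----------------------------------------------------------------------

For readers following the block-padding logic in WriterImpl.flushStripe() above, the sketch below recomputes the same availRatio/overflow-correction arithmetic in isolation. It is only a minimal standalone illustration, not part of this commit; the class name StripePaddingDemo and all the sizes used (256 MB block, 64 MB default stripe, 5% padding tolerance, a 70 MB stripe starting at the 200 MB mark) are hypothetical example values.

// Standalone illustration of the stripe padding/adjustment arithmetic in
// WriterImpl.flushStripe(). All sizes are hypothetical example values.
public class StripePaddingDemo {
  public static void main(String[] args) {
    long blockSize = 256L * 1024 * 1024;          // HDFS block size
    long defaultStripeSize = 64L * 1024 * 1024;   // configured stripe size
    long adjustedStripeSize = defaultStripeSize;  // running estimate for the next stripe
    float paddingTolerance = 0.05f;               // fraction of a stripe we tolerate wasting

    long start = 200L * 1024 * 1024;              // current position in the file
    long currentStripeSize = 70L * 1024 * 1024;   // index + data + stripe footer just built

    long available = blockSize - (start % blockSize);
    long overflow = currentStripeSize - adjustedStripeSize;
    float availRatio = (float) available / (float) defaultStripeSize;

    if (availRatio > 0.0f && availRatio < 1.0f && availRatio > paddingTolerance) {
      // Shrink the next stripe so it fits the space left in this block, correcting
      // for how far the current stripe overflowed its estimate (capped at the tolerance).
      float correction = overflow > 0 ? (float) overflow / (float) adjustedStripeSize : 0.0f;
      correction = Math.min(correction, paddingTolerance);
      adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
    } else if (availRatio >= 1.0f) {
      // A whole default-sized stripe still fits in this block; no adjustment needed.
      adjustedStripeSize = defaultStripeSize;
    }

    System.out.printf("available=%d overflow=%d availRatio=%.3f adjustedStripeSize=%d%n",
        available, overflow, availRatio, adjustedStripeSize);
  }
}

With these example numbers, availRatio is 0.875 and the overflow correction (6 MB / 64 MB ~ 0.094) is clamped to the 0.05 tolerance, so the next stripe target drops to roughly 53 MB, keeping it inside the current HDFS block. When availRatio instead falls below the tolerance and block padding is enabled, flushStripe() pads out the remainder of the block rather than shrinking the stripe, as shown in the code above.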