From: omalley@apache.org
To: commits@orc.apache.org
Reply-To: dev@orc.apache.org
Date: Fri, 16 Jun 2017 18:29:59 -0000
Subject: [2/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
new file mode 100644
index 0000000..5835b5a
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.BinaryColumnStatistics;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+public class BinaryTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream stream;
+  private final IntegerWriter length;
+  private boolean isDirectV2 = true;
+
+  public BinaryTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.stream = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.length = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        for (int i = 0; i < length; ++i) {
+          stream.write(vec.vector[0], vec.start[0],
+              vec.length[0]);
+          this.length.write(vec.length[0]);
+        }
+        indexStatistics.updateBinary(vec.vector[0], vec.start[0],
+            vec.length[0], length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+          }
+          bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          stream.write(vec.vector[offset + i],
+              vec.start[offset + i], vec.length[offset + i]);
+          this.length.write(vec.length[offset + i]);
+          indexStatistics.updateBinary(vec.vector[offset + i],
+              vec.start[offset + i], vec.length[offset + i], 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addBytes(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]);
+            }
+            bloomFilterUtf8.addBytes(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i]);
+          }
+        }
+      }
+    }
+  }
+
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    stream.flush();
+    length.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    stream.getPosition(recorder);
+    length.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + stream.getBufferSize() +
+        length.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    // get total length of binary blob
+    BinaryColumnStatistics bcs =
+        (BinaryColumnStatistics) fileStatistics;
+    return bcs.getSum();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
new file mode 100644
index 0000000..5f572bd
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.BitFieldWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+public class BooleanTreeWriter extends TreeWriterBase {
+  private final BitFieldWriter writer;
+
+  public BooleanTreeWriter(int columnId,
+                           TypeDescription schema,
+                           WriterContext writer,
+                           boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    PositionedOutputStream out = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.writer = new BitFieldWriter(out, 1);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector vec = (LongColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int value = vec.vector[0] == 0 ? 0 : 1;
+        indexStatistics.updateBoolean(value != 0, length);
+        for (int i = 0; i < length; ++i) {
+          writer.write(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int value = vec.vector[i + offset] == 0 ?
+              0 : 1;
+          writer.write(value);
+          indexStatistics.updateBoolean(value != 0, 1);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + writer.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive1();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
new file mode 100644
index 0000000..edd6411
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.RunLengthByteWriter;
+
+import java.io.IOException;
+
+public class ByteTreeWriter extends TreeWriterBase {
+  private final RunLengthByteWriter writer;
+
+  public ByteTreeWriter(int columnId,
+                        TypeDescription schema,
+                        WriterContext writer,
+                        boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.writer = new RunLengthByteWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.DATA));
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector vec = (LongColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        byte value = (byte) vec.vector[0];
+        indexStatistics.updateInteger(value, length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(value);
+          }
+          bloomFilterUtf8.addLong(value);
+        }
+        for (int i = 0; i < length; ++i) {
+          writer.write(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          byte value = (byte) vec.vector[i + offset];
+          writer.write(value);
+          indexStatistics.updateInteger(value, 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(value);
+            }
+            bloomFilterUtf8.addLong(value);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + writer.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive1();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
new file mode 100644
index 0000000..92a6bab
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Under the covers, char is written to ORC the same way as string.
+ */
+public class CharTreeWriter extends StringBaseTreeWriter {
+  private final int itemLength;
+  private final byte[] padding;
+
+  CharTreeWriter(int columnId,
+                 TypeDescription schema,
+                 WriterContext writer,
+                 boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    itemLength = schema.getMaxLength();
+    padding = new byte[itemLength];
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        byte[] ptr;
+        int ptrOffset;
+        if (vec.length[0] >= itemLength) {
+          ptr = vec.vector[0];
+          ptrOffset = vec.start[0];
+        } else {
+          ptr = padding;
+          ptrOffset = 0;
+          System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
+              vec.length[0]);
+          Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
+        }
+        if (useDictionaryEncoding) {
+          int id = dictionary.add(ptr, ptrOffset, itemLength);
+          for(int i=0; i < length; ++i) {
+            rows.add(id);
+          }
+        } else {
+          for(int i=0; i < length; ++i) {
+            directStreamOutput.write(ptr, ptrOffset, itemLength);
+            lengthOutput.write(itemLength);
+          }
+        }
+        indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            // translate from UTF-8 to the default charset
+            bloomFilter.addString(new String(vec.vector[0], vec.start[0],
+                vec.length[0], StandardCharsets.UTF_8));
+          }
+          bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+        }
+      }
+    } else {
+      for(int i=0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          byte[] ptr;
+          int ptrOffset;
+          if (vec.length[offset + i] >= itemLength) {
+            ptr = vec.vector[offset + i];
+            ptrOffset = vec.start[offset + i];
+          } else {
+            // it is the wrong length, so copy it
+            ptr = padding;
+            ptrOffset = 0;
+            System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
+                ptr, 0, vec.length[offset + i]);
+            Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
+          }
+          if (useDictionaryEncoding) {
+            rows.add(dictionary.add(ptr, ptrOffset, itemLength));
+          } else {
+            directStreamOutput.write(ptr, ptrOffset, itemLength);
+            lengthOutput.write(itemLength);
+          }
+          indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              // translate from UTF-8 to the default charset
+              bloomFilter.addString(new String(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i],
+                  StandardCharsets.UTF_8));
+            }
+            bloomFilterUtf8.addBytes(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i]);
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
new file mode 100644
index 0000000..d15fb13
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+
+public class DateTreeWriter extends TreeWriterBase {
+  private final IntegerWriter writer;
+  private final boolean isDirectV2;
+
+  public DateTreeWriter(int columnId,
+                        TypeDescription schema,
+                        WriterContext writer,
+                        boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    OutStream out = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.writer = createIntegerWriter(out, true, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector vec = (LongColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int value = (int) vec.vector[0];
+        indexStatistics.updateDate(value);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(value);
+          }
+          bloomFilterUtf8.addLong(value);
+        }
+        for (int i = 0; i < length; ++i) {
+          writer.write(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int value = (int) vec.vector[i + offset];
+          writer.write(value);
+          indexStatistics.updateDate(value);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(value);
+            }
+            bloomFilterUtf8.addLong(value);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + writer.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() *
+        JavaDataModel.get().lengthOfDate();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
new file mode 100644
index 0000000..0428253
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+public class DecimalTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream valueStream;
+
+  // These scratch buffers allow us to serialize decimals much faster.
+  private final long[] scratchLongs;
+  private final byte[] scratchBuffer;
+
+  private final IntegerWriter scaleStream;
+  private final boolean isDirectV2;
+
+  public DecimalTreeWriter(int columnId,
+                           TypeDescription schema,
+                           WriterContext writer,
+                           boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN];
+    scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+    this.scaleStream = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    DecimalColumnVector vec = (DecimalColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        HiveDecimalWritable value = vec.vector[0];
+        indexStatistics.updateDecimal(value);
+        if (createBloomFilter) {
+          String str = value.toString(scratchBuffer);
+          if (bloomFilter != null) {
+            bloomFilter.addString(str);
+          }
+          bloomFilterUtf8.addString(str);
+        }
+        for (int i = 0; i < length; ++i) {
+          value.serializationUtilsWrite(valueStream,
+              scratchLongs);
+          scaleStream.write(value.scale());
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          HiveDecimalWritable value = vec.vector[i + offset];
+          value.serializationUtilsWrite(valueStream, scratchLongs);
+          scaleStream.write(value.scale());
+          indexStatistics.updateDecimal(value);
+          if (createBloomFilter) {
+            String str = value.toString(scratchBuffer);
+            if (bloomFilter != null) {
+              bloomFilter.addString(str);
+            }
+            bloomFilterUtf8.addString(str);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    valueStream.flush();
+    scaleStream.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    valueStream.getPosition(recorder);
+    scaleStream.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + valueStream.getBufferSize() +
+        scaleStream.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() *
+        JavaDataModel.get().lengthOfDecimal();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
new file mode 100644
index 0000000..d2c0db2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+
+public class DoubleTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream stream;
+  private final SerializationUtils utils;
+
+  public DoubleTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.stream = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.utils = new SerializationUtils();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    DoubleColumnVector vec = (DoubleColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        double value = vec.vector[0];
+        indexStatistics.updateDouble(value);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addDouble(value);
+          }
+          bloomFilterUtf8.addDouble(value);
+        }
+        for (int i = 0; i < length; ++i) {
+          utils.writeDouble(stream, value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          double value = vec.vector[i + offset];
+          utils.writeDouble(stream, value);
+          indexStatistics.updateDouble(value);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addDouble(value);
+            }
+            bloomFilterUtf8.addDouble(value);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    stream.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    stream.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + stream.getBufferSize();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive2();
+  }
+}
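
For context, the TreeWriters in this patch are internal implementation classes; applications reach them only through ORC's public Writer API, which instantiates the matching TreeWriter for each column of the schema. A minimal sketch of driving the new writers end to end through that public API (the file name and schema below are illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class WriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // bigint selects IntegerTreeWriter, double selects DoubleTreeWriter
        TypeDescription schema =
            TypeDescription.fromString("struct<x:bigint,y:double>");
        Writer writer = OrcFile.createWriter(new Path("example.orc"),
            OrcFile.writerOptions(conf).setSchema(schema));
        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector x = (LongColumnVector) batch.cols[0];
        DoubleColumnVector y = (DoubleColumnVector) batch.cols[1];
        for (int r = 0; r < 10000; ++r) {
          int row = batch.size++;
          x.vector[row] = r;
          y.vector[row] = r * 0.001;
          if (batch.size == batch.getMaxSize()) {
            writer.addRowBatch(batch);  // fans out to each column's writeBatch
            batch.reset();
          }
        }
        if (batch.size != 0) {
          writer.addRowBatch(batch);
        }
        writer.close();  // flushes the final stripe via writeStripe
      }
    }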

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
new file mode 100644
index 0000000..c825bf1
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+
+public class FloatTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream stream;
+  private final SerializationUtils utils;
+
+  public FloatTreeWriter(int columnId,
+                         TypeDescription schema,
+                         WriterContext writer,
+                         boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.stream = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.utils = new SerializationUtils();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    DoubleColumnVector vec = (DoubleColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        float value = (float) vec.vector[0];
+        indexStatistics.updateDouble(value);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addDouble(value);
+          }
+          bloomFilterUtf8.addDouble(value);
+        }
+        for (int i = 0; i < length; ++i) {
+          utils.writeFloat(stream, value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          float value = (float) vec.vector[i + offset];
+          utils.writeFloat(stream, value);
+          indexStatistics.updateDouble(value);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addDouble(value);
+            }
+            bloomFilterUtf8.addDouble(value);
+          }
+        }
+      }
+    }
+  }
+
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    stream.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    stream.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + stream.getBufferSize();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive1();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
new file mode 100644
index 0000000..6036ef5
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+
+public class IntegerTreeWriter extends TreeWriterBase {
+  private final IntegerWriter writer;
+  private boolean isDirectV2 = true;
+  private final boolean isLong;
+
+  public IntegerTreeWriter(int columnId,
+                           TypeDescription schema,
+                           WriterContext writer,
+                           boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    OutStream out = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.writer = createIntegerWriter(out, true, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    this.isLong = schema.getCategory() == TypeDescription.Category.LONG;
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector vec = (LongColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        long value = vec.vector[0];
+        indexStatistics.updateInteger(value, length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(value);
+          }
+          bloomFilterUtf8.addLong(value);
+        }
+        for (int i = 0; i < length; ++i) {
+          writer.write(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          long value = vec.vector[i + offset];
+          writer.write(value);
+          indexStatistics.updateInteger(value, 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(value);
+            }
+            bloomFilterUtf8.addLong(value);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + writer.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    long num = fileStatistics.getNumberOfValues();
+    return num * (isLong ? jdm.primitive2() : jdm.primitive1());
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
new file mode 100644
index 0000000..2c5bd50
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+
+public class ListTreeWriter extends TreeWriterBase {
+  private final IntegerWriter lengths;
+  private final boolean isDirectV2;
+  private final TreeWriter childWriter;
+
+  ListTreeWriter(int columnId,
+                 TypeDescription schema,
+                 WriterContext writer,
+                 boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    childWriter = Factory.create(schema.getChildren().get(0), writer, true);
+    lengths = createIntegerWriter(writer.createStream(columnId,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    childWriter.createRowIndexEntry();
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    ListColumnVector vec = (ListColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int childOffset = (int) vec.offsets[0];
+        int childLength = (int) vec.lengths[0];
+        for (int i = 0; i < length; ++i) {
+          lengths.write(childLength);
+          childWriter.writeBatch(vec.child, childOffset, childLength);
+        }
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(childLength);
+          }
+          bloomFilterUtf8.addLong(childLength);
+        }
+      }
+    } else {
+      // write the elements in runs
+      int currentOffset = 0;
+      int currentLength = 0;
+      for (int i = 0; i < length; ++i) {
+        if (!vec.isNull[i + offset]) {
+          int nextLength = (int) vec.lengths[offset + i];
+          int nextOffset = (int) vec.offsets[offset + i];
+          lengths.write(nextLength);
+          if (currentLength == 0) {
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else if (currentOffset + currentLength != nextOffset) {
+            childWriter.writeBatch(vec.child, currentOffset,
+                currentLength);
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else {
+            currentLength += nextLength;
+          }
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(nextLength);
+            }
+            bloomFilterUtf8.addLong(nextLength);
+          }
+        }
+      }
+      if (currentLength != 0) {
+        childWriter.writeBatch(vec.child, currentOffset,
+            currentLength);
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    lengths.flush();
+    childWriter.writeStripe(builder, stats, requiredIndexEntries);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    lengths.getPosition(recorder);
+  }
+
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    childWriter.updateFileStatistics(stats);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + lengths.estimateMemory() +
+        childWriter.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    return childWriter.getRawDataSize();
+  }
+
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    childWriter.writeFileStatistics(footer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
new file mode 100644
index 0000000..26ace05
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+import java.util.List;
+
+public class MapTreeWriter extends TreeWriterBase {
+  private final IntegerWriter lengths;
+  private final boolean isDirectV2;
+  private final TreeWriter keyWriter;
+  private final TreeWriter valueWriter;
+
+  MapTreeWriter(int columnId,
+                TypeDescription schema,
+                WriterContext writer,
+                boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    List<TypeDescription> children = schema.getChildren();
+    keyWriter = Factory.create(children.get(0), writer, true);
+    valueWriter = Factory.create(children.get(1), writer, true);
+    lengths = createIntegerWriter(writer.createStream(columnId,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    keyWriter.createRowIndexEntry();
+    valueWriter.createRowIndexEntry();
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    MapColumnVector vec = (MapColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int childOffset = (int) vec.offsets[0];
+        int childLength = (int) vec.lengths[0];
+        for (int i = 0; i < length; ++i) {
+          lengths.write(childLength);
+          keyWriter.writeBatch(vec.keys, childOffset, childLength);
+          valueWriter.writeBatch(vec.values, childOffset, childLength);
+        }
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(childLength);
+          }
+          bloomFilterUtf8.addLong(childLength);
+        }
+      }
+    } else {
+      // write the elements in runs
+      int currentOffset = 0;
+      int currentLength = 0;
+      for (int i = 0; i < length; ++i) {
+        if (!vec.isNull[i + offset]) {
+          int nextLength = (int) vec.lengths[offset + i];
+          int nextOffset = (int) vec.offsets[offset + i];
+          lengths.write(nextLength);
+          if (currentLength == 0) {
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else if (currentOffset + currentLength != nextOffset) {
+            keyWriter.writeBatch(vec.keys, currentOffset,
+                currentLength);
+            valueWriter.writeBatch(vec.values, currentOffset,
+                currentLength);
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else {
+            currentLength += nextLength;
+          }
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(nextLength);
+            }
+            bloomFilterUtf8.addLong(nextLength);
+          }
+        }
+      }
+      if (currentLength != 0) {
+        keyWriter.writeBatch(vec.keys, currentOffset,
+            currentLength);
+        valueWriter.writeBatch(vec.values, currentOffset,
+            currentLength);
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats,
+        requiredIndexEntries);
+    lengths.flush();
+    keyWriter.writeStripe(builder, stats, requiredIndexEntries);
+    valueWriter.writeStripe(builder, stats, requiredIndexEntries);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    lengths.getPosition(recorder);
+  }
+
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    keyWriter.updateFileStatistics(stats);
+    valueWriter.updateFileStatistics(stats);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + lengths.estimateMemory() +
+        keyWriter.estimateMemory() + valueWriter.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    return keyWriter.getRawDataSize() + valueWriter.getRawDataSize();
+  }
+
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    keyWriter.writeFileStatistics(footer);
+    valueWriter.writeFileStatistics(footer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
new file mode 100644
index 0000000..f49cb7f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.DynamicIntArray;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.StringRedBlackTree;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class StringBaseTreeWriter extends TreeWriterBase {
+  private static final int INITIAL_DICTIONARY_SIZE = 4096;
+  private final OutStream stringOutput;
+  protected final IntegerWriter lengthOutput;
+  private final IntegerWriter rowOutput;
+  protected final StringRedBlackTree dictionary =
+      new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
+  protected final DynamicIntArray rows = new DynamicIntArray();
+  protected final PositionedOutputStream directStreamOutput;
+  private final List<OrcProto.RowIndexEntry> savedRowIndex =
+      new ArrayList<>();
+  private final boolean buildIndex;
+  private final List<Long> rowIndexValueCount = new ArrayList<>();
+  // If the number of keys in a dictionary is greater than this fraction of
+  // the total number of non-null rows, turn off dictionary encoding
+  private final double dictionaryKeySizeThreshold;
+  protected boolean useDictionaryEncoding = true;
+  private boolean isDirectV2 = true;
+  private boolean doneDictionaryCheck;
+  private final boolean strideDictionaryCheck;
+
+  StringBaseTreeWriter(int columnId,
+                       TypeDescription schema,
+                       WriterContext writer,
+                       boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    stringOutput = writer.createStream(id,
+        OrcProto.Stream.Kind.DICTIONARY_DATA);
+    lengthOutput = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    rowOutput = createIntegerWriter(directStreamOutput, false, isDirectV2,
+        writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    rowIndexValueCount.add(0L);
+    buildIndex = writer.buildIndex();
+    Configuration conf = writer.getConfiguration();
+    dictionaryKeySizeThreshold =
+        OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+    strideDictionaryCheck =
+        OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
+    doneDictionaryCheck = false;
+  }
+
+  private void checkDictionaryEncoding() {
+    if (!doneDictionaryCheck) {
+      // Set the flag indicating whether or not to use dictionary encoding
+      // based on whether or not the fraction of distinct keys over number of
+      // non-null rows is less than the configured threshold
+      float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+      useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+      doneDictionaryCheck = true;
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    // if rows in stripe is less than dictionaryCheckAfterRows, dictionary
+    // checking would not have happened. So do it again here.
+    checkDictionaryEncoding();
+
+    if (useDictionaryEncoding) {
+      flushDictionary();
+    } else {
+      // flush out any leftover entries from the dictionary
+      if (rows.size() > 0) {
+        flushDictionary();
+      }
+
+      // suppress the stream for every stripe if dictionary is disabled
+      stringOutput.suppress();
+    }
+
+    // we need to build the rowindex before calling super, since it
+    // writes it out.
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    if (useDictionaryEncoding) {
+      stringOutput.flush();
+      lengthOutput.flush();
+      rowOutput.flush();
+    } else {
+      directStreamOutput.flush();
+      lengthOutput.flush();
+    }
+    // reset all of the fields to be ready for the next stripe.
+    dictionary.clear();
+    savedRowIndex.clear();
+    rowIndexValueCount.clear();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    rowIndexValueCount.add(0L);
+
+    if (!useDictionaryEncoding) {
+      // record the start positions of first index stride of next stripe i.e
+      // beginning of the direct streams when dictionary is disabled
+      recordDirectStreamPosition();
+    }
+  }
+
+  private void flushDictionary() throws IOException {
+    final int[] dumpOrder = new int[dictionary.size()];
+
+    if (useDictionaryEncoding) {
+      // Write the dictionary by traversing the red-black tree writing out
+      // the bytes and lengths; and creating the map from the original order
+      // to the final sorted order.
+
+      dictionary.visit(new StringRedBlackTree.Visitor() {
+        private int currentId = 0;
+
+        @Override
+        public void visit(StringRedBlackTree.VisitorContext context
+        ) throws IOException {
+          context.writeBytes(stringOutput);
+          lengthOutput.write(context.getLength());
+          dumpOrder[context.getOriginalPosition()] = currentId++;
+        }
+      });
+    } else {
+      // for direct encoding, we don't want the dictionary data stream
+      stringOutput.suppress();
+    }
+    int length = rows.size();
+    int rowIndexEntry = 0;
+    OrcProto.RowIndex.Builder rowIndex = getRowIndex();
+    Text text = new Text();
+    // write the values translated into the dump order.
+    for (int i = 0; i <= length; ++i) {
+      // now that we are writing out the row values, we can finalize the
+      // row index
+      if (buildIndex) {
+        while (i == rowIndexValueCount.get(rowIndexEntry) &&
+            rowIndexEntry < savedRowIndex.size()) {
+          OrcProto.RowIndexEntry.Builder base =
+              savedRowIndex.get(rowIndexEntry++).toBuilder();
+          if (useDictionaryEncoding) {
+            rowOutput.getPosition(new RowIndexPositionRecorder(base));
+          } else {
+            PositionRecorder posn = new RowIndexPositionRecorder(base);
+            directStreamOutput.getPosition(posn);
+            lengthOutput.getPosition(posn);
+          }
+          rowIndex.addEntry(base.build());
+        }
+      }
+      if (i != length) {
+        if (useDictionaryEncoding) {
+          rowOutput.write(dumpOrder[rows.get(i)]);
+        } else {
+          dictionary.getText(text, rows.get(i));
+          directStreamOutput.write(text.getBytes(), 0, text.getLength());
+          lengthOutput.write(text.getLength());
+        }
+      }
+    }
+    rows.clear();
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (useDictionaryEncoding) {
+      result.setDictionarySize(dictionary.size());
+      if (isDirectV2) {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2);
+      } else {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY);
+      }
+    } else {
+      if (isDirectV2) {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+      } else {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * This method doesn't call the super method, because unlike most of the
+   * other TreeWriters, this one can't record the position in the streams
+   * until the stripe is being flushed. Therefore it saves all of the entries
+   * and augments them with the final information as the stripe is written.
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    getStripeStatistics().merge(indexStatistics);
+    OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
+    rowIndexEntry.setStatistics(indexStatistics.serialize());
+    indexStatistics.reset();
+    OrcProto.RowIndexEntry base = rowIndexEntry.build();
+    savedRowIndex.add(base);
+    rowIndexEntry.clear();
+    addBloomFilterEntry();
+    recordPosition(rowIndexPosition);
+    rowIndexValueCount.add((long) rows.size());
+    if (strideDictionaryCheck) {
+      checkDictionaryEncoding();
+    }
+    if (!useDictionaryEncoding) {
+      if (rows.size() > 0) {
+        flushDictionary();
+        // just record the start positions of the next index stride
+        recordDirectStreamPosition();
+      } else {
+        // record the start positions of the next index stride
+        recordDirectStreamPosition();
+        getRowIndex().addEntry(base);
+      }
+    }
+  }
+
+  private void recordDirectStreamPosition() throws IOException {
+    if (rowIndexPosition != null) {
+      directStreamOutput.getPosition(rowIndexPosition);
+      lengthOutput.getPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public long estimateMemory() {
+    long parent = super.estimateMemory();
+    if (useDictionaryEncoding) {
+      return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes();
+    } else {
+      return parent + lengthOutput.estimateMemory() +
+          directStreamOutput.getBufferSize();
+    }
+  }
+
+  @Override
+  public long getRawDataSize() {
+    // ORC strings are converted to Java Strings, so use JavaDataModel to
+    // compute the overall size of strings.
+    StringColumnStatistics scs = (StringColumnStatistics) fileStatistics;
+    long numVals = fileStatistics.getNumberOfValues();
+    if (numVals == 0) {
+      return 0;
+    } else {
+      int avgSize = (int) (scs.getSum() / numVals);
+      return numVals * JavaDataModel.get().lengthForStringOfLength(avgSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
new file mode 100644
index 0000000..ab6f38f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class StringTreeWriter extends StringBaseTreeWriter {
+  StringTreeWriter(int columnId,
+                   TypeDescription schema,
+                   WriterContext writer,
+                   boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        if (useDictionaryEncoding) {
+          int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
+          for (int i = 0; i < length; ++i) {
+            rows.add(id);
+          }
+        } else {
+          for (int i = 0; i < length; ++i) {
+            directStreamOutput.write(vec.vector[0], vec.start[0],
+                vec.length[0]);
+            lengthOutput.write(vec.length[0]);
+          }
+        }
+        indexStatistics.updateString(vec.vector[0], vec.start[0],
+            vec.length[0], length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            // decode the UTF-8 bytes into a Java String for the
+            // old-style bloom filter
+            bloomFilter.addString(new String(vec.vector[0], vec.start[0],
+                vec.length[0], StandardCharsets.UTF_8));
+          }
+          bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          if (useDictionaryEncoding) {
+            rows.add(dictionary.add(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i]));
+          } else {
+            directStreamOutput.write(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i]);
+            lengthOutput.write(vec.length[offset + i]);
+          }
+          indexStatistics.updateString(vec.vector[offset + i],
+              vec.start[offset + i], vec.length[offset + i], 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              // decode the UTF-8 bytes into a Java String for the
+              // old-style bloom filter
+              bloomFilter.addString(new String(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i],
+                  StandardCharsets.UTF_8));
+            }
+            bloomFilterUtf8.addBytes(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i]);
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
new file mode 100644
index 0000000..9a1384d
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.util.List;
+
+public class StructTreeWriter extends TreeWriterBase {
+  final TreeWriter[] childrenWriters;
+
+  public StructTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    List<TypeDescription> children = schema.getChildren();
+    childrenWriters = new TreeWriterBase[children.size()];
+    for (int i = 0; i < childrenWriters.length; ++i) {
+      childrenWriters[i] = Factory.create(children.get(i), writer, true);
+    }
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void writeRootBatch(VectorizedRowBatch batch, int offset,
+                             int length) throws IOException {
+    // update the statistics for the root column
+    indexStatistics.increment(length);
+    // I'm assuming that the root column isn't nullable so that I don't need
+    // to update isPresent.
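+    // (The root struct is not itself backed by a ColumnVector; its children
+    // are written straight from batch.cols, so there is no null vector to
+    // consult at this level.)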
+    for (int i = 0; i < childrenWriters.length; ++i) {
+      childrenWriters[i].writeBatch(batch.cols[i], offset, length);
+    }
+  }
+
+  private static void writeFields(StructColumnVector vector,
+                                  TreeWriter[] childrenWriters,
+                                  int offset, int length) throws IOException {
+    for (int field = 0; field < childrenWriters.length; ++field) {
+      childrenWriters[field].writeBatch(vector.fields[field], offset, length);
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    StructColumnVector vec = (StructColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        writeFields(vec, childrenWriters, offset, length);
+      }
+    } else if (vector.noNulls) {
+      writeFields(vec, childrenWriters, offset, length);
+    } else {
+      // write the records in runs
+      int currentRun = 0;
+      boolean started = false;
+      for (int i = 0; i < length; ++i) {
+        if (!vec.isNull[i + offset]) {
+          if (!started) {
+            started = true;
+            currentRun = i;
+          }
+        } else if (started) {
+          started = false;
+          writeFields(vec, childrenWriters, offset + currentRun,
+              i - currentRun);
+        }
+      }
+      if (started) {
+        writeFields(vec, childrenWriters, offset + currentRun,
+            length - currentRun);
+      }
+    }
+  }
+
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    for (TreeWriter child : childrenWriters) {
+      child.createRowIndexEntry();
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    for (TreeWriter child : childrenWriters) {
+      child.writeStripe(builder, stats, requiredIndexEntries);
+    }
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    for (TreeWriter child : childrenWriters) {
+      child.updateFileStatistics(stats);
+    }
+  }
+
+  @Override
+  public long estimateMemory() {
+    long result = 0;
+    for (TreeWriter writer : childrenWriters) {
+      result += writer.estimateMemory();
+    }
+    return super.estimateMemory() + result;
+  }
+
+  @Override
+  public long getRawDataSize() {
+    long result = 0;
+    for (TreeWriter writer : childrenWriters) {
+      result += writer.getRawDataSize();
+    }
+    return result;
+  }
+
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    for (TreeWriter child : childrenWriters) {
+      child.writeFileStatistics(footer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
new file mode 100644
index 0000000..fae108e
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.TimeZone;
+
+public class TimestampTreeWriter extends TreeWriterBase {
+  public static final int MILLIS_PER_SECOND = 1000;
+  public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
+
+  private final IntegerWriter seconds;
+  private final IntegerWriter nanos;
+  private final boolean isDirectV2;
+  private final TimeZone localTimezone;
+  private final long baseEpochSecsLocalTz;
+
+  public TimestampTreeWriter(int columnId,
+                             TypeDescription schema,
+                             WriterContext writer,
+                             boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.seconds = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
+    this.nanos = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    this.localTimezone = TimeZone.getDefault();
+    // for unit tests to set different time zones
+    this.baseEpochSecsLocalTz = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    TimestampColumnVector vec = (TimestampColumnVector) vector;
+    Timestamp val;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        val = vec.asScratchTimestamp(0);
+        long millis = val.getTime();
+        long utc = SerializationUtils.convertToUtc(localTimezone, millis);
+        indexStatistics.updateTimestamp(utc);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(millis);
+          }
+          bloomFilterUtf8.addLong(utc);
+        }
+        final long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
+        final long nano = formatNanos(val.getNanos());
+        for (int i = 0; i < length; ++i) {
+          seconds.write(secs);
+          nanos.write(nano);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          val = vec.asScratchTimestamp(i + offset);
+          long millis = val.getTime();
+          long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
+          long utc = SerializationUtils.convertToUtc(localTimezone, millis);
+          seconds.write(secs);
+          nanos.write(formatNanos(val.getNanos()));
+          indexStatistics.updateTimestamp(utc);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(millis);
+            }
+            bloomFilterUtf8.addLong(utc);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    seconds.flush();
+    nanos.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  private static long formatNanos(int nanos) {
+    if (nanos == 0) {
+      return 0;
+    } else if (nanos % 100 != 0) {
+      return ((long) nanos) << 3;
+    } else {
+      nanos /= 100;
+      int trailingZeros = 1;
+      while (nanos % 10 == 0 && trailingZeros < 7) {
+        nanos /= 10;
+        trailingZeros += 1;
+      }
+      return ((long) nanos) << 3 | trailingZeros;
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    seconds.getPosition(recorder);
+    nanos.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + seconds.estimateMemory() +
+        nanos.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() *
+        JavaDataModel.get().lengthOfTimestamp();
+  }
+}
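----------------------------------------------------------------------
Editor's note (not part of the patch): TimestampTreeWriter.formatNanos
packs the nanosecond field by stripping trailing decimal zeros and
recording the number removed, minus one, in the low three bits of the
serialized value. The standalone sketch below illustrates the scheme;
the class name and the decode() helper are hypothetical, with decode()
written here only to invert the transform shown above.

public class NanosEncodingDemo {
  // Same scheme as TimestampTreeWriter.formatNanos: when the value has at
  // least two trailing decimal zeros, strip them and record the count
  // (minus one) in the low three bits.
  static long encode(int nanos) {
    if (nanos == 0) {
      return 0;
    } else if (nanos % 100 != 0) {
      return ((long) nanos) << 3;
    } else {
      nanos /= 100;
      int trailingZeros = 1;  // two zeros stripped so far
      while (nanos % 10 == 0 && trailingZeros < 7) {
        nanos /= 10;
        trailingZeros += 1;
      }
      return ((long) nanos) << 3 | trailingZeros;
    }
  }

  // Inverse transform: a non-zero count z means the significant digits
  // were scaled down by 10^(z + 1).
  static int decode(long serialized) {
    int zeros = (int) (serialized & 7);
    int result = (int) (serialized >>> 3);
    if (zeros != 0) {
      for (int i = 0; i <= zeros; ++i) {
        result *= 10;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // e.g. 500,000,000 ns serializes to 5 << 3 | 7 == 47
    for (int n : new int[]{0, 1, 1000, 123456789, 500000000}) {
      long enc = encode(n);
      System.out.println(n + " -> " + enc + " -> " + decode(enc));
    }
  }
}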