orc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject [1/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.
Date Fri, 16 Jun 2017 18:29:58 GMT
Repository: orc
Updated Branches:
  refs/heads/master 8b103da92 -> ded204a4a


http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
new file mode 100644
index 0000000..ea4e0e6
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+
+/**
+ * The generic API that the type-specific column writers must
+ * all implement.
+ */
+public interface TreeWriter {
+
+  /**
+   * Estimate the memory currently used to buffer the stripe.
+   * @return the number of bytes
+   */
+  long estimateMemory();
+
+  /**
+   * Estimate the memory used if the file was read into Hive's Writable
+   * types. This is used as an estimate for the query optimizer.
+   * @return the number of bytes
+   */
+  long getRawDataSize();
+
+  /**
+   * Write a VectorizedRowBatch to the file. This is called by the WriterImpl
+   * at the top level.
+   * @param batch the list of all of the columns
+   * @param offset the first row from the batch to write
+   * @param length the number of rows to write
+   */
+  void writeRootBatch(VectorizedRowBatch batch, int offset,
+                      int length) throws IOException;
+
+  /**
+   * Write a ColumnVector to the file. This is called recursively by
+   * writeRootBatch.
+   * @param vector the data to write
+   * @param offset the first value offset to write.
+   * @param length the number of values to write
+   */
+  void writeBatch(ColumnVector vector, int offset,
+                  int length) throws IOException;
+
+  /**
+   * Create a row index entry at the current point in the stripe.
+   */
+  void createRowIndexEntry() throws IOException;
+
+  /**
+   * Write the stripe out to the file.
+   * @param stripeFooter the stripe footer that contains the information about the
+   *                layout of the stripe. The TreeWriterBase is required to update
+   *                the footer with its information.
+   * @param stats the stripe statistics information
+   * @param requiredIndexEntries the number of index entries that are
+   *                             required. This is used to check that the
+   *                             row index is well formed.
+   */
+  void writeStripe(OrcProto.StripeFooter.Builder stripeFooter,
+                   OrcProto.StripeStatistics.Builder stats,
+                   int requiredIndexEntries) throws IOException;
+
+  /**
+   * During a stripe append, we need to update the file statistics.
+   * @param stripeStatistics the statistics for the new stripe
+   */
+  void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics);
+
+  /**
+   * Add the file statistics to the file footer.
+   * @param footer the file footer builder
+   */
+  void writeFileStatistics(OrcProto.Footer.Builder footer);
+
+  public class Factory {
+    public static TreeWriter create(TypeDescription schema,
+                                    WriterContext streamFactory,
+                                    boolean nullable) throws IOException {
+      switch (schema.getCategory()) {
+        case BOOLEAN:
+          return new BooleanTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case BYTE:
+          return new ByteTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case SHORT:
+        case INT:
+        case LONG:
+          return new IntegerTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case FLOAT:
+          return new FloatTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case DOUBLE:
+          return new DoubleTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case STRING:
+          return new StringTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case CHAR:
+          return new CharTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case VARCHAR:
+          return new VarcharTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case BINARY:
+          return new BinaryTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case TIMESTAMP:
+          return new TimestampTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case DATE:
+          return new DateTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case DECIMAL:
+          return new DecimalTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case STRUCT:
+          return new StructTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case MAP:
+          return new MapTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case LIST:
+          return new ListTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case UNION:
+          return new UnionTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        default:
+          throw new IllegalArgumentException("Bad category: " +
+              schema.getCategory());
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
new file mode 100644
index 0000000..5cfde07
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.BitFieldWriter;
+import org.apache.orc.impl.ColumnStatisticsImpl;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.RunLengthIntegerWriter;
+import org.apache.orc.impl.RunLengthIntegerWriterV2;
+import org.apache.orc.impl.StreamName;
+import org.apache.orc.util.BloomFilter;
+import org.apache.orc.util.BloomFilterIO;
+import org.apache.orc.util.BloomFilterUtf8;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * The parent class of all of the writers for each column. Each column
+ * is written by an instance of this class. The compound types (struct,
+ * list, map, and union) have children tree writers that write the children
+ * types.
+ */
+public abstract class TreeWriterBase implements TreeWriter {
+  protected final int id;
+  protected final BitFieldWriter isPresent;
+  private final boolean isCompressed;
+  protected final ColumnStatisticsImpl indexStatistics;
+  protected final ColumnStatisticsImpl stripeColStatistics;
+  protected final ColumnStatisticsImpl fileStatistics;
+  protected final RowIndexPositionRecorder rowIndexPosition;
+  private final OrcProto.RowIndex.Builder rowIndex;
+  private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
+  protected final BloomFilter bloomFilter;
+  protected final BloomFilterUtf8 bloomFilterUtf8;
+  protected final boolean createBloomFilter;
+  private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+  private final OrcProto.BloomFilterIndex.Builder bloomFilterIndexUtf8;
+  protected final OrcProto.BloomFilter.Builder bloomFilterEntry;
+  private boolean foundNulls;
+  private OutStream isPresentOutStream;
+  private final WriterContext streamFactory;
+
+  /**
+   * Create a tree writer.
+   * @param columnId the column id of the column to write
+   * @param schema the row schema
+   * @param streamFactory limited access to the Writer's data.
+   * @param nullable can the value be null?
+   */
+  TreeWriterBase(int columnId,
+                 TypeDescription schema,
+                 WriterContext streamFactory,
+                 boolean nullable) throws IOException {
+    this.streamFactory = streamFactory;
+    this.isCompressed = streamFactory.isCompressed();
+    this.id = columnId;
+    if (nullable) {
+      isPresentOutStream = streamFactory.createStream(id,
+          OrcProto.Stream.Kind.PRESENT);
+      isPresent = new BitFieldWriter(isPresentOutStream, 1);
+    } else {
+      isPresent = null;
+    }
+    this.foundNulls = false;
+    createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
+    indexStatistics = ColumnStatisticsImpl.create(schema);
+    stripeColStatistics = ColumnStatisticsImpl.create(schema);
+    fileStatistics = ColumnStatisticsImpl.create(schema);
+    if (streamFactory.buildIndex()) {
+      rowIndex = OrcProto.RowIndex.newBuilder();
+      rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+      rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
+    } else {
+      rowIndex = null;
+      rowIndexEntry = null;
+      rowIndexPosition = null;
+    }
+    if (createBloomFilter) {
+      bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
+      if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
+        bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
+            streamFactory.getBloomFilterFPP());
+        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+      } else {
+        bloomFilter = null;
+        bloomFilterIndex = null;
+      }
+      bloomFilterUtf8 = new BloomFilterUtf8(streamFactory.getRowIndexStride(),
+          streamFactory.getBloomFilterFPP());
+      bloomFilterIndexUtf8 = OrcProto.BloomFilterIndex.newBuilder();
+    } else {
+      bloomFilterEntry = null;
+      bloomFilterIndex = null;
+      bloomFilterIndexUtf8 = null;
+      bloomFilter = null;
+      bloomFilterUtf8 = null;
+    }
+  }
+
+  protected OrcProto.RowIndex.Builder getRowIndex() {
+    return rowIndex;
+  }
+
+  protected ColumnStatisticsImpl getStripeStatistics() {
+    return stripeColStatistics;
+  }
+
+  protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
+    return rowIndexEntry;
+  }
+
+  IntegerWriter createIntegerWriter(PositionedOutputStream output,
+                                    boolean signed, boolean isDirectV2,
+                                    WriterContext writer) {
+    if (isDirectV2) {
+      boolean alignedBitpacking = false;
+      if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
+        alignedBitpacking = true;
+      }
+      return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
+    } else {
+      return new RunLengthIntegerWriter(output, signed);
+    }
+  }
+
+  boolean isNewWriteFormat(WriterContext writer) {
+    return writer.getVersion() != OrcFile.Version.V_0_11;
+  }
+
+  /**
+   * Handle the top level object write.
+   *
+   * This default method is used for all types except structs, which are the
+   * typical case. VectorizedRowBatch assumes the top level object is a
+   * struct, so we use the first column for all other types.
+   * @param batch the batch to write from
+   * @param offset the row to start on
+   * @param length the number of rows to write
+   */
+  public void writeRootBatch(VectorizedRowBatch batch, int offset,
+                             int length) throws IOException {
+    writeBatch(batch.cols[0], offset, length);
+  }
+
+  /**
+   * Write the values from the given vector from offset for length elements.
+   * @param vector the vector to write from
+   * @param offset the first value from the vector to write
+   * @param length the number of values from the vector to write
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    if (vector.noNulls) {
+      indexStatistics.increment(length);
+      if (isPresent != null) {
+        for (int i = 0; i < length; ++i) {
+          isPresent.write(1);
+        }
+      }
+    } else {
+      if (vector.isRepeating) {
+        boolean isNull = vector.isNull[0];
+        if (isPresent != null) {
+          for (int i = 0; i < length; ++i) {
+            isPresent.write(isNull ? 0 : 1);
+          }
+        }
+        if (isNull) {
+          foundNulls = true;
+          indexStatistics.setNull();
+        } else {
+          indexStatistics.increment(length);
+        }
+      } else {
+        // count the number of non-null values
+        int nonNullCount = 0;
+        for(int i = 0; i < length; ++i) {
+          boolean isNull = vector.isNull[i + offset];
+          if (!isNull) {
+            nonNullCount += 1;
+          }
+          if (isPresent != null) {
+            isPresent.write(isNull ? 0 : 1);
+          }
+        }
+        indexStatistics.increment(nonNullCount);
+        if (nonNullCount != length) {
+          foundNulls = true;
+          indexStatistics.setNull();
+        }
+      }
+    }
+  }
+
+  private void removeIsPresentPositions() {
+    for(int i=0; i < rowIndex.getEntryCount(); ++i) {
+      OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
+      List<Long> positions = entry.getPositionsList();
+      // bit streams use 3 positions if uncompressed, 4 if compressed
+      positions = positions.subList(isCompressed ? 4 : 3, positions.size());
+      entry.clearPositions();
+      entry.addAllPositions(positions);
+    }
+  }
+
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    if (isPresent != null) {
+      isPresent.flush();
+
+      // if no nulls are found in a stream, then suppress the stream
+      if(!foundNulls) {
+        isPresentOutStream.suppress();
+        // since isPresent bitstream is suppressed, update the index to
+        // remove the positions of the isPresent stream
+        if (rowIndex != null) {
+          removeIsPresentPositions();
+        }
+      }
+    }
+
+    // merge stripe-level column statistics to file statistics and write it to
+    // stripe statistics
+    fileStatistics.merge(stripeColStatistics);
+    stats.addColStats(stripeColStatistics.serialize());
+    stripeColStatistics.reset();
+
+    // reset the flag for next stripe
+    foundNulls = false;
+
+    builder.addColumns(getEncoding());
+    if (rowIndex != null) {
+      if (rowIndex.getEntryCount() != requiredIndexEntries) {
+        throw new IllegalArgumentException("Column has wrong number of " +
+             "index entries found: " + rowIndex.getEntryCount() + " expected: " +
+             requiredIndexEntries);
+      }
+      streamFactory.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
+      rowIndex.clear();
+      rowIndexEntry.clear();
+    }
+
+    // write the original-encoding bloom filter to the output stream
+    if (bloomFilterIndex != null) {
+      streamFactory.writeBloomFilter(new StreamName(id,
+          OrcProto.Stream.Kind.BLOOM_FILTER), bloomFilterIndex);
+      bloomFilterIndex.clear();
+    }
+    // write the UTF-8 bloom filter to the output stream
+    if (bloomFilterIndexUtf8 != null) {
+      streamFactory.writeBloomFilter(new StreamName(id,
+          OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), bloomFilterIndexUtf8);
+      bloomFilterIndexUtf8.clear();
+    }
+  }
+
+  /**
+   * Get the encoding for this column.
+   * @return the information about the encoding of this column
+   */
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder builder =
+        OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    if (createBloomFilter) {
+      builder.setBloomEncoding(BloomFilterIO.Encoding.CURRENT.getId());
+    }
+    return builder;
+  }
+
+  /**
+   * Create a row index entry with the previous location and the current
+   * index statistics. Also merges the index statistics into the file
+   * statistics before they are cleared. Finally, it records the start of the
+   * next index and ensures all of the children columns also create an entry.
+   */
+  public void createRowIndexEntry() throws IOException {
+    stripeColStatistics.merge(indexStatistics);
+    rowIndexEntry.setStatistics(indexStatistics.serialize());
+    indexStatistics.reset();
+    rowIndex.addEntry(rowIndexEntry);
+    rowIndexEntry.clear();
+    addBloomFilterEntry();
+    recordPosition(rowIndexPosition);
+  }
+
+  void addBloomFilterEntry() {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        BloomFilterIO.serialize(bloomFilterEntry, bloomFilter);
+        bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+        bloomFilter.reset();
+      }
+      if (bloomFilterUtf8 != null) {
+        BloomFilterIO.serialize(bloomFilterEntry, bloomFilterUtf8);
+        bloomFilterIndexUtf8.addBloomFilter(bloomFilterEntry.build());
+        bloomFilterUtf8.reset();
+      }
+    }
+  }
+
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    fileStatistics.merge(ColumnStatisticsImpl.deserialize(stats.getColStats(id)));
+  }
+
+  /**
+   * Record the current position in each of this column's streams.
+   * @param recorder where should the locations be recorded
+   */
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    if (isPresent != null) {
+      isPresent.getPosition(recorder);
+    }
+  }
+
+  /**
+   * Estimate how much memory the writer is consuming excluding the streams.
+   * @return the number of bytes.
+   */
+  public long estimateMemory() {
+    long result = 0;
+    if (isPresent != null) {
+      result = isPresentOutStream.getBufferSize();
+    }
+    return result;
+  }
+
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    footer.addStatistics(fileStatistics.serialize());
+  }
+
+  static class RowIndexPositionRecorder implements PositionRecorder {
+    private final OrcProto.RowIndexEntry.Builder builder;
+
+    RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public void addPosition(long position) {
+      builder.addPositions(position);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java
new file mode 100644
index 0000000..5047f01
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.RunLengthByteWriter;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.util.List;
+
+public class UnionTreeWriter extends TreeWriterBase {
+  private final RunLengthByteWriter tags;
+  private final TreeWriter[] childrenWriters;
+
+  UnionTreeWriter(int columnId,
+                  TypeDescription schema,
+                  WriterContext writer,
+                  boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    List<TypeDescription> children = schema.getChildren();
+    childrenWriters = new TreeWriterBase[children.size()];
+    for (int i = 0; i < childrenWriters.length; ++i) {
+      childrenWriters[i] = Factory.create(children.get(i), writer, true);
+    }
+    tags =
+        new RunLengthByteWriter(writer.createStream(columnId,
+            OrcProto.Stream.Kind.DATA));
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    UnionColumnVector vec = (UnionColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        byte tag = (byte) vec.tags[0];
+        for (int i = 0; i < length; ++i) {
+          tags.write(tag);
+        }
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(tag);
+          }
+          bloomFilterUtf8.addLong(tag);
+        }
+        childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
+      }
+    } else {
+      // write the records in runs of the same tag
+      int[] currentStart = new int[vec.fields.length];
+      int[] currentLength = new int[vec.fields.length];
+      for (int i = 0; i < length; ++i) {
+        // only need to deal with the non-nulls, since the nulls were dealt
+        // with in the super method.
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          byte tag = (byte) vec.tags[offset + i];
+          tags.write(tag);
+          if (currentLength[tag] == 0) {
+            // start a new sequence
+            currentStart[tag] = i + offset;
+            currentLength[tag] = 1;
+          } else if (currentStart[tag] + currentLength[tag] == i + offset) {
+            // ok, we are extending the current run for that tag.
+            currentLength[tag] += 1;
+          } else {
+            // otherwise, we need to close off the old run and start a new one
+            childrenWriters[tag].writeBatch(vec.fields[tag],
+                currentStart[tag], currentLength[tag]);
+            currentStart[tag] = i + offset;
+            currentLength[tag] = 1;
+          }
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(tag);
+            }
+            bloomFilterUtf8.addLong(tag);
+          }
+        }
+      }
+      // write out any left over sequences
+      for (int tag = 0; tag < currentStart.length; ++tag) {
+        if (currentLength[tag] != 0) {
+          childrenWriters[tag].writeBatch(vec.fields[tag], currentStart[tag],
+              currentLength[tag]);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    for (TreeWriter child : childrenWriters) {
+      child.createRowIndexEntry();
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    tags.flush();
+    for (TreeWriter child : childrenWriters) {
+      child.writeStripe(builder, stats, requiredIndexEntries);
+    }
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    tags.getPosition(recorder);
+  }
+
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    for (TreeWriter child : childrenWriters) {
+      child.updateFileStatistics(stats);
+    }
+  }
+
+  @Override
+  public long estimateMemory() {
+    long children = 0;
+    for (TreeWriter writer : childrenWriters) {
+      children += writer.estimateMemory();
+    }
+    return children + super.estimateMemory() + tags.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    long result = 0;
+    for (TreeWriter writer : childrenWriters) {
+      result += writer.getRawDataSize();
+    }
+    return result;
+  }
+
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    for (TreeWriter child : childrenWriters) {
+      child.writeFileStatistics(footer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java
new file mode 100644
index 0000000..17d3f61
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Under the covers, varchar is written to ORC the same way as string.
+ */
+public class VarcharTreeWriter extends StringBaseTreeWriter {
+  private final int maxLength;
+
+  VarcharTreeWriter(int columnId,
+                    TypeDescription schema,
+                    WriterContext writer,
+                    boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    maxLength = schema.getMaxLength();
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int itemLength = Math.min(vec.length[0], maxLength);
+        if (useDictionaryEncoding) {
+          int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
+          for(int i=0; i < length; ++i) {
+            rows.add(id);
+          }
+        } else {
+          for(int i=0; i < length; ++i) {
+            directStreamOutput.write(vec.vector[0], vec.start[0],
+                itemLength);
+            lengthOutput.write(itemLength);
+          }
+        }
+        indexStatistics.updateString(vec.vector[0], vec.start[0],
+            itemLength, length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            // decode the UTF-8 bytes into a String for the original bloom filter
+            bloomFilter.addString(new String(vec.vector[0],
+                vec.start[0], itemLength,
+                StandardCharsets.UTF_8));
+          }
+          bloomFilterUtf8.addBytes(vec.vector[0],
+              vec.start[0], itemLength);
+        }
+      }
+    } else {
+      for(int i=0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int itemLength = Math.min(vec.length[offset + i], maxLength);
+          if (useDictionaryEncoding) {
+            rows.add(dictionary.add(vec.vector[offset + i],
+                vec.start[offset + i], itemLength));
+          } else {
+            directStreamOutput.write(vec.vector[offset + i],
+                vec.start[offset + i], itemLength);
+            lengthOutput.write(itemLength);
+          }
+          indexStatistics.updateString(vec.vector[offset + i],
+              vec.start[offset + i], itemLength, 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              // decode the UTF-8 bytes into a String for the original bloom filter
+              bloomFilter.addString(new String(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength,
+                  StandardCharsets.UTF_8));
+            }
+            bloomFilterUtf8.addBytes(vec.vector[offset + i],
+                vec.start[offset + i], itemLength);
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
new file mode 100644
index 0000000..f11d519
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.StreamName;
+
+import java.io.IOException;
+
+public interface WriterContext {
+
+  /**
+     * Create a stream to store part of a column.
+     * @param column the column id for the stream
+     * @param kind the kind of stream
+     * @return The output outStream that the section needs to be written to.
+     */
+    OutStream createStream(int column,
+                           OrcProto.Stream.Kind kind
+                           ) throws IOException;
+
+    /**
+     * Get the stride rate of the row index.
+     */
+    int getRowIndexStride();
+
+    /**
+     * Should the writer build the row index?
+     * @return true if we are building the index
+     */
+    boolean buildIndex();
+
+    /**
+     * Is the ORC file compressed?
+     * @return are the streams compressed
+     */
+    boolean isCompressed();
+
+    /**
+     * Get the encoding strategy to use.
+     * @return encoding strategy
+     */
+    OrcFile.EncodingStrategy getEncodingStrategy();
+
+    /**
+     * Get the bloom filter columns
+     * @return bloom filter columns
+     */
+    boolean[] getBloomFilterColumns();
+
+    /**
+     * Get bloom filter false positive percentage.
+     * @return fpp
+     */
+    double getBloomFilterFPP();
+
+    /**
+     * Get the writer's configuration.
+     * @return configuration
+     */
+    Configuration getConfiguration();
+
+    /**
+     * Get the version of the file to write.
+     */
+    OrcFile.Version getVersion();
+
+    OrcFile.BloomFilterVersion getBloomFilterVersion();
+
+    void writeIndex(StreamName name,
+                    OrcProto.RowIndex.Builder index) throws IOException;
+
+    void writeBloomFilter(StreamName name,
+                          OrcProto.BloomFilterIndex.Builder bloom
+                          ) throws IOException;
+}


Mime
View raw message