spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-16060][SQL][FOLLOW-UP] add a wrapper solution for vectorized orc reader
Date Wed, 10 Jan 2018 07:16:41 GMT
Repository: spark
Updated Branches:
  refs/heads/master edf0a48c2 -> eaac60a1e


[SPARK-16060][SQL][FOLLOW-UP] add a wrapper solution for vectorized orc reader

## What changes were proposed in this pull request?

This is mostly from https://github.com/apache/spark/pull/13775

The wrapper solution is a good fit for string/binary types, as the ORC column vector doesn't
keep its bytes in a contiguous memory region, so copying the data to a Spark columnar batch
has significant overhead. For other types, the wrapper solution performs almost the same as
the current solution.
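
To make the memory-layout point concrete, here is a simplified, self-contained sketch of the
slice layout that ORC's `BytesColumnVector` uses (a shared byte array plus per-row
start/length offsets). All names below are illustrative, not actual Spark or Hive APIs:

```scala
// Illustrative only: mimics BytesColumnVector's (vector, start, length) slice layout
// and shows how a wrapper can serve reads without copying the backing arrays.
class ByteSlices(val bytes: Array[Array[Byte]], val start: Array[Int], val length: Array[Int])

class SliceWrapper(data: ByteSlices) {
  // Materializes a String view of one row; the backing array is never duplicated.
  def getString(row: Int): String =
    new String(data.bytes(row), data.start(row), data.length(row), "UTF-8")
}

object SliceDemo extends App {
  val backing = "helloworld".getBytes("UTF-8")
  // Two rows sharing one backing array, as ORC's BytesColumnVector can do.
  val slices = new ByteSlices(Array(backing, backing), Array(0, 5), Array(5, 5))
  val w = new SliceWrapper(slices)
  assert(w.getString(0) == "hello" && w.getString(1) == "world")
}
```

Copying such a batch into Spark's own vectors means eagerly materializing every slice, which
is exactly the overhead the wrapper avoids.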

I think we can treat the wrapper solution as a baseline and keep improving the copy-to-Spark
solution.
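
For reference, a minimal sketch of toggling between the two paths via the internal config
added in this patch (the path and column name are made up for illustration):

```scala
// Wrapper path (the default): ORC column vectors are wrapped, not copied.
spark.conf.set("spark.sql.orc.copyBatchToSpark", "false")
spark.read.orc("/tmp/strings.orc").selectExpr("sum(length(c1))").collect()

// Copy path: each ORC batch is copied into Spark's writable column vectors.
spark.conf.set("spark.sql.orc.copyBatchToSpark", "true")
spark.read.orc("/tmp/strings.orc").selectExpr("sum(length(c1))").collect()
```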

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20205 from cloud-fan/orc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eaac60a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eaac60a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eaac60a1

Branch: refs/heads/master
Commit: eaac60a1e20e29084b7151ffca964cfaa5ba99d1
Parents: edf0a48
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Wed Jan 10 15:16:27 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Wed Jan 10 15:16:27 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   7 +
 .../datasources/orc/OrcColumnVector.java        | 251 +++++++++++++++++++
 .../datasources/orc/OrcColumnarBatchReader.java | 106 +++++---
 .../datasources/orc/OrcFileFormat.scala         |   6 +-
 .../spark/sql/hive/orc/OrcReadBenchmark.scala   | 236 ++++++++++-------
 5 files changed, 490 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eaac60a1/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 74949db..36e802a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -391,6 +391,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_COPY_BATCH_TO_SPARK = buildConf("spark.sql.orc.copyBatchToSpark")
+    .doc("Whether or not to copy the ORC columnar batch to Spark columnar batch in the "
+
+      "vectorized ORC reader.")
+    .internal()
+    .booleanConf
+    .createWithDefault(false)
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf

http://git-wip-us.apache.org/repos/asf/spark/blob/eaac60a1/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
new file mode 100644
index 0000000..f94c55d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import java.math.BigDecimal;
+
+import org.apache.orc.storage.ql.exec.vector.*;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A column vector class wrapping Hive's ColumnVector. Because Spark ColumnarBatch only accepts
+ * Spark's vectorized.ColumnVector, this column vector is used to adapt Hive ColumnVector with
+ * Spark ColumnarVector.
+ */
+public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
+  private ColumnVector baseData;
+  private LongColumnVector longData;
+  private DoubleColumnVector doubleData;
+  private BytesColumnVector bytesData;
+  private DecimalColumnVector decimalData;
+  private TimestampColumnVector timestampData;
+  final private boolean isTimestamp;
+
+  private int batchSize;
+
+  OrcColumnVector(DataType type, ColumnVector vector) {
+    super(type);
+
+    if (type instanceof TimestampType) {
+      isTimestamp = true;
+    } else {
+      isTimestamp = false;
+    }
+
+    baseData = vector;
+    if (vector instanceof LongColumnVector) {
+      longData = (LongColumnVector) vector;
+    } else if (vector instanceof DoubleColumnVector) {
+      doubleData = (DoubleColumnVector) vector;
+    } else if (vector instanceof BytesColumnVector) {
+      bytesData = (BytesColumnVector) vector;
+    } else if (vector instanceof DecimalColumnVector) {
+      decimalData = (DecimalColumnVector) vector;
+    } else if (vector instanceof TimestampColumnVector) {
+      timestampData = (TimestampColumnVector) vector;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public int numNulls() {
+    if (baseData.isRepeating) {
+      if (baseData.isNull[0]) {
+        return batchSize;
+      } else {
+        return 0;
+      }
+    } else if (baseData.noNulls) {
+      return 0;
+    } else {
+      int count = 0;
+      for (int i = 0; i < batchSize; i++) {
+        if (baseData.isNull[i]) count++;
+      }
+      return count;
+    }
+  }
+
+  /* A helper method to get the row index in a column. */
+  private int getRowIndex(int rowId) {
+    return baseData.isRepeating ? 0 : rowId;
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    return baseData.isNull[getRowIndex(rowId)];
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    return longData.vector[getRowIndex(rowId)] == 1;
+  }
+
+  @Override
+  public boolean[] getBooleans(int rowId, int count) {
+    boolean[] res = new boolean[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getBoolean(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    return (byte) longData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public byte[] getBytes(int rowId, int count) {
+    byte[] res = new byte[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getByte(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    return (short) longData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public short[] getShorts(int rowId, int count) {
+    short[] res = new short[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getShort(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    return (int) longData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public int[] getInts(int rowId, int count) {
+    int[] res = new int[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getInt(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    int index = getRowIndex(rowId);
+    if (isTimestamp) {
+      return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000;
+    } else {
+      return longData.vector[index];
+    }
+  }
+
+  @Override
+  public long[] getLongs(int rowId, int count) {
+    long[] res = new long[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getLong(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    return (float) doubleData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public float[] getFloats(int rowId, int count) {
+    float[] res = new float[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getFloat(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    return doubleData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public double[] getDoubles(int rowId, int count) {
+    double[] res = new double[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getDouble(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public int getArrayLength(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getArrayOffset(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    BigDecimal data = decimalData.vector[getRowIndex(rowId)].getHiveDecimal().bigDecimalValue();
+    return Decimal.apply(data, precision, scale);
+  }
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    int index = getRowIndex(rowId);
+    BytesColumnVector col = bytesData;
+    return UTF8String.fromBytes(col.vector[index], col.start[index], col.length[index]);
+  }
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    int index = getRowIndex(rowId);
+    byte[] binary = new byte[bytesData.length[index]];
+    System.arraycopy(bytesData.vector[index], bytesData.start[index], binary, 0, binary.length);
+    return binary;
+  }
+
+  @Override
+  public org.apache.spark.sql.vectorized.ColumnVector arrayData() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public org.apache.spark.sql.vectorized.ColumnVector getChildColumn(int ordinal) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/eaac60a1/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 5c28d0e..36fdf2b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -51,13 +51,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
 
   /**
-   * The default size of batch. We use this value for both ORC and Spark consistently
-   * because they have different default values like the following.
+   * The default size of batch. We use this value for ORC reader to make it consistent with Spark's
+   * columnar batch, because their default batch sizes are different like the following:
    *
    * - ORC's VectorizedRowBatch.DEFAULT_SIZE = 1024
    * - Spark's ColumnarBatch.DEFAULT_BATCH_SIZE = 4 * 1024
    */
-  public static final int DEFAULT_SIZE = 4 * 1024;
+  private static final int DEFAULT_SIZE = 4 * 1024;
 
   // ORC File Reader
   private Reader reader;
@@ -82,13 +82,18 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   // Writable column vectors of the result columnar batch.
   private WritableColumnVector[] columnVectors;
 
-  /**
-   * The memory mode of the columnarBatch
-   */
+  // The wrapped ORC column vectors. It should be null if `copyToSpark` is true.
+  private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers;
+
+  // The memory mode of the columnarBatch
   private final MemoryMode MEMORY_MODE;
 
-  public OrcColumnarBatchReader(boolean useOffHeap) {
+  // Whether or not to copy the ORC columnar batch to Spark columnar batch.
+  private final boolean copyToSpark;
+
+  public OrcColumnarBatchReader(boolean useOffHeap, boolean copyToSpark) {
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
+    this.copyToSpark = copyToSpark;
   }
 
 
@@ -167,27 +172,61 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
     }
 
     int capacity = DEFAULT_SIZE;
-    if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
-      columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
-    } else {
-      columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
-    }
-    columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
 
-    if (partitionValues.numFields() > 0) {
-      int partitionIdx = requiredFields.length;
-      for (int i = 0; i < partitionValues.numFields(); i++) {
-        ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
-        columnVectors[i + partitionIdx].setIsConstant();
+    if (copyToSpark) {
+      if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
+        columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
+      } else {
+        columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
       }
-    }
 
-    // Initialize the missing columns once.
-    for (int i = 0; i < requiredFields.length; i++) {
-      if (requestedColIds[i] == -1) {
-        columnVectors[i].putNulls(0, columnarBatch.capacity());
-        columnVectors[i].setIsConstant();
+      // Initialize the missing columns once.
+      for (int i = 0; i < requiredFields.length; i++) {
+        if (requestedColIds[i] == -1) {
+          columnVectors[i].putNulls(0, capacity);
+          columnVectors[i].setIsConstant();
+        }
+      }
+
+      if (partitionValues.numFields() > 0) {
+        int partitionIdx = requiredFields.length;
+        for (int i = 0; i < partitionValues.numFields(); i++) {
+          ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
+          columnVectors[i + partitionIdx].setIsConstant();
+        }
+      }
+
+      columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
+    } else {
+      // Just wrap the ORC column vector instead of copying it to Spark column vector.
+      orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
+
+      for (int i = 0; i < requiredFields.length; i++) {
+        DataType dt = requiredFields[i].dataType();
+        int colId = requestedColIds[i];
+        // Initialize the missing columns once.
+        if (colId == -1) {
+          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
+          missingCol.putNulls(0, capacity);
+          missingCol.setIsConstant();
+          orcVectorWrappers[i] = missingCol;
+        } else {
+          orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
+        }
       }
+
+      if (partitionValues.numFields() > 0) {
+        int partitionIdx = requiredFields.length;
+        for (int i = 0; i < partitionValues.numFields(); i++) {
+          DataType dt = partitionSchema.fields()[i].dataType();
+          OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
+          ColumnVectorUtils.populate(partitionCol, partitionValues, i);
+          partitionCol.setIsConstant();
+          orcVectorWrappers[partitionIdx + i] = partitionCol;
+        }
+      }
+
+      columnarBatch = new ColumnarBatch(resultSchema, orcVectorWrappers, capacity);
     }
   }
 
@@ -196,17 +235,26 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
    * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns.
    */
   private boolean nextBatch() throws IOException {
-    for (WritableColumnVector vector : columnVectors) {
-      vector.reset();
-    }
-    columnarBatch.setNumRows(0);
-
     recordReader.nextBatch(batch);
     int batchSize = batch.size;
     if (batchSize == 0) {
       return false;
     }
     columnarBatch.setNumRows(batchSize);
+
+    if (!copyToSpark) {
+      for (int i = 0; i < requiredFields.length; i++) {
+        if (requestedColIds[i] != -1) {
+          ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
+        }
+      }
+      return true;
+    }
+
+    for (WritableColumnVector vector : columnVectors) {
+      vector.reset();
+    }
+
     for (int i = 0; i < requiredFields.length; i++) {
       StructField field = requiredFields[i];
       WritableColumnVector toColumn = columnVectors[i];

http://git-wip-us.apache.org/repos/asf/spark/blob/eaac60a1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index b8bacfa..2dd314d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -150,6 +151,7 @@ class OrcFileFormat
     val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
     val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
+    val copyToSpark = sparkSession.sessionState.conf.getConf(SQLConf.ORC_COPY_BATCH_TO_SPARK)
 
     val broadcastedConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -183,8 +185,8 @@ class OrcFileFormat
 
         val taskContext = Option(TaskContext.get())
         if (enableVectorizedReader) {
-          val batchReader =
-            new OrcColumnarBatchReader(enableOffHeapColumnVector && taskContext.isDefined)
+          val batchReader = new OrcColumnarBatchReader(
+            enableOffHeapColumnVector && taskContext.isDefined, copyToSpark)
           batchReader.initialize(fileSplit, taskAttemptContext)
           batchReader.initBatch(
             reader.getSchema,

http://git-wip-us.apache.org/repos/asf/spark/blob/eaac60a1/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
index 37ed846..bf6efa7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
@@ -86,7 +86,7 @@ object OrcReadBenchmark {
   }
 
   def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
-    val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values)
+    val benchmark = new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values)
 
     withTempPath { dir =>
       withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
@@ -95,61 +95,73 @@ object OrcReadBenchmark {
 
         prepareTable(dir, spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM t1"))
 
-        sqlBenchmark.addCase("Native ORC MR") { _ =>
+        benchmark.addCase("Native ORC MR") { _ =>
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
             spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
           }
         }
 
-        sqlBenchmark.addCase("Native ORC Vectorized") { _ =>
+        benchmark.addCase("Native ORC Vectorized") { _ =>
           spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
         }
 
-        sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+        benchmark.addCase("Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
         SQL Single TINYINT Column Scan:          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1192 / 1221         13.2          75.8       1.0X
-        Native ORC Vectorized                          161 /  170         97.5          10.3       7.4X
-        Hive built-in ORC                             1399 / 1413         11.2          89.0       0.9X
+        Native ORC MR                                 1135 / 1171         13.9          72.2       1.0X
+        Native ORC Vectorized                          152 /  163        103.4           9.7       7.5X
+        Native ORC Vectorized with copy                149 /  162        105.4           9.5       7.6X
+        Hive built-in ORC                             1380 / 1384         11.4          87.7       0.8X
 
         SQL Single SMALLINT Column Scan:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1287 / 1333         12.2          81.8       1.0X
-        Native ORC Vectorized                          164 /  172         95.6          10.5       7.8X
-        Hive built-in ORC                             1629 / 1650          9.7         103.6       0.8X
+        Native ORC MR                                 1182 / 1244         13.3          75.2       1.0X
+        Native ORC Vectorized                          145 /  156        108.7           9.2       8.2X
+        Native ORC Vectorized with copy                148 /  158        106.4           9.4       8.0X
+        Hive built-in ORC                             1591 / 1636          9.9         101.2       0.7X
 
         SQL Single INT Column Scan:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1304 / 1388         12.1          82.9       1.0X
-        Native ORC Vectorized                          227 /  240         69.3          14.4       5.7X
-        Hive built-in ORC                             1866 / 1867          8.4         118.6       0.7X
+        Native ORC MR                                 1271 / 1271         12.4          80.8       1.0X
+        Native ORC Vectorized                          206 /  212         76.3          13.1       6.2X
+        Native ORC Vectorized with copy                200 /  213         78.8          12.7       6.4X
+        Hive built-in ORC                             1776 / 1787          8.9         112.9       0.7X
 
         SQL Single BIGINT Column Scan:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1331 / 1357         11.8          84.6       1.0X
-        Native ORC Vectorized                          289 /  297         54.4          18.4       4.6X
-        Hive built-in ORC                             1922 / 1929          8.2         122.2       0.7X
+        Native ORC MR                                 1344 / 1355         11.7          85.4       1.0X
+        Native ORC Vectorized                          258 /  268         61.0          16.4       5.2X
+        Native ORC Vectorized with copy                252 /  257         62.4          16.0       5.3X
+        Hive built-in ORC                             1818 / 1823          8.7         115.6       0.7X
 
         SQL Single FLOAT Column Scan:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1410 / 1428         11.2          89.7       1.0X
-        Native ORC Vectorized                          328 /  335         48.0          20.8       4.3X
-        Hive built-in ORC                             1929 / 2012          8.2         122.6       0.7X
+        Native ORC MR                                 1333 / 1352         11.8          84.8       1.0X
+        Native ORC Vectorized                          310 /  324         50.7          19.7       4.3X
+        Native ORC Vectorized with copy                312 /  320         50.4          19.9       4.3X
+        Hive built-in ORC                             1904 / 1918          8.3         121.0       0.7X
 
         SQL Single DOUBLE Column Scan:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1467 / 1485         10.7          93.3       1.0X
-        Native ORC Vectorized                          402 /  411         39.1          25.6       3.6X
-        Hive built-in ORC                             2023 / 2042          7.8         128.6       0.7X
+        Native ORC MR                                 1408 / 1585         11.2          89.5       1.0X
+        Native ORC Vectorized                          359 /  368         43.8          22.8       3.9X
+        Native ORC Vectorized with copy                364 /  371         43.2          23.2       3.9X
+        Hive built-in ORC                             1881 / 1954          8.4         119.6       0.7X
         */
-        sqlBenchmark.run()
+        benchmark.run()
       }
     }
   }
@@ -176,19 +188,26 @@ object OrcReadBenchmark {
           spark.sql("SELECT sum(c1), sum(length(c2)) FROM nativeOrcTable").collect()
         }
 
+        benchmark.addCase("Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(c1), sum(length(c2)) FROM nativeOrcTable").collect()
+          }
+        }
+
         benchmark.addCase("Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(c1), sum(length(c2)) FROM hiveOrcTable").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
         Int and String Scan:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 2729 / 2744          3.8         260.2       1.0X
-        Native ORC Vectorized                         1318 / 1344          8.0         125.7       2.1X
-        Hive built-in ORC                             3731 / 3782          2.8         355.8       0.7X
+        Native ORC MR                                 2566 / 2592          4.1         244.7       1.0X
+        Native ORC Vectorized                         1098 / 1113          9.6         104.7       2.3X
+        Native ORC Vectorized with copy               1527 / 1593          6.9         145.6       1.7X
+        Hive built-in ORC                             3561 / 3705          2.9         339.6       0.7X
         */
         benchmark.run()
       }
@@ -205,63 +224,84 @@ object OrcReadBenchmark {
 
         prepareTable(dir, spark.sql("SELECT value % 2 AS p, value AS id FROM t1"), Some("p"))
 
-        benchmark.addCase("Read data column - Native ORC MR") { _ =>
+        benchmark.addCase("Data column - Native ORC MR") { _ =>
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
             spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
           }
         }
 
-        benchmark.addCase("Read data column - Native ORC Vectorized") { _ =>
+        benchmark.addCase("Data column - Native ORC Vectorized") { _ =>
           spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
         }
 
-        benchmark.addCase("Read data column - Hive built-in ORC") { _ =>
+        benchmark.addCase("Data column - Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Data column - Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
         }
 
-        benchmark.addCase("Read partition column - Native ORC MR") { _ =>
+        benchmark.addCase("Partition column - Native ORC MR") { _ =>
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
             spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
           }
         }
 
-        benchmark.addCase("Read partition column - Native ORC Vectorized") { _ =>
+        benchmark.addCase("Partition column - Native ORC Vectorized") { _ =>
           spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
         }
 
-        benchmark.addCase("Read partition column - Hive built-in ORC") { _ =>
+        benchmark.addCase("Partition column - Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Partition column - Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(p) FROM hiveOrcTable").collect()
         }
 
-        benchmark.addCase("Read both columns - Native ORC MR") { _ =>
+        benchmark.addCase("Both columns - Native ORC MR") { _ =>
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
             spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
           }
         }
 
-        benchmark.addCase("Read both columns - Native ORC Vectorized") { _ =>
+        benchmark.addCase("Both columns - Native ORC Vectorized") { _ =>
           spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
         }
 
-        benchmark.addCase("Read both columns - Hive built-in ORC") { _ =>
+        benchmark.addCase("Both column - Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Both columns - Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(p), sum(id) FROM hiveOrcTable").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
         Partitioned Table:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Read data column - Native ORC MR               1531 / 1536         10.3          97.4       1.0X
-        Read data column - Native ORC Vectorized        295 /  298         53.3          18.8       5.2X
-        Read data column - Hive built-in ORC           2125 / 2126          7.4         135.1       0.7X
-        Read partition column - Native ORC MR          1049 / 1062         15.0          66.7       1.5X
-        Read partition column - Native ORC Vectorized    54 /   57        290.1           3.4      28.2X
-        Read partition column - Hive built-in ORC      1282 / 1291         12.3          81.5       1.2X
-        Read both columns - Native ORC MR              1594 / 1598          9.9         101.3       1.0X
-        Read both columns - Native ORC Vectorized       332 /  336         47.4          21.1       4.6X
-        Read both columns - Hive built-in ORC          2145 / 2187          7.3         136.4       0.7X
+        Data only - Native ORC MR                      1447 / 1457         10.9          92.0       1.0X
+        Data only - Native ORC Vectorized               256 /  266         61.4          16.3       5.6X
+        Data only - Native ORC Vectorized with copy     263 /  273         59.8          16.7       5.5X
+        Data only - Hive built-in ORC                  1960 / 1988          8.0         124.6       0.7X
+        Partition only - Native ORC MR                 1039 / 1043         15.1          66.0       1.4X
+        Partition only - Native ORC Vectorized           48 /   53        326.6           3.1      30.1X
+        Partition only - Native ORC Vectorized with copy 48 /   53        328.4           3.0      30.2X
+        Partition only - Hive built-in ORC             1234 / 1242         12.7          78.4       1.2X
+        Both columns - Native ORC MR                   1465 / 1475         10.7          93.1       1.0X
+        Both columns - Native ORC Vectorized            292 /  301         53.9          18.6       5.0X
+        Both columns - Native ORC Vectorized with copy  348 /  354         45.1          22.2       4.2X
+        Both columns - Hive built-in ORC               2051 / 2060          7.7         130.4       0.7X
         */
         benchmark.run()
       }
@@ -287,19 +327,26 @@ object OrcReadBenchmark {
           spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
         }
 
+        benchmark.addCase("Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
+          }
+        }
+
         benchmark.addCase("Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(length(c1)) FROM hiveOrcTable").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
         Repeated String:                         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1325 / 1328          7.9         126.4       1.0X
-        Native ORC Vectorized                          320 /  330         32.8          30.5       4.1X
-        Hive built-in ORC                             1971 / 1972          5.3         188.0       0.7X
+        Native ORC MR                                 1271 / 1278          8.3         121.2       1.0X
+        Native ORC Vectorized                          200 /  212         52.4          19.1       6.4X
+        Native ORC Vectorized with copy                342 /  347         30.7          32.6       3.7X
+        Hive built-in ORC                             1874 / 2105          5.6         178.7       0.7X
         */
         benchmark.run()
       }
@@ -331,32 +378,42 @@ object OrcReadBenchmark {
             "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
         }
 
+        benchmark.addCase("Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT SUM(LENGTH(c2)) FROM nativeOrcTable " +
+              "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
+          }
+        }
+
         benchmark.addCase("Hive built-in ORC") { _ =>
           spark.sql("SELECT SUM(LENGTH(c2)) FROM hiveOrcTable " +
             "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
         String with Nulls Scan (0.0%):           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 2553 / 2554          4.1         243.4       1.0X
-        Native ORC Vectorized                          953 /  954         11.0          90.9       2.7X
-        Hive built-in ORC                             3875 / 3898          2.7         369.6       0.7X
+        Native ORC MR                                 2394 / 2886          4.4         228.3       1.0X
+        Native ORC Vectorized                          699 /  729         15.0          66.7       3.4X
+        Native ORC Vectorized with copy                959 / 1025         10.9          91.5       2.5X
+        Hive built-in ORC                             3899 / 3901          2.7         371.9       0.6X
 
         String with Nulls Scan (0.5%):           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 2389 / 2408          4.4         227.8       1.0X
-        Native ORC Vectorized                         1208 / 1209          8.7         115.2       2.0X
-        Hive built-in ORC                             2940 / 2952          3.6         280.4       0.8X
+        Native ORC MR                                 2234 / 2255          4.7         213.1       1.0X
+        Native ORC Vectorized                          854 /  869         12.3          81.4       2.6X
+        Native ORC Vectorized with copy               1099 / 1128          9.5         104.8       2.0X
+        Hive built-in ORC                             2767 / 2793          3.8         263.9       0.8X
 
         String with Nulls Scan (0.95%):          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1295 / 1311          8.1         123.5       1.0X
-        Native ORC Vectorized                          449 /  457         23.4          42.8       2.9X
-        Hive built-in ORC                             1649 / 1660          6.4         157.3       0.8X
+        Native ORC MR                                 1166 / 1202          9.0         111.2       1.0X
+        Native ORC Vectorized                          338 /  345         31.1          32.2       3.5X
+        Native ORC Vectorized with copy                418 /  428         25.1          39.9       2.8X
+        Hive built-in ORC                             1730 / 1761          6.1         164.9       0.7X
         */
         benchmark.run()
       }
@@ -364,7 +421,7 @@ object OrcReadBenchmark {
   }
 
   def columnsBenchmark(values: Int, width: Int): Unit = {
-    val sqlBenchmark = new Benchmark(s"SQL Single Column Scan from $width columns", values)
+    val benchmark = new Benchmark(s"Single Column Scan from $width columns", values)
 
     withTempPath { dir =>
       withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
@@ -376,43 +433,52 @@ object OrcReadBenchmark {
 
         prepareTable(dir, spark.sql("SELECT * FROM t1"))
 
-        sqlBenchmark.addCase("Native ORC MR") { _ =>
+        benchmark.addCase("Native ORC MR") { _ =>
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
             spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
           }
         }
 
-        sqlBenchmark.addCase("Native ORC Vectorized") { _ =>
+        benchmark.addCase("Native ORC Vectorized") { _ =>
           spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
         }
 
-        sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+        benchmark.addCase("Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Hive built-in ORC") { _ =>
           spark.sql(s"SELECT sum(c$middle) FROM hiveOrcTable").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
-        SQL Single Column Scan from 100 columns: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        Single Column Scan from 100 columns:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1103 / 1124          1.0        1052.0       1.0X
-        Native ORC Vectorized                           92 /  100         11.4          87.9      12.0X
-        Hive built-in ORC                              383 /  390          2.7         365.4       2.9X
+        Native ORC MR                                 1050 / 1053          1.0        1001.1       1.0X
+        Native ORC Vectorized                           95 /  101         11.0          90.9      11.0X
+        Native ORC Vectorized with copy                 95 /  102         11.0          90.9      11.0X
+        Hive built-in ORC                              348 /  358          3.0         331.8       3.0X
 
-        SQL Single Column Scan from 200 columns: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        Single Column Scan from 200 columns:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 2245 / 2250          0.5        2141.0       1.0X
-        Native ORC Vectorized                          157 /  165          6.7         150.2      14.3X
-        Hive built-in ORC                              587 /  593          1.8         559.4       3.8X
+        Native ORC MR                                 2099 / 2108          0.5        2002.1       1.0X
+        Native ORC Vectorized                          179 /  187          5.8         171.1      11.7X
+        Native ORC Vectorized with copy                176 /  188          6.0         167.6      11.9X
+        Hive built-in ORC                              562 /  581          1.9         535.9       3.7X
 
-        SQL Single Column Scan from 300 columns: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        Single Column Scan from 300 columns:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
         ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 3343 / 3350          0.3        3188.3       1.0X
-        Native ORC Vectorized                          265 /  280          3.9         253.2      12.6X
-        Hive built-in ORC                              828 /  842          1.3         789.8       4.0X
+        Native ORC MR                                 3221 / 3246          0.3        3071.4       1.0X
+        Native ORC Vectorized                          312 /  322          3.4         298.0      10.3X
+        Native ORC Vectorized with copy                306 /  320          3.4         291.6      10.5X
+        Hive built-in ORC                              815 /  824          1.3         777.3       4.0X
         */
-        sqlBenchmark.run()
+        benchmark.run()
       }
     }
   }

