spark-commits mailing list archives

From dongj...@apache.org
Subject spark git commit: [SPARK-25931][SQL] Benchmarking creation of Jackson parser
Date Sat, 03 Nov 2018 16:09:44 GMT
Repository: spark
Updated Branches:
  refs/heads/master 0e318acd0 -> 42b6c1fb0


[SPARK-25931][SQL] Benchmarking creation of Jackson parser

## What changes were proposed in this pull request?

Added a new benchmark that forcibly invokes the Jackson parser to measure the overhead of
its creation for short and wide JSON strings. The existing benchmarks cannot measure this
because of an optimization introduced by #21909 for an empty schema pushed down to the JSON
datasource: the `count()` action passes an empty schema as the required schema, so the
Jackson parser is not created at all in that case. The sketch below shows how the new
benchmark defeats this optimization.
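
A minimal sketch of the technique, as used by `jsonParserCreation` in the diff below
(`schema` and `path` stand in for the benchmark's schema and temporary path):

```scala
// count() alone: the required schema pushed down to the datasource is empty,
// so the Jackson parser is never created (the #21909 optimization).
spark.read.schema(schema).json(path).count()

// A no-op filter over Row forces deserialization of every row, so the full
// schema is required and a Jackson parser is created for each line.
spark.read.schema(schema).json(path).filter((_: Row) => true).count()
```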

Besides the new benchmark, the existing benchmarks were also refactored:
- Added a `numIters` parameter to control the number of iterations in each benchmark
  (see the sketch after this list).
- Renamed `JSON per-line parsing` -> `count a short column`, `JSON parsing of wide lines`
  -> `count a wide column`, and `Count a dataset with 10 columns` -> `Select a subset
  of 10 columns`.
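
Sketch of the `numIters` pattern as it appears in the diff (the hard-coded iteration
count `3` is replaced by the parameter; data preparation is elided here):

```scala
def countShortColumn(rowsNum: Int, numIters: Int): Unit = {
  val benchmark = new Benchmark("count a short column", rowsNum, output = output)
  // ... temp path creation and schema writing elided (see writeShortColumn in the diff)
  benchmark.addCase("No encoding", numIters) { _ =>
    spark.read.schema(schema).json(path.getAbsolutePath).count()
  }
  // ... remaining cases elided
  benchmark.run()
}
```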

Closes #22920 from MaxGekk/json-benchmark-follow-up.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42b6c1fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42b6c1fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42b6c1fb

Branch: refs/heads/master
Commit: 42b6c1fb05ead89331791cd27ea7c97ff7fd8e16
Parents: 0e318ac
Author: Maxim Gekk <max.gekk@gmail.com>
Authored: Sat Nov 3 09:09:39 2018 -0700
Committer: Dongjoon Hyun <dongjoon@apache.org>
Committed: Sat Nov 3 09:09:39 2018 -0700

----------------------------------------------------------------------
 sql/core/benchmarks/JSONBenchmark-results.txt   |  35 +++--
 .../datasources/json/JsonBenchmark.scala        | 156 +++++++++++++------
 2 files changed, 131 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/42b6c1fb/sql/core/benchmarks/JSONBenchmark-results.txt
----------------------------------------------------------------------
diff --git a/sql/core/benchmarks/JSONBenchmark-results.txt b/sql/core/benchmarks/JSONBenchmark-results.txt
index 9993730..4774294 100644
--- a/sql/core/benchmarks/JSONBenchmark-results.txt
+++ b/sql/core/benchmarks/JSONBenchmark-results.txt
@@ -7,31 +7,42 @@ OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 JSON schema inferring:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                 62946 / 63310          1.6         629.5       1.0X
-UTF-8 is set                              112814 / 112866          0.9        1128.1       0.6X
+No encoding                                 71832 / 72149          1.4         718.3       1.0X
+UTF-8 is set                              101700 / 101819          1.0        1017.0       0.7X
 
 Preparing data for benchmarking ...
 OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
-JSON per-line parsing:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+count a short column:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                 16468 / 16553          6.1         164.7       1.0X
-UTF-8 is set                                16420 / 16441          6.1         164.2       1.0X
+No encoding                                 16501 / 16519          6.1         165.0       1.0X
+UTF-8 is set                                16477 / 16516          6.1         164.8       1.0X
 
 Preparing data for benchmarking ...
 OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
-JSON parsing of wide lines:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+count a wide column:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                 39789 / 40053          0.3        3978.9       1.0X
-UTF-8 is set                                39505 / 39584          0.3        3950.5       1.0X
+No encoding                                 39871 / 40242          0.3        3987.1       1.0X
+UTF-8 is set                                39581 / 39721          0.3        3958.1       1.0X
 
+Preparing data for benchmarking ...
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Select a subset of 10 columns:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Select 10 columns + count()                 16011 / 16033          0.6        1601.1       1.0X
+Select 1 column + count()                   14350 / 14392          0.7        1435.0       1.1X
+count()                                       3007 / 3034          3.3         300.7       5.3X
+
+Preparing data for benchmarking ...
 OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
-Count a dataset with 10 columns:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+creation of JSON parser per line:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Select 10 columns + count()                 15997 / 16015          0.6        1599.7       1.0X
-Select 1 column + count()                   13280 / 13326          0.8        1328.0       1.2X
-count()                                       3006 / 3021          3.3         300.6       5.3X
+Short column without encoding                 8334 / 8453          1.2         833.4       1.0X
+Short column with UTF-8                     13627 / 13784          0.7        1362.7       0.6X
+Wide column without encoding              155073 / 155351          0.1       15507.3       0.1X
+Wide column with UTF-8                    212114 / 212263          0.0       21211.4       0.0X
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/42b6c1fb/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index 04f724e..f50c25e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -39,13 +39,17 @@ import org.apache.spark.sql.types._
 object JSONBenchmark extends SqlBasedBenchmark {
   import spark.implicits._
 
-  def schemaInferring(rowsNum: Int): Unit = {
+  def prepareDataInfo(benchmark: Benchmark): Unit = {
+    // scalastyle:off println
+    benchmark.out.println("Preparing data for benchmarking ...")
+    // scalastyle:on println
+  }
+
+  def schemaInferring(rowsNum: Int, numIters: Int): Unit = {
     val benchmark = new Benchmark("JSON schema inferring", rowsNum, output = output)
 
     withTempPath { path =>
-      // scalastyle:off println
-      benchmark.out.println("Preparing data for benchmarking ...")
-      // scalastyle:on println
+      prepareDataInfo(benchmark)
 
       spark.sparkContext.range(0, rowsNum, 1)
         .map(_ => "a")
@@ -54,11 +58,11 @@ object JSONBenchmark extends SqlBasedBenchmark {
         .option("encoding", "UTF-8")
         .json(path.getAbsolutePath)
 
-      benchmark.addCase("No encoding", 3) { _ =>
+      benchmark.addCase("No encoding", numIters) { _ =>
         spark.read.json(path.getAbsolutePath)
       }
 
-      benchmark.addCase("UTF-8 is set", 3) { _ =>
+      benchmark.addCase("UTF-8 is set", numIters) { _ =>
         spark.read
           .option("encoding", "UTF-8")
           .json(path.getAbsolutePath)
@@ -68,28 +72,29 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  def perlineParsing(rowsNum: Int): Unit = {
-    val benchmark = new Benchmark("JSON per-line parsing", rowsNum, output = output)
+  def writeShortColumn(path: String, rowsNum: Int): StructType = {
+    spark.sparkContext.range(0, rowsNum, 1)
+      .map(_ => "a")
+      .toDF("fieldA")
+      .write.json(path)
+    new StructType().add("fieldA", StringType)
+  }
 
-    withTempPath { path =>
-      // scalastyle:off println
-      benchmark.out.println("Preparing data for benchmarking ...")
-      // scalastyle:on println
+  def countShortColumn(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("count a short column", rowsNum, output = output)
 
-      spark.sparkContext.range(0, rowsNum, 1)
-        .map(_ => "a")
-        .toDF("fieldA")
-        .write.json(path.getAbsolutePath)
-      val schema = new StructType().add("fieldA", StringType)
+    withTempPath { path =>
+      prepareDataInfo(benchmark)
+      val schema = writeShortColumn(path.getAbsolutePath, rowsNum)
 
-      benchmark.addCase("No encoding", 3) { _ =>
+      benchmark.addCase("No encoding", numIters) { _ =>
         spark.read
           .schema(schema)
           .json(path.getAbsolutePath)
           .count()
       }
 
-      benchmark.addCase("UTF-8 is set", 3) { _ =>
+      benchmark.addCase("UTF-8 is set", numIters) { _ =>
         spark.read
           .option("encoding", "UTF-8")
           .schema(schema)
@@ -101,35 +106,36 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  def perlineParsingOfWideColumn(rowsNum: Int): Unit = {
-    val benchmark = new Benchmark("JSON parsing of wide lines", rowsNum, output = output)
+  def writeWideColumn(path: String, rowsNum: Int): StructType = {
+    spark.sparkContext.range(0, rowsNum, 1)
+      .map { i =>
+        val s = "abcdef0123456789ABCDEF" * 20
+        s"""{"a":"$s","b": $i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}"""
+      }
+      .toDF().write.text(path)
+    new StructType()
+      .add("a", StringType).add("b", LongType)
+      .add("c", StringType).add("d", LongType)
+      .add("e", StringType).add("f", LongType)
+      .add("x", StringType).add("y", LongType)
+      .add("z", StringType)
+  }
+
+  def countWideColumn(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("count a wide column", rowsNum, output = output)
 
     withTempPath { path =>
-      // scalastyle:off println
-      benchmark.out.println("Preparing data for benchmarking ...")
-      // scalastyle:on println
+      prepareDataInfo(benchmark)
+      val schema = writeWideColumn(path.getAbsolutePath, rowsNum)
 
-      spark.sparkContext.range(0, rowsNum, 1)
-        .map { i =>
-          val s = "abcdef0123456789ABCDEF" * 20
-          s"""{"a":"$s","b": $i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}"""
-         }
-        .toDF().write.text(path.getAbsolutePath)
-      val schema = new StructType()
-        .add("a", StringType).add("b", LongType)
-        .add("c", StringType).add("d", LongType)
-        .add("e", StringType).add("f", LongType)
-        .add("x", StringType).add("y", LongType)
-        .add("z", StringType)
-
-      benchmark.addCase("No encoding", 3) { _ =>
+      benchmark.addCase("No encoding", numIters) { _ =>
         spark.read
           .schema(schema)
           .json(path.getAbsolutePath)
           .count()
       }
 
-      benchmark.addCase("UTF-8 is set", 3) { _ =>
+      benchmark.addCase("UTF-8 is set", numIters) { _ =>
         spark.read
           .option("encoding", "UTF-8")
           .schema(schema)
@@ -141,12 +147,14 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  def countBenchmark(rowsNum: Int): Unit = {
+  def selectSubsetOfColumns(rowsNum: Int, numIters: Int): Unit = {
     val colsNum = 10
     val benchmark =
-      new Benchmark(s"Count a dataset with $colsNum columns", rowsNum, output = output)
+      new Benchmark(s"Select a subset of $colsNum columns", rowsNum, output = output)
 
     withTempPath { path =>
+      prepareDataInfo(benchmark)
+
       val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
       val schema = StructType(fields)
       val columnNames = schema.fieldNames
@@ -158,13 +166,13 @@ object JSONBenchmark extends SqlBasedBenchmark {
 
       val ds = spark.read.schema(schema).json(path.getAbsolutePath)
 
-      benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
+      benchmark.addCase(s"Select $colsNum columns + count()", numIters) { _ =>
         ds.select("*").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
+      benchmark.addCase(s"Select 1 column + count()", numIters) { _ =>
         ds.select($"col1").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"count()", 3) { _ =>
+      benchmark.addCase(s"count()", numIters) { _ =>
         ds.count()
       }
 
@@ -172,12 +180,64 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  def jsonParserCreation(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("creation of JSON parser per line", rowsNum, output = output)
+
+    withTempPath { path =>
+      prepareDataInfo(benchmark)
+
+      val shortColumnPath = path.getAbsolutePath + "/short"
+      val shortSchema = writeShortColumn(shortColumnPath, rowsNum)
+
+      val wideColumnPath = path.getAbsolutePath + "/wide"
+      val wideSchema = writeWideColumn(wideColumnPath, rowsNum)
+
+      benchmark.addCase("Short column without encoding", numIters) { _ =>
+        spark.read
+          .schema(shortSchema)
+          .json(shortColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.addCase("Short column with UTF-8", numIters) { _ =>
+        spark.read
+          .option("encoding", "UTF-8")
+          .schema(shortSchema)
+          .json(shortColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.addCase("Wide column without encoding", numIters) { _ =>
+        spark.read
+          .schema(wideSchema)
+          .json(wideColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.addCase("Wide column with UTF-8", numIters) { _ =>
+        spark.read
+          .option("encoding", "UTF-8")
+          .schema(wideSchema)
+          .json(wideColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.run()
+    }
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val numIters = 3
     runBenchmark("Benchmark for performance of JSON parsing") {
-      schemaInferring(100 * 1000 * 1000)
-      perlineParsing(100 * 1000 * 1000)
-      perlineParsingOfWideColumn(10 * 1000 * 1000)
-      countBenchmark(10 * 1000 * 1000)
+      schemaInferring(100 * 1000 * 1000, numIters)
+      countShortColumn(100 * 1000 * 1000, numIters)
+      countWideColumn(10 * 1000 * 1000, numIters)
+      selectSubsetOfColumns(10 * 1000 * 1000, numIters)
+      jsonParserCreation(10 * 1000 * 1000, numIters)
     }
   }
 }


