carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From manishgupta88 <...@git.apache.org>
Subject [GitHub] carbondata pull request #1670: [CARBONDATA-1899] Add CarbonData concurrency ...
Date Wed, 20 Dec 2017 06:53:30 GMT
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1670#discussion_r157946201
  
    --- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala
---
    @@ -0,0 +1,355 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.examples
    +
    +import java.io.File
    +import java.util
    +import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
    +
    +import scala.util.Random
    +
    +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.util.CarbonProperties
    +
    +// scalastyle:off println
    +object ConcurrencyTest {
    +
    +  var totalNum = 100 * 1000 * 1000 // rows generated by generateDataFrame
    +  var ThreadNum = 16 // size of the thread pool created for each query
    +  var TaskNum = 100 // tasks submitted to that pool for each query
    +  var ResultIsEmpty = true // NOTE(review): appears to suppress printing of results
    +  val cardinalityId = 10000 * 10000 // exclusive bound for the random id filter value
    +  val cardinalityCity = 6 // exclusive bound for the random city filter suffix
    +
    +  def parquetTableName: String = "comparetest_parquet"
    +
    +  def orcTableName: String = "comparetest_orc"
    +
    +  def carbonTableName(version: String): String = s"comparetest_carbonV$version"
    +
    +  // Table schema:
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | Column name | Data type | Cardinality | Column type | Dictionary |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | id          | string    | 100,000,000 | dimension   | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | city        | string    | 6           | dimension   | yes        |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | country     | string    | 6           | dimension   | yes        |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | planet      | string    | 10,007      | dimension   | yes        |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | m1          | short     | NA          | measure     | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | m2          | int       | NA          | measure     | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | m3          | big int   | NA          | measure     | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | m4          | double    | NA          | measure     | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | m5          | decimal   | NA          | measure     | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +
    +  /**
    +   * Builds the synthetic benchmark data set: one row per integer in `1 to totalNum`,
    +   * spread over 4 partitions, with every column value derived from that integer.
    +   *
    +   * @param spark session used to parallelize the rows and create the DataFrame
    +   * @return DataFrame with non-nullable columns id, city, country, planet, m1 .. m5
    +   */
    +  private def generateDataFrame(spark: SparkSession): DataFrame = {
    +    val rows = spark.sparkContext
    +      .parallelize(1 to totalNum, 4)
    +      .map { x =>
    +        Row(
    +          (x % 100000000).toString,
    +          "city" + x % 6,
    +          "country" + x % 6,
    +          "planet" + x % 10007,
    +          (x % 16).toShort,
    +          x / 2,
    +          // widen to Long before shifting so doubling cannot overflow Int for large x
    +          x.toLong << 1,
    +          x.toDouble / 13,
    +          BigDecimal.valueOf(x.toDouble / 11))
    +      }
    +
    +    val schema = StructType(
    +      Seq(
    +        StructField("id", StringType, nullable = false),
    +        StructField("city", StringType, nullable = false),
    +        StructField("country", StringType, nullable = false),
    +        StructField("planet", StringType, nullable = false),
    +        StructField("m1", ShortType, nullable = false),
    +        StructField("m2", IntegerType, nullable = false),
    +        StructField("m3", LongType, nullable = false),
    +        StructField("m4", DoubleType, nullable = false),
    +        StructField("m5", DecimalType(30, 10), nullable = false)
    +      )
    +    )
    +
    +    spark.createDataFrame(rows, schema)
    +  }
    +
    +  // Performance test queries; each one is designed to exercise a different data access type.
    +  // "$table" is a literal placeholder that is substituted with the real table name when the
    +  // query is run, so the fragments containing it are deliberately NOT s-interpolated.
    +  val r = new Random()
    +  // random filter values; the modulo keeps them below totalNum
    +  val tmpId = r.nextInt(cardinalityId) % totalNum
    +  val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum)
    +  val queries: Array[Query] = Array(
    +    Query(
    +      "select * from $table" + s" where id = '$tmpId' ",
    +      "filter scan",
    +      "filter on high card dimension"
    +    ),
    +
    +    Query(
    +      "select id from $table" + s" where id = '$tmpId' ",
    +      "filter scan",
    +      "filter on high card dimension"
    +    ),
    +
    +    // city has only cardinalityCity (6) distinct values, i.e. it is a low card dimension
    +    Query(
    +      "select * from $table" + s" where city = '$tmpCity' ",
    +      "filter scan",
    +      "filter on low card dimension"
    +    ),
    +
    +    Query(
    +      "select city from $table" + s" where city = '$tmpCity' ",
    +      "filter scan",
    +      "filter on low card dimension"
    +    ),
    +
    +    Query(
    +      "select country, sum(m1) from $table group by country",
    +      "aggregate",
    +      "group by on big data, on medium card column, medium result set,"
    +    ),
    +
    +    Query(
    +      "select country, sum(m1) from $table" +
    +        s" where id = '$tmpId' group by country",
    +      "aggregate",
    +      "group by on big data, on medium card column, medium result set,"
    +    ),
    +
    +    // self join of the table on id
    +    Query(
    +      "select t1.country, sum(t1.m1) from $table t1 join $table t2"
    +        + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country",
    +      "aggregate",
    +      "group by on big data, on medium card column, medium result set,"
    +    ),
    +
    +    // seven-way self join of the table on id
    +    Query(
    +      "select t2.country, sum(t2.m1) " +
    +        "from $table t1 join $table t2 join $table t3 " +
    +        "join $table t4 join $table t5 join $table t6 join $table t7 " +
    +        "on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " +
    +        "and t1.id=t5.id and t1.id=t6.id and " +
    +        "t1.id=t7.id " +
    +        s" where t2.id = '$tmpId' " +
    +        " group by t2.country",
    +      "aggregate",
    +      "group by on big data, on medium card column, medium result set,"
    +    )
    +  )
    +
    +  /**
    +   * Writes `input` as parquet files at path `table`, partitioned by a derived column
    +   * holding `id % 10`, then registers the written data as a temp view named `table`.
    +   *
    +   * @return the value reported by `time` for the write and register steps
    +   */
    +  private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String)
    +  : Double = time {
    +    // partition key: last digit of the id column
    +    val lastDigit = input.col("id") % 10
    +    val writer = input
    +      .withColumn("partitionCol", lastDigit)
    +      .write
    +      .partitionBy("partitionCol")
    +      .mode(SaveMode.Overwrite)
    +    writer.parquet(table)
    +
    +    val written = spark.read.parquet(table)
    +    written.createOrReplaceTempView(table)
    +  }
    +
    +  /**
    +   * Writes `input` as ORC files at path `table` (no partitioning is applied), then
    +   * registers the written data as a temp view named `table`.
    +   *
    +   * @return the value reported by `time` for the write and register steps
    +   */
    +  private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String)
    +  : Double = time {
    +    val writer = input.write.mode(SaveMode.Overwrite)
    +    writer.orc(table)
    +
    +    val written = spark.read.orc(table)
    +    written.createOrReplaceTempView(table)
    +  }
    +
    +  /**
    +   * Sets the carbon data file version property to "3", drops `tableName` if it exists
    +   * and writes `input` into it through the "carbondata" data source.
    +   *
    +   * @return the value reported by `time` for the write only (the property update and
    +   *         the drop statement run before timing starts)
    +   */
    +  private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String)
    +  : Double = {
    +    val carbonProps = CarbonProperties.getInstance()
    +    carbonProps.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "3")
    +
    +    spark.sql(s"drop table if exists $tableName")
    +
    +    time {
    +      val writer = input.write
    +        .format("carbondata")
    +        .option("tableName", tableName)
    +        .option("tempCSV", "false")
    +        .option("single_pass", "true")
    +        .option("dictionary_exclude", "id") // id is high cardinality column
    +        .option("table_blocksize", "32")
    +        .mode(SaveMode.Overwrite)
    +      writer.save()
    +    }
    +  }
    +
    +  /**
    +   * Generates the benchmark data once, caches it, and loads it into two tables:
    +   * `table1` as parquet or ORC (chosen by the name's suffix) and `table2` as carbon.
    +   * The load times of both tables are printed.
    +   *
    +   * @param spark  session used for data generation and loading
    +   * @param table1 name ending in "parquet" or "orc"; any other name raises an error
    +   * @param table2 name of the carbon table
    +   */
    +  def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
    +    val df = generateDataFrame(spark).cache()
    +    try {
    +      println(s"generating ${df.count()} records, schema: ${df.schema}")
    +      val table1Time = if (table1.endsWith("parquet")) {
    +        loadParquetTable(spark, df, table1)
    +      } else if (table1.endsWith("orc")) {
    +        loadOrcTable(spark, df, table1)
    +      } else {
    +        sys.error("invalid table: " + table1)
    +      }
    +      val table2Time = loadCarbonTable(spark, df, table2)
    +      println(s"load completed, time: $table1Time, $table2Time")
    +    } finally {
    +      // release the cached data even when a load fails or the table name is invalid
    +      df.unpersist()
    +    }
    +  }
    +
    +  // Run all queries for the specified table
    +  private def runQueries(spark: SparkSession, tableName: String): Unit = {
    +    println(s"start running queries for $tableName...")
    +    val start = System.currentTimeMillis()
    +    println("90% time: xx.xx sec\t99% time: xx.xx sec\tlast time: xx.xx sec\t " +
    +      "running query sql\taverage time: xx.xx sec\t result: show it when ResultIsEmpty
is false")
    +    queries.zipWithIndex.map { case (query, index) =>
    +      val sqlText = query.sqlText.replace("$table", tableName)
    +
    +      val executorService = Executors.newFixedThreadPool(ThreadNum)
    +//      val results = new util.ArrayList[Future[Results]]()
    +      val tasks = new util.ArrayList[Callable[Results]]()
    +
    +      for (num <- 1 to TaskNum) {
    +        tasks.add(new QueryTask(spark, sqlText))
    +      }
    +      val results = executorService.invokeAll(tasks)
    +
    +      val sql = s"query ${index + 1}: $sqlText "
    +      printResult(results, sql)
    +      executorService.shutdown()
    +      executorService.awaitTermination(600, TimeUnit.SECONDS)
    --- End diff --
    
    Why is printResult called before the executorService is shut down?


---

Mime
View raw message