spark-issues mailing list archives

From "chenfh5 (JIRA)" <>
Subject [jira] [Created] (SPARK-22702) Spark sql filter with size function(if exists) leads twice calculation
Date Tue, 05 Dec 2017 13:32:00 GMT
chenfh5 created SPARK-22702:

             Summary: Spark sql filter with size function(if exists) leads twice calculation
                 Key: SPARK-22702
             Project: Spark
          Issue Type: Question
          Components: SQL
    Affects Versions: 2.1.0
         Environment: jdk1.7.0_67
            Reporter: chenfh5
            Priority: Minor

I ran into an issue with Spark SQL. After deriving a Dataset through some logic, I want to persist
it because it will be reused many times later. However, when I persist it, the deriving logic is
executed twice. I therefore wrote a local test to reproduce the issue, and it does reproduce.
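For context: persist() is lazy — the cache is only populated when an action first materializes the Dataset, so work can still be re-run until then. The contrast between re-deriving a value on every use and materializing it once can be sketched in plain Scala (an analogy only, not Spark; all names here are illustrative):

```scala
object LazyPersistAnalogy {
  var computations = 0
  def expensiveLogic(): Vector[Int] = { computations += 1; Vector(1, 2, 3) }

  def main(args: Array[String]): Unit = {
    // Without caching: every use re-runs the logic (like an unpersisted Dataset).
    def uncached = expensiveLogic()
    uncached; uncached
    assert(computations == 2)

    // With caching: the first use materializes the result and later uses reuse it
    // (analogous to persist() followed by an action before further queries).
    computations = 0
    lazy val cached = expensiveLogic()
    cached; cached
    assert(computations == 1)
    println(s"cached computed $computations time(s)")
  }
}
```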

I tested three filter expressions and found the following:
.filter(col("id") > 10) //expected: UDF evaluated once
.filter(length(col("name")) > 4) //expected: UDF evaluated once
.filter(size(col("seq_name")) > 1) //unexpected: UDF evaluated twice when this filter is present

i.e., the double-calculation issue occurs when the filter on the size of the UDF-generated column is present.
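A plausible mechanism (my assumption, not confirmed in this report) is that the optimizer substitutes the derived column's defining expression into both the filter predicate and the output projection, so a deterministic UDF ends up evaluated once per use site. That duplication, versus materializing the column first, can be sketched in plain Scala (illustrative names, no Spark involved):

```scala
object DuplicationSketch {
  var calls = 0
  // Stand-in for the side-effecting UDF: always returns a 2-element Seq.
  def seqToken(name: String): Seq[String] = { calls += 1; Seq(name, "ts") }

  def main(args: Array[String]): Unit = {
    val rows = Seq("file100", "file101", "file102")

    // "Inlined" plan: predicate and projection each re-derive the column.
    calls = 0
    val inlined = rows.filter(n => seqToken(n).size > 1).map(n => (n, seqToken(n)))
    assert(calls == 6) // one call for the filter plus one for the projection, per row

    // "Materialized" plan: derive the column once, then filter on it.
    calls = 0
    val materialized =, seqToken(n))).filter(_._2.size > 1)
    assert(calls == 3) // one call per row
    println(s"inlined=${inlined.size}, materialized=${materialized.size}")
  }
}
```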

h5. result image
h6. expected
(screenshot attached to the JIRA issue; not preserved in this archive)
h6. unexpected
(screenshot attached to the JIRA issue; not preserved in this archive)

h5. reproduce code
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object TwiceCalculationReproducer {
  // Several lines below were truncated in the archive; the File-based path
  // construction and the "spark.app.name" property are reconstructions.
  private val separator = File.separator
  private val dirName = new File("").getAbsolutePath + separator + "testOutput"

  System.setProperty("spark.app.name", "TestController")
  System.setProperty("spark.master", "local[2]")
  private val ss = SparkSession.builder().config(new SparkConf()).enableHiveSupport().getOrCreate()
  ss.sparkContext.hadoopConfiguration.set("fs.defaultFS", "file:///")
  private val sc = ss.sparkContext

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.delete(new Path(dirName), true)

    val tableRaw = Dims.df

    val tableNewExp = seqColumnGeneratorExpected(tableRaw)
    tableNewExp.persist()
    // show() triggers evaluation; the exact action was truncated in the archive

    val tableNewUnexp = seqColumnGeneratorUnexpected(tableRaw)
    tableNewUnexp.persist()
  }

  def seqColumnGeneratorExpected[T](ds: Dataset[T]): Dataset[Row] = {
    ds.withColumn("seq_name", seqTokenUdf(col("id"), col("name")))
  }

  def seqColumnGeneratorUnexpected[T](ds: Dataset[T]): Dataset[Row] = {
    ds.withColumn("seq_name", seqTokenUdf(col("id"), col("name")))
      .filter(col("id") > 10) //expected: UDF evaluated once
      .filter(length(col("name")) > 4) //expected: UDF evaluated once
      .filter(size(col("seq_name")) > 1) //unexpected: UDF evaluated twice when present
  }

  /*validator udf*/
  def seqTokenUdf = udf {
    (id: Int, name: String) => {
      /*validator 1: console print*/
      println(name + "_" + id + "_" + System.currentTimeMillis())

      /*validator 2: write a file in case the executor does not print to the console*/
      val fs = FileSystem.get(sc.hadoopConfiguration)
      val fileName = Seq(dirName, name + "_" + System.currentTimeMillis.toString).mkString(separator)
      fs.create(new Path(fileName))

      Seq[String](name, System.currentTimeMillis().toString)
    }
  }

  /*test data mock*/
  object Dims {
    private val structTypes = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)
    ))

    private val data = List(
      Row(100, "file100"),
      Row(101, "file101"),
      Row(102, "file102")
    )

    private val rdd = sc.parallelize(data)
    val df = ss.createDataFrame(rdd, structTypes)
  }
}

Kind regards,

This message was sent by Atlassian JIRA
