spark-issues mailing list archives

From "chenfh5 (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-22702) Spark sql filter with size function(if exists) leads twice calculation
Date Tue, 05 Dec 2017 13:32:00 GMT
chenfh5 created SPARK-22702:
-------------------------------

             Summary: Spark sql filter with size function(if exists) leads twice calculation
                 Key: SPARK-22702
                 URL: https://issues.apache.org/jira/browse/SPARK-22702
             Project: Spark
          Issue Type: Question
          Components: SQL
    Affects Versions: 2.1.0
         Environment: jdk1.7.0_67
spark-hive_2.11
            Reporter: chenfh5
            Priority: Minor


I ran into an issue with Spark SQL. After obtaining a Dataset through some logic, I want to persist it because it will be reused many times later. However, when persisting it, the logic (a UDF) is calculated twice. I therefore wrote a small local test to reproduce the issue, and it does reproduce.

I tested three filter functions and found the following:
.filter(col("id") > 10) // expected
.filter(length(col("name")) > 4) // expected
.filter(size(col("seq_name")) > 1) // unexpected when this filter is present

i.e., the double-calculation issue occurs only when the filter on the UDF-generated column (the size() filter) is present.
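
One way to confirm where the second evaluation comes from (a diagnostic sketch, not part of the original report; it reuses tableRaw and seqTokenUdf from the reproducer below) is to compare the physical plans: when the filter references the UDF-generated column, the UDF expression typically shows up both in the pushed-down Filter predicate and in the Project, so it is executed twice per row.

{code:scala}
// Diagnostic sketch (assumes tableRaw and seqTokenUdf from the reproducer below).
val withSeq = tableRaw.withColumn("seq_name", seqTokenUdf(col("id"), col("name")))

// Filter on a plain column: the UDF appears only once in the physical plan.
withSeq.filter(col("id") > 10).explain(true)

// Filter on the UDF-generated column: the optimizer pushes the predicate below
// the Project and substitutes the UDF into it, so the UDF appears twice.
withSeq.filter(size(col("seq_name")) > 1).explain(true)
{code}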

h5. Result images
h6. Expected
!http://upload-images.jianshu.io/upload_images/2189341-51ee89377e7159ac.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240!

h6. Unexpected
!http://upload-images.jianshu.io/upload_images/2189341-f87ee2047f0cf120.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240!

h5. Reproducer code
{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataset, Row, SparkSession}


object TwiceCalculationReproducer {
  private val separator = scala.reflect.io.File.separator
  private val dirName = new java.io.File("").getAbsolutePath + separator + "testOutput"

  System.setProperty("spark.app.name", "TestController")
  System.setProperty("spark.master", "local[2]")
  private val ss = SparkSession.builder().config(new SparkConf()).enableHiveSupport().getOrCreate()
  ss.sparkContext.hadoopConfiguration.set("fs.defaultFS", "file:///")
  private val sc = ss.sparkContext

  def main(args: Array[String]) {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.delete(new Path(dirName), true)
    Thread.sleep(1000)

    /*expected*/
    val tableRaw = Dims.df
    val tableNewExp = seqColumnGeneratorExpected(tableRaw)
    tableNewExp.persist()
    tableNewExp.show(10, 100)

    /*unexpected*/
    Thread.sleep(1000)
    val tableNewUnexp = seqColumnGeneratorUnexpected(tableRaw)
    tableNewUnexp.persist()
    tableNewUnexp.show(10, 100)
  }

  /*normal*/
  def seqColumnGeneratorExpected[T](ds: Dataset[T]) = {
    ds.withColumn("seq_name", seqTokenUdf(col("id"), col("name")))
  }

  /*abnormal*/
  def seqColumnGeneratorUnexpected[T](ds: Dataset[T]) = {
    seqColumnGeneratorExpected(ds)
        .filter(col("id") > 10) // expected: UDF evaluated once
        .filter(length(col("name")) > 4) // expected: UDF evaluated once
        .filter(size(col("seq_name")) > 1) // unexpected: UDF evaluated twice when this filter is present
  }

  /*validator udf*/
  def seqTokenUdf = udf {
    (id: Int, name: String) => {
      /*validator 1: console print*/
      println(name + "_" + id + "_" + System.currentTimeMillis())

      /*validator 2: write file in case of executor not printing console*/
      val fs = FileSystem.get(sc.hadoopConfiguration)
      fs.setWriteChecksum(false)
      val fileName = Seq(dirName, name + "_" + System.currentTimeMillis.toString).mkString(separator)
      fs.create(new Path(fileName))

      /*return*/
      Seq[String](name, System.currentTimeMillis().toString)
    }
  }

  /*test data mock*/
  object Dims {
    private val structTypes = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)
    ))

    private val data = List(
      Row(100, "file100"),
      Row(101, "file101"),
      Row(102, "file102")
    )

    private val rdd = sc.parallelize(data)
    val df = ss.createDataFrame(rdd, structTypes)
  }

}
{code}
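
A possible workaround (a sketch only, not verified against 2.1.0; it reuses the definitions from the reproducer above): persist and fully materialize the Dataset that carries the UDF-generated column before applying the size() filter, so the filter reads the cached seq_name values instead of re-invoking the UDF.

{code:scala}
// Workaround sketch (assumes seqColumnGeneratorExpected, tableRaw and the
// SparkSession from the reproducer above).
val tableNew = seqColumnGeneratorExpected(tableRaw).persist()
tableNew.count() // force full materialization of the cache before filtering

tableNew
  .filter(col("id") > 10)
  .filter(length(col("name")) > 4)
  .filter(size(col("seq_name")) > 1) // now reads the cached seq_name column
  .show(10, 100)

// On Spark 2.3+ (not available in the 2.1.0 reported here), marking the UDF as
// nondeterministic also keeps the optimizer from duplicating it into the filter:
// val seqTokenUdf = udf { ... }.asNondeterministic()
{code}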


Kind regards,
chenfh5


