spark-issues mailing list archives

From "Rick Moritz (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
Date Thu, 27 Apr 2017 12:39:04 GMT
Rick Moritz created SPARK-20489:
-----------------------------------

             Summary: Different results in local mode and yarn mode when working with dates
(race condition with SimpleDateFormat?)
                 Key: SPARK-20489
                 URL: https://issues.apache.org/jira/browse/SPARK-20489
             Project: Spark
          Issue Type: Bug
          Components: Shuffle, Spark Core, SQL
    Affects Versions: 2.0.2, 2.0.1, 2.0.0
         Environment: yarn-client mode in Zeppelin
            Reporter: Rick Moritz
            Priority: Critical


Running the following code (in Zeppelin, but I assume spark-shell would behave the same), I get
different results depending on whether I use local[*] mode or yarn-client mode:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val counter = 1 to 2
val size = 1 to 3
val sampleText = spark.createDataFrame(
    sc.parallelize(size)
    .map(Row(_)),
    StructType(Array(StructField("id", IntegerType, nullable=false))
        )
    )
    .withColumn("loadDTS",lit("2017-04-25T10:45:02.2"))
    
val rddList = counter.map(
            count => sampleText
            .withColumn("loadDTS2", date_format(date_add(col("loadDTS"),count),"yyyy-MM-dd'T'HH:mm:ss.SSS"))
            .drop(col("loadDTS"))
            .withColumnRenamed("loadDTS2","loadDTS")
            .coalesce(4)
            .rdd
        )
val resultText = spark.createDataFrame(
    spark.sparkContext.union(rddList),
    sampleText.schema
)
val testGrouped = resultText.groupBy("id")
val timestamps = testGrouped.agg(
    max(unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS")) as "timestamp"
)
val loadDateResult = resultText.join(timestamps, "id")
val filteredresult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS"))
filteredresult.count

The expected result, 3, is what I obtain in local mode, but as soon as I run fully distributed,
I get 0. If I increase size to 1 to 32000, I do get some results (depending on the size of counter),
none of which make any sense.
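
For reference, the expected result of 3 can be reproduced without Spark. The following is only a
rough model of the pipeline in plain Scala, not what Spark executes: the names ExpectedCount, Rec
and expected are made up for illustration, and it assumes that date_add truncates loadDTS to its
date part and that all timestamps are computed in a single time zone (UTC here).

```scala
import java.time.{LocalDate, ZoneOffset}

object ExpectedCount {
  // One row of the unioned data: an id and the shifted load date.
  final case class Rec(id: Int, loadDTS: LocalDate)

  def expected(ids: Range, counter: Range, base: String): Int = {
    // Assumption: only the leading yyyy-MM-dd part of the string survives date_add.
    val baseDate = LocalDate.parse(base.take(10))

    // Union of one copy of the ids per counter value, each shifted by that many days.
    val rows = for (c <- counter; id <- ids) yield Rec(id, baseDate.plusDays(c.toLong))

    // Assumption: a single fixed zone is used for every conversion.
    def ts(d: LocalDate): Long = d.atStartOfDay(ZoneOffset.UTC).toEpochSecond

    // groupBy("id") followed by max(timestamp).
    val maxPerId = rows.groupBy(_.id).map { case (id, rs) => id -> rs.map(r => ts(r.loadDTS)).max }

    // Join on id, then keep the rows whose own timestamp equals the per-id maximum.
    rows.count(r => ts(r.loadDTS) == maxPerId(r.id))
  }
}
```

With ids 1 to 3 and counter 1 to 2, exactly one row per id carries the latest date, so the count
equals the number of ids.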

At first glance everything looks correct up to the application of the last filter, but then
something goes wrong. This is potentially due to lingering reuse of SimpleDateFormat instances, but
I cannot reproduce it in non-distributed mode. The generated execution plan is the same
in both cases, as expected.
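
To make the SimpleDateFormat suspicion concrete: java.text.SimpleDateFormat is documented as not
thread-safe, so one instance shared between threads can return wrong values. The sketch below is
plain JVM code, not Spark's internal implementation. It shows the usual workaround of one formatter
per thread; DateFormatCheck and its methods are hypothetical names, and pinning the zone to UTC is
an assumption made so that the result does not depend on the machine's default time zone.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone
import java.util.concurrent.{Callable, Executors}

object DateFormatCheck {
  val Pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS"

  // Each thread lazily creates its own formatter, so no instance is ever shared.
  private val perThread = new ThreadLocal[SimpleDateFormat] {
    override def initialValue(): SimpleDateFormat = {
      val f = new SimpleDateFormat(Pattern)
      f.setTimeZone(TimeZone.getTimeZone("UTC")) // assumption: fixed zone for reproducibility
      f
    }
  }

  // Parse a timestamp string into seconds since the epoch.
  def toEpochSeconds(s: String): Long = perThread.get().parse(s).getTime / 1000L

  // Parse the same string from many threads and collect the distinct results.
  def parseConcurrently(s: String, threads: Int, tasks: Int): Set[Long] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (1 to tasks).map { _ =>
        pool.submit(new Callable[Long] { def call(): Long = toEpochSeconds(s) })
      }
      futures.map(_.get()).toSet
    } finally pool.shutdown()
  }
}
```

If the per-thread version always yields a single distinct value while a variant sharing one
formatter across threads does not, that would support the race-condition theory. If both agree,
the cause more likely lies elsewhere.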




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

