spark-dev mailing list archives

From Jason Moore <Jason.Mo...@quantium.com.au>
Subject Sorting within partitions is not maintained in parquet?
Date Thu, 11 Aug 2016 06:23:22 GMT
Hi,

It seems that something changed between Spark 1.6.2 and 2.0.0 that I wasn't expecting.

If I have a DataFrame with records sorted within each partition, and I write it to Parquet
and then read it back, previously the records would be iterated through in the same order
they were written (assuming no shuffle has taken place). This no longer appears to be the
case in 2.0.0. Below is code to reproduce the behaviour in a spark-shell.

Was this change expected?

Thanks,
Jason.


import org.apache.spark.sql._
// Returns true iff `mapping` is non-decreasing within every partition of `self`.
def isSorted[T](self: DataFrame, mapping: Row => T)(implicit ordering: Ordering[T]) = {
  import self.sqlContext.implicits._ // supplies the Encoder needed by mapPartitions in 2.0
  import ordering._
  self
    .mapPartitions(rows => {
      val isSorted = rows
        .map(mapping)
        .sliding(2) // all adjacent pairs within the partition
        .forall {
          case Seq(x, y) => x <= y
          case _         => true // empty or single-element window
        }

      Iterator(isSorted)
    })
    .reduce(_ && _) // sorted overall iff sorted in every partition
}
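
// For reference, the adjacent-pairs check above can be exercised on plain Scala
// iterators, independent of Spark. `pairsSorted` is an illustrative name of mine,
// not part of the repro; it is the same sliding(2)/forall logic as isSorted.
def pairsSorted[T](it: Iterator[T])(implicit ord: Ordering[T]): Boolean = {
  import ord._
  it.sliding(2).forall {
    case Seq(x, y) => x <= y
    case _         => true // empty or single-element window
  }
}

pairsSorted(Iterator(1, 2, 3))  // true
pairsSorted(Iterator(3, 1, 2))  // false
pairsSorted(Iterator[Int]())    // true: vacuously sorted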

// in Spark 2.0.0
spark.range(100000).toDF("id").registerTempTable("input")
spark.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY id").write.mode("overwrite").parquet("input.parquet")
isSorted(spark.read.parquet("input.parquet"), _.getAs[Long]("id"))
// FALSE

// in Spark 1.6.2
sqlContext.range(100000).toDF("id").registerTempTable("input")
sqlContext.sql("SELECT id FROM input DISTRIBUTE BY id SORT BY id").write.mode("overwrite").parquet("input.parquet")
isSorted(sqlContext.read.parquet("input.parquet"), _.getAs[Long]("id"))
// TRUE

