spark-issues mailing list archives

From "Navige (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-19629) Partitioning of Parquet is not considered correctly at loading in local[X] mode
Date Thu, 16 Feb 2017 12:55:42 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Navige updated SPARK-19629:
---------------------------
    Description: 
Running the following two examples will lead to different results depending on whether the
code is run using Spark 1.6 or Spark 2.1. 

h1. What does the example do?
- The code creates an example dataframe with random data.
- The dataframe is repartitioned and stored to disk.
- Then the dataframe is re-read from disk.
- The number of partitions of the re-read dataframe is checked.

h1. What is the expected behaviour?
The number of partitions specified when storing the dataframe should be preserved when the
dataframe is re-loaded from disk.

h1. Differences between Spark 1.6 and Spark 2
On Spark 1.6 the partitioning is kept, i.e., the code example returns the 10 partitions
specified via npartitions; on Spark 2.1 the number of partitions equals the number of
local worker threads (the X in local[X]) specified when starting Spark.
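
As a purely illustrative sketch (assuming a Spark 2.x spark-shell), the settings that seem to influence how Spark 2 groups parquet part files into read partitions can be inspected as follows; the defaults quoted are taken from the Spark 2.1 configuration documentation:
{code:none}
// Sketch only: print the settings that appear to drive read-time partitioning in Spark 2.
// Defaults per the Spark 2.1 docs: 128 MB for maxPartitionBytes, 4 MB for openCostInBytes.
println(spark.conf.get("spark.sql.files.maxPartitionBytes", "134217728"))
println(spark.conf.get("spark.sql.files.openCostInBytes", "4194304"))
println(sc.defaultParallelism) // equals X when the master is local[X]
{code}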

Looking at the data produced, the number of files in the parquet directory is the same in
both Spark versions - so Spark 2 writes as many files as there were partitions when storing,
but when reading them back, Spark 2 does not restore that number of partitions.
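
To confirm the on-disk layout, the part files can be counted directly (a sketch only, assuming the minimal example below has already been run and written to /tmp/test1):
{code:none}
// Count the parquet part files Spark wrote; in both versions this matches npartitions (10).
import java.io.File
val partFiles = new File("/tmp/test1").listFiles().filter(_.getName.startsWith("part-"))
println(s"part files on disk: ${partFiles.length}")
{code}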

h1. Minimal code example
{code:none}
// run on Spark 1.6
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

// create a dataframe with 100 rows of random floats
val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))

// write with an explicit number of partitions, then read the parquet data back
val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // true on Spark 1.6
{code}

{code:none}
// run on Spark 2.1
import scala.util.Random
import org.apache.spark.sql.types.{StructField, StructType, FloatType}
import org.apache.spark.sql.Row

// create a dataframe with 100 rows of random floats
val rdd = sc.parallelize(Seq.fill(100)(Row(Seq(Random.nextFloat()): _*)))
val df = spark.sqlContext.createDataFrame(rdd, StructType(Seq(StructField("test", FloatType))))

// write with an explicit number of partitions, then read the parquet data back
val npartitions = 10
df.repartition(npartitions).write.parquet("/tmp/test1")
val read = spark.sqlContext.read.parquet("/tmp/test1")
assert(npartitions == read.rdd.getNumPartitions) // false on Spark 2.1: partition count equals X from local[X]
{code}
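
A possible workaround on Spark 2.1 (sketch only, continuing from the example above, and not a fix for the read-time behaviour) is to repartition explicitly after loading, at the cost of an extra shuffle:
{code:none}
// Workaround sketch: re-establish the desired partition count after the read.
// This adds a shuffle, so it only masks the change in read-time partitioning.
val rereadDf = spark.read.parquet("/tmp/test1").repartition(npartitions)
assert(npartitions == rereadDf.rdd.getNumPartitions)
{code}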

h1. What could other solutions be, if this is not a bug?
If this behaviour is intended, what about introducing a parameter at read time which specifies
whether the data should truly be repartitioned (depending on the number of local threads) or
should be read "as-is", keeping the stored partitioning?

> Partitioning of Parquet is not considered correctly at loading in local[X] mode
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-19629
>                 URL: https://issues.apache.org/jira/browse/SPARK-19629
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 2.0.0, 2.1.0
>         Environment: Tested using docker run gettyimages/spark:1.6.1-hadoop-2.6 and
> docker run gettyimages/spark:2.1.0-hadoop-2.7.
>            Reporter: Navige
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

