spark-issues mailing list archives

From "Xusen Yin (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
Date Thu, 04 Feb 2016 00:43:39 GMT

    [ https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131455#comment-15131455
] 

Xusen Yin edited comment on SPARK-13178 at 2/4/16 12:43 AM:
------------------------------------------------------------

I don't zip RRDD with itself. The bug appears when I call KMeans from the R side.
I wrote the KMeans wrapper for SparkR in the JIRA https://issues.apache.org/jira/browse/SPARK-13011
with the code below:

{code:title=r-side.R|theme=FadeToGrey|linenumbers=true|language=R|firstline=0001|collapse=false}
model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitKMeans", algorithm,
              x@sdf, iter.max, centers, "Sepal_Length Sepal_Width Petal_Length Petal_Width")
{code}



On the Spark side, I wrote a fitKMeans method in org.apache.spark.ml.api.r.SparkRWrappers:

{code:title=spark-side.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
def fitKMeans(
      initMode: String,
      df: DataFrame,
      maxIter: Double, // R numerics arrive as Double, hence .toInt below
      k: Double,
      columns: String): KMeansModel = {
    // Assemble the space-separated column names into a single "features" vector column.
    val assembler = new VectorAssembler().setInputCols(columns.split(" ")).setOutputCol("features")
    val features = assembler.transform(df).select("features")
    val kMeans = new KMeans()
      .setInitMode(initMode)
      .setMaxIter(maxIter.toInt)
      .setK(k.toInt)
    kMeans.fit(features)
  }
{code}

The KMeans implementation internally calls rdd.zip(rdd.map(...)), and the RDD here is derived from
an RRDD, so I cannot move on without fixing this issue.
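The mismatched pairs come from two compute() passes of the same partition sharing RRDD's single R data stream. This is not code from the issue, just a minimal plain-Scala sketch (no Spark; the shared iterator stands in for RRDD's data stream) of why zipping such an RDD with itself interleaves elements:

```scala
object ZipSelfSketch {
  // Simulates zip-with-self when both compute() passes of a partition
  // read from one shared, one-shot stream (as RRDD's R data stream is).
  def zipWithSelfOverSharedStream(data: Seq[Double]): List[(Double, Double)] = {
    val stream = data.iterator // the single one-shot "R data stream"
    def compute(): Iterator[Double] = new Iterator[Double] {
      def hasNext: Boolean = stream.hasNext
      def next(): Double = stream.next()
    }
    // zip alternates next() between the two sides, so each side steals
    // every other element from the shared stream.
    compute().zip(compute()).toList
  }

  def main(args: Array[String]): Unit = {
    // Mismatched pairs, like the (4.9, 4.7)-style pairs in the RRDD log:
    println(zipWithSelfOverSharedStream(Seq(5.1, 4.9, 4.7, 4.6)))
  }
}
```

A HadoopRDD does not hit this because each compute() re-reads the input split from the start, so both sides of the zip see the full sequence.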



> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -----------------------------------------------------------------
>
>                 Key: SPARK-13178
>                 URL: https://issues.apache.org/jira/browse/SPARK-13178
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>            Reporter: Xusen Yin
>
> In the KMeans algorithm, there is a zip operation before taking samples, i.e. https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
> which can be reduced to the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with same number
> of elements" or "stream closed", similar to the JIRA issue https://issues.apache.org/jira/browse/SPARK-2251.
> Inside RRDD, a data stream is used to ingest data from the R side. In a zip operation,
> zipping an RDD with itself computes each partition twice. So if we zip a HadoopRDD (the iris
> dataset) with itself, we get
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.0, 6.0)
> we get a pair (5.9, 5.9)
> we get a pair (6.9, 6.9)
> we get a pair (6.0, 6.0)
> we get a pair (6.7, 6.7)
> we get a pair (6.1, 6.1)
> we get a pair (6.9, 6.9)
> we get a pair (5.6, 5.6)
> we get a pair (5.8, 5.8)
> we get a pair (6.7, 6.7)
> we get a pair (6.8, 6.8)
> we get a pair (5.6, 5.6)
> we get a pair (6.7, 6.7)
> we get a pair (5.8, 5.8)
> we get a pair (6.7, 6.7)
> we get a pair (6.2, 6.2)
> we get a pair (6.3, 6.3)
> we get a pair (5.6, 5.6)
> we get a pair (6.5, 6.5)
> we get a pair (5.9, 5.9)
> we get a pair (6.2, 6.2)
> we get a pair (6.1, 6.1)
> we get a pair (5.9, 5.9)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (6.4, 6.4)
> we get a pair (6.6, 6.6)
> {code}
> However, in RRDD with the same setting we get:
> {code:title=log-from-zip-RRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (5.1, 5.1)
> we get a pair (4.9, 4.7)
> we get a pair (4.6, 5.0)
> we get a pair (5.4, 4.6)
> we get a pair (5.0, 4.4)
> we get a pair (4.9, 5.4)
> we get a pair (4.8, 4.8)
> we get a pair (4.3, 5.8)
> we get a pair (5.7, 5.4)
> we get a pair (5.1, 5.7)
> we get a pair (5.1, 5.4)
> we get a pair (5.1, 4.6)
> we get a pair (5.1, 4.8)
> we get a pair (5.0, 5.0)
> we get a pair (5.2, 5.2)
> we get a pair (4.7, 4.8)
> we get a pair (5.4, 5.2)
> we get a pair (5.5, 4.9)
> we get a pair (5.0, 5.5)
> we get a pair (4.9, 4.4)
> we get a pair (5.1, 5.0)
> we get a pair (4.5, 4.4)
> we get a pair (5.0, 5.1)
> we get a pair (4.8, 5.1)
> we get a pair (4.6, 5.3)
> we get a pair (5.0, 7.0)
> we get a pair (6.4, 6.9)
> we get a pair (5.5, 6.5)
> we get a pair (5.7, 6.3)
> we get a pair (4.9, 6.6)
> we get a pair (5.2, 5.0)
> we get a pair (5.9, 6.0)
> we get a pair (6.1, 5.6)
> we get a pair (6.7, 5.6)
> we get a pair (5.8, 6.2)
> we get a pair (5.6, 5.9)
> we get a pair (6.1, 6.3)
> we get a pair (6.1, 6.4)
> we get a pair (6.6, 6.8)
> we get a pair (6.7, 6.0)
> we get a pair (5.7, 5.5)
> we get a pair (5.5, 5.8)
> we get a pair (6.0, 5.4)
> we get a pair (6.0, 6.7)
> we get a pair (6.3, 5.6)
> we get a pair (5.5, 5.5)
> we get a pair (6.1, 5.8)
> we get a pair (5.0, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (6.2, 5.1)
> we get a pair (5.7, 6.3)
> we get a pair (5.8, 7.1)
> we get a pair (6.3, 6.5)
> we get a pair (7.6, 4.9)
> we get a pair (7.3, 6.7)
> we get a pair (7.2, 6.5)
> we get a pair (6.4, 6.8)
> we get a pair (5.7, 5.8)
> we get a pair (6.4, 6.5)
> we get a pair (7.7, 7.7)
> we get a pair (6.0, 6.9)
> we get a pair (5.6, 7.7)
> we get a pair (6.3, 6.7)
> we get a pair (7.2, 6.2)
> we get a pair (6.1, 6.4)
> we get a pair (7.2, 7.4)
> we get a pair (7.9, 6.4)
> we get a pair (6.3, 6.1)
> we get a pair (7.7, 6.3)
> we get a pair (6.4, 6.0)
> we get a pair (6.9, 6.7)
> we get a pair (6.9, 5.8)
> we get a pair (6.8, 6.7)
> we get a pair (6.7, 6.3)
> we get a pair (6.5, 6.2)
> we need to close stream java.io.DataInputStream@507affd3 in thread 127
> we need to close stream java.io.DataInputStream@507affd3 in thread 127
> {code}
> We can see from the end of the log that the data stream is closed twice.
> The simplest way to avoid the error is to use "cache" to cut off the lineage, as shown
> below. However, sometimes we do not want to cache the data.
> {code:title=work-around-zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> rdd.cache()
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
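To illustrate why "cache" works: caching materializes the partition once, so each subsequent compute() pass iterates over a buffered copy instead of the shared one-shot stream. A hedged plain-Scala sketch (no Spark; the Vector stands in for a cached partition, not actual RRDD internals):

```scala
object CacheWorkaroundSketch {
  // With "cache", the data is materialized once, and each compute() pass
  // gets a fresh iterator over the same buffered copy, so pairs match up.
  def zipWithSelfCached(data: Seq[Double]): List[(Double, Double)] = {
    val cached = data.toVector                        // stands in for rdd.cache()
    def compute(): Iterator[Double] = cached.iterator // fresh pass each call
    compute().zip(compute()).toList
  }

  def main(args: Array[String]): Unit = {
    // Matched pairs, like the HadoopRDD log in the issue description:
    println(zipWithSelfCached(Seq(5.1, 4.9, 4.7, 4.6)))
  }
}
```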



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

