spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "zhengruifeng (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-21742) BisectingKMeans generate different models with/without caching
Date Wed, 16 Aug 2017 09:22:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-21742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

zhengruifeng updated SPARK-21742:
---------------------------------
    Description: 
I found that {{BisectingKMeans}} will generate different models if the input is cached or
not.
Using the same dataset in {{BisectingKMeansSuite}}, we can found that if we cache the input,
then the number of centers will change from 2 to 3.

So it looks like a potential bug.
{code}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.clustering._
import org.apache.spark.ml.linalg._
import scala.util.Random
case class TestRow(features: org.apache.spark.ml.linalg.Vector)

val rows = 10
val dim = 1000
val seed = 42
val nnz = 130

val bkm = new BisectingKMeans().setK(5).setMinDivisibleClusterSize(4).setMaxIter(4).setSeed(123)

val random = new Random(seed)
val rdd = sc.parallelize(1 to rows).map(i => Vectors.sparse(dim, random.shuffle(0 to dim
- 1).slice(0, nnz).sorted.toArray, Array.fill(nnz)(random.nextDouble()))).map(v => new
TestRow(v))

val sparseDataset = spark.createDataFrame(rdd)

scala> bkm.fit(sparseDataset).clusterCenters
17/08/16 17:12:28 WARN BisectingKMeans: The input RDD 579 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res22: Array[org.apache.spark.ml.linalg.Vector] = Array([0.0,0.0,0.0,0.0,0.0,0.0,0.3081569145071915,0.0,0.0,0.0,0.0,0.1875176493190393,0.0,0.0,0.0,0.33856517726920116,0.0,0.15290274761955236,0.0,0.10820818064086901,0.0,0.0,0.5987249128746422,0.0,0.0,0.3563390364518392,0.0,0.5019914247361699,0.0,0.08711412551574785,0.09199053071837167,0.05749771404790841,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.5209441786832834,0.0,0.2350595158678447,0.0,0.0,0.0,0.0,0.0,0.0,0.3041334669892575,0.0,0.0,0.32422664760898434,0.0,0.24542718129722224,0.0,0.0,0.06846136418797384,0.0,0.0,0.19556839035017104,0.0,0.0,0.08436120694800427,0.0,0.0,0.0,0.30542501045554465,0.0,0.0,0.0,0.16185204843664616,0.2800921624973247,0.0,0.45459861318444555,0.0,0.0,0.0,0.26222502250076374,0.5235099131919367,0.0,0.0,0....

scala> bkm.fit(sparseDataset).clusterCenters.length
17/08/16 17:12:36 WARN BisectingKMeans: The input RDD 667 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res23: Int = 2


scala> sparseDataset.persist()
res24: sparseDataset.type = [features: vector]

scala> bkm.fit(sparseDataset).clusterCenters
17/08/16 17:14:35 WARN BisectingKMeans: The input RDD 806 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res26: Array[org.apache.spark.ml.linalg.Vector] = Array([0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.562552947957118,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.32462454192260704,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.26134237654724357,0.275971592155115,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9124004009677724,0.0,0.0,0.972679942826953,0.0,0.7362815438916668,0.0,0.0,0.20538409256392154,0.0,0.0,0.5867051710505131,0.0,0.0,0.0,0.0,0.0,0.0,0.916275031366634,0.0,0.0,0.0,0.4855561453099385,0.0,0.0,0.0,0.0,0.0,0.0,0.7866750675022912,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6178027906951924,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.97254915644181,0.0,0.0,0.0,0.0,0.0,0.7947673417631961,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9685267297437855,0.0,0.0,0.0,0.1...
scala> bkm.fit(sparseDataset).clusterCenters.length
17/08/16 17:14:38 WARN BisectingKMeans: The input RDD 855 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res27: Int = 3

{code}

And suggested by [~srowen], I retest it with the same dataset generated in a deterministic
way

{code}
val random = new Random(seed)
val rdd = sc.parallelize(1 to rows).map(i => Vectors.sparse(dim, random.shuffle(0 to dim
- 1).slice(0, nnz).sorted.toArray, Array.fill(nnz)(random.nextDouble()))).map(v => new
TestRow(v))
val vecs = rdd.collect()
val rdd2 = sc.parallelize(vecs)

val sparseDataset2 = spark.createDataFrame(rdd2)

scala> bkm.fit(sparseDataset2).clusterCenters.length
17/08/16 17:20:36 WARN BisectingKMeans: The input RDD 1114 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res35: Int = 3

scala> bkm.fit(sparseDataset2).clusterCenters
17/08/16 17:20:43 WARN BisectingKMeans: The input RDD 1164 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res36: Array[org.apache.spark.ml.linalg.Vector] = Array([0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.562552947957118,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.32462454192260704,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.26134237654724357,0.275971592155115,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9124004009677724,0.0,0.0,0.972679942826953,0.0,0.7362815438916668,0.0,0.0,0.20538409256392154,0.0,0.0,0.5867051710505131,0.0,0.0,0.0,0.0,0.0,0.0,0.916275031366634,0.0,0.0,0.0,0.4855561453099385,0.0,0.0,0.0,0.0,0.0,0.0,0.7866750675022912,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6178027906951924,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.97254915644181,0.0,0.0,0.0,0.0,0.0,0.7947673417631961,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9685267297437855,0.0,0.0,0.0,0.1...

scala> sparseDataset2.persist()
res37: sparseDataset2.type = [features: vector]

scala> bkm.fit(sparseDataset2).clusterCenters.length
17/08/16 17:20:54 WARN BisectingKMeans: The input RDD 1216 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res38: Int = 3

scala> bkm.fit(sparseDataset2).clusterCenters
17/08/16 17:20:58 WARN BisectingKMeans: The input RDD 1265 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res39: Array[org.apache.spark.ml.linalg.Vector] = Array([0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.562552947957118,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.32462454192260704,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.26134237654724357,0.275971592155115,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9124004009677724,0.0,0.0,0.972679942826953,0.0,0.7362815438916668,0.0,0.0,0.20538409256392154,0.0,0.0,0.5867051710505131,0.0,0.0,0.0,0.0,0.0,0.0,0.916275031366634,0.0,0.0,0.0,0.4855561453099385,0.0,0.0,0.0,0.0,0.0,0.0,0.7866750675022912,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6178027906951924,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.97254915644181,0.0,0.0,0.0,0.0,0.0,0.7947673417631961,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9685267297437855,0.0,0.0,0.0,0.1...
{code}

  was:
I found that {{BisectingKMeans}} will generate different models if the input is cached or
not.
Using the same dataset in {{BisectingKMeansSuite}}, we can found that if we cache the input,
then the number of centers will change from 2 to 3.

So it looks like a potential bug.
{code}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.clustering._
import org.apache.spark.ml.linalg._
import scala.util.Random
case class TestRow(features: org.apache.spark.ml.linalg.Vector)

val rows = 10
val dim = 1000
val seed = 42
val nnz = 130

val bkm = new BisectingKMeans().setK(5).setMinDivisibleClusterSize(4).setMaxIter(4).setSeed(123)

val random = new Random(seed)
val rdd = sc.parallelize(1 to rows).map(i => Vectors.sparse(dim, random.shuffle(0 to dim
- 1).slice(0, nnz).sorted.toArray, Array.fill(nnz)(random.nextDouble()))).map(v => new
TestRow(v))

val sparseDataset = spark.createDataFrame(rdd)

scala> bkm.fit(sparseDataset).clusterCenters
17/08/16 17:12:28 WARN BisectingKMeans: The input RDD 579 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res22: Array[org.apache.spark.ml.linalg.Vector] = Array([0.0,0.0,0.0,0.0,0.0,0.0,0.3081569145071915,0.0,0.0,0.0,0.0,0.1875176493190393,0.0,0.0,0.0,0.33856517726920116,0.0,0.15290274761955236,0.0,0.10820818064086901,0.0,0.0,0.5987249128746422,0.0,0.0,0.3563390364518392,0.0,0.5019914247361699,0.0,0.08711412551574785,0.09199053071837167,0.05749771404790841,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.5209441786832834,0.0,0.2350595158678447,0.0,0.0,0.0,0.0,0.0,0.0,0.3041334669892575,0.0,0.0,0.32422664760898434,0.0,0.24542718129722224,0.0,0.0,0.06846136418797384,0.0,0.0,0.19556839035017104,0.0,0.0,0.08436120694800427,0.0,0.0,0.0,0.30542501045554465,0.0,0.0,0.0,0.16185204843664616,0.2800921624973247,0.0,0.45459861318444555,0.0,0.0,0.0,0.26222502250076374,0.5235099131919367,0.0,0.0,0....

scala> bkm.fit(sparseDataset).clusterCenters.length
17/08/16 17:12:36 WARN BisectingKMeans: The input RDD 667 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res23: Int = 2


scala> sparseDataset.persist()
res24: sparseDataset.type = [features: vector]

scala> bkm.fit(sparseDataset).clusterCenters
17/08/16 17:14:35 WARN BisectingKMeans: The input RDD 806 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res26: Array[org.apache.spark.ml.linalg.Vector] = Array([0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.562552947957118,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.32462454192260704,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.26134237654724357,0.275971592155115,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9124004009677724,0.0,0.0,0.972679942826953,0.0,0.7362815438916668,0.0,0.0,0.20538409256392154,0.0,0.0,0.5867051710505131,0.0,0.0,0.0,0.0,0.0,0.0,0.916275031366634,0.0,0.0,0.0,0.4855561453099385,0.0,0.0,0.0,0.0,0.0,0.0,0.7866750675022912,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6178027906951924,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.97254915644181,0.0,0.0,0.0,0.0,0.0,0.7947673417631961,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9685267297437855,0.0,0.0,0.0,0.1...
scala> bkm.fit(sparseDataset).clusterCenters.length
17/08/16 17:14:38 WARN BisectingKMeans: The input RDD 855 is not directly cached, which may
hurt performance if its parent RDDs are also not cached.
res27: Int = 3

{code}

And suggested by [~srowen], I retest it with 


> BisectingKMeans generate different models with/without caching
> --------------------------------------------------------------
>
>                 Key: SPARK-21742
>                 URL: https://issues.apache.org/jira/browse/SPARK-21742
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 2.3.0
>            Reporter: zhengruifeng
>
> I found that {{BisectingKMeans}} will generate different models if the input is cached
or not.
> Using the same dataset in {{BisectingKMeansSuite}}, we can found that if we cache the
input, then the number of centers will change from 2 to 3.
> So it looks like a potential bug.
> {code}
> import org.apache.spark.ml.param.ParamMap
> import org.apache.spark.sql.Dataset
> import org.apache.spark.ml.clustering._
> import org.apache.spark.ml.linalg._
> import scala.util.Random
> case class TestRow(features: org.apache.spark.ml.linalg.Vector)
> val rows = 10
> val dim = 1000
> val seed = 42
> val nnz = 130
> val bkm = new BisectingKMeans().setK(5).setMinDivisibleClusterSize(4).setMaxIter(4).setSeed(123)
> val random = new Random(seed)
> val rdd = sc.parallelize(1 to rows).map(i => Vectors.sparse(dim, random.shuffle(0
to dim - 1).slice(0, nnz).sorted.toArray, Array.fill(nnz)(random.nextDouble()))).map(v =>
new TestRow(v))
> val sparseDataset = spark.createDataFrame(rdd)
> scala> bkm.fit(sparseDataset).clusterCenters
> 17/08/16 17:12:28 WARN BisectingKMeans: The input RDD 579 is not directly cached, which
may hurt performance if its parent RDDs are also not cached.
> res22: Array[org.apache.spark.ml.linalg.Vector] = Array([0.0,0.0,0.0,0.0,0.0,0.0,0.3081569145071915,0.0,0.0,0.0,0.0,0.1875176493190393,0.0,0.0,0.0,0.33856517726920116,0.0,0.15290274761955236,0.0,0.10820818064086901,0.0,0.0,0.5987249128746422,0.0,0.0,0.3563390364518392,0.0,0.5019914247361699,0.0,0.08711412551574785,0.09199053071837167,0.05749771404790841,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.5209441786832834,0.0,0.2350595158678447,0.0,0.0,0.0,0.0,0.0,0.0,0.3041334669892575,0.0,0.0,0.32422664760898434,0.0,0.24542718129722224,0.0,0.0,0.06846136418797384,0.0,0.0,0.19556839035017104,0.0,0.0,0.08436120694800427,0.0,0.0,0.0,0.30542501045554465,0.0,0.0,0.0,0.16185204843664616,0.2800921624973247,0.0,0.45459861318444555,0.0,0.0,0.0,0.26222502250076374,0.5235099131919367,0.0,0.0,0....
> scala> bkm.fit(sparseDataset).clusterCenters.length
> 17/08/16 17:12:36 WARN BisectingKMeans: The input RDD 667 is not directly cached, which
may hurt performance if its parent RDDs are also not cached.
> res23: Int = 2
> scala> sparseDataset.persist()
> res24: sparseDataset.type = [features: vector]
> scala> bkm.fit(sparseDataset).clusterCenters
> 17/08/16 17:14:35 WARN BisectingKMeans: The input RDD 806 is not directly cached, which
may hurt performance if its parent RDDs are also not cached.
> res26: Array[org.apache.spark.ml.linalg.Vector] = Array([0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.562552947957118,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.32462454192260704,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.26134237654724357,0.275971592155115,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9124004009677724,0.0,0.0,0.972679942826953,0.0,0.7362815438916668,0.0,0.0,0.20538409256392154,0.0,0.0,0.5867051710505131,0.0,0.0,0.0,0.0,0.0,0.0,0.916275031366634,0.0,0.0,0.0,0.4855561453099385,0.0,0.0,0.0,0.0,0.0,0.0,0.7866750675022912,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6178027906951924,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.97254915644181,0.0,0.0,0.0,0.0,0.0,0.7947673417631961,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9685267297437855,0.0,0.0,0.0,0.1...
> scala> bkm.fit(sparseDataset).clusterCenters.length
> 17/08/16 17:14:38 WARN BisectingKMeans: The input RDD 855 is not directly cached, which
may hurt performance if its parent RDDs are also not cached.
> res27: Int = 3
> {code}
> And suggested by [~srowen], I retest it with the same dataset generated in a deterministic
way
> {code}
> val random = new Random(seed)
> val rdd = sc.parallelize(1 to rows).map(i => Vectors.sparse(dim, random.shuffle(0
to dim - 1).slice(0, nnz).sorted.toArray, Array.fill(nnz)(random.nextDouble()))).map(v =>
new TestRow(v))
> val vecs = rdd.collect()
> val rdd2 = sc.parallelize(vecs)
> val sparseDataset2 = spark.createDataFrame(rdd2)
> scala> bkm.fit(sparseDataset2).clusterCenters.length
> 17/08/16 17:20:36 WARN BisectingKMeans: The input RDD 1114 is not directly cached, which
may hurt performance if its parent RDDs are also not cached.
> res35: Int = 3
> scala> bkm.fit(sparseDataset2).clusterCenters
> 17/08/16 17:20:43 WARN BisectingKMeans: The input RDD 1164 is not directly cached, which
may hurt performance if its parent RDDs are also not cached.
> res36: Array[org.apache.spark.ml.linalg.Vector] = Array([0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.562552947957118,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.32462454192260704,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.26134237654724357,0.275971592155115,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9124004009677724,0.0,0.0,0.972679942826953,0.0,0.7362815438916668,0.0,0.0,0.20538409256392154,0.0,0.0,0.5867051710505131,0.0,0.0,0.0,0.0,0.0,0.0,0.916275031366634,0.0,0.0,0.0,0.4855561453099385,0.0,0.0,0.0,0.0,0.0,0.0,0.7866750675022912,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6178027906951924,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.97254915644181,0.0,0.0,0.0,0.0,0.0,0.7947673417631961,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9685267297437855,0.0,0.0,0.0,0.1...
> scala> sparseDataset2.persist()
> res37: sparseDataset2.type = [features: vector]
> scala> bkm.fit(sparseDataset2).clusterCenters.length
> 17/08/16 17:20:54 WARN BisectingKMeans: The input RDD 1216 is not directly cached, which
may hurt performance if its parent RDDs are also not cached.
> res38: Int = 3
> scala> bkm.fit(sparseDataset2).clusterCenters
> 17/08/16 17:20:58 WARN BisectingKMeans: The input RDD 1265 is not directly cached, which
may hurt performance if its parent RDDs are also not cached.
> res39: Array[org.apache.spark.ml.linalg.Vector] = Array([0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.562552947957118,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.32462454192260704,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.26134237654724357,0.275971592155115,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9124004009677724,0.0,0.0,0.972679942826953,0.0,0.7362815438916668,0.0,0.0,0.20538409256392154,0.0,0.0,0.5867051710505131,0.0,0.0,0.0,0.0,0.0,0.0,0.916275031366634,0.0,0.0,0.0,0.4855561453099385,0.0,0.0,0.0,0.0,0.0,0.0,0.7866750675022912,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6178027906951924,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.97254915644181,0.0,0.0,0.0,0.0,0.0,0.7947673417631961,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9685267297437855,0.0,0.0,0.0,0.1...
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message