spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-23318][ML] FP-growth: WARN FPGrowth: Input data is not cached
Date Tue, 13 Feb 2018 12:20:37 GMT
Repository: spark
Updated Branches:
  refs/heads/master 407f67249 -> 9dae71516


[SPARK-23318][ML] FP-growth: WARN FPGrowth: Input data is not cached

## What changes were proposed in this pull request?

Cache the RDD of items in ml.FPGrowth before passing it to mllib.FPGrowth, but only when the
user has not already cached the input dataset of transactions. This fixes the "Input data is
not cached" warning logged by mllib.FPGrowth.

## How was this patch tested?

Manually:
1. Run ml.FPGrowthExample - the warning is logged
2. Apply the fix
3. Run ml.FPGrowthExample again - the warning no longer appears

Author: Arseniy Tashoyan <tashoyan@gmail.com>

Closes #20578 from tashoyan/SPARK-23318.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9dae7151
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9dae7151
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9dae7151

Branch: refs/heads/master
Commit: 9dae715168a8e72e318ab231c34a1069bfa342a6
Parents: 407f672
Author: Arseniy Tashoyan <tashoyan@gmail.com>
Authored: Tue Feb 13 06:20:34 2018 -0600
Committer: Sean Owen <sowen@cloudera.com>
Committed: Tue Feb 13 06:20:34 2018 -0600

----------------------------------------------------------------------
 .../scala/org/apache/spark/ml/fpm/FPGrowth.scala     | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9dae7151/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
index aa7871d..3d041fc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -32,6 +32,7 @@ import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
 
 /**
  * Common params for FPGrowth and FPGrowthModel
@@ -158,18 +159,30 @@ class FPGrowth @Since("2.2.0") (
   }
 
   private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = {
+    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+
     val data = dataset.select($(itemsCol))
-    val items = data.where(col($(itemsCol)).isNotNull).rdd.map(r => r.getSeq[T](0).toArray)
+    val items = data.where(col($(itemsCol)).isNotNull).rdd.map(r => r.getSeq[Any](0).toArray)
     val mllibFP = new MLlibFPGrowth().setMinSupport($(minSupport))
     if (isSet(numPartitions)) {
       mllibFP.setNumPartitions($(numPartitions))
     }
+
+    if (handlePersistence) {
+      items.persist(StorageLevel.MEMORY_AND_DISK)
+    }
+
     val parentModel = mllibFP.run(items)
     val rows = parentModel.freqItemsets.map(f => Row(f.items, f.freq))
     val schema = StructType(Seq(
       StructField("items", dataset.schema($(itemsCol)).dataType, nullable = false),
       StructField("freq", LongType, nullable = false)))
     val frequentItems = dataset.sparkSession.createDataFrame(rows, schema)
+
+    if (handlePersistence) {
+      items.unpersist()
+    }
+
     copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message