# spark-reviews mailing list archives

From srowen <...@git.apache.org>
Subject [GitHub] spark pull request #20396: [SPARK-23217][ML] Add cosine distance measure to ...
Date Fri, 26 Jan 2018 13:40:52 GMT
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20396#discussion_r164111780

--- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala ---
@@ -421,13 +460,220 @@ private[evaluation] object SquaredEuclideanSilhouette {
computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double)
}

-    val silhouetteScore = dfWithSquaredNorm
-      .select(avg(
-        computeSilhouetteCoefficientUDF(
-          col(featuresCol), col(predictionCol).cast(DoubleType), col("squaredNorm"))
-      ))
-      .collect()(0)
-      .getDouble(0)
+    val silhouetteScore = overallScore(dfWithSquaredNorm,
+      computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType),
+        col("squaredNorm")))
+
+    bClustersStatsMap.destroy()
+
+    silhouetteScore
+  }
+}
+
+
+/**
+ * The algorithm which is implemented in this object, instead, is an efficient and parallel
+ * implementation of the Silhouette using the cosine distance measure. The cosine distance
+ * measure is defined as 1 - s where s is the cosine similarity between two points.
+ *
+ * The total distance of the point X to the points $C_{i}$ belonging to the cluster $\Gamma$
+ * is:
+ *
+ * <blockquote>
+ *   $$
+ *   \sum\limits_{i=1}^N d(X, C_{i} ) =
+ *   \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big)
+ *   = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|}
+ *   \frac{c_{ij}}{\|C_{i}\|}
+ *   = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N
+ *   \frac{c_{ij}}{\|C_{i}\|} \Big)
+ *   $$
+ * </blockquote>
+ *
+ * where $x_{j}$ is the j-th dimension of the point X and $c_{ij}$ is the j-th dimension
+ * of the i-th point in cluster $\Gamma$.
+ *
+ * Then, we can define the vector:
+ *
+ * <blockquote>
+ *   $$
+ *   \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
+ *   $$
+ * </blockquote>
+ *
+ * which can be precomputed for each point and the vector
+ *
+ * <blockquote>
+ *   $$
+ *   \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
+ *   $$
+ * </blockquote>
+ *
+ * which can be precomputed too for each cluster $\Gamma$ by its points $C_{i}$.
+ *
+ * With these definitions, the numerator becomes:
+ *
+ * <blockquote>
+ *   $$
+ *   N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
+ *   $$
+ * </blockquote>
+ *
+ * Thus the average distance of a point X to the points of the cluster $\Gamma$ is:
+ *
+ * <blockquote>
+ *   $$
+ *   1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
+ *   $$
+ * </blockquote>
+ *
+ * In the implementation, the precomputed values for the clusters are distributed among the worker
+ * nodes via broadcasted variables, because we can assume that the clusters are limited in number.
+ *
+ * The main strengths of this algorithm are the low computational complexity and the intrinsic
+ * parallelism. The precomputed information for each point and for each cluster can be computed
+ * with a computational complexity which is O(N/W), where N is the number of points in the
+ * dataset and W is the number of worker nodes. After that, every point can be analyzed
+ * independently from the others.
+ *
+ * For every point we need to compute the average distance to all the clusters. Since the formula
+ * above requires O(D) operations, this phase has a computational complexity which is
+ * O(C*D*N/W) where C is the number of clusters (which we assume quite low), D is the number
+ * of dimensions, N is the number of points in the dataset and W is the number of worker
+ * nodes.
+ */
+private[evaluation] object CosineSilhouette extends Silhouette {
+
+  private[this] var kryoRegistrationPerformed: Boolean = false
+
+  private[this] val normalizedFeaturesColName = "normalizedFeatures"
+
+  /**
+   * This method registers the class
+   * [[org.apache.spark.ml.evaluation.CosineSilhouette.ClusterStats]]
+   * for kryo serialization.
+   *
+   * @param sc SparkContext to be used
+   */
+  def registerKryoClasses(sc: SparkContext): Unit = {
+    if (!kryoRegistrationPerformed) {
+      sc.getConf.registerKryoClasses(
+        Array(
+          classOf[CosineSilhouette.ClusterStats]
+        )
+      )
+      kryoRegistrationPerformed = true
+    }
+  }
+
+  case class ClusterStats(normalizedFeatureSum: Vector, numOfPoints: Long)
+
+  /**
+   * The method takes the input dataset and computes the aggregated values
+   * about a cluster which are needed by the algorithm.
+   *
+   * @param df The DataFrame which contains the input data
+   * @param predictionCol The name of the column which contains the predicted cluster id
+   *                      for the point.
+   * @return A [[scala.collection.immutable.Map]] which associates each cluster id to a
+   *         [[ClusterStats]] object (which contains the precomputed values N and
+   *         $\Omega_{\Gamma}$).
+   */
+  def computeClusterStats(df: DataFrame, predictionCol: String): Map[Double, ClusterStats] = {
+    val numFeatures = df.select(col(normalizedFeaturesColName)).first().getAs[Vector](0).size
+    val clustersStatsRDD = df.select(
+      col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName))
+      .rdd
+      .map { row => (row.getDouble(0), row.getAs[Vector](1)) }
--- End diff --

I'm neutral on it. I don't think importing implicits is a big deal either way.

---
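
The averaging formula in the quoted Scaladoc can be checked with a small sketch in plain Scala. It is not the code from the pull request: the object name `CosineSilhouetteSketch`, the `Vec` alias, and every function below are hypothetical, and arrays stand in for Spark's `Vector` so the snippet runs without Spark. It compares the shortcut `1 - (xi_X . Omega_Gamma) / N` with the direct average of `1 - cos(X, C_i)` over the cluster.

```scala
object CosineSilhouetteSketch {
  // Hypothetical stand-in for org.apache.spark.ml.linalg.Vector.
  type Vec = Array[Double]

  // xi_X: the point scaled to unit length (undefined for the zero vector).
  def normalize(x: Vec): Vec = {
    val norm = math.sqrt(x.map(v => v * v).sum)
    require(norm > 0.0, "cosine distance is undefined for the zero vector")
    x.map(_ / norm)
  }

  // Plain dot product of two vectors of equal length.
  def dot(a: Vec, b: Vec): Double =
    a.zip(b).map { case (p, q) => p * q }.sum

  // Omega_Gamma: element-wise sum of the normalized points of one cluster.
  def omega(cluster: Seq[Vec]): Vec =
    cluster.map(normalize).reduce { (a, b) =>
      a.zip(b).map { case (p, q) => p + q }
    }

  // Average distance using the precomputed cluster vector: O(D) per point.
  def avgDistanceFast(x: Vec, om: Vec, n: Long): Double =
    1.0 - dot(normalize(x), om) / n

  // Average distance from the definition: O(N * D) per point.
  def avgDistanceNaive(x: Vec, cluster: Seq[Vec]): Double =
    cluster.map(c => 1.0 - dot(normalize(x), normalize(c))).sum / cluster.size
}
```

For any cluster of non-zero vectors the two functions should agree up to floating-point rounding, which is the property that lets the per-cluster vector be computed once and reused for every point. The sketch leaves out the DataFrame and broadcast handling shown in the diff.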


