spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject spark git commit: [SPARK-5503][MLLIB] Example code for Power Iteration Clustering
Date Fri, 13 Feb 2015 17:46:08 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 5c883df09 -> 5e6394222


[SPARK-5503][MLLIB] Example code for Power Iteration Clustering

Author: sboeschhuawei <stephen.boesch@huawei.com>

Closes #4495 from javadba/picexamples and squashes the following commits:

3c84b14 [sboeschhuawei] PIC Examples updates from Xiangrui's comments round 5
2878675 [sboeschhuawei] Fourth round with xiangrui on PICExample
d7ac350 [sboeschhuawei] Updates to PICExample from Xiangrui's comments round 3
d7f0cba [sboeschhuawei] Updates to PICExample from Xiangrui's comments round 3
cef28f4 [sboeschhuawei] Further updates to PICExample from Xiangrui's comments
f7ff43d [sboeschhuawei] Update to PICExample from Xiangrui's comments
efeec45 [sboeschhuawei] Update to PICExample from Xiangrui's comments
03e8de4 [sboeschhuawei] Added PICExample
c509130 [sboeschhuawei] placeholder for pic examples
5864d4a [sboeschhuawei] placeholder for pic examples

(cherry picked from commit e1a1ff8108463ca79299ec0eb555a0c8db9dffa0)
Signed-off-by: Xiangrui Meng <meng@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e639422
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e639422
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e639422

Branch: refs/heads/branch-1.3
Commit: 5e639422207a113eee4ea3796c221004664ede1a
Parents: 5c883df
Author: sboeschhuawei <stephen.boesch@huawei.com>
Authored: Fri Feb 13 09:45:57 2015 -0800
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Fri Feb 13 09:46:03 2015 -0800

----------------------------------------------------------------------
 .../mllib/PowerIterationClusteringExample.scala | 160 +++++++++++++++++++
 1 file changed, 160 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5e639422/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
new file mode 100644
index 0000000..b2373ad
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.log4j.{Level, Logger}
+import scopt.OptionParser
+
+import org.apache.spark.mllib.clustering.PowerIterationClustering
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * An example Power Iteration Clustering http://www.icml2010.org/papers/387.pdf app.
+ * Takes an input of K concentric circles and the number of points in the innermost circle.
+ * The output should be K clusters - each cluster containing precisely the points associated
+ * with each of the input circles.
+ *
+ * Run with
+ * {{{
+ * ./bin/run-example mllib.PowerIterationClusteringExample [options]
+ *
+ * Where options include:
+ *   k:  Number of circles/clusters
+ *   n:  Number of sampled points on innermost circle.. There are proportionally more points
+ *      within the outer/larger circles
+ *   maxIterations:   Number of Power Iterations
+ *   outerRadius:  radius of the outermost of the concentric circles
+ * }}}
+ *
+ * Here is a sample run and output:
+ *
+ * ./bin/run-example mllib.PowerIterationClusteringExample
+ * -k 3 --n 30 --maxIterations 15
+ *
+ * Cluster assignments: 1 -> [0,1,2,3,4],2 -> [5,6,7,8,9,10,11,12,13,14],
+ * 0 -> [15,16,17,18,19,20,21,22,23,24,25,26,27,28,29]
+ *
+ *
+ * If you use it as a template to create your own app, please use `spark-submit` to submit
your app.
+ */
+object PowerIterationClusteringExample {
+
+  case class Params(
+      input: String = null,
+      k: Int = 3,
+      numPoints: Int = 5,
+      maxIterations: Int = 10,
+      outerRadius: Double = 3.0
+    ) extends AbstractParams[Params]
+
+  def main(args: Array[String]) {
+    val defaultParams = Params()
+
+    val parser = new OptionParser[Params]("PIC Circles") {
+      head("PowerIterationClusteringExample: an example PIC app using concentric circles.")
+      opt[Int]('k', "k")
+        .text(s"number of circles (/clusters), default: ${defaultParams.k}")
+        .action((x, c) => c.copy(k = x))
+      opt[Int]('n', "n")
+        .text(s"number of points in smallest circle, default: ${defaultParams.numPoints}")
+        .action((x, c) => c.copy(numPoints = x))
+      opt[Int]("maxIterations")
+        .text(s"number of iterations, default: ${defaultParams.maxIterations}")
+        .action((x, c) => c.copy(maxIterations = x))
+      opt[Int]('r', "r")
+        .text(s"radius of outermost circle, default: ${defaultParams.outerRadius}")
+        .action((x, c) => c.copy(numPoints = x))
+    }
+
+    parser.parse(args, defaultParams).map { params =>
+      run(params)
+    }.getOrElse {
+      sys.exit(1)
+    }
+  }
+
+  def run(params: Params) {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName(s"PowerIterationClustering with $params")
+    val sc = new SparkContext(conf)
+
+    Logger.getRootLogger.setLevel(Level.WARN)
+
+    val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints, params.outerRadius)
+    val model = new PowerIterationClustering()
+      .setK(params.k)
+      .setMaxIterations(params.maxIterations)
+      .run(circlesRdd)
+
+    val clusters = model.assignments.collect.groupBy(_._2).mapValues(_.map(_._1))
+    val assignments = clusters.toList.sortBy { case (k, v) => v.length}
+    val assignmentsStr = assignments
+      .map { case (k, v) =>
+      s"$k -> ${v.sorted.mkString("[", ",", "]")}"
+    }.mkString(",")
+    val sizesStr = assignments.map {
+      _._2.size
+    }.sorted.mkString("(", ",", ")")
+    println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
+
+    sc.stop()
+  }
+
+  def generateCircle(radius: Double, n: Int) = {
+    Seq.tabulate(n) { i =>
+      val theta = 2.0 * math.Pi * i / n
+      (radius * math.cos(theta), radius * math.sin(theta))
+    }
+  }
+
+  def generateCirclesRdd(sc: SparkContext,
+      nCircles: Int = 3,
+      nPoints: Int = 30,
+      outerRadius: Double): RDD[(Long, Long, Double)] = {
+
+    val radii = Array.tabulate(nCircles) { cx => outerRadius / (nCircles - cx)}
+    val groupSizes = Array.tabulate(nCircles) { cx => (cx + 1) * nPoints}
+    val points = (0 until nCircles).flatMap { cx =>
+      generateCircle(radii(cx), groupSizes(cx))
+    }.zipWithIndex
+    val rdd = sc.parallelize(points)
+    val distancesRdd = rdd.cartesian(rdd).flatMap { case (((x0, y0), i0), ((x1, y1), i1))
=>
+      if (i0 < i1) {
+        Some((i0.toLong, i1.toLong, gaussianSimilarity((x0, y0), (x1, y1), 1.0)))
+      } else {
+        None
+      }
+    }
+    distancesRdd
+  }
+
+  /**
+   * Gaussian Similarity:  http://en.wikipedia.org/wiki/Radial_basis_function_kernel
+   */
+  def gaussianSimilarity(p1: (Double, Double), p2: (Double, Double), sigma: Double) = {
+    val coeff = 1.0 / (math.sqrt(2.0 * math.Pi) * sigma)
+    val expCoeff = -1.0 / 2.0 * math.pow(sigma, 2.0)
+    val ssquares = (p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2)
+    coeff * math.exp(expCoeff * ssquares)
+    //    math.exp((p1._1 - p2._1) * (p1._1 - p2._1) + (p1._2 - p2._2) * (p1._2 - p2._2))
+  }
+
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message