From: mengxr
To: reviews@spark.apache.org
Reply-To: reviews@spark.apache.org
Subject: [GitHub] spark pull request: [SPARK-8402][MLLIB] DP Means Clustering
Date: Tue, 8 Sep 2015 17:11:24 +0000 (UTC)

Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6880#discussion_r38952190

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/DpMeans.scala ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.BLAS.{axpy, scal}
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * :: Experimental ::
+ *
+ * The Dirichlet process (DP) is a popular non-parametric Bayesian mixture
+ * model that allows for flexible clustering of data without having to
+ * determine the number of clusters in advance.
+ *
+ * Given a set of data points, this class performs the DP-means cluster
+ * creation process, iterating until the maximum number of iterations is
+ * reached or the convergence criterion is satisfied. With the current
+ * global set of centers, it locally creates a new cluster centered at `x`
+ * whenever it encounters an uncovered data point `x`.
+ * In a similar manner, a local cluster center is promoted to a global
+ * center whenever an uncovered local cluster center is found. A data point
+ * is said to be "covered" by a cluster `c` if the distance from the point
+ * to the cluster center of `c` is less than a given lambda value.
+ *
+ * DP-means was proposed in "Revisiting k-means: New Algorithms via Bayesian
+ * Nonparametrics" by Brian Kulis and Michael I. Jordan; the distributed
+ * cluster-creation scheme used here follows "Optimistic Concurrency Control
+ * for Distributed Unsupervised Learning" by Xinghao Pan, Joseph E. Gonzalez,
+ * Stefanie Jegelka, Tamara Broderick, and Michael I. Jordan.
+ *
+ * @param lambda The distance threshold value that controls cluster creation.
+ * @param convergenceTol The threshold value at which convergence is considered to have occurred.
+ * @param maxIterations The maximum number of iterations to perform.
+ */
+@Experimental
+class DpMeans private (
+    private var lambda: Double,
+    private var convergenceTol: Double,
+    private var maxIterations: Int) extends Serializable with Logging {
+
+  /**
+   * Constructs a default instance. The default parameters are {lambda: 1, convergenceTol: 0.01,
+   * maxIterations: 20}.
+   */
+  def this() = this(1, 0.01, 20)
+
+  /** Return the distance threshold that controls cluster creation. */
+  def getLambda: Double = lambda
+
+  /** Set the distance threshold that controls cluster creation. Default: 1. */
+  def setLambda(lambda: Double): this.type = {
+    this.lambda = lambda
+    this
+  }
+
+  /** Set the threshold value at which convergence is considered to have occurred. Default: 0.01 */
+  def setConvergenceTol(convergenceTol: Double): this.type = {
+    this.convergenceTol = convergenceTol
+    this
+  }
+
+  /** Return the threshold value at which convergence is considered to have occurred. */
+  def getConvergenceTol: Double = convergenceTol
+
+  /** Set the maximum number of iterations. Default: 20 */
+  def setMaxIterations(maxIterations: Int): this.type = {
+    this.maxIterations = maxIterations
+    this
+  }
+
+  /** Return the maximum number of iterations. */
+  def getMaxIterations: Int = maxIterations
+
+  /**
+   * Perform DP-means clustering.
+   */
+  def run(data: RDD[Vector]): DpMeansModel = {
+    if (data.getStorageLevel == StorageLevel.NONE) {
+      logWarning("The input data is not directly cached, which may hurt performance if its" +
+        " parent RDDs are also uncached.")
+    }
+
+    // Compute the L2 norms and cache them.
+    val norms = data.map(Vectors.norm(_, 2.0))
+    norms.persist()
+    val zippedData = data.zip(norms).map {
+      case (v, norm) => new VectorWithNorm(v, norm)
+    }
+
+    // Implementation of the DP-means algorithm.
+    var iteration = 0
+    var covered = false
+    var converged = false
+    var localCenters = Array.empty[VectorWithNorm]
+    val globalCenters = ArrayBuffer.empty[VectorWithNorm]
+
+    // Execute clustering until the maximum number of iterations is reached
+    // or the cluster centers have converged.
+    while (iteration < maxIterations && !converged) {
+      type WeightedPoint = (Vector, Long)
+      // Merge two weighted points: accumulate the vector sum of x into y
+      // and add the counts.
+      def mergeClusters(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
+        axpy(1.0, x._1, y._1)
+        (y._1, x._2 + y._2)
+      }
+
+      // Loop until all data points are covered by some cluster center.
+      do {
+        localCenters = zippedData.mapPartitions(h => DpMeans.cover(h, globalCenters, lambda))
--- End diff --

It would be useful to put some log messages here.
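For example, something along these lines (just a sketch, not a prescription; `iteration` and `globalCenters` are already in scope at this point, and `logInfo` comes from the `Logging` trait the class mixes in):

    // Hypothetical sketch of the suggested logging; exact wording is up to you.
    logInfo(s"Iteration $iteration: collecting local centers from each partition " +
      s"(current number of global centers: ${globalCenters.size}).")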
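As a side note for anyone trying the patch out, here is a minimal usage sketch based only on the API visible in this diff. It assumes a Spark shell where `sc` is defined; `DpMeansModel` is not shown above, so nothing beyond `run` is exercised:

    import org.apache.spark.mllib.clustering.DpMeans
    import org.apache.spark.mllib.linalg.Vectors

    // Two well-separated groups of 2-D points. With lambda = 2.0, each group
    // should spawn its own cluster, since the groups are roughly 12.7 apart.
    val data = sc.parallelize(Seq(
      Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
      Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 8.9)
    )).cache()

    val model = new DpMeans()
      .setLambda(2.0)
      .setConvergenceTol(0.01)
      .setMaxIterations(20)
      .run(data)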