flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2131) Add Initialization schemes for K-means clustering
Date Fri, 05 Feb 2016 13:50:39 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134198#comment-15134198 ]

ASF GitHub Bot commented on FLINK-2131:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/757#discussion_r52017071
  
    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---
    @@ -0,0 +1,614 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.clustering
    +
    +import org.apache.flink.api.common.functions.RichFilterFunction
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
    +import org.apache.flink.api.scala.{DataSet, _}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.ml._
    +import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
    +import org.apache.flink.ml.common.{LabeledVector, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{BLAS, Vector}
    +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
    +import org.apache.flink.ml.pipeline._
    +
    +import scala.collection.JavaConverters._
    +import scala.util.Random
    +
    +
    +/**
    + * Implements the KMeans algorithm, which calculates cluster centroids based on a set of
    + * training data points and a set of k initial centroids.
    + *
    + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can
    + * then be used to assign new points to the learned cluster centroids.
    + *
    + * The KMeans algorithm works as described on Wikipedia
    + * (http://en.wikipedia.org/wiki/K-means_clustering):
    + *
    + * Given an initial set of k means `m^{(1)}_1, …, m^{(1)}_k` (see below), the algorithm
    + * proceeds by alternating between two steps:
    + *
    + * ===Assignment step:===
    + *
    + * Assign each observation to the cluster whose mean yields the least within-cluster sum of
    + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is
    + * intuitively the "nearest" mean. (Mathematically, this means partitioning the observations
    + * according to the Voronoi diagram generated by the means.)
    + *
    + * `S^{(t)}_i = { x_p : || x_p - m^{(t)}_i ||^2 ≤ || x_p - m^{(t)}_j ||^2 \forall j, 1 ≤ j ≤ k }`,
    + * where each `x_p` is assigned to exactly one `S^{(t)}_i`, even if it could be assigned to
    + * two or more of them.
    + *
    + * ===Update step:===
    + *
    + * Calculate the new means to be the centroids of the observations in the new clusters.
    + *
    + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
    + *
    + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster
    + * sum of squares (WCSS) objective.
    + *
    + * @example
    + * {{{
    + *       val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData)
    + *       val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids)
    + *
    + *       val kmeans = KMeans()
    + *         .setInitialCentroids(initialCentroids)
    + *         .setNumIterations(10)
    + *
    + *       kmeans.fit(trainingDS)
    + *
    + *       // getting the computed centroids
    + *       val centroidsResult = kmeans.centroids.get.collect()
    + *
    + *       // get matching clusters for new points
    + *       val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData)
    + *       val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
    + * }}}
    + *
    + * =Parameters=
    + *
    + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
    + * Defines the number of iterations to recalculate the centroids of the clusters. As it
    + * is a heuristic algorithm, there is no guarantee that it will converge to the global
    + * optimum. The recalculation of the cluster centroids and the reassignment of the data
    + * points are repeated until the given number of iterations is reached.
    + * (Default value: '''10''')
    + *
    + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
    + * Defines the initial k centroids of the k clusters. They are used as the starting point of
    + * the algorithm for clustering the data set. The centroids are recalculated as often as set
    + * in [[org.apache.flink.ml.clustering.KMeans.NumIterations]]. The choice of the initial
    + * centroids strongly affects the outcome of the algorithm.
    + *
    + * - [[org.apache.flink.ml.clustering.KMeans.InitialStrategy]]:
    + * Defines the initialization strategy to be used for initializing the KMeans algorithm in
    + * case the initial centroids are not provided. Allowed values are "random", "kmeans++" and
    + * "kmeans||".
    + * (Default Value: '''random''')
    + *
    + * - [[org.apache.flink.ml.clustering.KMeans.NumClusters]]:
    + * Defines the number of clusters required. This is essential to provide when only the
    + * initialization strategy is specified, not the initial centroids themselves.
    + * (Default Value: '''0''')
    + *
    + * - [[org.apache.flink.ml.clustering.KMeans.OversamplingFactor]]:
    + *  Defines the oversampling rate for the kmeans|| initialization.
    + * (Default Value: '''2k'''), where k is the number of clusters.
    + *
    + * - [[org.apache.flink.ml.clustering.KMeans.KMeansParRounds]]:
    + *  Defines the number of rounds for the kmeans|| initialization.
    + * (Default Value: '''5''')
    + *
    + */
    +class KMeans extends Predictor[KMeans] {
    +
    +  import KMeans._
    +
    +  /**
    +   * Stores the learned clusters after the fit operation
    +   */
    +  var centroids: Option[DataSet[Seq[LabeledVector]]] = None
    +
    +  /**
    +   * Sets the maximum number of iterations.
    +   *
    +   * @param numIterations The maximum number of iterations.
    +   * @return itself
    +   */
    +  def setNumIterations(numIterations: Int): KMeans = {
    +    parameters.add(NumIterations, numIterations)
    +    this
    +  }
    +
    +  /**
    +   * Sets the number of clusters.
    +   *
    +   * @param numClusters The number of clusters
    +   * @return itself
    +   */
    +  def setNumClusters(numClusters: Int): KMeans = {
    +    parameters.add(NumClusters, numClusters)
    +    this
    +  }
    +
    +  /**
    +   * Sets the initial centroids on which the algorithm will start computing. These points
    +   * should depend on the data and will significantly influence the resulting centroids.
    +   * Note that this setting will override [[setInitializationStrategy()]], and the size of
    +   * initialCentroids will override the value, if set, by [[setNumClusters()]].
    +   *
    +   * @param initialCentroids A set of labeled vectors.
    +   * @return itself
    +   */
    +  def setInitialCentroids(initialCentroids: Seq[LabeledVector]): KMeans = {
    +    parameters.add(InitialCentroids, initialCentroids)
    +    this
    +  }
    +
    +  /**
    +   * Automatically initialize the KMeans algorithm. Allowed options are "random", "kmeans++"
    +   * and "kmeans||".
    +   *
    +   * @param initialStrategy
    +   * @return itself
    +   */
    +  def setInitializationStrategy(initialStrategy: String): KMeans = {
    +    require(Array("random", "kmeans++", "kmeans||").contains(initialStrategy), s"$initialStrategy" +
    --- End diff --
    
    It would allow us to get rid of this check
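
This excerpt does not show what "It" refers to. One common way to make a string membership check like the `require` above unnecessary is to model the allowed strategies as a closed type, so that invalid values cannot be constructed at all. The sketch below illustrates that idea only; `InitStrategy`, its case objects, and `KMeansConfig` are hypothetical names that do not come from the pull request, and this is not necessarily what the reviewer proposed.

```scala
// Hypothetical sketch: a closed set of initialization strategies.
// None of these names appear in the pull request.
sealed trait InitStrategy

object InitStrategy {
  case object Random extends InitStrategy
  case object KMeansPlusPlus extends InitStrategy
  case object KMeansParallel extends InitStrategy

  // String parsing is only needed at the boundary where user input enters;
  // unknown names yield None instead of failing a runtime `require`.
  def fromString(name: String): Option[InitStrategy] = name match {
    case "random"   => Some(Random)
    case "kmeans++" => Some(KMeansPlusPlus)
    case "kmeans||" => Some(KMeansParallel)
    case _          => None
  }
}

// Hypothetical stand-in for the setter in the diff: because the parameter is
// typed, the compiler rejects anything that is not one of the three strategies.
final class KMeansConfig {
  private var strategy: InitStrategy = InitStrategy.Random

  def setInitializationStrategy(s: InitStrategy): KMeansConfig = {
    strategy = s
    this
  }

  def initializationStrategy: InitStrategy = strategy
}
```

With a typed parameter, a call such as `new KMeansConfig().setInitializationStrategy(InitStrategy.KMeansPlusPlus)` needs no validation, and a `match` over `InitStrategy` is checked for exhaustiveness by the compiler.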


> Add Initialization schemes for K-means clustering
> -------------------------------------------------
>
>                 Key: FLINK-2131
>                 URL: https://issues.apache.org/jira/browse/FLINK-2131
>             Project: Flink
>          Issue Type: Task
>          Components: Machine Learning Library
>            Reporter: Sachin Goel
>            Assignee: Sachin Goel
>
> Lloyd's [KMeans] algorithm takes initial centroids as its input. However, if the user doesn't provide the initial centers, they may ask for a particular initialization scheme to be followed. The most commonly used are these:
> 1. Random initialization: Self-explanatory
> 2. kmeans++ initialization: http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf
> 3. kmeans|| : http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf
> For very large data sets, or for large values of k, the kmeans|| method is preferred, as it provides the same approximation guarantees as kmeans++ and requires fewer passes over the input data.
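
To make the schemes above concrete, here is a minimal single-machine sketch of kmeans++ seeding followed by one Lloyd iteration (assignment step, then update step). It uses plain Scala collections rather than Flink's `DataSet` API, and `KMeansSketch`, `sqDist`, `kmeansPlusPlus`, and `lloydStep` are illustrative names, not code from the pull request. kmeans|| is not shown.

```scala
import scala.util.Random

// Illustrative, non-distributed sketch; all names here are assumptions.
object KMeansSketch {
  type Point = Vector[Double]

  // Squared Euclidean distance between two points of equal dimension.
  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // kmeans++ seeding: the first center is drawn uniformly at random; each
  // further center is drawn with probability proportional to the squared
  // distance from the point to its nearest already-chosen center.
  def kmeansPlusPlus(points: IndexedSeq[Point], k: Int, rng: Random): Vector[Point] = {
    require(k >= 1 && k <= points.size, "k must be between 1 and the number of points")
    var centers = Vector(points(rng.nextInt(points.size)))
    while (centers.size < k) {
      val weights = points.map(p => centers.map(sqDist(p, _)).min)
      val total = weights.sum
      val next =
        if (total == 0.0) {
          // Every point coincides with a chosen center; fall back to uniform.
          points(rng.nextInt(points.size))
        } else {
          val target = rng.nextDouble() * total
          val cumulative = weights.scanLeft(0.0)(_ + _).tail
          val idx = cumulative.indexWhere(_ > target)
          // Guard against floating-point rounding at the upper end.
          points(if (idx >= 0) idx else points.size - 1)
        }
      centers = centers :+ next
    }
    centers
  }

  // One Lloyd iteration: assign each point to its nearest center, then move
  // each center to the mean of its assigned points. A center with no assigned
  // points is left where it is.
  def lloydStep(points: Seq[Point], centers: Vector[Point]): Vector[Point] = {
    val assigned = points.groupBy(p => centers.indices.minBy(i => sqDist(p, centers(i))))
    centers.indices.map { i =>
      assigned.get(i) match {
        case Some(members) => members.transpose.map(_.sum / members.size).toVector
        case None          => centers(i)
      }
    }.toVector
  }
}
```

Repeating `lloydStep` a fixed number of times on the output of `kmeansPlusPlus` corresponds to running the algorithm with a given iteration count; a distributed implementation would need to express the same two steps over partitioned data.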



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
