[ https://issues.apache.org/jira/browse/FLINK-2131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134196#comment-15134196
]
ASF GitHub Bot commented on FLINK-2131:

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/757#discussion_r52017027
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---

@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml._
+import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{BLAS, Vector}
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids based on a set of
+ * training data points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and
+ * can then be used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m_1^(1), …, m_k^(1) (see below), the algorithm
+ * proceeds by alternating between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least within-cluster sum
+ * of squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is
+ * intuitively the "nearest" mean. (Mathematically, this means partitioning the
+ * observations according to the Voronoi diagram generated by the means.)
+ *
+ * `S_i^{(t)} = { x_p : ||x_p - m_i^{(t)}||^2 ≤ ||x_p - m_j^{(t)}||^2 \forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to
+ * two or more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also minimizes the
+ * within-cluster sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData)
+ * val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids)
+ *
+ * val kmeans = KMeans()
+ * .setInitialCentroids(initialCentroids)
+ * .setNumIterations(10)
+ *
+ * kmeans.fit(trainingDS)
+ *
+ * // getting the computed centroids
+ * val centroidsResult = kmeans.centroids.get.collect()
+ *
+ * // get matching clusters for new points
+ * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData)
+ * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge to the global
+ * optimum. The centroids of the clusters and the reassignment of the data points will be
+ * repeated until the given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as the starting point
+ * of the algorithm for clustering the data set. The centroids are recalculated as often
+ * as set in [[org.apache.flink.ml.clustering.KMeans.NumIterations]]. The choice of the
+ * initial centroids mainly affects the outcome of the algorithm.
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialStrategy]]:
+ * Defines the initialization strategy to be used for initializing the KMeans algorithm
+ * in case the initial centroids are not provided. Allowed values are "random",
+ * "kmeans++" and "kmeans||".
+ * (Default Value: '''random''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumClusters]]:
+ * Defines the number of clusters required. This is essential to provide when only the
+ * initialization strategy is specified, not the initial centroids themselves.
+ * (Default Value: '''0''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.OversamplingFactor]]:
+ * Defines the oversampling rate for the kmeans|| initialization.
+ * (Default Value: '''2k''', where k is the number of clusters)
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.KMeansParRounds]]:
+ * Defines the number of rounds for the kmeans|| initialization.
+ * (Default Value: '''5''')
+ *
+ */
+class KMeans extends Predictor[KMeans] {
+
+ import KMeans._
+
+ /**
+ * Stores the learned clusters after the fit operation
+ */
+ var centroids: Option[DataSet[Seq[LabeledVector]]] = None
+
+ /**
+ * Sets the maximum number of iterations.
+ *
+ * @param numIterations The maximum number of iterations.
+ * @return itself
+ */
+ def setNumIterations(numIterations: Int): KMeans = {
+ parameters.add(NumIterations, numIterations)
+ this
+ }
+
+ /**
+ * Sets the number of clusters.
+ *
+ * @param numClusters The number of clusters
+ * @return itself
+ */
+ def setNumClusters(numClusters: Int): KMeans = {
+ parameters.add(NumClusters, numClusters)
+ this
+ }
+
+ /**
+ * Sets the initial centroids on which the algorithm will start computing. These points
+ * should depend on the data and will significantly influence the resulting centroids.
+ * Note that this setting will override [[setInitializationStrategy()]] and that the size
+ * of initialCentroids will override the value, if set, by [[setNumClusters()]].
+ *
+ * @param initialCentroids A set of labeled vectors.
+ * @return itself
+ */
+ def setInitialCentroids(initialCentroids: Seq[LabeledVector]): KMeans = {
+ parameters.add(InitialCentroids, initialCentroids)
+ this
+ }
+
+ /**
+ * Automatically initialize the KMeans algorithm. Allowed options are "random",
+ * "kmeans++" and "kmeans||".
+ *
+ * @param initialStrategy The initialization strategy to use.
+ * @return itself
+ */
+ def setInitializationStrategy(initialStrategy: String): KMeans = {
--- End diff ---
An enum would be the better solution here.
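A sketch of that suggestion (the type and case names below are illustrative, not part of the PR): a sealed trait makes the set of strategies closed, so the compiler can check pattern matches for exhaustiveness, while a `fromString` helper could keep the existing String-based setter working.

```scala
// Hypothetical enum-style replacement for the String-typed strategy.
sealed trait InitializationStrategy
case object RandomInit extends InitializationStrategy
case object KMeansPlusPlus extends InitializationStrategy
case object KMeansParallel extends InitializationStrategy

object InitializationStrategy {
  // Parse the legacy String values, rejecting anything unknown eagerly
  // instead of failing later inside the fit operation.
  def fromString(s: String): InitializationStrategy = s match {
    case "random"   => RandomInit
    case "kmeans++" => KMeansPlusPlus
    case "kmeans||" => KMeansParallel
    case other =>
      throw new IllegalArgumentException(s"Unknown initialization strategy: $other")
  }
}
```

With this shape, `setInitializationStrategy` could accept an `InitializationStrategy` directly, so invalid values become a compile-time error rather than a runtime one.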
> Add Initialization schemes for K-Means clustering
> 
>
> Key: FLINK-2131
> URL: https://issues.apache.org/jira/browse/FLINK-2131
> Project: Flink
> Issue Type: Task
> Components: Machine Learning Library
> Reporter: Sachin Goel
> Assignee: Sachin Goel
>
> The Lloyd's [K-Means] algorithm takes initial centroids as its input. However, in case
> the user doesn't provide the initial centers, they may ask for a particular
> initialization scheme to be followed. The most commonly used are these:
> 1. Random initialization: Self-explanatory
> 2. kmeans++ initialization: http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf
> 3. kmeans|| : http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf
> For very large data sets, or for large values of k, the kmeans|| method is preferred as
> it provides the same approximation guarantees as kmeans++ and requires fewer passes
> over the input data.
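To make the seeding schemes above concrete, here is a minimal single-machine sketch of k-means++ (not the distributed Flink DataSet implementation from the PR; the function name is hypothetical): each new center is drawn with probability proportional to its squared distance from the nearest already-chosen center. The kmeans|| variant repeats a similar weighted sampling for a fixed number of rounds, oversampling candidates per round.

```scala
import scala.util.Random

// Illustrative k-means++ seeding on local data.
def kMeansPlusPlusInit(points: Seq[Array[Double]], k: Int, rnd: Random): Seq[Array[Double]] = {
  def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // First center is chosen uniformly at random.
  val centers = scala.collection.mutable.ArrayBuffer(points(rnd.nextInt(points.length)))
  while (centers.length < k) {
    // Weight each point by its squared distance to the nearest chosen center.
    val weights = points.map(p => centers.map(c => sqDist(p, c)).min)
    // Roulette-wheel selection proportional to the weights.
    var r = rnd.nextDouble() * weights.sum
    var i = 0
    while (i < points.length - 1 && r > weights(i)) { r -= weights(i); i += 1 }
    centers += points(i)
  }
  centers.toSeq
}
```

Points far from all current centers get large weights, which is what yields the O(log k)-competitive guarantee of the kmeans++ paper.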

This message was sent by Atlassian JIRA
(v6.3.4#6332)
