ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ch...@apache.org
Subject [2/3] ignite git commit: IGNITE-8169: [ML] Adopt KMeans to the new Partitioned Dataset and cleanup old code
Date Mon, 16 Apr 2018 17:20:59 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/FuzzyCMeansModel.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/FuzzyCMeansModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/FuzzyCMeansModel.java
deleted file mode 100644
index 70009cb..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/FuzzyCMeansModel.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.clustering;
-
-import java.util.Arrays;
-import org.apache.ignite.ml.Exportable;
-import org.apache.ignite.ml.Exporter;
-import org.apache.ignite.ml.FuzzyCMeansModelFormat;
-import org.apache.ignite.ml.math.Vector;
-import org.apache.ignite.ml.math.distances.DistanceMeasure;
-
-/** This class encapsulates the result of fuzzy c-means clusterization: cluster centers and the distance measure. */
-public class FuzzyCMeansModel implements ClusterizationModel<Vector, Integer>, Exportable<FuzzyCMeansModelFormat> {
-    /** Centers of clusters. */
-    private Vector[] centers;
-
-    /** Distance measure. */
-    private DistanceMeasure measure;
-
-    /**
-     * Constructor that creates FCM model by centers and measure.
-     *
-     * @param centers Array of centers; the array itself is copied, the vectors in it are not.
-     * @param measure Distance measure.
-     */
-    public FuzzyCMeansModel(Vector[] centers, DistanceMeasure measure) {
-        this.centers = Arrays.copyOf(centers, centers.length);
-        this.measure = measure;
-    }
-
-    /** Returns the distance measure used during clusterization. */
-    public DistanceMeasure distanceMeasure() {
-        return measure;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int clustersCount() {
-        return centers.length;
-    }
-
-    /** {@inheritDoc} Returns a copy of the internal array of centers. */
-    @Override public Vector[] centers() {
-        return Arrays.copyOf(centers, centers.length);
-    }
-
-    /**
-     * Predict closest center index for a given vector.
-     *
-     * @param val Vector.
-     * @return Index of the closest center, or -1 if no center is closer than positive infinity (e.g. no centers).
-     */
-    @Override public Integer apply(Vector val) {
-        int idx = -1;
-        double minDistance = Double.POSITIVE_INFINITY;
-
-        for (int i = 0; i < centers.length; i++) {
-            double currDistance = measure.compute(val, centers[i]);
-            if (currDistance < minDistance) {
-                minDistance = currDistance;
-                idx = i;
-            }
-        }
-
-        return idx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public <P> void saveModel(Exporter<FuzzyCMeansModelFormat, P> exporter, P path) {
-        FuzzyCMeansModelFormat mdlData = new FuzzyCMeansModelFormat(centers, measure);
-
-        exporter.save(mdlData, path);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java
deleted file mode 100644
index 5595b4c..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.clustering;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import javax.cache.Cache;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.ml.math.Vector;
-import org.apache.ignite.ml.math.VectorUtils;
-import org.apache.ignite.ml.math.distances.DistanceMeasure;
-import org.apache.ignite.ml.math.distributed.CacheUtils;
-import org.apache.ignite.ml.math.distributed.keys.impl.SparseMatrixKey;
-import org.apache.ignite.ml.math.exceptions.ConvergenceException;
-import org.apache.ignite.ml.math.exceptions.MathIllegalArgumentException;
-import org.apache.ignite.ml.math.functions.Functions;
-import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
-import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
-import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
-import org.apache.ignite.ml.math.util.MapUtil;
-import org.apache.ignite.ml.math.util.MatrixUtil;
-
-import static org.apache.ignite.ml.math.distributed.CacheUtils.distributedFold;
-import static org.apache.ignite.ml.math.util.MatrixUtil.localCopyOf;
-
-/**
- * Clustering algorithm based on Bahmani et al. paper and Apache Spark class with corresponding functionality.
- *
- * TODO: IGNITE-6059, add block matrix support.
- *
- * @see <a href="http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf">Scalable K-Means++ (Bahmani et al. paper)</a>
- */
-public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistributedMatrix> {
-    /** Upper bound on the number of iterations of the main clustering loop. */
-    private final int maxIterations;
-
-    /** Random generator created from {@link #seed}; used to pick the first center. */
-    private Random rnd;
-
-    /** Number of sampling steps performed while initializing cluster centers. */
-    private int initSteps;
-
-    /** Seed for the random generators used by this clusterer. */
-    private long seed;
-
-    /** Convergence threshold: iterations stop once no center moves by more than {@code epsilon * epsilon}. */
-    private double epsilon = 1e-4;
-
-    /** Creates a clusterer; a {@code null} seed is replaced by a randomly generated one. */
-    public KMeansDistributedClusterer(DistanceMeasure measure, int initSteps, int maxIterations, Long seed) {
-        super(measure);
-        this.initSteps = initSteps;
-
-        this.seed = seed != null ? seed : new Random().nextLong();
-
-        this.maxIterations = maxIterations;
-        rnd = new Random(this.seed);
-    }
-
-    /** Copies the points, picks initial centers and refines them until convergence or {@code maxIterations} is reached. */
-    @Override public KMeansModel cluster(SparseDistributedMatrix points, int k) throws
-        MathIllegalArgumentException, ConvergenceException {
-        SparseDistributedMatrix pointsCp = (SparseDistributedMatrix)points.like(points.rowSize(), points.columnSize());
-
-        String cacheName = ((SparseDistributedMatrixStorage)points.getStorage()).cacheName();
-
-        // TODO: IGNITE-5825, this copy is very ineffective, just for POC. Immutability of data should be guaranteed by other methods
-        // such as logical locks for example.
-        pointsCp.assign(points);
-
-        Vector[] centers = initClusterCenters(pointsCp, k);
-
-        boolean converged = false;
-        int iteration = 0;
-        int dim = pointsCp.viewRow(0).size();
-        UUID uid = pointsCp.getUUID();
-
-        // Execute iterations of Lloyd's algorithm until converged or the iteration limit is reached
-        while (iteration < maxIterations && !converged) {
-            SumsAndCounts stats = getSumsAndCounts(centers, dim, uid, cacheName);
-
-            converged = true;
-
-            for (Integer ind : stats.sums.keySet()) {
-                Vector massCenter = stats.sums.get(ind).times(1.0 / stats.counts.get(ind));
-
-                if (converged && distance(massCenter, centers[ind]) > epsilon * epsilon)
-                    converged = false;
-
-                centers[ind] = massCenter;
-            }
-
-            iteration++;
-        }
-
-        pointsCp.destroy();
-
-        return new KMeansModel(centers, getDistanceMeasure());
-    }
-
-    /** Picks candidate centers in {@code initSteps} sampling rounds and reduces them to at most {@code k} centers. */
-    private Vector[] initClusterCenters(SparseDistributedMatrix points, int k) {
-        // Initialize empty centers and point costs.
-        int ptsCnt = points.rowSize();
-
-        String cacheName = ((SparseDistributedMatrixStorage)points.getStorage()).cacheName();
-
-        // Initialize the first center to a random point.
-        Vector sample = localCopyOf(points.viewRow(rnd.nextInt(ptsCnt)));
-
-        List<Vector> centers = new ArrayList<>();
-        List<Vector> newCenters = new ArrayList<>();
-        newCenters.add(sample);
-        centers.add(sample);
-
-        final ConcurrentHashMap<Integer, Double> costs = new ConcurrentHashMap<>();
-
-        // On each step, sample 2 * k points on average with probability proportional
-        // to their squared distance from the centers. Note that only distances between points
-        // and new centers are computed in each iteration.
-        int step = 0;
-        UUID uid = points.getUUID();
-
-        while (step < initSteps) {
-            // We assume here that costs can fit into memory of one node.
-            ConcurrentHashMap<Integer, Double> newCosts = getNewCosts(points, newCenters, cacheName);
-
-            // Merge costs with new costs.
-            for (Integer ind : newCosts.keySet())
-                costs.merge(ind, newCosts.get(ind), Math::min);
-
-            double sumCosts = costs.values().stream().mapToDouble(Double::valueOf).sum();
-
-            newCenters = getNewCenters(k, costs, uid, sumCosts, cacheName);
-            centers.addAll(newCenters);
-
-            step++;
-        }
-
-        List<Vector> distinctCenters = centers.stream().distinct().collect(Collectors.toList());
-
-        if (distinctCenters.size() <= k)
-            return distinctCenters.toArray(new Vector[] {});
-        else {
-            // Finally, we might have a set of more than k distinct candidate centers; weight each
-            // candidate by the number of points in the dataset mapping to it and run a local k-means++
-            // on the weighted centers to pick k of them
-            ConcurrentHashMap<Integer, Integer> centerInd2Weight = weightCenters(uid, distinctCenters, cacheName);
-
-            List<Double> weights = new ArrayList<>(centerInd2Weight.size());
-
-            for (int i = 0; i < distinctCenters.size(); i++)
-                weights.add(i, Double.valueOf(centerInd2Weight.getOrDefault(i, 0)));
-
-            DenseLocalOnHeapMatrix dCenters = MatrixUtil.fromList(distinctCenters, true);
-
-            return new KMeansLocalClusterer(getDistanceMeasure(), 30, seed).cluster(dCenters, k, weights).centers();
-        }
-    }
-
-    /** Samples new candidate centers: each row is taken with probability {@code 2 * k * cost / sumCosts}. */
-    private List<Vector> getNewCenters(int k, ConcurrentHashMap<Integer, Double> costs, UUID uid,
-        double sumCosts, String cacheName) {
-        return distributedFold(cacheName,
-            (IgniteBiFunction<Cache.Entry<SparseMatrixKey, Map<Integer, Double>>,
-                List<Vector>,
-                List<Vector>>)(vectorWithIndex, list) -> {
-                Integer ind = vectorWithIndex.getKey().index();
-
-                double prob = costs.get(ind) * 2.0 * k / sumCosts;
-
-                if (new Random(seed ^ ind).nextDouble() < prob)
-                    list.add(VectorUtils.fromMap(vectorWithIndex.getValue(), false));
-
-                return list;
-            },
-            key -> key.dataStructureId().equals(uid),
-            (list1, list2) -> {
-                list1.addAll(list2);
-                return list1;
-            }, ArrayList::new
-        );
-    }
-
-    /** Collects, per row index, the distances from the row to the new centers, folded with {@code Functions.MIN}. */
-    private ConcurrentHashMap<Integer, Double> getNewCosts(SparseDistributedMatrix points, List<Vector> newCenters,
-        String cacheName) {
-        return distributedFold(cacheName,
-            (IgniteBiFunction<Cache.Entry<SparseMatrixKey, ConcurrentHashMap<Integer, Double>>,
-                ConcurrentHashMap<Integer, Double>,
-                ConcurrentHashMap<Integer, Double>>)(vectorWithIndex, map) -> {
-                for (Vector center : newCenters)
-                    map.merge(vectorWithIndex.getKey().index(), distance(vectorWithIndex.getValue(), center), Functions.MIN);
-
-                return map;
-            },
-            key -> key.dataStructureId().equals(points.getUUID()),
-            (map1, map2) -> {
-                map1.putAll(map2);
-                return map1;
-            }, ConcurrentHashMap::new);
-    }
-
-    /** Counts, for every candidate center index, the rows for which that candidate is the closest one. */
-    private ConcurrentHashMap<Integer, Integer> weightCenters(UUID uid, List<Vector> distinctCenters,
-        String cacheName) {
-        return distributedFold(cacheName,
-            (IgniteBiFunction<Cache.Entry<SparseMatrixKey, Map<Integer, Double>>,
-                ConcurrentHashMap<Integer, Integer>,
-                ConcurrentHashMap<Integer, Integer>>)(vectorWithIndex, countMap) -> {
-                Integer resInd = -1;
-                Double resDist = Double.POSITIVE_INFINITY;
-
-                int i = 0;
-                for (Vector cent : distinctCenters) {
-                    double curDist = distance(vectorWithIndex.getValue(), cent);
-
-                    if (resDist > curDist) {
-                        resDist = curDist;
-                        resInd = i;
-                    }
-
-                    i++;
-                }
-
-                countMap.compute(resInd, (ind, v) -> v != null ? v + 1 : 1);
-                return countMap;
-            },
-            key -> key.dataStructureId().equals(uid),
-            (map1, map2) -> MapUtil.mergeMaps(map1, map2, (integer, integer2) -> integer2 + integer,
-                ConcurrentHashMap::new),
-            ConcurrentHashMap::new);
-    }
-
-    /** Distance between a row given as an index-to-value map and a vector; the map is converted to a vector first. */
-    private double distance(Map<Integer, Double> vecMap, Vector vector) {
-        return distance(VectorUtils.fromMap(vecMap, false), vector);
-    }
-
-    /** Folds all rows into per-center vector sums and point counts, assigning each row to its closest center. */
-    private SumsAndCounts getSumsAndCounts(Vector[] centers, int dim, UUID uid, String cacheName) {
-        return CacheUtils.distributedFold(cacheName,
-            (IgniteBiFunction<Cache.Entry<SparseMatrixKey, Map<Integer, Double>>, SumsAndCounts, SumsAndCounts>)(entry, counts) -> {
-                Map<Integer, Double> vec = entry.getValue();
-
-                IgniteBiTuple<Integer, Double> closest = findClosest(centers, VectorUtils.fromMap(vec, false));
-                int bestCenterIdx = closest.get1();
-
-                counts.totalCost += closest.get2();
-                counts.sums.putIfAbsent(bestCenterIdx, VectorUtils.zeroes(dim));
-
-                counts.sums.compute(bestCenterIdx,
-                    (IgniteBiFunction<Integer, Vector, Vector>)(ind, v) -> v.plus(VectorUtils.fromMap(vec, false)));
-
-                counts.counts.merge(bestCenterIdx, 1,
-                    (IgniteBiFunction<Integer, Integer, Integer>)(i1, i2) -> i1 + i2);
-
-                return counts;
-            },
-            key -> key.dataStructureId().equals(uid),
-            SumsAndCounts::merge, SumsAndCounts::new
-        );
-    }
-
-    /** Service class used for statistics. */
-    private static class SumsAndCounts {
-        /** Accumulated cost reported for the closest centers of the processed points. */
-        public double totalCost;
-
-        /** Sum of the points closest to the center with a given index. */
-        public ConcurrentHashMap<Integer, Vector> sums = new ConcurrentHashMap<>();
-
-        /** Count of points closest to the center with a given index. */
-        public ConcurrentHashMap<Integer, Integer> counts = new ConcurrentHashMap<>();
-
-        /** Merges the statistics of {@code other} into this instance (merged maps are stored back) and returns it. */
-        public SumsAndCounts merge(SumsAndCounts other) {
-            this.totalCost += other.totalCost;
-            sums = MapUtil.mergeMaps(sums, other.sums, Vector::plus, ConcurrentHashMap::new);
-            counts = MapUtil.mergeMaps(counts, other.counts, (i1, i2) -> i1 + i2, ConcurrentHashMap::new);
-            return this;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansLocalClusterer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansLocalClusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansLocalClusterer.java
deleted file mode 100644
index 8a50e65..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansLocalClusterer.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.clustering;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import org.apache.ignite.internal.util.GridArgumentCheck;
-import org.apache.ignite.ml.math.Matrix;
-import org.apache.ignite.ml.math.Vector;
-import org.apache.ignite.ml.math.VectorUtils;
-import org.apache.ignite.ml.math.distances.DistanceMeasure;
-import org.apache.ignite.ml.math.exceptions.ConvergenceException;
-import org.apache.ignite.ml.math.exceptions.MathIllegalArgumentException;
-import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
-
-import static org.apache.ignite.ml.math.util.MatrixUtil.localCopyOf;
-
-/**
- * Perform clusterization on local data.
- * This class is based on Apache Spark class with corresponding functionality.
- */
-public class KMeansLocalClusterer extends BaseKMeansClusterer<DenseLocalOnHeapMatrix> implements
-    WeightedClusterer<DenseLocalOnHeapMatrix, KMeansModel> {
-    /** Upper bound on the number of center-update iterations. */
-    private int maxIterations;
-
-    /** Random generator used when picking centers. */
-    private Random rand;
-
-    /**
-     * Build a new clusterer with the given {@link DistanceMeasure}.
-     *
-     * @param measure Distance measure to use.
-     * @param maxIterations Maximal number of iterations.
-     * @param seed Seed used in random parts of algorithm; {@code null} selects an unseeded {@link Random}.
-     */
-    public KMeansLocalClusterer(DistanceMeasure measure, int maxIterations, Long seed) {
-        super(measure);
-        this.maxIterations = maxIterations;
-        rand = seed != null ? new Random(seed) : new Random();
-    }
-
-    /** {@inheritDoc} Every point is given the weight {@code 1.0}. */
-    @Override public KMeansModel cluster(
-        DenseLocalOnHeapMatrix points, int k) throws MathIllegalArgumentException, ConvergenceException {
-        List<Double> ones = new ArrayList<>(Collections.nCopies(points.rowSize(), 1.0));
-        return cluster(points, k, ones);
-    }
-
-    /** {@inheritDoc} Centers are seeded in k-means++ style and then refined for at most {@code maxIterations} rounds. */
-    @Override public KMeansModel cluster(DenseLocalOnHeapMatrix points, int k,
-        List<Double> weights) throws MathIllegalArgumentException, ConvergenceException {
-
-        GridArgumentCheck.notNull(points, "points");
-
-        int dim = points.columnSize();
-        Vector[] centers = new Vector[k];
-
-        centers[0] = pickWeighted(points, weights);
-
-        Vector costs = points.foldRows(row -> distance(row,
-            centers[0]));
-        // centers[0] is already chosen above, so only the remaining k - 1 centers are sampled here.
-        for (int i = 1; i < k; i++) {
-            double weightedSum = weightedSum(costs, weights);
-
-            double r = rand.nextDouble() * weightedSum;
-            double s = 0.0;
-            int j = 0;
-
-            while (j < points.rowSize() && s < r) {
-                s += weights.get(j) * costs.get(j);
-                j++;
-            }
-
-            if (j == 0)
-                // TODO: IGNITE-5825, Process this case more carefully
-                centers[i] = localCopyOf(points.viewRow(0));
-            else
-                centers[i] = localCopyOf(points.viewRow(j - 1));
-
-            for (int p = 0; p < points.rowSize(); p++)
-                costs.setX(p, Math.min(getDistanceMeasure().compute(localCopyOf(points.viewRow(p)), centers[i]),
-                    costs.get(p)));
-        }
-
-        int[] oldClosest = new int[points.rowSize()];
-        Arrays.fill(oldClosest, -1);
-        int iter = 0;
-        boolean moved = true;
-
-        while (moved && iter < maxIterations) {
-            moved = false;
-
-            double[] counts = new double[k];
-            Arrays.fill(counts, 0.0);
-            Vector[] sums = new Vector[k];
-
-            Arrays.fill(sums, VectorUtils.zeroes(dim));
-
-            int i = 0;
-
-            while (i < points.rowSize()) {
-                Vector p = localCopyOf(points.viewRow(i));
-
-                int ind = findClosest(centers, p).get1();
-                sums[ind] = sums[ind].plus(p.times(weights.get(i)));
-
-                counts[ind] += weights.get(i);
-                if (ind != oldClosest[i]) {
-                    moved = true;
-                    oldClosest[i] = ind;
-                }
-                i++;
-            }
-            // Update centers
-            int j = 0;
-            while (j < k) {
-                if (counts[j] == 0.0) {
-                    // Empty cluster: re-seed its center with a local copy of a random point (not a view of the matrix).
-                    centers[j] = localCopyOf(points.viewRow(rand.nextInt(points.rowSize())));
-                }
-                else {
-                    sums[j] = sums[j].times(1.0 / counts[j]);
-                    centers[j] = sums[j];
-                }
-                j++;
-            }
-            iter++;
-        }
-
-        return new KMeansModel(centers, getDistanceMeasure());
-    }
-
-    /** Pick a random vector with a probability proportional to the corresponding weight; row 0 is the fallback. */
-    private Vector pickWeighted(Matrix points, List<Double> weights) {
-        double r = rand.nextDouble() * weights.stream().mapToDouble(Double::doubleValue).sum();
-
-        int i = 0;
-        double curWeight = 0.0;
-
-        while (i < points.rowSize() && curWeight < r) {
-            curWeight += weights.get(i);
-            i += 1;
-        }
-        // i stays 0 when r == 0 (zero draw or zero total weight); clamp so row 0 is used instead of row -1.
-        return localCopyOf(points.viewRow(Math.max(i - 1, 0)));
-    }
-
-    /** Computes the sum of the elements of {@code v}, each multiplied by the weight with the same index. */
-    private double weightedSum(Vector v, List<Double> weights) {
-        double res = 0.0;
-
-        for (int i = 0; i < v.size(); i++)
-            res += v.getX(i) * weights.get(i);
-
-        return res;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansModel.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansModel.java
deleted file mode 100644
index e1d783f..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansModel.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.clustering;
-
-import java.util.Arrays;
-import org.apache.ignite.ml.Exportable;
-import org.apache.ignite.ml.Exporter;
-import org.apache.ignite.ml.KMeansModelFormat;
-import org.apache.ignite.ml.math.Vector;
-import org.apache.ignite.ml.math.distances.DistanceMeasure;
-
-/**
- * This class encapsulates result of clusterization.
- */
-public class KMeansModel implements ClusterizationModel<Vector, Integer>, Exportable<KMeansModelFormat> {
-    /** Centers of clusters. */
-    private final Vector[] centers;
-
-    /** Distance measure. */
-    private final DistanceMeasure distance;
-
-    /**
-     * Construct KMeans model with given centers and distance measure.
-     *
-     * @param centers Centers; the array itself is copied so later changes to it do not affect the model.
-     * @param distance Distance measure.
-     */
-    public KMeansModel(Vector[] centers, DistanceMeasure distance) {
-        this.centers = Arrays.copyOf(centers, centers.length);
-        this.distance = distance;
-    }
-
-    /** Distance measure used while clusterization */
-    public DistanceMeasure distanceMeasure() {
-        return distance;
-    }
-
-    /** Count of centers in clusterization. */
-    @Override public int clustersCount() {
-        return centers.length;
-    }
-
-    /** Get centers of clusters; a copy of the internal array is returned. */
-    @Override public Vector[] centers() {
-        return Arrays.copyOf(centers, centers.length);
-    }
-
-    /**
-     * Predict closest center index for a given vector.
-     * @param vec Vector.
-     * @return Index of the closest center or -1 if it can't be found.
-     */
-    @Override public Integer apply(Vector vec) {
-        int res = -1;
-        double minDist = Double.POSITIVE_INFINITY;
-
-        for (int i = 0; i < centers.length; i++) {
-            double curDist = distance.compute(centers[i], vec);
-            if (curDist < minDist) {
-                minDist = curDist;
-                res = i;
-            }
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public <P> void saveModel(Exporter<KMeansModelFormat, P> exporter, P path) {
-        KMeansModelFormat mdlData = new KMeansModelFormat(centers, distance);
-
-        exporter.save(mdlData, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int res = 1;
-
-        res = res * 37 + distance.hashCode();
-        res = res * 37 + Arrays.hashCode(centers);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-
-        if (obj == null || getClass() != obj.getClass())
-            return false;
-
-        KMeansModel that = (KMeansModel)obj;
-
-        return distance.equals(that.distance) && Arrays.deepEquals(centers, that.centers);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/WeightedClusterer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/WeightedClusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/WeightedClusterer.java
deleted file mode 100644
index 1688087..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/WeightedClusterer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.clustering;
-
-import java.util.List;
-import org.apache.ignite.ml.Model;
-import org.apache.ignite.ml.math.exceptions.ConvergenceException;
-import org.apache.ignite.ml.math.exceptions.MathIllegalArgumentException;
-
-/**
- * Support of clusterization with given weights.
- */
-public interface WeightedClusterer<P, M extends Model> extends Clusterer<P, M> {
-    /**
-     * Perform clusterization of given points weighted by given weights.
-     * @param points Points.
-     * @param k Count of centers.
-     * @param weights Weights.
-     * @return Model produced by the clusterization.
-     */
-    public M cluster(P points, int k, List<Double> weights) throws
-        MathIllegalArgumentException, ConvergenceException;
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/Clusterer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/Clusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/Clusterer.java
new file mode 100644
index 0000000..9930f23
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/Clusterer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering.kmeans;
+
+import org.apache.ignite.ml.Model;
+
+/**
+ * Base interface for clusterers.
+ */
+public interface Clusterer<P, M extends Model> {
+    /**
+     * Cluster given points set into k clusters.
+     * @param points Points set.
+     * @param k Clusters count.
+     * @return Model produced by the clusterization.
+     */
+    public M cluster(P points, int k);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/ClusterizationModel.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/ClusterizationModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/ClusterizationModel.java
new file mode 100644
index 0000000..474a463
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/ClusterizationModel.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering.kmeans;
+
+import org.apache.ignite.ml.Model;
+
+/** Base interface for all clusterization models: a {@link Model} that also exposes its cluster centers. */
+public interface ClusterizationModel<P, V> extends Model<P, V> {
+    /** Gets the clusters count. */
+    public int amountOfClusters();
+
+    /** Gets the cluster centers as an array of the model's input type {@code P}. */
+    public P[] centers();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansModel.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansModel.java
new file mode 100644
index 0000000..c900efd
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansModel.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering.kmeans;
+
+import java.util.Arrays;
+import org.apache.ignite.ml.Exportable;
+import org.apache.ignite.ml.Exporter;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.distances.DistanceMeasure;
+
+/**
+ * This class encapsulates result of clusterization by KMeans algorithm: the cluster centers and the distance measure.
+ */
+public class KMeansModel implements ClusterizationModel<Vector, Integer>, Exportable<KMeansModelFormat> {
+    /** Centers of clusters. */
+    private final Vector[] centers;
+
+    /** Distance measure. */
+    private final DistanceMeasure distanceMeasure;
+
+    /**
+     * Construct KMeans model with given centers and distance measure (the centers array is stored as is, not copied).
+     *
+     * @param centers Centers.
+     * @param distanceMeasure Distance measure.
+     */
+    public KMeansModel(Vector[] centers, DistanceMeasure distanceMeasure) {
+        this.centers = centers;
+        this.distanceMeasure = distanceMeasure;
+    }
+
+    /** Distance measure. */
+    public DistanceMeasure distanceMeasure() {
+        return distanceMeasure;
+    }
+
+    /** Amount of centers in clusterization. */
+    @Override public int amountOfClusters() {
+        return centers.length;
+    }
+
+    /** Get centers of clusters (a copy of the internal array; the vectors themselves are shared). */
+    @Override public Vector[] centers() {
+        return Arrays.copyOf(centers, centers.length);
+    }
+
+    /**
+     * Predict closest center index for a given vector; the result is -1 when no center is closer than infinity.
+     *
+     * @param vec Vector.
+     */
+    @Override public Integer apply(Vector vec) {
+        int res = -1;
+        double minDist = Double.POSITIVE_INFINITY;
+
+        for (int i = 0; i < centers.length; i++) {
+            double curDist = distanceMeasure.compute(centers[i], vec);
+            if (curDist < minDist) {
+                minDist = curDist;
+                res = i;
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <P> void saveModel(Exporter<KMeansModelFormat, P> exporter, P path) {
+        KMeansModelFormat mdlData = new KMeansModelFormat(centers, distanceMeasure);
+
+        exporter.save(mdlData, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = 1;
+
+        res = res * 37 + distanceMeasure.hashCode();
+        res = res * 37 + Arrays.hashCode(centers);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+
+        KMeansModel that = (KMeansModel)obj;
+
+        return distanceMeasure.equals(that.distanceMeasure) && Arrays.deepEquals(centers, that.centers);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansModelFormat.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansModelFormat.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansModelFormat.java
new file mode 100644
index 0000000..2663701
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansModelFormat.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering.kmeans;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import org.apache.ignite.ml.Exportable;
+import org.apache.ignite.ml.Exporter;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.distances.DistanceMeasure;
+
+/**
+ * Serializable K-means model representation: the cluster centers together with the distance measure.
+ *
+ * @see Exportable
+ * @see Exporter
+ */
+public class KMeansModelFormat implements Serializable {
+    /** Centers of clusters. */
+    private final Vector[] centers;
+
+    /** Distance measure. */
+    private final DistanceMeasure distance;
+
+    /** Creates the format from the given centers and distance measure (the centers array is stored as is, not copied). */
+    public KMeansModelFormat(Vector[] centers, DistanceMeasure distance) {
+        this.centers = centers;
+        this.distance = distance;
+    }
+
+    /** Gets the distance measure. */
+    public DistanceMeasure getDistance() {
+        return distance;
+    }
+
+    /** Gets the centers of clusters (the internal array itself, not a copy). */
+    public Vector[] getCenters() {
+        return centers;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = 1;
+
+        res = res * 37 + distance.hashCode();
+        res = res * 37 + Arrays.hashCode(centers);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+
+        KMeansModelFormat that = (KMeansModelFormat)obj;
+
+        return distance.equals(that.distance) && Arrays.deepEquals(centers, that.centers);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansTrainer.java
new file mode 100644
index 0000000..f65a3fe
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/KMeansTrainer.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering.kmeans;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.dataset.primitive.context.EmptyContext;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.VectorUtils;
+import org.apache.ignite.ml.math.distances.DistanceMeasure;
+import org.apache.ignite.ml.math.distances.EuclideanDistance;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
+import org.apache.ignite.ml.math.util.MapUtil;
+import org.apache.ignite.ml.structures.LabeledDataset;
+import org.apache.ignite.ml.structures.LabeledVector;
+import org.apache.ignite.ml.structures.partition.LabeledDatasetPartitionDataBuilderOnHeap;
+import org.apache.ignite.ml.trainers.SingleLabelDatasetTrainer;
+
+/**
+ * The trainer for KMeans algorithm; configure it with the {@code withXxx} methods and then call {@code fit}.
+ */
+public class KMeansTrainer implements SingleLabelDatasetTrainer<KMeansModel> {
+    /** Amount of clusters. */
+    private int k = 2;
+
+    /** Maximum amount of iterations. */
+    private int maxIterations = 10;
+
+    /** Delta of convergence. */
+    private double epsilon = 1e-4;
+
+    /** Distance measure. */
+    private DistanceMeasure distance = new EuclideanDistance();
+
+    /** Seed for the random generators that pick the initial centers. */
+    private long seed;
+
+    /**
+     * Trains model based on the specified data; stops after {@code maxIterations} or once no center moved noticeably.
+     *
+     * @param datasetBuilder Dataset builder.
+     * @param featureExtractor Feature extractor.
+     * @param lbExtractor Label extractor.
+     * @return Model.
+     */
+    @Override public <K, V> KMeansModel fit(DatasetBuilder<K, V> datasetBuilder,
+        IgniteBiFunction<K, V, double[]> featureExtractor, IgniteBiFunction<K, V, Double> lbExtractor) {
+        assert datasetBuilder != null;
+
+        PartitionDataBuilder<K, V, EmptyContext, LabeledDataset<Double, LabeledVector>> partDataBuilder = new LabeledDatasetPartitionDataBuilderOnHeap<>(
+            featureExtractor,
+            lbExtractor
+        );
+
+        Vector[] centers;
+
+        try (Dataset<EmptyContext, LabeledDataset<Double, LabeledVector>> dataset = datasetBuilder.build(
+            (upstream, upstreamSize) -> new EmptyContext(),
+            partDataBuilder
+        )) {
+            final int cols = dataset.compute(org.apache.ignite.ml.structures.Dataset::colSize, (a, b) -> a == null ? b : a);
+            centers = initClusterCentersRandomly(dataset, k);
+
+            boolean converged = false;
+            int iteration = 0;
+
+            while (iteration < maxIterations && !converged) {
+                Vector[] newCentroids = centers.clone(); // A cluster that attracted no points keeps its previous center.
+
+                TotalCostAndCounts totalRes = calcDataForNewCentroids(centers, dataset, cols);
+
+                converged = true;
+
+                for (Integer ind : totalRes.sums.keySet()) {
+                    Vector massCenter = totalRes.sums.get(ind).times(1.0 / totalRes.counts.get(ind));
+
+                    if (converged && distance.compute(massCenter, centers[ind]) > epsilon * epsilon)
+                        converged = false;
+
+                    newCentroids[ind] = massCenter;
+                }
+
+                iteration++;
+                centers = newCentroids;
+            }
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return new KMeansModel(centers, distance);
+    }
+
+    /**
+     * Prepares the data to define new centroids on current iteration; also relabels each row with its centroid index.
+     *
+     * @param centers Current centers on the current iteration.
+     * @param dataset Dataset.
+     * @param cols Amount of columns.
+     * @return Helper data to calculate the new centroids.
+     */
+    private TotalCostAndCounts calcDataForNewCentroids(Vector[] centers,
+        Dataset<EmptyContext, LabeledDataset<Double, LabeledVector>> dataset, int cols) {
+        final Vector[] finalCenters = centers;
+
+        return dataset.compute(data -> {
+
+            TotalCostAndCounts res = new TotalCostAndCounts();
+
+            for (int i = 0; i < data.rowSize(); i++) {
+                final IgniteBiTuple<Integer, Double> closestCentroid = findClosestCentroid(finalCenters, data.getRow(i));
+
+                int centroidIdx = closestCentroid.get1();
+
+                data.setLabel(i, centroidIdx);
+
+                res.totalCost += closestCentroid.get2();
+                res.sums.putIfAbsent(centroidIdx, VectorUtils.zeroes(cols));
+
+                int finalI = i;
+                res.sums.compute(centroidIdx,
+                    (IgniteBiFunction<Integer, Vector, Vector>)(ind, v) -> v.plus(data.getRow(finalI).features()));
+
+                res.counts.merge(centroidIdx, 1,
+                    (IgniteBiFunction<Integer, Integer, Integer>)(i1, i2) -> i1 + i2);
+            }
+            return res;
+        }, (a, b) -> a == null ? b : a.merge(b));
+    }
+
+    /**
+     * Find the closest cluster center index and distance to it from a given point (index 0 if none is closer than infinity).
+     *
+     * @param centers Centers to look in.
+     * @param pnt Point.
+     */
+    private IgniteBiTuple<Integer, Double> findClosestCentroid(Vector[] centers, LabeledVector pnt) {
+        double bestDistance = Double.POSITIVE_INFINITY;
+        int bestInd = 0;
+
+        for (int i = 0; i < centers.length; i++) {
+            double dist = distance.compute(centers[i], pnt.features());
+            if (dist < bestDistance) {
+                bestDistance = dist;
+                bestInd = i;
+            }
+        }
+        return new IgniteBiTuple<>(bestInd, bestDistance);
+    }
+
+    /**
+     * K cluster centers are initialized randomly: one row is sampled per compute() call, then k of them are drawn.
+     *
+     * @param dataset The dataset to pick up random centers.
+     * @param k Amount of clusters.
+     * @return K cluster centers.
+     */
+    private Vector[] initClusterCentersRandomly(Dataset<EmptyContext, LabeledDataset<Double, LabeledVector>> dataset,
+        int k) {
+        // NOTE(review): needs at least k sampled points; with fewer, nextInt(0) below throws IllegalArgumentException.
+        Vector[] initCenters = new DenseLocalOnHeapVector[k];
+
+        List<LabeledVector> rndPnts = dataset.compute(data -> {
+            List<LabeledVector> rndPnt = new ArrayList<>();
+            rndPnt.add(data.getRow(new Random(seed).nextInt(data.rowSize())));
+            return rndPnt;
+        }, (a, b) -> a == null ? b : Stream.concat(a.stream(), b.stream()).collect(Collectors.toList()));
+
+        for (int i = 0; i < k; i++) {
+            final LabeledVector rndPnt = rndPnts.get(new Random(seed).nextInt(rndPnts.size()));
+            rndPnts.remove(rndPnt);
+            initCenters[i] = rndPnt.features();
+        }
+
+        return initCenters;
+    }
+
+    /** Service class used for statistics. */
+    private static class TotalCostAndCounts {
+        /** Sum of the distances from each processed point to its closest center. */
+        double totalCost;
+
+        /** Sum of the feature vectors of the points closest to the center with a given index. */
+        ConcurrentHashMap<Integer, Vector> sums = new ConcurrentHashMap<>();
+
+        /** Count of points closest to the center with a given index. */
+        ConcurrentHashMap<Integer, Integer> counts = new ConcurrentHashMap<>();
+
+        /** Merges the statistics of {@code other} into this instance and returns this instance. */
+        TotalCostAndCounts merge(TotalCostAndCounts other) {
+            this.totalCost += other.totalCost;
+            this.sums = MapUtil.mergeMaps(sums, other.sums, Vector::plus, ConcurrentHashMap::new);
+            this.counts = MapUtil.mergeMaps(counts, other.counts, (i1, i2) -> i1 + i2, ConcurrentHashMap::new);
+            return this;
+        }
+    }
+
+    /**
+     * Gets the amount of clusters.
+     *
+     * @return The parameter value.
+     */
+    public int getK() {
+        return k;
+    }
+
+    /**
+     * Set up the amount of clusters.
+     *
+     * @param k The parameter value.
+     * @return This trainer with new amount of clusters parameter value.
+     */
+    public KMeansTrainer withK(int k) {
+        this.k = k;
+        return this;
+    }
+
+    /**
+     * Gets the max number of iterations before convergence.
+     *
+     * @return The parameter value.
+     */
+    public int getMaxIterations() {
+        return maxIterations;
+    }
+
+    /**
+     * Set up the max number of iterations before convergence.
+     *
+     * @param maxIterations The parameter value.
+     * @return This trainer with new max number of iterations before convergence parameter value.
+     */
+    public KMeansTrainer withMaxIterations(int maxIterations) {
+        this.maxIterations = maxIterations;
+        return this;
+    }
+
+    /**
+     * Gets the epsilon.
+     *
+     * @return The parameter value.
+     */
+    public double getEpsilon() {
+        return epsilon;
+    }
+
+    /**
+     * Set up the epsilon.
+     *
+     * @param epsilon The parameter value.
+     * @return This trainer with new epsilon parameter value.
+     */
+    public KMeansTrainer withEpsilon(double epsilon) {
+        this.epsilon = epsilon;
+        return this;
+    }
+
+    /**
+     * Gets the distance.
+     *
+     * @return The parameter value.
+     */
+    public DistanceMeasure getDistance() {
+        return distance;
+    }
+
+    /**
+     * Set up the distance.
+     *
+     * @param distance The parameter value.
+     * @return This trainer with new distance parameter value.
+     */
+    public KMeansTrainer withDistance(DistanceMeasure distance) {
+        this.distance = distance;
+        return this;
+    }
+
+    /**
+     * Gets the seed number.
+     *
+     * @return The parameter value.
+     */
+    public long getSeed() {
+        return seed;
+    }
+
+    /**
+     * Set up the seed.
+     *
+     * @param seed The parameter value.
+     * @return This trainer with new seed parameter value.
+     */
+    public KMeansTrainer withSeed(long seed) {
+        this.seed = seed;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/package-info.java
new file mode 100644
index 0000000..4d27b6e
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/kmeans/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains kMeans clustering algorithm.
+ */
+package org.apache.ignite.ml.clustering.kmeans;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/structures/preprocessing/LabellingMachine.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/structures/preprocessing/LabellingMachine.java b/modules/ml/src/main/java/org/apache/ignite/ml/structures/preprocessing/LabellingMachine.java
deleted file mode 100644
index e8d88ad..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/structures/preprocessing/LabellingMachine.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.structures.preprocessing;
-
-import org.apache.ignite.ml.Model;
-import org.apache.ignite.ml.structures.LabeledDataset;
-
-/** Data pre-processing step which assigns labels to all observations according model. */
-public class LabellingMachine {
-    /**
-     * Set labels to each observation according passed model.
-     * <p>
-     * NOTE: In-place operation.
-     * </p>
-     * @param ds The given labeled dataset.
-     * @param mdl The given model.
-     * @return Dataset with predicted labels.
-     */
-    public static LabeledDataset assignLabels(LabeledDataset ds, Model mdl) {
-        for (int i = 0; i < ds.rowSize(); i++) {
-            double predictedCls = (double) mdl.apply(ds.getRow(i).features());
-            ds.setLabel(i, predictedCls);
-        }
-        return ds;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/main/java/org/apache/ignite/ml/structures/preprocessing/Normalizer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/structures/preprocessing/Normalizer.java b/modules/ml/src/main/java/org/apache/ignite/ml/structures/preprocessing/Normalizer.java
deleted file mode 100644
index 161ec35..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/structures/preprocessing/Normalizer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.structures.preprocessing;
-
-import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
-import org.apache.ignite.ml.structures.Dataset;
-import org.apache.ignite.ml.structures.DatasetRow;
-
-/** Data pre-processing step which scales features according normalization algorithms. */
-public class Normalizer {
-    /**
-     * Scales features in dataset with MiniMax algorithm x'=(x-MIN[X])/(MAX[X]-MIN[X]). This is an in-place operation.
-     * <p>
-     * NOTE: Complexity 2*N^2.
-     * </p>
-     * @param ds The given dataset.
-     * @return Transformed dataset.
-     */
-    public static Dataset normalizeWithMiniMax(Dataset ds) {
-        int colSize = ds.colSize();
-        double[] mins = new double[colSize];
-        double[] maxs = new double[colSize];
-
-        int rowSize = ds.rowSize();
-        DatasetRow[] data = ds.data();
-        for (int j = 0; j < colSize; j++) {
-            double maxInCurrCol = Double.MIN_VALUE;
-            double minInCurrCol = Double.MAX_VALUE;
-
-            for (int i = 0; i < rowSize; i++) {
-                double e = data[i].features().get(j);
-                maxInCurrCol = Math.max(e, maxInCurrCol);
-                minInCurrCol = Math.min(e, minInCurrCol);
-            }
-
-            mins[j] = minInCurrCol;
-            maxs[j] = maxInCurrCol;
-        }
-
-        for (int j = 0; j < colSize; j++) {
-            double div = maxs[j] - mins[j];
-            if(div == 0)
-                continue;
-
-            for (int i = 0; i < rowSize; i++) {
-                double oldVal = data[i].features().get(j);
-                double newVal = (oldVal - mins[j]) / div;
-                // x'=(x-MIN[X])/(MAX[X]-MIN[X])
-                data[i].features().set(j, newVal);
-            }
-        }
-
-        return ds;
-    }
-
-    /**
-     * Scales features in dataset with Z-Normalization algorithm x'=(x-M[X])/\sigma [X]. This is an in-place operation.
-     *
-     * @param ds The given dataset.
-     * @return Transformed dataset.
-     */
-    public static Dataset normalizeWithZNormalization(Dataset ds) {
-        throw new UnsupportedOperationException("Z-normalization is not supported yet");
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/test/java/org/apache/ignite/ml/LocalModelsTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/LocalModelsTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/LocalModelsTest.java
index 3f12bdc..353cc22 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/LocalModelsTest.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/LocalModelsTest.java
@@ -20,14 +20,18 @@ package org.apache.ignite.ml;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.Function;
-import org.apache.ignite.ml.clustering.KMeansLocalClusterer;
-import org.apache.ignite.ml.clustering.KMeansModel;
+import org.apache.ignite.ml.clustering.kmeans.KMeansModel;
+import org.apache.ignite.ml.clustering.kmeans.KMeansModelFormat;
+import org.apache.ignite.ml.clustering.kmeans.KMeansTrainer;
+import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder;
 import org.apache.ignite.ml.knn.classification.KNNClassificationModel;
 import org.apache.ignite.ml.knn.classification.KNNModelFormat;
 import org.apache.ignite.ml.knn.classification.KNNStrategy;
 import org.apache.ignite.ml.math.distances.EuclideanDistance;
-import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
 import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
 import org.apache.ignite.ml.regressions.linear.LinearRegressionModel;
 import org.apache.ignite.ml.svm.SVMLinearBinaryClassificationModel;
@@ -140,14 +144,20 @@ public class LocalModelsTest {
 
     /** */
     private KMeansModel getClusterModel() {
-        KMeansLocalClusterer clusterer = new KMeansLocalClusterer(new EuclideanDistance(), 1, 1L);
+        Map<Integer, double[]> data = new HashMap<>();
+        data.put(0, new double[] {1.0, 1959, 325100});
+        data.put(1, new double[] {1.0, 1960, 373200});
 
-        double[] v1 = new double[] {1959, 325100};
-        double[] v2 = new double[] {1960, 373200};
+        KMeansTrainer trainer = new KMeansTrainer()
+            .withK(1);
 
-        DenseLocalOnHeapMatrix points = new DenseLocalOnHeapMatrix(new double[][] {v1, v2});
+        KMeansModel knnMdl = trainer.fit(
+            new LocalDatasetBuilder<>(data, 2),
+            (k, v) -> Arrays.copyOfRange(v, 0, v.length - 1),
+            (k, v) -> v[2]
+        );
 
-        return clusterer.cluster(points, 1);
+        return knnMdl;
     }
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java
index 85f61fa..80538a0 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java
@@ -25,11 +25,8 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    KMeansDistributedClustererTestSingleNode.class,
-    KMeansDistributedClustererTestMultiNode.class,
-    KMeansLocalClustererTest.class,
-    FuzzyCMeansDistributedClustererTest.class,
-    FuzzyCMeansLocalClustererTest.class
+    KMeansTrainerTest.class,
+    KMeansModelTest.class
 })
 public class ClusteringTestSuite {
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/test/java/org/apache/ignite/ml/clustering/FuzzyCMeansDistributedClustererTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/FuzzyCMeansDistributedClustererTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/FuzzyCMeansDistributedClustererTest.java
deleted file mode 100644
index 4b415bb..0000000
--- a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/FuzzyCMeansDistributedClustererTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.clustering;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Random;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.ml.math.StorageConstants;
-import org.apache.ignite.ml.math.Vector;
-import org.apache.ignite.ml.math.distances.DistanceMeasure;
-import org.apache.ignite.ml.math.distances.EuclideanDistance;
-import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
-import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/** Tests that checks distributed Fuzzy C-Means clusterer. */
-public class FuzzyCMeansDistributedClustererTest extends GridCommonAbstractTest {
-    /** Number of nodes in grid. */
-    private static final int NODE_COUNT = 3;
-
-    /** Grid instance. */
-    private Ignite ignite;
-
-    /** Default constructor. */
-    public FuzzyCMeansDistributedClustererTest() {
-        super(false);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        ignite = grid(NODE_COUNT);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        for (int i = 1; i <= NODE_COUNT; i++)
-            startGrid(i);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
-    /** Test that algorithm gives correct results on a small sample - 4 centers on the plane. */
-    public void testTwoDimensionsLittleData() {
-        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
-
-        FuzzyCMeansDistributedClusterer clusterer = new FuzzyCMeansDistributedClusterer(new EuclideanDistance(),
-                2, BaseFuzzyCMeansClusterer.StopCondition.STABLE_MEMBERSHIPS,
-                0.01, 500, null, 2, 50);
-
-        double[][] points = new double[][]{{-10, -10}, {-9, -11}, {-10, -9}, {-11, -9},
-                {10, 10},   {9, 11},   {10, 9},   {11, 9},
-                {-10, 10},  {-9, 11},  {-10, 9},  {-11, 9},
-                {10, -10},  {9, -11},  {10, -9},  {11, -9}};
-
-        SparseDistributedMatrix pntMatrix = new SparseDistributedMatrix(16, 2,
-                StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE);
-        for (int i = 0; i < 16; i++)
-            pntMatrix.setRow(i, points[i]);
-
-        FuzzyCMeansModel mdl = clusterer.cluster(pntMatrix, 4);
-
-        Vector[] centers = mdl.centers();
-        Arrays.sort(centers, Comparator.comparing(vector -> Math.atan2(vector.get(1), vector.get(0))));
-
-        DistanceMeasure measure = mdl.distanceMeasure();
-
-        assertEquals(0, measure.compute(centers[0], new DenseLocalOnHeapVector(new double[]{-10, -10})), 1);
-        assertEquals(0, measure.compute(centers[1], new DenseLocalOnHeapVector(new double[]{10, -10})), 1);
-        assertEquals(0, measure.compute(centers[2], new DenseLocalOnHeapVector(new double[]{10, 10})), 1);
-        assertEquals(0, measure.compute(centers[3], new DenseLocalOnHeapVector(new double[]{-10, 10})), 1);
-
-        pntMatrix.destroy();
-    }
-
-    /** Perform N tests each of which contains M random points placed around K centers on the plane. */
-    public void testTwoDimensionsRandomlyPlacedPointsAndCenters() {
-        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
-
-        final int numOfTests = 5;
-
-        final double exponentialWeight = 2.0;
-        final double maxCentersDelta = 0.01;
-        final int maxIterations = 500;
-        final Long seed = 1L;
-
-        DistanceMeasure measure = new EuclideanDistance();
-        FuzzyCMeansDistributedClusterer distributedClusterer = new FuzzyCMeansDistributedClusterer(measure,
-                exponentialWeight, BaseFuzzyCMeansClusterer.StopCondition.STABLE_CENTERS,
-                maxCentersDelta, maxIterations, seed, 2, 50);
-
-        for (int i = 0; i < numOfTests; i++)
-            performRandomTest(distributedClusterer, i);
-    }
-
-    /**
-     * Test given clusterer on points placed randomly around vertexes of a regular polygon.
-     *
-     * @param distributedClusterer Tested clusterer.
-     * @param seed Seed for the random numbers generator.
-     */
-    private void performRandomTest(FuzzyCMeansDistributedClusterer distributedClusterer, long seed) {
-        final int minNumCenters = 2;
-        final int maxNumCenters = 5;
-        final double maxRadius = 1000;
-        final int maxPoints = 1000;
-        final int minPoints = 300;
-
-        Random random = new Random(seed);
-
-        int numCenters = random.nextInt(maxNumCenters - minNumCenters) + minNumCenters;
-
-        double[][] centers = new double[numCenters][2];
-
-        for (int i = 0; i < numCenters; i++) {
-            double angle = Math.PI * 2.0 * i / numCenters;
-
-            centers[i][0] = Math.cos(angle) * maxRadius;
-            centers[i][1] = Math.sin(angle) * maxRadius;
-        }
-
-        int numPoints = minPoints + random.nextInt(maxPoints - minPoints);
-
-        double[][] points = new double[numPoints][2];
-
-        for (int i = 0; i < numPoints; i++) {
-            int center = random.nextInt(numCenters);
-            double randomDouble = random.nextDouble();
-            double radius = randomDouble * randomDouble * maxRadius / 10;
-            double angle = random.nextDouble() * Math.PI * 2.0;
-
-            points[i][0] = centers[center][0] + Math.cos(angle) * radius;
-            points[i][1] = centers[center][1] + Math.sin(angle) * radius;
-        }
-
-        SparseDistributedMatrix pntMatrix = new SparseDistributedMatrix(numPoints, 2,
-                StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE);
-
-        for (int i = 0; i < numPoints; i++)
-            pntMatrix.setRow(i, points[i]);
-
-        FuzzyCMeansModel mdl = distributedClusterer.cluster(pntMatrix, numCenters);
-        Vector[] computedCenters = mdl.centers();
-        DistanceMeasure measure = mdl.distanceMeasure();
-
-        int cntr = numCenters;
-
-        for (int i = 0; i < numCenters; i++) {
-            for (int j = 0; j < numCenters; j++) {
-                if (measure.compute(computedCenters[i], new DenseLocalOnHeapVector(centers[j])) < 100) {
-                    cntr--;
-                    break;
-                }
-            }
-        }
-
-        assertEquals(0, cntr);
-
-        pntMatrix.destroy();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/test/java/org/apache/ignite/ml/clustering/FuzzyCMeansLocalClustererTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/FuzzyCMeansLocalClustererTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/FuzzyCMeansLocalClustererTest.java
deleted file mode 100644
index 4fe1eee..0000000
--- a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/FuzzyCMeansLocalClustererTest.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.clustering;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import org.apache.ignite.ml.math.Vector;
-import org.apache.ignite.ml.math.distances.DistanceMeasure;
-import org.apache.ignite.ml.math.distances.EuclideanDistance;
-import org.apache.ignite.ml.math.exceptions.MathIllegalArgumentException;
-import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
-import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/** Tests that checks local Fuzzy C-Means clusterer. */
-public class FuzzyCMeansLocalClustererTest {
-    /** Test FCM on points that forms three clusters on the line. */
-    @Test
-    public void equalWeightsOneDimension() {
-        FuzzyCMeansLocalClusterer clusterer = new FuzzyCMeansLocalClusterer(new EuclideanDistance(),
-                2, BaseFuzzyCMeansClusterer.StopCondition.STABLE_CENTERS,
-                0.01, 10, null);
-
-        double[][] points = new double[][]{{-10}, {-9}, {-8}, {-7},
-                                           {7},   {8},  {9},  {10},
-                                           {-1},  {0},  {1}};
-
-        DenseLocalOnHeapMatrix pntMatrix = new DenseLocalOnHeapMatrix(points);
-
-        FuzzyCMeansModel mdl = clusterer.cluster(pntMatrix, 3);
-
-        Vector[] centers = mdl.centers();
-        Arrays.sort(centers, Comparator.comparing(vector -> vector.getX(0)));
-        assertEquals(-8.5, centers[0].getX(0), 2);
-        assertEquals(0, centers[1].getX(0), 2);
-        assertEquals(8.5, centers[2].getX(0), 2);
-    }
-
-    /** Test FCM on points that forms four clusters on the plane. */
-    @Test
-    public void equalWeightsTwoDimensions() {
-        FuzzyCMeansLocalClusterer clusterer = new FuzzyCMeansLocalClusterer(new EuclideanDistance(),
-                2, BaseFuzzyCMeansClusterer.StopCondition.STABLE_CENTERS,
-                0.01, 20, null);
-
-        double[][] points = new double[][]{{-10, -10}, {-9, -11}, {-10, -9}, {-11, -9},
-                                           {10, 10},   {9, 11},   {10, 9},   {11, 9},
-                                           {-10, 10},  {-9, 11},  {-10, 9},  {-11, 9},
-                                           {10, -10},  {9, -11},  {10, -9},  {11, -9}};
-
-        DenseLocalOnHeapMatrix pntMatrix = new DenseLocalOnHeapMatrix(points);
-
-        FuzzyCMeansModel mdl = clusterer.cluster(pntMatrix, 4);
-        Vector[] centers = mdl.centers();
-        Arrays.sort(centers, Comparator.comparing(vector -> Math.atan2(vector.get(1), vector.get(0))));
-
-        DistanceMeasure measure = mdl.distanceMeasure();
-
-        assertEquals(0, measure.compute(centers[0], new DenseLocalOnHeapVector(new double[]{-10, -10})), 1);
-        assertEquals(0, measure.compute(centers[1], new DenseLocalOnHeapVector(new double[]{10, -10})), 1);
-        assertEquals(0, measure.compute(centers[2], new DenseLocalOnHeapVector(new double[]{10, 10})), 1);
-        assertEquals(0, measure.compute(centers[3], new DenseLocalOnHeapVector(new double[]{-10, 10})), 1);
-    }
-
-    /** Test FCM on points which have the equal coordinates. */
-    @Test
-    public void checkCentersOfTheSamePointsTwoDimensions() {
-        FuzzyCMeansLocalClusterer clusterer = new FuzzyCMeansLocalClusterer(new EuclideanDistance(),
-                2, BaseFuzzyCMeansClusterer.StopCondition.STABLE_MEMBERSHIPS, 0.01, 10, null);
-
-        double[][] points = new double[][] {{3.3, 10}, {3.3, 10}, {3.3, 10}, {3.3, 10}, {3.3, 10}};
-
-        DenseLocalOnHeapMatrix pntMatrix = new DenseLocalOnHeapMatrix(points);
-
-        int k = 2;
-        FuzzyCMeansModel mdl = clusterer.cluster(pntMatrix, k);
-        Vector exp = new DenseLocalOnHeapVector(new double[] {3.3, 10});
-        for (int i = 0; i < k; i++) {
-            Vector center = mdl.centers()[i];
-
-            for (int j = 0; j < 2; j++)
-                assertEquals(exp.getX(j), center.getX(j), 1);
-        }
-    }
-
-    /** Test FCM on points located on the circle. */
-    @Test
-    public void checkCentersLocationOnSphere() {
-        FuzzyCMeansLocalClusterer clusterer = new FuzzyCMeansLocalClusterer(new EuclideanDistance(),
-                2, BaseFuzzyCMeansClusterer.StopCondition.STABLE_CENTERS, 0.01, 100, null);
-
-        int numOfPoints = 650;
-        double radius = 100.0;
-        double[][] points = new double [numOfPoints][2];
-
-        for (int i = 0; i < numOfPoints; i++) {
-            points[i][0] = Math.cos(Math.PI * 2 * i / numOfPoints) * radius;
-            points[i][1] = Math.sin(Math.PI * 2 * i / numOfPoints) * radius;
-        }
-
-        DenseLocalOnHeapMatrix pntMatrix = new DenseLocalOnHeapMatrix(points);
-
-        int k = 10;
-        FuzzyCMeansModel mdl = clusterer.cluster(pntMatrix, k);
-
-        Vector sum = mdl.centers()[0];
-        for (int i = 1; i < k; i++)
-            sum = sum.plus(mdl.centers()[i]);
-
-        assertEquals(0, sum.kNorm(1), 1);
-    }
-
-    /** Test FCM on points that forms the line located on the plane. */
-    @Test
-    public void test2DLineClustering() {
-        FuzzyCMeansLocalClusterer clusterer = new FuzzyCMeansLocalClusterer(new EuclideanDistance(),
-                2, BaseFuzzyCMeansClusterer.StopCondition.STABLE_CENTERS, 0.01, 50, null);
-
-        double[][] points = new double[][]{{1, 2}, {3, 6}, {5, 10}};
-
-        DenseLocalOnHeapMatrix pntMatrix = new DenseLocalOnHeapMatrix(points);
-
-        int k = 2;
-        FuzzyCMeansModel mdl = clusterer.cluster(pntMatrix, k);
-        Vector[] centers = mdl.centers();
-        Arrays.sort(centers, Comparator.comparing(vector -> vector.getX(0)));
-
-        Vector[] exp = {new DenseLocalOnHeapVector(new double[]{1.5, 3}),
-                        new DenseLocalOnHeapVector(new double[]{4.5, 9})};
-
-        for (int i = 0; i < k; i++) {
-            Vector center = centers[i];
-
-            for (int j = 0; j < 2; j++)
-                assertEquals(exp[i].getX(j), center.getX(j), 0.5);
-        }
-    }
-
-    /** Test FCM on points that have different weights. */
-    @Test
-    public void differentWeightsOneDimension() {
-        FuzzyCMeansLocalClusterer clusterer = new FuzzyCMeansLocalClusterer(new EuclideanDistance(),
-                2, BaseFuzzyCMeansClusterer.StopCondition.STABLE_CENTERS,
-                0.01, 10, null);
-
-        double[][] points = new double[][]{{1}, {2}, {3}, {4}, {5}, {6}};
-
-        DenseLocalOnHeapMatrix pntMatrix = new DenseLocalOnHeapMatrix(points);
-        ArrayList<Double> weights = new ArrayList<>();
-        Collections.addAll(weights, 3.0, 2.0, 1.0, 1.0, 1.0, 1.0);
-
-        Vector[] centers1 = clusterer.cluster(pntMatrix, 2).centers();
-        Vector[] centers2 = clusterer.cluster(pntMatrix, 2, weights).centers();
-        Arrays.sort(centers1, Comparator.comparing(vector -> vector.getX(0)));
-        Arrays.sort(centers2, Comparator.comparing(vector -> vector.getX(0)));
-
-        assertTrue(centers1[0].get(0) - centers2[0].get(0) > 0.5);
-    }
-
-    /** Test FCM on illegal number of clusters. */
-    @Test(expected = MathIllegalArgumentException.class)
-    public void testIllegalNumberOfClusters() {
-        FuzzyCMeansLocalClusterer clusterer = new FuzzyCMeansLocalClusterer(new EuclideanDistance(),
-                2, BaseFuzzyCMeansClusterer.StopCondition.STABLE_CENTERS, 0.01, 10, null);
-        double[][] points = new double[][]{{1}, {2}, {3}, {4}};
-
-        clusterer.cluster(new DenseLocalOnHeapMatrix(points), 1);
-    }
-
-    /** Test FCM on different numbers of points and weights. */
-    @Test(expected = MathIllegalArgumentException.class)
-    public void testDifferentAmountsOfPointsAndWeights(){
-        FuzzyCMeansLocalClusterer clusterer = new FuzzyCMeansLocalClusterer(new EuclideanDistance(),
-                2, BaseFuzzyCMeansClusterer.StopCondition.STABLE_CENTERS, 0.01, 10, null);
-        double[][] points = new double[][]{{1}, {2}, {3}, {4}};
-
-        ArrayList<Double> weights = new ArrayList<>();
-        Collections.addAll(weights, 1.0, 34.0, 2.5, 5.0, 0.5);
-
-        clusterer.cluster(new DenseLocalOnHeapMatrix(points), 2, weights);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9e21cec0/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java
deleted file mode 100644
index 71be8be..0000000
--- a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.clustering;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.ml.math.StorageConstants;
-import org.apache.ignite.ml.math.Vector;
-import org.apache.ignite.ml.math.distances.EuclideanDistance;
-import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
-import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * This test is made to make sure that K-Means distributed clustering does not crash on distributed environment.
- * In {@link KMeansDistributedClustererTestSingleNode} we check logic of clustering (checks for clusters structures).
- * In this class we just check that clusterer does not crash. There are two separate tests because we cannot
- * guarantee order in which nodes return results of intermediate computations and therefore algorithm can return
- * different results.
- */
-public class KMeansDistributedClustererTestMultiNode extends GridCommonAbstractTest {
-    /** Number of nodes in grid. */
-    private static final int NODE_COUNT = 3;
-
-    /** Grid instance. */
-    private Ignite ignite;
-
-    /**
-     * Default constructor.
-     */
-    public KMeansDistributedClustererTestMultiNode() {
-        super(false);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override protected void beforeTest() throws Exception {
-        ignite = grid(NODE_COUNT);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        for (int i = 1; i <= NODE_COUNT; i++)
-            startGrid(i);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
-    /** */
-    public void testPerformClusterAnalysisDegenerate() {
-        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
-
-        KMeansDistributedClusterer clusterer = new KMeansDistributedClusterer(new EuclideanDistance(), 1, 1, 1L);
-
-        double[] v1 = new double[] {1959, 325100};
-        double[] v2 = new double[] {1960, 373200};
-
-        SparseDistributedMatrix points = new SparseDistributedMatrix(2, 2, StorageConstants.ROW_STORAGE_MODE,
-            StorageConstants.RANDOM_ACCESS_MODE);
-
-        points.setRow(0, v1);
-        points.setRow(1, v2);
-
-        clusterer.cluster(points, 1);
-
-        points.destroy();
-    }
-
-    /** */
-    public void testClusterizationOnDatasetWithObviousStructure() throws IOException {
-        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
-
-        int ptsCnt = 10000;
-        int squareSideLen = 10000;
-
-        Random rnd = new Random(123456L);
-
-        // Let centers be in the vertices of square.
-        Map<Integer, Vector> centers = new HashMap<>();
-        centers.put(100, new DenseLocalOnHeapVector(new double[] {0.0, 0.0}));
-        centers.put(900, new DenseLocalOnHeapVector(new double[] {squareSideLen, 0.0}));
-        centers.put(3000, new DenseLocalOnHeapVector(new double[] {0.0, squareSideLen}));
-        centers.put(6000, new DenseLocalOnHeapVector(new double[] {squareSideLen, squareSideLen}));
-
-        SparseDistributedMatrix points = new SparseDistributedMatrix(ptsCnt, 2, StorageConstants.ROW_STORAGE_MODE,
-            StorageConstants.RANDOM_ACCESS_MODE);
-
-        List<Integer> permutation = IntStream.range(0, ptsCnt).boxed().collect(Collectors.toList());
-        Collections.shuffle(permutation, rnd);
-
-        int totalCnt = 0;
-
-        for (Integer count : centers.keySet()) {
-            for (int i = 0; i < count; i++) {
-                Vector pnt = new DenseLocalOnHeapVector(2).assign(centers.get(count));
-                // Perturbate point on random value.
-                pnt.map(val -> val + rnd.nextDouble() * squareSideLen / 100);
-                points.assignRow(permutation.get(totalCnt), pnt);
-                totalCnt++;
-            }
-        }
-
-        EuclideanDistance dist = new EuclideanDistance();
-
-        KMeansDistributedClusterer clusterer = new KMeansDistributedClusterer(dist, 3, 100, 1L);
-
-        clusterer.cluster(points, 4);
-
-        points.destroy();
-    }
-}


Mime
View raw message