ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ch...@apache.org
Subject [2/2] ignite git commit: IGNITE-8663: Add Normalization Preprocessing support
Date Wed, 06 Jun 2018 12:29:09 GMT
IGNITE-8663: Add Normalization Preprocessing support

this closes #4117


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae7357ba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae7357ba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae7357ba

Branch: refs/heads/master
Commit: ae7357ba24518795873174e8c914fc4bdab39986
Parents: 34907a7
Author: zaleslaw <zaleslaw.sin@gmail.com>
Authored: Wed Jun 6 15:28:53 2018 +0300
Committer: Yury Babak <ybabak@gridgain.com>
Committed: Wed Jun 6 15:28:53 2018 +0300

----------------------------------------------------------------------
 .../ml/preprocessing/BinarizationExample.java   |   6 -
 .../ml/preprocessing/ImputingExample.java       |   8 +-
 .../ImputingExampleWithMostFrequentValues.java  |  10 +-
 .../ml/preprocessing/MinMaxScalerExample.java   | 101 +++++++
 .../ml/preprocessing/NormalizationExample.java  |  19 +-
 ...ssionLSQRTrainerWithMinMaxScalerExample.java | 177 ++++++++++++
 ...sionLSQRTrainerWithNormalizationExample.java | 177 ------------
 ...gressionMultiClassClassificationExample.java |  10 +-
 .../SVMMultiClassClassificationExample.java     |  10 +-
 .../binarization/BinarizationTrainer.java       |   5 -
 .../imputer/ImputerPartitionData.java           | 111 --------
 .../imputer/ImputerPreprocessor.java            |  67 -----
 .../preprocessing/imputer/ImputerTrainer.java   | 281 -------------------
 .../preprocessing/imputer/ImputingStrategy.java |  27 --
 .../ml/preprocessing/imputer/package-info.java  |  22 --
 .../imputing/ImputerPartitionData.java          | 111 ++++++++
 .../imputing/ImputerPreprocessor.java           |  67 +++++
 .../preprocessing/imputing/ImputerTrainer.java  | 281 +++++++++++++++++++
 .../imputing/ImputingStrategy.java              |  27 ++
 .../ml/preprocessing/imputing/package-info.java |  22 ++
 .../MinMaxScalerPartitionData.java              |  58 ++++
 .../minmaxscaling/MinMaxScalerPreprocessor.java |  88 ++++++
 .../minmaxscaling/MinMaxScalerTrainer.java      | 105 +++++++
 .../minmaxscaling/package-info.java             |  22 ++
 .../NormalizationPartitionData.java             |  58 ----
 .../NormalizationPreprocessor.java              |  59 ++--
 .../normalization/NormalizationTrainer.java     |  92 ++----
 .../normalization/package-info.java             |   4 +-
 .../preprocessing/PreprocessingTestSuite.java   |  11 +-
 .../imputing/ImputerPreprocessorTest.java       |   2 -
 .../imputing/ImputerTrainerTest.java            |   3 -
 .../MinMaxScalerPreprocessorTest.java           |  54 ++++
 .../minmaxscaling/MinMaxScalerTrainerTest.java  |  76 +++++
 .../NormalizationPreprocessorTest.java          |  24 +-
 .../normalization/NormalizationTrainerTest.java |  19 +-
 35 files changed, 1298 insertions(+), 916 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java
index edf4fd7..a8f2fa0 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java
@@ -31,10 +31,6 @@ import org.apache.ignite.ml.preprocessing.binarization.BinarizationTrainer;
 
 /**
  * Example that shows how to use binarization preprocessor to binarize data.
- *
- * Machine learning preprocessors are built as a chain. Most often a first preprocessor is a feature extractor as shown
- * in this example. The second preprocessor here is a normalization preprocessor which is built on top of the feature
- * extractor and represents a chain of itself and the underlying feature extractor.
  */
 public class BinarizationExample {
     /** Run example. */
@@ -54,8 +50,6 @@ public class BinarizationExample {
                 .withThreshold(40)
                 .fit(ignite, persons, featureExtractor);
 
-
-
             // Creates a cache based simple dataset containing features and providing standard dataset API.
             try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset(ignite, persons, preprocessor)) {
                 // Calculation of the mean value. This calculation will be performed in map-reduce manner.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java
index e0c0d86..f873736 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java
@@ -27,14 +27,10 @@ import org.apache.ignite.examples.ml.dataset.model.Person;
 import org.apache.ignite.ml.dataset.DatasetFactory;
 import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.preprocessing.imputer.ImputerTrainer;
+import org.apache.ignite.ml.preprocessing.imputing.ImputerTrainer;
 
 /**
- * Example that shows how to use binarization preprocessor to binarize data.
- *
- * Machine learning preprocessors are built as a chain. Most often a first preprocessor is a feature extractor as shown
- * in this example. The second preprocessor here is a normalization preprocessor which is built on top of the feature
- * extractor and represents a chain of itself and the underlying feature extractor.
+ * Example that shows how to use Imputing preprocessor to impute the missing value in the given data.
  */
 public class ImputingExample {
     /** Run example. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExampleWithMostFrequentValues.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExampleWithMostFrequentValues.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExampleWithMostFrequentValues.java
index d25f6d0..2611c46 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExampleWithMostFrequentValues.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExampleWithMostFrequentValues.java
@@ -27,15 +27,11 @@ import org.apache.ignite.examples.ml.dataset.model.Person;
 import org.apache.ignite.ml.dataset.DatasetFactory;
 import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.preprocessing.imputer.ImputerTrainer;
-import org.apache.ignite.ml.preprocessing.imputer.ImputingStrategy;
+import org.apache.ignite.ml.preprocessing.imputing.ImputerTrainer;
+import org.apache.ignite.ml.preprocessing.imputing.ImputingStrategy;
 
 /**
- * Example that shows how to use binarization preprocessor to binarize data.
- *
- * Machine learning preprocessors are built as a chain. Most often a first preprocessor is a feature extractor as shown
- * in this example. The second preprocessor here is a normalization preprocessor which is built on top of the feature
- * extractor and represents a chain of itself and the underlying feature extractor.
+ * Example that shows how to use Imputing preprocessor to impute the missing values in the given data.
  */
 public class ImputingExampleWithMostFrequentValues {
     /** Run example. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/MinMaxScalerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/MinMaxScalerExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/MinMaxScalerExample.java
new file mode 100644
index 0000000..e60b72b
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/MinMaxScalerExample.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.preprocessing;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.examples.ml.dataset.model.Person;
+import org.apache.ignite.ml.dataset.DatasetFactory;
+import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.preprocessing.minmaxscaling.MinMaxScalerTrainer;
+
+import java.util.Arrays;
+
+/**
+ * Example that shows how to use MinMaxScaler preprocessor to scale the given data.
+ *
+ * Machine learning preprocessors are built as a chain. Most often a first preprocessor is a feature extractor as shown
+ * in this example. The second preprocessor here is a MinMaxScaler preprocessor which is built on top of the feature
+ * extractor and represents a chain of itself and the underlying feature extractor.
+ */
+public class MinMaxScalerExample {
+    /** Run example. */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println(">>> MinMaxScaler example started.");
+
+            IgniteCache<Integer, Person> persons = createCache(ignite);
+
+            // Defines first preprocessor that extracts features from an upstream data.
+            IgniteBiFunction<Integer, Person, double[]> featureExtractor = (k, v) -> new double[] {
+                v.getAge(),
+                v.getSalary()
+            };
+
+            // Defines second preprocessor that scales features to the [0, 1] range.
+            IgniteBiFunction<Integer, Person, double[]> preprocessor = new MinMaxScalerTrainer<Integer, Person>()
+                .fit(ignite, persons, featureExtractor);
+
+            // Creates a cache based simple dataset containing features and providing standard dataset API.
+            try (SimpleDataset<?> dataset = DatasetFactory.createSimpleDataset(ignite, persons, preprocessor)) {
+                // Calculation of the mean value. This calculation will be performed in map-reduce manner.
+                double[] mean = dataset.mean();
+                System.out.println("Mean \n\t" + Arrays.toString(mean));
+
+                // Calculation of the standard deviation. This calculation will be performed in map-reduce manner.
+                double[] std = dataset.std();
+                System.out.println("Standard deviation \n\t" + Arrays.toString(std));
+
+                // Calculation of the covariance matrix.  This calculation will be performed in map-reduce manner.
+                double[][] cov = dataset.cov();
+                System.out.println("Covariance matrix ");
+                for (double[] row : cov)
+                    System.out.println("\t" + Arrays.toString(row));
+
+                // Calculation of the correlation matrix.  This calculation will be performed in map-reduce manner.
+                double[][] corr = dataset.corr();
+                System.out.println("Correlation matrix ");
+                for (double[] row : corr)
+                    System.out.println("\t" + Arrays.toString(row));
+            }
+
+            System.out.println(">>> MinMaxScaler example completed.");
+        }
+    }
+
+    /** */
+    private static IgniteCache<Integer, Person> createCache(Ignite ignite) {
+        CacheConfiguration<Integer, Person> cacheConfiguration = new CacheConfiguration<>();
+
+        cacheConfiguration.setName("PERSONS");
+        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 2));
+
+        IgniteCache<Integer, Person> persons = ignite.createCache(cacheConfiguration);
+
+        persons.put(1, new Person("Mike", 42, 10000));
+        persons.put(2, new Person("John", 32, 64000));
+        persons.put(3, new Person("George", 53, 120000));
+        persons.put(4, new Person("Karl", 24, 70000));
+
+        return persons;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java
index b2c4e12..16169ab 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/NormalizationExample.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.examples.ml.preprocessing;
 
+import java.util.Arrays;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
@@ -26,16 +27,11 @@ import org.apache.ignite.examples.ml.dataset.model.Person;
 import org.apache.ignite.ml.dataset.DatasetFactory;
 import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.preprocessing.binarization.BinarizationTrainer;
 import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer;
 
-import java.util.Arrays;
-
 /**
- * Example that shows how to use normalization preprocessor to normalize data.
- *
- * Machine learning preprocessors are built as a chain. Most often a first preprocessor is a feature extractor as shown
- * in this example. The second preprocessor here is a normalization preprocessor which is built on top of the feature
- * extractor and represents a chain of itself and the underlying feature extractor.
+ * Example that shows how to use normalization preprocessor to normalize each vector in the given data.
  */
 public class NormalizationExample {
     /** Run example. */
@@ -53,6 +49,7 @@ public class NormalizationExample {
 
             // Defines second preprocessor that normalizes features.
             IgniteBiFunction<Integer, Person, double[]> preprocessor = new NormalizationTrainer<Integer, Person>()
+                .withP(1)
                 .fit(ignite, persons, featureExtractor);
 
             // Creates a cache based simple dataset containing features and providing standard dataset API.
@@ -91,10 +88,10 @@ public class NormalizationExample {
 
         IgniteCache<Integer, Person> persons = ignite.createCache(cacheConfiguration);
 
-        persons.put(1, new Person("Mike", 42, 10000));
-        persons.put(2, new Person("John", 32, 64000));
-        persons.put(3, new Person("George", 53, 120000));
-        persons.put(4, new Person("Karl", 24, 70000));
+        persons.put(1, new Person("Mike", 10, 20));
+        persons.put(2, new Person("John", 20, 10));
+        persons.put(3, new Person("George", 30, 0));
+        persons.put(4, new Person("Karl", 25, 15));
 
         return persons;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/examples/src/main/java/org/apache/ignite/examples/ml/regression/linear/LinearRegressionLSQRTrainerWithMinMaxScalerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/regression/linear/LinearRegressionLSQRTrainerWithMinMaxScalerExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/regression/linear/LinearRegressionLSQRTrainerWithMinMaxScalerExample.java
new file mode 100644
index 0000000..d3ab681
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/regression/linear/LinearRegressionLSQRTrainerWithMinMaxScalerExample.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.regression.linear;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
+import org.apache.ignite.ml.preprocessing.minmaxscaling.MinMaxScalerPreprocessor;
+import org.apache.ignite.ml.preprocessing.minmaxscaling.MinMaxScalerTrainer;
+import org.apache.ignite.ml.regressions.linear.LinearRegressionLSQRTrainer;
+import org.apache.ignite.ml.regressions.linear.LinearRegressionModel;
+import org.apache.ignite.thread.IgniteThread;
+
+import javax.cache.Cache;
+import java.util.Arrays;
+import java.util.UUID;
+
+/**
+ * Run linear regression model over cached dataset.
+ *
+ * @see LinearRegressionLSQRTrainer
+ * @see MinMaxScalerTrainer
+ * @see MinMaxScalerPreprocessor
+ */
+public class LinearRegressionLSQRTrainerWithMinMaxScalerExample {
+    /** */
+    private static final double[][] data = {
+        {8, 78, 284, 9.100000381, 109},
+        {9.300000191, 68, 433, 8.699999809, 144},
+        {7.5, 70, 739, 7.199999809, 113},
+        {8.899999619, 96, 1792, 8.899999619, 97},
+        {10.19999981, 74, 477, 8.300000191, 206},
+        {8.300000191, 111, 362, 10.89999962, 124},
+        {8.800000191, 77, 671, 10, 152},
+        {8.800000191, 168, 636, 9.100000381, 162},
+        {10.69999981, 82, 329, 8.699999809, 150},
+        {11.69999981, 89, 634, 7.599999905, 134},
+        {8.5, 149, 631, 10.80000019, 292},
+        {8.300000191, 60, 257, 9.5, 108},
+        {8.199999809, 96, 284, 8.800000191, 111},
+        {7.900000095, 83, 603, 9.5, 182},
+        {10.30000019, 130, 686, 8.699999809, 129},
+        {7.400000095, 145, 345, 11.19999981, 158},
+        {9.600000381, 112, 1357, 9.699999809, 186},
+        {9.300000191, 131, 544, 9.600000381, 177},
+        {10.60000038, 80, 205, 9.100000381, 127},
+        {9.699999809, 130, 1264, 9.199999809, 179},
+        {11.60000038, 140, 688, 8.300000191, 80},
+        {8.100000381, 154, 354, 8.399999619, 103},
+        {9.800000191, 118, 1632, 9.399999619, 101},
+        {7.400000095, 94, 348, 9.800000191, 117},
+        {9.399999619, 119, 370, 10.39999962, 88},
+        {11.19999981, 153, 648, 9.899999619, 78},
+        {9.100000381, 116, 366, 9.199999809, 102},
+        {10.5, 97, 540, 10.30000019, 95},
+        {11.89999962, 176, 680, 8.899999619, 80},
+        {8.399999619, 75, 345, 9.600000381, 92},
+        {5, 134, 525, 10.30000019, 126},
+        {9.800000191, 161, 870, 10.39999962, 108},
+        {9.800000191, 111, 669, 9.699999809, 77},
+        {10.80000019, 114, 452, 9.600000381, 60},
+        {10.10000038, 142, 430, 10.69999981, 71},
+        {10.89999962, 238, 822, 10.30000019, 86},
+        {9.199999809, 78, 190, 10.69999981, 93},
+        {8.300000191, 196, 867, 9.600000381, 106},
+        {7.300000191, 125, 969, 10.5, 162},
+        {9.399999619, 82, 499, 7.699999809, 95},
+        {9.399999619, 125, 925, 10.19999981, 91},
+        {9.800000191, 129, 353, 9.899999619, 52},
+        {3.599999905, 84, 288, 8.399999619, 110},
+        {8.399999619, 183, 718, 10.39999962, 69},
+        {10.80000019, 119, 540, 9.199999809, 57},
+        {10.10000038, 180, 668, 13, 106},
+        {9, 82, 347, 8.800000191, 40},
+        {10, 71, 345, 9.199999809, 50},
+        {11.30000019, 118, 463, 7.800000191, 35},
+        {11.30000019, 121, 728, 8.199999809, 86},
+        {12.80000019, 68, 383, 7.400000095, 57},
+        {10, 112, 316, 10.39999962, 57},
+        {6.699999809, 109, 388, 8.899999619, 94}
+    };
+
+    /** Run example. */
+    public static void main(String[] args) throws InterruptedException {
+        System.out.println();
+        System.out.println(">>> Linear regression model over cached dataset usage example started.");
+        // Start ignite grid.
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println(">>> Ignite grid started.");
+
+            IgniteThread igniteThread = new IgniteThread(ignite.configuration().getIgniteInstanceName(),
+                LinearRegressionLSQRTrainerWithMinMaxScalerExample.class.getSimpleName(), () -> {
+                IgniteCache<Integer, double[]> dataCache = getTestCache(ignite);
+
+                System.out.println(">>> Create new minmaxscaling trainer object.");
+                MinMaxScalerTrainer<Integer, double[]> normalizationTrainer = new MinMaxScalerTrainer<>();
+
+                System.out.println(">>> Perform the training to get the minmaxscaling preprocessor.");
+                IgniteBiFunction<Integer, double[], double[]> preprocessor = normalizationTrainer.fit(
+                    ignite,
+                    dataCache,
+                    (k, v) -> Arrays.copyOfRange(v, 1, v.length)
+                );
+
+                System.out.println(">>> Create new linear regression trainer object.");
+                LinearRegressionLSQRTrainer trainer = new LinearRegressionLSQRTrainer();
+
+                System.out.println(">>> Perform the training to get the model.");
+                LinearRegressionModel mdl = trainer.fit(ignite, dataCache, preprocessor, (k, v) -> v[0]);
+
+                System.out.println(">>> Linear regression model: " + mdl);
+
+                System.out.println(">>> ---------------------------------");
+                System.out.println(">>> | Prediction\t| Ground Truth\t|");
+                System.out.println(">>> ---------------------------------");
+
+                try (QueryCursor<Cache.Entry<Integer, double[]>> observations = dataCache.query(new ScanQuery<>())) {
+                    for (Cache.Entry<Integer, double[]> observation : observations) {
+                        Integer key = observation.getKey();
+                        double[] val = observation.getValue();
+                        double groundTruth = val[0];
+
+                        double prediction = mdl.apply(new DenseLocalOnHeapVector(preprocessor.apply(key, val)));
+
+                        System.out.printf(">>> | %.4f\t\t| %.4f\t\t|\n", prediction, groundTruth);
+                    }
+                }
+
+                System.out.println(">>> ---------------------------------");
+            });
+
+            igniteThread.start();
+
+            igniteThread.join();
+        }
+    }
+
+    /**
+     * Fills cache with data and returns it.
+     *
+     * @param ignite Ignite instance.
+     * @return Filled Ignite Cache.
+     */
+    private static IgniteCache<Integer, double[]> getTestCache(Ignite ignite) {
+        CacheConfiguration<Integer, double[]> cacheConfiguration = new CacheConfiguration<>();
+        cacheConfiguration.setName("TEST_" + UUID.randomUUID());
+        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 10));
+
+        IgniteCache<Integer, double[]> cache = ignite.createCache(cacheConfiguration);
+
+        for (int i = 0; i < data.length; i++)
+            cache.put(i, data[i]);
+
+        return cache;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/examples/src/main/java/org/apache/ignite/examples/ml/regression/linear/LinearRegressionLSQRTrainerWithNormalizationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/regression/linear/LinearRegressionLSQRTrainerWithNormalizationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/regression/linear/LinearRegressionLSQRTrainerWithNormalizationExample.java
deleted file mode 100644
index 6c9273c..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/ml/regression/linear/LinearRegressionLSQRTrainerWithNormalizationExample.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.ml.regression.linear;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.query.ScanQuery;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
-import org.apache.ignite.ml.preprocessing.normalization.NormalizationPreprocessor;
-import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer;
-import org.apache.ignite.ml.regressions.linear.LinearRegressionLSQRTrainer;
-import org.apache.ignite.ml.regressions.linear.LinearRegressionModel;
-import org.apache.ignite.thread.IgniteThread;
-
-import javax.cache.Cache;
-import java.util.Arrays;
-import java.util.UUID;
-
-/**
- * Run linear regression model over cached dataset.
- *
- * @see LinearRegressionLSQRTrainer
- * @see NormalizationTrainer
- * @see NormalizationPreprocessor
- */
-public class LinearRegressionLSQRTrainerWithNormalizationExample {
-    /** */
-    private static final double[][] data = {
-        {8, 78, 284, 9.100000381, 109},
-        {9.300000191, 68, 433, 8.699999809, 144},
-        {7.5, 70, 739, 7.199999809, 113},
-        {8.899999619, 96, 1792, 8.899999619, 97},
-        {10.19999981, 74, 477, 8.300000191, 206},
-        {8.300000191, 111, 362, 10.89999962, 124},
-        {8.800000191, 77, 671, 10, 152},
-        {8.800000191, 168, 636, 9.100000381, 162},
-        {10.69999981, 82, 329, 8.699999809, 150},
-        {11.69999981, 89, 634, 7.599999905, 134},
-        {8.5, 149, 631, 10.80000019, 292},
-        {8.300000191, 60, 257, 9.5, 108},
-        {8.199999809, 96, 284, 8.800000191, 111},
-        {7.900000095, 83, 603, 9.5, 182},
-        {10.30000019, 130, 686, 8.699999809, 129},
-        {7.400000095, 145, 345, 11.19999981, 158},
-        {9.600000381, 112, 1357, 9.699999809, 186},
-        {9.300000191, 131, 544, 9.600000381, 177},
-        {10.60000038, 80, 205, 9.100000381, 127},
-        {9.699999809, 130, 1264, 9.199999809, 179},
-        {11.60000038, 140, 688, 8.300000191, 80},
-        {8.100000381, 154, 354, 8.399999619, 103},
-        {9.800000191, 118, 1632, 9.399999619, 101},
-        {7.400000095, 94, 348, 9.800000191, 117},
-        {9.399999619, 119, 370, 10.39999962, 88},
-        {11.19999981, 153, 648, 9.899999619, 78},
-        {9.100000381, 116, 366, 9.199999809, 102},
-        {10.5, 97, 540, 10.30000019, 95},
-        {11.89999962, 176, 680, 8.899999619, 80},
-        {8.399999619, 75, 345, 9.600000381, 92},
-        {5, 134, 525, 10.30000019, 126},
-        {9.800000191, 161, 870, 10.39999962, 108},
-        {9.800000191, 111, 669, 9.699999809, 77},
-        {10.80000019, 114, 452, 9.600000381, 60},
-        {10.10000038, 142, 430, 10.69999981, 71},
-        {10.89999962, 238, 822, 10.30000019, 86},
-        {9.199999809, 78, 190, 10.69999981, 93},
-        {8.300000191, 196, 867, 9.600000381, 106},
-        {7.300000191, 125, 969, 10.5, 162},
-        {9.399999619, 82, 499, 7.699999809, 95},
-        {9.399999619, 125, 925, 10.19999981, 91},
-        {9.800000191, 129, 353, 9.899999619, 52},
-        {3.599999905, 84, 288, 8.399999619, 110},
-        {8.399999619, 183, 718, 10.39999962, 69},
-        {10.80000019, 119, 540, 9.199999809, 57},
-        {10.10000038, 180, 668, 13, 106},
-        {9, 82, 347, 8.800000191, 40},
-        {10, 71, 345, 9.199999809, 50},
-        {11.30000019, 118, 463, 7.800000191, 35},
-        {11.30000019, 121, 728, 8.199999809, 86},
-        {12.80000019, 68, 383, 7.400000095, 57},
-        {10, 112, 316, 10.39999962, 57},
-        {6.699999809, 109, 388, 8.899999619, 94}
-    };
-
-    /** Run example. */
-    public static void main(String[] args) throws InterruptedException {
-        System.out.println();
-        System.out.println(">>> Linear regression model over cached dataset usage example started.");
-        // Start ignite grid.
-        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
-            System.out.println(">>> Ignite grid started.");
-
-            IgniteThread igniteThread = new IgniteThread(ignite.configuration().getIgniteInstanceName(),
-                LinearRegressionLSQRTrainerWithNormalizationExample.class.getSimpleName(), () -> {
-                IgniteCache<Integer, double[]> dataCache = getTestCache(ignite);
-
-                System.out.println(">>> Create new normalization trainer object.");
-                NormalizationTrainer<Integer, double[]> normalizationTrainer = new NormalizationTrainer<>();
-
-                System.out.println(">>> Perform the training to get the normalization preprocessor.");
-                IgniteBiFunction<Integer, double[], double[]> preprocessor = normalizationTrainer.fit(
-                    ignite,
-                    dataCache,
-                    (k, v) -> Arrays.copyOfRange(v, 1, v.length)
-                );
-
-                System.out.println(">>> Create new linear regression trainer object.");
-                LinearRegressionLSQRTrainer trainer = new LinearRegressionLSQRTrainer();
-
-                System.out.println(">>> Perform the training to get the model.");
-                LinearRegressionModel mdl = trainer.fit(ignite, dataCache, preprocessor, (k, v) -> v[0]);
-
-                System.out.println(">>> Linear regression model: " + mdl);
-
-                System.out.println(">>> ---------------------------------");
-                System.out.println(">>> | Prediction\t| Ground Truth\t|");
-                System.out.println(">>> ---------------------------------");
-
-                try (QueryCursor<Cache.Entry<Integer, double[]>> observations = dataCache.query(new ScanQuery<>())) {
-                    for (Cache.Entry<Integer, double[]> observation : observations) {
-                        Integer key = observation.getKey();
-                        double[] val = observation.getValue();
-                        double groundTruth = val[0];
-
-                        double prediction = mdl.apply(new DenseLocalOnHeapVector(preprocessor.apply(key, val)));
-
-                        System.out.printf(">>> | %.4f\t\t| %.4f\t\t|\n", prediction, groundTruth);
-                    }
-                }
-
-                System.out.println(">>> ---------------------------------");
-            });
-
-            igniteThread.start();
-
-            igniteThread.join();
-        }
-    }
-
-    /**
-     * Fills cache with data and returns it.
-     *
-     * @param ignite Ignite instance.
-     * @return Filled Ignite Cache.
-     */
-    private static IgniteCache<Integer, double[]> getTestCache(Ignite ignite) {
-        CacheConfiguration<Integer, double[]> cacheConfiguration = new CacheConfiguration<>();
-        cacheConfiguration.setName("TEST_" + UUID.randomUUID());
-        cacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 10));
-
-        IgniteCache<Integer, double[]> cache = ignite.createCache(cacheConfiguration);
-
-        for (int i = 0; i < data.length; i++)
-            cache.put(i, data[i]);
-
-        return cache;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/examples/src/main/java/org/apache/ignite/examples/ml/regression/logistic/multiclass/LogRegressionMultiClassClassificationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/regression/logistic/multiclass/LogRegressionMultiClassClassificationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/regression/logistic/multiclass/LogRegressionMultiClassClassificationExample.java
index f089923..61a711e 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/regression/logistic/multiclass/LogRegressionMultiClassClassificationExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/regression/logistic/multiclass/LogRegressionMultiClassClassificationExample.java
@@ -32,7 +32,7 @@ import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
 import org.apache.ignite.ml.nn.UpdatesStrategy;
 import org.apache.ignite.ml.optimization.updatecalculators.SimpleGDParameterUpdate;
 import org.apache.ignite.ml.optimization.updatecalculators.SimpleGDUpdateCalculator;
-import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer;
+import org.apache.ignite.ml.preprocessing.minmaxscaling.MinMaxScalerTrainer;
 import org.apache.ignite.ml.regressions.logistic.multiclass.LogRegressionMultiClassModel;
 import org.apache.ignite.ml.regressions.logistic.multiclass.LogRegressionMultiClassTrainer;
 import org.apache.ignite.ml.svm.SVMLinearMultiClassClassificationModel;
@@ -40,7 +40,7 @@ import org.apache.ignite.thread.IgniteThread;
 
 /**
  * Run Logistic Regression multi-class classification trainer over distributed dataset to build two models:
- * one with normalization and one without normalization.
+ * one with minmaxscaling and one without minmaxscaling.
  *
  * @see SVMLinearMultiClassClassificationModel
  */
@@ -78,7 +78,7 @@ public class LogRegressionMultiClassClassificationExample {
                 System.out.println(">>> SVM Multi-class model");
                 System.out.println(mdl.toString());
 
-                NormalizationTrainer<Integer, double[]> normalizationTrainer = new NormalizationTrainer<>();
+                MinMaxScalerTrainer<Integer, double[]> normalizationTrainer = new MinMaxScalerTrainer<>();
 
                 IgniteBiFunction<Integer, double[], double[]> preprocessor = normalizationTrainer.fit(
                     ignite,
@@ -93,7 +93,7 @@ public class LogRegressionMultiClassClassificationExample {
                     (k, v) -> v[0]
                 );
 
-                System.out.println(">>> Logistic Regression Multi-class model with normalization");
+                System.out.println(">>> Logistic Regression Multi-class model with minmaxscaling");
                 System.out.println(mdlWithNormalization.toString());
 
                 System.out.println(">>> ----------------------------------------------------------------");
@@ -128,7 +128,7 @@ public class LogRegressionMultiClassClassificationExample {
 
                         confusionMtx[idx1][idx2]++;
 
-                        // Collect data for model with normalization
+                        // Collect data for model with minmaxscaling
                         if(groundTruth != predictionWithNormalization)
                             amountOfErrorsWithNormalization++;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/examples/src/main/java/org/apache/ignite/examples/ml/svm/multiclass/SVMMultiClassClassificationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/svm/multiclass/SVMMultiClassClassificationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/svm/multiclass/SVMMultiClassClassificationExample.java
index 4054201..c2be971 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/svm/multiclass/SVMMultiClassClassificationExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/svm/multiclass/SVMMultiClassClassificationExample.java
@@ -26,7 +26,7 @@ import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
 import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
-import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer;
+import org.apache.ignite.ml.preprocessing.minmaxscaling.MinMaxScalerTrainer;
 import org.apache.ignite.ml.svm.SVMLinearMultiClassClassificationModel;
 import org.apache.ignite.ml.svm.SVMLinearMultiClassClassificationTrainer;
 import org.apache.ignite.thread.IgniteThread;
@@ -37,7 +37,7 @@ import java.util.UUID;
 
 /**
  * Run SVM multi-class classification trainer over distributed dataset to build two models:
- * one with normalization and one without normalization.
+ * one with minmaxscaling and one without minmaxscaling.
  *
  * @see SVMLinearMultiClassClassificationModel
  */
@@ -66,7 +66,7 @@ public class SVMMultiClassClassificationExample {
                 System.out.println(">>> SVM Multi-class model");
                 System.out.println(mdl.toString());
 
-                NormalizationTrainer<Integer, double[]> normalizationTrainer = new NormalizationTrainer<>();
+                MinMaxScalerTrainer<Integer, double[]> normalizationTrainer = new MinMaxScalerTrainer<>();
 
                 IgniteBiFunction<Integer, double[], double[]> preprocessor = normalizationTrainer.fit(
                     ignite,
@@ -81,7 +81,7 @@ public class SVMMultiClassClassificationExample {
                     (k, v) -> v[0]
                 );
 
-                System.out.println(">>> SVM Multi-class model with normalization");
+                System.out.println(">>> SVM Multi-class model with minmaxscaling");
                 System.out.println(mdlWithNormalization.toString());
 
                 System.out.println(">>> ----------------------------------------------------------------");
@@ -116,7 +116,7 @@ public class SVMMultiClassClassificationExample {
 
                         confusionMtx[idx1][idx2]++;
 
-                        // Collect data for model with normalization
+                        // Collect data for model with minmaxscaling
                         if(groundTruth != predictionWithNormalization)
                             amountOfErrorsWithNormalization++;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/binarization/BinarizationTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/binarization/BinarizationTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/binarization/BinarizationTrainer.java
index 40060f4..abbf644 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/binarization/BinarizationTrainer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/binarization/BinarizationTrainer.java
@@ -17,14 +17,9 @@
 
 package org.apache.ignite.ml.preprocessing.binarization;
 
-import org.apache.ignite.ml.dataset.Dataset;
 import org.apache.ignite.ml.dataset.DatasetBuilder;
-import org.apache.ignite.ml.dataset.UpstreamEntry;
-import org.apache.ignite.ml.dataset.primitive.context.EmptyContext;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
 import org.apache.ignite.ml.preprocessing.PreprocessingTrainer;
-import org.apache.ignite.ml.preprocessing.normalization.NormalizationPartitionData;
-import org.apache.ignite.ml.preprocessing.normalization.NormalizationPreprocessor;
 
 /**
  * Trainer of the binarization preprocessor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerPartitionData.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerPartitionData.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerPartitionData.java
deleted file mode 100644
index 46003b5..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerPartitionData.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.preprocessing.imputer;
-
-import java.util.Map;
-
-/**
- * Partition data used in imputer preprocessor.
- *
- * @see ImputerTrainer
- * @see ImputerPreprocessor
- */
-public class ImputerPartitionData implements AutoCloseable {
-    /** Sum of values in partition. */
-    private double[] sums;
-
-    /** Count of values in partition. */
-    private int[] counts;
-
-    /** Most frequent values. */
-    private Map<Double, Integer>[] valuesByFreq;
-
-    /**
-     * Constructs a new instance of imputing partition data.
-     *
-     */
-    public ImputerPartitionData() {
-    }
-
-    /**
-     * Gets the array of sums of values in partition for each feature in the dataset.
-     *
-     * @return The sums.
-     */
-    public double[] sums() {
-        return sums;
-    }
-
-    /**
-     * Sets the array of sums of values in partition for each feature in the dataset.
-     *
-     * @param sums The given value.
-     *
-     * @return The partition data.
-     */
-    public ImputerPartitionData withSums(double[] sums) {
-        this.sums = sums;
-        return this;
-    }
-
-    /**
-     * Sets the array of amounts of values in partition for each feature in the dataset.
-     *
-     * @param counts The given value.
-     *
-     * @return The partition data.
-     */
-    public ImputerPartitionData withCounts(int[] counts) {
-        this.counts = counts;
-        return this;
-    }
-
-    /**
-     * Gets the array of amounts of values in partition for each feature in the dataset.
-     *
-     * @return The counts.
-     */
-    public int[] counts() {
-        return counts;
-    }
-
-    /**
-     * Gets the array of maps of frequencies by value in partition for each feature in the dataset.
-     *
-     * @return The frequencies.
-     */
-    public Map<Double, Integer>[] valuesByFrequency() {
-        return valuesByFreq;
-    }
-
-    /**
-     * Sets the array of maps of frequencies by value in partition for each feature in the dataset.
-     *
-     * @param valuesByFreq The given value.
-     * @return The partition data.
-     */
-    public ImputerPartitionData withValuesByFrequency(Map<Double, Integer>[] valuesByFreq) {
-        this.valuesByFreq = valuesByFreq;
-        return this;
-    }
-
-    /** */
-    @Override public void close() {
-        // Do nothing, GC will clean up.
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerPreprocessor.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerPreprocessor.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerPreprocessor.java
deleted file mode 100644
index 6677d23..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerPreprocessor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.preprocessing.imputer;
-
-import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-
-/**
- * Preprocessing function that makes imputing.
- *
- * @param <K> Type of a key in {@code upstream} data.
- * @param <V> Type of a value in {@code upstream} data.
- */
-public class ImputerPreprocessor<K, V> implements IgniteBiFunction<K, V, double[]> {
-    /** */
-    private static final long serialVersionUID = 6887800576392623469L;
-
-    /** Filling values. */
-    private final double[] imputingValues;
-
-    /** Base preprocessor. */
-    private final IgniteBiFunction<K, V, double[]> basePreprocessor;
-
-    /**
-     * Constructs a new instance of imputing preprocessor.
-     *
-     * @param basePreprocessor Base preprocessor.
-     */
-    public ImputerPreprocessor(double[] imputingValues,
-        IgniteBiFunction<K, V, double[]> basePreprocessor) {
-        this.imputingValues = imputingValues;
-        this.basePreprocessor = basePreprocessor;
-    }
-
-    /**
-     * Applies this preprocessor.
-     *
-     * @param k Key.
-     * @param v Value.
-     * @return Preprocessed row.
-     */
-    @Override public double[] apply(K k, V v) {
-        double[] res = basePreprocessor.apply(k, v);
-
-        assert res.length == imputingValues.length;
-
-        for (int i = 0; i < res.length; i++) {
-            if (Double.valueOf(res[i]).equals(Double.NaN))
-                res[i] = imputingValues[i];
-        }
-        return res;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerTrainer.java
deleted file mode 100644
index 0cce4ce..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputerTrainer.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.preprocessing.imputer;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.ignite.ml.dataset.Dataset;
-import org.apache.ignite.ml.dataset.DatasetBuilder;
-import org.apache.ignite.ml.dataset.UpstreamEntry;
-import org.apache.ignite.ml.dataset.primitive.context.EmptyContext;
-import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.preprocessing.PreprocessingTrainer;
-
-/**
- * Trainer of the imputer preprocessor.
- * The imputer fills the missed values according the imputing strategy (default: mean value for each feature).
- * It supports double values in features only.
- *
- * @param <K> Type of a key in {@code upstream} data.
- * @param <V> Type of a value in {@code upstream} data.
- */
-public class ImputerTrainer<K, V> implements PreprocessingTrainer<K, V, double[], double[]> {
-    /** The imputing strategy. */
-    private ImputingStrategy imputingStgy = ImputingStrategy.MEAN;
-
-    /** {@inheritDoc} */
-    @Override public ImputerPreprocessor<K, V> fit(DatasetBuilder<K, V> datasetBuilder,
-        IgniteBiFunction<K, V, double[]> basePreprocessor) {
-        try (Dataset<EmptyContext, ImputerPartitionData> dataset = datasetBuilder.build(
-            (upstream, upstreamSize) -> new EmptyContext(),
-            (upstream, upstreamSize, ctx) -> {
-                double[] sums = null;
-                int[] counts = null;
-                Map<Double, Integer>[] valuesByFreq = null;
-
-                while (upstream.hasNext()) {
-                    UpstreamEntry<K, V> entity = upstream.next();
-                    double[] row = basePreprocessor.apply(entity.getKey(), entity.getValue());
-
-                    switch (imputingStgy) {
-                        case MEAN:
-                            sums = calculateTheSums(row, sums);
-                            counts = calculateTheCounts(row, counts);
-                            break;
-                        case MOST_FREQUENT:
-                            valuesByFreq = calculateFrequencies(row, valuesByFreq);
-                            break;
-                        default: throw new UnsupportedOperationException("The chosen strategy is not supported");
-                    }
-                }
-
-                ImputerPartitionData partData;
-
-                switch (imputingStgy) {
-                    case MEAN:
-                        partData = new ImputerPartitionData().withSums(sums).withCounts(counts);
-                        break;
-                    case MOST_FREQUENT:
-                        partData = new ImputerPartitionData().withValuesByFrequency(valuesByFreq);
-                        break;
-                    default: throw new UnsupportedOperationException("The chosen strategy is not supported");
-                }
-                return partData;
-            }
-        )) {
-
-            double[] imputingValues;
-
-            switch (imputingStgy) {
-                case MEAN:
-                    imputingValues = calculateImputingValuesBySumsAndCounts(dataset);
-                    break;
-                case MOST_FREQUENT:
-                    imputingValues = calculateImputingValuesByFrequencies(dataset);
-                    break;
-                default: throw new UnsupportedOperationException("The chosen strategy is not supported");
-            }
-
-            return new ImputerPreprocessor<>(imputingValues, basePreprocessor);
-
-        }
-        catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Calculates the imputing values by frequencies keeping in the given dataset.
-     *
-     * @param dataset The dataset of frequencies for each feature aggregated in each partition..
-     * @return Most frequent value for each feature.
-     */
-    private double[] calculateImputingValuesByFrequencies(
-        Dataset<EmptyContext, ImputerPartitionData> dataset) {
-        Map<Double, Integer>[] frequencies = dataset.compute(
-            ImputerPartitionData::valuesByFrequency,
-            (a, b) -> {
-                if (a == null)
-                    return b;
-
-                if (b == null)
-                    return a;
-
-                assert a.length == b.length;
-
-                for (int i = 0; i < a.length; i++) {
-                    int finalI = i;
-                    a[i].forEach((k, v) -> b[finalI].merge(k, v, (f1, f2) -> f1 + f2));
-                }
-                return b;
-            }
-        );
-
-        double[] res = new double[frequencies.length];
-
-        for (int i = 0; i < frequencies.length; i++) {
-            Optional<Map.Entry<Double, Integer>> max = frequencies[i].entrySet()
-                .stream()
-                .max(Comparator.comparingInt(Map.Entry::getValue));
-
-            if(max.isPresent())
-                res[i] = max.get().getKey();
-        }
-
-        return res;
-    }
-
-    /**
-     * Calculates the imputing values by sums and counts keeping in the given dataset.
-     *
-     * @param dataset The dataset with sums and counts for each feature aggregated in each partition.
-     * @return The mean value for each feature.
-     */
-    private double[] calculateImputingValuesBySumsAndCounts(Dataset<EmptyContext, ImputerPartitionData> dataset) {
-        double[] sums = dataset.compute(
-            ImputerPartitionData::sums,
-            (a, b) -> {
-                if (a == null)
-                    return b;
-
-                if (b == null)
-                    return a;
-
-                assert a.length == b.length;
-
-                for (int i = 0; i < a.length; i++)
-                    a[i] += b[i];
-
-                return a;
-            }
-        );
-
-        int[] counts = dataset.compute(
-            ImputerPartitionData::counts,
-            (a, b) -> {
-                if (a == null)
-                    return b;
-
-                if (b == null)
-                    return a;
-
-                assert a.length == b.length;
-
-                for (int i = 0; i < a.length; i++)
-                    a[i] += b[i];
-
-                return a;
-            }
-        );
-
-        double[] means = new double[sums.length];
-
-        for (int i = 0; i < means.length; i++)
-            means[i] = sums[i]/counts[i];
-
-        return means;
-    }
-
-    /**
-     * Updates frequencies by values and features.
-     *
-     * @param row Feature vector.
-     * @param valuesByFreq Holds the sums by values and features.
-     * @return Updated sums by values and features.
-     */
-    private Map<Double, Integer>[] calculateFrequencies(double[] row, Map<Double, Integer>[] valuesByFreq) {
-        if (valuesByFreq == null) {
-            valuesByFreq = new HashMap[row.length];
-            for (int i = 0; i < valuesByFreq.length; i++) valuesByFreq[i] = new HashMap<>();
-        }
-        else
-            assert valuesByFreq.length == row.length : "Base preprocessor must return exactly " + valuesByFreq.length
-                + " features";
-
-        for (int i = 0; i < valuesByFreq.length; i++) {
-            double v = row[i];
-
-            if(!Double.valueOf(v).equals(Double.NaN)) {
-                Map<Double, Integer> map = valuesByFreq[i];
-
-                if (map.containsKey(v))
-                    map.put(v, (map.get(v)) + 1);
-                else
-                    map.put(v, 1);
-            }
-        }
-        return valuesByFreq;
-    }
-
-    /**
-     * Updates sums by features.
-     *
-     * @param row Feature vector.
-     * @param sums Holds the sums by features.
-     * @return Updated sums by features.
-     */
-    private double[] calculateTheSums(double[] row, double[] sums) {
-        if (sums == null)
-            sums = new double[row.length];
-        else
-            assert sums.length == row.length : "Base preprocessor must return exactly " + sums.length
-                + " features";
-
-        for (int i = 0; i < sums.length; i++){
-            if(!Double.valueOf(row[i]).equals(Double.NaN))
-                sums[i] += row[i];
-        }
-
-        return sums;
-    }
-
-    /**
-     * Updates counts by features.
-     *
-     * @param row Feature vector.
-     * @param counts Holds the counts by features.
-     * @return Updated counts by features.
-     */
-    private int[] calculateTheCounts(double[] row, int[] counts) {
-        if (counts == null)
-            counts = new int[row.length];
-        else
-            assert counts.length == row.length : "Base preprocessor must return exactly " + counts.length
-                + " features";
-
-        for (int i = 0; i < counts.length; i++){
-            if(!Double.valueOf(row[i]).equals(Double.NaN))
-                counts[i]++;
-        }
-
-        return counts;
-    }
-
-    /**
-     * Sets the imputing strategy.
-     *
-     * @param imputingStgy The given value.
-     * @return The updated imputer trainer.
-     */
-    public ImputerTrainer<K, V> withImputingStrategy(ImputingStrategy imputingStgy){
-        this.imputingStgy = imputingStgy;
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputingStrategy.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputingStrategy.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputingStrategy.java
deleted file mode 100644
index 76af90b..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/ImputingStrategy.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.preprocessing.imputer;
-
-/** This enum contains settings for imputer preprocessor. */
-public enum ImputingStrategy {
-    /** The default strategy. If this strategy is chosen, then replace missing values using the mean for the numeric features along the axis. */
-    MEAN,
-
-    /** If this strategy is chosen, then replace missing using the most frequent value along the axis. */
-    MOST_FREQUENT
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/package-info.java
deleted file mode 100644
index 6f9ee47..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputer/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <!-- Package description. -->
- * Contains Imputer preprocessor.
- */
-package org.apache.ignite.ml.preprocessing.imputer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerPartitionData.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerPartitionData.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerPartitionData.java
new file mode 100644
index 0000000..fa25d59
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerPartitionData.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.imputing;
+
+import java.util.Map;
+
+/**
+ * Partition data used in imputing preprocessor.
+ *
+ * @see ImputerTrainer
+ * @see ImputerPreprocessor
+ */
+public class ImputerPartitionData implements AutoCloseable {
+    /** Sum of values in partition. */
+    private double[] sums;
+
+    /** Count of values in partition. */
+    private int[] counts;
+
+    /** Most frequent values. */
+    private Map<Double, Integer>[] valuesByFreq;
+
+    /**
+     * Constructs a new instance of imputing partition data.
+     *
+     */
+    public ImputerPartitionData() {
+    }
+
+    /**
+     * Gets the array of sums of values in partition for each feature in the dataset.
+     *
+     * @return The sums.
+     */
+    public double[] sums() {
+        return sums;
+    }
+
+    /**
+     * Sets the array of sums of values in partition for each feature in the dataset.
+     *
+     * @param sums The given value.
+     *
+     * @return The partition data.
+     */
+    public ImputerPartitionData withSums(double[] sums) {
+        this.sums = sums;
+        return this;
+    }
+
+    /**
+     * Sets the array of amounts of values in partition for each feature in the dataset.
+     *
+     * @param counts The given value.
+     *
+     * @return The partition data.
+     */
+    public ImputerPartitionData withCounts(int[] counts) {
+        this.counts = counts;
+        return this;
+    }
+
+    /**
+     * Gets the array of amounts of values in partition for each feature in the dataset.
+     *
+     * @return The counts.
+     */
+    public int[] counts() {
+        return counts;
+    }
+
+    /**
+     * Gets the array of maps of frequencies by value in partition for each feature in the dataset.
+     *
+     * @return The frequencies.
+     */
+    public Map<Double, Integer>[] valuesByFrequency() {
+        return valuesByFreq;
+    }
+
+    /**
+     * Sets the array of maps of frequencies by value in partition for each feature in the dataset.
+     *
+     * @param valuesByFreq The given value.
+     * @return The partition data.
+     */
+    public ImputerPartitionData withValuesByFrequency(Map<Double, Integer>[] valuesByFreq) {
+        this.valuesByFreq = valuesByFreq;
+        return this;
+    }
+
+    /** */
+    @Override public void close() {
+        // Do nothing, GC will clean up.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerPreprocessor.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerPreprocessor.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerPreprocessor.java
new file mode 100644
index 0000000..95344ee
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerPreprocessor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.imputing;
+
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * Preprocessing function that makes imputing.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ */
+public class ImputerPreprocessor<K, V> implements IgniteBiFunction<K, V, double[]> {
+    /** */
+    private static final long serialVersionUID = 6887800576392623469L;
+
+    /** Filling values. */
+    private final double[] imputingValues;
+
+    /** Base preprocessor. */
+    private final IgniteBiFunction<K, V, double[]> basePreprocessor;
+
+    /**
+     * Constructs a new instance of imputing preprocessor.
+     *
+     * @param basePreprocessor Base preprocessor.
+     */
+    public ImputerPreprocessor(double[] imputingValues,
+        IgniteBiFunction<K, V, double[]> basePreprocessor) {
+        this.imputingValues = imputingValues;
+        this.basePreprocessor = basePreprocessor;
+    }
+
+    /**
+     * Applies this preprocessor.
+     *
+     * @param k Key.
+     * @param v Value.
+     * @return Preprocessed row.
+     */
+    @Override public double[] apply(K k, V v) {
+        double[] res = basePreprocessor.apply(k, v);
+
+        assert res.length == imputingValues.length;
+
+        for (int i = 0; i < res.length; i++) {
+            if (Double.valueOf(res[i]).equals(Double.NaN))
+                res[i] = imputingValues[i];
+        }
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainer.java
new file mode 100644
index 0000000..7d3a161
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainer.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.imputing;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.ignite.ml.dataset.Dataset;
+import org.apache.ignite.ml.dataset.DatasetBuilder;
+import org.apache.ignite.ml.dataset.UpstreamEntry;
+import org.apache.ignite.ml.dataset.primitive.context.EmptyContext;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.preprocessing.PreprocessingTrainer;
+
+/**
+ * Trainer of the imputing preprocessor.
+ * The imputing fills the missed values according the imputing strategy (default: mean value for each feature).
+ * It supports double values in features only.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ */
+public class ImputerTrainer<K, V> implements PreprocessingTrainer<K, V, double[], double[]> {
+    /** The imputing strategy. */
+    private ImputingStrategy imputingStgy = ImputingStrategy.MEAN;
+
+    /** {@inheritDoc} */
+    @Override public ImputerPreprocessor<K, V> fit(DatasetBuilder<K, V> datasetBuilder,
+        IgniteBiFunction<K, V, double[]> basePreprocessor) {
+        try (Dataset<EmptyContext, ImputerPartitionData> dataset = datasetBuilder.build(
+            (upstream, upstreamSize) -> new EmptyContext(),
+            (upstream, upstreamSize, ctx) -> {
+                double[] sums = null;
+                int[] counts = null;
+                Map<Double, Integer>[] valuesByFreq = null;
+
+                while (upstream.hasNext()) {
+                    UpstreamEntry<K, V> entity = upstream.next();
+                    double[] row = basePreprocessor.apply(entity.getKey(), entity.getValue());
+
+                    switch (imputingStgy) {
+                        case MEAN:
+                            sums = calculateTheSums(row, sums);
+                            counts = calculateTheCounts(row, counts);
+                            break;
+                        case MOST_FREQUENT:
+                            valuesByFreq = calculateFrequencies(row, valuesByFreq);
+                            break;
+                        default: throw new UnsupportedOperationException("The chosen strategy is not supported");
+                    }
+                }
+
+                ImputerPartitionData partData;
+
+                switch (imputingStgy) {
+                    case MEAN:
+                        partData = new ImputerPartitionData().withSums(sums).withCounts(counts);
+                        break;
+                    case MOST_FREQUENT:
+                        partData = new ImputerPartitionData().withValuesByFrequency(valuesByFreq);
+                        break;
+                    default: throw new UnsupportedOperationException("The chosen strategy is not supported");
+                }
+                return partData;
+            }
+        )) {
+
+            double[] imputingValues;
+
+            switch (imputingStgy) {
+                case MEAN:
+                    imputingValues = calculateImputingValuesBySumsAndCounts(dataset);
+                    break;
+                case MOST_FREQUENT:
+                    imputingValues = calculateImputingValuesByFrequencies(dataset);
+                    break;
+                default: throw new UnsupportedOperationException("The chosen strategy is not supported");
+            }
+
+            return new ImputerPreprocessor<>(imputingValues, basePreprocessor);
+
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Calculates the imputing values by frequencies keeping in the given dataset.
+     *
+     * @param dataset The dataset of frequencies for each feature aggregated in each partition..
+     * @return Most frequent value for each feature.
+     */
+    private double[] calculateImputingValuesByFrequencies(
+        Dataset<EmptyContext, ImputerPartitionData> dataset) {
+        Map<Double, Integer>[] frequencies = dataset.compute(
+            ImputerPartitionData::valuesByFrequency,
+            (a, b) -> {
+                if (a == null)
+                    return b;
+
+                if (b == null)
+                    return a;
+
+                assert a.length == b.length;
+
+                for (int i = 0; i < a.length; i++) {
+                    int finalI = i;
+                    a[i].forEach((k, v) -> b[finalI].merge(k, v, (f1, f2) -> f1 + f2));
+                }
+                return b;
+            }
+        );
+
+        double[] res = new double[frequencies.length];
+
+        for (int i = 0; i < frequencies.length; i++) {
+            Optional<Map.Entry<Double, Integer>> max = frequencies[i].entrySet()
+                .stream()
+                .max(Comparator.comparingInt(Map.Entry::getValue));
+
+            if(max.isPresent())
+                res[i] = max.get().getKey();
+        }
+
+        return res;
+    }
+
+    /**
+     * Calculates the imputing values by sums and counts keeping in the given dataset.
+     *
+     * @param dataset The dataset with sums and counts for each feature aggregated in each partition.
+     * @return The mean value for each feature.
+     */
+    private double[] calculateImputingValuesBySumsAndCounts(Dataset<EmptyContext, ImputerPartitionData> dataset) {
+        double[] sums = dataset.compute(
+            ImputerPartitionData::sums,
+            (a, b) -> {
+                if (a == null)
+                    return b;
+
+                if (b == null)
+                    return a;
+
+                assert a.length == b.length;
+
+                for (int i = 0; i < a.length; i++)
+                    a[i] += b[i];
+
+                return a;
+            }
+        );
+
+        int[] counts = dataset.compute(
+            ImputerPartitionData::counts,
+            (a, b) -> {
+                if (a == null)
+                    return b;
+
+                if (b == null)
+                    return a;
+
+                assert a.length == b.length;
+
+                for (int i = 0; i < a.length; i++)
+                    a[i] += b[i];
+
+                return a;
+            }
+        );
+
+        double[] means = new double[sums.length];
+
+        for (int i = 0; i < means.length; i++)
+            means[i] = sums[i]/counts[i];
+
+        return means;
+    }
+
+    /**
+     * Updates frequencies by values and features.
+     *
+     * @param row Feature vector.
+     * @param valuesByFreq Holds the sums by values and features.
+     * @return Updated sums by values and features.
+     */
+    private Map<Double, Integer>[] calculateFrequencies(double[] row, Map<Double, Integer>[] valuesByFreq) {
+        if (valuesByFreq == null) {
+            valuesByFreq = new HashMap[row.length];
+            for (int i = 0; i < valuesByFreq.length; i++) valuesByFreq[i] = new HashMap<>();
+        }
+        else
+            assert valuesByFreq.length == row.length : "Base preprocessor must return exactly " + valuesByFreq.length
+                + " features";
+
+        for (int i = 0; i < valuesByFreq.length; i++) {
+            double v = row[i];
+
+            if(!Double.valueOf(v).equals(Double.NaN)) {
+                Map<Double, Integer> map = valuesByFreq[i];
+
+                if (map.containsKey(v))
+                    map.put(v, (map.get(v)) + 1);
+                else
+                    map.put(v, 1);
+            }
+        }
+        return valuesByFreq;
+    }
+
+    /**
+     * Updates sums by features.
+     *
+     * @param row Feature vector.
+     * @param sums Holds the sums by features.
+     * @return Updated sums by features.
+     */
+    private double[] calculateTheSums(double[] row, double[] sums) {
+        if (sums == null)
+            sums = new double[row.length];
+        else
+            assert sums.length == row.length : "Base preprocessor must return exactly " + sums.length
+                + " features";
+
+        for (int i = 0; i < sums.length; i++){
+            if(!Double.valueOf(row[i]).equals(Double.NaN))
+                sums[i] += row[i];
+        }
+
+        return sums;
+    }
+
+    /**
+     * Updates counts by features.
+     *
+     * @param row Feature vector.
+     * @param counts Holds the counts by features.
+     * @return Updated counts by features.
+     */
+    private int[] calculateTheCounts(double[] row, int[] counts) {
+        if (counts == null)
+            counts = new int[row.length];
+        else
+            assert counts.length == row.length : "Base preprocessor must return exactly " + counts.length
+                + " features";
+
+        for (int i = 0; i < counts.length; i++){
+            if(!Double.valueOf(row[i]).equals(Double.NaN))
+                counts[i]++;
+        }
+
+        return counts;
+    }
+
+    /**
+     * Sets the imputing strategy.
+     *
+     * @param imputingStgy The given value.
+     * @return The updated imputing trainer.
+     */
+    public ImputerTrainer<K, V> withImputingStrategy(ImputingStrategy imputingStgy){
+        this.imputingStgy = imputingStgy;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputingStrategy.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputingStrategy.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputingStrategy.java
new file mode 100644
index 0000000..b50f73c
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/ImputingStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.imputing;
+
+/** This enum contains settings for imputing preprocessor. */
/** This enum contains settings for imputing preprocessor. */
public enum ImputingStrategy {
    /** The default strategy: replace missing values with the mean of the numeric feature along the axis. */
    MEAN,

    /** Replace missing values with the most frequent value of the feature along the axis. */
    MOST_FREQUENT
}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/package-info.java
new file mode 100644
index 0000000..ca3936e
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/imputing/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
/**
 * Contains the Imputer preprocessor, which fills missing (NaN) feature values
 * according to a configurable {@code ImputingStrategy}.
 */
package org.apache.ignite.ml.preprocessing.imputing;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/minmaxscaling/MinMaxScalerPartitionData.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/minmaxscaling/MinMaxScalerPartitionData.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/minmaxscaling/MinMaxScalerPartitionData.java
new file mode 100644
index 0000000..b625b3d
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/minmaxscaling/MinMaxScalerPartitionData.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.minmaxscaling;
+
+/**
+ * Partition data used in minmaxscaling preprocessor.
+ *
+ * @see MinMaxScalerTrainer
+ * @see MinMaxScalerPreprocessor
+ */
public class MinMaxScalerPartitionData implements AutoCloseable {
    /** Per-feature minimal values observed in this partition. */
    private final double[] min;

    /** Per-feature maximum values observed in this partition. */
    private final double[] max;

    /**
     * Creates partition data holding the per-feature extrema of this partition.
     *
     * @param min Minimal values.
     * @param max Maximum values.
     */
    public MinMaxScalerPartitionData(double[] min, double[] max) {
        this.min = min;
        this.max = max;
    }

    /** @return Array of per-feature minimal values (internal reference, not a copy). */
    public double[] getMin() {
        return min;
    }

    /** @return Array of per-feature maximum values (internal reference, not a copy). */
    public double[] getMax() {
        return max;
    }

    /** {@inheritDoc} No-op: the arrays are plain heap objects reclaimed by GC. */
    @Override public void close() {
        // Do nothing, GC will clean up.
    }
}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7357ba/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/minmaxscaling/MinMaxScalerPreprocessor.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/minmaxscaling/MinMaxScalerPreprocessor.java b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/minmaxscaling/MinMaxScalerPreprocessor.java
new file mode 100644
index 0000000..f75f927
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/preprocessing/minmaxscaling/MinMaxScalerPreprocessor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.preprocessing.minmaxscaling;
+
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+
+/**
+ * Preprocessing function that makes minmaxscaling. From mathematical point of view it's the following function which
+ * is applied to every element in dataset:
+ *
+ * {@code a_i = (a_i - min_i) / (max_i - min_i) for all i},
+ *
+ * where {@code i} is a number of column, {@code max_i} is the value of the maximum element in this columns,
+ * {@code min_i} is the value of the minimal element in this column.
+ *
+ * @param <K> Type of a key in {@code upstream} data.
+ * @param <V> Type of a value in {@code upstream} data.
+ */
+public class MinMaxScalerPreprocessor<K, V> implements IgniteBiFunction<K, V, double[]> {
+    /** */
+    private static final long serialVersionUID = 6997800576392623469L;
+
+    /** Minimal values. */
+    private final double[] min;
+
+    /** Maximum values. */
+    private final double[] max;
+
+    /** Base preprocessor. */
+    private final IgniteBiFunction<K, V, double[]> basePreprocessor;
+
+    /**
+     * Constructs a new instance of minmaxscaling preprocessor.
+     *
+     * @param min Minimal values.
+     * @param max Maximum values.
+     * @param basePreprocessor Base preprocessor.
+     */
+    public MinMaxScalerPreprocessor(double[] min, double[] max, IgniteBiFunction<K, V, double[]> basePreprocessor) {
+        this.min = min;
+        this.max = max;
+        this.basePreprocessor = basePreprocessor;
+    }
+
+    /**
+     * Applies this preprocessor.
+     *
+     * @param k Key.
+     * @param v Value.
+     * @return Preprocessed row.
+     */
+    @Override public double[] apply(K k, V v) {
+        double[] res = basePreprocessor.apply(k, v);
+
+        assert res.length == min.length;
+        assert res.length == max.length;
+
+        for (int i = 0; i < res.length; i++)
+            res[i] = (res[i] - min[i]) / (max[i] - min[i]);
+
+        return res;
+    }
+
+    /** */
+    public double[] getMin() {
+        return min;
+    }
+
+    /** */
+    public double[] getMax() {
+        return max;
+    }
+}


Mime
View raw message