ignite-commits mailing list archives

From sboi...@apache.org
Subject [17/22] ignite git commit: IGNITE-7322: Distributed MLP training.
Date Tue, 09 Jan 2018 07:36:53 GMT
IGNITE-7322: Distributed MLP training.

this closes #3325


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2f2fa8b3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2f2fa8b3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2f2fa8b3

Branch: refs/heads/ignite-zk
Commit: 2f2fa8b345c9705d0bf3109f3a2478fa616559ca
Parents: b0c5ef1
Author: Yury Babak <ybabak@gridgain.com>
Authored: Thu Jan 4 23:26:10 2018 +0300
Committer: Yury Babak <ybabak@gridgain.com>
Committed: Thu Jan 4 23:26:10 2018 +0300

----------------------------------------------------------------------
 .../CacheClientBinaryPutGetExample.java         |   2 +-
 .../ignite/examples/sql/SqlDmlExample.java      |   3 +-
 .../org/apache/ignite/ml/math/Isomorphism.java  |  69 ++++
 .../org/apache/ignite/ml/math/VectorUtils.java  |  20 +
 .../ignite/ml/nn/LabeledVectorsCache.java       |  63 ++++
 .../ignite/ml/nn/LocalBatchTrainerInput.java    |   7 +-
 .../org/apache/ignite/ml/nn/LossFunctions.java  |  28 +-
 .../ml/nn/MLPGroupUpdateTrainerCacheInput.java  | 132 +++++++
 .../java/org/apache/ignite/ml/nn/MLPLayer.java  |   3 +-
 .../ignite/ml/nn/MultilayerPerceptron.java      |  16 +-
 .../ml/nn/architecture/LayerArchitecture.java   |   5 +-
 .../ml/nn/architecture/MLPArchitecture.java     |   9 +-
 .../ml/nn/initializers/RandomInitializer.java   |   2 +-
 .../AbstractMLPGroupUpdateTrainerInput.java     |  60 +++
 .../ml/nn/trainers/distributed/MLPCache.java    |  91 +++++
 .../distributed/MLPGroupTrainingCacheValue.java |  48 +++
 .../distributed/MLPGroupUpdateTrainer.java      | 362 +++++++++++++++++++
 .../MLPGroupUpdateTrainerDataCache.java         |  77 ++++
 .../MLPGroupUpdateTrainerLocalContext.java      | 117 ++++++
 .../MLPGroupUpdateTrainingContext.java          |  64 ++++
 .../distributed/MLPGroupUpdateTrainingData.java |  88 +++++
 .../MLPGroupUpdateTrainingLoopData.java         | 116 ++++++
 .../trainers/distributed/MLPMetaoptimizer.java  |  75 ++++
 .../ml/nn/trainers/local/LocalBatchTrainer.java |  19 +-
 .../nn/trainers/local/MLPLocalBatchTrainer.java |  17 +-
 .../ml/nn/updaters/BaseSmoothParametrized.java  |   6 +-
 .../ml/nn/updaters/NesterovParameterUpdate.java |  94 +++++
 .../nn/updaters/NesterovUpdateCalculator.java   |  85 +++++
 .../ignite/ml/nn/updaters/NesterovUpdater.java  |  76 ----
 .../ml/nn/updaters/NesterovUpdaterParams.java   |  67 ----
 .../nn/updaters/ParameterUpdateCalculator.java  |  58 +++
 .../ignite/ml/nn/updaters/ParameterUpdater.java |  51 ---
 .../ml/nn/updaters/RPropParameterUpdate.java    | 228 ++++++++++++
 .../ml/nn/updaters/RPropUpdateCalculator.java   | 151 ++++++++
 .../ignite/ml/nn/updaters/RPropUpdater.java     | 148 --------
 .../ml/nn/updaters/RPropUpdaterParams.java      | 135 -------
 .../ml/nn/updaters/SimpleGDParameter.java       |  77 ++++
 .../ignite/ml/nn/updaters/SimpleGDParams.java   |  65 ----
 .../nn/updaters/SimpleGDUpdateCalculator.java   |  66 ++++
 .../ignite/ml/nn/updaters/SimpleGDUpdater.java  |  60 ---
 .../ml/nn/updaters/SmoothParametrized.java      |   5 +-
 .../ignite/ml/nn/updaters/UpdaterParams.java    |  32 --
 .../org/apache/ignite/ml/trainers/Trainer.java  |  30 ++
 .../trainers/group/BaseLocalProcessorJob.java   |  15 +-
 .../ignite/ml/trainers/group/GroupTrainer.java  |  38 +-
 .../group/GroupTrainerBaseProcessorTask.java    |  16 +-
 .../group/GroupTrainerEntriesProcessorTask.java |  12 +-
 .../group/GroupTrainerKeysProcessorTask.java    |   9 +-
 .../group/LocalEntriesProcessorJob.java         |   7 +-
 .../trainers/group/LocalKeysProcessorJob.java   |   7 +-
 .../ignite/ml/trainers/group/Metaoptimizer.java |  13 +-
 .../group/MetaoptimizerDistributedStep.java     |  20 +-
 .../group/MetaoptimizerGroupTrainer.java        |  25 +-
 .../ml/trainers/group/ResultAndUpdates.java     |  18 +-
 .../trainers/group/chain/ComputationsChain.java |  22 +-
 .../trainers/group/chain/DistributedStep.java   |  11 +-
 .../org/apache/ignite/ml/util/MnistUtils.java   |   2 +-
 .../java/org/apache/ignite/ml/TestUtils.java    |  11 +
 .../ignite/ml/nn/MLPConstInitializer.java       |  10 +-
 .../ignite/ml/nn/MLPGroupTrainerTest.java       | 126 +++++++
 .../ignite/ml/nn/MLPLocalTrainerTest.java       |  23 +-
 .../org/apache/ignite/ml/nn/MLPTestSuite.java   |   1 +
 .../ml/nn/SimpleMLPLocalBatchTrainerInput.java  |  29 +-
 .../apache/ignite/ml/nn/performance/Mnist.java  | 140 -------
 .../ml/nn/performance/MnistDistributed.java     | 150 ++++++++
 .../ignite/ml/nn/performance/MnistLocal.java    |  93 +++++
 .../ml/nn/performance/MnistMLPTestUtil.java     |  88 +++++
 .../group/DistributedWorkersChainTest.java      |  15 +-
 .../trainers/group/SimpleGroupTrainerInput.java |  27 +-
 .../ml/trainers/group/TestGroupTrainer.java     |  34 +-
 .../group/TestGroupTrainerLocalContext.java     |  39 +-
 .../trainers/group/TestGroupTrainingCache.java  |  15 +-
 .../group/TestGroupTrainingSecondCache.java     |   6 +-
 .../ml/trainers/group/TestLocalContext.java     |   9 +-
 .../ml/trainers/group/TestTrainingLoopStep.java |  17 +-
 .../IgniteCacheRandomOperationBenchmark.java    |   2 +-
 76 files changed, 2917 insertions(+), 1060 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/examples/src/main/java/org/apache/ignite/examples/binary/datagrid/CacheClientBinaryPutGetExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/binary/datagrid/CacheClientBinaryPutGetExample.java b/examples/src/main/java/org/apache/ignite/examples/binary/datagrid/CacheClientBinaryPutGetExample.java
index f0f57af..9397c37 100644
--- a/examples/src/main/java/org/apache/ignite/examples/binary/datagrid/CacheClientBinaryPutGetExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/binary/datagrid/CacheClientBinaryPutGetExample.java
@@ -25,13 +25,13 @@ import java.util.Map;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.examples.model.Address;
 import org.apache.ignite.examples.model.Organization;
 import org.apache.ignite.examples.model.OrganizationType;
-import org.apache.ignite.binary.BinaryObject;
 
 /**
  * This example demonstrates use of binary objects with Ignite cache.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/examples/src/main/java/org/apache/ignite/examples/sql/SqlDmlExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/sql/SqlDmlExample.java b/examples/src/main/java/org/apache/ignite/examples/sql/SqlDmlExample.java
index 54f0fb9..cb2a04a 100644
--- a/examples/src/main/java/org/apache/ignite/examples/sql/SqlDmlExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/sql/SqlDmlExample.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.examples.sql;
 
+import java.util.List;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
@@ -25,8 +26,6 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.examples.model.Organization;
 import org.apache.ignite.examples.model.Person;
 
-import java.util.List;
-
 /**
  * Example to showcase DML capabilities of Ignite's SQL engine.
  */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/math/Isomorphism.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/Isomorphism.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/Isomorphism.java
new file mode 100644
index 0000000..6f17e3a
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/Isomorphism.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math;
+
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+
+/**
+ * Function from {@code K} to {@code V} with defined inverse.
+ *
+ * @param <K> Domain type.
+ * @param <V> Codomain type.
+ */
+public class Isomorphism<K, V> {
+    /** */
+    private IgniteFunction<K, V> forward;
+    /** */
+    private IgniteFunction<V, K> back;
+
+    /**
+     * Identity isomorphism.
+     */
+    public static <K> Isomorphism<K, K> id() {
+        return new Isomorphism<>(a -> a, a -> a);
+    }
+
+    /**
+     * Build isomorphism with forward and backward functions.
+     *
+     * @param forward Forward.
+     * @param back Back.
+     */
+    public Isomorphism(IgniteFunction<K, V> forward, IgniteFunction<V, K> back) {
+        this.forward = forward;
+        this.back = back;
+    }
+
+    /**
+     * Forward function.
+     *
+     * @param k K.
+     */
+    public V forward(K k) {
+        return forward.apply(k);
+    }
+
+    /**
+     * Backward function.
+     *
+     * @param v V.
+     */
+    public K back(V v) {
+        return back.apply(v);
+    }
+}
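
A minimal usage sketch of the new class (not part of the commit; variable names are illustrative):

    // An isomorphism between Integer keys and their String representations.
    Isomorphism<Integer, String> iso = new Isomorphism<>(String::valueOf, Integer::valueOf);

    String s = iso.forward(42); // "42"
    Integer k = iso.back(s);    // 42

    // The identity isomorphism maps every value to itself.
    Isomorphism<Double, Double> id = Isomorphism.id();
    assert id.forward(1.0) == 1.0;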

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorUtils.java
index 7268365..2f51245 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorUtils.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorUtils.java
@@ -135,4 +135,24 @@ public class VectorUtils {
 
         return res;
     }
+
+    /**
+     * Get copy of part of given length of given vector starting from given offset.
+     *
+     * @param v Vector to copy part from.
+     * @param off Offset.
+     * @param len Length.
+     * @return Copy of part of given length of given vector starting from given offset.
+     */
+    public static Vector copyPart(Vector v, int off, int len) {
+        assert off >= 0;
+        assert len <= v.size();
+
+        Vector res = v.like(len);
+
+        for (int i = 0; i < len; i++)
+            res.setX(i, v.getX(off + i));
+
+        return res;
+    }
 }
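
A quick sketch of the new helper (not part of the commit; assumes DenseLocalOnHeapVector from org.apache.ignite.ml.math.impls.vector):

    Vector v = new DenseLocalOnHeapVector(new double[] {1, 2, 3, 4, 5});

    // Copy 3 elements starting at offset 2: [3, 4, 5].
    Vector part = VectorUtils.copyPart(v, 2, 3);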

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/LabeledVectorsCache.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/LabeledVectorsCache.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/LabeledVectorsCache.java
new file mode 100644
index 0000000..07a6e2a
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/LabeledVectorsCache.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.structures.LabeledVector;
+
+/**
+ * Class for working with labeled vectors cache.
+ */
+public class LabeledVectorsCache {
+    /**
+     * Create new labeled vectors cache.
+     *
+     * @param ignite Ignite instance.
+     * @return New labeled vectors cache.
+     */
+    public static IgniteCache<Integer, LabeledVector<Vector, Vector>> createNew(Ignite ignite) {
+        CacheConfiguration<Integer, LabeledVector<Vector, Vector>> cfg = new CacheConfiguration<>();
+
+        // Write to primary.
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+
+        // Atomic transactions only.
+        cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+        // No copying of values.
+        cfg.setCopyOnRead(false);
+
+        // Cache is partitioned.
+        cfg.setCacheMode(CacheMode.PARTITIONED);
+
+        cfg.setBackups(0);
+
+        cfg.setOnheapCacheEnabled(true);
+
+        cfg.setName("LBLD_VECS_" + UUID.randomUUID().toString());
+
+        return ignite.getOrCreateCache(cfg);
+    }
+}
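
A hypothetical population sketch (not part of the commit; assumes DenseLocalOnHeapVector plus the imports shown in the diff above). Keys should form the contiguous range [0, cache.size()), since the batch supplier in MLPGroupUpdateTrainerCacheInput below samples keys from exactly that range:

    Ignite ignite = Ignition.ignite();

    IgniteCache<Integer, LabeledVector<Vector, Vector>> cache = LabeledVectorsCache.createNew(ignite);

    Vector features = new DenseLocalOnHeapVector(new double[] {0.0, 1.0});
    Vector lbl = new DenseLocalOnHeapVector(new double[] {1.0});

    cache.put(0, new LabeledVector<>(features, lbl));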

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/LocalBatchTrainerInput.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/LocalBatchTrainerInput.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/LocalBatchTrainerInput.java
index 4574841..3a87d02 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/nn/LocalBatchTrainerInput.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/LocalBatchTrainerInput.java
@@ -20,17 +20,18 @@ package org.apache.ignite.ml.nn;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.ml.Model;
 import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.functions.IgniteSupplier;
 
 /**
  * Interface for classes containing input parameters for LocalBatchTrainer.
  */
 public interface LocalBatchTrainerInput<M extends Model<Matrix, Matrix>> {
     /**
-     * Get next batch in form of matrix of inputs and matrix of outputs.
+     * Get supplier of next batch in form of matrix of inputs and matrix of outputs.
      *
-     * @return Next batch.
+     * @return Supplier of next batch.
      */
-    IgniteBiTuple<Matrix, Matrix> getBatch();
+    IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier();
 
     /**
      * Model to train.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/LossFunctions.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/LossFunctions.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/LossFunctions.java
index 652072c..dff239c 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/nn/LossFunctions.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/LossFunctions.java
@@ -30,18 +30,18 @@ public class LossFunctions {
      */
     public static IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> MSE = groundTruth ->
         new IgniteDifferentiableVectorToDoubleFunction() {
-        /** {@inheritDoc} */
-        @Override public Vector differential(Vector pnt) {
-            double multiplier = 2.0 / pnt.size();
-            return pnt.minus(groundTruth).times(multiplier);
-        }
+            /** {@inheritDoc} */
+            @Override public Vector differential(Vector pnt) {
+                double multiplier = 2.0 / pnt.size();
+                return pnt.minus(groundTruth).times(multiplier);
+            }
 
-        /** {@inheritDoc} */
-        @Override public Double apply(Vector vector) {
-            return groundTruth.copy().map(vector, (a, b) -> {
-                double diff = a - b;
-                return diff * diff;
-            }).sum() / (vector.size());
-        }
-    };
-}
\ No newline at end of file
+            /** {@inheritDoc} */
+            @Override public Double apply(Vector vector) {
+                return groundTruth.copy().map(vector, (a, b) -> {
+                    double diff = a - b;
+                    return diff * diff;
+                }).sum() / (vector.size());
+            }
+        };
+}
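
For reference, a small numeric sketch of the reformatted MSE loss (not part of the commit; assumes DenseLocalOnHeapVector and the imports from the diff):

    Vector groundTruth = new DenseLocalOnHeapVector(new double[] {1.0, 2.0});
    Vector predicted = new DenseLocalOnHeapVector(new double[] {1.5, 1.5});

    IgniteDifferentiableVectorToDoubleFunction mse = LossFunctions.MSE.apply(groundTruth);

    double err = mse.apply(predicted);         // (0.5^2 + 0.5^2) / 2 = 0.25
    Vector grad = mse.differential(predicted); // (2 / 2) * (predicted - groundTruth) = [0.5, -0.5]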

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/MLPGroupUpdateTrainerCacheInput.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/MLPGroupUpdateTrainerCacheInput.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/MLPGroupUpdateTrainerCacheInput.java
new file mode 100644
index 0000000..14db261
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/MLPGroupUpdateTrainerCacheInput.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.functions.IgniteSupplier;
+import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
+import org.apache.ignite.ml.nn.architecture.MLPArchitecture;
+import org.apache.ignite.ml.nn.initializers.MLPInitializer;
+import org.apache.ignite.ml.nn.trainers.distributed.AbstractMLPGroupUpdateTrainerInput;
+import org.apache.ignite.ml.nn.trainers.distributed.MLPGroupUpdateTrainer;
+import org.apache.ignite.ml.structures.LabeledVector;
+import org.apache.ignite.ml.util.Utils;
+
+/**
+ * Input for {@link MLPGroupUpdateTrainer} where batches are taken from cache of labeled vectors.
+ */
+public class MLPGroupUpdateTrainerCacheInput extends AbstractMLPGroupUpdateTrainerInput {
+    /**
+     * Cache of labeled vectors.
+     */
+    private final IgniteCache<Integer, LabeledVector<Vector, Vector>> cache;
+
+    /**
+     * Size of batch to return on each training iteration.
+     */
+    private final int batchSize;
+
+    /**
+     * Multilayer perceptron.
+     */
+    private final MultilayerPerceptron mlp;
+
+    /**
+     * Construct instance of this class with given parameters.
+     *
+     * @param arch Architecture of multilayer perceptron.
+     * @param init Initializer of multilayer perceptron.
+     * @param networksCnt Count of networks to be trained in parallel by {@link MLPGroupUpdateTrainer}.
+     * @param cache Cache with labeled vectors.
+     * @param batchSize Size of batch to return on each training iteration.
+     */
+    public MLPGroupUpdateTrainerCacheInput(MLPArchitecture arch, MLPInitializer init,
+        int networksCnt, IgniteCache<Integer, LabeledVector<Vector, Vector>> cache,
+        int batchSize) {
+        super(networksCnt);
+
+        this.batchSize = batchSize;
+        this.cache = cache;
+        this.mlp = new MultilayerPerceptron(arch, init);
+    }
+
+    /**
+     * Construct instance of this class with given parameters and default initializer.
+     *
+     * @param arch Architecture of multilayer perceptron.
+     * @param networksCnt Count of networks to be trained in parallel by {@link MLPGroupUpdateTrainer}.
+     * @param cache Cache with labeled vectors.
+     * @param batchSize Size of batch to return on each training iteration.
+     */
+    public MLPGroupUpdateTrainerCacheInput(MLPArchitecture arch, int networksCnt,
+        IgniteCache<Integer, LabeledVector<Vector, Vector>> cache,
+        int batchSize) {
+        this(arch, null, networksCnt, cache, batchSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier() {
+        String cName = cache.getName();
+        int bs = batchSize;
+
+        return () -> {
+            Ignite ignite = Ignition.localIgnite();
+            IgniteCache<Integer, LabeledVector<Vector, Vector>> cache = ignite.getOrCreateCache(cName);
+            int total = cache.size();
+            Affinity<Integer> affinity = ignite.affinity(cName);
+
+            List<Integer> allKeys = IntStream.range(0, total).boxed().collect(Collectors.toList());
+            List<Integer> keys = new ArrayList<>(affinity.mapKeysToNodes(allKeys).get(ignite.cluster().localNode()));
+
+            int locKeysCnt = keys.size();
+
+            int[] selected = Utils.selectKDistinct(locKeysCnt, Math.min(bs, locKeysCnt));
+
+            // Get dimensions of vectors in cache. We suppose that every feature vector has
+            // the same dimension d1 and every label has the same dimension d2.
+            LabeledVector<Vector, Vector> dimEntry = cache.get(keys.get(selected[0]));
+
+            Matrix inputs = new DenseLocalOnHeapMatrix(dimEntry.features().size(), bs);
+            Matrix groundTruth = new DenseLocalOnHeapMatrix(dimEntry.label().size(), bs);
+
+            for (int i = 0; i < selected.length; i++) {
+                LabeledVector<Vector, Vector> labeled = cache.get(keys.get(selected[i]));
+
+                inputs.assignColumn(i, labeled.features());
+                groundTruth.assignColumn(i, labeled.label());
+            }
+
+            return new IgniteBiTuple<>(inputs, groundTruth);
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public MultilayerPerceptron mdl() {
+        return mlp;
+    }
+}
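
A construction sketch (not part of the commit; Activators is the pre-existing activation-function holder in org.apache.ignite.ml.nn, the rest comes from this commit):

    // 2 inputs -> hidden layer of 10 ReLU neurons -> 1 sigmoid output.
    MLPArchitecture arch = new MLPArchitecture(2)
        .withAddedLayer(10, true, Activators.RELU)
        .withAddedLayer(1, false, Activators.SIGMOID);

    // Train 4 networks in parallel, drawing batches of 100 labeled vectors from the cache.
    MLPGroupUpdateTrainerCacheInput input =
        new MLPGroupUpdateTrainerCacheInput(arch, 4, cache, 100);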

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/MLPLayer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/MLPLayer.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/MLPLayer.java
index 621dc9f..b5120c6 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/nn/MLPLayer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/MLPLayer.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.ml.nn;
 
+import java.io.Serializable;
 import org.apache.ignite.ml.math.Matrix;
 import org.apache.ignite.ml.math.Vector;
 
 /**
  * Class containing information about layer.
  */
-public class MLPLayer {
+public class MLPLayer implements Serializable {
     /**
      * Weights matrix.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/MultilayerPerceptron.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/MultilayerPerceptron.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/MultilayerPerceptron.java
index e372d96..d55e0e9 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/nn/MultilayerPerceptron.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/MultilayerPerceptron.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.ml.nn;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -41,7 +42,7 @@ import static org.apache.ignite.ml.math.util.MatrixUtil.elementWiseTimes;
 /**
  * Class encapsulating logic of multilayer perceptron.
  */
-public class MultilayerPerceptron implements Model<Matrix, Matrix>, SmoothParametrized<MultilayerPerceptron> {
+public class MultilayerPerceptron implements Model<Matrix, Matrix>, SmoothParametrized<MultilayerPerceptron>, Serializable {
     /**
      * This MLP architecture.
      */
@@ -68,7 +69,7 @@ public class MultilayerPerceptron implements Model<Matrix, Matrix>, SmoothParame
         architecture = arch;
         below = null;
 
-        initLayers(initializer);
+        initLayers(initializer != null ? initializer : new RandomInitializer(new Random()));
     }
 
     /**
@@ -77,11 +78,7 @@ public class MultilayerPerceptron implements Model<Matrix, Matrix>, SmoothParame
      * @param arch Architecture.
      */
     public MultilayerPerceptron(MLPArchitecture arch) {
-        layers = new ArrayList<>(arch.layersCount() + 1);
-        architecture = arch;
-        below = null;
-
-        initLayers(new RandomInitializer(new Random()));
+        this(arch, null);
     }
 
     /**
@@ -389,7 +386,7 @@ public class MultilayerPerceptron implements Model<Matrix, Matrix>, SmoothParame
             if (hasBiases(layer))
                 db = dz.foldRows(Vector::sum).times(invBatchSize);
 
-            // Because we go from last layer, add each layer to the begining.
+            // Because we go from last layer, add each layer to the beginning.
             layersParameters.add(0, new MLPLayer(dw, db));
         }
 
@@ -555,7 +552,8 @@ public class MultilayerPerceptron implements Model<Matrix, Matrix>, SmoothParame
      * @param nonlinearity Nonlinearity of current layer.
      * @return Gradients matrix.
      */
-    private Matrix differentiateNonlinearity(Matrix linearOut, IgniteDifferentiableDoubleToDoubleFunction nonlinearity) {
+    private Matrix differentiateNonlinearity(Matrix linearOut,
+        IgniteDifferentiableDoubleToDoubleFunction nonlinearity) {
         Matrix diff = linearOut.copy();
 
         diff.map(nonlinearity::differential);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/architecture/LayerArchitecture.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/architecture/LayerArchitecture.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/architecture/LayerArchitecture.java
index 31a3e9a..4ede888 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/nn/architecture/LayerArchitecture.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/architecture/LayerArchitecture.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.ml.nn.architecture;
 
+import java.io.Serializable;
+
 /**
  * Layer architecture.
  */
-public class LayerArchitecture {
+public class LayerArchitecture implements Serializable {
     /**
      * Count of neurons on layer.
      */
@@ -37,6 +39,7 @@ public class LayerArchitecture {
 
     /**
      * Get count of neurons in layer.
+     *
      * @return Count of neurons in layer.
      */
     public int neuronsCount() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/architecture/MLPArchitecture.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/architecture/MLPArchitecture.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/architecture/MLPArchitecture.java
index 3ef7b61..4018328 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/nn/architecture/MLPArchitecture.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/architecture/MLPArchitecture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.ml.nn.architecture;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.ml.math.functions.IgniteDifferentiableDoubleToDoubleFunction;
@@ -24,7 +25,7 @@ import org.apache.ignite.ml.math.functions.IgniteDifferentiableDoubleToDoubleFun
 /**
  * Class containing information about architecture of MLP.
  */
-public class MLPArchitecture {
+public class MLPArchitecture implements Serializable {
     /**
      * List of layers architectures.
      */
@@ -51,6 +52,7 @@ public class MLPArchitecture {
 
     /**
      * Count of layers in MLP.
+     *
      * @return Layers count.
      */
     public int layersCount() {
@@ -59,6 +61,7 @@ public class MLPArchitecture {
 
     /**
      * Size of input of MLP.
+     *
      * @return Size of input.
      */
     public int inputSize() {
@@ -67,6 +70,7 @@ public class MLPArchitecture {
 
     /**
      * Size of output of MLP.
+     *
      * @return Size of output.
      */
     public int outputSize() {
@@ -81,7 +85,8 @@ public class MLPArchitecture {
      * @param f Activation function of a new layer.
      * @return New MLP architecture with new layer added on top of all this architecture layers.
      */
-    public MLPArchitecture withAddedLayer(int neuronsCnt, boolean hasBias, IgniteDifferentiableDoubleToDoubleFunction f) {
+    public MLPArchitecture withAddedLayer(int neuronsCnt, boolean hasBias,
+        IgniteDifferentiableDoubleToDoubleFunction f) {
         ArrayList<LayerArchitecture> newLayers = new ArrayList<>(layers);
 
         newLayers.add(new TransformationLayerArchitecture(neuronsCnt, hasBias, f));

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/initializers/RandomInitializer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/initializers/RandomInitializer.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/initializers/RandomInitializer.java
index 18cb8a6..25c27cd 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/nn/initializers/RandomInitializer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/initializers/RandomInitializer.java
@@ -28,7 +28,7 @@ public class RandomInitializer implements MLPInitializer {
     /**
      * RNG.
      */
-    Random rnd;
+    private final Random rnd;
 
     /**
      * Construct RandomInitializer from given RNG.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/AbstractMLPGroupUpdateTrainerInput.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/AbstractMLPGroupUpdateTrainerInput.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/AbstractMLPGroupUpdateTrainerInput.java
new file mode 100644
index 0000000..ed65af7
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/AbstractMLPGroupUpdateTrainerInput.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn.trainers.distributed;
+
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.ml.math.functions.IgniteSupplier;
+import org.apache.ignite.ml.nn.LocalBatchTrainerInput;
+import org.apache.ignite.ml.nn.MultilayerPerceptron;
+import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;
+import org.apache.ignite.ml.trainers.group.GroupTrainerInput;
+
+/**
+ * Abstract class for {@link MLPGroupUpdateTrainer} inputs.
+ */
+public abstract class AbstractMLPGroupUpdateTrainerInput implements GroupTrainerInput<Void>, LocalBatchTrainerInput<MultilayerPerceptron> {
+    /**
+     * Count of networks to be trained in parallel.
+     */
+    private final int networksCnt;
+
+    /**
+     * Construct instance of this class with given parameters.
+     *
+     * @param networksCnt Count of networks to be trained in parallel.
+     */
+    public AbstractMLPGroupUpdateTrainerInput(int networksCnt) {
+        this.networksCnt = networksCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteSupplier<Stream<GroupTrainerCacheKey<Void>>> initialKeys(UUID trainingUUID) {
+        final int nt = networksCnt; // IMPL NOTE local variable avoids capturing 'this' in the serialized lambda.
+        return () -> MLPCache.allKeys(nt, trainingUUID);
+    }
+
+    /**
+     * Get count of networks to be trained in parallel.
+     *
+     * @return Count of networks.
+     */
+    public int trainingsCount() {
+        return networksCnt;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPCache.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPCache.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPCache.java
new file mode 100644
index 0000000..0fa2f29
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPCache.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn.trainers.distributed;
+
+import java.util.UUID;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;
+
+/**
+ * Cache for distributed MLP.
+ */
+public class MLPCache {
+    /**
+     * Cache name.
+     */
+    public static String CACHE_NAME = "MLP_CACHE";
+
+    /**
+     * Affinity service for MLP cache.
+     *
+     * @return Affinity service for MLP cache.
+     */
+    public static Affinity<GroupTrainerCacheKey<Void>> affinity() {
+        return Ignition.localIgnite().affinity(CACHE_NAME);
+    }
+
+    /**
+     * Get or create MLP cache.
+     *
+     * @param ignite Ignite instance.
+     * @return MLP cache.
+     */
+    public static IgniteCache<GroupTrainerCacheKey<Void>, MLPGroupTrainingCacheValue> getOrCreate(Ignite ignite) {
+        CacheConfiguration<GroupTrainerCacheKey<Void>, MLPGroupTrainingCacheValue> cfg = new CacheConfiguration<>();
+
+        // Write to primary.
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+
+        // Atomic transactions only.
+        cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+        // No copying of values.
+        cfg.setCopyOnRead(false);
+
+        // Cache is partitioned.
+        cfg.setCacheMode(CacheMode.PARTITIONED);
+
+        cfg.setBackups(0);
+
+        cfg.setOnheapCacheEnabled(true);
+
+        cfg.setName(CACHE_NAME);
+
+        return ignite.getOrCreateCache(cfg);
+    }
+
+    /**
+     * Get all keys of this cache for given parameters.
+     *
+     * @param trainingsCnt Parallel trainings count.
+     * @param uuid Training UUID.
+     * @return All keys of this cache for given parameters.
+     */
+    public static Stream<GroupTrainerCacheKey<Void>> allKeys(int trainingsCnt, UUID uuid) {
+        return IntStream.range(0, trainingsCnt).mapToObj(i -> new GroupTrainerCacheKey<Void>(i, null, uuid));
+    }
+}
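
A short sketch of key enumeration for a training session (not part of the commit; assumes java.util.List and java.util.stream.Collectors):

    UUID trainingUUID = UUID.randomUUID();

    // One key per network trained in parallel; here keys.size() == 3.
    List<GroupTrainerCacheKey<Void>> keys =
        MLPCache.allKeys(3, trainingUUID).collect(Collectors.toList());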

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupTrainingCacheValue.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupTrainingCacheValue.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupTrainingCacheValue.java
new file mode 100644
index 0000000..f8e75f6
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupTrainingCacheValue.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn.trainers.distributed;
+
+import org.apache.ignite.ml.nn.MultilayerPerceptron;
+
+/**
+ * Value of cache used for group training by {@link MLPGroupUpdateTrainer}.
+ */
+public class MLPGroupTrainingCacheValue {
+    /**
+     * Multilayer perceptron.
+     */
+    private MultilayerPerceptron mlp;
+
+    /**
+     * Construct instance of this class with given parameters.
+     *
+     * @param mlp Multilayer perceptron.
+     */
+    public MLPGroupTrainingCacheValue(MultilayerPerceptron mlp) {
+        this.mlp = mlp;
+    }
+
+    /**
+     * Get multilayer perceptron.
+     *
+     * @return Multilayer perceptron.
+     */
+    public MultilayerPerceptron perceptron() {
+        return mlp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainer.java
new file mode 100644
index 0000000..1f98b53
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainer.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn.trainers.distributed;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.functions.IgniteDifferentiableVectorToDoubleFunction;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.math.functions.IgniteSupplier;
+import org.apache.ignite.ml.math.util.MatrixUtil;
+import org.apache.ignite.ml.nn.LossFunctions;
+import org.apache.ignite.ml.nn.MultilayerPerceptron;
+import org.apache.ignite.ml.nn.updaters.ParameterUpdateCalculator;
+import org.apache.ignite.ml.nn.updaters.RPropParameterUpdate;
+import org.apache.ignite.ml.nn.updaters.RPropUpdateCalculator;
+import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;
+import org.apache.ignite.ml.trainers.group.MetaoptimizerGroupTrainer;
+import org.apache.ignite.ml.trainers.group.ResultAndUpdates;
+import org.apache.ignite.ml.trainers.group.chain.EntryAndContext;
+import org.apache.ignite.ml.util.Utils;
+
+/**
+ * Update-based distributed training of MLP.
+ *
+ * @param <U> Type of update.
+ */
+public class MLPGroupUpdateTrainer<U extends Serializable> extends
+    MetaoptimizerGroupTrainer<MLPGroupUpdateTrainerLocalContext,
+        Void,
+        MLPGroupTrainingCacheValue,
+        U,
+        MultilayerPerceptron,
+        U,
+        MultilayerPerceptron,
+        AbstractMLPGroupUpdateTrainerInput,
+        MLPGroupUpdateTrainingContext<U>,
+        ArrayList<U>,
+        MLPGroupUpdateTrainingLoopData<U>,
+        U> {
+    /**
+     * Loss function.
+     */
+    private final IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss;
+
+    /**
+     * Error tolerance.
+     */
+    private final double tolerance;
+
+    /**
+     * Maximal count of global steps.
+     */
+    private final int maxGlobalSteps;
+
+    /**
+     * Synchronize updates between networks every syncRate steps.
+     */
+    private final int syncRate;
+
+    /**
+     * Function used to reduce updates from different networks (for example, averaging of gradients of all networks).
+     */
+    private final IgniteFunction<List<U>, U> allUpdatesReducer;
+
+    /**
+     * Function used to reduce updates in one training (for example, sum all sequential gradient updates to get one
+     * gradient update).
+     */
+    private final IgniteFunction<List<U>, U> locStepUpdatesReducer;
+
+    /**
+     * Updates calculator.
+     */
+    private final ParameterUpdateCalculator<MultilayerPerceptron, U> updateCalculator;
+
+    /**
+     * Default maximal count of global steps.
+     */
+    private static final int DEFAULT_MAX_GLOBAL_STEPS = 30;
+
+    /**
+     * Default sync rate.
+     */
+    private static final int DEFAULT_SYNC_RATE = 5;
+
+    /**
+     * Default all updates reducer.
+     */
+    private static final IgniteFunction<List<RPropParameterUpdate>, RPropParameterUpdate>
+        DEFAULT_ALL_UPDATES_REDUCER = RPropParameterUpdate::avg;
+
+    /**
+     * Default local steps updates reducer.
+     */
+    private static final IgniteFunction<List<RPropParameterUpdate>, RPropParameterUpdate>
+        DEFAULT_LOCAL_STEP_UPDATES_REDUCER = RPropParameterUpdate::sumLocal;
+
+    /**
+     * Default update calculator.
+     */
+    private static final ParameterUpdateCalculator<MultilayerPerceptron, RPropParameterUpdate>
+        DEFAULT_UPDATE_CALCULATOR = new RPropUpdateCalculator<>();
+
+    /**
+     * Default loss function.
+     */
+    private static final IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> DEFAULT_LOSS
+        = LossFunctions.MSE;
+
+    /**
+     * Construct instance of this class with given parameters.
+     *
+     * @param maxGlobalSteps Maximal count of global steps.
+     * @param syncRate Synchronize updates between networks every syncRate steps.
+     * @param allUpdatesReducer Function used to reduce updates from different networks.
+     * @param locStepUpdatesReducer Function used to reduce updates in one training.
+     * @param updateCalculator Updates calculator.
+     * @param loss Loss function.
+     * @param ignite Ignite instance.
+     * @param tolerance Error tolerance.
+     */
+    public MLPGroupUpdateTrainer(int maxGlobalSteps,
+        int syncRate,
+        IgniteFunction<List<U>, U> allUpdatesReducer,
+        IgniteFunction<List<U>, U> locStepUpdatesReducer,
+        ParameterUpdateCalculator<MultilayerPerceptron, U> updateCalculator,
+        IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss,
+        Ignite ignite, double tolerance) {
+        super(new MLPMetaoptimizer<>(allUpdatesReducer), MLPCache.getOrCreate(ignite), ignite);
+
+        this.maxGlobalSteps = maxGlobalSteps;
+        this.syncRate = syncRate;
+        this.allUpdatesReducer = allUpdatesReducer;
+        this.locStepUpdatesReducer = locStepUpdatesReducer;
+        this.updateCalculator = updateCalculator;
+        this.loss = loss;
+        this.tolerance = tolerance;
+    }
+
+    /**
+     * Get default {@link MLPGroupUpdateTrainer}.
+     *
+     * @param ignite Ignite instance.
+     * @return Default {@link MLPGroupUpdateTrainer}.
+     */
+    public static MLPGroupUpdateTrainer<RPropParameterUpdate> getDefault(Ignite ignite) {
+        return new MLPGroupUpdateTrainer<>(DEFAULT_MAX_GLOBAL_STEPS, DEFAULT_SYNC_RATE, DEFAULT_ALL_UPDATES_REDUCER,
+            DEFAULT_LOCAL_STEP_UPDATES_REDUCER, DEFAULT_UPDATE_CALCULATOR, DEFAULT_LOSS, ignite, 0.01);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void init(AbstractMLPGroupUpdateTrainerInput data, UUID trainingUUID) {
+        super.init(data, trainingUUID);
+
+        MLPGroupUpdateTrainerDataCache.getOrCreate(ignite).put(trainingUUID, new MLPGroupUpdateTrainingData<>(
+            updateCalculator,
+            syncRate,
+            locStepUpdatesReducer,
+            data.batchSupplier(),
+            loss,
+            tolerance
+        ));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteFunction<GroupTrainerCacheKey<Void>, ResultAndUpdates<U>> distributedInitializer(
+        AbstractMLPGroupUpdateTrainerInput data) {
+        MultilayerPerceptron initPerceptron = data.mdl();
+
+        // For each key put initial network into the cache.
+        return key -> {
+            Ignite ignite = Ignition.localIgnite();
+
+            U initUpdate = updateCalculator.init(initPerceptron, loss);
+
+            return ResultAndUpdates.of(initUpdate).updateCache(MLPCache.getOrCreate(ignite), key,
+                new MLPGroupTrainingCacheValue(initPerceptron));
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteFunction<EntryAndContext<Void, MLPGroupTrainingCacheValue,
+        MLPGroupUpdateTrainingContext<U>>, MLPGroupUpdateTrainingLoopData<U>> trainingLoopStepDataExtractor() {
+        return entryAndContext -> {
+            MLPGroupUpdateTrainingContext<U> ctx = entryAndContext.context();
+            Map.Entry<GroupTrainerCacheKey<Void>, MLPGroupTrainingCacheValue> entry = entryAndContext.entry();
+            MLPGroupUpdateTrainingData<U> data = ctx.data();
+
+            return new MLPGroupUpdateTrainingLoopData<>(entry.getValue().perceptron(),
+                data.updateCalculator(), data.stepsCnt(), data.updateReducer(), ctx.previousUpdate(), entry.getKey(),
+                data.batchSupplier(), data.loss(), data.tolerance());
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteSupplier<Stream<GroupTrainerCacheKey<Void>>> keysToProcessInTrainingLoop(
+        MLPGroupUpdateTrainerLocalContext locCtx) {
+        int trainingsCnt = locCtx.parallelTrainingsCnt();
+        UUID uuid = locCtx.trainingUUID();
+
+        return () -> MLPCache.allKeys(trainingsCnt, uuid);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteSupplier<MLPGroupUpdateTrainingContext<U>> remoteContextExtractor(U prevUpdate,
+        MLPGroupUpdateTrainerLocalContext ctx) {
+        UUID uuid = ctx.trainingUUID();
+
+        return () -> {
+            MLPGroupUpdateTrainingData<U> data = MLPGroupUpdateTrainerDataCache
+                .getOrCreate(Ignition.localIgnite()).get(uuid);
+            return new MLPGroupUpdateTrainingContext<>(data, prevUpdate);
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteFunction<MLPGroupUpdateTrainingLoopData<U>, ResultAndUpdates<U>> dataProcessor() {
+        return data -> {
+            MultilayerPerceptron mlp = data.mlp();
+
+            MultilayerPerceptron mlpCp = Utils.copy(mlp);
+            ParameterUpdateCalculator<MultilayerPerceptron, U> updateCalculator = data.updateCalculator();
+            IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss = data.loss();
+
+            // TODO: rework ParameterUpdateCalculator API to have a proper way of setting loss.
+            updateCalculator.init(mlpCp, loss);
+
+            U curUpdate = data.previousUpdate();
+
+            int steps = data.stepsCnt();
+            List<U> updates = new ArrayList<>(steps);
+
+            IgniteBiTuple<Matrix, Matrix> batch = data.batchSupplier().get();
+
+            for (int i = 0; i < steps; i++) {
+                Matrix input = batch.get1();
+                Matrix truth = batch.get2();
+
+                int batchSize = truth.columnSize();
+
+                Matrix predicted = mlpCp.apply(input);
+
+                double err = MatrixUtil.zipFoldByColumns(predicted, truth, (predCol, truthCol) ->
+                    loss.apply(truthCol).apply(predCol)).sum() / batchSize;
+
+                if (err < data.tolerance())
+                    break;
+
+                mlpCp = updateCalculator.update(mlpCp, curUpdate);
+                updates.add(curUpdate);
+
+                curUpdate = updateCalculator.calculateNewUpdate(mlpCp, curUpdate, i, input, truth);
+            }
+
+            U update = data.getUpdateReducer().apply(updates);
+
+            MultilayerPerceptron newMlp = updateCalculator.update(mlp, data.previousUpdate());
+
+            return new ResultAndUpdates<>(update).
+                updateCache(MLPCache.getOrCreate(Ignition.localIgnite()), data.key(),
+                    new MLPGroupTrainingCacheValue(newMlp));
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override protected MLPGroupUpdateTrainerLocalContext<U> initialLocalContext(
+        AbstractMLPGroupUpdateTrainerInput data, UUID trainingUUID) {
+        return new MLPGroupUpdateTrainerLocalContext<>(trainingUUID, maxGlobalSteps, allUpdatesReducer,
+            data.trainingsCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteSupplier<Stream<GroupTrainerCacheKey<Void>>> finalResultKeys(U data,
+        MLPGroupUpdateTrainerLocalContext locCtx) {
+        UUID uuid = locCtx.trainingUUID();
+        int trainingsCnt = locCtx.parallelTrainingsCnt();
+
+        return () -> MLPCache.allKeys(trainingsCnt, uuid);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteSupplier<MLPGroupUpdateTrainingContext<U>> extractContextForFinalResultCreation(U data,
+        MLPGroupUpdateTrainerLocalContext locCtx) {
+        return () -> null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteFunction<EntryAndContext<Void, MLPGroupTrainingCacheValue,
+        MLPGroupUpdateTrainingContext<U>>, ResultAndUpdates<MultilayerPerceptron>> finalResultsExtractor() {
+        return context -> ResultAndUpdates.of(context.entry().getValue().perceptron());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteFunction<List<MultilayerPerceptron>, MultilayerPerceptron> finalResultsReducer() {
+        // Just take any of MLPs since they will be in the same state.
+        return mlps -> mlps.stream().filter(Objects::nonNull).findFirst().orElse(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected MultilayerPerceptron mapFinalResult(MultilayerPerceptron res,
+        MLPGroupUpdateTrainerLocalContext locCtx) {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanup(MLPGroupUpdateTrainerLocalContext locCtx) {
+        // No-op.
+    }
+
+    /**
+     * Create new {@link MLPGroupUpdateTrainer} with new maxGlobalSteps value.
+     *
+     * @param maxGlobalSteps New maxGlobalSteps value.
+     * @return New {@link MLPGroupUpdateTrainer} with new maxGlobalSteps value.
+     */
+    public MLPGroupUpdateTrainer<U> withMaxGlobalSteps(int maxGlobalSteps) {
+        return new MLPGroupUpdateTrainer<>(maxGlobalSteps, syncRate, allUpdatesReducer, locStepUpdatesReducer,
+            updateCalculator, loss, ignite, tolerance);
+    }
+
+    /**
+     * Create new {@link MLPGroupUpdateTrainer} with new syncRate value.
+     *
+     * @param syncRate New syncRate value.
+     * @return New {@link MLPGroupUpdateTrainer} with new syncRate value.
+     */
+    public MLPGroupUpdateTrainer<U> withSyncRate(int syncRate) {
+        return new MLPGroupUpdateTrainer<>(maxGlobalSteps, syncRate, allUpdatesReducer, locStepUpdatesReducer,
+            updateCalculator, loss, ignite, tolerance);
+    }
+
+    /**
+     * Create new {@link MLPGroupUpdateTrainer} with new tolerance.
+     *
+     * @param tolerance New tolerance value.
+     * @return New {@link MLPGroupUpdateTrainer} with new tolerance value.
+     */
+    public MLPGroupUpdateTrainer<U> withTolerance(double tolerance) {
+        return new MLPGroupUpdateTrainer<>(maxGlobalSteps, syncRate, allUpdatesReducer, locStepUpdatesReducer,
+            updateCalculator, loss, ignite, tolerance);
+    }
+}
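
Putting it together, a hypothetical end-to-end sketch (not part of the commit; assumes the train method inherited from the group trainer hierarchy and the cache input shown earlier):

    MLPGroupUpdateTrainer<RPropParameterUpdate> trainer =
        MLPGroupUpdateTrainer.getDefault(ignite)
            .withMaxGlobalSteps(20) // At most 20 global synchronization steps.
            .withSyncRate(2);       // Exchange updates between networks every 2 local steps.

    MultilayerPerceptron mlp = trainer.train(input);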

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainerDataCache.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainerDataCache.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainerDataCache.java
new file mode 100644
index 0000000..c237f86
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainerDataCache.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn.trainers.distributed;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Cache used for storing data for {@link MLPGroupUpdateTrainer}.
+ */
+public class MLPGroupUpdateTrainerDataCache {
+    /**
+     * Cache name.
+     */
+    public static final String CACHE_NAME = "MLP_GRP_TRN_DATA_CACHE";
+
+    /**
+     * Affinity service for this trainer data cache.
+     *
+     * @return Affinity service for this trainer data cache.
+     */
+    public static Affinity<UUID> affinity() {
+        return Ignition.localIgnite().affinity(CACHE_NAME);
+    }
+
+    /**
+     * Get or create cache for storing MLP group training data.
+     *
+     * @param ignite Ignite instance.
+     * @return Training data cache.
+     */
+    public static IgniteCache<UUID, MLPGroupUpdateTrainingData> getOrCreate(Ignite ignite) {
+        CacheConfiguration<UUID, MLPGroupUpdateTrainingData> cfg = new CacheConfiguration<>();
+
+        // Write to primary.
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+
+        // Atomic mode, no transactions.
+        cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+        // No copying of values.
+        cfg.setCopyOnRead(false);
+
+        // Cache is replicated so that every node can read training data locally.
+        cfg.setCacheMode(CacheMode.REPLICATED);
+
+        // No backups: in replicated mode every node holds the data anyway.
+        cfg.setBackups(0);
+
+        // Keep entries in an on-heap cache for faster reads.
+        cfg.setOnheapCacheEnabled(true);
+
+        cfg.setName(CACHE_NAME);
+
+        return ignite.getOrCreateCache(cfg);
+    }
+}
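
Since every node runs local training steps against the same data, this cache is replicated
rather than partitioned, trading memory for purely local reads. A short access sketch, assuming
code running on a started Ignite node (the trainingUUID variable is illustrative and would come
from the local context):

    Ignite ignite = Ignition.localIgnite(); // Assumes we are inside a compute job on a node.
    IgniteCache<UUID, MLPGroupUpdateTrainingData> cache =
        MLPGroupUpdateTrainerDataCache.getOrCreate(ignite);
    MLPGroupUpdateTrainingData data = cache.get(trainingUUID);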

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainerLocalContext.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainerLocalContext.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainerLocalContext.java
new file mode 100644
index 0000000..ecb141d
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainerLocalContext.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn.trainers.distributed;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.trainers.group.chain.HasTrainingUUID;
+
+/**
+ * Local context for {@link MLPGroupUpdateTrainer}.
+ *
+ * @param <U> Type of updates on which training is done.
+ */
+public class MLPGroupUpdateTrainerLocalContext<U> implements HasTrainingUUID {
+    /**
+     * UUID of training.
+     */
+    private final UUID trainingUUID;
+
+    /**
+     * Maximal number of global steps.
+     */
+    private final int globalStepsMaxCnt;
+
+    /**
+     * Reducer used to reduce updates resulting from each parallel training.
+     */
+    private final IgniteFunction<List<U>, U> allUpdatesReducer;
+
+    /**
+     * Count of networks to be trained in parallel.
+     */
+    private final int parallelTrainingsCnt;
+
+    /**
+     * Current global step of {@link MLPGroupUpdateTrainer}.
+     */
+    private int curStep;
+
+    /** Create multilayer perceptron group update trainer local context. */
+    public MLPGroupUpdateTrainerLocalContext(UUID trainingUUID, int globalStepsMaxCnt,
+        IgniteFunction<List<U>, U> allUpdatesReducer, int parallelTrainingsCnt) {
+        this.trainingUUID = trainingUUID;
+        this.globalStepsMaxCnt = globalStepsMaxCnt;
+        this.allUpdatesReducer = allUpdatesReducer;
+        this.parallelTrainingsCnt = parallelTrainingsCnt;
+        curStep = 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID trainingUUID() {
+        return trainingUUID;
+    }
+
+    /**
+     * Get global steps max count.
+     *
+     * @return Global steps max count.
+     */
+    public int globalStepsMaxCount() {
+        return globalStepsMaxCnt;
+    }
+
+    /**
+     * Get reducer used to reduce updates resulting from each parallel training.
+     *
+     * @return Reducer used to reduce updates resulting from each parallel training.
+     */
+    public IgniteFunction<List<U>, U> allUpdatesReducer() {
+        return allUpdatesReducer;
+    }
+
+    /**
+     * Get count of networks to be trained in parallel.
+     *
+     * @return Count of networks to be trained in parallel.
+     */
+    public int parallelTrainingsCnt() {
+        return parallelTrainingsCnt;
+    }
+
+    /**
+     * Get current global step.
+     *
+     * @return Current global step.
+     */
+    public int currentStep() {
+        return curStep;
+    }
+
+    /**
+     * Increment current global step.
+     *
+     * @return This object.
+     */
+    public MLPGroupUpdateTrainerLocalContext<U> incrementCurrentStep() {
+        curStep++;
+
+        return this;
+    }
+}
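
The step counter above is what ultimately terminates the global training loop: the
metaoptimizer (see MLPMetaoptimizer below) bumps it once per global step and stops when the
configured maximum is reached. Condensed, the interplay looks like this sketch (doGlobalStep is
illustrative, not a real method; in the actual flow the increment happens inside the
metaoptimizer's localProcessor):

    while (update != null && locCtx.currentStep() < locCtx.globalStepsMaxCount()) {
        update = doGlobalStep(update, locCtx); // One distributed step over all trainings.
        locCtx.incrementCurrentStep();
    }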

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingContext.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingContext.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingContext.java
new file mode 100644
index 0000000..f4ccd98
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn.trainers.distributed;
+
+/**
+ * Context extracted during the distribute phase of a training loop step in {@link MLPGroupUpdateTrainer}.
+ *
+ * @param <U> Type of update.
+ */
+public class MLPGroupUpdateTrainingContext<U> {
+    /**
+     * Group training data.
+     */
+    private final MLPGroupUpdateTrainingData<U> data;
+
+    /**
+     * Update produced by previous training loop step.
+     */
+    private final U previousUpdate;
+
+    /**
+     * Construct an instance of this class.
+     *
+     * @param data Group training data.
+     * @param previousUpdate Update produced by previous training loop step.
+     */
+    public MLPGroupUpdateTrainingContext(MLPGroupUpdateTrainingData<U> data, U previousUpdate) {
+        this.data = data;
+        this.previousUpdate = previousUpdate;
+    }
+
+    /**
+     * Get group training data.
+     *
+     * @return Group training data.
+     */
+    public MLPGroupUpdateTrainingData<U> data() {
+        return data;
+    }
+
+    /**
+     * Get update produced by previous training loop step.
+     *
+     * @return Update produced by previous training loop step.
+     */
+    public U previousUpdate() {
+        return previousUpdate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingData.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingData.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingData.java
new file mode 100644
index 0000000..86074dd
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingData.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn.trainers.distributed;
+
+import java.util.List;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.functions.IgniteDifferentiableVectorToDoubleFunction;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.math.functions.IgniteSupplier;
+import org.apache.ignite.ml.nn.MultilayerPerceptron;
+import org.apache.ignite.ml.nn.updaters.ParameterUpdateCalculator;
+
+/**
+ * Multilayer perceptron group update training data.
+ *
+ * @param <U> Type of update.
+ */
+public class MLPGroupUpdateTrainingData<U> {
+    /** */
+    private final ParameterUpdateCalculator<MultilayerPerceptron, U> updateCalculator;
+    /** */
+    private final int stepsCnt;
+    /** */
+    private final IgniteFunction<List<U>, U> updateReducer;
+    /** */
+    private final IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier;
+    /** */
+    private final IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss;
+    /** */
+    private final double tolerance;
+
+    /** Construct multilayer perceptron group update training data with all parameters provided. */
+    public MLPGroupUpdateTrainingData(
+        ParameterUpdateCalculator<MultilayerPerceptron, U> updateCalculator, int stepsCnt,
+        IgniteFunction<List<U>, U> updateReducer,
+        IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier,
+        IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss, double tolerance) {
+        this.updateCalculator = updateCalculator;
+        this.stepsCnt = stepsCnt;
+        this.updateReducer = updateReducer;
+        this.batchSupplier = batchSupplier;
+        this.loss = loss;
+        this.tolerance = tolerance;
+    }
+
+    /** Get update calculator. */
+    public ParameterUpdateCalculator<MultilayerPerceptron, U> updateCalculator() {
+        return updateCalculator;
+    }
+
+    /** Get count of steps. */
+    public int stepsCnt() {
+        return stepsCnt;
+    }
+
+    /** Get update reducer. */
+    public IgniteFunction<List<U>, U> updateReducer() {
+        return updateReducer;
+    }
+
+    /** Get batch supplier. */
+    public IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier() {
+        return batchSupplier;
+    }
+
+    /** Get loss function. */
+    public IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss() {
+        return loss;
+    }
+
+    /** Get tolerance. */
+    public double tolerance() {
+        return tolerance;
+    }
+}
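
Of the members above, only the batch supplier touches actual data; the rest is training
configuration. A construction sketch with hedged choices (the inputs/groundTruth matrices and
the keep-last reducer are illustrative; LossFunctions.MSE is assumed as a member of the
LossFunctions class touched by this commit):

    MLPGroupUpdateTrainingData<RPropParameterUpdate> data = new MLPGroupUpdateTrainingData<>(
        new RPropUpdateCalculator<>(),                  // Update calculator, as in the local trainer.
        4,                                              // Local steps between synchronizations.
        updates -> updates.get(updates.size() - 1),     // Illustrative reducer: keep the last update.
        () -> new IgniteBiTuple<>(inputs, groundTruth), // Batch supplier capturing two matrices.
        LossFunctions.MSE,                              // Loss function (assumed member name).
        1e-5);                                          // Tolerance.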

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingLoopData.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingLoopData.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingLoopData.java
new file mode 100644
index 0000000..0f3d974
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPGroupUpdateTrainingLoopData.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn.trainers.distributed;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.ml.math.Matrix;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.functions.IgniteDifferentiableVectorToDoubleFunction;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.math.functions.IgniteSupplier;
+import org.apache.ignite.ml.nn.MultilayerPerceptron;
+import org.apache.ignite.ml.nn.updaters.ParameterUpdateCalculator;
+import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;
+
+/**
+ * Multilayer perceptron group update training loop data.
+ *
+ * @param <P> Type of update.
+ */
+public class MLPGroupUpdateTrainingLoopData<P> implements Serializable {
+    /** */
+    private final ParameterUpdateCalculator<MultilayerPerceptron, P> updateCalculator;
+    /** */
+    private final int stepsCnt;
+    /** */
+    private final IgniteFunction<List<P>, P> updateReducer;
+    /** */
+    private final P previousUpdate;
+    /** */
+    private final IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier;
+    /** */
+    private final IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss;
+    /** */
+    private final double tolerance;
+
+    /** */
+    private final GroupTrainerCacheKey<Void> key;
+    /** */
+    private final MultilayerPerceptron mlp;
+
+    /** Create multilayer perceptron group update training loop data. */
+    public MLPGroupUpdateTrainingLoopData(MultilayerPerceptron mlp,
+        ParameterUpdateCalculator<MultilayerPerceptron, P> updateCalculator, int stepsCnt,
+        IgniteFunction<List<P>, P> updateReducer, P previousUpdate,
+        GroupTrainerCacheKey<Void> key, IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier,
+        IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss,
+        double tolerance) {
+        this.mlp = mlp;
+        this.updateCalculator = updateCalculator;
+        this.stepsCnt = stepsCnt;
+        this.updateReducer = updateReducer;
+        this.previousUpdate = previousUpdate;
+        this.key = key;
+        this.batchSupplier = batchSupplier;
+        this.loss = loss;
+        this.tolerance = tolerance;
+    }
+
+    /** Get perceptron. */
+    public MultilayerPerceptron mlp() {
+        return mlp;
+    }
+
+    /** Get update calculator. */
+    public ParameterUpdateCalculator<MultilayerPerceptron, P> updateCalculator() {
+        return updateCalculator;
+    }
+
+    /** Get steps count. */
+    public int stepsCnt() {
+        return stepsCnt;
+    }
+
+    /** Get update reducer. */
+    public IgniteFunction<List<P>, P> getUpdateReducer() {
+        return updateReducer;
+    }
+
+    /** Get previous update. */
+    public P previousUpdate() {
+        return previousUpdate;
+    }
+
+    /** Get group trainer cache key. */
+    public GroupTrainerCacheKey<Void> key() {
+        return key;
+    }
+
+    /** Get batch supplier. */
+    public IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier() {
+        return batchSupplier;
+    }
+
+    /** Get loss function. */
+    public IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss() {
+        return loss;
+    }
+
+    /** Get tolerance. */
+    public double tolerance() {
+        return tolerance;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPMetaoptimizer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPMetaoptimizer.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPMetaoptimizer.java
new file mode 100644
index 0000000..249136b
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/distributed/MLPMetaoptimizer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.nn.trainers.distributed;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.trainers.group.Metaoptimizer;
+
+/**
+ * Meta-optimizer for multilayer perceptron group update trainer.
+ *
+ * @param <P> Type of update.
+ */
+public class MLPMetaoptimizer<P> implements Metaoptimizer<MLPGroupUpdateTrainerLocalContext,
+    MLPGroupUpdateTrainingLoopData<P>, P, P, P, ArrayList<P>> {
+    /** */
+    private final IgniteFunction<List<P>, P> allUpdatesReducer;
+
+    /** Construct metaoptimizer. */
+    public MLPMetaoptimizer(IgniteFunction<List<P>, P> allUpdatesReducer) {
+        this.allUpdatesReducer = allUpdatesReducer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFunction<List<P>, P> initialReducer() {
+        return allUpdatesReducer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public P locallyProcessInitData(P data, MLPGroupUpdateTrainerLocalContext locCtx) {
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFunction<P, ArrayList<P>> distributedPostprocessor() {
+        return p -> {
+            ArrayList<P> res = new ArrayList<>();
+            res.add(p);
+            return res;
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFunction<List<ArrayList<P>>, ArrayList<P>> postProcessReducer() {
+        // Flatten list of lists into a single list.
+        return lists -> lists.stream()
+            .flatMap(List::stream)
+            .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+    /** {@inheritDoc} */
+    @Override public P localProcessor(ArrayList<P> input, MLPGroupUpdateTrainerLocalContext locCtx) {
+        locCtx.incrementCurrentStep();
+
+        return allUpdatesReducer.apply(input);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean shouldContinue(P input, MLPGroupUpdateTrainerLocalContext locCtx) {
+        return input != null && locCtx.currentStep() < locCtx.globalStepsMaxCount();
+    }
+}
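
The three stages above compose as wrap, flatten, reduce: each node's update is wrapped into a
singleton list, postProcessReducer flattens the per-node lists, and localProcessor reduces the
flattened list into the next global update (incrementing the step counter as a side effect). A
self-contained illustration with P = Double and an averaging reducer (the averaging choice is
illustrative):

    IgniteFunction<List<Double>, Double> avg =
        list -> list.stream().mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
    MLPMetaoptimizer<Double> opt = new MLPMetaoptimizer<>(avg);

    List<ArrayList<Double>> fromNodes = Arrays.asList(
        opt.distributedPostprocessor().apply(1.0),  // Node one wraps its update: [1.0].
        opt.distributedPostprocessor().apply(3.0)); // Node two wraps its update: [3.0].

    ArrayList<Double> flat = opt.postProcessReducer().apply(fromNodes); // [1.0, 3.0]
    // localProcessor then reduces the flattened list: avg.apply(flat) == 2.0.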

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/local/LocalBatchTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/local/LocalBatchTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/local/LocalBatchTrainer.java
index 64a1956..8579b82 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/local/LocalBatchTrainer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/local/LocalBatchTrainer.java
@@ -28,19 +28,18 @@ import org.apache.ignite.ml.math.functions.IgniteFunction;
 import org.apache.ignite.ml.math.functions.IgniteSupplier;
 import org.apache.ignite.ml.math.util.MatrixUtil;
 import org.apache.ignite.ml.nn.LocalBatchTrainerInput;
-import org.apache.ignite.ml.nn.updaters.ParameterUpdater;
-import org.apache.ignite.ml.nn.updaters.UpdaterParams;
+import org.apache.ignite.ml.nn.updaters.ParameterUpdateCalculator;
 
 /**
  * Batch trainer. This trainer is not distributed on the cluster, but input can theoretically read data from
  * Ignite cache.
  */
-public class LocalBatchTrainer<M extends Model<Matrix, Matrix>, P extends UpdaterParams<? super M>>
+public class LocalBatchTrainer<M extends Model<Matrix, Matrix>, P>
     implements Trainer<M, LocalBatchTrainerInput<M>> {
     /**
      * Supplier for updater function.
      */
-    private final IgniteSupplier<ParameterUpdater<? super M, P>> updaterSupplier;
+    private final IgniteSupplier<ParameterUpdateCalculator<M, P>> updaterSupplier;
 
     /**
      * Error threshold.
@@ -71,7 +70,7 @@ public class LocalBatchTrainer<M extends Model<Matrix, Matrix>, P extends Update
      * @param maxIterations Maximal iterations count.
      */
     public LocalBatchTrainer(IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss,
-        IgniteSupplier<ParameterUpdater<? super M, P>> updaterSupplier, double errorThreshold, int maxIterations) {
+        IgniteSupplier<ParameterUpdateCalculator<M, P>> updaterSupplier, double errorThreshold, int maxIterations) {
         this.loss = loss;
         this.updaterSupplier = updaterSupplier;
         this.errorThreshold = errorThreshold;
@@ -84,19 +83,19 @@ public class LocalBatchTrainer<M extends Model<Matrix, Matrix>, P extends Update
         M mdl = data.mdl();
         double err;
 
-        ParameterUpdater<? super M, P> updater = updaterSupplier.get();
+        ParameterUpdateCalculator<? super M, P> updater = updaterSupplier.get();
 
         P updaterParams = updater.init(mdl, loss);
 
         while (i < maxIterations) {
-            IgniteBiTuple<Matrix, Matrix> batch = data.getBatch();
+            IgniteBiTuple<Matrix, Matrix> batch = data.batchSupplier().get();
             Matrix input = batch.get1();
             Matrix truth = batch.get2();
 
-            updaterParams = updater.updateParams(mdl, updaterParams, i, input, truth);
+            updaterParams = updater.calculateNewUpdate(mdl, updaterParams, i, input, truth);
 
             // Update mdl with updater parameters.
-            mdl = updaterParams.update(mdl);
+            mdl = updater.update(mdl, updaterParams);
 
             Matrix predicted = mdl.apply(input);
 
@@ -132,7 +131,7 @@ public class LocalBatchTrainer<M extends Model<Matrix, Matrix>, P extends Update
      * @param updaterSupplier New updater supplier.
      * @return new trainer with the same parameters as this trainer, but with new updater supplier.
      */
-    public LocalBatchTrainer withUpdater(IgniteSupplier<ParameterUpdater<? super M, P>> updaterSupplier) {
+    public LocalBatchTrainer withUpdater(IgniteSupplier<ParameterUpdateCalculator<M, P>> updaterSupplier) {
         return new LocalBatchTrainer<>(loss, updaterSupplier, errorThreshold, maxIterations);
     }
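
The loop above pins down the calculator contract that replaced the ParameterUpdater/UpdaterParams
pair. The real ParameterUpdateCalculator interface ships in this commit but is not shown in this
message; the sketch below is reconstructed from the calls made here, with the generic update
method mirroring the `? super M` usage above:

    public interface ParameterUpdateCalculatorSketch<M, P> {
        /** Initialize update parameters for the given model and loss. */
        P init(M mdl, IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss);

        /** Compute the next update from the model, the previous update and a batch. */
        P calculateNewUpdate(M mdl, P lastUpdate, int iteration, Matrix inputs, Matrix groundTruth);

        /** Apply an update, returning the updated model. */
        <M1 extends M> M1 update(M1 mdl, P update);
    }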
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/local/MLPLocalBatchTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/local/MLPLocalBatchTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/local/MLPLocalBatchTrainer.java
index 7065e2f..0c92395 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/local/MLPLocalBatchTrainer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/trainers/local/MLPLocalBatchTrainer.java
@@ -23,17 +23,16 @@ import org.apache.ignite.ml.math.functions.IgniteFunction;
 import org.apache.ignite.ml.math.functions.IgniteSupplier;
 import org.apache.ignite.ml.nn.LossFunctions;
 import org.apache.ignite.ml.nn.MultilayerPerceptron;
-import org.apache.ignite.ml.nn.updaters.ParameterUpdater;
-import org.apache.ignite.ml.nn.updaters.RPropUpdater;
-import org.apache.ignite.ml.nn.updaters.RPropUpdaterParams;
-import org.apache.ignite.ml.nn.updaters.UpdaterParams;
+import org.apache.ignite.ml.nn.updaters.ParameterUpdateCalculator;
+import org.apache.ignite.ml.nn.updaters.RPropParameterUpdate;
+import org.apache.ignite.ml.nn.updaters.RPropUpdateCalculator;
 
 /**
  * Local batch trainer for MLP.
  *
  * @param <P> Parameter updater parameters.
  */
-public class MLPLocalBatchTrainer<P extends UpdaterParams<? super MultilayerPerceptron>>
+public class MLPLocalBatchTrainer<P>
     extends LocalBatchTrainer<MultilayerPerceptron, P> {
     /**
      * Default loss function.
@@ -51,7 +50,6 @@ public class MLPLocalBatchTrainer<P extends UpdaterParams<? super MultilayerPerc
      */
     private static final int DEFAULT_MAX_ITERATIONS = 100;
 
-
     /**
      * Construct a trainer.
      *
@@ -62,7 +60,7 @@ public class MLPLocalBatchTrainer<P extends UpdaterParams<? super MultilayerPerc
      */
     public MLPLocalBatchTrainer(
         IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss,
-        IgniteSupplier<ParameterUpdater<? super MultilayerPerceptron, P>> updaterSupplier,
+        IgniteSupplier<ParameterUpdateCalculator<MultilayerPerceptron, P>> updaterSupplier,
         double errorThreshold, int maxIterations) {
         super(loss, updaterSupplier, errorThreshold, maxIterations);
     }
@@ -72,7 +70,8 @@ public class MLPLocalBatchTrainer<P extends UpdaterParams<? super MultilayerPerc
      *
      * @return MLPLocalBatchTrainer with default parameters.
      */
-    public static MLPLocalBatchTrainer<RPropUpdaterParams> getDefault() {
-        return new MLPLocalBatchTrainer<>(DEFAULT_LOSS, RPropUpdater::new, DEFAULT_ERROR_THRESHOLD, DEFAULT_MAX_ITERATIONS);
+    public static MLPLocalBatchTrainer<RPropParameterUpdate> getDefault() {
+        return new MLPLocalBatchTrainer<>(DEFAULT_LOSS, () -> new RPropUpdateCalculator<>(), DEFAULT_ERROR_THRESHOLD,
+            DEFAULT_MAX_ITERATIONS);
     }
 }
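
A usage sketch of the refactored default (assuming the Trainer interface added by this commit
exposes a train method taking the input holder; the input variable is illustrative and must
supply the initial network via mdl() and batches via batchSupplier()):

    MLPLocalBatchTrainer<RPropParameterUpdate> trainer = MLPLocalBatchTrainer.getDefault();
    MultilayerPerceptron trained = trainer.train(input); // Up to 100 RProp iterations by default.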

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f2fa8b3/modules/ml/src/main/java/org/apache/ignite/ml/nn/updaters/BaseSmoothParametrized.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/nn/updaters/BaseSmoothParametrized.java b/modules/ml/src/main/java/org/apache/ignite/ml/nn/updaters/BaseSmoothParametrized.java
index b33c2c7..8e2f0df 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/nn/updaters/BaseSmoothParametrized.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/nn/updaters/BaseSmoothParametrized.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.ml.nn.updaters;
 
+import org.apache.ignite.ml.Model;
 import org.apache.ignite.ml.math.Matrix;
 import org.apache.ignite.ml.math.Vector;
 import org.apache.ignite.ml.math.functions.IgniteDifferentiableVectorToDoubleFunction;
@@ -25,7 +26,7 @@ import org.apache.ignite.ml.math.functions.IgniteFunction;
 /**
  * Interface for models which are smooth functions of their parameters.
  */
-interface BaseSmoothParametrized<M extends BaseSmoothParametrized<M>> {
+interface BaseSmoothParametrized<M extends BaseSmoothParametrized<M> & Model<Matrix, Matrix>> {
     /**
      * Compose function in the following way: feed output of this model as input to second argument to loss function.
      * After that we have a function g of three arguments: input, ground truth, parameters.
@@ -39,7 +40,8 @@ interface BaseSmoothParametrized<M extends BaseSmoothParametrized<M>> {
      * @param truthBatch Batch of ground truths.
      * @return Gradient of h at current point in parameters space.
      */
-    Vector differentiateByParameters(IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss, Matrix inputsBatch, Matrix truthBatch);
+    Vector differentiateByParameters(IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss,
+        Matrix inputsBatch, Matrix truthBatch);
 
     /**
      * Get parameters vector.


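With the bound tightened to include Model<Matrix, Matrix>, a smooth-parametrized model can be
both evaluated and differentiated through one interface. A single plain gradient step, as a
sketch (accessor names parameters()/setParameters(...) are assumed from the "Get parameters
vector" accessor above; 0.1 is an illustrative learning rate):

    Vector grad = mlp.differentiateByParameters(loss, inputsBatch, truthBatch);
    mlp = mlp.setParameters(mlp.parameters().minus(grad.times(0.1)));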