ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ch...@apache.org
Subject ignite git commit: IGNITE-8666: Add ability of filtering data during datasets creation
Date Fri, 01 Jun 2018 14:43:49 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 7a5aa7c6b -> 4b351b6bf


IGNITE-8666: Add ability of filtering data during datasets creation

this closes #4101


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b351b6b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b351b6b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b351b6b

Branch: refs/heads/master
Commit: 4b351b6bfb80cff534e37367cdc229de63d16e26
Parents: 7a5aa7c
Author: Anton Dmitriev <dmitrievanthony@gmail.com>
Authored: Fri Jun 1 17:43:43 2018 +0300
Committer: Yury Babak <ybabak@gridgain.com>
Committed: Fri Jun 1 17:43:43 2018 +0300

----------------------------------------------------------------------
 .../clustering/KMeansClusterizationExample.java |  8 +-
 .../examples/ml/knn/KNNRegressionExample.java   |  4 +-
 .../ml/preprocessing/BinarizationExample.java   |  1 -
 .../ml/preprocessing/ImputingExample.java       |  1 -
 .../dataset/impl/cache/CacheBasedDataset.java   | 10 ++-
 .../impl/cache/CacheBasedDatasetBuilder.java    | 22 ++++-
 .../dataset/impl/cache/util/ComputeUtils.java   | 76 +++++++++++++---
 ...eratorWithConcurrentModificationChecker.java | 80 +++++++++++++++++
 .../impl/cache/util/UpstreamCursorAdapter.java  | 68 ---------------
 .../dataset/impl/local/LocalDatasetBuilder.java | 38 ++++++--
 .../ml/knn/regression/KNNRegressionTrainer.java |  3 +-
 .../cache/CacheBasedDatasetBuilderTest.java     | 39 +++++++++
 .../impl/cache/util/ComputeUtilsTest.java       |  2 +
 ...orWithConcurrentModificationCheckerTest.java | 91 ++++++++++++++++++++
 .../impl/local/LocalDatasetBuilderTest.java     | 38 ++++++++
 .../imputing/ImputerTrainerTest.java            |  3 -
 16 files changed, 383 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/examples/src/main/java/org/apache/ignite/examples/ml/clustering/KMeansClusterizationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/clustering/KMeansClusterizationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/clustering/KMeansClusterizationExample.java
index 8825ebb..cb140d4 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/clustering/KMeansClusterizationExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/clustering/KMeansClusterizationExample.java
@@ -27,12 +27,11 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder;
+import org.apache.ignite.ml.clustering.kmeans.KMeansModel;
+import org.apache.ignite.ml.clustering.kmeans.KMeansTrainer;
 import org.apache.ignite.ml.knn.classification.KNNClassificationTrainer;
 import org.apache.ignite.ml.math.Tracer;
 import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
-import org.apache.ignite.ml.clustering.kmeans.KMeansModel;
-import org.apache.ignite.ml.clustering.kmeans.KMeansTrainer;
 import org.apache.ignite.thread.IgniteThread;
 
 /**
@@ -57,7 +56,8 @@ public class KMeansClusterizationExample {
                     .withSeed(7867L);
 
                 KMeansModel mdl = trainer.fit(
-                    new CacheBasedDatasetBuilder<>(ignite, dataCache),
+                    ignite,
+                    dataCache,
                     (k, v) -> Arrays.copyOfRange(v, 1, v.length),
                     (k, v) -> v[0]
                 );

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/examples/src/main/java/org/apache/ignite/examples/ml/knn/KNNRegressionExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/knn/KNNRegressionExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/knn/KNNRegressionExample.java
index 76a07cd..757c8e6 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/knn/KNNRegressionExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/knn/KNNRegressionExample.java
@@ -27,7 +27,6 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder;
 import org.apache.ignite.ml.knn.classification.KNNClassificationTrainer;
 import org.apache.ignite.ml.knn.classification.KNNStrategy;
 import org.apache.ignite.ml.knn.regression.KNNRegressionModel;
@@ -57,7 +56,8 @@ public class KNNRegressionExample {
                 KNNRegressionTrainer trainer = new KNNRegressionTrainer();
 
                 KNNRegressionModel knnMdl = (KNNRegressionModel) trainer.fit(
-                    new CacheBasedDatasetBuilder<>(ignite, dataCache),
+                    ignite,
+                    dataCache,
                     (k, v) -> Arrays.copyOfRange(v, 1, v.length),
                     (k, v) -> v[0]
                 ).withK(5)

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java
index aa2aa98..edf4fd7 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java
@@ -28,7 +28,6 @@ import org.apache.ignite.ml.dataset.DatasetFactory;
 import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
 import org.apache.ignite.ml.preprocessing.binarization.BinarizationTrainer;
-import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer;
 
 /**
  * Example that shows how to use binarization preprocessor to binarize data.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java
index 9565c85..e0c0d86 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java
@@ -27,7 +27,6 @@ import org.apache.ignite.examples.ml.dataset.model.Person;
 import org.apache.ignite.ml.dataset.DatasetFactory;
 import org.apache.ignite.ml.dataset.primitive.SimpleDataset;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.preprocessing.binarization.BinarizationTrainer;
 import org.apache.ignite.ml.preprocessing.imputer.ImputerTrainer;
 
 /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java
index 7428faf..1b492a7 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.ml.dataset.Dataset;
 import org.apache.ignite.ml.dataset.PartitionDataBuilder;
 import org.apache.ignite.ml.dataset.impl.cache.util.ComputeUtils;
@@ -55,6 +56,9 @@ public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoClose
     /** Ignite Cache with {@code upstream} data. */
     private final IgniteCache<K, V> upstreamCache;
 
+    /** Filter for {@code upstream} data. */
+    private final IgniteBiPredicate<K, V> filter;
+
     /** Ignite Cache with partition {@code context}. */
     private final IgniteCache<Integer, C> datasetCache;
 
@@ -70,15 +74,17 @@ public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoClose
      *
      * @param ignite Ignite instance.
      * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param filter Filter for {@code upstream} data.
      * @param datasetCache Ignite Cache with partition {@code context}.
      * @param partDataBuilder Partition {@code data} builder.
      * @param datasetId Dataset ID.
      */
-    public CacheBasedDataset(Ignite ignite, IgniteCache<K, V> upstreamCache,
+    public CacheBasedDataset(Ignite ignite, IgniteCache<K, V> upstreamCache, IgniteBiPredicate<K, V> filter,
         IgniteCache<Integer, C> datasetCache, PartitionDataBuilder<K, V, C, D> partDataBuilder,
         UUID datasetId) {
         this.ignite = ignite;
         this.upstreamCache = upstreamCache;
+        this.filter = filter;
         this.datasetCache = datasetCache;
         this.partDataBuilder = partDataBuilder;
         this.datasetId = datasetId;
@@ -95,6 +101,7 @@ public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoClose
             D data = ComputeUtils.getData(
                 Ignition.localIgnite(),
                 upstreamCacheName,
+                filter,
                 datasetCacheName,
                 datasetId,
                 part,
@@ -123,6 +130,7 @@ public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoClose
             D data = ComputeUtils.getData(
                 Ignition.localIgnite(),
                 upstreamCacheName,
+                filter,
                 datasetCacheName,
                 datasetId,
                 part,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java
index 5c0d583..b66c8aa 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java
@@ -23,6 +23,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.ml.dataset.DatasetBuilder;
 import org.apache.ignite.ml.dataset.PartitionContextBuilder;
 import org.apache.ignite.ml.dataset.PartitionDataBuilder;
@@ -52,15 +53,31 @@ public class CacheBasedDatasetBuilder<K, V> implements DatasetBuilder<K, V> {
     /** Ignite Cache with {@code upstream} data. */
     private final IgniteCache<K, V> upstreamCache;
 
+    /** Filter for {@code upstream} data. */
+    private final IgniteBiPredicate<K, V> filter;
+
     /**
-     * Constructs a new instance of cache based dataset builder that makes {@link CacheBasedDataset}.
+     * Constructs a new instance of cache based dataset builder that makes {@link CacheBasedDataset} with default
+     * predicate that passes all upstream entries to dataset.
      *
      * @param ignite Ignite instance.
      * @param upstreamCache Ignite Cache with {@code upstream} data.
      */
     public CacheBasedDatasetBuilder(Ignite ignite, IgniteCache<K, V> upstreamCache) {
+        this(ignite, upstreamCache, (a, b) -> true);
+    }
+
+    /**
+     * Constructs a new instance of cache based dataset builder that makes {@link CacheBasedDataset}.
+     *
+     * @param ignite Ignite instance.
+     * @param upstreamCache Ignite Cache with {@code upstream} data.
+     * @param filter Filter for {@code upstream} data.
+     */
+    public CacheBasedDatasetBuilder(Ignite ignite, IgniteCache<K, V> upstreamCache, IgniteBiPredicate<K, V> filter) {
         this.ignite = ignite;
         this.upstreamCache = upstreamCache;
+        this.filter = filter;
     }
 
     /** {@inheritDoc} */
@@ -84,12 +101,13 @@ public class CacheBasedDatasetBuilder<K, V> implements DatasetBuilder<K, V> {
         ComputeUtils.initContext(
             ignite,
             upstreamCache.getName(),
+            filter,
             datasetCache.getName(),
             partCtxBuilder,
             RETRIES,
             RETRY_INTERVAL
         );
 
-        return new CacheBasedDataset<>(ignite, upstreamCache, datasetCache, partDataBuilder, datasetId);
+        return new CacheBasedDataset<>(ignite, upstreamCache, filter, datasetCache, partDataBuilder, datasetId);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java
index ce2fcfd..b235900 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java
@@ -23,10 +23,10 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.locks.LockSupport;
-import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
@@ -35,9 +35,11 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.ml.dataset.PartitionContextBuilder;
 import org.apache.ignite.ml.dataset.PartitionDataBuilder;
+import org.apache.ignite.ml.dataset.UpstreamEntry;
 import org.apache.ignite.ml.math.functions.IgniteFunction;
 
 /**
@@ -133,6 +135,7 @@ public class ComputeUtils {
      *
      * @param ignite Ignite instance.
      * @param upstreamCacheName Name of an {@code upstream} cache.
+     * @param filter Filter for {@code upstream} data.
      * @param datasetCacheName Name of a partition {@code context} cache.
      * @param datasetId Dataset ID.
      * @param part Partition index.
@@ -144,7 +147,7 @@ public class ComputeUtils {
      * @return Partition {@code data}.
      */
     public static <K, V, C extends Serializable, D extends AutoCloseable> D getData(Ignite ignite,
-        String upstreamCacheName, String datasetCacheName, UUID datasetId, int part,
+        String upstreamCacheName, IgniteBiPredicate<K, V> filter, String datasetCacheName, UUID datasetId, int part,
         PartitionDataBuilder<K, V, C, D> partDataBuilder) {
 
         PartitionDataStorage dataStorage = (PartitionDataStorage)ignite
@@ -161,12 +164,18 @@ public class ComputeUtils {
             ScanQuery<K, V> qry = new ScanQuery<>();
             qry.setLocal(true);
             qry.setPartition(part);
+            qry.setFilter(filter);
 
-            long cnt = upstreamCache.localSizeLong(part);
+            long cnt = computeCount(upstreamCache, qry);
 
             if (cnt > 0) {
-                try (QueryCursor<Cache.Entry<K, V>> cursor = upstreamCache.query(qry)) {
-                    return partDataBuilder.build(new UpstreamCursorAdapter<>(cursor.iterator(), cnt), cnt, ctx);
+                try (QueryCursor<UpstreamEntry<K, V>> cursor = upstreamCache.query(qry,
+                    e -> new UpstreamEntry<>(e.getKey(), e.getValue()))) {
+
+                    Iterator<UpstreamEntry<K, V>> iter = new IteratorWithConcurrentModificationChecker<>(cursor.iterator(), cnt,
+                        "Cache expected to be not modified during dataset data building");
+
+                    return partDataBuilder.build(iter, cnt, ctx);
                 }
             }
 
@@ -179,6 +188,7 @@ public class ComputeUtils {
      *
      * @param ignite Ignite instance.
      * @param upstreamCacheName Name of an {@code upstream} cache.
+     * @param filter Filter for {@code upstream} data.
      * @param datasetCacheName Name of a partition {@code context} cache.
      * @param ctxBuilder Partition {@code context} builder.
      * @param <K> Type of a key in {@code upstream} data.
@@ -186,7 +196,8 @@ public class ComputeUtils {
      * @param <C> Type of a partition {@code context}.
      */
     public static <K, V, C extends Serializable> void initContext(Ignite ignite, String upstreamCacheName,
-        String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, int retries, int interval) {
+        IgniteBiPredicate<K, V> filter, String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, int retries,
+        int interval) {
         affinityCallWithRetries(ignite, Arrays.asList(datasetCacheName, upstreamCacheName), part -> {
             Ignite locIgnite = Ignition.localIgnite();
 
@@ -195,11 +206,18 @@ public class ComputeUtils {
             ScanQuery<K, V> qry = new ScanQuery<>();
             qry.setLocal(true);
             qry.setPartition(part);
+            qry.setFilter(filter);
+
+            long cnt = computeCount(locUpstreamCache, qry);
 
-            long cnt = locUpstreamCache.localSizeLong(part);
             C ctx;
-            try (QueryCursor<Cache.Entry<K, V>> cursor = locUpstreamCache.query(qry)) {
-                ctx = ctxBuilder.build(new UpstreamCursorAdapter<>(cursor.iterator(), cnt), cnt);
+            try (QueryCursor<UpstreamEntry<K, V>> cursor = locUpstreamCache.query(qry,
+                e -> new UpstreamEntry<>(e.getKey(), e.getValue()))) {
+
+                Iterator<UpstreamEntry<K, V>> iter = new IteratorWithConcurrentModificationChecker<>(cursor.iterator(), cnt,
+                    "Cache expected to be not modified during dataset context building");
+
+                ctx = ctxBuilder.build(iter, cnt);
             }
 
             IgniteCache<Integer, C> datasetCache = locIgnite.cache(datasetCacheName);
@@ -215,6 +233,7 @@ public class ComputeUtils {
      *
      * @param ignite Ignite instance.
      * @param upstreamCacheName Name of an {@code upstream} cache.
+     * @param filter Filter for {@code upstream} data.
      * @param datasetCacheName Name of a partition {@code context} cache.
      * @param ctxBuilder Partition {@code context} builder.
      * @param retries Number of retries for the case when one of partitions not found on the node.
@@ -223,8 +242,9 @@ public class ComputeUtils {
      * @param <C> Type of a partition {@code context}.
      */
     public static <K, V, C extends Serializable> void initContext(Ignite ignite, String upstreamCacheName,
-        String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, int retries) {
-        initContext(ignite, upstreamCacheName, datasetCacheName, ctxBuilder, retries, 0);
+        IgniteBiPredicate<K, V> filter, String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder,
+        int retries) {
+        initContext(ignite, upstreamCacheName, filter, datasetCacheName, ctxBuilder, retries, 0);
     }
 
     /**
@@ -253,4 +273,38 @@ public class ComputeUtils {
         IgniteCache<Integer, C> datasetCache = ignite.cache(datasetCacheName);
         datasetCache.put(part, ctx);
     }
+
+    /**
+     * Computes number of entries selected from the cache by the query.
+     *
+     * @param cache Ignite cache with upstream data.
+     * @param qry Cache query.
+     * @param <K> Type of a key in {@code upstream} data.
+     * @param <V> Type of a value in {@code upstream} data.
+     * @return Number of entries supplied by the iterator.
+     */
+    private static  <K, V> long computeCount(IgniteCache<K, V> cache, ScanQuery<K, V> qry) {
+        try (QueryCursor<UpstreamEntry<K, V>> cursor = cache.query(qry,
+            e -> new UpstreamEntry<>(e.getKey(), e.getValue()))) {
+            return computeCount(cursor.iterator());
+        }
+    }
+
+    /**
+     * Computes number of entries supplied by the iterator.
+     *
+     * @param iter Iterator.
+     * @return Number of entries supplied by the iterator.
+     */
+    private static long computeCount(Iterator<?> iter) {
+        long res = 0;
+
+        while (iter.hasNext()) {
+            iter.next();
+
+            res++;
+        }
+
+        return res;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationChecker.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationChecker.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationChecker.java
new file mode 100644
index 0000000..f757622
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationChecker.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache.util;
+
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator wrapper that checks if number of entries in iterator is equal to expected.
+ *
+ * @param <T> Type of entries.
+ */
+public class IteratorWithConcurrentModificationChecker<T> implements Iterator<T> {
+    /** Delegate. */
+    private final Iterator<T> delegate;
+
+    /** Expected count of entries. */
+    private long expCnt;
+
+    /** Exception message. */
+    private final String eMsg;
+
+    /**
+     * Constructs a new instance of iterator checked wrapper.
+     *
+     * @param delegate Delegate.
+     * @param expCnt Expected count of entries.
+     */
+    public IteratorWithConcurrentModificationChecker(Iterator<T> delegate, long expCnt, String eMsg) {
+        this.delegate = delegate;
+        this.expCnt = expCnt;
+        this.eMsg = eMsg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() {
+        boolean hasNext = delegate.hasNext();
+
+        if (!hasNext ^ expCnt == 0)
+            throw new ConcurrentModificationException(eMsg);
+
+        return hasNext;
+    }
+
+    /** {@inheritDoc} */
+    @Override public T next() {
+        try {
+            T next = delegate.next();
+
+            if (expCnt == 0)
+                throw new ConcurrentModificationException(eMsg);
+
+            expCnt--;
+
+            return next;
+        }
+        catch (NoSuchElementException e) {
+            if (expCnt == 0)
+                throw e;
+            else
+                throw new ConcurrentModificationException(eMsg);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java
deleted file mode 100644
index 4482af7..0000000
--- a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.dataset.impl.cache.util;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import javax.cache.Cache;
-import org.apache.ignite.ml.dataset.UpstreamEntry;
-
-/**
- * Cursor adapter used to transform {@code Cache.Entry} received from Ignite Cache query cursor into DLC-specific
- * {@link UpstreamEntry}.
- *
- * @param <K> Type of an upstream value key.
- * @param <V> Type of an upstream value.
- */
-public class UpstreamCursorAdapter<K, V> implements Iterator<UpstreamEntry<K, V>> {
-    /** Cache entry iterator. */
-    private final Iterator<Cache.Entry<K, V>> delegate;
-
-    /** Size. */
-    private long cnt;
-
-    /**
-     * Constructs a new instance of iterator.
-     *
-     * @param delegate Cache entry iterator.
-     */
-    UpstreamCursorAdapter(Iterator<Cache.Entry<K, V>> delegate, long cnt) {
-        this.delegate = delegate;
-        this.cnt = cnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasNext() {
-        return delegate.hasNext() && cnt > 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UpstreamEntry<K, V> next() {
-        if (cnt == 0)
-            throw new NoSuchElementException();
-
-        cnt--;
-
-        Cache.Entry<K, V> next = delegate.next();
-
-        if (next == null)
-            return null;
-
-        return new UpstreamEntry<>(next.getKey(), next.getValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java
index cfc1801..a4f275d 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java
@@ -19,9 +19,11 @@ package org.apache.ignite.ml.dataset.impl.local;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.ml.dataset.DatasetBuilder;
 import org.apache.ignite.ml.dataset.PartitionContextBuilder;
 import org.apache.ignite.ml.dataset.PartitionDataBuilder;
@@ -42,14 +44,30 @@ public class LocalDatasetBuilder<K, V> implements DatasetBuilder<K, V> {
     /** Number of partitions. */
     private final int partitions;
 
+    /** Filter for {@code upstream} data. */
+    private final IgniteBiPredicate<K, V> filter;
+
     /**
-     * Constructs a new instance of local dataset builder that makes {@link LocalDataset}.
+     * Constructs a new instance of local dataset builder that makes {@link LocalDataset} with default predicate that
+     * passes all upstream entries to dataset.
      *
      * @param upstreamMap {@code Map} with upstream data.
      * @param partitions Number of partitions.
      */
     public LocalDatasetBuilder(Map<K, V> upstreamMap, int partitions) {
+        this(upstreamMap, (a, b) -> true, partitions);
+    }
+
+    /**
+     * Constructs a new instance of local dataset builder that makes {@link LocalDataset}.
+     *
+     * @param upstreamMap {@code Map} with upstream data.
+     * @param filter Filter for {@code upstream} data.
+     * @param partitions Number of partitions.
+     */
+    public LocalDatasetBuilder(Map<K, V> upstreamMap, IgniteBiPredicate<K, V> filter, int partitions) {
         this.upstreamMap = upstreamMap;
+        this.filter = filter;
         this.partitions = partitions;
     }
 
@@ -60,22 +78,28 @@ public class LocalDatasetBuilder<K, V> implements DatasetBuilder<K, V> {
         List<C> ctxList = new ArrayList<>();
         List<D> dataList = new ArrayList<>();
 
-        int partSize = Math.max(1, upstreamMap.size() / partitions);
+        Map<K, V> filteredMap = new HashMap<>();
+        upstreamMap.forEach((key, val) -> {
+            if (filter.apply(key, val))
+                filteredMap.put(key, val);
+        });
+
+        int partSize = Math.max(1, filteredMap.size() / partitions);
 
-        Iterator<K> firstKeysIter = upstreamMap.keySet().iterator();
-        Iterator<K> secondKeysIter = upstreamMap.keySet().iterator();
+        Iterator<K> firstKeysIter = filteredMap.keySet().iterator();
+        Iterator<K> secondKeysIter = filteredMap.keySet().iterator();
 
         int ptr = 0;
         for (int part = 0; part < partitions; part++) {
-            int cnt = part == partitions - 1 ? upstreamMap.size() - ptr : Math.min(partSize, upstreamMap.size() - ptr);
+            int cnt = part == partitions - 1 ? filteredMap.size() - ptr : Math.min(partSize, filteredMap.size() - ptr);
 
             C ctx = cnt > 0 ? partCtxBuilder.build(
-                new IteratorWindow<>(firstKeysIter, k -> new UpstreamEntry<>(k, upstreamMap.get(k)), cnt),
+                new IteratorWindow<>(firstKeysIter, k -> new UpstreamEntry<>(k, filteredMap.get(k)), cnt),
                 cnt
             ) : null;
 
             D data = cnt > 0 ? partDataBuilder.build(
-                new IteratorWindow<>(secondKeysIter, k -> new UpstreamEntry<>(k, upstreamMap.get(k)), cnt),
+                new IteratorWindow<>(secondKeysIter, k -> new UpstreamEntry<>(k, filteredMap.get(k)), cnt),
                 cnt,
                 ctx
             ) : null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java
index 2d13cd5..7944149 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java
@@ -20,11 +20,12 @@ package org.apache.ignite.ml.knn.regression;
 import org.apache.ignite.ml.dataset.DatasetBuilder;
 import org.apache.ignite.ml.knn.KNNUtils;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.trainers.SingleLabelDatasetTrainer;
 
 /**
  * kNN algorithm trainer to solve regression task.
  */
-public class KNNRegressionTrainer{
+public class KNNRegressionTrainer implements SingleLabelDatasetTrainer<KNNRegressionModel>
{
     /**
      * Trains model based on the specified data.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java
index c35cdc3..1cf6dbf 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.ml.dataset.UpstreamEntry;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -86,6 +87,44 @@ public class CacheBasedDatasetBuilderTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * Tests that predicate works correctly.
+     */
+    public void testBuildWithPredicate() {
+        CacheConfiguration<Integer, Integer> upstreamCacheConfiguration = new CacheConfiguration<>();
+        upstreamCacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 1));
+        upstreamCacheConfiguration.setName(UUID.randomUUID().toString());
+
+        IgniteCache<Integer, Integer> upstreamCache = ignite.createCache(upstreamCacheConfiguration);
+        upstreamCache.put(1, 1);
+        upstreamCache.put(2, 2);
+
+        CacheBasedDatasetBuilder<Integer, Integer> builder = new CacheBasedDatasetBuilder<>(
+            ignite,
+            upstreamCache,
+            (k, v) -> k % 2 == 0
+        );
+
+        CacheBasedDataset<Integer, Integer, Long, AutoCloseable> dataset = builder.build(
+            (upstream, upstreamSize) -> {
+                UpstreamEntry<Integer, Integer> entry = upstream.next();
+                assertEquals(Integer.valueOf(2), entry.getKey());
+                assertEquals(Integer.valueOf(2), entry.getValue());
+                assertFalse(upstream.hasNext());
+                return 0L;
+            },
+            (upstream, upstreamSize, ctx) -> {
+                UpstreamEntry<Integer, Integer> entry = upstream.next();
+                assertEquals(Integer.valueOf(2), entry.getKey());
+                assertEquals(Integer.valueOf(2), entry.getValue());
+                assertFalse(upstream.hasNext());
+                return null;
+            }
+        );
+
+        dataset.compute(data -> {});
+    }
+
+    /**
      * Generate an Ignite Cache with the specified size and number of partitions for testing
purposes.
      *
      * @param size Size of an Ignite Cache.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java
index 4926a90..952fc43 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java
@@ -177,6 +177,7 @@ public class ComputeUtilsTest extends GridCommonAbstractTest {
                 part -> ComputeUtils.<Integer, Integer, Serializable, TestPartitionData>getData(
                     ignite,
                     upstreamCacheName,
+                    (k, v) -> true,
                     datasetCacheName,
                     datasetId,
                     0,
@@ -225,6 +226,7 @@ public class ComputeUtilsTest extends GridCommonAbstractTest {
         ComputeUtils.<Integer, Integer, Integer>initContext(
             ignite,
             upstreamCacheName,
+            (k, v) -> true,
             datasetCacheName,
             (upstream, upstreamSize) -> {
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationCheckerTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationCheckerTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationCheckerTest.java
new file mode 100644
index 0000000..232281e
--- /dev/null
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationCheckerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.dataset.impl.cache.util;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.List;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link IteratorWithConcurrentModificationChecker}.
+ */
+public class IteratorWithConcurrentModificationCheckerTest {
+    /** */
+    @Test(expected = ConcurrentModificationException.class)
+    public void testNextWhenIteratorHasLessElementsThanExpected() {
+        List<Integer> list = Arrays.asList(1, 2, 3);
+
+        Iterator<Integer> iter = new IteratorWithConcurrentModificationChecker<>(list.iterator(),
4, "Exception");
+
+        assertEquals(Integer.valueOf(1), iter.next());
+        assertEquals(Integer.valueOf(2), iter.next());
+        assertEquals(Integer.valueOf(3), iter.next());
+
+        iter.next(); // Should throw an exception.
+    }
+
+    /** */
+    @Test(expected = ConcurrentModificationException.class)
+    public void testNextWhenIteratorHasMoreElementsThanExpected() {
+        List<Integer> list = Arrays.asList(1, 2, 3);
+
+        Iterator<Integer> iter = new IteratorWithConcurrentModificationChecker<>(list.iterator(),
2, "Exception");
+
+        assertEquals(Integer.valueOf(1), iter.next());
+        assertEquals(Integer.valueOf(2), iter.next());
+
+        iter.next(); // Should throw an exception.
+    }
+
+    /** */
+    @Test(expected = ConcurrentModificationException.class)
+    public void testHasNextWhenIteratorHasLessElementsThanExpected() {
+        List<Integer> list = Arrays.asList(1, 2, 3);
+
+        Iterator<Integer> iter = new IteratorWithConcurrentModificationChecker<>(list.iterator(),
4, "Exception");
+
+        assertTrue(iter.hasNext());
+        iter.next();
+        assertTrue(iter.hasNext());
+        iter.next();
+        assertTrue(iter.hasNext());
+        iter.next();
+
+        iter.hasNext(); // Should throw an exception.
+    }
+
+    /** */
+    @Test(expected = ConcurrentModificationException.class)
+    public void testHasNextWhenIteratorHasMoreElementsThanExpected() {
+        List<Integer> list = Arrays.asList(1, 2, 3);
+
+        Iterator<Integer> iter = new IteratorWithConcurrentModificationChecker<>(list.iterator(),
2, "Exception");
+
+        assertTrue(iter.hasNext());
+        iter.next();
+        assertTrue(iter.hasNext());
+        iter.next();
+
+        iter.hasNext(); // Should throw an exception.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java
index 0628580..8a5eb3a 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java
@@ -67,6 +67,44 @@ public class LocalDatasetBuilderTest {
         assertEquals(10, cnt.intValue());
     }
 
+    /** Tests {@code build()} method with predicate. */
+    @Test
+    public void testBuildWithPredicate() {
+        Map<Integer, Integer> data = new HashMap<>();
+        for (int i = 0; i < 100; i++)
+            data.put(i, i);
+
+        LocalDatasetBuilder<Integer, Integer> builder = new LocalDatasetBuilder<>(data,
(k, v) -> k % 2 == 0,10);
+
+        LocalDataset<Serializable, TestPartitionData> dataset = builder.build(
+            (upstream, upstreamSize) -> null,
+            (upstream, upstreamSize, ctx) -> {
+                int[] arr = new int[Math.toIntExact(upstreamSize)];
+
+                int ptr = 0;
+                while (upstream.hasNext())
+                    arr[ptr++] = upstream.next().getValue();
+
+                return new TestPartitionData(arr);
+            }
+        );
+
+        AtomicLong cnt = new AtomicLong();
+
+        dataset.compute((partData, partIdx) -> {
+            cnt.incrementAndGet();
+
+            int[] arr = partData.data;
+
+            assertEquals(5, arr.length);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals((partIdx * 5 + i) * 2, arr[i]);
+        });
+
+        assertEquals(10, cnt.intValue());
+    }
+
     /**
      * Test partition {@code data}.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainerTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainerTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainerTest.java
index 86d10fb..06e52fa 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainerTest.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainerTest.java
@@ -22,12 +22,9 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.ml.dataset.DatasetBuilder;
 import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder;
-import org.apache.ignite.ml.preprocessing.binarization.BinarizationPreprocessor;
-import org.apache.ignite.ml.preprocessing.binarization.BinarizationTrainer;
 import org.apache.ignite.ml.preprocessing.imputer.ImputerPreprocessor;
 import org.apache.ignite.ml.preprocessing.imputer.ImputerTrainer;
 import org.apache.ignite.ml.preprocessing.imputer.ImputingStrategy;
-import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;


Mime
View raw message