ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [53/60] [abbrv] ignite git commit: IGNITE-5109 Refactoring for SparseDistributedMatrix
Date Thu, 04 May 2017 11:32:57 GMT
IGNITE-5109 Refactoring for SparseDistributedMatrix


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/156ec536
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/156ec536
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/156ec536

Branch: refs/heads/ignite-5075-cacheStart
Commit: 156ec5360e6ec918878d9d0c6f7a5d04fc8161a0
Parents: c64ad78
Author: Yury Babak <ybabak@gridgain.com>
Authored: Wed May 3 20:02:38 2017 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Wed May 3 20:02:38 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/ml/math/impls/CacheUtils.java | 111 +++++++++++++------
 .../ml/math/impls/matrix/AbstractMatrix.java    |   5 +
 .../impls/matrix/DenseLocalOffHeapMatrix.java   |   5 -
 .../impls/matrix/SparseDistributedMatrix.java   |  16 ++-
 .../matrix/SparseDistributedMatrixStorage.java  |  89 ++++++++++-----
 .../ignite/ml/math/MathImplMainTestSuite.java   |   5 +-
 .../matrix/SparseDistributedMatrixTest.java     |  67 +++++++++--
 7 files changed, 213 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
index cfb01be..ace399b 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java
@@ -28,13 +28,17 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.ml.math.KeyMapper;
 import org.apache.ignite.ml.math.ValueMapper;
 import org.apache.ignite.ml.math.functions.IgniteBiFunction;
 import org.apache.ignite.ml.math.functions.IgniteConsumer;
 import org.apache.ignite.ml.math.functions.IgniteFunction;
+import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
 
 /**
  * Distribution-related misc. support.
@@ -118,17 +122,21 @@ public class CacheUtils {
     }
 
     /**
-     * @param cacheName Cache name.
+     * @param matrixUuid Matrix UUID.
      * @return Sum obtained using sparse logic.
      */
-    public static <K, V> double sparseSum(String cacheName) {
-        Collection<Double> subSums = fold(cacheName, (CacheEntry<Integer, Map<Integer,
Double>> ce, Double acc) -> {
-            Map<Integer, Double> map = ce.entry().getValue();
+    public static <K, V> double sparseSum(IgniteUuid matrixUuid) {
+        Collection<Double> subSums = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME,
(CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce,
Double acc) -> {
+            Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>
entry = ce.entry();
+            if (entry.getKey().get2().equals(matrixUuid)) {
+                Map<Integer, Double> map = entry.getValue();
 
-            double sum = sum(map.values());
+                double sum = sum(map.values());
 
-            return acc == null ? sum : acc + sum;
-        });
+                return acc == null ? sum : acc + sum;
+            } else
+                return acc;
+        }, key -> key.get2().equals(matrixUuid));
 
         return sum(subSums);
     }
@@ -172,39 +180,48 @@ public class CacheUtils {
     }
 
     /**
-     * @param cacheName Cache name.
+     * @param matrixUuid Matrix UUID.
      * @return Minimum value obtained using sparse logic.
      */
-    public static <K, V> double sparseMin(String cacheName) {
-        Collection<Double> mins = fold(cacheName, (CacheEntry<Integer, Map<Integer,
Double>> ce, Double acc) -> {
-            Map<Integer, Double> map = ce.entry().getValue();
+    public static <K, V> double sparseMin(IgniteUuid matrixUuid) {
+        Collection<Double> mins = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME,
(CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce,
Double acc) -> {
+            Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>
entry = ce.entry();
 
-            double min = Collections.min(map.values());
+            if (entry.getKey().get2().equals(matrixUuid)) {
+                Map<Integer, Double> map = entry.getValue();
 
-            if (acc == null)
-                return min;
-            else
-                return Math.min(acc, min);
-        });
+                double min = Collections.min(map.values());
+
+                if (acc == null)
+                    return min;
+                else
+                    return Math.min(acc, min);
+            } else
+                return acc;
+        }, key -> key.get2().equals(matrixUuid));
 
         return Collections.min(mins);
     }
 
     /**
-     * @param cacheName Cache name.
+     * @param matrixUuid Matrix UUID.
      * @return Maximum value obtained using sparse logic.
      */
-    public static <K, V> double sparseMax(String cacheName) {
-        Collection<Double> maxes = fold(cacheName, (CacheEntry<Integer, Map<Integer,
Double>> ce, Double acc) -> {
-            Map<Integer, Double> map = ce.entry().getValue();
+    public static <K, V> double sparseMax(IgniteUuid matrixUuid) {
+        Collection<Double> maxes = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME,
(CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce,
Double acc) -> {
+            Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>
entry = ce.entry();
+            if (entry.getKey().get2().equals(matrixUuid)) {
+                Map<Integer, Double> map = entry.getValue();
 
-            double max = Collections.max(map.values());
+                double max = Collections.max(map.values());
 
-            if (acc == null)
-                return max;
-            else
-                return Math.max(acc, max);
-        });
+                if (acc == null)
+                    return max;
+                else
+                    return Math.max(acc, max);
+            } else
+                return acc;
+        }, key -> key.get2().equals(matrixUuid));
 
         return Collections.max(maxes);
     }
@@ -254,19 +271,20 @@ public class CacheUtils {
     }
 
     /**
-     * @param cacheName Cache name.
+     * @param matrixUuid Matrix UUID.
      * @param mapper Mapping {@link IgniteFunction}.
      */
-    public static <K, V> void sparseMap(String cacheName, IgniteFunction<Double,
Double> mapper) {
-        foreach(cacheName, (CacheEntry<Integer, Map<Integer, Double>> ce) ->
{
-            Integer k = ce.entry().getKey();
+    public static <K, V> void sparseMap(IgniteUuid matrixUuid, IgniteFunction<Double,
Double> mapper) {
+        foreach(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer,
IgniteUuid>, Map<Integer, Double>> ce) -> {
+            IgniteBiTuple k = ce.entry().getKey();
+
             Map<Integer, Double> v = ce.entry().getValue();
 
             for (Map.Entry<Integer, Double> e : v.entrySet())
                 e.setValue(mapper.apply(e.getValue()));
 
             ce.cache().put(k, v);
-        });
+        }, key -> key.get2().equals(matrixUuid));
     }
 
     /**
@@ -276,6 +294,17 @@ public class CacheUtils {
      * @param <V> Cache value object type.
      */
     public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K,
V>> fun) {
+        foreach(cacheName, fun, null);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param fun An operation that accepts a cache entry and processes it.
+     * @param keyFilter Cache keys filter.
+     * @param <K> Cache key object type.
+     * @param <V> Cache value object type.
+     */
+    public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K,
V>> fun, IgnitePredicate<K> keyFilter) {
         bcast(cacheName, () -> {
             Ignite ignite = Ignition.localIgnite();
             IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName);
@@ -293,7 +322,7 @@ public class CacheUtils {
                 // Iterate over given partition.
                 // Query returns an empty cursor if this partition is not stored on this
node.
                 for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
-                    (k, v) -> affinity.mapPartitionToNode(p) == locNode)))
+                    (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter
== null || keyFilter.apply(k)))))
                     fun.accept(new CacheEntry<>(entry, cache));
             }
         });
@@ -310,6 +339,20 @@ public class CacheUtils {
      * @return Fold operation result.
      */
     public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K,
V>, A, A> folder) {
+        return fold(cacheName, folder, null);
+    }
+
+    /**
+     * <b>Currently fold supports only commutative operations.<b/>
+     *
+     * @param cacheName Cache name.
+     * @param folder Fold function operating over cache entries.
+     * @param <K> Cache key object type.
+     * @param <V> Cache value object type.
+     * @param <A> Fold result type.
+     * @return Fold operation result.
+     */
+    public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K,
V>, A, A> folder, IgnitePredicate<K> keyFilter) {
         return bcast(cacheName, () -> {
             Ignite ignite = Ignition.localIgnite();
             IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName);
@@ -329,7 +372,7 @@ public class CacheUtils {
                 // Iterate over given partition.
                 // Query returns an empty cursor if this partition is not stored on this
node.
                 for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
-                    (k, v) -> affinity.mapPartitionToNode(p) == locNode)))
+                    (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter
== null || keyFilter.apply(k)))))
                     a = folder.apply(new CacheEntry<>(entry, cache), a);
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
index c5edeb1..d1d3904 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java
@@ -840,6 +840,11 @@ public abstract class AbstractMatrix implements Matrix {
     }
 
     /** {@inheritDoc} */
+    @Override public void destroy() {
+        getStorage().destroy();
+    }
+
+    /** {@inheritDoc} */
     @Override public Matrix copy() {
         Matrix cp = like(rowSize(), columnSize());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java
b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java
index 4161228..fad35fd 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java
@@ -71,11 +71,6 @@ public class DenseLocalOffHeapMatrix extends AbstractMatrix {
     }
 
     /** {@inheritDoc} */
-    @Override public void destroy() {
-        getStorage().destroy();
-    }
-
-    /** {@inheritDoc} */
     @Override protected Matrix likeIdentity() {
         int n = rowSize();
         Matrix res = like(n, n);

http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
index 10ebdd0..3e508bd 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java
@@ -26,6 +26,7 @@
 
 package org.apache.ignite.ml.math.impls.matrix;
 
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.ml.math.Matrix;
 import org.apache.ignite.ml.math.StorageConstants;
 import org.apache.ignite.ml.math.Vector;
@@ -119,24 +120,24 @@ public class SparseDistributedMatrix extends AbstractMatrix implements
StorageCo
      * @return Matrix with mapped values.
      */
     private Matrix mapOverValues(IgniteFunction<Double, Double> mapper) {
-        CacheUtils.sparseMap(storage().cache().getName(), mapper);
+        CacheUtils.sparseMap(getUUID(), mapper);
 
         return this;
     }
 
     /** {@inheritDoc} */
     @Override public double sum() {
-        return CacheUtils.sparseSum(storage().cache().getName());
+        return CacheUtils.sparseSum(getUUID());
     }
 
     /** {@inheritDoc} */
     @Override public double maxValue() {
-        return CacheUtils.sparseMax(storage().cache().getName());
+        return CacheUtils.sparseMax(getUUID());
     }
 
     /** {@inheritDoc} */
     @Override public double minValue() {
-        return CacheUtils.sparseMin(storage().cache().getName());
+        return CacheUtils.sparseMin(getUUID());
     }
 
     /** {@inheritDoc} */
@@ -146,11 +147,16 @@ public class SparseDistributedMatrix extends AbstractMatrix implements
StorageCo
 
     /** {@inheritDoc} */
     @Override public Matrix like(int rows, int cols) {
-        throw new UnsupportedOperationException();
+        return new SparseDistributedMatrix(rows, cols, storage().storageMode(), storage().accessMode());
     }
 
     /** {@inheritDoc} */
     @Override public Vector likeVector(int crd) {
         throw new UnsupportedOperationException();
     }
+
+    /** */
+    private IgniteUuid getUUID(){
+        return ((SparseDistributedMatrixStorage) getStorage()).getUUID();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java
b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java
index bfc0e9f..816bf44 100644
--- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java
@@ -23,6 +23,9 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -30,6 +33,7 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.ml.math.MatrixStorage;
 import org.apache.ignite.ml.math.StorageConstants;
@@ -40,19 +44,22 @@ import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
  * {@link MatrixStorage} implementation for {@link SparseDistributedMatrix}.
  */
 public class SparseDistributedMatrixStorage extends CacheUtils implements MatrixStorage,
StorageConstants {
+    /** Cache name used for all instances of {@link SparseDistributedMatrixStorage}.*/
+    public static final String ML_CACHE_NAME = "ML_SPARSE_MATRICES_CONTAINER";
     /** Amount of rows in the matrix. */
     private int rows;
     /** Amount of columns in the matrix. */
     private int cols;
-
     /** Row or column based storage mode. */
     private int stoMode;
     /** Random or sequential access mode. */
     private int acsMode;
+    /** Matrix uuid. */
+    private IgniteUuid uuid;
 
     /** Actual distributed storage. */
     private IgniteCache<
-        Integer /* Row or column index. */,
+        IgniteBiTuple<Integer, IgniteUuid> /* Row or column index with matrix uuid.
*/,
         Map<Integer, Double> /* Map-based row or column. */
         > cache = null;
 
@@ -81,14 +88,15 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements
Matrix
         this.acsMode = acsMode;
 
         cache = newCache();
+
+        uuid = IgniteUuid.randomUuid();
     }
 
     /**
-     *
-     *
+     *  Create new ML cache if needed.
      */
-    private IgniteCache<Integer, Map<Integer, Double>> newCache() {
-        CacheConfiguration<Integer, Map<Integer, Double>> cfg = new CacheConfiguration<>();
+    private IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>
newCache() {
+        CacheConfiguration<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>
cfg = new CacheConfiguration<>();
 
         // Write to primary.
         cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
@@ -106,16 +114,18 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements
Matrix
         cfg.setCacheMode(CacheMode.PARTITIONED);
 
         // Random cache name.
-        cfg.setName(new IgniteUuid().shortString());
+        cfg.setName(ML_CACHE_NAME);
 
-        return Ignition.localIgnite().getOrCreateCache(cfg);
+        IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>
cache = Ignition.localIgnite().getOrCreateCache(cfg);
+
+        return cache;
     }
 
     /**
      *
      *
      */
-    public IgniteCache<Integer, Map<Integer, Double>> cache() {
+    public IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>
cache() {
         return cache;
     }
 
@@ -138,34 +148,36 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements
Matrix
     /** {@inheritDoc} */
     @Override public double get(int x, int y) {
         if (stoMode == ROW_STORAGE_MODE)
-            return matrixGet(cache.getName(), x, y);
+            return matrixGet(x, y);
         else
-            return matrixGet(cache.getName(), y, x);
+            return matrixGet(y, x);
     }
 
     /** {@inheritDoc} */
     @Override public void set(int x, int y, double v) {
         if (stoMode == ROW_STORAGE_MODE)
-            matrixSet(cache.getName(), x, y, v);
+            matrixSet(x, y, v);
         else
-            matrixSet(cache.getName(), y, x, v);
+            matrixSet(y, x, v);
     }
 
     /**
      * Distributed matrix get.
      *
-     * @param cacheName Matrix's cache.
      * @param a Row or column index.
      * @param b Row or column index.
      * @return Matrix value at (a, b) index.
      */
-    private double matrixGet(String cacheName, int a, int b) {
+    private double matrixGet(int a, int b) {
         // Remote get from the primary node (where given row or column is stored locally).
-        return ignite().compute(groupForKey(cacheName, a)).call(() -> {
-            IgniteCache<Integer, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(cacheName);
+        return ignite().compute(groupForKey(ML_CACHE_NAME, a)).call(() -> {
+            IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>
cache = Ignition.localIgnite().getOrCreateCache(ML_CACHE_NAME);
 
             // Local get.
-            Map<Integer, Double> map = cache.localPeek(a, CachePeekMode.PRIMARY);
+            Map<Integer, Double> map = cache.localPeek(getCacheKey(a), CachePeekMode.PRIMARY);
+
+            if (map == null)
+                map = cache.get(getCacheKey(a));
 
             return (map == null || !map.containsKey(b)) ? 0.0 : map.get(b);
         });
@@ -174,21 +186,25 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements
Matrix
     /**
      * Distributed matrix set.
      *
-     * @param cacheName Matrix's cache.
      * @param a Row or column index.
      * @param b Row or column index.
      * @param v New value to set.
      */
-    private void matrixSet(String cacheName, int a, int b, double v) {
+    private void matrixSet(int a, int b, double v) {
         // Remote set on the primary node (where given row or column is stored locally).
-        ignite().compute(groupForKey(cacheName, a)).run(() -> {
-            IgniteCache<Integer, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(cacheName);
+        ignite().compute(groupForKey(ML_CACHE_NAME, a)).run(() -> {
+            IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>
cache = Ignition.localIgnite().getOrCreateCache(ML_CACHE_NAME);
 
             // Local get.
-            Map<Integer, Double> map = cache.localPeek(a, CachePeekMode.PRIMARY);
+            Map<Integer, Double> map = cache.localPeek(getCacheKey(a), CachePeekMode.PRIMARY);
 
-            if (map == null)
-                map = acsMode == SEQUENTIAL_ACCESS_MODE ? new Int2DoubleRBTreeMap() : new
Int2DoubleOpenHashMap();
+
+            if (map == null) {
+                map = cache.get(getCacheKey(a)); //Remote entry get.
+
+                if (map == null)
+                    map = acsMode == SEQUENTIAL_ACCESS_MODE ? new Int2DoubleRBTreeMap() :
new Int2DoubleOpenHashMap();
+            }
 
             if (v != 0.0)
                 map.put(b, v);
@@ -196,10 +212,15 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements
Matrix
                 map.remove(b);
 
             // Local put.
-            cache.put(a, map);
+            cache.put(getCacheKey(a), map);
         });
     }
 
+    /** Build cache key for row/column. */
+    private IgniteBiTuple<Integer, IgniteUuid> getCacheKey(int idx){
+        return new IgniteBiTuple<>(idx, uuid);
+    }
+
     /** {@inheritDoc} */
     @Override public int columnSize() {
         return cols;
@@ -216,6 +237,7 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements
Matrix
         out.writeInt(cols);
         out.writeInt(acsMode);
         out.writeInt(stoMode);
+        out.writeObject(uuid);
         out.writeUTF(cache.getName());
     }
 
@@ -225,6 +247,7 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements
Matrix
         cols = in.readInt();
         acsMode = in.readInt();
         stoMode = in.readInt();
+        uuid = (IgniteUuid)in.readObject();
         cache = ignite().getOrCreateCache(in.readUTF());
     }
 
@@ -253,9 +276,11 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements
Matrix
         return false;
     }
 
-    /** Destroy underlying cache. */
+    /** Delete all data from cache. */
     @Override public void destroy() {
-        cache.destroy();
+        Set<IgniteBiTuple<Integer, IgniteUuid>> keyset = IntStream.range(0, rows).mapToObj(this::getCacheKey).collect(Collectors.toSet());
+
+        cache.clearAll(keyset);
     }
 
     /** {@inheritDoc} */
@@ -266,6 +291,7 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements
Matrix
         res = res * 37 + rows;
         res = res * 37 + acsMode;
         res = res * 37 + stoMode;
+        res = res * 37 + uuid.hashCode();
         res = res * 37 + cache.hashCode();
 
         return res;
@@ -282,6 +308,11 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements
Matrix
         SparseDistributedMatrixStorage that = (SparseDistributedMatrixStorage)obj;
 
         return rows == that.rows && cols == that.cols && acsMode == that.acsMode
&& stoMode == that.stoMode
-            && (cache != null ? cache.equals(that.cache) : that.cache == null);
+            && uuid.equals(that.uuid) && (cache != null ? cache.equals(that.cache)
: that.cache == null);
+    }
+
+    /** */
+    public IgniteUuid getUUID() {
+        return uuid;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java
b/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java
index 5f41583..8d6d2af 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java
@@ -21,12 +21,13 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
 /**
- * Test suite for local and distributed tests
+ * Test suite for local and distributed tests.
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
     MathImplLocalTestSuite.class,
-    MathImplDistributedTestSuite.class
+    MathImplDistributedTestSuite.class,
+    TracerTest.class
 })
 public class MathImplMainTestSuite {
     // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
index 8985806..5ee2e7d 100644
--- a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
+++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java
@@ -31,12 +31,19 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.ml.math.Matrix;
 import org.apache.ignite.ml.math.StorageConstants;
 import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException;
 import org.apache.ignite.ml.math.impls.MathTestConstants;
+import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 
@@ -110,7 +117,7 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest
{
                 double v = Math.random();
                 cacheMatrix.set(i, j, v);
 
-                assert Double.compare(v, cacheMatrix.get(i, j)) == 0;
+                assertEquals("Unexpected value for matrix element["+ i +" " + j + "]", v,
cacheMatrix.get(i, j), PRECISION);
             }
         }
     }
@@ -134,7 +141,7 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest
{
         SparseDistributedMatrix objRestored = (SparseDistributedMatrix)objInputStream.readObject();
 
         assertTrue(MathTestConstants.VAL_NOT_EQUALS, cacheMatrix.equals(objRestored));
-        assertEquals(MathTestConstants.VAL_NOT_EQUALS, objRestored.get(1, 1), 1.0, 0.0);
+        assertEquals(MathTestConstants.VAL_NOT_EQUALS, objRestored.get(1, 1), 1.0, PRECISION);
     }
 
     /** Test simple math. */
@@ -225,19 +232,44 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest
{
     }
 
     /** */
+    public void testCacheBehaviour(){
+        IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        SparseDistributedMatrix cacheMatrix1 = new SparseDistributedMatrix(rows, cols, StorageConstants.ROW_STORAGE_MODE,
StorageConstants.RANDOM_ACCESS_MODE);
+        SparseDistributedMatrix cacheMatrix2 = new SparseDistributedMatrix(rows, cols, StorageConstants.ROW_STORAGE_MODE,
StorageConstants.RANDOM_ACCESS_MODE);
+
+        initMtx(cacheMatrix1);
+        initMtx(cacheMatrix2);
+
+        Collection<String> cacheNames = ignite.cacheNames();
+
+        assert cacheNames.contains(SparseDistributedMatrixStorage.ML_CACHE_NAME);
+
+        IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Object> cache = ignite.getOrCreateCache(SparseDistributedMatrixStorage.ML_CACHE_NAME);
+
+        Set<IgniteBiTuple<Integer, IgniteUuid>> keySet1 = buildKeySet(cacheMatrix1);
+        Set<IgniteBiTuple<Integer, IgniteUuid>> keySet2 = buildKeySet(cacheMatrix2);
+
+        assert cache.containsKeys(keySet1);
+        assert cache.containsKeys(keySet2);
+
+        cacheMatrix2.destroy();
+
+        assert cache.containsKeys(keySet1);
+        assert !cache.containsKeys(keySet2);
+
+        cacheMatrix1.destroy();
+
+        assert !cache.containsKeys(keySet1);
+    }
+
+    /** */
     public void testLike() {
         IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
 
         cacheMatrix = new SparseDistributedMatrix(rows, cols, StorageConstants.ROW_STORAGE_MODE,
StorageConstants.RANDOM_ACCESS_MODE);
 
-        try {
-            cacheMatrix.like(1, 1);
-            fail("UnsupportedOperationException expected.");
-        }
-        catch (UnsupportedOperationException e) {
-            return;
-        }
-        fail("UnsupportedOperationException expected.");
+        assertNotNull(cacheMatrix.like(1, 1));
     }
 
     /** */
@@ -262,4 +294,19 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest
{
             for (int j = 0; j < m.columnSize(); j++)
                 m.set(i, j, 1.0);
     }
+
+    /** Build key set for SparseDistributedMatrix. */
+    private Set<IgniteBiTuple<Integer, IgniteUuid>> buildKeySet(SparseDistributedMatrix
m){
+        Set<IgniteBiTuple<Integer, IgniteUuid>> set = new HashSet<>();
+
+        SparseDistributedMatrixStorage storage = (SparseDistributedMatrixStorage)m.getStorage();
+
+        IgniteUuid uuid = storage.getUUID();
+        int size = storage.storageMode() == StorageConstants.ROW_STORAGE_MODE ? storage.rowSize()
: storage.columnSize();
+
+        for (int i = 0; i < size; i++)
+            set.add(new IgniteBiTuple<>(i, uuid));
+
+        return set;
+    }
 }


Mime
View raw message