ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [45/50] [abbrv] incubator-ignite git commit: # ignite-57
Date Fri, 06 Feb 2015 13:58:02 GMT
# ignite-57


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b1959a30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b1959a30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b1959a30

Branch: refs/heads/ignite-57
Commit: b1959a30ec3beccb925e65cc7d0fb46f42f7b66a
Parents: 7a997ad
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Feb 6 14:44:57 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Feb 6 16:27:02 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheEntryImpl0.java       |  60 ++++
 .../cache/CacheWeakQueryIteratorsHolder.java    |   4 +-
 .../processors/cache/GridCacheAdapter.java      | 142 +++++++-
 .../processors/cache/GridCacheSwapManager.java  | 160 +++++++++
 .../distributed/dht/GridDhtCacheAdapter.java    |  80 ++++-
 .../distributed/near/GridNearCacheAdapter.java  |   8 +
 .../ignite/internal/util/lang/GridFunc.java     | 149 ++++++++
 .../cache/IgniteCachePeekModesAbstractTest.java | 359 +++++++++++++++++--
 8 files changed, 933 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java
new file mode 100644
index 0000000..05c30c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl0.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import javax.cache.*;
+import java.util.*;
+
+/**
+ *
+ */
+public class CacheEntryImpl0<K, V> implements Cache.Entry<K, V> {
+    /** */
+    private final Map.Entry<K, V> e;
+
+    /**
+     * @param e Entry.
+     */
+    public CacheEntryImpl0(Map.Entry<K, V> e) {
+        this.e = e;
+    }
+
+    /** {@inheritDoc} */
+    @Override public K getKey() {
+        return e.getKey();
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getValue() {
+        return e.getValue();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> T unwrap(Class<T> cls) {
+        if (!cls.equals(getClass()))
+            throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
+
+        return (T)this;
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+        return "CacheEntry [key=" + getKey() + ", val=" + getValue() + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java
index 902cf12..4e19a0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java
@@ -123,7 +123,8 @@ public class CacheWeakQueryIteratorsHolder<V> {
         /** Weak reference. */
         private final WeakReference<WeakQueryFutureIterator<T>> weakRef;
 
-        CacheIteratorConverter<T, V> convert;
+        /** */
+        private final CacheIteratorConverter<T, V> convert;
 
         /** Init flag. */
         private boolean init;
@@ -136,6 +137,7 @@ public class CacheWeakQueryIteratorsHolder<V> {
 
         /**
          * @param fut GridCacheQueryFuture to iterate.
+         * @param convert Converter.
          */
         WeakQueryFutureIterator(CacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) {
             this.fut = fut;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index be74645..68b846b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -663,7 +663,50 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
 
         PeekModes modes = parsePeekModes(peekModes);
 
-        return null;
+        List<Iterator<Cache.Entry<K, V>>> its = new ArrayList<>();
+
+        if (ctx.isLocal()) {
+            modes.primary = true;
+            modes.backup = true;
+
+            if (modes.heap)
+                its.add(iterator(map.entries0().iterator(), !ctx.keepPortable()));
+        }
+        else if (modes.heap) {
+            if (modes.near && ctx.isNear())
+                its.add(ctx.near().nearEntriesIterator());
+
+            if (modes.primary || modes.backup) {
+                GridDhtCacheAdapter<K, V> cache = ctx.isNear() ? ctx.near().dht() : ctx.dht();
+
+                its.add(cache.localEntriesIterator(modes.primary, modes.backup));
+            }
+        }
+
+        // Swap and offheap are disabled for near cache.
+        if (modes.primary || modes.backup) {
+            long topVer = ctx.affinity().affinityTopologyVersion();
+
+            GridCacheSwapManager<K, V> swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
+
+            if (modes.swap)
+                its.add(swapMgr.swapIterator(modes.primary, modes.backup, topVer));
+
+            if (modes.offheap)
+                its.add(swapMgr.offheapIterator(modes.primary, modes.backup, topVer));
+        }
+
+        final Iterator<Cache.Entry<K, V>> it = F.flatIterators(its);
+
+        return new Iterable<Cache.Entry<K, V>>() {
+            @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                return it;
+            }
+
+            public String toString() {
+                return "CacheLocalEntries []";
+            }
+        };
     }
 
     /** {@inheritDoc} */
@@ -5274,15 +5317,112 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     }
 
     /**
+     * @param it Internal entry iterator.
+     * @param deserializePortable Deserialize portable flag.
+     * @return Public API iterator.
+     */
+    protected Iterator<Cache.Entry<K, V>> iterator(final Iterator<GridCacheEntryEx<K, V>> it,
+        final boolean deserializePortable) {
+        return new Iterator<Cache.Entry<K, V>>() {
+            {
+                advance();
+            }
+
+            /** */
+            private Cache.Entry<K, V> next;
+
+            @Override public boolean hasNext() {
+                return next != null;
+            }
+
+            @Override public Cache.Entry<K, V> next() {
+                if (next == null)
+                    throw new NoSuchElementException();
+
+                Cache.Entry<K, V> e = next;
+
+                advance();
+
+                return e;
+            }
+
+            @Override public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+            /**
+             * Switch to next entry.
+             */
+            private void advance() {
+                next = null;
+
+                while (it.hasNext()) {
+                    GridCacheEntryEx<K, V> entry = it.next();
+
+                    try {
+                        V val = entry.innerGet(
+                            null,
+                            false,
+                            false,
+                            false,
+                            true,
+                            false,
+                            false,
+                            false,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null);
+
+                        if (val == null)
+                            continue;
+
+                        K key = entry.key();
+
+                        if (deserializePortable && ctx.portableEnabled()) {
+                            key = (K)ctx.unwrapPortableIfNeeded(key, true);
+                            val = (V)ctx.unwrapPortableIfNeeded(val, true);
+                        }
+
+                        next = new CacheEntryImpl<>(key, val);
+
+                        break;
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw U.convertToCacheException(e);
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        // No-op.
+                    }
+                    catch (GridCacheFilterFailedException ignore) {
+                        assert false;
+                    }
+                }
+            }
+        };
+    }
+
+    /**
      *
      */
     private static class PeekModes {
+        /** */
         boolean near;
+
+        /** */
         boolean primary;
+
+        /** */
         boolean backup;
 
+        /** */
         boolean heap;
+
+        /** */
         boolean offheap;
+
+        /** */
         boolean swap;
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 419fdf5..affd27c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -33,6 +33,7 @@ import org.apache.ignite.spi.swapspace.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
 import java.lang.ref.*;
 import java.nio.*;
 import java.util.*;
@@ -1511,6 +1512,66 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> {
     }
 
     /**
+     * @param primary If {@code true} includes primary entries.
+     * @param backup If {@code true} includes backup entries.
+     * @param topVer Topology version.
+     * @return Swap entries iterator.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Iterator<Cache.Entry<K, V>> swapIterator(boolean primary, boolean backup, long topVer)
+        throws IgniteCheckedException
+    {
+        assert primary || backup;
+
+        if (!swapEnabled)
+            return F.emptyIterator();
+
+        if (primary && backup)
+            return cacheEntryIterator(lazySwapIterator());
+
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
+
+        return new PartitionsIterator(parts) {
+            @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part)
+                throws IgniteCheckedException
+            {
+                return swapMgr.rawIterator(spaceName, part);
+            }
+        };
+    }
+
+    /**
+     * @param primary If {@code true} includes primary entries.
+     * @param backup If {@code true} includes backup entries.
+     * @param topVer Topology version.
+     * @return Offheap entries iterator.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, long topVer)
+        throws IgniteCheckedException
+    {
+        assert primary || backup;
+
+        if (!offheapEnabled)
+            return F.emptyIterator();
+
+        if (primary && backup)
+            return cacheEntryIterator(lazyOffHeapIterator());
+
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
+
+        return new PartitionsIterator(parts) {
+            @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part)
+                throws IgniteCheckedException
+            {
+                return offheap.iterator(spaceName, part);
+            }
+        };
+    }
+
+    /**
      * @param ldr Undeployed class loader.
      * @return Undeploy count.
      */
@@ -1611,6 +1672,19 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> {
     }
 
     /**
+     * @param it Map.Entry iterator.
+     * @return Cache.Entry iterator.
+     */
+    private static <K, V> Iterator<Cache.Entry<K, V>> cacheEntryIterator(Iterator<Map.Entry<K, V>> it) {
+        return F.iterator(it, new C1<Map.Entry<K, V>, Cache.Entry<K, V>>() {
+            @Override public Cache.Entry<K, V> apply(Map.Entry<K, V> e) {
+                // Create Cache.Entry over Map.Entry to do not deserialize key/values if not needed.
+                return new CacheEntryImpl0<>(e);
+            }
+        }, true);
+    }
+
+    /**
      *
      */
     private class IteratorWrapper extends GridCloseableIteratorAdapter<Map.Entry<byte[], GridCacheSwapEntry<V>>> {
@@ -1688,4 +1762,90 @@ public class GridCacheSwapManager<K, V> extends GridCacheManagerAdapter<K, V> {
                     e.valueClassLoaderId());
         }
     }
+
+    /**
+     *
+     */
+    private abstract class PartitionsIterator implements Iterator<Cache.Entry<K, V>> {
+        /** */
+        private Iterator<Integer> partIt;
+
+        /** */
+        private Iterator<Cache.Entry<K, V>> curIt;
+
+        /** */
+        private Cache.Entry<K, V> next;
+
+        /**
+         * @param parts Partitions
+         */
+        public PartitionsIterator(Collection<Integer> parts) {
+            this.partIt = parts.iterator();
+
+            advance();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return next != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Cache.Entry<K, V> next() {
+            if (next == null)
+                throw new NoSuchElementException();
+
+            Cache.Entry<K, V> e = next;
+
+            advance();
+
+            return e;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Switches to next element.
+         */
+        private void advance() {
+            next = null;
+
+            do {
+                if (curIt == null) {
+                    if (partIt.hasNext()) {
+                        int part = partIt.next();
+
+                        try {
+                            curIt = cacheEntryIterator(lazyIterator(partitionIterator(part)));
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    }
+                }
+
+                if (curIt != null) {
+                    if (curIt.hasNext()) {
+                        next = curIt.next();
+
+                        break;
+                    }
+                    else
+                        curIt = null;
+                }
+            }
+            while (partIt.hasNext());
+        }
+
+        /**
+         * @param part Partition.
+         * @return Iterator for given partition.
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part)
+            throws IgniteCheckedException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index e673641..f4d2d0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -35,6 +35,7 @@ import org.apache.ignite.lang.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -720,7 +721,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                             ctx.io().send(req.getKey(), req.getValue());
                         }
                         catch (IgniteCheckedException e) {
-                            log.error("Failed to send TTL update request.", e);
+                            U.error(log, "Failed to send TTL update request.", e);
                         }
                     }
                 }
@@ -911,6 +912,83 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /**
+     * @param primary If {@code true} includes primary entries.
+     * @param backup If {@code true} includes backup entries.
+     * @return Local entries iterator.
+     */
+    public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, final boolean backup) {
+        assert primary || backup;
+
+        if (primary && backup)
+            return iterator(map.entries0().iterator(), !ctx.keepPortable());
+        else {
+            final long topVer = ctx.affinity().affinityTopologyVersion();
+
+            final Iterator<GridDhtLocalPartition<K, V>> partIt = topology().currentLocalPartitions().iterator();
+
+            Iterator<GridCacheEntryEx<K, V>> it = new Iterator<GridCacheEntryEx<K, V>>() {
+                private GridCacheEntryEx<K, V> next;
+
+                private Iterator<GridDhtCacheEntry<K, V>> curIt;
+
+                {
+                    advance();
+                }
+
+                @Override public boolean hasNext() {
+                    return next != null;
+                }
+
+                @Override public GridCacheEntryEx<K, V> next() {
+                    if (next == null)
+                        throw new NoSuchElementException();
+
+                    GridCacheEntryEx<K, V> e = next;
+
+                    advance();
+
+                    return e;
+                }
+
+                @Override public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+
+                private void advance() {
+                    next = null;
+
+                    do {
+                        if (curIt == null) {
+                            while (partIt.hasNext()) {
+                                GridDhtLocalPartition<K, V> part = partIt.next();
+
+                                if (primary == part.primary(topVer)) {
+                                    curIt = part.entries().iterator();
+
+                                    break;
+                                }
+                            }
+                        }
+
+                        if (curIt != null) {
+                            if (curIt.hasNext()) {
+                                next = curIt.next();
+
+                                break;
+                            }
+                            else
+                                curIt = null;
+                        }
+                    }
+                    while (partIt.hasNext());
+                }
+            };
+
+            return iterator(it, !ctx.keepPortable());
+        }
+    }
+
+    /**
      * Complex partition iterator for both partition and swap iteration.
      */
     private static class PartitionEntryIterator<K, V> extends GridIteratorAdapter<CacheEntry<K, V>> {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index abb1694..74d0f1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
 import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
@@ -780,6 +781,13 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         }
     }
 
+    /**
+     * @return Near entries iterator.
+     */
+    public Iterator<Cache.Entry<K, V>> nearEntriesIterator() {
+        return iterator(map.entries0().iterator(), !ctx.keepPortable());
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNearCacheAdapter.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 77df627..6f31976 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -3091,6 +3091,65 @@ public class GridFunc {
     }
 
     /**
+     * Flattens iterable-of-iterators and returns iterator over the
+     * elements of the inner collections. This method doesn't create any
+     * new collections or copies any elements.
+     *
+     * @param c Input iterable of iterators.
+     * @return Iterator over the elements of given iterators.
+     */
+    public static <T> Iterator<T> flatIterators(@Nullable final Iterable<Iterator<T>> c) {
+        return isEmpty(c) ? GridFunc.<T>emptyIterator() : new GridIteratorAdapter<T>() {
+            /** */
+            private Iterator<? extends Iterator<T>> a = c.iterator();
+
+            /** */
+            private Iterator<T> b;
+
+            /** */
+            private boolean moved = true;
+
+            /** */
+            private boolean more;
+
+            @Override public boolean hasNextX() {
+                if (!moved)
+                    return more;
+
+                moved = false;
+
+                if (b != null && b.hasNext())
+                    return more = true;
+
+                while (a.hasNext()) {
+                    b = a.next();
+
+                    if (b.hasNext())
+                        return more = true;
+                }
+
+                return more = false;
+            }
+
+            @Override public T nextX() {
+                if (hasNext()) {
+                    moved = true;
+
+                    return b.next();
+                }
+
+                throw new NoSuchElementException();
+            }
+
+            @Override public void removeX() {
+                assert b != null;
+
+                b.remove();
+            }
+        };
+    }
+
+    /**
      * Flattens given set objects into a single collection. Unrolls {@link Collection},
      * {@link Iterable} and {@code Object[]} objects.
      *
@@ -4313,6 +4372,96 @@ public class GridFunc {
     }
 
     /**
+     * @param c Input iterator.
+     * @param trans Transforming closure to convert from T1 to T2.
+     * @param readOnly If {@code true}, then resulting iterator will not allow modifications
+     *      to the underlying collection.
+     * @param p Optional filtering predicates.
+     * @return Iterator from given iterator and optional filtering predicate.
+     */
+    public static <T1, T2> Iterator<T2> iterator(final Iterator<? extends T1> c,
+        final IgniteClosure<? super T1, T2> trans,
+        final boolean readOnly,
+        @Nullable final IgnitePredicate<? super T1>... p)
+    {
+        A.notNull(c, "c", trans, "trans");
+
+        if (isAlwaysFalse(p))
+            return F.emptyIterator();
+
+        return new GridIteratorAdapter<T2>() {
+            /** */
+            private T1 elem;
+
+            /** */
+            private boolean moved = true;
+
+            /** */
+            private boolean more;
+
+            /** */
+            private Iterator<? extends T1> iter = c;
+
+            @Override public boolean hasNextX() {
+                if (isEmpty(p))
+                    return iter.hasNext();
+                else {
+                    if (!moved)
+                        return more;
+                    else {
+                        more = false;
+
+                        while (iter.hasNext()) {
+                            elem = iter.next();
+
+                            boolean isAll = true;
+
+                            for (IgnitePredicate<? super T1> r : p)
+                                if (r != null && !r.apply(elem)) {
+                                    isAll = false;
+
+                                    break;
+                                }
+
+                            if (isAll) {
+                                more = true;
+                                moved = false;
+
+                                return true;
+                            }
+                        }
+
+                        elem = null; // Give to GC.
+
+                        return false;
+                    }
+                }
+            }
+
+            @Nullable @Override public T2 nextX() {
+                if (isEmpty(p))
+                    return trans.apply(iter.next());
+                else {
+                    if (hasNext()) {
+                        moved = true;
+
+                        return trans.apply(elem);
+                    }
+                    else
+                        throw new NoSuchElementException();
+                }
+            }
+
+            @Override public void removeX() {
+                if (readOnly)
+                    throw new UnsupportedOperationException("Cannot modify read-only iterator.");
+
+                iter.remove();
+            }
+        };
+    }
+
+    /**
      * Gets predicate that always returns {@code true}. This method returns
      * constant predicate.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b1959a30/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
index 653b064..b1fd167 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.swapspace.file.*;
 
+import javax.cache.*;
 import java.util.*;
 
 import static org.apache.ignite.cache.CacheDistributionMode.*;
@@ -45,7 +46,10 @@ import static org.apache.ignite.cache.CachePeekMode.*;
  */
 public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstractTest {
     /** */
-    private static final int HEAP_ENTRIES = 10;
+    private static final String SPACE_NAME = "gg-swap-cache-dflt";
+
+    /** */
+    private static final int HEAP_ENTRIES = 30;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -227,9 +231,7 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
 
             Set<Integer> swapKeys = new HashSet<>();
 
-            final String spaceName = "gg-swap-cache-dflt";
-
-            IgniteSpiCloseableIterator<Integer> it = swap.keyIterator(spaceName, null);
+            IgniteSpiCloseableIterator<Integer> it = swap.keyIterator(SPACE_NAME, null);
 
             assertNotNull(it);
 
@@ -338,6 +340,7 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
             cache0.removeAll(new HashSet<>(keys));
         }
     }
+
     /**
      * @throws Exception If failed.
      */
@@ -410,9 +413,9 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
             try {
                 int totalKeys = 200;
 
-                T2<Integer, Integer> swapKeys = swapKeys(0);
+                T2<Integer, Integer> swapKeys = swapKeysCount(0);
 
-                T2<Integer, Integer> offheapKeys = offheapKeys(0);
+                T2<Integer, Integer> offheapKeys = offheapKeysCount(0);
 
                 int totalSwap = swapKeys.get1() + swapKeys.get2();
                 int totalOffheap = offheapKeys.get1() + offheapKeys.get2();
@@ -445,9 +448,9 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
             }
         }
         else {
-            //checkSizeAffinityFilter(0);
+            checkSizeAffinityFilter(0);
 
-            //checkSizeAffinityFilter(1);
+            checkSizeAffinityFilter(1);
 
             checkSizeStorageFilter(0);
 
@@ -608,14 +611,12 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
 
     /**
      * @param nodeIdx Node index.
-     * @return Tuple with number of primary and backup keys.
+     * @return Tuple with primary and backup keys.
      */
-    private T2<Integer, Integer> swapKeys(int nodeIdx) {
+    private T2<List<Integer>, List<Integer>> swapKeys(int nodeIdx) {
         FileSwapSpaceSpi swap = (FileSwapSpaceSpi)ignite(nodeIdx).configuration().getSwapSpaceSpi();
 
-        final String spaceName = "gg-swap-cache-dflt";
-
-        IgniteSpiCloseableIterator<Integer> it = swap.keyIterator(spaceName, null);
+        IgniteSpiCloseableIterator<Integer> it = swap.keyIterator(SPACE_NAME, null);
 
         assertNotNull(it);
 
@@ -623,18 +624,18 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
 
         ClusterNode node = ignite(nodeIdx).cluster().localNode();
 
-        int primary = 0;
-        int backups = 0;
+        List<Integer> primary = new ArrayList<>();
+        List<Integer> backups = new ArrayList<>();
 
         while (it.hasNext()) {
             Integer key = it.next();
 
             if (aff.isPrimary(node, key))
-                primary++;
+                primary.add(key);
             else {
                 assertTrue(aff.isBackup(node, key));
 
-                backups++;
+                backups.add(key);
             }
         }
 
@@ -645,7 +646,17 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
      * @param nodeIdx Node index.
      * @return Tuple with number of primary and backup keys.
      */
-    private T2<Integer, Integer> offheapKeys(int nodeIdx) {
+    private T2<Integer, Integer> swapKeysCount(int nodeIdx) {
+        T2<List<Integer>, List<Integer>> keys = swapKeys(nodeIdx);
+
+        return new T2<>(keys.get1().size(), keys.get2().size());
+    }
+
+    /**
+     * @param nodeIdx Node index.
+     * @return Tuple with primary and backup keys.
+     */
+    private T2<List<Integer>, List<Integer>> offheapKeys(int nodeIdx) {
         GridCacheAdapter<Integer, String> internalCache =
             ((IgniteKernal)ignite(nodeIdx)).context().cache().internalCache();
 
@@ -660,18 +671,18 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
 
         ClusterNode node = ignite(nodeIdx).cluster().localNode();
 
-        int primary = 0;
-        int backups = 0;
+        List<Integer> primary = new ArrayList<>();
+        List<Integer> backups = new ArrayList<>();
 
         while (offheapIt.hasNext()) {
             Map.Entry<Integer, String> e = offheapIt.next();
 
             if (aff.isPrimary(node, e.getKey()))
-                primary++;
+                primary.add(e.getKey());
             else {
                 assertTrue(aff.isBackup(node, e.getKey()));
 
-                backups++;
+                backups.add(e.getKey());
             }
         }
 
@@ -680,6 +691,16 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
 
     /**
      * @param nodeIdx Node index.
+     * @return Tuple with number of primary and backup keys.
+     */
+    private T2<Integer, Integer> offheapKeysCount(int nodeIdx) {
+        T2<List<Integer>, List<Integer>> keys = offheapKeys(nodeIdx);
+
+        return new T2<>(keys.get1().size(), keys.get2().size());
+    }
+
+    /**
+     * @param nodeIdx Node index.
      * @throws Exception If failed.
      */
     private void checkSizeStorageFilter(int nodeIdx) throws Exception {
@@ -698,12 +719,12 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
 
             int totalKeys = 200;
 
-            T2<Integer, Integer> swapKeys = swapKeys(nodeIdx);
+            T2<Integer, Integer> swapKeys = swapKeysCount(nodeIdx);
 
             assertTrue(swapKeys.get1() > 0);
             assertTrue(swapKeys.get2() > 0);
 
-            T2<Integer, Integer> offheapKeys = offheapKeys(nodeIdx);
+            T2<Integer, Integer> offheapKeys = offheapKeysCount(nodeIdx);
 
             assertTrue(offheapKeys.get1() > 0);
             assertTrue(offheapKeys.get2() > 0);
@@ -740,12 +761,12 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
             int globalOffheapBackup = 0;
 
             for (int i = 0; i < gridCount(); i++) {
-                T2<Integer, Integer> swap = swapKeys(i);
+                T2<Integer, Integer> swap = swapKeysCount(i);
 
                 globalSwapPrimary += swap.get1();
                 globalSwapBackup += swap.get2();
 
-                T2<Integer, Integer> offheap = offheapKeys(i);
+                T2<Integer, Integer> offheap = offheapKeysCount(i);
 
                 globalOffheapPrimary += offheap.get1();
                 globalOffheapBackup += offheap.get2();
@@ -821,4 +842,290 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
 
         assertEquals(exp, size);
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalEntries() throws Exception {
+        if (cacheMode() == LOCAL) {
+            IgniteCache<Integer, String> cache0 = jcache(0);
+
+            Set<Integer> keys = new HashSet<>();
+
+            try {
+                for (int i = 0; i < HEAP_ENTRIES; i++) {
+                    cache0.put(i, String.valueOf(i));
+
+                    keys.add(i);
+                }
+
+                checkLocalEntries(cache0.localEntries(), keys);
+                checkLocalEntries(cache0.localEntries(ALL), keys);
+                checkLocalEntries(cache0.localEntries(NEAR), keys);
+                checkLocalEntries(cache0.localEntries(PRIMARY), keys);
+                checkLocalEntries(cache0.localEntries(BACKUP), keys);
+            }
+            finally {
+                cache0.removeAll(keys);
+            }
+
+            checkLocalEntries(cache0.localEntries());
+
+            final String val = "test-val-";
+
+            keys = new HashSet<>();
+
+            for (int i = 0; i < 200; i++) {
+                cache0.put(i, val + i);
+
+                keys.add(i);
+            }
+
+            try {
+                int totalKeys = 200;
+
+                T2<List<Integer>, List<Integer>> swapKeys = swapKeys(0);
+
+                T2<List<Integer>, List<Integer>> offheapKeys = offheapKeys(0);
+
+                List<Integer> swap = new ArrayList<>();
+
+                swap.addAll(swapKeys.get1());
+                swap.addAll(swapKeys.get2());
+
+                assertFalse(swap.isEmpty());
+
+                List<Integer> offheap = new ArrayList<>();
+
+                offheap.addAll(offheapKeys.get1());
+                offheap.addAll(offheapKeys.get2());
+
+                assertFalse(offheap.isEmpty());
+
+                log.info("Keys [total=" + totalKeys +
+                    ", offheap=" + offheap.size() +
+                    ", swap=" + swap.size() + ']');
+
+                assertTrue(swap.size() + offheap.size() < totalKeys);
+
+                List<Integer> heap = new ArrayList<>(keys);
+
+                heap.removeAll(swap);
+                heap.removeAll(offheap);
+
+                assertFalse(heap.isEmpty());
+
+                checkLocalEntries(cache0.localEntries(), val, keys);
+                checkLocalEntries(cache0.localEntries(ALL), val, keys);
+
+                checkLocalEntries(cache0.localEntries(OFFHEAP), val, offheap);
+                checkLocalEntries(cache0.localEntries(SWAP), val, swap);
+                checkLocalEntries(cache0.localEntries(ONHEAP), val, heap);
+
+                checkLocalEntries(cache0.localEntries(OFFHEAP, PRIMARY), val, offheap);
+                checkLocalEntries(cache0.localEntries(SWAP, PRIMARY), val, swap);
+                checkLocalEntries(cache0.localEntries(ONHEAP, PRIMARY), val, heap);
+
+                checkLocalEntries(cache0.localEntries(OFFHEAP, BACKUP), val, offheap);
+                checkLocalEntries(cache0.localEntries(SWAP, BACKUP), val, swap);
+                checkLocalEntries(cache0.localEntries(ONHEAP, BACKUP), val, heap);
+
+                checkLocalEntries(cache0.localEntries(OFFHEAP, NEAR), val, offheap);
+                checkLocalEntries(cache0.localEntries(SWAP, NEAR), val, swap);
+                checkLocalEntries(cache0.localEntries(ONHEAP, NEAR), val, heap);
+            }
+            finally {
+                cache0.removeAll(keys);
+            }
+        }
+        else {
+            //checkLocalEntriesAffinityFilter(0);
+
+            //checkLocalEntriesAffinityFilter(1);
+
+            checkLocalEntriesStorageFilter(0);
+
+            checkLocalEntriesStorageFilter(1);
+        }
+    }
+
+    /**
+     * @param nodeIdx Node index.
+     * @throws Exception If failed.
+     */
+    private void checkLocalEntriesStorageFilter(int nodeIdx) throws Exception {
+        IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
+
+        List<Integer> primaryKeys = primaryKeys(cache0, 100, 10_000);
+        List<Integer> backupKeys = backupKeys(cache0, 100, 10_000);
+
+        try {
+            final String val = "test_value-";
+
+            for (int i = 0; i < 100; i++) {
+                cache0.put(primaryKeys.get(i), val + primaryKeys.get(i));
+                cache0.put(backupKeys.get(i), val + backupKeys.get(i));
+            }
+
+            int totalKeys = 200;
+
+            T2<List<Integer>, List<Integer>> swapKeys = swapKeys(nodeIdx);
+
+            assertTrue(swapKeys.get1().size() > 0);
+            assertTrue(swapKeys.get2().size() > 0);
+
+            T2<List<Integer>, List<Integer>> offheapKeys = offheapKeys(nodeIdx);
+
+            assertTrue(offheapKeys.get1().size() > 0);
+            assertTrue(offheapKeys.get2().size() > 0);
+
+            List<Integer> swap = new ArrayList<>();
+
+            swap.addAll(swapKeys.get1());
+            swap.addAll(swapKeys.get2());
+
+            assertFalse(swap.isEmpty());
+
+            List<Integer> offheap = new ArrayList<>();
+
+            offheap.addAll(offheapKeys.get1());
+            offheap.addAll(offheapKeys.get2());
+
+            assertFalse(offheap.isEmpty());
+
+            List<Integer> heap = new ArrayList<>();
+
+            heap.addAll(primaryKeys);
+            heap.addAll(backupKeys);
+
+            heap.removeAll(swap);
+            heap.removeAll(offheap);
+
+            log.info("Keys [total=" + totalKeys +
+                ", offheap=" + offheap.size() +
+                ", swap=" + swap.size() + ']');
+
+            assertFalse(heap.isEmpty());
+
+            checkLocalEntries(cache0.localEntries(), val, primaryKeys, backupKeys);
+            checkLocalEntries(cache0.localEntries(ALL), val, primaryKeys, backupKeys);
+            checkLocalEntries(cache0.localEntries(ONHEAP, OFFHEAP, SWAP), val, primaryKeys, backupKeys);
+
+            checkLocalEntries(cache0.localEntries(SWAP), val, swap);
+            checkLocalEntries(cache0.localEntries(OFFHEAP), val, offheap);
+            checkLocalEntries(cache0.localEntries(ONHEAP), val, heap);
+
+            checkLocalEntries(cache0.localEntries(SWAP, OFFHEAP), val, swap, offheap);
+            checkLocalEntries(cache0.localEntries(SWAP, ONHEAP), val, swap, heap);
+
+            checkLocalEntries(cache0.localEntries(SWAP, PRIMARY), val, swapKeys.get1());
+            checkLocalEntries(cache0.localEntries(SWAP, BACKUP), val, swapKeys.get2());
+            checkLocalEntries(cache0.localEntries(OFFHEAP, PRIMARY), val, offheapKeys.get1());
+            checkLocalEntries(cache0.localEntries(OFFHEAP, BACKUP), val, offheapKeys.get2());
+
+            checkLocalEntries(cache0.localEntries(SWAP, OFFHEAP, PRIMARY), val, swapKeys.get1(), offheapKeys.get1());
+            checkLocalEntries(cache0.localEntries(SWAP, OFFHEAP, BACKUP), val, swapKeys.get2(), offheapKeys.get2());
+            checkLocalEntries(cache0.localEntries(SWAP, OFFHEAP, PRIMARY, BACKUP), val, swap, offheap);
+        }
+        finally {
+            cache0.removeAll(new HashSet<>(primaryKeys));
+            cache0.removeAll(new HashSet<>(backupKeys));
+        }
+    }
+
+    /**
+     * @param nodeIdx Node index.
+     * @throws Exception If failed.
+     */
+    private void checkLocalEntriesAffinityFilter(int nodeIdx) throws Exception {
+        IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
+
+        final int PUT_KEYS = 10;
+
+        List<Integer> primaryKeys = null;
+        List<Integer> backupKeys = null;
+        List<Integer> nearKeys = null;
+
+        try {
+            primaryKeys = primaryKeys(cache0, PUT_KEYS, 0);
+            backupKeys = backupKeys(cache0, PUT_KEYS, 0);
+
+            for (Integer key : primaryKeys)
+                cache0.put(key, String.valueOf(key));
+            for (Integer key : backupKeys)
+                cache0.put(key, String.valueOf(key));
+
+            nearKeys = cacheMode() == PARTITIONED ? nearKeys(cache0, PUT_KEYS, 0) : Collections.<Integer>emptyList();
+
+            for (Integer key : nearKeys)
+                cache0.put(key, String.valueOf(key));
+
+            log.info("Keys [near=" + nearKeys + ", primary=" + primaryKeys + ", backup=" + backupKeys + ']');
+
+            boolean hasNearCache = nodeIdx == 0 && cacheMode() == PARTITIONED;
+
+            if (hasNearCache) {
+                checkLocalEntries(cache0.localEntries(), nearKeys, primaryKeys, backupKeys);
+                checkLocalEntries(cache0.localEntries(ALL), nearKeys, primaryKeys, backupKeys);
+                checkLocalEntries(cache0.localEntries(NEAR), nearKeys);
+                checkLocalEntries(cache0.localEntries(PRIMARY, BACKUP, NEAR), nearKeys, primaryKeys, backupKeys);
+                checkLocalEntries(cache0.localEntries(NEAR, PRIMARY), nearKeys, primaryKeys);
+                checkLocalEntries(cache0.localEntries(NEAR, BACKUP), nearKeys, backupKeys);
+            }
+            else {
+                checkLocalEntries(cache0.localEntries(), primaryKeys, backupKeys);
+                checkLocalEntries(cache0.localEntries(ALL), primaryKeys, backupKeys);
+                checkLocalEntries(cache0.localEntries(NEAR));
+                checkLocalEntries(cache0.localEntries(NEAR, PRIMARY), primaryKeys);
+                checkLocalEntries(cache0.localEntries(NEAR, BACKUP), backupKeys);
+                checkLocalEntries(cache0.localEntries(PRIMARY, BACKUP, NEAR), primaryKeys, backupKeys);
+            }
+
+            checkLocalEntries(cache0.localEntries(PRIMARY), primaryKeys);
+            checkLocalEntries(cache0.localEntries(BACKUP), backupKeys);
+            checkLocalEntries(cache0.localEntries(PRIMARY, BACKUP), primaryKeys, backupKeys);
+        }
+        finally {
+            if (primaryKeys != null)
+                cache0.removeAll(new HashSet<>(primaryKeys));
+
+            if (backupKeys != null)
+                cache0.removeAll(new HashSet<>(backupKeys));
+
+            if (nearKeys != null)
+                cache0.removeAll(new HashSet<>(nearKeys));
+        }
+    }
+
+    /**
+     * @param entries Entries.
+     * @param exp Expected entries.
+     */
+    private void checkLocalEntries(Iterable<Cache.Entry<Integer, String>> entries, Collection<Integer>... exp) {
+        checkLocalEntries(entries, "", exp);
+    }
+
+    /**
+     * @param entries Entries.
+     * @param expVal Expected value.
+     * @param exp Expected keys.
+     */
+    private void checkLocalEntries(Iterable<Cache.Entry<Integer, String>> entries,
+        String expVal,
+        Collection<Integer>... exp) {
+        Set<Integer> allExp = new HashSet<>();
+
+        for (Collection<Integer> col : exp)
+            assertTrue(allExp.addAll(col));
+
+        for (Cache.Entry<Integer, String> e : entries) {
+            assertNotNull(e.getKey());
+            assertNotNull(e.getValue());
+            assertEquals(expVal + e.getKey(), e.getValue());
+
+            assertTrue("Unexpected entry: " + e, allExp.remove(e.getKey()));
+        }
+
+        assertTrue("Expected entries not found: " + allExp, allExp.isEmpty());
+    }
 }


Mime
View raw message