ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/3] incubator-ignite git commit: # ignite-51
Date Sun, 01 Mar 2015 09:08:18 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-51 2deec384a -> d7db0122b


# ignite-51


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2192069c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2192069c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2192069c

Branch: refs/heads/ignite-51
Commit: 2192069cd2bc167e28324478cbd71a84491d9f9d
Parents: 2deec38
Author: sboikov <semen.boikov@inria.fr>
Authored: Sat Feb 28 17:19:47 2015 +0300
Committer: sboikov <semen.boikov@inria.fr>
Committed: Sat Feb 28 17:19:47 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheSwapManager.java  | 191 ++++++++++++++++++-
 1 file changed, 186 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2192069c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 8dd66d1..4b21f8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1307,8 +1307,24 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Iterator over off-heap keys.
      */
     public Iterator<KeyCacheObject> offHeapKeyIterator(boolean primary, boolean backup,
long topVer) {
-        // TODO IGNITE-51.
-        return new GridEmptyCloseableIterator<>();
+        assert primary || backup;
+        // When the offheapEnabled flag is false there is nothing to iterate: hand back F.emptyIterator().
+        if (!offheapEnabled)
+            return F.emptyIterator();
+        // Both flags set: one iterator over the whole space, without a partition argument.
+        if (primary && backup)
+            return keyIterator(offheap.iterator(spaceName));
+        // Exactly one flag set: select the local node's primary or backup partitions at topVer.
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(),
topVer) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
+        // Chain the per-partition key iterators of the selected partitions.
+        return new PartitionsKeyIterator(parts) {
+            @Override protected Iterator<KeyCacheObject> partitionIterator(int part)
+                throws IgniteCheckedException
+            {
+                return keyIterator(offheap.iterator(spaceName, part));
+            }
+        };
     }
 
     /**
@@ -1316,8 +1332,24 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      */
     public Iterator<KeyCacheObject> swapKeyIterator(boolean primary, boolean backup,
long topVer)
         throws IgniteCheckedException {
-        // TODO IGNITE-51.
-        return new GridEmptyCloseableIterator<>();
+        assert primary || backup;
+        // When the swapEnabled flag is false there is nothing to iterate: hand back F.emptyIterator().
+        if (!swapEnabled)
+            return F.emptyIterator();
+        // Both flags set: one raw iterator over the whole space, without a partition argument.
+        if (primary && backup)
+            return keyIterator(cctx.gridSwap().rawIterator(spaceName));
+        // Exactly one flag set: select the local node's primary or backup partitions at topVer.
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(),
topVer) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
+        // NOTE(review): partitions are read through swapMgr, the whole-space branch above uses cctx.gridSwap() - presumably the same manager; confirm.
+        return new PartitionsKeyIterator(parts) {
+            @Override protected Iterator<KeyCacheObject> partitionIterator(int part)
+                throws IgniteCheckedException
+            {
+                return keyIterator(swapMgr.rawIterator(spaceName, part));
+            }
+        };
     }
 
     /**
@@ -1371,7 +1403,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 cur = new Map.Entry<K, V>() {
                     @Override public K getKey() {
                         try {
-                            return unmarshalKey(cur0.getKey(), cctx.deploy().globalLoader());
+                            KeyCacheObject key = cctx.toCacheKeyObject(null, cur0.getKey());
+
+                            return key.value(cctx, false);
                         }
                         catch (IgniteCheckedException e) {
                             throw new IgniteException(e);
@@ -1445,6 +1479,67 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * Wraps a raw entry iterator into an iterator over cache keys.
+     * <p>
+     * Only the key bytes of each raw entry are converted (via {@code cctx.toCacheKeyObject}); values are
+     * not read. The result is registered in {@code itSet} through a {@code GridWeakIterator}.
+     *
+     * @param it Closeable iterator over raw (key bytes, value bytes) entries, may be {@code null}.
+     * @return Key iterator; an empty iterator if {@code it} is {@code null}.
+     */
+    private Iterator<KeyCacheObject> keyIterator(
+        final GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> it) {
+        if (it == null)
+            return new GridEmptyIterator<>();
+
+        checkIteratorQueue();
+
+        // Weak reference will hold hard reference to this iterator, so it can properly be closed.
+        final GridCloseableIteratorAdapter<KeyCacheObject> iter = new GridCloseableIteratorAdapter<KeyCacheObject>() {
+            @Override protected KeyCacheObject onNext() {
+                try {
+                    return cctx.toCacheKeyObject(null, it.next().getKey());
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            @Override protected boolean onHasNext() {
+                return it.hasNext();
+            }
+
+            @Override protected void onRemove() throws IgniteCheckedException {
+                // Key iterators are read-only; signal it the way Iterator.remove() specifies.
+                throw new UnsupportedOperationException();
+            }
+
+            @Override protected void onClose() throws IgniteCheckedException {
+                it.close();
+            }
+        };
+
+        // Don't hold hard reference to this iterator - only weak one.
+        Iterator<KeyCacheObject> ret = new Iterator<KeyCacheObject>() {
+            @Override public boolean hasNext() {
+                return iter.hasNext();
+            }
+
+            @Override public KeyCacheObject next() {
+                return iter.next();
+            }
+
+            @Override public void remove() {
+                iter.remove();
+            }
+        };
+
+        itSet.add(new GridWeakIterator(ret, iter, itQ));
+
+        return ret;
+    }
+
+    /**
      * Gets offheap iterator over partition.
      *
      * @param part Partition to iterate over.
@@ -1886,4 +1981,90 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         abstract protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>>
partitionIterator(int part)
             throws IgniteCheckedException;
     }
+
+    /**
+     * Iterator over the keys of several partitions: partitions are visited in the order of the
+     * given collection and the per-partition iterators are chained; empty partitions are skipped.
+     */
+    private abstract class PartitionsKeyIterator implements Iterator<KeyCacheObject> {
+        /** Iterator over the partitions still to be visited. */
+        private final Iterator<Integer> partIt;
+
+        /** Key iterator of the partition being read, {@code null} if none is open. */
+        private Iterator<KeyCacheObject> curIt;
+
+        /** Key to hand out on the following {@code next()} call, {@code null} when exhausted. */
+        private KeyCacheObject next;
+
+        /**
+         * @param parts Partitions.
+         */
+        public PartitionsKeyIterator(Collection<Integer> parts) {
+            partIt = parts.iterator();
+
+            // Look up the first key right away so that hasNext() is a plain null check.
+            advance();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return next != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public KeyCacheObject next() {
+            KeyCacheObject res = next;
+
+            if (res == null)
+                throw new NoSuchElementException();
+
+            advance();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Moves the {@code next} field to the following key, opening further partitions as needed;
+         * leaves it {@code null} once no partition has keys left.
+         */
+        private void advance() {
+            next = null;
+
+            for (;;) {
+                if (curIt == null && partIt.hasNext()) {
+                    try {
+                        curIt = partitionIterator(partIt.next());
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+
+                if (curIt != null) {
+                    if (curIt.hasNext()) {
+                        next = curIt.next();
+
+                        return;
+                    }
+
+                    curIt = null;
+                }
+
+                if (!partIt.hasNext())
+                    return;
+            }
+        }
+
+        /**
+         * @param part Partition.
+         * @return Iterator for given partition.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected abstract Iterator<KeyCacheObject> partitionIterator(int part)
+            throws IgniteCheckedException;
+    }
 }


Mime
View raw message