ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: #ignite-286: wip.
Date Wed, 29 Apr 2015 10:41:29 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-286 5017f1269 -> 35b1b7a45


#ignite-286: wip.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35b1b7a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35b1b7a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35b1b7a4

Branch: refs/heads/ignite-286
Commit: 35b1b7a4539f621244a1a2737b744c8570a79a02
Parents: 5017f12
Author: ivasilinets <ivasilinets@gridgain.com>
Authored: Wed Apr 29 13:41:17 2015 +0300
Committer: ivasilinets <ivasilinets@gridgain.com>
Committed: Wed Apr 29 13:41:17 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheSwapManager.java  | 218 ++++++++++++-------
 .../offheap/GridOffHeapProcessor.java           |  40 +++-
 .../util/offheap/GridOffHeapPartitionedMap.java |  23 +-
 .../unsafe/GridUnsafePartitionedMap.java        | 172 +++++++--------
 .../cache/OffHeapTieredTransactionSelfTest.java |  26 ++-
 ...icOffHeapTieredMultiNodeFullApiSelfTest.java |  43 ++++
 ...idOffHeapPartitionedMapAbstractSelfTest.java |  20 +-
 .../IgniteCacheFullApiSelfTestSuite.java        |   1 +
 8 files changed, 345 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35b1b7a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 0f23e07..494238a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1321,16 +1321,16 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             return F.emptyIterator();
 
         if (primary && backup)
-            return keyIterator(offheap.iterator(spaceName, partitions(primary, backup)));
+            return keyIterator(offheap.iterator(spaceName));
 
         Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(),
topVer) :
             cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
 
-        return new PartitionsKeyIterator(parts) {
+        return new PartitionsAbstractIterator<KeyCacheObject>(parts) {
             @Override protected Iterator<KeyCacheObject> partitionIterator(int part)
                 throws IgniteCheckedException
             {
-                return keyIterator(offheap.iterator(spaceName, new int[]{part}));
+                return keyIterator(offheap.iterator(spaceName, part));
             }
         };
     }
@@ -1351,7 +1351,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(),
topVer) :
             cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
 
-        return new PartitionsKeyIterator(parts) {
+        return new PartitionsAbstractIterator<KeyCacheObject>(parts) {
             @Override protected Iterator<KeyCacheObject> partitionIterator(int part)
                 throws IgniteCheckedException
             {
@@ -1367,7 +1367,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (!offheapEnabled)
             return new GridEmptyCloseableIterator<>();
 
-        return lazyIterator(offheap.iterator(spaceName, partitions(true, true)));
+        return lazyIterator(offheap.iterator(spaceName));
     }
 
     /**
@@ -1562,7 +1562,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         checkIteratorQueue();
 
-        return new IteratorWrapper(offheap.iterator(spaceName, new int[]{part}));
+        return new IteratorWrapper(offheap.iterator(spaceName, part));
     }
 
     /**
@@ -1571,7 +1571,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param backup Include backups.
      * @return Off-heap iterator.
      */
-    public <T> GridCloseableIterator<T> rawOffHeapIterator(CX2<T2<Long,
Integer>, T2<Long, Integer>, T> c,
+    public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long,
Integer>, T2<Long, Integer>, T> c,
         boolean primary, boolean backup) {
         assert c != null;
 
@@ -1580,7 +1580,21 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         checkIteratorQueue();
 
-        return offheap.iterator(spaceName, c, partitions(primary, backup));
+        if (primary && backup)
+            return offheap.iterator(spaceName, c);
+
+        AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
+
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(),
ver) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+
+        return new ClosablePartitionsAbstractIterator<T, T>(parts) {
+            @Override protected GridCloseableIterator<T> partitionIterator(int part)
+                throws IgniteCheckedException
+            {
+                return offheap.iterator(spaceName, c, part);
+            }
+        };
     }
 
     /**
@@ -1592,59 +1606,59 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (!offheapEnabled || (!primary && !backup))
             return new GridEmptyCloseableIterator<>();
 
-        return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>()
{
-            private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it =
-                offheap.iterator(spaceName, partitions(primary, backup));
+        if (primary && backup)
+            return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>()
{
+                private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>>
it =
+                    offheap.iterator(spaceName);
 
-            private Map.Entry<byte[], byte[]> cur;
+                private Map.Entry<byte[], byte[]> cur;
 
-            @Override protected Map.Entry<byte[], byte[]> onNext() {
-                return cur = it.next();
-            }
+                @Override protected Map.Entry<byte[], byte[]> onNext() {
+                    return cur = it.next();
+                }
 
-            @Override protected boolean onHasNext() {
-                return it.hasNext();
-            }
+                @Override protected boolean onHasNext() {
+                    return it.hasNext();
+                }
 
-            @Override protected void onRemove() throws IgniteCheckedException {
-                KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey());
+                @Override protected void onRemove() throws IgniteCheckedException {
+                    KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey());
 
-                int part = cctx.affinity().partition(key);
+                    int part = cctx.affinity().partition(key);
 
-                offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
-            }
+                    offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                }
 
-            @Override protected void onClose() throws IgniteCheckedException {
-                it.close();
-            }
-        };
-    }
+                @Override protected void onClose() throws IgniteCheckedException {
+                    it.close();
+                }
+            };
 
-    /**
-     * @param primary Include primaries.
-     * @param backup Include backups.
-     * @return Partitions.
-     */
-    private int[] partitions(boolean primary, boolean backup) {
         AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
 
-        Set<Integer> parts = null;
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(),
ver) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
 
-        if (primary && backup) {
-            int[] primaryParts = U.toIntArray(cctx.affinity().primaryPartitions(cctx.localNodeId(),
ver));
+        return new ClosablePartitionsAbstractIterator<Map.Entry<byte[], byte[]>,
IgniteBiTuple<byte[], byte[]>>(parts) {
+            private Map.Entry<byte[], byte[]> cur;
 
-            int[] backupParts = U.toIntArray(cctx.affinity().backupPartitions(cctx.localNodeId(),
ver));
+            @Override protected Map.Entry<byte[], byte[]> onNext() {
+                return cur = super.onNext();
+            }
 
-            return U.addAll(primaryParts, backupParts);
-        }
+            @Override protected GridCloseableIterator<IgniteBiTuple<byte[], byte[]>>
partitionIterator(int part)
+                throws IgniteCheckedException {
+                return offheap.iterator(spaceName, part);
+            }
 
-        if (primary)
-            parts = cctx.affinity().primaryPartitions(cctx.localNodeId(), ver);
+            @Override protected void onRemove() throws IgniteCheckedException {
+                KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey());
 
-        if (backup)
-            parts = cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+                int part = cctx.affinity().partition(key);
 
-        return parts != null ? U.toIntArray(parts)  : new int[0];
+                offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+            }
+        };
     }
 
     /**
@@ -1680,14 +1694,18 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (primary && backup)
             return swapMgr.rawIterator(spaceName);
 
-        int[] parts = partitions(primary, backup);
-
-        List<GridIterator<Map.Entry<byte[], byte[]>>> iterators = new ArrayList<>();
+        AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
 
-        for (int i = 0; i < parts.length; ++i)
-           iterators.add(swapMgr.rawIterator(spaceName, parts[i]));
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(),
ver) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
 
-        return U.compoudIterator(iterators);
+        return new ClosablePartitionsAbstractIterator<Map.Entry<byte[], byte[]>,
Map.Entry<byte[], byte[]>>(parts) {
+            @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>>
partitionIterator(int part)
+                throws IgniteCheckedException
+            {
+                return swapMgr.rawIterator(spaceName, part);
+            }
+        };
     }
 
     /**
@@ -1712,7 +1730,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
 
         return new PartitionsIterator<K, V>(parts) {
-            @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>>
partitionIterator(int part)
+            @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>>
nextPartition(int part)
                 throws IgniteCheckedException
             {
                 return swapMgr.rawIterator(spaceName, part);
@@ -1742,8 +1760,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
 
         return new PartitionsIterator<K, V>(parts) {
-            @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>>
partitionIterator(int part) {
-                return offheap.iterator(spaceName, new int[]{part});
+            @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>>
nextPartition(int part) {
+                return offheap.iterator(spaceName, part);
             }
         };
     }
@@ -1942,20 +1960,47 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     /**
      *
      */
-    private abstract class PartitionsIterator<K, V> implements Iterator<Cache.Entry<K,
V>> {
+    private abstract class PartitionsIterator<K, V>  extends PartitionsAbstractIterator<Cache.Entry<K,
V>> {
+        /**
+         * @param parts Partitions
+         */
+        public PartitionsIterator(Collection<Integer> parts) {
+            super(parts);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Iterator<Cache.Entry<K, V>> partitionIterator(int
part)
+            throws IgniteCheckedException {
+            return cacheEntryIterator(
+                GridCacheSwapManager.this.<K, V>lazyIterator(nextPartition(part)));
+        }
+
+        /**
+         * @param part Partition.
+         * @return Iterator for given partition.
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>>
nextPartition(int part)
+            throws IgniteCheckedException;
+    }
+
+    /**
+     *
+     */
+    private abstract class PartitionsAbstractIterator<T> implements Iterator<T>
{
         /** */
         private Iterator<Integer> partIt;
 
         /** */
-        private Iterator<Cache.Entry<K, V>> curIt;
+        private Iterator<T> curIt;
 
         /** */
-        private Cache.Entry<K, V> next;
+        private T next;
 
         /**
          * @param parts Partitions
          */
-        public PartitionsIterator(Collection<Integer> parts) {
+        public PartitionsAbstractIterator(Collection<Integer> parts) {
             this.partIt = parts.iterator();
 
             advance();
@@ -1967,11 +2012,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public Cache.Entry<K, V> next() {
+        @Override public T next() {
             if (next == null)
                 throw new NoSuchElementException();
 
-            Cache.Entry<K, V> e = next;
+            T e = next;
 
             advance();
 
@@ -1995,8 +2040,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         int part = partIt.next();
 
                         try {
-                            curIt = cacheEntryIterator(
-                                GridCacheSwapManager.this.<K, V>lazyIterator(partitionIterator(part)));
+                            curIt = partitionIterator(part);
                         }
                         catch (IgniteCheckedException e) {
                             throw new IgniteException(e);
@@ -2022,58 +2066,69 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
          * @return Iterator for given partition.
          * @throws IgniteCheckedException If failed.
          */
-        abstract protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>>
partitionIterator(int part)
+        abstract protected Iterator<T> partitionIterator(int part)
             throws IgniteCheckedException;
     }
 
     /**
      *
      */
-    private abstract class PartitionsKeyIterator implements Iterator<KeyCacheObject>
{
+    private abstract class ClosablePartitionsAbstractIterator<T, T1 extends T> extends
GridCloseableIteratorAdapter<T> {
         /** */
         private Iterator<Integer> partIt;
 
         /** */
-        private Iterator<KeyCacheObject> curIt;
+        protected GridCloseableIterator<T1> curIt;
 
         /** */
-        private KeyCacheObject next;
+        protected T next;
 
         /**
          * @param parts Partitions
          */
-        public PartitionsKeyIterator(Collection<Integer> parts) {
+        public ClosablePartitionsAbstractIterator(Collection<Integer> parts) {
             this.partIt = parts.iterator();
 
-            advance();
+            try {
+                advance();
+            }
+            catch (IgniteCheckedException e) {
+                e.printStackTrace(); // Should never happen.
+            }
         }
 
         /** {@inheritDoc} */
-        @Override public boolean hasNext() {
+        @Override protected boolean onHasNext() {
             return next != null;
         }
 
         /** {@inheritDoc} */
-        @Override public KeyCacheObject next() {
-            if (next == null)
-                throw new NoSuchElementException();
+        @Override protected T onNext() {
+            try {
+                if (next == null)
+                    throw new NoSuchElementException();
 
-            KeyCacheObject e = next;
+                T e = next;
 
-            advance();
+                advance();
 
-            return e;
+                return e;
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
         }
 
         /** {@inheritDoc} */
-        @Override public void remove() {
-            throw new UnsupportedOperationException();
+        @Override protected void onClose() throws IgniteCheckedException {
+            if (curIt != null)
+                curIt.close();
         }
 
         /**
          * Switches to next element.
          */
-        private void advance() {
+        private void advance() throws IgniteCheckedException {
             next = null;
 
             do {
@@ -2096,8 +2151,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                         break;
                     }
-                    else
+                    else {
+                        curIt.close();
+
                         curIt = null;
+                    }
                 }
             }
             while (partIt.hasNext());
@@ -2108,7 +2166,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
          * @return Iterator for given partition.
          * @throws IgniteCheckedException If failed.
          */
-        abstract protected Iterator<KeyCacheObject> partitionIterator(int part)
+        abstract protected GridCloseableIterator<T1> partitionIterator(int part)
             throws IgniteCheckedException;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35b1b7a4/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
index ba09821..a99c4c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
@@ -268,13 +268,12 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * Gets iterator over contents of the given space.
      *
      * @param spaceName Space name.
-     * @param parts Partitions.
      * @return Iterator.
      */
-    public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator(@Nullable
String spaceName, int[] parts) {
+    public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator(@Nullable
String spaceName) {
         GridOffHeapPartitionedMap m = offheap(spaceName);
 
-        return m == null ? new GridEmptyCloseableIterator<IgniteBiTuple<byte[], byte[]>>()
: m.iterator(parts);
+        return m == null ? new GridEmptyCloseableIterator<IgniteBiTuple<byte[], byte[]>>()
: m.iterator();
     }
 
     /**
@@ -282,16 +281,32 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      *
      * @param spaceName Space name.
      * @param c Key/value closure.
-     * @param parts Partitions.
      * @return Iterator.
      */
     public <T> GridCloseableIterator<T> iterator(@Nullable String spaceName,
-        CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, int[] parts) {
+        CX2<T2<Long, Integer>, T2<Long, Integer>, T> c) {
         assert c != null;
 
         GridOffHeapPartitionedMap m = offheap(spaceName);
 
-        return m == null ? new GridEmptyCloseableIterator<T>() : m.iterator(c, parts);
+        return m == null ? new GridEmptyCloseableIterator<T>() : m.iterator(c);
+    }
+
+    /**
+     * Gets iterator over contents of the given space.
+     *
+     * @param spaceName Space name.
+     * @param c Key/value closure.
+     * @param part Partition.
+     * @return Iterator.
+     */
+    public <T> GridCloseableIterator<T> iterator(@Nullable String spaceName,
+        CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, int part) {
+        assert c != null;
+
+        GridOffHeapPartitionedMap m = offheap(spaceName);
+
+        return m == null ? new GridEmptyCloseableIterator<T>() : m.iterator(c, part);
     }
 
     /**
@@ -330,4 +345,17 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
 
         return m == null ? -1 : m.allocatedSize();
     }
+
+    /**
+     * Gets iterator over contents of partition.
+     *
+     * @param spaceName Space name.
+     * @param part Partition.
+     * @return Iterator.
+     */
+    public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator(@Nullable
String spaceName, int part) {
+        GridOffHeapPartitionedMap m = offheap(spaceName);
+
+        return m == null ? new GridEmptyCloseableIterator<IgniteBiTuple<byte[], byte[]>>()
: m.iterator(part);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35b1b7a4/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
index 7d00ee5..a945262 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
@@ -188,19 +188,34 @@ public interface GridOffHeapPartitionedMap {
     /**
      * Gets iterator over the whole map.
      *
-     * @param parts Partitions.
      * @return Iterator over the whole map.
      */
-    public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator(int[]
parts);
+    public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator();
 
     /**
      * Gets iterator over the whole map.
      *
      * @param c Key/value closure.
-     * @param parts Partitions.
      * @return Iterator over the whole map.
      */
-    public <T> GridCloseableIterator<T> iterator(CX2<T2<Long, Integer>,
T2<Long, Integer>, T> c, int[] parts);
+    public <T> GridCloseableIterator<T> iterator(CX2<T2<Long, Integer>,
T2<Long, Integer>, T> c);
+
+    /**
+     * Gets iterator over the partition.
+     *
+     * @param c Key/value closure.
+     * @param part Partition.
+     * @return Iterator over the partition.
+     */
+    public <T> GridCloseableIterator<T> iterator(CX2<T2<Long, Integer>,
T2<Long, Integer>, T> c, int part);
+
+    /**
+     * Gets iterator over certain partition.
+     *
+     * @param p Partition.
+     * @return Iterator over certain partition.
+     */
+    public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator(int
p);
 
     /**
      * Sets callback for when entries are evicted due to memory constraints.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35b1b7a4/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
index 7757296..4ffc33f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
@@ -276,26 +276,13 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator(final
int[] parts) {
-        return new GridCloseableIteratorAdapter<IgniteBiTuple<byte[], byte[]>>()
{
-            private int p;
-
-            private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> curIt;
-
-            {
-                try {
-                    advance();
-                }
-                catch (IgniteCheckedException e) {
-                    e.printStackTrace(); // Should never happen.
-                }
-            }
-
-            private void advance() throws IgniteCheckedException {
+    @Override public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator()
{
+        return new PartitionedMapCloseableIterator<IgniteBiTuple<byte[], byte[]>>()
{
+            protected void advance() throws IgniteCheckedException {
                 curIt = null;
 
-                while (p < parts.length) {
-                    curIt = mapFor(parts[p++]).iterator();
+                while (p < parts) {
+                    curIt = mapFor(p++).iterator();
 
                     if (curIt.hasNext())
                         return;
@@ -305,62 +292,19 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
 
                 curIt = null;
             }
-
-            @Override protected IgniteBiTuple<byte[], byte[]> onNext() throws IgniteCheckedException
{
-                if (curIt == null)
-                    throw new NoSuchElementException();
-
-                IgniteBiTuple<byte[], byte[]> t = curIt.next();
-
-                if (!curIt.hasNext()) {
-                    curIt.close();
-
-                    advance();
-                }
-
-                return t;
-            }
-
-            @Override protected boolean onHasNext() {
-                return curIt != null;
-            }
-
-            @Override protected void onRemove() {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override protected void onClose() throws IgniteCheckedException {
-                if (curIt != null)
-                    curIt.close();
-            }
         };
     }
 
     /** {@inheritDoc} */
-    @Override public <T> GridCloseableIterator<T> iterator(final CX2<T2<Long,
Integer>, T2<Long, Integer>, T> c,
-        final int[] parts0)
-    {
+    @Override public <T> GridCloseableIterator<T> iterator(final CX2<T2<Long,
Integer>, T2<Long, Integer>, T> c) {
         assert c != null;
 
-        return new GridCloseableIteratorAdapter<T>() {
-            private int p;
-
-            private GridCloseableIterator<T> curIt;
-
-            {
-                try {
-                    advance();
-                }
-                catch (IgniteCheckedException e) {
-                    e.printStackTrace(); // Should never happen.
-                }
-            }
-
-            private void advance() throws IgniteCheckedException {
+        return new PartitionedMapCloseableIterator<T>() {
+            protected void advance() throws IgniteCheckedException {
                 curIt = null;
 
-                while (p < parts0.length) {
-                    curIt = mapFor(parts0[p++]).iterator(c);
+                while (p < parts) {
+                    curIt = mapFor(p++).iterator(c);
 
                     if (curIt.hasNext())
                         return;
@@ -370,35 +314,18 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
 
                 curIt = null;
             }
+        };
+    }
 
-            @Override protected T onNext() throws IgniteCheckedException {
-                if (curIt == null)
-                    throw new NoSuchElementException();
-
-                T t = curIt.next();
-
-                if (!curIt.hasNext()) {
-                    curIt.close();
-
-                    advance();
-                }
-
-                return t;
-            }
-
-            @Override protected boolean onHasNext() {
-                return curIt != null;
-            }
-
-            @Override protected void onRemove() {
-                throw new UnsupportedOperationException();
-            }
+    /** {@inheritDoc} */
+    @Override public <T> GridCloseableIterator<T> iterator(final CX2<T2<Long,
Integer>, T2<Long, Integer>, T> c,
+       int part) {
+       return mapFor(part).iterator(c);
+    }
 
-            @Override protected void onClose() throws IgniteCheckedException {
-                if (curIt != null)
-                    curIt.close();
-            }
-        };
+    /** {@inheritDoc} */
+    @Override public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator(int
p) {
+        return mapFor(p).iterator();
     }
 
     /**
@@ -427,4 +354,63 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
     public long lruSize() {
         return lru.size();
     }
+
+    /**
+     *  Partitioned closable iterator.
+     */
+    private abstract class PartitionedMapCloseableIterator<T> extends GridCloseableIteratorAdapter<T>
{
+        /** Current partition. */
+        protected int p;
+
+        /** Current iterator. */
+        protected GridCloseableIterator<T> curIt;
+
+        {
+            try {
+                advance();
+            }
+            catch (IgniteCheckedException e) {
+                e.printStackTrace(); // Should never happen.
+            }
+        }
+
+        /**
+         * Switch to next partition.
+         *
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract void advance() throws IgniteCheckedException;
+
+        /** {@inheritDoc} */
+        @Override protected T onNext() throws IgniteCheckedException {
+            if (curIt == null)
+                throw new NoSuchElementException();
+
+            T t = curIt.next();
+
+            if (!curIt.hasNext()) {
+                curIt.close();
+
+                advance();
+            }
+
+            return t;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected boolean onHasNext() {
+            return curIt != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onRemove() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onClose() throws IgniteCheckedException {
+            if (curIt != null)
+                curIt.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35b1b7a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java
index e7cf3ea..7af7b5d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/OffHeapTieredTransactionSelfTest.java
@@ -60,6 +60,8 @@ public class OffHeapTieredTransactionSelfTest extends GridCommonAbstractTest {
 
         cfg.setCacheConfiguration(ccfg);
 
+        cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
+
         return cfg;
     }
 
@@ -84,6 +86,7 @@ public class OffHeapTieredTransactionSelfTest extends GridCommonAbstractTest {
      * @throws Exception In case of error.
      */
     public void testPutAll() throws Exception {
+
         IgniteCache<String, Integer> cache = grid(0).cache(null);
 
         final int KEYS = 5;
@@ -93,12 +96,33 @@ public class OffHeapTieredTransactionSelfTest extends GridCommonAbstractTest {
         for (int i = 0; i < KEYS; i++)
             data.put("key_" + i, i);
 
+        checkPutAll(cache, data, OPTIMISTIC, READ_COMMITTED);
+
+        checkPutAll(cache, data, OPTIMISTIC, REPEATABLE_READ);
+
+        checkPutAll(cache, data, OPTIMISTIC, SERIALIZABLE);
+
+        checkPutAll(cache, data, PESSIMISTIC, READ_COMMITTED);
+
+        checkPutAll(cache, data, PESSIMISTIC, REPEATABLE_READ);
+
+        checkPutAll(cache, data, PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    private void checkPutAll(IgniteCache<String, Integer> cache, Map<String, Integer> data,
+        TransactionConcurrency txConcurrency, TransactionIsolation txIsolation) throws Exception {
         IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
 
-        try (Transaction tx = txs.txStart(PESSIMISTIC, READ_COMMITTED)) {
+        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
             cache.putAll(data);
 
             tx.commit();
         }
+
+        for (Map.Entry<String, Integer> entry : data.entrySet())
+            assertEquals(entry.getValue(), cache.get(entry.getKey()));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35b1b7a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest.java
new file mode 100644
index 0000000..686cc31
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Tests colocated cache with off-heap tiered mode.
+ */
+public class GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest extends
+    GridCachePartitionedNearDisabledOffHeapTieredMultiNodeFullApiSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean txEnabled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean lockingEnabled() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35b1b7a4/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java
index d0e1aea..265beda 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMapAbstractSelfTest.java
@@ -64,19 +64,11 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm
     /** */
     protected int parts = 17;
 
-    /** */
-    protected int[] allParts;
-
     /**
      *
      */
     protected GridOffHeapPartitionedMapAbstractSelfTest() {
         super(false);
-
-        allParts = new int[parts];
-
-        for (int i = 0; i < parts; ++i)
-            allParts[i] = i;
     }
 
     /** {@inheritDoc} */
@@ -460,7 +452,7 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm
 
         int cnt = 0;
 
-        try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator(allParts)) {
+        try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator()) {
             while (it.hasNext()) {
                 IgniteBiTuple<byte[], byte[]> t = it.next();
 
@@ -538,7 +530,7 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm
                     assertEquals(new String(map.get(p, hash(key), key.getBytes())), val);
                 }
 
-                try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator(allParts)) {
+                try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator()) {
                     while (it.hasNext()) {
                         IgniteBiTuple<byte[], byte[]> t = it.next();
 
@@ -557,7 +549,7 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm
 
         int cnt = 0;
 
-        try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator(allParts)) {
+        try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator()) {
             while (it.hasNext()) {
                 IgniteBiTuple<byte[], byte[]> t = it.next();
 
@@ -608,7 +600,7 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm
             @Override public void run() {
                 try {
                     while (running.get()) {
-                        try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator(allParts)) {
+                        try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator()) {
                             while (it.hasNext()) {
                                 IgniteBiTuple<byte[], byte[]> tup = it.next();
 
@@ -685,7 +677,7 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm
 
                 int cnt = 0;
 
-                try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator(new int[]{p})) {
+                try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator(p)) {
                     while (it.hasNext()) {
                         IgniteBiTuple<byte[], byte[]> t = it.next();
 
@@ -760,7 +752,7 @@ public abstract class GridOffHeapPartitionedMapAbstractSelfTest extends GridComm
                         assertNotNull(map.get(p, hash(key), key.getBytes()));
                         assertEquals(new String(map.get(p, hash(key), key.getBytes())), val);
 
-                        try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator(new int[]{p})) {
+                        try (GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = map.iterator(p)) {
                             while (it.hasNext()) {
                                 IgniteBiTuple<byte[], byte[]> t = it.next();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35b1b7a4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
index 3067b9d..369e041 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
@@ -128,6 +128,7 @@ public class IgniteCacheFullApiSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheAtomicOffHeapTieredMultiNodeFullApiSelfTest.class);
         suite.addTestSuite(GridCacheAtomicPrimaryWrityOrderOffHeapTieredMultiNodeFullApiSelfTest.class);
         suite.addTestSuite(GridCachePartitionedNearDisabledOffHeapTieredMultiNodeFullApiSelfTest.class);
+        suite.addTestSuite(GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest.class);
 
         // Private cache API.
         suite.addTestSuite(GridCacheExLocalFullApiSelfTest.class);


Mime
View raw message