ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-6702: SQL: now COUNT(*) use BPlusTree.size instead of cursor iteration. This closes #3037.
Date Mon, 04 Dec 2017 12:25:10 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 8292335e4 -> 08ab9af8a


IGNITE-6702: SQL: now COUNT(*) use BPlusTree.size instead of cursor iteration. This closes #3037.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08ab9af8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08ab9af8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08ab9af8

Branch: refs/heads/master
Commit: 08ab9af8a94c53c1cd58d5763f2b9d594f3ca58e
Parents: 8292335
Author: gg-shq <kirill.shirokov@gmail.com>
Authored: Mon Dec 4 15:25:00 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Mon Dec 4 15:25:00 2017 +0300

----------------------------------------------------------------------
 .../internal/jdbc2/JdbcLocalCachesSelfTest.java |  28 +
 .../cache/persistence/tree/BPlusTree.java       | 141 +++-
 .../processors/database/BPlusTreeSelfTest.java  | 819 ++++++++++++++++++-
 .../query/h2/database/H2TreeIndex.java          |  92 ++-
 ...lexClientAtomicPartitionedNoBackupsTest.java |  34 +
 ...exingComplexClientAtomicPartitionedTest.java |   2 +-
 ...dexingComplexClientAtomicReplicatedTest.java |   2 +-
 ...ntTransactionalPartitionedNoBackupsTest.java |  34 +
 ...mplexClientTransactionalPartitionedTest.java |   2 +-
 ...omplexClientTransactionalReplicatedTest.java |   2 +-
 ...lexServerAtomicPartitionedNoBackupsTest.java |  34 +
 ...exingComplexServerAtomicPartitionedTest.java |   2 +-
 ...dexingComplexServerAtomicReplicatedTest.java |   2 +-
 ...erTransactionalPartitionedNoBackupsTest.java |  34 +
 ...mplexServerTransactionalPartitionedTest.java |   2 +-
 ...omplexServerTransactionalReplicatedTest.java |   2 +-
 .../index/H2DynamicIndexingComplexTest.java     |  22 +-
 .../query/IgniteSqlSegmentedIndexSelfTest.java  | 124 ++-
 18 files changed, 1288 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java
index f096e69..09ccbc9 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java
@@ -129,6 +129,34 @@ public class JdbcLocalCachesSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Verifies that <code>select count(*)</code> behaves correctly in
+     * {@link org.apache.ignite.cache.CacheMode#LOCAL} mode.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCountAll() throws Exception {
+        Properties cfg = new Properties();
+
+        cfg.setProperty(PROP_NODE_ID, grid(0).localNode().id().toString());
+
+        Connection conn = null;
+
+        try {
+            conn = DriverManager.getConnection(BASE_URL, cfg);
+
+            ResultSet rs = conn.createStatement().executeQuery("select count(*) from Integer");
+
+            assertTrue(rs.next());
+
+            assertEquals(2L, rs.getLong(1));
+        }
+        finally {
+            if (conn != null)
+                conn.close();
+        }
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testCache2() throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 8e6e099..436a69d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.Bool.DONE;
 import static org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.Bool.FALSE;
@@ -1906,57 +1907,122 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     }
 
     /**
-     * !!! For debug only! May produce wrong results on concurrent access.
+     * Returns number of elements in the tree by scanning pages of the bottom (leaf) level.
+     * Since a concurrent access is permitted, there is no guarantee about
+     * momentary consistency: the method may miss updates made in already scanned pages.
      *
-     * @return Size.
+     * @return Number of elements in the tree.
      * @throws IgniteCheckedException If failed.
      */
     @Override public final long size() throws IgniteCheckedException {
+        return size(null);
+    }
+
+    /**
+     * Returns number of elements in the tree that match the filter by scanning through the pages of the leaf level.
+     * Since a concurrent access to the tree is permitted, there is no guarantee about
+     * momentary consistency: the method may not see updates made in already scanned pages.
+     *
+     * @param filter The filter to use or null to count all elements.
+     * @return Number of either all elements in the tree or the elements that match the filter.
+     * @throws IgniteCheckedException If failed.
+     */
+    public long size(@Nullable TreeRowClosure<L, T> filter) throws IgniteCheckedException {
         checkDestroyed();
 
-        long pageId;
+        for (;;) {
+            long curPageId;
 
-        long metaPage = acquirePage(metaPageId);
-        try {
-            pageId = getFirstPageId(metaPageId, metaPage, 0); // Level 0 is always at the bottom.
-        }
-        finally {
-            releasePage(metaPageId, metaPage);
-        }
+            long metaPage = acquirePage(metaPageId);
 
-        BPlusIO<L> io = null;
+            try {
+                curPageId = getFirstPageId(metaPageId, metaPage, 0); // Level 0 is always at the bottom.
+            }
+            finally {
+                releasePage(metaPageId, metaPage);
+            }
 
-        long cnt = 0;
+            long cnt = 0;
 
-        while (pageId != 0) {
-            long curId = pageId;
-            long curPage = acquirePage(curId);
+            long curPage = acquirePage(curPageId);
             try {
-                long curAddr = readLock(curId, curPage); // No correctness guaranties.
+                long curPageAddr = readLock(curPageId, curPage);
+
+                if (curPageAddr == 0)
+                    continue; // The first page has gone: restart scan.
 
                 try {
-                    if (io == null) {
-                        io = io(curAddr);
+                    BPlusIO<L> io = io(curPageAddr);
 
-                        assert io.isLeaf();
-                    }
+                    assert io.isLeaf();
+
+                    for (;;) {
+                        int curPageSize = io.getCount(curPageAddr);
+
+                        if (filter == null)
+                            cnt += curPageSize;
+                        else {
+                            for (int i = 0; i < curPageSize; ++i) {
+                                if (filter.apply(this, io, curPageAddr, i))
+                                    cnt++;
+                            }
+                        }
+
+                        long nextPageId = io.getForward(curPageAddr);
+
+                        if (nextPageId == 0) {
+                            checkDestroyed();
+
+                            return cnt;
+                        }
+
+                        long nextPage = acquirePage(nextPageId);
+
+                        try {
+                            long nextPageAddr = readLock(nextPageId, nextPage);
+
+                            // In the current implementation the next page can't change when the current page is locked.
+                            assert nextPageAddr != 0 : nextPageAddr;
+
+                            try {
+                                long pa = curPageAddr;
+                                curPageAddr = 0; // Set to zero to avoid double unlocking in the finally block.
+
+                                readUnlock(curPageId, curPage, pa);
 
-                    cnt += io.getCount(curAddr);
+                                long p = curPage;
+                                curPage = 0; // Set to zero to avoid double release in the finally block.
 
-                    pageId = io.getForward(curAddr);
+                                releasePage(curPageId, p);
+
+                                curPageId = nextPageId;
+                                curPage = nextPage;
+                                curPageAddr = nextPageAddr;
+
+                                nextPage = 0;
+                                nextPageAddr = 0;
+                            }
+                            finally {
+                                if (nextPageAddr != 0)
+                                    readUnlock(nextPageId, nextPage, nextPageAddr);
+                            }
+                        }
+                        finally {
+                            if (nextPage != 0)
+                                releasePage(nextPageId, nextPage);
+                        }
+                    }
                 }
                 finally {
-                    readUnlock(curId, curPage, curAddr);
+                    if (curPageAddr != 0)
+                        readUnlock(curPageId, curPage, curPageAddr);
                 }
             }
             finally {
-                releasePage(curId, curPage);
+                if (curPage != 0)
+                    releasePage(curPageId, curPage);
             }
         }
-
-        checkDestroyed();
-
-        return cnt;
     }
 
     /**
@@ -4803,4 +4869,23 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         /** */
         DONE
     }
+
+    /**
+     * A generic visitor-style interface for performing filtering/modifications/miscellaneous operations on the tree.
+     */
+    public interface TreeRowClosure<L, T extends L> {
+        /**
+         * Performs inspection or operation on a specified row and returns true if this row is
+         * required, matches, or the operation was successful (depending on the context).
+         *
+         * @param tree The tree.
+         * @param io The tree IO object.
+         * @param pageAddr The page address.
+         * @param idx The item index.
+         * @return {@code True} if the item passes the predicate.
+         * @throws IgniteCheckedException If failed.
+         */
+        public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx)
+            throws IgniteCheckedException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 7b4ca13..85d269f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -17,14 +17,22 @@
 
 package org.apache.ignite.internal.processors.database;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -32,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
+
+import com.google.common.base.Predicate;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -104,6 +114,9 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     /** */
     private static int RMV_INC = 1;
 
+    /** Forces printing lock/unlock events on the test tree */
+    private static boolean PRINT_LOCKS = false;
+
     /** */
     protected PageMemory pageMem;
 
@@ -1077,10 +1090,6 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         for (long i = 15; i >= 0; i--)
             tree.put(i);
-
-
-
-
     }
 
     /**
@@ -1156,6 +1165,790 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Verifies that {@link BPlusTree#size()} and {@link BPlusTree#size(BPlusTree.TreeRowClosure)} methods behave correctly
+     * on single-threaded addition and removal of elements in random order.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testSizeForPutRmvSequential() throws IgniteCheckedException {
+        MAX_PER_PAGE = 5;
+
+        boolean DEBUG_PRINT = false;
+
+        int itemCnt = (int) Math.pow(MAX_PER_PAGE, 5) + rnd.nextInt(MAX_PER_PAGE * MAX_PER_PAGE);
+
+        Long[] items = new Long[itemCnt];
+        for (int i = 0; i < itemCnt; ++i)
+            items[i] = (long) i;
+
+        TestTree testTree = createTestTree(true);
+        TreeMap<Long,Long> goldenMap = new TreeMap<>();
+
+        assertEquals(0, testTree.size());
+        assertEquals(0, goldenMap.size());
+
+        final Predicate<Long> rowMatcher = new Predicate<Long>() {
+            @Override public boolean apply(Long row) {
+                return row % 7 == 0;
+            }
+        };
+
+        final BPlusTree.TreeRowClosure<Long, Long> rowClosure = new BPlusTree.TreeRowClosure<Long, Long>() {
+            @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+                throws IgniteCheckedException {
+                return rowMatcher.apply(io.getLookupRow(tree, pageAddr, idx));
+            }
+        };
+
+        int correctMatchingRows = 0;
+
+        Collections.shuffle(Arrays.asList(items), rnd);
+
+        for (Long row : items) {
+            if (DEBUG_PRINT) {
+                X.println(" --> put(" + row + ")");
+                X.print(testTree.printTree());
+            }
+
+            assertEquals(goldenMap.put(row, row), testTree.put(row));
+            assertEquals(row, testTree.findOne(row));
+
+            if (rowMatcher.apply(row))
+                ++correctMatchingRows;
+
+            assertEquals(correctMatchingRows, testTree.size(rowClosure));
+
+            long correctSize = goldenMap.size();
+
+            assertEquals(correctSize, testTree.size());
+            assertEquals(correctSize, size(testTree.find(null, null)));
+
+            assertNoLocks();
+        }
+
+        Collections.shuffle(Arrays.asList(items), rnd);
+
+        for (Long row : items) {
+            if (DEBUG_PRINT) {
+                X.println(" --> rmv(" + row + ")");
+                X.print(testTree.printTree());
+            }
+
+            assertEquals(row, goldenMap.remove(row));
+            assertEquals(row, testTree.remove(row));
+            assertNull(testTree.findOne(row));
+
+            if (rowMatcher.apply(row))
+                --correctMatchingRows;
+
+            assertEquals(correctMatchingRows, testTree.size(rowClosure));
+
+            long correctSize = goldenMap.size();
+
+            assertEquals(correctSize, testTree.size());
+            assertEquals(correctSize, size(testTree.find(null, null)));
+
+            assertNoLocks();
+        }
+    }
+
+    /**
+     * Verifies that {@link BPlusTree#size()} method behaves correctly when run concurrently with
+     * {@link BPlusTree#put}, {@link BPlusTree#remove} methods. Please see details in
+     * {@link #doTestSizeForRandomPutRmvMultithreaded}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSizeForRandomPutRmvMultithreaded_5_4() throws Exception {
+        MAX_PER_PAGE = 5;
+        CNT = 10_000;
+
+        doTestSizeForRandomPutRmvMultithreaded(4);
+    }
+
+    public void testSizeForRandomPutRmvMultithreaded_3_256() throws Exception {
+        MAX_PER_PAGE = 3;
+        CNT = 10_000;
+
+        doTestSizeForRandomPutRmvMultithreaded(256);
+    }
+
+    /**
+     * Verifies that {@link BPlusTree#size()} method behaves correctly when run between series of
+     * concurrent {@link BPlusTree#put}, {@link BPlusTree#remove} methods.
+     *
+     * @param rmvPutSlidingWindowSize Sliding window size (distance between items being deleted and added).
+     * @throws Exception If failed.
+     */
+    private void doTestSizeForRandomPutRmvMultithreaded(final int rmvPutSlidingWindowSize) throws Exception {
+        final TestTree tree = createTestTree(false);
+
+        final boolean DEBUG_PRINT = false;
+
+        final AtomicLong curRmvKey = new AtomicLong(0);
+        final AtomicLong curPutKey = new AtomicLong(rmvPutSlidingWindowSize);
+
+        for (long i = curRmvKey.get(); i < curPutKey.get(); ++i)
+            assertNull(tree.put(i));
+
+        final int putRmvThreadCnt = Math.min(Runtime.getRuntime().availableProcessors(), rmvPutSlidingWindowSize);
+
+        final int loopCnt = CNT / putRmvThreadCnt;
+
+        final CyclicBarrier putRmvOpBarrier = new CyclicBarrier(putRmvThreadCnt);
+        final CyclicBarrier sizeOpBarrier = new CyclicBarrier(putRmvThreadCnt);
+
+        IgniteInternalFuture<?> putRmvFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+
+                for (int i = 0; i < loopCnt && !stop.get(); ++i) {
+                    putRmvOpBarrier.await();
+
+                    Long putVal = curPutKey.getAndIncrement();
+
+                    if (DEBUG_PRINT || (i & 0x7ff) == 0)
+                        X.println(" --> put(" + putVal + ")");
+
+                    assertNull(tree.put(putVal));
+
+                    assertNoLocks();
+
+                    Long rmvVal = curRmvKey.getAndIncrement();
+
+                    if (DEBUG_PRINT || (i & 0x7ff) == 0)
+                        X.println(" --> rmv(" + rmvVal + ")");
+
+                    assertEquals(rmvVal, tree.remove(rmvVal));
+                    assertNull(tree.remove(rmvVal));
+
+                    assertNoLocks();
+
+                    if (stop.get())
+                        break;
+
+                    sizeOpBarrier.await();
+
+                    long correctSize = curPutKey.get() - curRmvKey.get();
+
+                    if (DEBUG_PRINT || (i & 0x7ff) == 0)
+                        X.println("====> correctSize=" + correctSize);
+
+                    assertEquals(correctSize, size(tree.find(null, null)));
+                    assertEquals(correctSize, tree.size());
+                }
+
+                return null;
+            }
+        }, putRmvThreadCnt, "put-remove-size");
+
+        IgniteInternalFuture<?> lockPrintingFut = multithreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                while (!stop.get()) {
+                    Thread.sleep(5000);
+
+                    X.println(TestTree.printLocks());
+                }
+
+                return null;
+            }
+        }, 1, "printLocks");
+
+        asyncRunFut = new GridCompoundFuture<>();
+
+        asyncRunFut.add((IgniteInternalFuture) putRmvFut);
+        asyncRunFut.add((IgniteInternalFuture) lockPrintingFut);
+
+        asyncRunFut.markInitialized();
+
+        try {
+            putRmvFut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
+        }
+        finally {
+            stop.set(true);
+            putRmvOpBarrier.reset();
+            sizeOpBarrier.reset();
+
+            asyncRunFut.get();
+        }
+
+        tree.validateTree();
+
+        assertNoLocks();
+    }
+
+    /**
+     * Verifies that concurrent running of {@link BPlusTree#put} + {@link BPlusTree#remove} sequence
+     * and {@link BPlusTree#size} methods results in correct calculation of tree size.
+     *
+     * @see #doTestSizeForRandomPutRmvMultithreadedAsync doTestSizeForRandomPutRmvMultithreadedAsync() for details.
+     */
+    public void testSizeForRandomPutRmvMultithreadedAsync_16() throws Exception {
+        doTestSizeForRandomPutRmvMultithreadedAsync(16);
+    }
+
+    /**
+     * Verifies that concurrent running of {@link BPlusTree#put} + {@link BPlusTree#remove} sequence
+     * and {@link BPlusTree#size} methods results in correct calculation of tree size.
+     *
+     * @see #doTestSizeForRandomPutRmvMultithreadedAsync doTestSizeForRandomPutRmvMultithreadedAsync() for details.
+     */
+    public void testSizeForRandomPutRmvMultithreadedAsync_3() throws Exception {
+        doTestSizeForRandomPutRmvMultithreadedAsync(3);
+    }
+
+    /**
+     * Verifies that concurrent running of {@link BPlusTree#put} + {@link BPlusTree#remove} sequence
+     * and {@link BPlusTree#size} methods results in correct calculation of tree size.
+     *
+     * Since in the presence of concurrent modifications the size may differ from the actual one, the test maintains
+     * sliding window of records in the tree, uses a barrier between concurrent runs to limit runaway delta in
+     * the calculated size, and checks that the measured size lies within certain bounds.
+     *
+     * NB: This test has to be changed with the integration of IGNITE-3478.
+     *
+     */
+    public void doTestSizeForRandomPutRmvMultithreadedAsync(final int rmvPutSlidingWindowSize) throws Exception {
+        MAX_PER_PAGE = 5;
+
+        final boolean DEBUG_PRINT = false;
+
+        final TestTree tree = createTestTree(false);
+
+        final AtomicLong curRmvKey = new AtomicLong(0);
+        final AtomicLong curPutKey = new AtomicLong(rmvPutSlidingWindowSize);
+
+        for (long i = curRmvKey.get(); i < curPutKey.get(); ++i)
+            assertNull(tree.put(i));
+
+        final int putRmvThreadCnt = Math.min(Runtime.getRuntime().availableProcessors(), rmvPutSlidingWindowSize);
+        final int sizeThreadCnt = putRmvThreadCnt;
+
+        final CyclicBarrier putRmvOpBarrier = new CyclicBarrier(putRmvThreadCnt + sizeThreadCnt, new Runnable() {
+            @Override public void run() {
+                if (DEBUG_PRINT) {
+                    try {
+                        X.println("===BARRIER=== size=" + tree.size()
+                            + "; contents=[" + tree.findFirst() + ".." + tree.findLast() + "]"
+                            + "; rmvVal=" + curRmvKey.get() + "; putVal=" + curPutKey.get());
+
+                        X.println(tree.printTree());
+                    }
+                    catch (IgniteCheckedException e) {
+                        // ignore
+                    }
+                }
+            }
+        });
+
+        final int loopCnt = 500;
+
+        IgniteInternalFuture<?> putRmvFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < loopCnt && !stop.get(); ++i) {
+                    int order;
+                    try {
+                        order = putRmvOpBarrier.await();
+                    } catch (BrokenBarrierException e) {
+                        break;
+                    }
+
+                    Long putVal = curPutKey.getAndIncrement();
+
+                    if (DEBUG_PRINT || (i & 0x3ff) == 0)
+                        X.println(order + ": --> put(" + putVal + ")");
+
+                    assertNull(tree.put(putVal));
+
+                    Long rmvVal = curRmvKey.getAndIncrement();
+
+                    if (DEBUG_PRINT || (i & 0x3ff) == 0)
+                        X.println(order + ": --> rmv(" + rmvVal + ")");
+
+                    assertEquals(rmvVal, tree.remove(rmvVal));
+                    assertNull(tree.findOne(rmvVal));
+                }
+
+                return null;
+            }
+        }, putRmvThreadCnt, "put-remove");
+
+        IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+
+                final List<Long> treeContents = new ArrayList<>(rmvPutSlidingWindowSize * 2);
+
+                final BPlusTree.TreeRowClosure<Long, Long> rowDumper = new BPlusTree.TreeRowClosure<Long, Long>() {
+                    @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+                        throws IgniteCheckedException {
+
+                        treeContents.add(io.getLookupRow(tree, pageAddr, idx));
+                        return true;
+                    }
+                };
+
+                for (long iter = 0; !stop.get(); ++iter) {
+                    int order = 0;
+
+                    try {
+                        order = putRmvOpBarrier.await();
+                    } catch (BrokenBarrierException e) {
+                        break;
+                    }
+
+                    long correctSize = curPutKey.get() - curRmvKey.get();
+
+                    treeContents.clear();
+                    long treeSize = tree.size(rowDumper);
+
+                    long minBound = correctSize - putRmvThreadCnt;
+                    long maxBound = correctSize + putRmvThreadCnt;
+
+                    if (DEBUG_PRINT || (iter & 0x3ff) == 0)
+                      X.println(order + ": size=" + treeSize + "; bounds=[" + minBound + ".." + maxBound
+                            + "]; contents=" + treeContents);
+
+                    if (treeSize < minBound || treeSize > maxBound) {
+                        fail("Tree size is not in bounds ["  + minBound + ".." + maxBound + "]: " + treeSize
+                            + "; Tree contents: " + treeContents);
+                    }
+                }
+
+                return null;
+            }
+        }, sizeThreadCnt, "size");
+
+        IgniteInternalFuture<?> lockPrintingFut = multithreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                while (!stop.get()) {
+                    Thread.sleep(5000);
+
+                    X.println(TestTree.printLocks());
+                }
+
+                return null;
+            }
+        }, 1, "printLocks");
+
+        asyncRunFut = new GridCompoundFuture<>();
+
+        asyncRunFut.add((IgniteInternalFuture) putRmvFut);
+        asyncRunFut.add((IgniteInternalFuture) sizeFut);
+        asyncRunFut.add((IgniteInternalFuture) lockPrintingFut);
+
+        asyncRunFut.markInitialized();
+
+        try {
+            putRmvFut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
+        }
+        finally {
+            stop.set(true);
+            putRmvOpBarrier.reset();
+
+            asyncRunFut.get();
+        }
+
+        tree.validateTree();
+
+        assertNoLocks();
+    }
+
+    /**
+     * The test forces {@link BPlusTree#size} method to run into a livelock: during single run
+     * the method is picking up new pages which are concurrently added to the tree until the new pages are not added
+     * anymore. Test verifies that despite livelock condition a size from a valid range is returned.
+     *
+     * NB: This test has to be changed with the integration of IGNITE-3478.
+     *
+     * @throws Exception if test failed
+     */
+    public void testPutSizeLivelock() throws Exception {
+        MAX_PER_PAGE = 5;
+        CNT = 800;
+
+        final int SLIDING_WINDOW_SIZE = 16;
+        final boolean DEBUG_PRINT = false;
+
+        final TestTree tree = createTestTree(false);
+
+        final AtomicLong curRmvKey = new AtomicLong(0);
+        final AtomicLong curPutKey = new AtomicLong(SLIDING_WINDOW_SIZE);
+
+        for (long i = curRmvKey.get(); i < curPutKey.get(); ++i)
+            assertNull(tree.put(i));
+
+        final int hwThreads = Runtime.getRuntime().availableProcessors();
+        final int putRmvThreadCnt = Math.max(1, hwThreads / 2);
+        final int sizeThreadCnt = hwThreads - putRmvThreadCnt;
+
+        final CyclicBarrier putRmvOpBarrier = new CyclicBarrier(putRmvThreadCnt, new Runnable() {
+            @Override public void run() {
+                if (DEBUG_PRINT) {
+                    try {
+                        X.println("===BARRIER=== size=" + tree.size()
+                            + " [" + tree.findFirst() + ".." + tree.findLast() + "]");
+                    }
+                    catch (IgniteCheckedException e) {
+                        // ignore
+                    }
+                }
+            }
+        });
+
+        final int loopCnt = CNT / hwThreads;
+
+        IgniteInternalFuture<?> putRmvFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < loopCnt && !stop.get(); ++i) {
+                    int order;
+                    try {
+                        order = putRmvOpBarrier.await();
+                    } catch (BrokenBarrierException e) {
+                        // barrier reset() has been called: terminate
+                        break;
+                    }
+
+                    Long putVal = curPutKey.getAndIncrement();
+
+                    if ((i & 0xff) == 0)
+                        X.println(order + ": --> put(" + putVal + ")");
+
+                    assertNull(tree.put(putVal));
+
+                    Long rmvVal = curRmvKey.getAndIncrement();
+
+                    if ((i & 0xff) == 0)
+                        X.println(order + ": --> rmv(" + rmvVal + ")");
+
+                    assertEquals(rmvVal, tree.remove(rmvVal));
+                    assertNull(tree.findOne(rmvVal));
+                }
+
+                return null;
+            }
+        }, putRmvThreadCnt, "put-remove");
+
+        IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+
+                final List<Long> treeContents = new ArrayList<>(SLIDING_WINDOW_SIZE * 2);
+
+                final BPlusTree.TreeRowClosure<Long, Long> rowDumper = new BPlusTree.TreeRowClosure<Long, Long>() {
+                    @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+                        throws IgniteCheckedException {
+
+                        treeContents.add(io.getLookupRow(tree, pageAddr, idx));
+
+                        final long endMs = System.currentTimeMillis() + 10;
+                        final long endPutKey = curPutKey.get() + MAX_PER_PAGE;
+
+                        while (System.currentTimeMillis() < endMs && curPutKey.get() < endPutKey)
+                            Thread.yield();
+
+                        return true;
+                    }
+                };
+
+                while (!stop.get()) {
+                    treeContents.clear();
+
+                    long treeSize = tree.size(rowDumper);
+                    long curPutVal = curPutKey.get();
+
+                    X.println(" ======> size=" + treeSize + "; last-put-value=" + curPutVal);
+
+                    if (treeSize < SLIDING_WINDOW_SIZE || treeSize > curPutVal)
+                        fail("Tree size is not in bounds [" + SLIDING_WINDOW_SIZE + ".." + curPutVal + "]:"
+                            + treeSize + "; contents=" + treeContents);
+                }
+
+                return null;
+            }
+        }, sizeThreadCnt, "size");
+
+        asyncRunFut = new GridCompoundFuture<>();
+
+        asyncRunFut.add((IgniteInternalFuture) putRmvFut);
+        asyncRunFut.add((IgniteInternalFuture) sizeFut);
+
+        asyncRunFut.markInitialized();
+
+        try {
+            putRmvFut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
+        }
+        finally {
+            stop.set(true);
+            putRmvOpBarrier.reset();
+
+            asyncRunFut.get();
+        }
+
+        tree.validateTree();
+
+        assertNoLocks();
+    }
+
+    /**
+     * Runs {@code size()} concurrently with put and remove threads on a tiny tree (1-3 pages, per the
+     * original note) and validates the tree afterwards; size() results are only printed, not asserted.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutRmvSizeSinglePageContention() throws Exception {
+        MAX_PER_PAGE = 10;
+        CNT = 20_000;
+        final boolean DEBUG_PRINT = false; // Set to true to print every operation.
+        final int SLIDING_WINDOWS_SIZE = MAX_PER_PAGE * 2; // NOTE(review): larger than rowsToRemove's capacity.
+
+        final TestTree tree = createTestTree(false);
+
+        final AtomicLong curPutKey = new AtomicLong(0);
+        final BlockingQueue<Long> rowsToRemove = new ArrayBlockingQueue<>(MAX_PER_PAGE / 2);
+
+        final int hwThreadCnt = Runtime.getRuntime().availableProcessors();
+        final int putThreadCnt = Math.max(1, hwThreadCnt / 4);
+        final int rmvThreadCnt = Math.max(1, hwThreadCnt / 2 - putThreadCnt);
+        final int sizeThreadCnt = Math.max(1, hwThreadCnt - putThreadCnt - rmvThreadCnt);
+
+        final AtomicInteger sizeInvokeCnt = new AtomicInteger(0);
+
+        final int loopCnt = CNT; // Number of puts performed by each put thread.
+
+        IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int iter = 0;
+                while (!stop.get()) {
+                    long size = tree.size();
+
+                    if (DEBUG_PRINT || (++iter & 0xffff) == 0)
+                        X.println(" --> size() = " + size);
+
+                    sizeInvokeCnt.incrementAndGet();
+                }
+
+                return null;
+            }
+        }, sizeThreadCnt, "size");
+
+        // Wait until size() has been invoked at least 2 * sizeThreadCnt times in total before starting writers.
+        while (sizeInvokeCnt.get() < sizeThreadCnt * 2)
+            Thread.yield();
+
+        IgniteInternalFuture<?> rmvFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int iter = 0;
+                while(!stop.get()) {
+                    Long rmvVal = rowsToRemove.poll(200, TimeUnit.MILLISECONDS); // null on timeout.
+                    if (rmvVal != null)
+                        assertEquals(rmvVal, tree.remove(rmvVal));
+
+                    if (DEBUG_PRINT || (++iter & 0x3ff) == 0)
+                        X.println(" --> rmv(" + rmvVal + ")");
+                }
+
+                return null;
+            }
+        }, rmvThreadCnt, "rmv");
+
+        IgniteInternalFuture<?> putFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < loopCnt && !stop.get(); ++i) {
+                    Long putVal = curPutKey.getAndIncrement();
+                    assertNull(tree.put(putVal));
+
+                    while (rowsToRemove.size() > SLIDING_WINDOWS_SIZE && !stop.get())
+                        Thread.yield();
+
+                    rowsToRemove.put(putVal); // Blocks while the queue is full.
+
+                    if (DEBUG_PRINT || (i & 0x3ff) == 0)
+                        X.println(" --> put(" + putVal + ")");
+                }
+
+                return null;
+            }
+        }, putThreadCnt, "put");
+
+        IgniteInternalFuture<?> treePrintFut = multithreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                while (!stop.get()) {
+                    Thread.sleep(1000);
+
+                    X.println(TestTree.printLocks());
+                    X.println(tree.printTree());
+                }
+
+                return null;
+            }
+        }, 1, "printTree");
+
+        asyncRunFut = new GridCompoundFuture<>();
+
+        asyncRunFut.add((IgniteInternalFuture) sizeFut);
+        asyncRunFut.add((IgniteInternalFuture) rmvFut);
+        asyncRunFut.add((IgniteInternalFuture) putFut);
+        asyncRunFut.add((IgniteInternalFuture) treePrintFut);
+
+        asyncRunFut.markInitialized();
+
+        try {
+            putFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); // Only the put threads terminate on their own.
+        }
+        finally {
+            stop.set(true); // Lets the size, rmv and printTree loops exit.
+
+            asyncRunFut.get();
+        }
+
+        tree.validateTree();
+
+        assertNoLocks();
+    }
+
+    /**
+     * The test verifies that {@link BPlusTree#put}, {@link BPlusTree#remove}, {@link BPlusTree#find}, and
+     * {@link BPlusTree#size} run concurrently, perform correctly and report correct values.
+     *
+     * A sliding window of numbers is maintained in the test: put threads add keys, remove threads delete them.
+     *
+     * NB: This test has to be changed with the integration of IGNITE-3478.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutRmvFindSizeMultithreaded() throws Exception {
+        MAX_PER_PAGE = 5;
+        CNT = 60_000;
+
+        final int SLIDING_WINDOW_SIZE = 100;
+
+        final TestTree tree = createTestTree(false);
+
+        final AtomicLong curPutKey = new AtomicLong(0);
+        final BlockingQueue<Long> rowsToRemove = new ArrayBlockingQueue<>(SLIDING_WINDOW_SIZE);
+
+        final int hwThreadCnt = Runtime.getRuntime().availableProcessors();
+        final int putThreadCnt = Math.max(1, hwThreadCnt / 4);
+        final int rmvThreadCnt = Math.max(1, hwThreadCnt / 4);
+        final int findThreadCnt = Math.max(1, hwThreadCnt / 4);
+        final int sizeThreadCnt = Math.max(1, hwThreadCnt - putThreadCnt - rmvThreadCnt - findThreadCnt);
+
+        final AtomicInteger sizeInvokeCnt = new AtomicInteger(0);
+
+        final int loopCnt = CNT; // Number of puts performed by each put thread.
+
+        IgniteInternalFuture<?> sizeFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int iter = 0;
+                while (!stop.get()) {
+                    long size = tree.size(); // NOTE(review): the value is only printed, never asserted.
+
+                    if ((++iter & 0x3ff) == 0)
+                        X.println(" --> size() = " + size);
+
+                    sizeInvokeCnt.incrementAndGet();
+                }
+
+                return null;
+            }
+        }, sizeThreadCnt, "size");
+
+        // Let the size threads start: wait for at least 2 * sizeThreadCnt size() calls in total.
+        while (sizeInvokeCnt.get() < sizeThreadCnt * 2)
+            Thread.yield();
+
+        IgniteInternalFuture<?> rmvFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int iter = 0;
+                while(!stop.get()) {
+                    Long rmvVal = rowsToRemove.poll(200, TimeUnit.MILLISECONDS); // null on timeout.
+                    if (rmvVal != null)
+                        assertEquals(rmvVal, tree.remove(rmvVal));
+
+                    if ((++iter & 0x3ff) == 0)
+                        X.println(" --> rmv(" + rmvVal + ")");
+                }
+
+                return null;
+            }
+        }, rmvThreadCnt, "rmv");
+
+        IgniteInternalFuture<?> findFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int iter = 0;
+                while(!stop.get()) {
+                    Long findVal = curPutKey.get()
+                        + SLIDING_WINDOW_SIZE / 2
+                        - rnd.nextInt(SLIDING_WINDOW_SIZE * 2);
+
+                    tree.findOne(findVal); // The lookup result is not checked.
+
+                    if ((++iter & 0x3ff) == 0)
+                        X.println(" --> fnd(" + findVal + ")");
+                }
+
+                return null;
+            }
+        }, findThreadCnt, "find");
+
+        IgniteInternalFuture<?> putFut = multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < loopCnt && !stop.get(); ++i) {
+                    Long putVal = curPutKey.getAndIncrement();
+                    assertNull(tree.put(putVal));
+
+                    while (rowsToRemove.size() > SLIDING_WINDOW_SIZE) { // NOTE(review): never true, size() <= capacity.
+                        if (stop.get())
+                            return null;
+
+                        Thread.yield();
+                    }
+
+                    rowsToRemove.put(putVal); // Blocks while the queue is full.
+
+                    if ((i & 0x3ff) == 0)
+                        X.println(" --> put(" + putVal + ")");
+                }
+
+                return null;
+            }
+        }, putThreadCnt, "put");
+
+        IgniteInternalFuture<?> lockPrintingFut = multithreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                while (!stop.get()) {
+                    Thread.sleep(1000);
+
+                    X.println(TestTree.printLocks());
+                }
+
+                return null;
+            }
+        }, 1, "printLocks");
+
+        asyncRunFut = new GridCompoundFuture<>();
+
+        asyncRunFut.add((IgniteInternalFuture) sizeFut);
+        asyncRunFut.add((IgniteInternalFuture) rmvFut);
+        asyncRunFut.add((IgniteInternalFuture) findFut);
+        asyncRunFut.add((IgniteInternalFuture) putFut);
+        asyncRunFut.add((IgniteInternalFuture) lockPrintingFut);
+
+        asyncRunFut.markInitialized();
+
+        try {
+            putFut.get(getTestTimeout(), TimeUnit.MILLISECONDS); // Only the put threads terminate on their own.
+        }
+        finally {
+            stop.set(true); // Lets the size, rmv, find and printLocks loops exit.
+
+            asyncRunFut.get();
+        }
+
+        tree.validateTree();
+
+        assertNoLocks();
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testTestRandomPutRemoveMultithreaded_1_30_0() throws Exception {
@@ -1620,7 +2413,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public void onBeforeReadLock(int cacheId, long pageId, long page) {
-//            X.println("  onBeforeReadLock: " + U.hexLong(page.id()));
+            if (PRINT_LOCKS)
+                X.println("  onBeforeReadLock: " + U.hexLong(pageId));
 //
 //            U.dumpStack();
 
@@ -1629,7 +2423,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public void onReadLock(int cacheId, long pageId, long page, long pageAddr) {
-//            X.println("  onReadLock: " + U.hexLong(page.id()));
+            if (PRINT_LOCKS)
+                X.println("  onReadLock: " + U.hexLong(pageId));
 
             if (pageAddr != 0L) {
                 long actual = PageIO.getPageId(pageAddr);
@@ -1644,7 +2439,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public void onReadUnlock(int cacheId, long pageId, long page, long pageAddr) {
-//            X.println("  onReadUnlock: " + U.hexLong(page.id()));
+            if (PRINT_LOCKS)
+                X.println("  onReadUnlock: " + U.hexLong(pageId));
 
             checkPageId(pageId, pageAddr);
 
@@ -1655,14 +2451,16 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public void onBeforeWriteLock(int cacheId, long pageId, long page) {
-//            X.println("  onBeforeWriteLock: " + U.hexLong(page.id()));
+            if (PRINT_LOCKS)
+                X.println("  onBeforeWriteLock: " + U.hexLong(pageId));
 
             assertNull(beforeWriteLock.put(threadId(), pageId));
         }
 
         /** {@inheritDoc} */
         @Override public void onWriteLock(int cacheId, long pageId, long page, long pageAddr) {
-//            X.println("  onWriteLock: " + U.hexLong(page.id()));
+            if (PRINT_LOCKS)
+                X.println("  onWriteLock: " + U.hexLong(pageId));
 //
 //            U.dumpStack();
 
@@ -1682,7 +2480,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public void onWriteUnlock(int cacheId, long pageId, long page, long pageAddr) {
-//            X.println("  onWriteUnlock: " + U.hexLong(page.id()));
+            if (PRINT_LOCKS)
+                X.println("  onWriteUnlock: " + U.hexLong(pageId));
 
             assertEquals(effectivePageId(pageId), effectivePageId(PageIO.getPageId(pageAddr)));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index 4ebac88..5a336c5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -23,11 +23,14 @@ import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.query.h2.H2Cursor;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -165,20 +168,13 @@ public class H2TreeIndex extends GridH2IndexBase {
     /** {@inheritDoc} */
     @Override public Cursor find(Session ses, SearchRow lower, SearchRow upper) {
         try {
-            IndexingQueryFilter f = threadLocalFilter();
-            IndexingQueryCacheFilter p = null;
-
-            if (f != null) {
-                String cacheName = getTable().cacheName();
-
-                p = f.forCache(cacheName);
-            }
+            IndexingQueryCacheFilter filter = partitionFilter(threadLocalFilter());
 
             int seg = threadLocalSegment();
 
             H2Tree tree = treeForRead(seg);
 
-            return new H2Cursor(tree.find(lower, upper, p));
+            return new H2Cursor(tree.find(lower, upper, filter));
         }
         catch (IgniteCheckedException e) {
             throw DbException.convert(e);
@@ -274,14 +270,59 @@ public class H2TreeIndex extends GridH2IndexBase {
 
     /** {@inheritDoc} */
     @Override public long getRowCount(Session ses) {
-        Cursor cursor = find(ses, null, null);
+        try {
+            int seg = threadLocalSegment();
 
-        long res = 0;
+            H2Tree tree = treeForRead(seg);
 
-        while (cursor.next())
-            res++;
+            BPlusTree.TreeRowClosure<SearchRow, GridH2Row> filter = filter(); // null if no partition filter applies.
 
-        return res;
+            return tree.size(filter); // Counts rows via the tree instead of iterating a cursor.
+        }
+        catch (IgniteCheckedException e) {
+            throw DbException.convert(e);
+        }
+    }
+
+    /**
+     * Adapts an {@link IndexingQueryCacheFilter} to the {@link BPlusTree.TreeRowClosure} interface so that
+     * tree rows can be filtered by the partition derived from their link.
+     */
+    private static class PartitionFilterTreeRowClosure implements BPlusTree.TreeRowClosure<SearchRow, GridH2Row> {
+        private final IndexingQueryCacheFilter filter; // Partition filter this closure delegates to.
+
+        /**
+         * Creates a {@link BPlusTree.TreeRowClosure} adapter based on the given partition filter.
+         *
+         * @param filter The partition filter.
+         */
+        public PartitionFilterTreeRowClosure(IndexingQueryCacheFilter filter) {
+            this.filter = filter;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(BPlusTree<SearchRow, GridH2Row> tree,
+            BPlusIO<SearchRow> io, long pageAddr, int idx) throws IgniteCheckedException {
+
+            H2RowLinkIO h2io = (H2RowLinkIO)io; // NOTE(review): assumes io is always an H2RowLinkIO; confirm.
+
+            return filter.applyPartition( // The partition ID is derived from the page ID of the row's link.
+                PageIdUtils.partId(
+                    PageIdUtils.pageId(
+                        h2io.getLink(pageAddr, idx))));
+        }
+    }
+
+    /**
+     * Returns a closure to apply to rows in the current index to keep only the
+     * ones accepted by the thread-local partition filter for this cache.
+     *
+     * @return The closure, or {@code null} if there is no partition filter to apply.
+     */
+    @Nullable private BPlusTree.TreeRowClosure<SearchRow, GridH2Row> filter() {
+        final IndexingQueryCacheFilter filter = partitionFilter(threadLocalFilter());
+
+        return filter != null ? new PartitionFilterTreeRowClosure(filter) : null;
     }
 
     /** {@inheritDoc} */
@@ -344,13 +385,7 @@ public class H2TreeIndex extends GridH2IndexBase {
         @Nullable SearchRow last,
         IndexingQueryFilter filter) {
         try {
-            IndexingQueryCacheFilter p = null;
-
-            if (filter != null) {
-                String cacheName = getTable().cacheName();
-
-                p = filter.forCache(cacheName);
-            }
+            IndexingQueryCacheFilter p = partitionFilter(filter);
 
             GridCursor<GridH2Row> range = t.find(first, last, p);
 
@@ -365,6 +400,21 @@ public class H2TreeIndex extends GridH2IndexBase {
     }
 
     /**
+     * Obtains from the given query filter the partition filter for the cache backing this index's table.
+     *
+     * @param qryFilter Factory that creates a predicate for filtering entries for a particular cache.
+     * @return The filter or {@code null} if not needed (e.g., no query filter, or the cache is not partitioned).
+     */
+    @Nullable private IndexingQueryCacheFilter partitionFilter(@Nullable IndexingQueryFilter qryFilter) {
+        if (qryFilter == null)
+            return null;
+
+        String cacheName = getTable().cacheName();
+
+        return qryFilter.forCache(cacheName);
+    }
+
+    /**
      * @param inlineIdxs Inline index helpers.
      * @param cfgInlineSize Inline size from cache config.
      * @return Inline size.

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest.java
new file mode 100644
index 0000000..25be1ed
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Tests DML and DDL operations on an atomic partitioned cache without backups,
+ * with queries initiated from a client node.
+ */
+public class H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest extends H2DynamicIndexingComplexTest {
+    /**
+     * Sets up the base test for PARTITIONED/ATOMIC mode; the 0 is presumably the backup count (TODO confirm).
+     */
+    public H2DynamicIndexingComplexClientAtomicPartitionedNoBackupsTest() {
+        super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 0, CLIENT_IDX);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedTest.java
index 78eddbf..a05390f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicPartitionedTest.java
@@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexClientAtomicPartitionedTest extends H2Dynam
      * Constructor.
      */
     public H2DynamicIndexingComplexClientAtomicPartitionedTest() {
-        super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, CLIENT_IDX);
+        super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 1, CLIENT_IDX);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicReplicatedTest.java
index 0e1004c..6962eff 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientAtomicReplicatedTest.java
@@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexClientAtomicReplicatedTest extends H2Dynami
      * Constructor.
      */
     public H2DynamicIndexingComplexClientAtomicReplicatedTest() {
-        super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, CLIENT_IDX);
+        super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 1, CLIENT_IDX);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest.java
new file mode 100644
index 0000000..bccb38e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Tests DML and DDL operations on a transactional partitioned cache without backups,
+ * with queries initiated from a client node.
+ */
+public class H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest extends H2DynamicIndexingComplexTest {
+    /**
+     * Sets up the base test for PARTITIONED/TRANSACTIONAL mode; the 0 is presumably the backup count (TODO confirm).
+     */
+    public H2DynamicIndexingComplexClientTransactionalPartitionedNoBackupsTest() {
+        super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 0, CLIENT_IDX);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedTest.java
index 6dead30..8ec73cf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalPartitionedTest.java
@@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexClientTransactionalPartitionedTest extends
      * Constructor.
      */
     public H2DynamicIndexingComplexClientTransactionalPartitionedTest() {
-        super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, CLIENT_IDX);
+        super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 1, CLIENT_IDX);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalReplicatedTest.java
index 3c73d2c..6000277 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexClientTransactionalReplicatedTest.java
@@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexClientTransactionalReplicatedTest extends H
      * Constructor.
      */
     public H2DynamicIndexingComplexClientTransactionalReplicatedTest() {
-        super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, CLIENT_IDX);
+        super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 1, CLIENT_IDX);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest.java
new file mode 100644
index 0000000..6e806f9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Tests DML and DDL operations on an atomic partitioned cache without backups,
+ * with queries initiated from a server node.
+ */
+public class H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest extends H2DynamicIndexingComplexTest {
+    /**
+     * Sets up the base test for PARTITIONED/ATOMIC mode; the 0 is presumably the backup count (TODO confirm).
+     */
+    public H2DynamicIndexingComplexServerAtomicPartitionedNoBackupsTest() {
+        super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 0, SRV_IDX);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedTest.java
index ff0c1cb..18f4456 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicPartitionedTest.java
@@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexServerAtomicPartitionedTest extends H2Dynam
      * Constructor.
      */
     public H2DynamicIndexingComplexServerAtomicPartitionedTest() {
-        super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, SRV_IDX);
+        super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 1, SRV_IDX);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicReplicatedTest.java
index 3d7ee18..2bfe678 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerAtomicReplicatedTest.java
@@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexServerAtomicReplicatedTest extends H2Dynami
      * Constructor.
      */
     public H2DynamicIndexingComplexServerAtomicReplicatedTest() {
-        super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, SRV_IDX);
+        super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 1, SRV_IDX);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest.java
new file mode 100644
index 0000000..37b4489
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Tests DML+DDL operations on a transactional partitioned cache without backups,
+ * with queries initiated from a server node.
+ */
+public class H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest extends H2DynamicIndexingComplexTest {
+    /**
+     * Constructor.
+     */
+    public H2DynamicIndexingComplexServerTransactionalPartitionedNoBackupsTest() {
+        super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 0, SRV_IDX);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedTest.java
index aeb3839..85a58c1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalPartitionedTest.java
@@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexServerTransactionalPartitionedTest extends
      * Constructor.
      */
     public H2DynamicIndexingComplexServerTransactionalPartitionedTest() {
-        super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, SRV_IDX);
+        super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 1, SRV_IDX);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalReplicatedTest.java
index 4266161..54329b1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexServerTransactionalReplicatedTest.java
@@ -28,6 +28,6 @@ public class H2DynamicIndexingComplexServerTransactionalReplicatedTest extends H
      * Constructor.
      */
     public H2DynamicIndexingComplexServerTransactionalReplicatedTest() {
-        super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, SRV_IDX);
+        super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 1, SRV_IDX);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexTest.java
index f9d3408..68df58b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexingComplexTest.java
@@ -45,6 +45,9 @@ public abstract class H2DynamicIndexingComplexTest extends DynamicIndexAbstractS
     /** Node index to initiate operations from. */
     private final int nodeIdx;
 
+    /** Number of backups to configure. */
+    private final int backups;
+
     /** Names of companies to use. */
     private final static List<String> COMPANIES = Arrays.asList("ASF", "GNU", "BSD");
 
@@ -61,11 +64,13 @@ public abstract class H2DynamicIndexingComplexTest extends DynamicIndexAbstractS
      * Constructor.
      * @param cacheMode Cache mode.
      * @param atomicityMode Cache atomicity mode.
+     * @param backups Number of backups.
      * @param nodeIdx Node index.
      */
-    H2DynamicIndexingComplexTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, int nodeIdx) {
+    H2DynamicIndexingComplexTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, int backups, int nodeIdx) {
         this.cacheMode = cacheMode;
         this.atomicityMode = atomicityMode;
+        this.backups = backups;
         this.nodeIdx = nodeIdx;
     }
 
@@ -94,12 +99,13 @@ public abstract class H2DynamicIndexingComplexTest extends DynamicIndexAbstractS
     public void testOperations() {
         executeSql("CREATE TABLE person (id int, name varchar, age int, company varchar, city varchar, " +
             "primary key (id, name, city)) WITH \"template=" + cacheMode.name() + ",atomicity=" + atomicityMode.name() +
-            ",affinity_key=city\"");
+            ",backups=" + backups + ",affinity_key=city\"");
 
         executeSql("CREATE INDEX idx on person (city asc, name asc)");
 
         executeSql("CREATE TABLE city (name varchar, population int, primary key (name)) WITH " +
-            "\"template=" + cacheMode.name() + ",atomicity=" + atomicityMode.name() + ",affinity_key=name\"");
+            "\"template=" + cacheMode.name() + ",atomicity=" + atomicityMode.name() +
+            ",backups=" + backups + ",affinity_key=name\"");
 
         executeSql("INSERT INTO city (name, population) values(?, ?), (?, ?), (?, ?)",
             "St. Petersburg", 6000000,
@@ -107,7 +113,9 @@ public abstract class H2DynamicIndexingComplexTest extends DynamicIndexAbstractS
             "London", 8000000
         );
 
-        for (int i = 0; i < 100; i++)
+        final long PERSON_COUNT = 100;
+
+        for (int i = 0; i < PERSON_COUNT; i++)
             executeSql("INSERT INTO person (id, name, age, company, city) values (?, ?, ?, ?, ?)",
                 i,
                 "Person " + i,
@@ -121,7 +129,11 @@ public abstract class H2DynamicIndexingComplexTest extends DynamicIndexAbstractS
             }
         });
 
-        long r = (Long)executeSqlSingle("SELECT COUNT(*) from Person p inner join City c on p.city = c.name");
+        long r = (Long)executeSqlSingle("SELECT COUNT(*) from Person");
+
+        assertEquals(PERSON_COUNT, r);
+
+        r = (Long)executeSqlSingle("SELECT COUNT(*) from Person p inner join City c on p.city = c.name");
 
         // Berkeley is not present in City table, although 25 people have it specified as their city.
         assertEquals(75L, r);

http://git-wip-us.apache.org/repos/asf/ignite/blob/08ab9af8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
index 03c3f1e..dd03274 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
@@ -132,6 +132,8 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
         checkDistributedQueryWithSegmentedIndex();
 
         checkLocalQueryWithSegmentedIndex();
+
+        checkLocalSizeQueryWithSegmentedIndex();
     }
 
     /**
@@ -144,18 +146,43 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
         IgniteCache<Object, Object> cache = ignite(0).cache(ORG_CACHE_NAME);
 
         // Unequal entries distribution among partitions.
-        int expectedSize = nodesCount() * QRY_PARALLELISM_LVL *  3 / 2;
+        int expSize = nodesCount() * QRY_PARALLELISM_LVL *  3 / 2;
 
-        for (int i = 0; i < expectedSize; i++)
+        for (int i = 0; i < expSize; i++)
             cache.put(i, new Organization("org-" + i));
 
         String select0 = "select * from \"org\".Organization o";
 
         // Check for stable results.
         for(int i = 0; i < 10; i++) {
-            List<List<?>> result = cache.query(new SqlFieldsQuery(select0)).getAll();
+            List<List<?>> res = cache.query(new SqlFieldsQuery(select0)).getAll();
 
-            assertEquals(expectedSize, result.size());
+            assertEquals(expSize, res.size());
+        }
+    }
+
+    /**
+     * Checks that <code>select count(*)</code> returns a correct result with segmented indices.
+     * @throws Exception If failed.
+     */
+    public void testSegmentedIndexSizeReproducableResults() throws Exception {
+        ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class, Organization.class));
+
+        IgniteCache<Object, Object> cache = ignite(0).cache(ORG_CACHE_NAME);
+
+        // Unequal entries distribution among partitions.
+        long expSize = nodesCount() * QRY_PARALLELISM_LVL *  3 / 2;
+
+        for (int i = 0; i < expSize; i++)
+            cache.put(i, new Organization("org-" + i));
+
+        String select0 = "select count(*) from \"org\".Organization o";
+
+        // Check for stable results.
+        for(int i = 0; i < 10; i++) {
+            List<List<?>> res = cache.query(new SqlFieldsQuery(select0)).getAll();
+
+            assertEquals(expSize, res.get(0).get(0));
         }
     }
 
@@ -170,14 +197,39 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
                 .setEvictionPolicy(new FifoEvictionPolicy(10))
                 .setOnheapCacheEnabled(true));
 
-        for (int i = 0; i < 20; i++)
+        final long SIZE = 20;
+
+        for (int i = 0; i < SIZE; i++)
             cache.put(i, new Organization("org-" + i));
 
         String select0 = "select name from \"org\".Organization";
 
-        List<List<?>> result = cache.query(new SqlFieldsQuery(select0)).getAll();
+        List<List<?>> res = cache.query(new SqlFieldsQuery(select0)).getAll();
 
-        assertEquals(20, result.size());
+        assertEquals(SIZE, res.size());
+    }
+
+    /**
+     * Verifies that <code>select count(*)</code> returns a valid result on a single-node grid.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSizeOnSegmentedIndexWithEvictionPolicy() throws Exception {
+        final IgniteCache<Object, Object> cache = ignite(0).createCache(
+            cacheConfig(ORG_CACHE_NAME, true, Integer.class, Organization.class)
+                .setEvictionPolicy(new FifoEvictionPolicy(10))
+                .setOnheapCacheEnabled(true));
+
+        final long SIZE = 20;
+
+        for (int i = 0; i < SIZE; i++)
+            cache.put(i, new Organization("org-" + i));
+
+        String select0 = "select count(*) from \"org\".Organization";
+
+        List<List<?>> res = cache.query(new SqlFieldsQuery(select0)).getAll();
+
+        assertEquals(SIZE, res.get(0).get(0));
     }
 
     /**
@@ -194,6 +246,8 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
         checkDistributedQueryWithSegmentedIndex();
 
         checkLocalQueryWithSegmentedIndex();
+
+        checkLocalSizeQueryWithSegmentedIndex();
     }
 
     /**
@@ -205,21 +259,21 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
         for (int i = 0; i < nodesCount(); i++) {
             IgniteCache<Integer, Person> c1 = ignite(i).cache(PERSON_CAHE_NAME);
 
-            int expectedPersons = 0;
+            long expPersons = 0;
 
             for (Cache.Entry<Integer, Person> e : c1) {
                 final Integer orgId = e.getValue().orgId;
 
                 // We have as orphan ORG rows as orphan PERSON rows.
                 if (ORPHAN_ROWS <= orgId && orgId < 500)
-                    expectedPersons++;
+                    expPersons++;
             }
 
             String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key";
 
-            List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll();
+            List<List<?>> res = c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll();
 
-            assertEquals(expectedPersons, result.size());
+            assertEquals(expPersons, res.size());
         }
     }
 
@@ -235,25 +289,59 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
             IgniteCache<Integer, Person> c1 = node.cache(PERSON_CAHE_NAME);
             IgniteCache<Integer, Organization> c2 = node.cache(ORG_CACHE_NAME);
 
-            Set<Integer> localOrgIds = new HashSet<>();
+            Set<Integer> locOrgIds = new HashSet<>();
 
             for (Cache.Entry<Integer, Organization> e : c2.localEntries())
-                localOrgIds.add(e.getKey());
+                locOrgIds.add(e.getKey());
 
-            int expectedPersons = 0;
+            long expPersons = 0;
 
             for (Cache.Entry<Integer, Person> e : c1.localEntries()) {
                 final Integer orgId = e.getValue().orgId;
 
-                if (localOrgIds.contains(orgId))
-                    expectedPersons++;
+                if (locOrgIds.contains(orgId))
+                    expPersons++;
             }
 
             String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key";
 
-            List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setLocal(true)).getAll();
+            List<List<?>> res = c1.query(new SqlFieldsQuery(select0).setLocal(true)).getAll();
+
+            assertEquals(expPersons, res.size());
+        }
+    }
+
+    /**
+     * Verifies that local <code>select count(*)</code> query returns a correct result.
+     *
+     * @throws Exception If failed.
+     */
+    public void checkLocalSizeQueryWithSegmentedIndex() throws Exception {
+        for (int i = 0; i < nodesCount(); i++) {
+            final Ignite node = ignite(i);
+
+            IgniteCache<Integer, Person> c1 = node.cache(PERSON_CAHE_NAME);
+            IgniteCache<Integer, Organization> c2 = node.cache(ORG_CACHE_NAME);
+
+            Set<Integer> locOrgIds = new HashSet<>();
+
+            for (Cache.Entry<Integer, Organization> e : c2.localEntries())
+                locOrgIds.add(e.getKey());
+
+            int expPersons = 0;
+
+            for (Cache.Entry<Integer, Person> e : c1.localEntries()) {
+                final Integer orgId = e.getValue().orgId;
+
+                if (locOrgIds.contains(orgId))
+                    expPersons++;
+            }
+
+            String select0 = "select count(*) from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key";
+
+            List<List<?>> res = c1.query(new SqlFieldsQuery(select0).setLocal(true)).getAll();
 
-            assertEquals(expectedPersons, result.size());
+            assertEquals((long) expPersons, res.get(0).get(0));
         }
     }
 


Mime
View raw message