ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5937
Date Wed, 25 Oct 2017 11:41:27 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5937 4cadfc92d -> 739417afb


ignite-5937


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/739417af
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/739417af
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/739417af

Branch: refs/heads/ignite-5937
Commit: 739417afb3cb47f6ada06135ad9ce66c7e1cb0a8
Parents: 4cadfc9
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Oct 25 13:34:28 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Oct 25 14:40:36 2017 +0300

----------------------------------------------------------------------
 .../query/h2/database/H2TreeIndex.java          |   8 +-
 .../query/h2/opt/GridH2IndexBase.java           |  21 +-
 .../cache/mvcc/CacheMvccSqlQueriesTest.java     | 234 +++++++++++++++++--
 3 files changed, 235 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/739417af/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index f6ca9e8..63b33af 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -372,11 +372,13 @@ public class H2TreeIndex extends GridH2IndexBase {
     @Override protected GridCursor<GridH2Row> doFind0(
         IgniteTree t,
         @Nullable SearchRow first,
-        boolean includeFirst,
         @Nullable SearchRow last,
-        IndexingQueryFilter filter) {
+        IndexingQueryFilter filter,
+        H2TreeMvccFilterClosure mvccFilter) {
         try {
-            GridCursor<GridH2Row> range = t.find(first, last);
+            assert !cctx.mvccEnabled() || mvccFilter != null;
+
+            GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, mvccFilter,
null);
 
             if (range == null)
                 return EMPTY_CURSOR;

http://git-wip-us.apache.org/repos/asf/ignite/blob/739417af/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index b865e00..96b331a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -432,7 +433,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     // This is the first request containing all the search rows.
                     assert !msg.bounds().isEmpty() : "empty bounds";
 
-                    src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter());
+                    src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter(), qctx.mvccFilter());
                 }
                 else {
                     // This is request to fetch next portion of data.
@@ -1475,20 +1476,28 @@ public abstract class GridH2IndexBase extends BaseIndex {
         /** */
         final IndexingQueryFilter filter;
 
+        /** */
+        private final H2TreeMvccFilterClosure mvccFilter;
+
         /** Iterator. */
         Iterator<GridH2Row> iter = emptyIterator();
 
         /**
          * @param bounds Bounds.
+         * @param segment Segment.
          * @param filter Filter.
+         * @param mvccFilter Mvcc filter.
          */
         RangeSource(
             Iterable<GridH2RowRangeBounds> bounds,
             int segment,
-            IndexingQueryFilter filter
+            IndexingQueryFilter filter,
+            H2TreeMvccFilterClosure mvccFilter
         ) {
             this.segment = segment;
             this.filter = filter;
+            this.mvccFilter = mvccFilter;
+
             boundsIter = bounds.iterator();
         }
 
@@ -1546,7 +1555,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
                 IgniteTree t = treeForRead(segment);
 
-                iter = new CursorIteratorWrapper(doFind0(t, first, true, last, filter));
+                iter = new CursorIteratorWrapper(doFind0(t, first, last, filter, mvccFilter));
 
                 if (!iter.hasNext()) {
                     // We have to return empty range here.
@@ -1571,17 +1580,17 @@ public abstract class GridH2IndexBase extends BaseIndex {
     /**
      * @param t Tree.
      * @param first Lower bound.
-     * @param includeFirst Whether lower bound should be inclusive.
      * @param last Upper bound always inclusive.
      * @param filter Filter.
+     * @param mvccFilter Mvcc filter.
      * @return Iterator over rows in given range.
      */
     protected GridCursor<GridH2Row> doFind0(
         IgniteTree t,
         @Nullable SearchRow first,
-        boolean includeFirst,
         @Nullable SearchRow last,
-        IndexingQueryFilter filter) {
+        IndexingQueryFilter filter,
+        H2TreeMvccFilterClosure mvccFilter) {
         throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/739417af/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
index ebe24c9..e77a3f1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
@@ -350,12 +350,12 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
      * @param distributedJoin {@code True} to test distributed joins.
      * @throws Exception If failed.
      */
-    private void joinTransactional(boolean singleNode, boolean distributedJoin) throws Exception
{
+    private void joinTransactional(boolean singleNode, final boolean distributedJoin) throws
Exception {
         final int KEYS = 100;
 
-        final int writers = 1;
+        final int writers = 4;
 
-        final int readers = 1;
+        final int readers = 4;
 
         GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
             new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
@@ -373,12 +373,12 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
                             try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ))
{
                                 Integer key = rnd.nextInt(KEYS);
 
-                                JoinTestChildKey childKey = new JoinTestChildKey(rnd.nextInt(KEYS));
+                                JoinTestChildKey childKey = new JoinTestChildKey(key);
 
                                 JoinTestChild child = (JoinTestChild)cache.cache.get(childKey);
 
                                 if (child == null) {
-                                    Integer parentKey = key;
+                                    Integer parentKey = distributedJoin ? key + 100 : key;
 
                                     child = new JoinTestChild(parentKey);
 
@@ -396,6 +396,8 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
 
                                 tx.commit();
                             }
+
+                            cnt++;
                         }
                         finally {
                             cache.readUnlock();
@@ -411,27 +413,51 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
                 @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean
stop) {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                    SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from
" +
-                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId
= p.id)");
+                    List<SqlFieldsQuery> qrys = new ArrayList<>();
+
+                    qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId
= p.id)").
+                        setDistributedJoins(distributedJoin));
+
+                    qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId
= p.id) where p.id = 10").
+                        setDistributedJoins(distributedJoin));
+
+                    qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId
= p.id) where p.id != 10").
+                        setDistributedJoins(distributedJoin));
 
                     while (!stop.get()) {
                         TestCache<Object, Object> cache = randomCache(caches, rnd);
 
-                        List<List<?>> res;
-
                         try {
-                            res = cache.cache.query(qry).getAll();
+                            for (SqlFieldsQuery qry : qrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                if (!res.isEmpty()) {
+                                    for (List<?> resRow : res) {
+                                        Integer parentId = (Integer)resRow.get(1);
+
+                                        assertNotNull(parentId);
+                                    }
+                                }
+                            }
                         }
                         finally {
                             cache.readUnlock();
                         }
+                    }
 
-                        if (!res.isEmpty()) {
-                            for (List<?> resRow : res) {
-                                Integer parentId = (Integer)resRow.get(1);
+                    if (idx == 0) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
 
-                                assertNotNull(parentId);
-                            }
+                        try {
+                            List<List<?>> res = cache.cache.query(qrys.get(0)).getAll();
+
+                            info("Reader finished, result: " + res);
+                        }
+                        finally {
+                            cache.readUnlock();
                         }
                     }
                 }
@@ -468,6 +494,175 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testJoinTransactional_DistributedJoins_ClientServer2() throws Exception {
+        final int KEYS = 100;
+
+        final int writers = 1;
+
+        final int readers = 4;
+
+        final int CHILDREN_CNT = 10;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean
stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
+
+                        try {
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ))
{
+                                Integer key = rnd.nextInt(KEYS);
+
+                                JoinTestParentKey parentKey = new JoinTestParentKey(key);
+
+                                JoinTestParent parent = (JoinTestParent)cache.cache.get(parentKey);
+
+                                if (parent == null) {
+                                    for (int i = 0; i < CHILDREN_CNT; i++)
+                                        cache.cache.put(new JoinTestChildKey(key * 10_000
+ i), new JoinTestChild(key));
+
+                                    cache.cache.put(parentKey, new JoinTestParent(key));
+                                }
+                                else {
+                                    for (int i = 0; i < CHILDREN_CNT; i++)
+                                        cache.cache.remove(new JoinTestChildKey(key * 10_000
+ i));
+
+                                    cache.cache.remove(parentKey);
+                                }
+
+                                tx.commit();
+                            }
+
+                            cnt++;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean
stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from
" +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId
= p.id) where p.id=?").
+                        setDistributedJoins(true);
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        qry.setArgs(rnd.nextInt(KEYS));
+
+                        try {
+                            List<List<?>> res = cache.cache.query(qry).getAll();
+
+                            if (!res.isEmpty())
+                                assertEquals(CHILDREN_CNT, res.size());
+
+                            cnt++;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Reader finished, read count: " + cnt);
+                }
+            };
+
+        readWriteTest(
+            null,
+            4,
+            2,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(JoinTestParentKey.class, JoinTestParent.class,
+                JoinTestChildKey.class, JoinTestChild.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDistributedJoinSimple() throws Exception {
+        startGridsMultiThreaded(4);
+
+        Ignite srv0 = ignite(0);
+
+        int[] backups = {0, 1, 2};
+
+        for (int b : backups) {
+            IgniteCache<Object, Object> cache = srv0.createCache(
+                cacheConfiguration(PARTITIONED, FULL_SYNC, b, DFLT_PARTITION_COUNT).
+                    setIndexedTypes(JoinTestParentKey.class, JoinTestParent.class, JoinTestChildKey.class,
JoinTestChild.class));
+
+            int cntr = 0;
+
+            int expCnt = 0;
+
+            for (int i = 0; i < 10; i++) {
+                JoinTestParentKey parentKey = new JoinTestParentKey(i);
+
+                cache.put(parentKey, new JoinTestParent(i));
+
+                for (int c = 0; c < i; c++) {
+                    JoinTestChildKey childKey = new JoinTestChildKey(cntr++);
+
+                    cache.put(childKey, new JoinTestChild(i));
+
+                    expCnt++;
+                }
+            }
+
+            SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from " +
+                "JoinTestChild c join JoinTestParent p on (c.parentId = p.id)").
+                setDistributedJoins(true);
+
+            Map<Integer, Integer> resMap = new HashMap<>();
+
+            List<List<?>> res = cache.query(qry).getAll();
+
+            assertEquals(expCnt, res.size());
+
+            for (List<?> resRow : res) {
+                Integer parentId = (Integer)resRow.get(0);
+
+                Integer cnt = resMap.get(parentId);
+
+                if (cnt == null)
+                    resMap.put(parentId, 1);
+                else
+                    resMap.put(parentId, cnt + 1);
+            }
+
+            for (int i = 1; i < 10; i++)
+                assertEquals(i, (Object)resMap.get(i));
+
+            srv0.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testCacheRecreate() throws Exception {
         cacheRecreate(new InitIndexing(Integer.class, MvccTestAccount.class));
     }
@@ -1260,7 +1455,7 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
         /**
          * @param key Key.
          */
-        public JoinTestParentKey(int key) {
+        JoinTestParentKey(int key) {
             this.key = key;
         }
 
@@ -1294,7 +1489,7 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
         /**
          * @param id ID.
          */
-        public JoinTestParent(int id) {
+        JoinTestParent(int id) {
             this.id = id;
         }
 
@@ -1309,12 +1504,13 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest
{
      */
     static class JoinTestChildKey implements Serializable {
         /** */
+        @QuerySqlField(index = true)
         private int key;
 
         /**
          * @param key Key.
          */
-        public JoinTestChildKey(int key) {
+        JoinTestChildKey(int key) {
             this.key = key;
         }
 
@@ -1348,7 +1544,7 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
         /**
          * @param parentId Parent ID.
          */
-        public JoinTestChild(int parentId) {
+        JoinTestChild(int parentId) {
             this.parentId = parentId;
         }
 


Mime
View raw message