ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5075
Date Mon, 22 May 2017 16:34:42 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 aafa8dfee -> 6eed51a2e


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6eed51a2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6eed51a2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6eed51a2

Branch: refs/heads/ignite-5075
Commit: 6eed51a2e9937c8afadd44eac1a6b5437b603fdb
Parents: aafa8df
Author: sboikov <sboikov@gridgain.com>
Authored: Mon May 22 19:34:15 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon May 22 19:34:15 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   4 +-
 .../cache/GridCacheClearAllRunnable.java        |   2 +-
 .../cache/IgniteCacheOffheapManager.java        |  33 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  32 +-
 .../GridDistributedCacheAdapter.java            |   2 +-
 .../cache/query/GridCacheQueryManager.java      |   4 +-
 .../continuous/CacheContinuousQueryManager.java |   2 +-
 .../processors/cache/IgniteCacheGroupsTest.java | 498 ++++++++++++++++---
 .../cache/IgniteCacheGroupsSqlTest.java         | 199 +++++++-
 9 files changed, 676 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 0cf49be..8bd072b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -704,7 +704,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                 IgniteCacheOffheapManager offheapMgr = ctx.isNear() ? ctx.near().dht().context().offheap() : ctx.offheap();
 
-                its.add(offheapMgr.<K, V>entriesIterator(ctx, modes.primary, modes.backup, topVer, ctx.keepBinary()));
+                its.add(offheapMgr.<K, V>cacheEntriesIterator(ctx, modes.primary, modes.backup, topVer, ctx.keepBinary()));
             }
         }
         else if (modes.heap) {
@@ -2895,7 +2895,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         List<K> keys = new ArrayList<>(Math.min(REMOVE_ALL_KEYS_BATCH, size()));
 
         do {
-            for (Iterator<CacheDataRow> it = ctx.offheap().iteratorForCache(ctx.cacheId(), true, true, null);
+            for (Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), true, true, null);
                 it.hasNext() && keys.size() < REMOVE_ALL_KEYS_BATCH; )
                 keys.add((K)it.next().key());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
index ca89650..d37cecb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
@@ -82,7 +82,7 @@ public class GridCacheClearAllRunnable<K, V> implements Runnable {
 
         if (!ctx.isNear()) {
             if (id == 0)
-                ctx.offheap().clear(ctx, readers);
+                ctx.offheap().clearCache(ctx, readers);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index d344e20..55485bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -142,7 +142,8 @@ public interface IgniteCacheOffheapManager {
      * @param c Closure.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean expire(GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount) throws IgniteCheckedException;
+    public boolean expire(GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount)
+        throws IgniteCheckedException;
 
     /**
      * Gets the number of entries pending expire.
@@ -212,7 +213,7 @@ public interface IgniteCacheOffheapManager {
      * @return Rows iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public GridIterator<CacheDataRow> iteratorForCache(int cacheId,
+    public GridIterator<CacheDataRow> cacheIterator(int cacheId,
         boolean primary,
         boolean backup,
         final AffinityTopologyVersion topVer)
@@ -224,17 +225,18 @@ public interface IgniteCacheOffheapManager {
      * @return Partition data iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public GridIterator<CacheDataRow> iteratorForCache(int cacheId, final int part) throws IgniteCheckedException;
+    public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, final int part) throws IgniteCheckedException;
 
     /**
      * @param part Partition number.
      * @return Iterator for given partition.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed.
      */
     public GridIterator<CacheDataRow> partitionIterator(final int part) throws IgniteCheckedException;
 
     /**
      * @param part Partition.
+     * @param topVer Topology version.
      * @param partCntr Partition counter to get historical data if available.
      * @return Partition data iterator.
      * @throws IgniteCheckedException If failed.
@@ -243,6 +245,7 @@ public interface IgniteCacheOffheapManager {
         throws IgniteCheckedException;
 
     /**
+     * @param cctx Cache context.
      * @param primary Primary entries flag.
      * @param backup Backup entries flag.
      * @param topVer Topology version.
@@ -250,7 +253,7 @@ public interface IgniteCacheOffheapManager {
      * @return Entries iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(
+    public <K, V> GridCloseableIterator<Cache.Entry<K, V>> cacheEntriesIterator(
         GridCacheContext cctx,
         final boolean primary,
         final boolean backup,
@@ -258,13 +261,16 @@ public interface IgniteCacheOffheapManager {
         final boolean keepBinary) throws IgniteCheckedException;
 
     /**
+     * @param cacheId Cache ID.
      * @param part Partition.
      * @return Iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public GridCloseableIterator<KeyCacheObject> keysIterator(final int part) throws IgniteCheckedException;
+    public GridCloseableIterator<KeyCacheObject> cacheKeysIterator(int cacheId, final int part)
+        throws IgniteCheckedException;
 
     /**
+     * @param cacheId Cache ID.
      * @param primary Primary entries flag.
      * @param backup Backup entries flag.
      * @param topVer Topology version.
@@ -280,7 +286,7 @@ public interface IgniteCacheOffheapManager {
      * @param cctx Cache context.
      * @param readers {@code True} to clear readers.
      */
-    public void clear(GridCacheContext cctx, boolean readers);
+    public void clearCache(GridCacheContext cctx, boolean readers);
 
     /**
      * @param cacheId Cache ID.
@@ -313,7 +319,9 @@ public interface IgniteCacheOffheapManager {
     public void dropRootPageForIndex(String idxName) throws IgniteCheckedException;
 
     /**
+     * @param idxName Index name.
      * @return Reuse list for index tree.
+     * @throws IgniteCheckedException If failed.
      */
     public ReuseList reuseListForIndex(String idxName) throws IgniteCheckedException;
 
@@ -469,6 +477,17 @@ public interface IgniteCacheOffheapManager {
             KeyCacheObject upper) throws IgniteCheckedException;
 
         /**
+         * @param cacheId Cache ID.
+         * @param lower Lower bound.
+         * @param upper Upper bound.
+         * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+         * @return Data cursor.
+         * @throws IgniteCheckedException If failed.
+         */
+        public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
+            KeyCacheObject upper, Object x) throws IgniteCheckedException;
+
+        /**
          * Destroys the tree associated with the store.
          *
          * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 8da7357..9631268 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -425,7 +425,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
      * @param readers {@code True} to clear readers.
      */
     @SuppressWarnings("unchecked")
-    @Override public void clear(GridCacheContext cctx, boolean readers) {
+    @Override public void clearCache(GridCacheContext cctx, boolean readers) {
         GridCacheVersion obsoleteVer = null;
 
         GridIterator<CacheDataRow> it = iterator(cctx.cacheId(), cacheDataStores().iterator());
@@ -470,13 +470,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(
+    @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> cacheEntriesIterator(
         final GridCacheContext cctx,
         final boolean primary,
         final boolean backup,
         final AffinityTopologyVersion topVer,
         final boolean keepBinary) throws IgniteCheckedException {
-        final Iterator<CacheDataRow> it = iteratorForCache(cctx.cacheId(), primary, backup, topVer);
+        final Iterator<CacheDataRow> it = cacheIterator(cctx.cacheId(), primary, backup, topVer);
 
         return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
             /** */
@@ -517,13 +517,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
-    @Override public GridCloseableIterator<KeyCacheObject> keysIterator(final int part) throws IgniteCheckedException {
+    @Override public GridCloseableIterator<KeyCacheObject> cacheKeysIterator(int cacheId, final int part) throws IgniteCheckedException {
         CacheDataStore data = partitionData(part);
 
         if (data == null)
             return new GridEmptyCloseableIterator<>();
 
-        final GridCursor<? extends CacheDataRow> cur = data.cursor();
+        final GridCursor<? extends CacheDataRow> cur =
+            data.cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY);
 
         return new GridCloseableIteratorAdapter<KeyCacheObject>() {
             /** */
@@ -553,7 +554,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
-    @Override public GridIterator<CacheDataRow> iteratorForCache(
+    @Override public GridIterator<CacheDataRow> cacheIterator(
         int cacheId,
         boolean primary,
         boolean backups,
@@ -563,7 +564,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
-    @Override public GridIterator<CacheDataRow> iteratorForCache(int cacheId, int part) throws IgniteCheckedException {
+    @Override public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, int part) throws IgniteCheckedException {
         CacheDataStore data = partitionData(part);
 
         if (data == null)
@@ -1300,6 +1301,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         /** {@inheritDoc} */
         @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
             KeyCacheObject upper) throws IgniteCheckedException {
+            return cursor(cacheId, lower, upper, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
+            KeyCacheObject upper, Object x) throws IgniteCheckedException {
             SearchRow lowerRow;
             SearchRow upperRow;
 
@@ -1314,7 +1321,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 upperRow = upper != null ? new SearchRow(UNDEFINED_CACHE_ID, upper) : null;
             }
 
-            return dataTree.find(lowerRow, upperRow);
+            return dataTree.find(lowerRow, upperRow, x);
         }
 
         /** {@inheritDoc} */
@@ -1352,12 +1359,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             Exception ex = null;
 
-            SearchRow bound = new SearchRow(cacheId);
-
-            GridCursor<? extends CacheDataRow> cursor = dataTree.find(bound, bound, CacheDataRowAdapter.RowData.KEY_ONLY);
+            GridCursor<? extends CacheDataRow> cur =
+                cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY);
 
-            while (cursor.next()) {
-                CacheDataRow row = cursor.get();
+            while (cur.next()) {
+                CacheDataRow row = cur.get();
 
                 assert row.link() != 0 : row;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 0955a51..daf4b91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -459,7 +459,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                             return false;
 
                         try {
-                            GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().keysIterator(part);
+                            GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().cacheKeysIterator(ctx.cacheId(), part);
 
                             if (iter != null) {
                                 try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 12a8126..f189bd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -863,12 +863,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 locPart = locPart0;
 
-                it = cctx.offheap().iteratorForCache(cctx.cacheId(), part);
+                it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part);
             }
             else {
                 locPart = null;
 
-                it = cctx.offheap().iteratorForCache(cctx.cacheId(), true, backups, topVer);
+                it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer);
             }
 
             return new PeekValueExpiryAwareIterator(it, plc, topVer, keyValFilter, qry.keepBinary(), locNode) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 03e1e1c..d83c033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -655,7 +655,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         }
 
         if (notifyExisting) {
-            final Iterator<CacheDataRow> it = cctx.offheap().iteratorForCache(cctx.cacheId(),
+            final Iterator<CacheDataRow> it = cctx.offheap().cacheIterator(cctx.cacheId(),
                 true,
                 true,
                 AffinityTopologyVersion.NONE);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index ee76c6e..1a133dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -35,15 +35,19 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import javax.cache.Cache;
 import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 import javax.cache.configuration.Factory;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheExistsException;
 import org.apache.ignite.cache.CacheInterceptor;
 import org.apache.ignite.cache.CacheInterceptorAdapter;
@@ -58,6 +62,7 @@ import org.apache.ignite.cache.store.CacheStoreAdapter;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
@@ -66,6 +71,8 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -388,6 +395,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Cache atomicity mode.
      * @throws Exception If failed.
      */
     private void scanQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
@@ -430,13 +439,17 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
             }
         }
         else {
-            cache1 = ignite(local ? 0 : 1).cache(CACHE1);
-            cache2 = ignite(local ? 0 : 2).cache(CACHE2);
+            // Async put ops.
+            int ldrs = 4;
 
-            for (int i = 0; i < keys ; i++) {
-                cache1.put(i, data1[i]);
-                cache2.put(i, data2[i]);
+            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
+
+            for (int i = 0; i < ldrs ; i++) {
+                cls.add(putOperation(local ? 0 : 1, ldrs, i, CACHE1, data1));
+                cls.add(putOperation(local ? 0 : 2, ldrs, i, CACHE2, data2));
             }
+
+            GridTestUtils.runMultiThreaded(cls, "loaders");
         }
 
         ScanQuery<Integer, Integer> qry = new ScanQuery<>();
@@ -463,6 +476,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Cache atomicity mode.
      * @throws Exception If failed.
      */
     private void scanQueryMultiplePartitions(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
@@ -503,13 +518,17 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
             }
         }
         else {
-            cache1 = ignite(1).cache(CACHE1);
-            cache2 = ignite(2).cache(CACHE2);
+            // Async put ops.
+            int ldrs = 4;
+
+            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
 
-            for (int i = 0; i < keys ; i++) {
-                cache1.put(i, data1[i]);
-                cache2.put(i, data2[i]);
+            for (int i = 0; i < ldrs ; i++) {
+                cls.add(putOperation(1, ldrs, i, CACHE1, data1));
+                cls.add(putOperation(2, ldrs, i, CACHE2, data2));
             }
+
+            GridTestUtils.runMultiThreaded(cls, "loaders");
         }
 
 
@@ -559,6 +578,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Cache atomicity mode.
      * @throws Exception If failed.
      */
     private void cacheIterator(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
@@ -598,13 +619,17 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
             }
         }
         else {
-            IgniteCache cache1 = ignite(local ? 0 : 1).cache(CACHE1);
-            IgniteCache cache2 = ignite(local ? 0 : 2).cache(CACHE2);
+            // Async put ops.
+            int ldrs = 4;
 
-            for (int i = 0; i < keys ; i++) {
-                cache1.put(i, data1[i]);
-                cache2.put(i, data2[i]);
+            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
+
+            for (int i = 0; i < ldrs ; i++) {
+                cls.add(putOperation(local ? 0 : 1, ldrs, i, CACHE1, data1));
+                cls.add(putOperation(local ? 0 : 2, ldrs, i, CACHE2, data2));
             }
+
+            GridTestUtils.runMultiThreaded(cls, "loaders");
         }
 
 
@@ -678,13 +703,17 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
             }
         }
         else {
-            IgniteCache cache1 = ignite(local ? 0 : 1).cache(CACHE1);
-            IgniteCache cache2 = ignite(local ? 0 : 2).cache(CACHE2);
+            // async put ops
+            int ldrs = 4;
+
+            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
 
-            for (int i = 0; i < keys ; i++) {
-                cache1.put(i, data1[i]);
-                cache2.put(i, data2[i]);
+            for (int i = 0; i < ldrs ; i++) {
+                cls.add(putOperation(local ? 0 : 1, ldrs, i, CACHE1, data1));
+                cls.add(putOperation(local ? 0 : 2, ldrs, i, CACHE2, data2));
             }
+
+            GridTestUtils.runMultiThreaded(cls, "loaders");
         }
 
         checkData(local ? 0 : 3, CACHE1, data1);
@@ -702,6 +731,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Cache atomicity mode.
      * @throws Exception If failed.
      */
     private void createDestroyCaches(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
@@ -735,13 +766,16 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
             }
         }
         else {
-            IgniteCache cache1 = ignite(1).cache(CACHE1);
-            IgniteCache cache2 = ignite(2).cache(CACHE2);
+            int ldrs = 4;
+
+            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
 
-            for (int i = 0; i < keys ; i++) {
-                cache1.put(i, data1[i]);
-                cache2.put(i, data2[i]);
+            for (int i = 0; i < ldrs ; i++) {
+                cls.add(putOperation(1, ldrs, i, CACHE1, data1));
+                cls.add(putOperation(2, ldrs, i, CACHE2, data2));
             }
+
+            GridTestUtils.runMultiThreaded(cls, "loaders");
         }
 
         checkLocalData(3, CACHE1, data1);
@@ -765,6 +799,34 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param idx Node index.
+     * @param ldrs Loaders count.
+     * @param ldrIdx Loader index.
+     * @param cacheName Cache name.
+     * @param data Data.
+     * @return Callable for put operation.
+     */
+    private Callable<Void> putOperation(
+            final int idx,
+            final int ldrs,
+            final int ldrIdx,
+            final String cacheName,
+            final Integer[] data) {
+        return new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                IgniteCache cache = ignite(idx).cache(cacheName);
+
+                for (int j = 0, size = data.length; j < size ; j++) {
+                    if (j % ldrs == ldrIdx) {
+                        cache.put(j, data[j]);
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
      * Creates an array of random integers.
      *
      * @param cnt Array length.
@@ -782,6 +844,23 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Creates a map with random integers.
+     *
+     * @param cnt Map size length.
+     * @return Map with random integers.
+     */
+    private Map<Integer, Integer> generateDataMap(int cnt) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Map<Integer, Integer> data = U.newHashMap(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            data.put(i, rnd.nextInt());
+
+        return data;
+    }
+
+    /**
      * @param cnt Sequence length.
      * @return Sequence of integers.
      */
@@ -1491,12 +1570,38 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
         int[] backups = cacheMode == REPLICATED ? new int[]{Integer.MAX_VALUE} : new int[]{0, 1, 2, 3};
 
         for (int backups0 : backups)
-            cacheApiTest(cacheMode, atomicityMode, backups0, false);
+            cacheApiTest(cacheMode, atomicityMode, backups0, false, false, false);
 
         int backups0 = cacheMode == REPLICATED ? Integer.MAX_VALUE :
             backups[ThreadLocalRandom.current().nextInt(backups.length)];
 
-        cacheApiTest(cacheMode, atomicityMode, backups0, true);
+        cacheApiTest(cacheMode, atomicityMode, backups0, true, false, false);
+
+        if (cacheMode == PARTITIONED) {
+            // Here the f variable is used as a bit set where 2 last bits
+            // determine whether a near cache is used on server/client side.
+            // The case without near cache is already tested at this point.
+            for (int f : new int[]{1, 2, 3}) {
+                cacheApiTest(cacheMode, atomicityMode, backups0, false, nearSrv(f), nearClient(f));
+                cacheApiTest(cacheMode, atomicityMode, backups0, true, nearSrv(f), nearClient(f));
+            }
+        }
+    }
+
+    /**
+     * @param flag Flag.
+     * @return {@code True} if near cache should be used on a client side.
+     */
+    private boolean nearClient(int flag) {
+        return (flag & 0b01) == 0b01;
+    }
+
+    /**
+     * @param flag Flag.
+     * @return {@code True} if near cache should be used on a server side.
+     */
+    private boolean nearSrv(int flag) {
+        return (flag & 0b10) == 0b10;
     }
 
     /**
@@ -1504,65 +1609,338 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
      * @param atomicityMode Atomicity mode.
      * @param backups Number of backups.
      * @param heapCache On heap cache flag.
+     * @param nearSrv {@code True} if near cache should be used on a server side.
+     * @param nearClient {@code True} if near cache should be used on a client side.
+     * @throws Exception If failed.
      */
-    private void cacheApiTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, int backups, boolean heapCache) {
-        for (int i = 0; i < 2; i++)
-            ignite(0).createCache(cacheConfiguration(GROUP1, "cache-" + i, cacheMode, atomicityMode, backups, heapCache));
+    private void cacheApiTest(CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode,
+        int backups,
+        boolean heapCache,
+        boolean nearSrv,
+        boolean nearClient) throws Exception {
+        Ignite srv0 = ignite(0);
+
+        NearCacheConfiguration nearCfg = nearSrv ? new NearCacheConfiguration() : null;
+
+        srv0.createCache(cacheConfiguration(GROUP1, "cache-0", cacheMode, atomicityMode, backups, heapCache)
+            .setNearConfiguration(nearCfg));
+
+        srv0.createCache(cacheConfiguration(GROUP1, "cache-1", cacheMode, atomicityMode, backups, heapCache));
+
+        srv0.createCache(cacheConfiguration(GROUP2, "cache-2", cacheMode, atomicityMode, backups, heapCache)
+            .setNearConfiguration(nearCfg));
+
+        srv0.createCache(cacheConfiguration(null, "cache-3", cacheMode, atomicityMode, backups, heapCache));
+
+        if (nearClient) {
+            Ignite clientNode = ignite(4);
+
+            clientNode.createNearCache("cache-0", new NearCacheConfiguration());
+            clientNode.createNearCache("cache-2", new NearCacheConfiguration());
+        }
 
         try {
-            for (Ignite node : Ignition.allGrids()) {
-                for (int i = 0; i < 2; i++) {
-                    IgniteCache cache = node.cache("cache-" + i);
-
-                    log.info("Test cache [node=" + node.name() +
-                        ", cache=" + cache.getName() +
-                        ", mode=" + cacheMode +
-                        ", atomicity=" + atomicityMode +
-                        ", backups=" + backups +
-                        ", heapCache=" + heapCache +
-                        ']');
-
-                    cacheApiTest(cache);
-                }
+            for (final Ignite node : Ignition.allGrids()) {
+                List<Callable<?>> ops = new ArrayList<>();
+
+                for (int i = 0; i < 4; i++)
+                    ops.add(testSet(node.cache("cache-" + i), cacheMode, atomicityMode, backups, heapCache, node));
+
+                // Async operations.
+                GridTestUtils.runMultiThreaded(ops, "cacheApiTest");
             }
         }
         finally {
-            for (int i = 0; i < 2; i++)
-                ignite(0).destroyCache("cache-" + i);
+            for (int i = 0; i < 4; i++)
+                srv0.destroyCache("cache-" + i);
         }
     }
 
     /**
      * @param cache Cache.
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Atomicity mode.
+     * @param backups Number of backups.
+     * @param heapCache On heap cache flag.
+     * @param node Ignite node.
+     * @return Callable for the test operations.
+     */
+    private Callable<?> testSet(
+        final IgniteCache<Object, Object> cache,
+        final CacheMode cacheMode,
+        final CacheAtomicityMode atomicityMode,
+        final int backups,
+        final boolean heapCache,
+        final Ignite node) {
+        return new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Test cache [node=" + node.name() +
+                    ", cache=" + cache.getName() +
+                    ", mode=" + cacheMode +
+                    ", atomicity=" + atomicityMode +
+                    ", backups=" + backups +
+                    ", heapCache=" + heapCache +
+                    ']');
+
+                cacheApiTest(cache);
+
+                return null;
+            }
+        };
+    }
+
+    /**
+     * @param cache Cache.
+     * @throws Exception If failed.
      */
-    private void cacheApiTest(IgniteCache cache) {
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+    private void cacheApiTest(IgniteCache cache) throws Exception {
+        cachePutAllGetAll(cache);
 
-        for (int i = 0; i < 10; i++) {
-            Integer key = rnd.nextInt(10_000);
+        cachePutRemove(cache);
 
-            assertNull(cache.get(key));
-            assertFalse(cache.containsKey(key));
+        cachePutGet(cache);
 
-            Integer val = key + 1;
+        cachePutGetAndPut(cache);
 
-            cache.put(key, val);
+        cacheQuery(cache);
 
-            assertEquals(val, cache.get(key));
-            assertTrue(cache.containsKey(key));
+        cacheInvokeAll(cache);
 
-            cache.remove(key);
+        cacheInvoke(cache);
 
-            assertNull(cache.get(key));
-            assertFalse(cache.containsKey(key));
-        }
+        cacheDataStreamer(cache);
+    }
 
+    /**
+     * @param cache Cache.
+     */
+    private void tearDown(IgniteCache cache) {
         cache.clear();
 
         cache.removeAll();
     }
 
     /**
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void cacheDataStreamer(final IgniteCache cache) throws Exception {
+        final int keys = 1000;
+        final int loaders = 4;
+
+        final Integer[] data = generateData(keys * loaders);
+
+        // Stream through a client node.
+        Ignite clientNode = ignite(4);
+
+        List<Callable<?>> cls = new ArrayList<>(loaders);
+
+        for (final int i : sequence(loaders)) {
+            final IgniteDataStreamer ldr = clientNode.dataStreamer(cache.getName());
+
+            ldr.autoFlushFrequency(0);
+
+            cls.add(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    List<IgniteFuture> futs = new ArrayList<>(keys);
+
+                    for (int j = 0, size = keys * loaders; j < size; j++) {
+                        if (j % loaders == i)
+                            futs.add(ldr.addData(j, data[j]));
+
+                        if(j % (100 * loaders) == 0)
+                            ldr.flush();
+                    }
+
+                    ldr.flush();
+
+                    for (IgniteFuture fut : futs)
+                        fut.get();
+
+                    return null;
+                }
+            });
+        }
+
+        GridTestUtils.runMultiThreaded(cls, "loaders");
+
+        Set<Integer> keysSet = sequence(data.length);
+
+        for (Cache.Entry<Integer, Integer> entry : (IgniteCache<Integer, Integer>)cache) {
+            assertTrue(keysSet.remove(entry.getKey()));
+            assertEquals(data[entry.getKey()], entry.getValue());
+        }
+
+        assertTrue(keysSet.isEmpty());
+
+        tearDown(cache);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void cachePutAllGetAll(IgniteCache cache) {
+        Map<Integer, Integer> data = generateDataMap(1000);
+
+        cache.putAll(data);
+
+        Map data0 = cache.getAll(data.keySet());
+
+        assertEquals(data.size(), data0.size());
+
+        for (Map.Entry<Integer, Integer> entry : data.entrySet()) {
+            assertEquals(entry.getValue(), data0.get(entry.getKey()));
+        }
+
+        tearDown(cache);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void cachePutRemove(IgniteCache cache) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Integer key = rnd.nextInt();
+        Integer val = rnd.nextInt();
+
+        cache.put(key, val);
+
+        assertTrue(cache.remove(key));
+
+        assertNull(cache.get(key));
+
+        tearDown(cache);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void cachePutGet(IgniteCache cache) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Integer key = rnd.nextInt();
+        Integer val = rnd.nextInt();
+
+        cache.put(key, val);
+
+        Object val0 = cache.get(key);
+
+        assertEquals(val, val0);
+
+        tearDown(cache);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void cachePutGetAndPut(IgniteCache cache) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Integer key = rnd.nextInt();
+        Integer val1 = rnd.nextInt();
+        Integer val2 = rnd.nextInt();
+
+        cache.put(key, val1);
+
+        Object val0 = cache.getAndPut(key, val2);
+
+        assertEquals(val1, val0);
+
+        val0 = cache.get(key);
+
+        assertEquals(val2, val0);
+
+        tearDown(cache);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void cacheQuery(IgniteCache cache) {
+        Map<Integer, Integer> data = generateDataMap(1000);
+
+        cache.putAll(data);
+
+        ScanQuery<Integer, Integer> qry = new ScanQuery<>(new IgniteBiPredicate<Integer, Integer>() {
+            @Override public boolean apply(Integer key, Integer val) {
+                return key % 2 == 0;
+            }
+        });
+
+        List<Cache.Entry<Integer, Integer>> all = cache.query(qry).getAll();
+
+        assertEquals(all.size(), data.size() / 2);
+
+        for (Cache.Entry<Integer, Integer> entry : all) {
+            assertEquals(0, entry.getKey() % 2);
+            assertEquals(entry.getValue(), data.get(entry.getKey()));
+        }
+
+        tearDown(cache);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void cacheInvokeAll(IgniteCache cache) {
+        int keys = 1000;
+        Map<Integer, Integer> data = generateDataMap(keys);
+
+        cache.putAll(data);
+
+        Random rnd = ThreadLocalRandom.current();
+
+        int one = rnd.nextInt();
+        int two = rnd.nextInt();
+
+        Map<Integer, CacheInvokeResult<Integer>> res = cache.invokeAll(data.keySet(), new CacheEntryProcessor<Integer, Integer, Integer>() {
+            @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException {
+                Object expected = ((Map)arguments[0]).get(entry.getKey());
+
+                assertEquals(expected, entry.getValue());
+
+                // Some calculation.
+                return (Integer)arguments[1] + (Integer)arguments[2];
+            }
+        }, data, one, two);
+
+        assertEquals(keys, res.size());
+        assertEquals(one + two, (Object)res.get(0).get());
+
+        tearDown(cache);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void cacheInvoke(IgniteCache cache) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Integer key = rnd.nextInt();
+        Integer val = rnd.nextInt();
+
+        cache.put(key, val);
+
+        int one = rnd.nextInt();
+        int two = rnd.nextInt();
+
+        Object res = cache.invoke(key, new CacheEntryProcessor<Integer, Integer, Integer>() {
+            @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException {
+                assertEquals(arguments[0], entry.getValue());
+
+                // Some calculation.
+                return (Integer)arguments[1] + (Integer)arguments[2];
+            }
+        }, val, one, two);
+
+        assertEquals(one + two, res);
+
+        tearDown(cache);
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testConcurrentOperationsSameKeys() throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java
index 39456c3..b538449 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java
@@ -18,19 +18,33 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.Callable;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.binary.AffinityKey;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
@@ -61,17 +75,17 @@ public class IgniteCacheGroupsSqlTest extends GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
 
         startGridsMultiThreaded(3);
     }
 
     /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
+    @Override protected void afterTest() throws Exception {
         stopAllGrids();
 
-        super.afterTestsStopped();
+        super.afterTest();
     }
 
     /**
@@ -80,8 +94,8 @@ public class IgniteCacheGroupsSqlTest extends GridCommonAbstractTest {
     public void testSqlQuery() throws Exception {
         Ignite node = ignite(0);
 
-        IgniteCache c1 = node.createCache(cacheConfiguration(GROUP1, "c1"));
-        IgniteCache c2 = node.createCache(cacheConfiguration(GROUP1, "c2"));
+        IgniteCache c1 = node.createCache(personCacheConfiguration(GROUP1, "c1"));
+        IgniteCache c2 = node.createCache(personCacheConfiguration(GROUP1, "c2"));
 
         SqlFieldsQuery qry = new SqlFieldsQuery("select name from Person where name=?");
         qry.setArgs("p1");
@@ -101,23 +115,157 @@ public class IgniteCacheGroupsSqlTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery1() throws Exception {
+        joinQuery(GROUP1, GROUP2, REPLICATED, PARTITIONED, TRANSACTIONAL, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery2() throws Exception {
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                joinQuery(GROUP1, GROUP1, REPLICATED, PARTITIONED, TRANSACTIONAL, TRANSACTIONAL);
+                return null;
+            }
+        }, IgniteCheckedException.class, "Cache mode mismatch for caches related to the same group");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery3() throws Exception {
+        joinQuery(GROUP1, GROUP1, PARTITIONED, PARTITIONED, TRANSACTIONAL, ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery4() throws Exception {
+        joinQuery(GROUP1, GROUP1, REPLICATED, REPLICATED, ATOMIC, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery5() throws Exception {
+        joinQuery(GROUP1, null, REPLICATED, PARTITIONED, TRANSACTIONAL, TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery6() throws Exception {
+        joinQuery(GROUP1, null, PARTITIONED, PARTITIONED, TRANSACTIONAL, ATOMIC);
+    }
+
+    /**
+     * @param grp1 First cache group.
+     * @param grp2 Second cache group.
+     * @param cm1 First cache mode.
+     * @param cm2 Second cache mode.
+     * @param cam1 First cache atomicity mode.
+     * @param cam2 Second cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void joinQuery(String grp1, String grp2, CacheMode cm1,
+        CacheMode cm2, CacheAtomicityMode cam1, CacheAtomicityMode cam2) throws Exception {
+        int keys = 1000;
+        int accsPerPerson = 4;
+
+        Ignite srv0 = ignite(0);
+
+        IgniteCache pers = srv0.createCache(personCacheConfiguration(grp1, "pers")
+            .setAffinity(new RendezvousAffinityFunction().setPartitions(10))
+            .setCacheMode(cm1)
+            .setAtomicityMode(cam1));
+
+        IgniteCache acc = srv0.createCache(accountCacheConfiguration(grp2, "acc")
+            .setAffinity(new RendezvousAffinityFunction().setPartitions(10))
+            .setCacheMode(cm2)
+            .setAtomicityMode(cam2));
+
+        try(Transaction tx = cam1 == TRANSACTIONAL || cam2 == TRANSACTIONAL ? srv0.transactions().txStart() : null) {
+            for (int i = 0; i < keys; i++) {
+
+                int pKey = i - (i % accsPerPerson);
+
+                if (i % accsPerPerson == 0)
+                    pers.put(pKey, new Person("pers-" + pKey));
+
+
+                acc.put(new AffinityKey(i, pKey), new Account(pKey, "acc-" + i));
+            }
+
+            if (tx != null)
+                tx.commit();
+        }
+
+        Ignite node = ignite(2);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery(
+            "select p._key as p_key, p.name, a._key as a_key, a.personId, a.attr \n" +
+            "from \"pers\".Person p inner join \"acc\".Account a \n" +
+            "on (p._key = a.personId)");
+
+        IgniteCache<Object, Object> cache = node.cache("acc");
+
+        List<List<?>> res = cache.query(qry).getAll();
+
+        assertEquals(keys, res.size());
+
+        for (List<?> row : res)
+            assertEquals(row.get(0), row.get(3));
+    }
+
+    /**
+     * @param grpName Group name.
+     * @param cacheName Cache name.
+     * @return Person cache configuration.
+     */
+    private CacheConfiguration personCacheConfiguration(String grpName, String cacheName) {
+        QueryEntity entity = new QueryEntity();
+
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(Person.class.getName());
+        entity.addQueryField("name", String.class.getName(), null);
+
+        return cacheConfiguration(grpName, cacheName, entity);
+    }
+
+    /**
+     * @param grpName Group name.
+     * @param cacheName Cache name.
+     * @return Account cache configuration.
+     */
+    private CacheConfiguration accountCacheConfiguration(String grpName, String cacheName) {
+        QueryEntity entity = new QueryEntity();
+
+        entity.setKeyType(AffinityKey.class.getName());
+        entity.setValueType(Account.class.getName());
+        entity.addQueryField("personId", Integer.class.getName(), null);
+        entity.addQueryField("attr", String.class.getName(), null);
+        entity.setIndexes(F.asList(new QueryIndex("personId")));
+
+        return cacheConfiguration(grpName, cacheName, entity);
+    }
+
+    /**
      * @param grpName Group name.
      * @param cacheName Cache name.
+     * @param queryEntity Query entity.
      * @return Cache configuration.
      */
-    private CacheConfiguration cacheConfiguration(String grpName, String cacheName) {
+    private CacheConfiguration cacheConfiguration(String grpName, String cacheName, QueryEntity queryEntity) {
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
         ccfg.setGroupName(grpName);
         ccfg.setName(cacheName);
 
-        QueryEntity entity = new QueryEntity();
-        entity.setKeyType(Integer.class.getName());
-        entity.setValueType(Person.class.getName());
-        entity.addQueryField("name", String.class.getName(), null);
-
-        ccfg.setQueryEntities(F.asList(entity));
+        ccfg.setQueryEntities(F.asList(queryEntity));
 
         return ccfg;
     }
@@ -141,4 +289,29 @@ public class IgniteCacheGroupsSqlTest extends GridCommonAbstractTest {
             return S.toString(Person.class, this);
         }
     }
+
+    /**
+     *
+     */
+    private static class Account implements Serializable {
+        /** */
+        Integer personId;
+
+        /** */
+        String attr;
+
+        /**
+         * @param personId Person ID.
+         * @param attr Attribute (some data).
+         */
+        public Account(Integer personId, String attr) {
+            this.personId = personId;
+            this.attr = attr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Account.class, this);
+        }
+    }
 }


Mime
View raw message