ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dma...@apache.org
Subject ignite git commit: IGNITE-2394: Cache loading from storage is called on client nodes. Reviewed and merged by Denis Magda (dmagda@gridgain.com)
Date Tue, 17 May 2016 14:19:08 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1.6 d2cbcbb80 -> 42e5232c8


IGNITE-2394: Cache loading from storage is called on client nodes.
Reviewed and merged by Denis Magda (dmagda@gridgain.com)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42e5232c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42e5232c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42e5232c

Branch: refs/heads/ignite-1.6
Commit: 42e5232c8b1e057ef159483b576abd39a9245f18
Parents: d2cbcbb
Author: Alper Tekinalp <alper.tekinalp@gmail.com>
Authored: Tue May 17 17:19:02 2016 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Tue May 17 17:19:02 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   6 +-
 .../processors/cache/IgniteCacheProxy.java      |  41 ++--
 .../cache/CacheClientStoreSelfTest.java         | 209 ++++++++++++++++---
 3 files changed, 212 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/42e5232c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index a02db2c..9ea688d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3689,14 +3689,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
      */
     IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K,
V> p, @Nullable Object... args)
         throws IgniteCheckedException {
-        ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
+        ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name())
             .forPredicate(new IgnitePredicate<ClusterNode>() {
                 @Override public boolean apply(ClusterNode node) {
                     return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE)
< 0;
                 }
             });
 
-        ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
+        ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name())
             .forPredicate(new IgnitePredicate<ClusterNode>() {
                 @Override public boolean apply(ClusterNode node) {
                     return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE)
>= 0 &&
@@ -3704,7 +3704,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                 }
             });
 
-        ClusterGroup newNodesV2 = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
+        ClusterGroup newNodesV2 = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name())
             .forPredicate(new IgnitePredicate<ClusterNode>() {
                 @Override public boolean apply(ClusterNode node) {
                     return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE)
>= 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/42e5232c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index fc046af..76cc77b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -190,8 +190,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
     }
 
     /**
-     * Gets cache proxy which does not acquire read lock on gateway enter, should be
-     * used only if grid read lock is externally acquired.
+     * Gets cache proxy which does not acquire read lock on gateway enter, should be used
only if grid read lock is
+     * externally acquired.
      *
      * @return Ignite cache proxy with simple gate.
      */
@@ -372,10 +372,18 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
             CacheOperationContext prev = onEnter(gate, opCtx);
 
             try {
-                if (isAsync())
-                    setFuture(ctx.cache().globalLoadCacheAsync(p, args));
-                else
-                    ctx.cache().globalLoadCache(p, args);
+                if (isAsync()) {
+                    if (ctx.cache().isLocal())
+                        setFuture(ctx.cache().localLoadCacheAsync(p, args));
+                    else
+                        setFuture(ctx.cache().globalLoadCacheAsync(p, args));
+                }
+                else {
+                    if (ctx.cache().isLocal())
+                        ctx.cache().localLoadCache(p, args);
+                    else
+                        ctx.cache().globalLoadCache(p, args);
+                }
             }
             finally {
                 onLeave(gate, prev);
@@ -463,9 +471,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
      * @return Cursor.
      */
     @SuppressWarnings("unchecked")
-    private QueryCursor<Cache.Entry<K,V>> query(final Query filter, @Nullable
ClusterGroup grp)
+    private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable
ClusterGroup grp)
         throws IgniteCheckedException {
-        final CacheQuery<Map.Entry<K,V>> qry;
+        final CacheQuery<Map.Entry<K, V>> qry;
 
         boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary();
 
@@ -478,8 +486,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
                 qry.projection(grp);
 
             final GridCloseableIterator<Entry<K, V>> iter = ctx.kernalContext().query().executeQuery(ctx,
-                new IgniteOutClosureX<GridCloseableIterator<Entry<K,V>>>()
{
-                    @Override public GridCloseableIterator<Entry<K,V>> applyx()
throws IgniteCheckedException {
+                new IgniteOutClosureX<GridCloseableIterator<Entry<K, V>>>()
{
+                    @Override public GridCloseableIterator<Entry<K, V>> applyx()
throws IgniteCheckedException {
                         final GridCloseableIterator<Map.Entry> iter0 = qry.executeScanQuery();
 
                         return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>()
{
@@ -503,7 +511,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
             return new QueryCursorImpl<>(iter);
         }
 
-        final CacheQueryFuture<Map.Entry<K,V>> fut;
+        final CacheQueryFuture<Map.Entry<K, V>> fut;
 
         if (filter instanceof TextQuery) {
             TextQuery p = (TextQuery)filter;
@@ -541,15 +549,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
             throw new CacheException("Unsupported query type: " + filter);
         }
 
-        return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,V>>()
{
+        return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,
V>>() {
             /** */
-            private Map.Entry<K,V> cur;
+            private Map.Entry<K, V> cur;
 
-            @Override protected Entry<K,V> onNext() throws IgniteCheckedException {
+            @Override protected Entry<K, V> onNext() throws IgniteCheckedException
{
                 if (!onHasNext())
                     throw new NoSuchElementException();
 
-                Map.Entry<K,V> e = cur;
+                Map.Entry<K, V> e = cur;
 
                 cur = null;
 
@@ -848,7 +856,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
         }
     }
 
-
     /** {@inheritDoc} */
     @Override public int localSize(CachePeekMode... peekModes) {
         GridCacheGateway<K, V> gate = this.gate;
@@ -959,7 +966,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<CacheEntry<K, V>>  getEntries(Set<? extends
K> keys) {
+    @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends
K> keys) {
         try {
             GridCacheGateway<K, V> gate = this.gate;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/42e5232c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheClientStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheClientStoreSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheClientStoreSelfTest.java
index 213a8de..b1bc066 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheClientStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheClientStoreSelfTest.java
@@ -17,17 +17,26 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import javax.cache.Cache;
 import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -36,7 +45,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 
 /**
- * Tests for cache client without store.
+ * Tests for cache client with and without store.
  */
 public class CacheClientStoreSelfTest extends GridCommonAbstractTest {
     /** */
@@ -46,26 +55,38 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest {
     private static final String CACHE_NAME = "test-cache";
 
     /** */
-    private boolean client;
+    private volatile boolean nearEnabled;
 
     /** */
-    private boolean nearEnabled;
+    private volatile Factory<CacheStore> factory;
 
     /** */
-    private Factory<CacheStore> factory;
+    private volatile CacheMode cacheMode;
+
+    /** */
+    private static volatile boolean loadedFromClient;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        boolean client = gridName != null && gridName.startsWith("client");
+
         cfg.setClientMode(client);
 
         CacheConfiguration cc = new CacheConfiguration();
 
         cc.setName(CACHE_NAME);
         cc.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cc.setCacheMode(cacheMode);
+        cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cc.setBackups(1);
+
         cc.setCacheStoreFactory(factory);
 
+        if (factory instanceof Factory3)
+            cc.setReadThrough(true);
+
         if (client && nearEnabled)
             cc.setNearConfiguration(new NearCacheConfiguration());
 
@@ -81,32 +102,23 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest
{
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        client = false;
-        factory = new Factory1();
-
-        startGrids(2);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
+    @Override protected void afterTest() throws Exception {
         stopAllGrids();
-    }
 
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopGrid();
+        loadedFromClient = false;
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testCorrectStore() throws Exception {
-        client = true;
         nearEnabled = false;
+        cacheMode = CacheMode.PARTITIONED;
         factory = new Factory1();
 
-        Ignite ignite = startGrid();
+        startGrids(2);
+
+        Ignite ignite = startGrid("client-1");
 
         IgniteCache cache = ignite.cache(CACHE_NAME);
 
@@ -131,19 +143,27 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testInvalidStore() throws Exception {
-        client = true;
         nearEnabled = false;
+        cacheMode = CacheMode.PARTITIONED;
+        factory = new Factory1();
+
+        startGrids(2);
+
         factory = new Factory2();
 
-        startGrid();
+        startGrid("client-1");
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testDisabledConsistencyCheck() throws Exception {
-        client = false;
         nearEnabled = false;
+        cacheMode = CacheMode.PARTITIONED;
+        factory = new Factory1();
+
+        startGrids(2);
+
         factory = new Factory2();
 
         System.setProperty(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, "true");
@@ -162,6 +182,10 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest
{
      */
     public void testNoStoreNearDisabled() throws Exception {
         nearEnabled = false;
+        cacheMode = CacheMode.PARTITIONED;
+        factory = new Factory1();
+
+        startGrids(2);
 
         doTestNoStore();
     }
@@ -171,6 +195,10 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest
{
      */
     public void testNoStoreNearEnabled() throws Exception {
         nearEnabled = true;
+        cacheMode = CacheMode.PARTITIONED;
+        factory = new Factory1();
+
+        startGrids(2);
 
         doTestNoStore();
     }
@@ -179,10 +207,9 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     private void doTestNoStore() throws Exception {
-        client = true;
         factory = null;
 
-        Ignite ignite = startGrid();
+        Ignite ignite = startGrid("client-1");
 
         IgniteCache cache = ignite.cache(CACHE_NAME);
 
@@ -204,6 +231,101 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * Load cache created on client as LOCAL and see if it is only loaded on the client.
+     *
+     * @throws Exception
+     */
+    public void testLocalLoadClient() throws Exception {
+        cacheMode = CacheMode.LOCAL;
+        factory = new Factory3();
+
+        startGrids(2);
+
+        Ignite client = startGrid("client-1");
+
+        IgniteCache cache = client.cache(CACHE_NAME);
+
+        cache.loadCache(null);
+
+        assertEquals(10, cache.localSize(CachePeekMode.ALL));
+
+        assertEquals(0, grid(0).cache(CACHE_NAME).localSize(CachePeekMode.ALL));
+        assertEquals(0, grid(1).cache(CACHE_NAME).localSize(CachePeekMode.ALL));
+
+        assert loadedFromClient;
+    }
+
+    /**
+     * Load cache from a server for a cache created on client as LOCAL and see if it is only loaded on that server.
+     *
+     * @throws Exception
+     */
+    public void testLocalLoadServer() throws Exception {
+        cacheMode = CacheMode.LOCAL;
+        factory = new Factory3();
+
+        startGrids(2);
+
+        Ignite client = startGrid("client-1");
+
+        IgniteCache cache = grid(0).cache(CACHE_NAME);
+
+        cache.loadCache(null);
+
+        assertEquals(10, cache.localSize(CachePeekMode.ALL));
+        assertEquals(0, grid(1).cache(CACHE_NAME).localSize(CachePeekMode.ALL));
+        assertEquals(0, client.cache(CACHE_NAME).localSize(CachePeekMode.ALL));
+
+        assert !loadedFromClient : "Loaded data from client!";
+    }
+
+    /**
+     * Load cache created on client as REPLICATED and see if it only loaded on servers
+     */
+    public void testReplicatedLoadFromClient() throws Exception {
+        cacheMode = CacheMode.REPLICATED;
+        factory = new Factory3();
+
+        startGrids(2);
+
+        Ignite client = startGrid("client-1");
+
+        IgniteCache cache = client.cache(CACHE_NAME);
+
+        cache.loadCache(null);
+
+        assertEquals(0, cache.localSize(CachePeekMode.ALL));
+
+        assertEquals(10, grid(0).cache(CACHE_NAME).localSize(CachePeekMode.ALL));
+        assertEquals(10, grid(1).cache(CACHE_NAME).localSize(CachePeekMode.ALL));
+
+        assert !loadedFromClient : "Loaded data from client!";
+    }
+
+    /**
+     * Load cache created on client as PARTITIONED and see if it is only loaded on servers.
+     */
+    public void testPartitionedLoadFromClient() throws Exception {
+        cacheMode = CacheMode.PARTITIONED;
+        factory = new Factory3();
+
+        startGrids(2);
+
+        Ignite client = startGrid("client-1");
+
+        IgniteCache cache = client.cache(CACHE_NAME);
+
+        cache.loadCache(null);
+
+        assertEquals(0, cache.localSize(CachePeekMode.ALL));
+
+        assertEquals(10, grid(0).cache(CACHE_NAME).localSize(CachePeekMode.ALL));
+        assertEquals(10, grid(1).cache(CACHE_NAME).localSize(CachePeekMode.ALL));
+
+        assert !loadedFromClient : "Loaded data from client!";
+    }
+
+    /**
      */
     private static class Factory1 implements Factory<CacheStore> {
         /** {@inheritDoc} */
@@ -223,9 +345,48 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest
{
 
     /**
      */
+    private static class Factory3 implements Factory<CacheStore> {
+        /** {@inheritDoc} */
+        @Override public CacheStore create() {
+            return new TestStore();
+        }
+    }
+
+    /**
+     */
     private static class EP implements CacheEntryProcessor {
         @Override public Object process(MutableEntry entry, Object... arguments) {
             return null;
         }
     }
+
+    /**
+     * Test store that loads 10 items.
+     */
+    public static class TestStore extends CacheStoreAdapter<Object, Object> {
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        @Override
+        public Integer load(Object key) throws CacheLoaderException {
+            return null;
+        }
+
+        @Override
+        public void write(Cache.Entry<? extends Object, ? extends Object> entry) throws
CacheWriterException {
+        }
+
+        @Override
+        public void delete(Object key) throws CacheWriterException {
+        }
+
+        @Override
+        public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args)
{
+            if (ignite.cluster().localNode().isClient())
+                loadedFromClient = true;
+
+            for (int i = 0; i < 10; i++)
+                clo.apply(i, i);
+        }
+    }
 }
\ No newline at end of file


Mime
View raw message