ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: ignite-500 Cache not loading correctly
Date Fri, 24 Apr 2015 17:58:43 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-500 [created] ed1678f13


ignite-500 Cache not loading correctly


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ed1678f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ed1678f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ed1678f1

Branch: refs/heads/ignite-500
Commit: ed1678f139c50de76d72d512783df308a0f9a83f
Parents: b4b28fd
Author: agura <agura@gridgain.com>
Authored: Fri Apr 24 20:58:01 2015 +0300
Committer: agura <agura@gridgain.com>
Committed: Fri Apr 24 20:58:01 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 61 ++++++++++++--
 .../distributed/dht/GridDhtCacheAdapter.java    |  9 ++-
 .../datastreamer/DataStreamerImpl.java          |  4 +-
 ...GridCacheLoadingConcurrentGridStartTest.java | 83 ++++++++++++++------
 .../ignite/testsuites/IgniteCacheTestSuite.java |  2 +-
 5 files changed, 127 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed1678f1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index c246661..f5a88bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3312,6 +3312,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         if (ttl == CU.TTL_ZERO)
                             return;
 
+/*
+                        if (!topVer.equals(ctx.topology().topologyVersion()))
+                            throw new ClusterTopologyException("Topology changed");
+*/
+
                         loadEntry(key, val, ver0, (IgniteBiPredicate<Object, Object>)
p, topVer, replicate, ttl);
                     }
                 }, args);
@@ -3531,17 +3536,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K,
V> p, @Nullable Object... args)
         throws IgniteCheckedException {
-        ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name());
-
         ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true);
 
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null;
 
-        return ctx.kernalContext().closure().callAsync(BROADCAST,
-            Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)),
-            nodes.nodes());
+        final LoadCacheClosure<K, V> loadClos = new LoadCacheClosure<>(ctx.name(),
p, args, plc);
+
+        return new GlobalLoadCacheFuture(loadClos, ctx);
     }
 
     /**
@@ -5697,4 +5700,52 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             metrics.addPutAndGetTimeNanos(System.nanoTime() - start);
         }
     }
+
+    /**
+     * Global load cache future.
+     */
+    private static class GlobalLoadCacheFuture<K1, V1> extends GridFutureAdapter<Void>
{
+        /** Load clos. */
+        private final LoadCacheClosure<K1, V1> loadClos;
+
+        /** Context. */
+        private final GridCacheContext<K1, V1> ctx;
+
+        /**
+         * @param loadClos Load cache closure.
+         * @param ctx Context.
+         */
+        public GlobalLoadCacheFuture(LoadCacheClosure<K1, V1> loadClos, GridCacheContext<K1,
V1> ctx) {
+            this.loadClos = loadClos;
+            this.ctx = ctx;
+
+            init();
+        }
+
+        /**
+         * Inits future.
+         */
+        private void init() {
+            ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name());
+
+            ComputeTaskInternalFuture<Collection<Void>> loadFut = ctx.kernalContext().closure().callAsync(BROADCAST,
+                Arrays.asList(loadClos), nodes.nodes());
+
+            loadFut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Void>>>()
{
+                @Override public void apply(IgniteInternalFuture<Collection<Void>>
fut) {
+                    try {
+                        fut.get();
+
+                        onDone();
+                    }
+                    catch (Exception e) {
+                        if (e.getCause().getCause() instanceof GridDhtInvalidPartitionException)
+                            init();
+                        else
+                            onDone(null, fut.error());
+                    }
+                }
+            });
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed1678f1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 1c46fd0..10df868 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -410,6 +410,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 @Override public void apply(KeyCacheObject key, Object val, @Nullable GridCacheVersion
ver) {
                     assert ver == null;
 
+/*
+                    if (!topVer.equals(topology().topologyVersion()))
+                        throw new ClusterTopologyException("Topology changed");
+*/
+
                     loadEntry(key, val, ver0, p, topVer, replicate, plc);
                 }
             }, args);
@@ -476,12 +481,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 }
             }
             else if (log.isDebugEnabled())
-                log.debug("Will node load entry into cache (partition is invalid): " + part);
+                    log.debug("Will node load entry into cache (partition is invalid): "
+ part);
         }
         catch (GridDhtInvalidPartitionException e) {
             if (log.isDebugEnabled())
                 log.debug("Ignoring entry for partition that does not belong [key=" + key
+ ", val=" + val +
                     ", err=" + e + ']');
+
+            throw e;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed1678f1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a69e033..e113c0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1421,8 +1421,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                     cctx.evicts().touch(entry, topVer);
                 }
-                catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException
ignored) {
-                    // No-op.
+                catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException
ignored){
+                    ignored.printStackTrace();
                 }
                 catch (IgniteCheckedException ex) {
                     IgniteLogger log = cache.unwrap(Ignite.class).log();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed1678f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java
index 2f9bb96..064047c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java
@@ -18,21 +18,25 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
+
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
 import javax.cache.configuration.*;
 import javax.cache.integration.*;
+import java.io.*;
 import java.util.concurrent.*;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
 
 /**
  * Tests for cache data loading during simultaneous grids start.
@@ -42,7 +46,7 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT
     private static int GRIDS_CNT = 5;
 
     /** Keys count */
-    private static int KEYS_CNT = 1_000_000;
+    private static int KEYS_CNT = 100_000;
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
@@ -53,26 +57,13 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT
 
         ccfg.setCacheMode(PARTITIONED);
 
-        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(ATOMIC);
 
-        CacheStore<Integer, String> store = new CacheStoreAdapter<Integer, String>()
{
-            @Override public void loadCache(IgniteBiInClosure<Integer, String> f, Object...
args) {
-                for (int i = 0; i < KEYS_CNT; i++)
-                    f.apply(i, Integer.toString(i));
-            }
+        ccfg.setRebalanceMode(SYNC);
 
-            @Nullable @Override public String load(Integer i) throws CacheLoaderException
{
-                return null;
-            }
+        ccfg.setBackups(1);
 
-            @Override public void write(Cache.Entry<? extends Integer, ? extends String>
entry) throws CacheWriterException {
-                // No-op.
-            }
-
-            @Override public void delete(Object o) throws CacheWriterException {
-                // No-op.
-            }
-        };
+        CacheStore<Integer, String> store = new TestCacheStoreAdapter();
 
         ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
 
@@ -92,9 +83,22 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT
     public void testLoadCacheWithDataStreamer() throws Exception {
         IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() {
             @Override public void apply(Ignite grid) {
+
                 try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null))
{
+//                    dataStreamer.perNodeParallelOperations(2);
+                    dataStreamer.perNodeBufferSize(64);
+
                     for (int i = 0; i < KEYS_CNT; i++)
                         dataStreamer.addData(i, Integer.toString(i));
+
+/*
+                    try {
+                        Thread.sleep(5000);
+                    }
+                    catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+*/
                 }
             }
         };
@@ -142,13 +146,46 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT
     private void assertCacheSize() {
         IgniteCache<Integer, String> cache = grid(0).cache(null);
 
-        assertEquals(KEYS_CNT, cache.size(CachePeekMode.PRIMARY));
-
         int total = 0;
 
-        for (int i = 0; i < GRIDS_CNT; i++)
-            total += grid(i).cache(null).localSize(CachePeekMode.PRIMARY);
+        for (int i = 0; i < GRIDS_CNT; i++) {
+            int locSize = grid(i).cache(null).localSize();
+
+            System.out.println("!!! Local size(" + i + "): " + locSize);
+
+            total += locSize;
+
+        }
 
         assertEquals(KEYS_CNT, total);
+
+        assertEquals(KEYS_CNT, cache.size());
+    }
+
+    /**
+     * Cache store adapter.
+     */
+    private static class TestCacheStoreAdapter extends CacheStoreAdapter<Integer, String>
implements Serializable {
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Integer, String> f, Object...
args) {
+            for (int i = 0; i < KEYS_CNT; i++)
+                f.apply(i, Integer.toString(i));
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String load(Integer i) throws CacheLoaderException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends String>
entry)
+            throws CacheWriterException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object o) throws CacheWriterException {
+            // No-op.
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed1678f1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 6e70052..6f954cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -224,7 +224,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridCacheDhtPreloadUnloadSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedAffinityFilterSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedPreloadLifecycleSelfTest.class));
-//        suite.addTest(new TestSuite(GridCacheLoadingConcurrentGridStartTest.class));  TODO-ignite-500
+        suite.addTest(new TestSuite(GridCacheLoadingConcurrentGridStartTest.class));
         suite.addTest(new TestSuite(GridCacheDhtPreloadDelayedSelfTest.class));
         suite.addTest(new TestSuite(GridPartitionedBackupLoadSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedLoadCacheSelfTest.class));


Mime
View raw message