ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [2/6] ignite git commit: ignite-db-x fix IgniteCachePartitionLossPolicySelfTest, batch reset partition
Date Fri, 03 Feb 2017 14:47:14 GMT
ignite-db-x fix IgniteCachePartitionLossPolicySelfTest, batch reset partition


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a70dc00e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a70dc00e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a70dc00e

Branch: refs/heads/ignite-3477
Commit: a70dc00e1b699a9ad2959dd127241ec4371c0a1c
Parents: 16f64d1
Author: Dmitriy Govorukhin <dgovorukhin@gridgain.com>
Authored: Thu Feb 2 18:26:45 2017 +0300
Committer: Dmitriy Govorukhin <dgovorukhin@gridgain.com>
Committed: Thu Feb 2 18:26:45 2017 +0300

----------------------------------------------------------------------
 .../src/main/java/org/apache/ignite/Ignite.java |  5 +++
 .../java/org/apache/ignite/IgniteCache.java     |  6 ---
 .../apache/ignite/internal/IgniteKernal.java    | 15 +++++++
 .../processors/cache/GridCacheProcessor.java    | 44 +++++++++++++++-----
 .../processors/cache/IgniteCacheProxy.java      | 26 ------------
 .../GridDhtPartitionsExchangeFuture.java        | 41 +++++++++++-------
 .../IgniteCachePartitionLossPolicySelfTest.java | 14 +++----
 .../processors/igfs/IgfsIgniteMock.java         |  5 +++
 .../ignite/testframework/junits/IgniteMock.java |  5 +++
 .../multijvm/IgniteCacheProcessProxy.java       |  5 ---
 .../junits/multijvm/IgniteProcessProxy.java     |  5 +++
 11 files changed, 102 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 54512a3..408a797 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -612,4 +612,9 @@ public interface Ignite extends AutoCloseable {
      * @param active If {@code True} start activation process. If {@code False} start deactivation process.
      */
     public void active(boolean active);
+
+    /**
+     * Clears partition's lost state and moves caches to a normal mode.
+     */
+    public void resetLostPartitions(Collection<String> cacheNames);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index eca61ab..412e08d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -933,10 +933,4 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
      * @return Lost paritions.
      */
     public Collection<Integer> lostPartitions();
-
-    /**
-     * Clears partition's lost state and moves cache to a normal mode.
-     */
-    @IgniteAsyncSupported
-    public void resetLostPartitions();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c11b770..ad10f7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3226,6 +3226,21 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public void resetLostPartitions(Collection<String> cacheNames) {
+        guard();
+
+        try {
+            ctx.cache().resetCacheState(cacheNames).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) {
         guard();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 57b0d84..542a18c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2570,22 +2570,43 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * Resets cache state after the cache has been moved to recovery state.
      *
-     * @param cacheName Cache name.
-     * @return Future that will be completed when state is changed.
+     * @param cacheNames Cache names.
+     * @return Future that will be completed when state is changed for all caches.
      */
-    public IgniteInternalFuture<?> resetCacheState(String cacheName) {
-        IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(cacheName));
+    public IgniteInternalFuture<?> resetCacheState(Collection<String> cacheNames) {
+        checkEmptyTransactions();
 
-        if (proxy == null || proxy.proxyClosed())
-            return new GridFinishedFuture<>(); // No-op.
+        if (F.isEmpty(cacheNames))
+            cacheNames = registeredCaches.keySet();
 
-        checkEmptyTransactions();
+        Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());
 
-        DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
+        for (String cacheName : cacheNames) {
+            DynamicCacheDescriptor desc = registeredCaches.get(cacheName);
 
-        t.markResetLostPartitions();
+            if (desc == null) {
+                log.warning("Reset lost partition will not be executed, " +
+                    "because cache with name:" + cacheName + " doesn't not exist");
 
-        return F.first(initiateCacheChanges(F.asList(t), false));
+                continue;
+            }
+
+            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
+                UUID.randomUUID(), cacheName, ctx.localNodeId());
+
+            req.markResetLostPartitions();
+
+            reqs.add(req);
+        }
+
+        GridCompoundFuture fut = new GridCompoundFuture();
+
+        for (DynamicCacheStartFuture f : initiateCacheChanges(reqs, false))
+            fut.add(f);
+
+        fut.markInitialized();
+
+        return fut;
     }
 
     /**
@@ -2690,7 +2711,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings("TypeMayBeWeakened")
     private Collection<DynamicCacheStartFuture> initiateCacheChanges(
-        Collection<DynamicCacheChangeRequest> reqs, boolean failIfExists
+        Collection<DynamicCacheChangeRequest> reqs,
+        boolean failIfExists
     ) {
         Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index dd92309..03cbbf7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -2307,32 +2307,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void resetLostPartitions() {
-        GridCacheGateway<K, V> gate = this.gate;
-
-        CacheOperationContext prev = onEnter(gate, opCtx);
-
-        try {
-            IgniteInternalFuture<?> fut = ctx.kernalContext().cache().resetCacheState(getName());
-
-            if (isAsync())
-                setFuture(fut);
-            else {
-                try {
-                    fut.get();
-                }
-                catch (IgniteCheckedException e) {
-                    throw CU.convertToCacheException(e);
-                }
-            }
-        }
-        finally {
-            onLeave(gate, prev);
-        }
-
-    }
-
-    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(ctx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 75fd3e2..af0085d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1041,6 +1041,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
+        assert node != null;
+
+        // Reset lost partition before send local partition to coordinator.
+        if (!F.isEmpty(reqs)) {
+            Set<String> caches = new HashSet<>();
+
+            for (DynamicCacheChangeRequest req : reqs) {
+                if (req.resetLostPartitions())
+                    caches.add(req.cacheName());
+            }
+
+            if (!F.isEmpty(caches))
+                resetLostPartitions(caches);
+        }
+
         GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(
             node, exchangeId(), clientOnlyExchange, true);
 
@@ -1141,17 +1156,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
             }
 
-            if (!F.isEmpty(reqs)) {
-                for (DynamicCacheChangeRequest req : reqs) {
-                    if (req.resetLostPartitions()) {
-                        resetLostPartitions();
-
-                        break;
-                    }
-                }
-            }
-
-            detectLostPartitions();
+            if (discoEvt.type() == EVT_NODE_LEFT ||
+                discoEvt.type() == EVT_NODE_FAILED ||
+                discoEvt.type() == EVT_NODE_JOINED)
+                detectLostPartitions();
 
             Map<Integer, CacheValidation> m = new HashMap<>(cctx.cacheContexts().size());
 
@@ -1514,13 +1522,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /**
      *
      */
-    private void resetLostPartitions() {
+    private void resetLostPartitions(Collection<String> cacheNames) {
         synchronized (cctx.exchange().interruptLock()) {
             if (Thread.currentThread().isInterrupted())
                 return;
 
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (!cacheCtx.isLocal())
+                if (!cacheCtx.isLocal() && cacheNames.contains(cacheCtx.name()))
                     cacheCtx.topology().resetLostPartitions();
             }
         }
@@ -1551,12 +1559,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)((DiscoveryCustomEvent)discoEvt)
                         .customMessage();
 
+                    Set<String> caches = new HashSet<>();
+
                     for (DynamicCacheChangeRequest req : batch.requests()) {
                         if (req.resetLostPartitions())
-                            resetLostPartitions();
+                            caches.add(req.cacheName());
                         else if (req.globalStateChange() && req.state() != ClusterState.INACTIVE)
                             assignPartitionsStates();
                     }
+
+                    if (!F.isEmpty(caches))
+                        resetLostPartitions(caches);
                 }
             }
             else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED)

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
index 5951ba3..480dc20 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
@@ -17,6 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.PartitionLossPolicy;
@@ -38,12 +44,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.util.TestTcpCommunicationSpi;
 
-import javax.cache.CacheException;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
@@ -200,7 +200,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
         for (Ignite ig : G.allGrids())
             verifyCacheOps(canWrite, safe, part, ig);
 
-        ignite(0).cache(CACHE_NAME).resetLostPartitions();
+        ignite(0).resetLostPartitions(Collections.singletonList(CACHE_NAME));
 
         awaitPartitionMapExchange(true, true, null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
index 027072b..2a522fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
@@ -523,6 +523,11 @@ public class IgfsIgniteMock implements IgniteEx {
         throwUnsupported();
     }
 
+    /** {@inheritDoc} */
+    @Override public void resetLostPartitions(Collection<String> cacheNames) {
+        throwUnsupported();
+    }
+
     /**
      * Throw {@link UnsupportedOperationException}.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 772abcf..bf3f17e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -431,6 +431,11 @@ public class IgniteMock implements Ignite {
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public void resetLostPartitions(Collection<String> cacheNames) {
+        // No-op.
+    }
+
     /**
      * @param staticCfg Configuration.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index 1b1d14e..df60bf1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -509,11 +509,6 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
         throw new UnsupportedOperationException("Method should be supported.");
     }
 
-    /** {@inheritDoc} */
-    @Override public void resetLostPartitions() {
-        throw new UnsupportedOperationException("Method should be supported.");
-    }
-
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a70dc00e/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index 15a520a..05a72e3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -624,6 +624,11 @@ public class IgniteProcessProxy implements IgniteEx {
     }
 
     /** {@inheritDoc} */
+    @Override public void resetLostPartitions(Collection<String> cacheNames) {
+        throw new UnsupportedOperationException("Operation isn't supported yet.");
+    }
+
+    /** {@inheritDoc} */
     @Override public void close() throws IgniteException {
         final CountDownLatch rmtNodeStoppedLatch = new CountDownLatch(1);
 


Mime
View raw message