ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [1/3] incubator-ignite git commit: # IGNITE-417 removeAll() throws IllegalStateException if remote node stops during removeAll() execution (cherry picked from commit 314cc89)
Date Fri, 13 Mar 2015 11:53:06 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-417-1 [created] 166983722


# IGNITE-417 removeAll() throws IllegalStateException if remote node stops during removeAll() execution
(cherry picked from commit 314cc89)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/239b8dc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/239b8dc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/239b8dc2

Branch: refs/heads/ignite-417-1
Commit: 239b8dc2a756a6d49dd6a06d3dbe11935bb83549
Parents: b2f1e43
Author: sevdokimov <sevdokimov@gridgain.com>
Authored: Fri Mar 6 18:50:37 2015 +0300
Committer: sevdokimov <sevdokimov@gridgain.com>
Committed: Fri Mar 13 13:35:08 2015 +0300

----------------------------------------------------------------------
 .../GridDistributedCacheAdapter.java            | 174 ++++++++++++++-----
 .../GridCacheRemoveAllMultithreadedTest.java    | 118 +++++++++++++
 2 files changed, 246 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/239b8dc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 792fc47..99ef07e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -48,6 +49,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final int MAX_REMOVE_ALL_ATTEMPTS = 50;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -143,25 +147,58 @@ public abstract class GridDistributedCacheAdapter<K, V> extends
GridCacheAdapter
 
     /** {@inheritDoc} */
     @Override public void removeAll() throws IgniteCheckedException {
-        try {
-            long topVer;
+        int attemptCnt = 0;
+
+        while (true) {
+            long topVer = ctx.discovery().topologyVersion();
+
+            IgniteInternalFuture<Long> fut = ctx.affinity().affinityReadyFuturex(topVer);
+            if (fut != null)
+                fut.get();
+
+            // Send job to all data nodes.
+            ClusterGroup cluster = ctx.grid().cluster().forDataNodes(name());
+
+            if (cluster.nodes().isEmpty())
+                break;
+
+            try {
+                Collection<Long> res = ctx.grid().compute(cluster).withNoFailover().broadcast(
+                    new GlobalRemoveAllCallable<>(name(), topVer));
+
+                Long max = Collections.max(res);
+
+                if (max > 0) {
+                    assert max > topVer;
+
+                    ctx.affinity().affinityReadyFuture(max).get();
+
+                    continue;
+                }
 
-            do {
-                topVer = ctx.affinity().affinityTopologyVersion();
+                if (res.contains(-1L)) {
+                    if (++attemptCnt > MAX_REMOVE_ALL_ATTEMPTS)
+                        throw new IgniteCheckedException("Failed to remove all entries.");
 
-                // Send job to all data nodes.
-                Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
+                    continue;
+                }
+            }
+            catch (ClusterGroupEmptyException ignore) {
+                if (log.isDebugEnabled())
+                    log.debug("All remote nodes left while cache remove [cacheName=" + name()
+ "]");
 
-                if (!nodes.isEmpty()) {
-                    ctx.closures().callAsyncNoFailover(BROADCAST,
-                        new GlobalRemoveAllCallable<>(name(), topVer), nodes, true).get();
+                break;
+            }
+            catch (ClusterTopologyException e) {
+                // GlobalRemoveAllCallable was sent to node that has left.
+                if (topVer == ctx.discovery().topologyVersion()) {
+                    // Node has not left, some other error has occurred.
+                    throw e;
                 }
             }
-            while (ctx.affinity().affinityTopologyVersion() > topVer);
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache remove [cacheName=" + name()
+ "]");
+
+            if (topVer == ctx.discovery().topologyVersion())
+                break;
         }
     }
 
@@ -169,44 +206,79 @@ public abstract class GridDistributedCacheAdapter<K, V> extends
GridCacheAdapter
     @Override public IgniteInternalFuture<?> removeAllAsync() {
         GridFutureAdapter<Void> opFut = new GridFutureAdapter<>();
 
-        long topVer = ctx.affinity().affinityTopologyVersion();
-
-        removeAllAsync(opFut, topVer);
+        removeAllAsync(opFut, 0);
 
         return opFut;
     }
 
     /**
      * @param opFut Future.
-     * @param topVer Topology version.
+     * @param attemptCnt Attempts count.
      */
-    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final long topVer)
{
-        Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
+    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final int attemptCnt)
{
+        final long topVer = ctx.affinity().affinityTopologyVersion();
+
+        ClusterGroup cluster = ctx.grid().cluster().forDataNodes(name());
+
+        if (cluster.nodes().isEmpty())
+            opFut.onDone();
+        else {
+            IgniteCompute computeAsync = ctx.grid().compute(cluster).withNoFailover().withAsync();
+
+            computeAsync.broadcast(new GlobalRemoveAllCallable<>(name(), topVer));
 
-        if (!nodes.isEmpty()) {
-            IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST,
-                    new GlobalRemoveAllCallable<>(name(), topVer), nodes, true);
+            ComputeTaskFuture<Collection<Long>> fut = computeAsync.future();
 
-            rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
+            fut.listen(new IgniteInClosure<IgniteFuture<Collection<Long>>>()
{
+                @Override public void apply(IgniteFuture<Collection<Long>> fut)
{
                     try {
-                        fut.get();
+                        Collection<Long> res = fut.get();
+
+                        Long max = Collections.max(res);
 
-                        long topVer0 = ctx.affinity().affinityTopologyVersion();
+                        if (max > 0) {
+                            assert max > topVer;
 
-                        if (topVer0 == topVer)
-                            opFut.onDone();
-                        else
-                            removeAllAsync(opFut, topVer0);
+                            try {
+                                ctx.affinity().affinityReadyFuture(max).get();
+
+                                removeAllAsync(opFut, attemptCnt);
+                            }
+                            catch (IgniteCheckedException e) {
+                                opFut.onDone(e);
+                            }
+
+                            return;
+                        }
+
+                        if (res.contains(-1L)) {
+                            if (attemptCnt >= MAX_REMOVE_ALL_ATTEMPTS)
+                                opFut.onDone(new IgniteCheckedException("Failed to remove
all entries."));
+                            else
+                                removeAllAsync(opFut, attemptCnt + 1);
+
+                            return;
+                        }
+
+                        if (topVer != ctx.affinity().affinityTopologyVersion())
+                            removeAllAsync(opFut, attemptCnt);
                     }
-                    catch (ClusterGroupEmptyCheckedException ignore) {
+                    catch (ClusterGroupEmptyException ignore) {
                         if (log.isDebugEnabled())
                             log.debug("All remote nodes left while cache remove [cacheName="
+ name() + "]");
 
                         opFut.onDone();
                     }
-                    catch (IgniteCheckedException e) {
-                        opFut.onDone(e);
+                    catch (ClusterTopologyException e) {
+                        // GlobalRemoveAllCallable was sent to node that has left.
+                        if (topVer == ctx.discovery().topologyVersion()) {
+                            // Node has not left, some other error has occurred.
+                            opFut.onDone(e);
+
+                            return;
+                        }
+
+                        removeAllAsync(opFut, attemptCnt + 1);
                     }
                     catch (Error e) {
                         opFut.onDone(e);
@@ -216,8 +288,6 @@ public abstract class GridDistributedCacheAdapter<K, V> extends
GridCacheAdapter
                 }
             });
         }
-        else
-            opFut.onDone();
     }
 
     /** {@inheritDoc} */
@@ -230,7 +300,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends
GridCacheAdapter
      * operation on a cache with the given name.
      */
     @GridInternal
-    private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>,
Externalizable {
+    private static class GlobalRemoveAllCallable<K,V> implements IgniteCallable<Long>,
Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -260,21 +330,22 @@ public abstract class GridDistributedCacheAdapter<K, V> extends
GridCacheAdapter
             this.topVer = topVer;
         }
 
-        /**
-         * {@inheritDoc}
-         */
-        @Override public Object call() throws Exception {
+        /** {@inheritDoc} */
+        @Override public Long call() throws Exception {
             GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
 
             final GridCacheContext<K, V> ctx = cacheAdapter.context();
 
-            ctx.affinity().affinityReadyFuture(topVer).get();
+            IgniteInternalFuture<Long> topVerFut = ctx.affinity().affinityReadyFuture(topVer);
+
+            if (topVerFut != null)
+                topVerFut.get();
 
             ctx.gate().enter();
 
             try {
-                if (ctx.affinity().affinityTopologyVersion() != topVer)
-                    return null; // Ignore this remove request because remove request will
be sent again.
+                if (ctx.affinity().affinityTopologyVersion() > topVer)
+                    return ctx.affinity().affinityTopologyVersion();
 
                 GridDhtCacheAdapter<K, V> dht;
 
@@ -307,13 +378,24 @@ public abstract class GridDistributedCacheAdapter<K, V> extends
GridCacheAdapter
 
                     while (it.hasNext())
                         dataLdr.removeDataInternal(it.next());
+
+                    return 0L; // 0 means remove completed successfully.
+                }
+                catch (IgniteException e) {
+                    if (e instanceof ClusterTopologyException
+                        || e.hasCause(ClusterTopologyCheckedException.class, ClusterTopologyException.class))
+                        return -1L;
+
+                    throw e;
+                }
+                catch (IllegalStateException ignored) {
+                    // Looks like node is about to stop.
+                    return -1L; // -1 means request should be resent.
                 }
             }
             finally {
                 ctx.gate().leave();
             }
-
-            return null;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/239b8dc2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java
new file mode 100644
index 0000000..49245cc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Tests that {@code removeAll()} removes all cache entries while another node is concurrently being started and stopped.
+ */
+public class GridCacheRemoveAllMultithreadedTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /**
+     * @return Cache atomicity mode.
+     */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return Long.MAX_VALUE;
+    }
+
+    /**
+     * Actual test.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoveAll() throws Exception {
+        final Object mux = new Object();
+
+        Thread t = new GridTestThread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (!Thread.interrupted()) {
+
+                        startGrid(3);
+
+                        synchronized (mux) {
+                            stopGrid(3);
+                        }
+                    }
+                }
+                catch (IgniteInterruptedCheckedException ignored) {
+                    // Test stopped.
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        t.start();
+
+        try {
+            long endTime = System.currentTimeMillis() + 60 * 1000;
+
+            Random rnd = new Random();
+
+            while (endTime > System.currentTimeMillis()) {
+                synchronized (mux) {
+                    try (IgniteDataLoader<Integer, Integer> ldr = ignite(0).dataLoader(null))
{
+                        for (int i = 0; i < 1000; i++)
+                            ldr.addData(i, i);
+                    }
+                }
+
+                jcache(0).removeAll();
+
+                for (int i = 0; i < gridCount(); i++) {
+                    int locSize = jcache(i).localSize(CachePeekMode.ALL);
+
+                    assert locSize == 0 : locSize;
+                }
+            }
+        }
+        finally {
+            t.interrupt();
+
+            t.join();
+        }
+
+    }
+}


Mime
View raw message