ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jok...@apache.org
Subject [ignite] branch master updated: IGNITE-12230 Stopping cache group must wait for currently running partition evictions - Fixes #6906.
Date Tue, 01 Oct 2019 14:30:04 GMT
This is an automated email from the ASF dual-hosted git repository.

jokser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 538a46b  IGNITE-12230 Stopping cache group must wait for currently running partition evictions - Fixes #6906.
538a46b is described below

commit 538a46be18107bdea4d9a34a94c6907917bd9f2c
Author: mstepachev <maksim.stepachev@gmail.com>
AuthorDate: Tue Oct 1 17:28:59 2019 +0300

    IGNITE-12230 Stopping cache group must wait for currently running partition evictions - Fixes #6906.
    
    Signed-off-by: Pavel Kovalenko <jokserfn@gmail.com>
---
 .../processors/cache/GridCacheProcessor.java       |   2 +
 .../dht/topology/PartitionsEvictManager.java       |  19 +-
 .../cache/IgniteMarshallerCacheFSRestoreTest.java  |   7 +-
 .../DropCacheContextDuringEvictionTest.java        | 103 ++++++++++
 .../PartitionsEvictManagerAbstractTest.java        | 209 +++++++++++++++++++++
 .../PartitionsEvictionTaskFailureHandlerTest.java  |  49 +++++
 .../ignite/testsuites/IgniteBasicTestSuite.java    |   5 +
 7 files changed, 386 insertions(+), 8 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index da601fe..6384e17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2661,6 +2661,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             .map(a -> F.t(cacheGrps.get(a.descriptor().groupId()), a.destroy()))
             .collect(Collectors.toList());
 
+        grpToStop.forEach(t -> sharedCtx.evict().onCacheGroupStopped(t.get1()));
+
         if (!exchActions.cacheStopRequests().isEmpty())
             removeOffheapListenerAfterCheckpoint(grpToStop);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
index 3c69905..31399bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVICTION_PERMITS;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 
 /**
  * Class that serves asynchronous part eviction process.
@@ -82,7 +84,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
      *  Is not thread-safe.
      *  All method should be called under mux synchronization.
      */
-    private volatile BucketQueue evictionQueue;
+    volatile BucketQueue evictionQueue;
 
     /** Lock object. */
     private final Object mux = new Object();
@@ -376,7 +378,7 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
     /**
      * Task for self-scheduled partition eviction / clearing.
      */
-    private class PartitionEvictionTask implements Runnable {
+    class PartitionEvictionTask implements Runnable {
         /** Partition to evict. */
         private final GridDhtLocalPartition part;
 
@@ -435,8 +437,11 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
                         false,
                         true);
                 }
-                else
+                else {
                     LT.error(log, ex, "Partition eviction failed, this can cause grid hang.");
+
+                    cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, ex));
+                }
             }
         }
     }
@@ -444,13 +449,13 @@ public class PartitionsEvictManager extends GridCacheSharedManagerAdapter {
     /**
      *
      */
-    private class BucketQueue {
-        /** Queues contains partitions scheduled for eviction. */
-        private final Queue<PartitionEvictionTask>[] buckets;
-
+    class BucketQueue {
         /** */
         private final long[] bucketSizes;
 
+        /** Queues contains partitions scheduled for eviction. */
+        final Queue<PartitionEvictionTask>[] buckets;
+
         /**
          * @param buckets Number of buckets.
          */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
index 6e41385..3d21563 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
@@ -102,9 +102,14 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
+        stopAllGrids();
         cleanUpWorkDir();
+        cleanPersistenceDir();
+    }
 
-        stopAllGrids();
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java
new file mode 100644
index 0000000..4a7de04
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DropCacheContextDuringEvictionTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.junit.Test;
+
+/**
+ * Checks that destroy cache operations wait for an eviction finish.
+ */
+public class DropCacheContextDuringEvictionTest extends PartitionsEvictManagerAbstractTest {
+    /**
+     *
+     */
+    @Test
+    public void testDeactivation() throws Exception {
+        T2<IgniteEx, CountDownLatch> nodeAndEvictLatch = makeNodeWithEvictLatch();
+
+        IgniteCache<Object, Object> cache = nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setGroupName("test-grp"));
+
+        for (int i = 0; i < 100_000; i++)
+            cache.put(i, i);
+
+        doActionDuringEviction(nodeAndEvictLatch, () -> nodeAndEvictLatch.get1().cluster().active(false));
+
+        assertFalse(failure.get());
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testDestroyCacheGroup() throws Exception {
+        T2<IgniteEx, CountDownLatch> nodeAndEvictLatch = makeNodeWithEvictLatch();
+
+        List<String> caches = new ArrayList<>();
+
+        for (int idx = 0; idx < 10; idx++) {
+            IgniteCache<Object, Object> cache = nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME + idx)
+                .setGroupName("test-grp"));
+
+            caches.add(cache.getName());
+
+            try (IgniteDataStreamer streamer = nodeAndEvictLatch.get1().dataStreamer(cache.getName())) {
+                streamer.allowOverwrite(true);
+
+                for (int i = 0; i < 200_000; i++)
+                    streamer.addData(i, i);
+            }
+        }
+
+        doActionDuringEviction(nodeAndEvictLatch, () -> nodeAndEvictLatch.get1().destroyCaches(caches));
+
+        assertFalse(failure.get());
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testDestroyCache() throws Exception {
+        T2<IgniteEx, CountDownLatch> nodeAndEvictLatch = makeNodeWithEvictLatch();
+
+        IgniteCache<Object, Object> cache = nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAffinity(new RendezvousAffinityFunction(false, 500))
+        );
+
+        try (IgniteDataStreamer streamer = nodeAndEvictLatch.get1().dataStreamer(DEFAULT_CACHE_NAME)) {
+            streamer.allowOverwrite(true);
+
+            for (int i = 0; i < 2_000_000; i++)
+                streamer.addData(i, i);
+        }
+
+        doActionDuringEviction(nodeAndEvictLatch, cache::destroy);
+
+        assertFalse(failure.get());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java
new file mode 100644
index 0000000..e49e07f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManagerAbstractTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public abstract class PartitionsEvictManagerAbstractTest extends GridCommonAbstractTest {
+    /** Failure. */
+    protected AtomicBoolean failure = new AtomicBoolean(false);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)));
+
+        cfg.setFailureHandler(new AbstractFailureHandler() {
+            /** {@inheritDoc} */
+            @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
+                failure.set(true);
+
+                // Do not invalidate a node context.
+                return false;
+            }
+        });
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @param node Node.
+     * @param ms Milliseconds.
+     */
+    protected void awaitEvictionQueueIsEmpty(IgniteEx node, int ms) throws IgniteInterruptedCheckedException {
+        PartitionsEvictManager.BucketQueue evictionQueue = node.context().cache().context().evict().evictionQueue;
+
+        assertTrue(GridTestUtils.waitForCondition(evictionQueue::isEmpty, ms));
+    }
+
+    /**
+     * @param node Node.
+     * @param ms Milliseconds.
+     */
+    protected void awaitEvictionQueueForFilling(IgniteEx node, int ms) throws IgniteInterruptedCheckedException {
+        PartitionsEvictManager.BucketQueue evictionQueue = node.context().cache().context().evict().evictionQueue;
+
+        assertTrue(GridTestUtils.waitForCondition(() -> !evictionQueue.isEmpty(), ms));
+    }
+
+    /**
+     * @param node Node.
+     * @param latch Latch.
+     * @param completeWithError Inner future throws exception.
+     */
+    protected void subscribeEvictionQueueAtLatch(IgniteEx node, CountDownLatch latch, boolean completeWithError) {
+        PartitionsEvictManager.BucketQueue evictionQueue = node.context().cache().context().evict().evictionQueue;
+        Queue[] buckets = evictionQueue.buckets;
+
+        for (int i = 0; i < buckets.length; i++)
+            buckets[i] = new WaitingQueue(latch, completeWithError);
+    }
+
+    /**
+     *
+     */
+    protected T2<IgniteEx, CountDownLatch> makeNodeWithEvictLatch() throws Exception {
+        return makeNodeWithEvictLatch(false);
+    }
+
+    /**
+     *
+     */
+    protected T2<IgniteEx, CountDownLatch> makeNodeWithEvictLatch(boolean completeWithError) throws Exception {
+        IgniteEx node1 = startGrid(0);
+
+        node1.cluster().baselineAutoAdjustEnabled(false);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        subscribeEvictionQueueAtLatch(node1, latch, completeWithError);
+
+        node1.cluster().active(true);
+
+        return new T2<>(node1, latch);
+    }
+
+    /**
+     * @param nodeAndEvictLatch Node and evict latch.
+     * @param r R.
+     */
+    protected void doActionDuringEviction(T2<IgniteEx, CountDownLatch> nodeAndEvictLatch, Runnable r) throws Exception {
+        IgniteEx node2 = startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        nodeAndEvictLatch.get1().cluster().setBaselineTopology(node2.cluster().topologyVersion());
+
+        awaitEvictionQueueForFilling(nodeAndEvictLatch.get1(), 100_000);
+
+        nodeAndEvictLatch.get2().countDown();
+
+        r.run();
+
+        awaitEvictionQueueIsEmpty(nodeAndEvictLatch.get1(), 100_000);
+    }
+
+    /**
+     * Queue which waits on the poll or breaks a PartitionEvictionTask.
+     */
+    private class WaitingQueue extends LinkedBlockingQueue {
+        /** Latch. */
+        private final CountDownLatch latch;
+
+        /** Complete with error. */
+        private final boolean completeWithError;
+
+        /**
+         * @param latch Latch.
+         * @param completeWithError flag.
+         */
+        public WaitingQueue(CountDownLatch latch, boolean completeWithError) {
+            this.latch = latch;
+            this.completeWithError = completeWithError;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object poll() {
+            U.awaitQuiet(latch);
+
+            Object obj = super.poll();
+
+            // This code is used for testing the failure handler in PartitionEvictionTask.
+            if(obj != null && completeWithError) {
+                try {
+                    Field field = U.findField(PartitionsEvictManager.PartitionEvictionTask.class, "finishFut");
+
+                    field.setAccessible(true);
+
+                    Field modifiersField = Field.class.getDeclaredField("modifiers");
+                    modifiersField.setAccessible(true);
+                    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+                    field.set(obj, new GridFutureAdapter<Object>() {
+                        @Override
+                        protected boolean onDone(@Nullable Object res, @Nullable Throwable err, boolean cancel) {
+                            if (err == null)
+                                throw new RuntimeException("TEST");
+
+                            return super.onDone(res, err, cancel);
+                        }
+                    });
+                }
+                catch (Exception e) {
+                    fail();
+                }
+            }
+
+            return obj;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java
new file mode 100644
index 0000000..58c2460
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictionTaskFailureHandlerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class PartitionsEvictionTaskFailureHandlerTest extends PartitionsEvictManagerAbstractTest {
+    /**
+     *
+     */
+    @Test
+    public void testEvictionTaskShouldCallFailureHandler() throws Exception {
+        T2<IgniteEx, CountDownLatch> nodeAndEvictLatch = makeNodeWithEvictLatch(true);
+
+        IgniteCache<Object, Object> cache = nodeAndEvictLatch.get1().createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setGroupName("test-grp"));
+
+        for (int i = 0; i < 100_000; i++)
+            cache.put(i, i);
+
+        doActionDuringEviction(nodeAndEvictLatch, () -> {});
+
+        assertTrue(GridTestUtils.waitForCondition(() -> failure.get(), 10_000));
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index bf40264..5c287be 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -69,6 +69,8 @@ import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheFSRestor
 import org.apache.ignite.internal.processors.cache.RebalanceWithDifferentThreadPoolSizeTest;
 import org.apache.ignite.internal.processors.cache.SetTxTimeoutOnPartitionMapExchangeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.DropCacheContextDuringEvictionTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictionTaskFailureHandlerTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.DiscoveryDataDeserializationFailureHanderTest;
 import org.apache.ignite.internal.processors.cache.transactions.AtomicOperationsInTxTest;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionIntegrityWithSystemWorkerDeathTest;
@@ -182,6 +184,9 @@ import org.junit.runners.Suite;
     SetTxTimeoutOnPartitionMapExchangeTest.class,
     DiscoveryDataDeserializationFailureHanderTest.class,
 
+    PartitionsEvictionTaskFailureHandlerTest.class,
+    DropCacheContextDuringEvictionTest.class,
+
     IgniteExceptionInNioWorkerSelfTest.class,
     IgniteLocalNodeMapBeforeStartTest.class,
     OdbcConfigurationValidationSelfTest.class,


Mime
View raw message