ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5454 Prevent partition tree destroy while clearing
Date Mon, 26 Jun 2017 12:43:26 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 285644ae5 -> f0c62ac0f


ignite-5454 Prevent partition tree destroy while clearing


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0c62ac0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0c62ac0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0c62ac0

Branch: refs/heads/master
Commit: f0c62ac0fc77b4dbc438dbd23ce756dbebc0659c
Parents: 285644a
Author: Igor Seliverstov <iseliverstov@gridgain.com>
Authored: Mon Jun 26 15:43:17 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jun 26 15:43:17 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    | 129 ++++++++++++++++---
 .../IgniteCacheClearDuringRebalanceTest.java    | 126 ++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 3 files changed, 242 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f0c62ac0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index a4e4c24..b95828c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -427,26 +427,30 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     @Override public void clearCache(GridCacheContext cctx, boolean readers) {
         GridCacheVersion obsoleteVer = null;
 
-        GridIterator<CacheDataRow> it = iterator(cctx.cacheId(), cacheDataStores().iterator());
+        try (GridCloseableIterator<CacheDataRow> it = grp.isLocal() ? iterator(cctx.cacheId(), cacheDataStores().iterator()) :
+            evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator())) {
+            while (it.hasNext()) {
+                KeyCacheObject key = it.next().key();
 
-        while (it.hasNext()) {
-            KeyCacheObject key = it.next().key();
-
-            try {
-                if (obsoleteVer == null)
-                    obsoleteVer = ctx.versions().next();
+                try {
+                    if (obsoleteVer == null)
+                        obsoleteVer = ctx.versions().next();
 
-                GridCacheEntryEx entry = cctx.cache().entryEx(key);
+                    GridCacheEntryEx entry = cctx.cache().entryEx(key);
 
-                entry.clear(obsoleteVer, readers);
-            }
-            catch (GridDhtInvalidPartitionException ignore) {
-                // Ignore.
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to clear cache entry: " + key, e);
+                    entry.clear(obsoleteVer, readers);
+                }
+                catch (GridDhtInvalidPartitionException ignore) {
+                    // Ignore.
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to clear cache entry: " + key, e);
+                }
             }
         }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to close iterator", e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -587,7 +591,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
      * @param dataIt Data store iterator.
      * @return Rows iterator
      */
-    private GridIterator<CacheDataRow> iterator(final int cacheId, final Iterator<CacheDataStore> dataIt) {
+    private GridCloseableIterator<CacheDataRow> iterator(final int cacheId, final Iterator<CacheDataStore> dataIt) {
         return new GridCloseableIteratorAdapter<CacheDataRow>() {
             /** */
             private GridCursor<? extends CacheDataRow> cur;
@@ -638,6 +642,99 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @param dataIt Data store iterator.
+     * @return Rows iterator
+     */
+    private GridCloseableIterator<CacheDataRow> evictionSafeIterator(final int cacheId, final Iterator<CacheDataStore> dataIt) {
+        return new GridCloseableIteratorAdapter<CacheDataRow>() {
+            /** */
+            private GridCursor<? extends CacheDataRow> cur;
+
+            /** */
+            private GridDhtLocalPartition curPart;
+
+            /** */
+            private CacheDataRow next;
+
+            @Override protected CacheDataRow onNext() {
+                CacheDataRow res = next;
+
+                next = null;
+
+                return res;
+            }
+
+            @Override protected boolean onHasNext() throws IgniteCheckedException {
+                if (next != null)
+                    return true;
+
+                while (true) {
+                    if (cur == null) {
+                        if (dataIt.hasNext()) {
+                            CacheDataStore ds = dataIt.next();
+
+                            if (!reservePartition(ds.partId()))
+                                continue;
+
+                            cur = cacheId == UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
+                        }
+                        else
+                            break;
+                    }
+
+                    if (cur.next()) {
+                        next = cur.get();
+                        next.key().partition(curPart.id());
+
+                        break;
+                    }
+                    else {
+                        cur = null;
+
+                        releaseCurrentPartition();
+                    }
+                }
+
+                return next != null;
+            }
+
+            /** */
+            private void releaseCurrentPartition() {
+                GridDhtLocalPartition p = curPart;
+
+                assert p != null;
+
+                curPart = null;
+
+                p.release();
+            }
+
+            /**
+             * @param partId Partition number.
+             * @return {@code True} if partition was reserved.
+             */
+            private boolean reservePartition(int partId) {
+                GridDhtLocalPartition p = grp.topology().localPartition(partId);
+
+                if (p != null && p.reserve()) {
+                    curPart = p;
+
+                    return true;
+                }
+
+                return false;
+            }
+
+            /** {@inheritDoc} */
+            @Override protected void onClose() throws IgniteCheckedException {
+                if (curPart != null)
+                    releaseCurrentPartition();
+            }
+        };
+    }
+
+    /**
      * @param item Item.
      * @return Single item iterator.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0c62ac0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheClearDuringRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheClearDuringRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheClearDuringRebalanceTest.java
new file mode 100644
index 0000000..8561c5c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheClearDuringRebalanceTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class IgniteCacheClearDuringRebalanceTest extends GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_NAME = "cache";
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+
+        cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME)
+            .setCacheMode(PARTITIONED));
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClearAll() throws Exception {
+        final IgniteEx node = startGrid(0);
+
+        for (int i = 0; i < 5; i++) {
+            populate(node);
+
+            try {
+                startGrid(1).cache(CACHE_NAME).clear();
+            }
+            finally {
+                stopGrid(1);
+            }
+        }
+
+        populate(node);
+
+        startGrid(1).cache(CACHE_NAME).clear();
+
+        startGrid(2);
+    }
+
+    /**
+     * @param node Ignite node;
+     * @throws Exception If failed.
+     */
+    private void populate(final Ignite node) throws Exception {
+        final AtomicInteger id = new AtomicInteger();
+
+        final int tCnt = Runtime.getRuntime().availableProcessors();
+
+        final byte[] data = new byte[1024];
+
+        ThreadLocalRandom.current().nextBytes(data);
+
+        GridTestUtils.runMultiThreaded(new Runnable() {
+            @Override public void run() {
+                try (IgniteDataStreamer<Object, Object> str = node.dataStreamer(CACHE_NAME)) {
+                    int idx = id.getAndIncrement();
+
+                    str.autoFlushFrequency(0);
+
+                    for (int i = idx; i < 500_000; i += tCnt) {
+                        str.addData(i, data);
+
+                        if (i % (100 * tCnt) == idx)
+                            str.flush();
+                    }
+
+                    str.flush();
+                }
+            }
+        }, tCnt, "ldr");
+
+        assertEquals(500_000, node.cache(CACHE_NAME).size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0c62ac0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 158b118..eec0273 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePart
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedPreloadEventsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedTopologyChangeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedUnloadEventsSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheClearDuringRebalanceTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedBackupNodeFailureRecoveryTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearEvictionEventSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearMultiNodeSelfTest;
@@ -282,6 +283,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new TestSuite(CacheOptimisticTransactionsWithFilterSingleServerTest.class));
         suite.addTest(new TestSuite(CacheOptimisticTransactionsWithFilterTest.class));
 
+        suite.addTest(new TestSuite(IgniteCacheClearDuringRebalanceTest.class));
+
         return suite;
     }
 }


Mime
View raw message