ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [10/17] ignite git commit: ignite-5937
Date Wed, 11 Oct 2017 10:43:08 GMT
ignite-5937


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1b2c03d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1b2c03d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1b2c03d

Branch: refs/heads/ignite-5932
Commit: c1b2c03dc1ee9de222997cba4efcb2e5fb1a5885
Parents: c553638
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Oct 9 17:05:02 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 9 17:50:53 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    | 285 +++++++------------
 .../GridDhtPartitionsExchangeFuture.java        |   2 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  32 ++-
 .../cache/mvcc/CacheMvccClusterRestartTest.java | 173 +++++++++++
 4 files changed, 308 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index dd4d7e0..80d36c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1356,15 +1356,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             return dataRow;
         }
 
-        private int compare(CacheDataRow row, long crdVer, long mvccCntr) {
-            int cmp = Long.compare(row.mvccCoordinatorVersion(), crdVer);
-
-            if (cmp != 0)
-                return cmp;
-
-            return Long.compare(row.mvccCounter(), mvccCntr);
-        }
-
         /** {@inheritDoc} */
         @Override public GridLongList mvccRemove(GridCacheContext cctx,
             boolean primary,
@@ -1376,9 +1367,67 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
 
             try {
+                int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
+                CacheObjectContext coCtx = cctx.cacheObjectContext();
 
-                return null;
+                // Make sure value bytes initialized.
+                key.valueBytes(coCtx);
+
+                MvccUpdateRow updateRow = new MvccUpdateRow(
+                    key,
+                    null,
+                    null,
+                    mvccVer,
+                    partId,
+                    cacheId);
+
+                rowStore.addRow(updateRow);
+
+                assert updateRow.link() != 0 : updateRow;
+
+                if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+                    updateRow.cacheId(cctx.cacheId());
+
+                GridLongList waitTxs = null;
+
+                if (mvccVer.initialLoad()) {
+                    boolean old = dataTree.putx(updateRow);
+
+                    assert !old;
+
+                    incrementSize(cctx.cacheId());
+                }
+                else {
+                    dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
+
+                    boolean old = dataTree.putx(updateRow);
+
+                    assert !old;
+
+                    if (!updateRow.previousNotNull())
+                        incrementSize(cctx.cacheId());
+
+                    waitTxs = updateRow.activeTransactions();
+
+                    List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
+
+                    if (cleanupRows != null) {
+                        for (int i = 0; i < cleanupRows.size(); i++) {
+                            CacheSearchRow oldRow = cleanupRows.get(i);
+
+                            assert oldRow.link() != 0L : oldRow;
+
+                            boolean rmvd = dataTree.removex(oldRow);
+
+                            assert rmvd;
+
+                            rowStore.removeRow(oldRow.link());
+                        }
+                    }
+                }
+
+                return waitTxs;
             }
             finally {
                 busyLock.leaveBusy();
@@ -1407,135 +1456,60 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 key.valueBytes(coCtx);
                 val.valueBytes(coCtx);
 
-                if (true) {
-                    MvccUpdateRow updateRow = new MvccUpdateRow(
-                        key,
-                        val,
-                        ver,
-                        mvccVer,
-                        partId,
-                        cacheId);
-
-                    rowStore.addRow(updateRow);
-
-                    assert updateRow.link() != 0 : updateRow;
-
-                    if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
-                        updateRow.cacheId(cctx.cacheId());
-
-                    GridLongList waitTxs = null;
-
-                    if (mvccVer.initialLoad()) {
-                        boolean old = dataTree.putx(updateRow);
-
-                        assert !old;
-
-                        incrementSize(cctx.cacheId());
-                    }
-                    else {
-                        dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
-
-                        boolean old = dataTree.putx(updateRow);
-
-                        assert !old;
-
-                        if (!updateRow.previousNotNull())
-                            incrementSize(cctx.cacheId());
-
-                        waitTxs = updateRow.activeTransactions();
+                MvccUpdateRow updateRow = new MvccUpdateRow(
+                    key,
+                    val,
+                    ver,
+                    mvccVer,
+                    partId,
+                    cacheId);
 
-                        List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
+                rowStore.addRow(updateRow);
 
-                        if (cleanupRows != null) {
-                            for (int i = 0; i < cleanupRows.size(); i++) {
-                                CacheSearchRow oldRow = cleanupRows.get(i);
+                assert updateRow.link() != 0 : updateRow;
 
-                                assert oldRow.link() != 0L : oldRow;
+                if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+                    updateRow.cacheId(cctx.cacheId());
 
-                                boolean rmvd = dataTree.removex(oldRow);
+                GridLongList waitTxs = null;
 
-                                assert rmvd;
+                if (mvccVer.initialLoad()) {
+                    boolean old = dataTree.putx(updateRow);
 
-                                rowStore.removeRow(oldRow.link());
-                            }
-                        }
-                    }
+                    assert !old;
 
-                    return waitTxs;
+                    incrementSize(cctx.cacheId());
                 }
                 else {
-                    MvccDataRow dataRow = new MvccDataRow(
-                        key,
-                        val,
-                        ver,
-                        partId,
-                        cacheId,
-                        mvccVer.coordinatorVersion(),
-                        mvccVer.counter());
-
-                    rowStore.addRow(dataRow);
+                    dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
 
-                    assert dataRow.link() != 0 : dataRow;
-
-                    if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
-                        dataRow.cacheId(cctx.cacheId());
-
-                    boolean old = dataTree.putx(dataRow);
+                    boolean old = dataTree.putx(updateRow);
 
                     assert !old;
 
-                    GridLongList waitTxs = null;
-
-                    if (!mvccVer.initialLoad()) {
-                        MvccLongList activeTxs = mvccVer.activeTransactions();
-
-                        // TODO IGNITE-3484: need special method.
-                        GridCursor<CacheDataRow> cur = dataTree.find(
-                            new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
-                            new MvccSearchRow(cacheId, key, 1, 1));
-
-                        boolean first = true;
-
-                        boolean activeTx = false;
-
-                        while (cur.next()) {
-                            CacheDataRow oldVal = cur.get();
-
-                            assert oldVal.link() != 0 : oldVal;
-
-                            if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
-                                activeTxs.contains(oldVal.mvccCounter())) {
-                                if (waitTxs == null)
-                                    waitTxs = new GridLongList();
+                    if (!updateRow.previousNotNull())
+                        incrementSize(cctx.cacheId());
 
-                                assert oldVal.mvccCounter() != mvccVer.counter();
+                    waitTxs = updateRow.activeTransactions();
 
-                                waitTxs.add(oldVal.mvccCounter());
+                    List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
 
-                                activeTx = true;
-                            }
+                    if (cleanupRows != null) {
+                        for (int i = 0; i < cleanupRows.size(); i++) {
+                            CacheSearchRow oldRow = cleanupRows.get(i);
 
-                            if (!activeTx) {
-                                // Should not delete oldest version which is less than cleanup version.
-                                int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+                            assert oldRow.link() != 0L : oldRow;
 
-                                if (cmp <= 0) {
-                                    if (first)
-                                        first = false;
-                                    else {
-                                        boolean rmvd = dataTree.removex(oldVal);
+                            boolean rmvd = dataTree.removex(oldRow);
 
-                                        assert rmvd;
+                            assert rmvd;
 
-                                        rowStore.removeRow(oldVal.link());
-                                    }
-                                }
-                            }
+                            rowStore.removeRow(oldRow.link());
                         }
                     }
-
-                    return waitTxs;
                 }
+
+                return waitTxs;
             }
             finally {
                 busyLock.leaveBusy();
@@ -1746,26 +1720,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             CacheDataRow row;
 
             if (grp.mvccEnabled()) {
-                if (true) {
-                    MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
-
-                    dataTree.iterate(
-                        searchRow,
-                        new MvccKeyMinVersionBound(cacheId, key),
-                        searchRow // Use the same instance as closure to do not create extra object.
-                    );
+                MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
 
-                    row = searchRow.row();
-                }
-                else {
-                    GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
-                        new MvccSearchRow(cacheId, key, 1, 1));
+                dataTree.iterate(
+                    searchRow,
+                    new MvccKeyMinVersionBound(cacheId, key),
+                    searchRow // Use the same instance as closure to do not create extra object.
+                );
 
-                    if (cur.next())
-                        row = cur.get();
-                    else
-                        row = null;
-                }
+                row = searchRow.row();
             }
             else
                 row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
@@ -1818,55 +1781,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-            if (true) {
-                MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
-
-                dataTree.iterate(
-                    lower,
-                    new MvccKeyMinVersionBound(cacheId, key),
-                    lower // Use the same instance as closure to do not create extra object.
-                );
-
-                CacheDataRow row = lower.row();
-
-                afterRowFound(row, key);
-
-                return row;
-            }
-            else {
-                GridCursor<CacheDataRow> cur = dataTree.find(
-                    new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
-                    new MvccSearchRow(cacheId, key, 1, 1));
-
-                CacheDataRow row = null;
+            MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
 
-                MvccLongList txs = ver.activeTransactions();
+            dataTree.iterate(
+                lower,
+                new MvccKeyMinVersionBound(cacheId, key),
+                lower // Use the same instance as closure to do not create extra object.
+            );
 
-                while (cur.next()) {
-                    CacheDataRow row0 = cur.get();
+            CacheDataRow row = lower.row();
 
-                    assert row0.mvccCoordinatorVersion() > 0 : row0;
-
-                    boolean visible;
-
-                    if (txs != null) {
-                        visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
-                            || !txs.contains(row0.mvccCounter());
-                    }
-                    else
-                        visible = true;
-
-                    if (visible) {
-                        row = row0;
-
-                        break;
-                    }
-                }
-
-                assert row == null || key.equals(row.key());
+            afterRowFound(row, key);
 
-                return row;
-            }
+            return row;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 830d50b..88095ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -560,7 +560,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             MvccCoordinator mvccCrd = firstEvtDiscoCache.mvccCoordinator();
 
             boolean mvccCrdChange = mvccCrd != null &&
-                initialVersion().equals(mvccCrd.topologyVersion());
+                (initialVersion().equals(mvccCrd.topologyVersion()) || activateCluster());
 
             cctx.kernalContext().coordinators().currentCoordinator(mvccCrd);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index b9b8ea1..54fb3c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -66,6 +66,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 
 /**
@@ -86,7 +87,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
     /** */
     private static final byte MSG_POLICY = SYSTEM_POOL;
-    
+
+    /** */
+    private static final long CRD_VER_MASK = 0x3F_FF_FF_FF_FF_FF_FF_FFL;
+
+    /** */
+    private static final long RMVD_VAL_VER_MASK = 0x80_00_00_00_00_00_00_00L;
+
     /** */
     private volatile MvccCoordinator curCrd;
 
@@ -139,6 +146,21 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         super(ctx);
     }
 
+    public static int compareCoordinatorVersions(long crdVer1, long crdVer2) {
+        crdVer1 = CRD_VER_MASK & crdVer1;
+        crdVer2 = CRD_VER_MASK & crdVer2;
+
+        return Long.compare(crdVer1, crdVer2);
+    }
+
+    public long createVersionForRemovedValue(long crdVer) {
+        return crdVer | RMVD_VAL_VER_MASK;
+    }
+
+    public boolean versionForRemovedValue(long crdVer) {
+        return (crdVer & RMVD_VAL_VER_MASK) != 0;
+    }
+
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         statCntrs = new StatCounter[7];
@@ -199,7 +221,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param topVer Topology version.
      */
     public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) {
-        if (evtType == EVT_NODE_METRICS_UPDATED)
+        if (evtType == EVT_NODE_METRICS_UPDATED || evtType == EVT_DISCOVERY_CUSTOM_EVT)
             return;
 
         MvccCoordinator crd;
@@ -778,7 +800,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) {
         assert crdVer != 0;
 
-        return activeQueries.assignQueryCounter(qryNodeId, futId);
+        MvccCoordinatorVersionResponse res = activeQueries.assignQueryCounter(qryNodeId, futId);
+
+        return res;
 
 //        MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
 //
@@ -989,7 +1013,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
             ", topVer=" + topVer + ']');
 
-        crdVer = topVer.topologyVersion();
+        crdVer = topVer.topologyVersion() + ctx.discovery().gridStartTime();
 
         prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1b2c03d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
new file mode 100644
index 0000000..ed7b62d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class CacheMvccClusterRestartTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setConsistentId(gridName);
+
+        cfg.setMvccEnabled(true);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+        memCfg.setPageSize(1024);
+        memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024);
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        GridTestUtils.deleteDbFiles();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        GridTestUtils.deleteDbFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        GridTestUtils.deleteDbFiles();
+
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestart1() throws Exception {
+       restart1(3, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestart2() throws Exception {
+        restart1(1, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestart3() throws Exception {
+        restart1(3, 1);
+    }
+
+    /**
+     * @param srvBefore Number of servers before restart.
+     * @param srvAfter Number of servers after restart.
+     * @throws Exception If failed.
+     */
+    private void restart1(int srvBefore, int srvAfter) throws Exception {
+        Ignite srv0 = startGridsMultiThreaded(srvBefore);
+
+        srv0.active(true);
+
+        IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration());
+
+        Set<Integer> keys = new HashSet<>(primaryKeys(cache, 1, 0));
+
+        try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (Integer k : keys)
+                cache.put(k, k);
+
+            tx.commit();
+        }
+
+        stopAllGrids();
+
+        srv0 = startGridsMultiThreaded(srvAfter);
+
+        srv0.active(true);
+
+        cache = srv0.cache(DEFAULT_CACHE_NAME);
+
+        Map<Object, Object> res = cache.getAll(keys);
+
+        assertEquals(keys.size(), res.size());
+
+        for (Integer k : keys)
+            assertEquals(k, cache.get(k));
+
+        try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (Integer k : keys)
+                cache.put(k, k + 1);
+
+            tx.commit();
+        }
+
+        for (Integer k : keys)
+            assertEquals(k + 1, cache.get(k));
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration() {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(2);
+
+        return ccfg;
+    }
+}


Mime
View raw message