ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-6149
Date Wed, 20 Sep 2017 15:22:02 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 d26266456 -> 880ea9821


ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/880ea982
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/880ea982
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/880ea982

Branch: refs/heads/ignite-3478
Commit: 880ea98217a4c9fa6058ca954c216e0f58f85f61
Parents: d262664
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Sep 20 18:21:54 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Sep 20 18:21:54 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    |   2 +-
 .../internal/TestRecordingCommunicationSpi.java |  14 +++
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 110 ++++++++++++++++++-
 3 files changed, 122 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/880ea982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index ea74f3c..d7be3eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1400,7 +1400,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                         activeTx = true;
                     }
 
-                    // Should not delete oldest version which is less than cleanup version
.
+                    // Should not delete oldest version which is less than cleanup version.
                     int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
 
                     if (cmp <= 0) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/880ea982/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index ab61687..859010e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -60,6 +61,9 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     /** */
     private IgniteBiPredicate<ClusterNode, Message> blockP;
 
+    /** */
+    private volatile IgniteBiInClosure<ClusterNode, Message> c;
+
     /**
      * @param node Node.
      * @return Test SPI.
@@ -76,6 +80,9 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
 
             Message msg0 = ioMsg.message();
 
+            if (c != null)
+                c.apply(node, msg0);
+
             synchronized (this) {
                 boolean record = (recordClasses != null && recordClasses.contains(msg0.getClass())) ||
                     (recordP != null && recordP.apply(node, msg0));
@@ -212,6 +219,13 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     }
 
     /**
+     * @param c Message closure.
+     */
+    public void closure(IgniteBiInClosure<ClusterNode, Message> c) {
+        this.c = c;
+    }
+
+    /**
      * @param blockP Message block predicate.
      */
     public void blockMessages(IgniteBiPredicate<ClusterNode, Message> blockP) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/880ea982/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index cf3bafb..0265519 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -55,6 +56,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -597,12 +599,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testCleanupWaitsForGet() throws Exception {
+    public void testCleanupWaitsForGet1() throws Exception {
         boolean vals[] = {true, false};
 
         for (boolean otherPuts : vals) {
             for (boolean putOnStart : vals) {
-                cleanupWaitsForGet(otherPuts, putOnStart);
+                cleanupWaitsForGet1(otherPuts, putOnStart);
 
                 afterTest();
             }
@@ -614,7 +616,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @param putOnStart {@code True} to put data in cache before getAll.
      * @throws Exception If failed.
      */
-    private void cleanupWaitsForGet(boolean otherPuts, final boolean putOnStart) throws Exception {
+    private void cleanupWaitsForGet1(boolean otherPuts, final boolean putOnStart) throws Exception {
         info("cleanupWaitsForGet [otherPuts=" + otherPuts + ", putOnStart=" + putOnStart + "]");
 
         testSpi = true;
@@ -700,6 +702,108 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         assertEquals(5, (Object)vals.get(key2));
     }
 
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCleanupWaitsForGet2() throws Exception {
+        testSpi = true;
+
+        client = false;
+
+        startGrids(2);
+
+        client = true;
+
+        final Ignite client = startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        final IgniteCache<Object, Object> cache = client.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 16).
+            setNodeFilter(new TestCacheNodeExcludingFilter(ignite(0).name())));
+
+        final Integer key1 = 1;
+        final Integer key2 = 2;
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            cache.put(key1, 0);
+            cache.put(key2, 0);
+
+            tx.commit();
+        }
+
+        TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(grid(0));
+
+        TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(client);
+
+        final CountDownLatch getLatch = new CountDownLatch(1);
+
+        clientSpi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
+            @Override public void apply(ClusterNode node, Message msg) {
+                if (msg instanceof CoordinatorTxAckRequest)
+                    doSleep(2000);
+            }
+        });
+
+        crdSpi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
+            /** */
+            private AtomicInteger cntr = new AtomicInteger();
+
+            @Override public void apply(ClusterNode node, Message msg) {
+                if (msg instanceof MvccCoordinatorVersionResponse) {
+                    if (cntr.incrementAndGet() == 2) {
+                        getLatch.countDown();
+
+                        doSleep(1000);
+                    }
+                }
+            }
+        });
+
+        final IgniteInternalFuture<?> putFut1 = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key1, 1);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        }, "put1");
+
+        final IgniteInternalFuture<?> putFut2 = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key1, 2);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        }, "put2");
+
+        IgniteInternalFuture<?> getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                U.await(getLatch);
+
+                while (!putFut1.isDone() || !putFut2.isDone()) {
+                    Map<Object, Object> vals = cache.getAll(F.asSet(key1, key2));
+
+                    assertEquals(2, vals.size());
+                }
+
+                return null;
+            }
+        }, 4, "get-thread");
+
+        putFut1.get();
+        putFut2.get();
+        getFut.get();
+    }
+
     /**
      * @throws Exception If failed.
      */


Mime
View raw message