ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5578
Date Fri, 28 Jul 2017 13:58:33 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 8c80ef7b8 -> 8d532dada


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8d532dad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8d532dad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8d532dad

Branch: refs/heads/ignite-5578
Commit: 8d532dadad2f34ae2756df2dc7f2358716c552a3
Parents: 8c80ef7
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jul 28 16:53:01 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jul 28 16:58:20 2017 +0300

----------------------------------------------------------------------
 .../dht/GridDhtTransactionalCacheAdapter.java   | 81 +++++++++++++++++++-
 .../GridDhtPartitionsExchangeFuture.java        |  6 +-
 .../distributed/CacheExchangeMergeTest.java     |  4 +-
 .../testsuites/IgniteCacheTestSuite6.java       |  3 +
 4 files changed, 87 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8d532dad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index d39afb1..79bd2f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 
@@ -645,7 +646,29 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             return;
         }
 
-        IgniteInternalFuture<?> f = lockAllAsync(ctx, nearNode, req, null);
+        processNearLockRequest0(nearNode, req);
+    }
+
+    /**
+     * @param nearNode
+     * @param req
+     */
+    private void processNearLockRequest0(ClusterNode nearNode, GridNearLockRequest req) {
+        IgniteInternalFuture<?> f;
+
+        if (req.firstClientRequest()) {
+            for (;;) {
+                if (waitForExchangeFuture(nearNode, req))
+                    return;
+
+                f = lockAllAsync(ctx, nearNode, req, null);
+
+                if (f != null)
+                    break;
+            }
+        }
+        else
+            f = lockAllAsync(ctx, nearNode, req, null);
 
         // Register listener just so we print out errors.
         // Exclude lock timeout exception since it's not a fatal exception.
@@ -653,6 +676,48 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             GridDistributedLockCancelledException.class));
     }
 
+    private boolean waitForExchangeFuture(final ClusterNode node, final GridNearLockRequest req) {
+        assert req.firstClientRequest() : req;
+
+        GridDhtTopologyFuture topFut = ctx.shared().exchange().lastTopologyFuture();
+
+        if (!topFut.isDone()) {
+            Thread curThread = Thread.currentThread();
+
+            if (curThread instanceof IgniteThread) {
+                final IgniteThread thread = (IgniteThread)curThread;
+
+                if (thread.hasStripeOrPolicy()) {
+                    topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                            ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, new Runnable() {
+                                @Override public void run() {
+                                    try {
+                                        processNearLockRequest0(node, req);
+                                    }
+                                    finally {
+                                        ctx.io().onMessageProcessed(req);
+                                    }
+                                }
+                            });
+                        }
+                    });
+
+                    return true;
+                }
+            }
+
+            try {
+                topFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Topology future failed: " + e, e);
+            }
+        }
+
+        return false;
+    }
+
     /**
      * @param nodeId Node ID.
      * @param res Response.
@@ -847,6 +912,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                     top = topology();
 
                     top.readLock();
+
+                    if (!top.topologyVersionFuture().isDone()) {
+                        top.readUnlock();
+
+                        return null;
+                    }
                 }
 
                 try {
@@ -945,7 +1016,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
                         top = topology();
 
-                        topology().readLock();
+                        top.readLock();
+
+                        if (!top.topologyVersionFuture().isDone()) {
+                            top.readUnlock();
+
+                            return null;
+                        }
                     }
 
                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d532dad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d5b7846..420b9c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1586,10 +1586,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         boolean wait;
 
         synchronized (this) {
-            assert !isDone();
-            assert !initFut.isDone();
-            assert mergedWith == null;
-            assert state == null;
+            assert !isDone() && !initFut.isDone() : this;
+            assert mergedWith == null && state == null : this;
 
             state = ExchangeLocalState.MERGED;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d532dad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 6d6da28..bea34fd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -392,6 +392,8 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     private void mergeJoinExchangesCoordinatorChange1(final int srvs, CoordinatorChangeMode mode)
         throws Exception
     {
+        log.info("mergeJoinExchangesCoordinatorChange1 [srvs=" + srvs + ", mode=" + mode + ']');
+
         testSpi = true;
 
         Ignite srv0 = startGrids(srvs);
@@ -402,7 +404,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         IgniteInternalFuture<?> fut = startGrids(srv0, srvs, 2);
 
-        if (latch != null && !latch.await(5, TimeUnit.SECONDS))
+        if (latch != null && !latch.await(15, TimeUnit.SECONDS))
             fail("Failed to wait for expected messages.");
 
         stopGrid(getTestIgniteInstanceName(0), true, false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8d532dad/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index bb32d24..a0b4c8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
 import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
 
 /**
@@ -33,6 +34,8 @@ public class IgniteCacheTestSuite6 extends TestSuite {
 
         suite.addTestSuite(CachePartitionStateTest.class);
 
+        suite.addTestSuite(CacheExchangeMergeTest.class);
+
         return suite;
     }
 }


Mime
View raw message