ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-647 Fixed issues with dynamic cache start when fair affinity is used
Date Wed, 23 Dec 2015 14:49:02 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 0adfd928b -> 185c28ae6


ignite-647 Fixed issues with dynamic cache start when fair affinity is used


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/185c28ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/185c28ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/185c28ae

Branch: refs/heads/ignite-1537
Commit: 185c28ae66c24c5c4d446f37c416a0091de61f8d
Parents: 0adfd92
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Dec 23 17:48:52 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Dec 23 17:48:52 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 89 ++++++++----------
 ...ridNearOptimisticTxPrepareFutureAdapter.java | 12 ++-
 ...yMetadataUpdateChangingTopologySelfTest.java | 96 +++++++++++++-------
 3 files changed, 110 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/185c28ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 634a9ea..393413e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1290,59 +1290,48 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         GridCacheReturn retVal = null;
 
-                        IgniteTxManager tm = ctx.tm();
+                        if (keys.size() > 1 &&                             // Several keys ...
+                            writeThrough() && !req.skipStore() &&          // and store is enabled ...
+                            !ctx.store().isLocal() &&                      // and this is not local store ...
+                            !ctx.dr().receiveEnabled()                     // and no DR.
+                            ) {
+                            // This method can only be used when there are no replicated entries in the batch.
+                            UpdateBatchResult updRes = updateWithBatch(node,
+                                hasNear,
+                                req,
+                                res,
+                                locked,
+                                ver,
+                                dhtFut,
+                                completionCb,
+                                ctx.isDrEnabled(),
+                                taskName,
+                                expiry,
+                                sndPrevVal);
 
-                        // Needed for metadata cache transaction.
-                        boolean set = tm.setTxTopologyHint(req.topologyVersion());
+                            deleted = updRes.deleted();
+                            dhtFut = updRes.dhtFuture();
 
-                        try {
-                            if (keys.size() > 1 &&                             // Several keys ...
-                                writeThrough() && !req.skipStore() &&          // and store is enabled ...
-                                !ctx.store().isLocal() &&                      // and this is not local store ...
-                                !ctx.dr().receiveEnabled()                     // and no DR.
-                                ) {
-                                // This method can only be used when there are no replicated entries in the batch.
-                                UpdateBatchResult updRes = updateWithBatch(node,
-                                    hasNear,
-                                    req,
-                                    res,
-                                    locked,
-                                    ver,
-                                    dhtFut,
-                                    completionCb,
-                                    ctx.isDrEnabled(),
-                                    taskName,
-                                    expiry,
-                                    sndPrevVal);
-
-                                deleted = updRes.deleted();
-                                dhtFut = updRes.dhtFuture();
-
-                                if (req.operation() == TRANSFORM)
-                                    retVal = updRes.invokeResults();
-                            }
-                            else {
-                                UpdateSingleResult updRes = updateSingle(node,
-                                    hasNear,
-                                    req,
-                                    res,
-                                    locked,
-                                    ver,
-                                    dhtFut,
-                                    completionCb,
-                                    ctx.isDrEnabled(),
-                                    taskName,
-                                    expiry,
-                                    sndPrevVal);
-
-                                retVal = updRes.returnValue();
-                                deleted = updRes.deleted();
-                                dhtFut = updRes.dhtFuture();
-                            }
+                            if (req.operation() == TRANSFORM)
+                                retVal = updRes.invokeResults();
                         }
-                        finally {
-                            if (set)
-                                tm.setTxTopologyHint(null);
+                        else {
+                            UpdateSingleResult updRes = updateSingle(node,
+                                hasNear,
+                                req,
+                                res,
+                                locked,
+                                ver,
+                                dhtFut,
+                                completionCb,
+                                ctx.isDrEnabled(),
+                                taskName,
+                                expiry,
+                                sndPrevVal);
+
+                            retVal = updRes.returnValue();
+                            deleted = updRes.deleted();
+                            dhtFut = updRes.dhtFuture();
                         }
 
                         if (retVal == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/185c28ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index fa7020b..553f8cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -52,11 +52,15 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
         // Obtain the topology version to use.
         long threadId = Thread.currentThread().getId();
 
-        AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
+        AffinityTopologyVersion topVer;
 
-        // If there is another system transaction in progress, use it's topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system())
-            topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
+        if (tx != null && tx.system()) {
+            topVer = cctx.exchange().readyAffinityVersion();
+
+            assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+        }
+        else
+            topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         if (topVer != null) {
             tx.topologyVersion(topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/185c28ae/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
index c95c586..ddfe7fb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
@@ -25,10 +25,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -105,7 +107,7 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
 
         IgniteCache<Object, Object> cache = ignite(0).cache("cache").withAsync();
 
-        cache.putAll(F.asMap(key1, "val1", key2, new TestValue()));
+        cache.putAll(F.asMap(key1, "val1", key2, new TestValue1()));
 
         try {
             Thread.sleep(500);
@@ -118,8 +120,47 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
                 }
             });
 
+            Thread.sleep(1000);
+
+            spi.stopBlock();
+
+            cache.future().get();
+
+            fut.get();
+        }
+        finally {
+            stopGrid(4);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoDeadlockInvoke() throws Exception {
+        int key1 = primaryKey(ignite(1).cache("cache"));
+        int key2 = primaryKey(ignite(2).cache("cache"));
+
+        TestCommunicationSpi spi = (TestCommunicationSpi)ignite(1).configuration().getCommunicationSpi();
+
+        spi.blockMessages(GridNearTxPrepareResponse.class, ignite(0).cluster().localNode().id());
+
+        IgniteCache<Object, Object> cache = ignite(0).cache("cache").withAsync();
+
+        cache.invokeAll(F.asSet(key1, key2), new TestEntryProcessor());
+
+        try {
             Thread.sleep(500);
 
+            IgniteInternalFuture<Void> fut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    startGrid(4);
+
+                    return null;
+                }
+            });
+
+            Thread.sleep(1000);
+
             spi.stopBlock();
 
             cache.future().get();
@@ -145,12 +186,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
         /** */
         private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
 
-        /** */
-        private Class<?> recordCls;
-
-        /** */
-        private List<Object> recordedMsgs = new ArrayList<>();
-
         /** {@inheritDoc} */
         @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
@@ -158,9 +193,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
                 Object msg0 = ((GridIoMessage)msg).message();
 
                 synchronized (this) {
-                    if (recordCls != null && msg0.getClass().equals(recordCls))
-                        recordedMsgs.add(msg0);
-
                     Set<UUID> blockNodes = blockCls.get(msg0.getClass());
 
                     if (F.contains(blockNodes, node.id())) {
@@ -178,28 +210,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
         }
 
         /**
-         * @param recordCls Message class to record.
-         */
-        void record(@Nullable Class<?> recordCls) {
-            synchronized (this) {
-                this.recordCls = recordCls;
-            }
-        }
-
-        /**
-         * @return Recorded messages.
-         */
-        List<Object> recordedMessages() {
-            synchronized (this) {
-                List<Object> msgs = recordedMsgs;
-
-                recordedMsgs = new ArrayList<>();
-
-                return msgs;
-            }
-        }
-
-        /**
          * @param cls Message class.
          * @param nodeId Node ID.
          */
@@ -241,7 +251,27 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
     /**
      *
      */
-    private static class TestValue {
+    static class TestEntryProcessor implements CacheEntryProcessor<Object, Object, Object> {
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> e, Object... arguments) {
+            e.setValue(new TestValue2());
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestValue1 {
+        /** Field1. */
+        private String field1;
+    }
+
+    /**
+     *
+     */
+    private static class TestValue2 {
         /** Field1. */
         private String field1;
     }


Mime
View raw message