ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-3478
Date Mon, 25 Sep 2017 09:57:04 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 af0c3bc21 -> 7a4baba58


ignite-3478


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7a4baba5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7a4baba5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7a4baba5

Branch: refs/heads/ignite-3478
Commit: 7a4baba58609a99ee2da22dcf0ffca937581a4ce
Parents: af0c3bc
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Sep 25 12:56:55 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Sep 25 12:56:55 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |   3 +-
 .../mvcc/CacheCoordinatorsSharedManager.java    | 162 ++++++++++++-------
 .../mvcc/MvccCoordinatorVersionResponse.java    |   4 +
 .../cache/mvcc/CacheMvccTransactionsTest.java   |   4 +-
 4 files changed, 116 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7a4baba5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 6ef78db..dc24586 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1121,7 +1121,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
                 ", memoryPolicyName=" + memPlcName +
                 ", mode=" + cfg.getCacheMode() +
-                ", atomicity=" + cfg.getAtomicityMode() + ']');
+                ", atomicity=" + cfg.getAtomicityMode() +
+                ", mvcc=" + cacheCtx.mvccEnabled() + ']');
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a4baba5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index 641e6d4..c46a624 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -17,13 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -39,7 +38,6 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridAtomicLong;
@@ -86,7 +84,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>();
 
     /** */
-    private final Map<Long, Integer> activeQueries = new HashMap<>();
+    private final ConcurrentMap<Long, AtomicInteger> activeQueries = new ConcurrentHashMap<>();
 
     /** */
     private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>();
@@ -95,6 +93,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>();
 
     /** */
+    private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new ConcurrentHashMap<>();
+
+    /** */
     private final AtomicLong futIdCntr = new AtomicLong();
 
     /** */
@@ -475,7 +476,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param txId Transaction ID.
      * @return Counter.
      */
-    private synchronized MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion
txId, long futId) {
+    private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId)
{
         assert crdVer != 0;
 
         long nextCtr = mvccCntr.incrementAndGet();
@@ -508,76 +509,112 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private void onTxDone(Long txCntr) {
         GridFutureAdapter fut; // TODO IGNITE-3478.
 
-        synchronized (this) {
-            GridCacheVersion ver = activeTxs.remove(txCntr);
+        GridCacheVersion ver = activeTxs.remove(txCntr);
 
-            assert ver != null;
+        assert ver != null;
 
-            committedCntr.setIfGreater(txCntr);
+        committedCntr.setIfGreater(txCntr);
 
-            fut = waitTxFuts.remove(txCntr);
-        }
+        fut = waitTxFuts.remove(txCntr);
 
         if (fut != null)
             fut.onDone();
     }
 
+    static boolean increment(AtomicInteger cntr) {
+        for (;;) {
+            int current = cntr.get();
+
+            if (current == 0)
+                return false;
+
+            if (cntr.compareAndSet(current, current + 1))
+                return true;
+        }
+    }
+
     /**
      * @param qryNodeId Node initiated query.
      * @return Counter for query.
      */
-    private synchronized MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId,
long futId) {
+    private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId)
{
         assert crdVer != 0;
 
-        Long mvccCntr = committedCntr.get();
-
         MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
 
-        Long trackCntr = mvccCntr;
+        Long mvccCntr;
 
-        for (Long txVer : activeTxs.keySet()) {
-            if (txVer < trackCntr)
-                trackCntr = txVer;
+        for(;;) {
+            mvccCntr = committedCntr.get();
 
-            res.addTx(txVer);
-        }
+            Long trackCntr = mvccCntr;
 
-        Integer queries = activeQueries.get(trackCntr);
+            for (Long txVer : activeTxs.keySet()) {
+                if (txVer < trackCntr)
+                    trackCntr = txVer;
 
-        if (queries != null)
-            activeQueries.put(trackCntr, queries + 1);
-        else
-            activeQueries.put(trackCntr, 1);
+                res.addTx(txVer);
+            }
+
+            registerActiveQuery(trackCntr);
+
+            if (committedCntr.get() == mvccCntr)
+                break;
+            else {
+                res.resetTransactionsCount();
+
+                onQueryDone(trackCntr);
+            }
+        }
 
         res.init(futId, crdVer, mvccCntr, COUNTER_NA);
 
         return res;
     }
 
+    private void registerActiveQuery(Long cntr) {
+        for (;;) {
+            AtomicInteger qryCnt = activeQueries.get(cntr);
+
+            if (qryCnt != null) {
+                boolean inc = increment(qryCnt);
+
+                if (!inc) {
+                    activeQueries.remove(mvccCntr, qryCnt);
+
+                    continue;
+                }
+            }
+            else {
+                qryCnt = new AtomicInteger(1);
+
+                if (activeQueries.putIfAbsent(cntr, qryCnt) != null)
+                    continue;
+            }
+
+            break;
+        }
+    }
+
     /**
      * @param mvccCntr Query counter.
      */
-    private synchronized void onQueryDone(long mvccCntr) {
-        Integer queries = activeQueries.get(mvccCntr);
+    private void onQueryDone(long mvccCntr) {
+        AtomicInteger cntr = activeQueries.get(mvccCntr);
 
-        assert queries != null : mvccCntr;
+        assert cntr != null : mvccCntr;
 
-        int left = queries - 1;
+        int left = cntr.decrementAndGet();
 
         assert left >= 0 : left;
 
         if (left == 0) {
-            Integer rmvd = activeQueries.remove(mvccCntr);
+            boolean rmv = activeQueries.remove(mvccCntr, cntr);
 
-            assert rmvd != null;
+            assert rmv;
         }
-        else
-            activeQueries.put(mvccCntr, left);
     }
 
-    /** */
-    private Map<Long, GridFutureAdapter> waitTxFuts = new HashMap<>(); // TODO
IGNITE-3478.
-
     /**
      * @param msg Message.
      */
@@ -586,37 +623,38 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
         GridLongList txs = msg.transactions();
 
-        // TODO IGNITE-3478.
-        GridCompoundFuture fut = null;
+        GridCompoundFuture resFut = null;
 
-        synchronized (this) {
-            for (int i = 0; i < txs.size(); i++) {
-                long txId = txs.get(i);
+        for (int i = 0; i < txs.size(); i++) {
+            Long txId = txs.get(i);
 
-                if (activeTxs.containsKey(txId)) {
-                    GridFutureAdapter fut0 = waitTxFuts.get(txId);
+            WaitTxFuture fut = waitTxFuts.get(txId);
 
-                    if (fut0 == null) {
-                        fut0 = new GridFutureAdapter();
+            if (fut == null) {
+                WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new WaitTxFuture(txId));
 
-                        waitTxFuts.put(txId, fut0);
-                    }
+                if (old != null)
+                    fut = old;
+            }
 
-                    if (fut == null)
-                        fut = new GridCompoundFuture();
+            if (!activeTxs.containsKey(txId))
+                fut.onDone();
 
-                    fut.add(fut0);
-                }
+            if (!fut.isDone()) {
+                if (resFut == null)
+                    resFut = new GridCompoundFuture();
+
+                resFut.add(fut);
             }
         }
 
-        if (fut != null)
-            fut.markInitialized();
+        if (resFut != null)
+            resFut.markInitialized();
 
-        if (fut == null || fut.isDone())
+        if (resFut == null || resFut.isDone())
             sendFutureResponse(nodeId, msg);
         else {
-            fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+            resFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
                 @Override public void apply(IgniteInternalFuture fut) {
                     sendFutureResponse(nodeId, msg);
                 }
@@ -944,4 +982,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         }
     }
 
+    /**
+     *
+     */
+    private static class WaitTxFuture extends GridFutureAdapter {
+        /** */
+        private final long txId;
+
+        /**
+         * @param txId Transaction ID.
+         */
+        WaitTxFuture(long txId) {
+            this.txId = txId;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a4baba5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
index 9d61a6d..04ef8d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -78,6 +78,10 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage,
M
         txs[txsCnt++] = txId;
     }
 
+    void resetTransactionsCount() {
+        txsCnt = 0;
+    }
+
     /** {@inheritDoc} */
     @Override public int size() {
         return txsCnt;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a4baba5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 1c37171..6b01aef 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1671,9 +1671,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * TODO IGNITE-3478.
+     *
      * @throws Exception If failed.
      */
-    public void testReadInProgressCoordinatorFails() throws Exception {
+    public void _testReadInProgressCoordinatorFails() throws Exception {
         testSpi = true;
 
         startGrids(4);


Mime
View raw message