ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4285
Date Tue, 29 Nov 2016 14:39:46 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4285 36e98d6a2 -> e7df145ce


ignite-4285


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7df145c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7df145c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7df145c

Branch: refs/heads/ignite-4285
Commit: e7df145ce7df62e229653c0cb50986004b4d1ebd
Parents: 36e98d6
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Nov 29 11:08:14 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Nov 29 13:27:20 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMvcc.java         |  70 +++--
 .../distributed/GridDistributedCacheEntry.java  | 100 +++----
 .../distributed/dht/GridDhtCacheEntry.java      |   9 +-
 .../distributed/dht/GridDhtLockFuture.java      |   1 -
 .../cache/local/GridLocalCacheEntry.java        |  69 ++---
 .../cache/transactions/IgniteTxManager.java     |   2 +-
 .../CacheSerializableTransactionsTest.java      | 271 +++++++++++++++++--
 7 files changed, 368 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e7df145c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index 87c3be2..be7fd75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -151,7 +151,7 @@ public final class GridCacheMvcc {
         if (locs != null) {
             assert !locs.isEmpty();
 
-            CacheLockCandidates locks = null;
+            CacheLockCandidates owners = null;
 
             GridCacheMvccCandidate first = locs.getFirst();
 
@@ -160,19 +160,19 @@ public final class GridCacheMvcc {
                     if (cand.owner()) {
                         assert cand.serializable() && cand.read() : cand;
 
-                        if (locks != null) {
-                            if (locks.size() == 1) {
-                                GridCacheMvccCandidate owner = locks.candidate(0);
+                        if (owners != null) {
+                            if (owners.size() == 1) {
+                                GridCacheMvccCandidate owner = owners.candidate(0);
 
-                                locks = new CacheLockCandidatesList();
+                                owners = new CacheLockCandidatesList();
 
-                                ((CacheLockCandidatesList)locks).add(owner);
+                                ((CacheLockCandidatesList)owners).add(owner);
                             }
 
-                            ((CacheLockCandidatesList)locks).add(cand);
+                            ((CacheLockCandidatesList)owners).add(cand);
                         }
                         else
-                            locks = cand;
+                            owners = cand;
                     }
 
                     if (!cand.serializable() || !cand.read())
@@ -180,9 +180,9 @@ public final class GridCacheMvcc {
                 }
             }
             else if (first.owner())
-                locks = first;
+                owners = first;
 
-            return locks;
+            return owners;
         }
 
         return null;
@@ -192,7 +192,7 @@ public final class GridCacheMvcc {
      * @return Local candidate only if it's first in the list and is marked
      *      as <tt>'owner'</tt>.
      */
-    @Nullable public GridCacheMvccCandidate localOwner() {
+    @Nullable GridCacheMvccCandidate localOwner() {
         if (locs != null) {
             assert !locs.isEmpty();
 
@@ -372,12 +372,12 @@ public final class GridCacheMvcc {
         }
         // Remote.
         else {
-            assert !cand.serializable() : cand;
+            assert !cand.serializable() && !cand.read() : cand;
 
             if (rmts == null)
                 rmts = new LinkedList<>();
 
-            assert !cand.owner() || localOwner() == null : "Cannot have local and remote
owners " +
+            assert !cand.owner() || localOwners() == null : "Cannot have local and remote
owners " +
                 "at the same time [cand=" + cand + ", locs=" + locs + ", rmts=" + rmts +
']';
 
             GridCacheMvccCandidate cur = candidate(rmts, cand.version());
@@ -486,7 +486,6 @@ public final class GridCacheMvcc {
      * @param baseVer Base version.
      * @param committedVers Committed versions relative to base.
      * @param rolledbackVers Rolled back versions relative to base.
-     * @return Lock owner.
      */
     public void orderCompleted(GridCacheVersion baseVer,
         Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion>
rolledbackVers) {
@@ -503,10 +502,13 @@ public final class GridCacheMvcc {
                 if (!cur.version().equals(baseVer) && committedVers.contains(cur.version()))
{
                     cur.setOwner();
 
-                    assert localOwner() == null || localOwner().nearLocal(): "Cannot not
have local owner and " +
+                    assert localOwners() == null || localOwner().nearLocal(): "Cannot not
have local owner and " +
                         "remote completed transactions at the same time [baseVer=" + baseVer
+
-                        ", committedVers=" + committedVers + ", rolledbackVers=" + rolledbackVers
+
-                        ", localOwner=" + localOwner() + ", locs=" + locs + ", rmts=" + rmts
+ ']';
+                        ", committedVers=" + committedVers +
+                        ", rolledbackVers=" + rolledbackVers +
+                        ", localOwner=" + localOwner() +
+                        ", locs=" + locs +
+                        ", rmts=" + rmts + ']';
 
                     if (maxIdx < 0)
                         maxIdx = it.nextIndex();
@@ -1149,6 +1151,8 @@ public final class GridCacheMvcc {
                     }
 
                     if (assigned) {
+                        assert !cand.serializable() && !cand.read() : cand;
+
                         it.remove();
 
                         // Owner must be first in the list.
@@ -1416,10 +1420,19 @@ public final class GridCacheMvcc {
      * @return {@code True} if lock is owned by the thread with given ID.
      */
     boolean isLocallyOwnedByThread(long threadId, boolean allowDhtLoc, GridCacheVersion...
exclude) {
-        GridCacheMvccCandidate owner = localOwner();
+        CacheLockCandidates owners = localOwners();
+
+        if (owners != null) {
+            for (int i = 0; i < owners.size(); i++) {
+                GridCacheMvccCandidate owner = owners.candidate(i);
 
-        return owner != null && owner.threadId() == threadId && owner.nodeId().equals(cctx.nodeId())
&&
-            (allowDhtLoc || !owner.dhtLocal()) && !U.containsObjectArray(exclude,
owner.version());
+                if (owner.threadId() == threadId && owner.nodeId().equals(cctx.nodeId())
&&
+                    (allowDhtLoc || !owner.dhtLocal()) && !U.containsObjectArray(exclude,
owner.version()))
+                    return true;
+            }
+        }
+
+        return false;
     }
 
     /**
@@ -1428,9 +1441,9 @@ public final class GridCacheMvcc {
      * @return {@code True} if candidate is owner.
      */
     boolean isLocallyOwned(GridCacheVersion lockVer) {
-        GridCacheMvccCandidate owner = localOwner();
+        CacheLockCandidates owners = localOwners();
 
-        return owner != null && owner.version().equals(lockVer);
+        return owners != null && owners.hasCandidate(lockVer);
     }
 
     /**
@@ -1439,9 +1452,18 @@ public final class GridCacheMvcc {
      * @return {@code True} if locked by lock ID or thread ID.
      */
     boolean isLocallyOwnedByIdOrThread(GridCacheVersion lockVer, long threadId) {
-        GridCacheMvccCandidate owner = localOwner();
+        CacheLockCandidates owners = localOwners();
+
+        if (owners != null) {
+            for (int i = 0; i < owners.size(); i++) {
+                GridCacheMvccCandidate owner = owners.candidate(i);
 
-        return owner != null && (owner.version().equals(lockVer) || owner.threadId()
== threadId);
+                if ((owner.version().equals(lockVer) || owner.threadId() == threadId))
+                    return true;
+            }
+        }
+
+        return false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7df145c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index 3a94e8b..91a3e75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -689,51 +689,57 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
     }
 
     /**
-     * @param prev Previous owners.
-     * @param owner Corrent owners.
+     * @param prevOwners Previous owners.
+     * @param owners Current owners.
      * @param val Entry value.
      */
-    protected final void checkOwnerChanged(@Nullable CacheLockCandidates prev,
-        @Nullable CacheLockCandidates owner,
+    protected final void checkOwnerChanged(@Nullable CacheLockCandidates prevOwners,
+        @Nullable CacheLockCandidates owners,
         CacheObject val) {
         assert !Thread.holdsLock(this);
 
-        if (prev != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_UNLOCKED))
{
-            for (int i = 0; i < prev.size(); i++) {
-                GridCacheMvccCandidate cand = prev.candidate(i);
+        if (prevOwners != null) {
+            for (int i = 0; i < prevOwners.size(); i++) {
+                GridCacheMvccCandidate cand = prevOwners.candidate(i);
 
-                boolean unlocked = owner == null || !owner.hasCandidate(cand.version());
+                boolean unlocked = owners == null || !owners.hasCandidate(cand.version());
 
                 if (unlocked) {
-                    boolean hasVal = hasValue();
-
-                    cctx.events().addEvent(partition(),
-                        key,
-                        cand.nodeId(),
-                        cand,
-                        EVT_CACHE_OBJECT_UNLOCKED,
-                        val,
-                        hasVal,
-                        val, hasVal,
-                        null,
-                        null,
-                        null,
-                        true);
+                    cctx.mvcc().callback().onOwnerChanged(this, null);
+
+                    if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNLOCKED)) {
+                        boolean hasVal = hasValue();
+
+                        cctx.events().addEvent(partition(),
+                            key,
+                            cand.nodeId(),
+                            cand,
+                            EVT_CACHE_OBJECT_UNLOCKED,
+                            val,
+                            hasVal,
+                            val, hasVal,
+                            null,
+                            null,
+                            null,
+                            true);
+                    }
+
+                    break;
                 }
             }
         }
 
-        if (owner != null) {
-            for (int i = 0; i < owner.size(); i++) {
-                GridCacheMvccCandidate cand = owner.candidate(i);
+        if (owners != null) {
+            for (int i = 0; i < owners.size(); i++) {
+                GridCacheMvccCandidate owner = owners.candidate(i);
 
-                boolean locked = prev == null || !prev.hasCandidate(cand.version());
+                boolean locked = prevOwners == null || !prevOwners.hasCandidate(owner.version());
 
                 if (locked) {
-                    cctx.mvcc().callback().onOwnerChanged(this, cand);
+                    cctx.mvcc().callback().onOwnerChanged(this, owner);
 
-                    if (cand.local())
-                        checkThreadChain(cand);
+                    if (owner.local())
+                        checkThreadChain(owner);
 
                     if (cctx.events().isRecordable(EVT_CACHE_OBJECT_LOCKED)) {
                         boolean hasVal = hasValue();
@@ -741,8 +747,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
                         // Event notification.
                         cctx.events().addEvent(partition(),
                             key,
-                            cand.nodeId(),
-                            cand,
+                            owner.nodeId(),
+                            owner,
                             EVT_CACHE_OBJECT_LOCKED,
                             val,
                             hasVal,
@@ -759,38 +765,6 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
     }
 
     /**
-     * @param prev Previous owner.
-     * @param owner Current owner.
-     * @param val Entry value.
-     */
-    protected final void checkOwnerChanged(GridCacheMvccCandidate prev, GridCacheMvccCandidate
owner, CacheObject val) {
-        assert !Thread.holdsLock(this);
-
-        if (owner != prev) {
-            cctx.mvcc().callback().onOwnerChanged(this, owner);
-
-            if (owner != null && owner.local())
-                checkThreadChain(owner);
-
-            if (prev != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_UNLOCKED))
{
-                boolean hasVal = hasValue();
-
-                // Event notification.
-                cctx.events().addEvent(partition(), key, prev.nodeId(), prev, EVT_CACHE_OBJECT_UNLOCKED,
val, hasVal,
-                    val, hasVal, null, null, null, true);
-            }
-
-            if (owner != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_LOCKED))
{
-                boolean hasVal = hasValue();
-
-                // Event notification.
-                cctx.events().addEvent(partition(), key, owner.nodeId(), owner, EVT_CACHE_OBJECT_LOCKED,
val, hasVal,
-                    val, hasVal, null, null, null, true);
-            }
-        }
-    }
-
-    /**
      * @param owner Starting candidate in the chain.
      */
     protected void checkThreadChain(GridCacheMvccCandidate owner) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7df145c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index dbbb235..cf4085b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -136,7 +136,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
      * @return Local candidate by near version.
      * @throws GridCacheEntryRemovedException If removed.
      */
-    @Nullable public synchronized GridCacheMvccCandidate localCandidateByNearVersion(GridCacheVersion
nearVer,
+    @Nullable synchronized GridCacheMvccCandidate localCandidateByNearVersion(GridCacheVersion
nearVer,
         boolean rmv) throws GridCacheEntryRemovedException {
         checkObsolete();
 
@@ -166,30 +166,28 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
      * @param threadId Owning thread ID.
      * @param ver Lock version.
      * @param serOrder Version for serializable transactions ordering.
-     * @param serReadVer Optional read entry version for optimistic serializable transaction.
      * @param timeout Timeout to acquire lock.
      * @param reenter Reentry flag.
      * @param tx Tx flag.
      * @param implicitSingle Implicit flag.
+     * @param read Read lock flag.
      * @return New candidate.
      * @throws GridCacheEntryRemovedException If entry has been removed.
      * @throws GridDistributedLockCancelledException If lock was cancelled.
      */
-    @Nullable public GridCacheMvccCandidate addDhtLocal(
+    @Nullable GridCacheMvccCandidate addDhtLocal(
         UUID nearNodeId,
         GridCacheVersion nearVer,
         AffinityTopologyVersion topVer,
         long threadId,
         GridCacheVersion ver,
         @Nullable GridCacheVersion serOrder,
-        @Nullable GridCacheVersion serReadVer,
         long timeout,
         boolean reenter,
         boolean tx,
         boolean implicitSingle,
         boolean read)
         throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
-        assert serReadVer == null || serOrder != null;
         assert !reenter || serOrder == null;
 
         GridCacheMvccCandidate cand;
@@ -280,7 +278,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
                 tx.threadId(),
                 tx.xidVersion(),
                 serOrder,
-                serReadVer,
                 timeout,
                 /*reenter*/false,
                 /*tx*/true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7df145c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index afc7615..d77933e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -374,7 +374,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             threadId,
             lockVer,
             null,
-            null,
             timeout,
             /*reenter*/false,
             inTx(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7df145c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index d910363..156e3e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -87,12 +87,11 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
         boolean implicitSingle,
         boolean read
     ) throws GridCacheEntryRemovedException {
-        GridCacheMvccCandidate prev;
-        GridCacheMvccCandidate cand;
-        GridCacheMvccCandidate owner;
+        assert serReadVer == null || serOrder != null;
 
-        CacheObject val;
-        boolean hasVal;
+        GridCacheMvccCandidate cand;
+        CacheLockCandidates prev;
+        CacheLockCandidates owner;
 
         synchronized (this) {
             checkObsolete();
@@ -110,7 +109,7 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
                 mvccExtras(mvcc);
             }
 
-            prev = mvcc.localOwner();
+            prev = mvcc.localOwners();
 
             cand = mvcc.addLocal(
                 this,
@@ -127,25 +126,14 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
                 read
             );
 
-            owner = mvcc.localOwner();
-
-            val = this.val;
-
-            hasVal = hasValueUnlocked();
+            owner = mvcc.localOwners();
 
             if (mvcc.isEmpty())
                 mvccExtras(null);
         }
 
-        if (cand != null) {
-            if (!cand.reentry())
-                cctx.mvcc().addNext(cctx, cand);
-
-            // Event notification.
-            if (cctx.events().isRecordable(EVT_CACHE_OBJECT_LOCKED))
-                cctx.events().addEvent(partition(), key, cand.nodeId(), cand, EVT_CACHE_OBJECT_LOCKED,
val, hasVal,
-                    val, hasVal, null, null, null, true);
-        }
+        if (cand != null && !cand.reentry())
+            cctx.mvcc().addNext(cctx, cand);
 
         checkOwnerChanged(prev, owner);
 
@@ -227,39 +215,38 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
     }
 
     /**
-     * @param prev Previous owners.
-     * @param owner Current owners.
+     * @param prevOwners Previous owners.
+     * @param owners Current owners.
      */
-    private void checkOwnerChanged(@Nullable CacheLockCandidates prev, @Nullable CacheLockCandidates
owner) {
+    private void checkOwnerChanged(@Nullable CacheLockCandidates prevOwners, @Nullable CacheLockCandidates
owners) {
         assert !Thread.holdsLock(this);
 
-        if (owner != null) {
-            for (int i = 0; i < owner.size(); i++) {
-                GridCacheMvccCandidate cand = owner.candidate(i);
+        if (prevOwners != null) {
+            for (int i = 0; i < prevOwners.size(); i++) {
+                GridCacheMvccCandidate cand = prevOwners.candidate(i);
 
-                boolean locked = prev == null || !prev.hasCandidate(cand.version());
+                boolean unlocked = owners == null || !owners.hasCandidate(cand.version());
 
-                if (locked) {
-                    cctx.mvcc().callback().onOwnerChanged(this, cand);
+                if (unlocked) {
+                    cctx.mvcc().callback().onOwnerChanged(this, null);
 
-                    checkThreadChain(cand);
+                    break;
                 }
             }
         }
-    }
 
-    /**
-     * @param prev Previous owner.
-     * @param owner Current owner.
-     */
-    private void checkOwnerChanged(GridCacheMvccCandidate prev, GridCacheMvccCandidate owner)
{
-        assert !Thread.holdsLock(this);
+        if (owners != null) {
+            for (int i = 0; i < owners.size(); i++) {
+                GridCacheMvccCandidate cand = owners.candidate(i);
+
+                boolean locked = prevOwners == null || !prevOwners.hasCandidate(cand.version());
 
-        if (owner != prev) {
-            cctx.mvcc().callback().onOwnerChanged(this, owner);
+                if (locked) {
+                    cctx.mvcc().callback().onOwnerChanged(this, cand);
 
-            if (owner != null)
-                checkThreadChain(owner);
+                    checkThreadChain(cand);
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7df145c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a4fb367..2c02f96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1615,7 +1615,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
                     assert serReadVer == null || (tx.optimistic() && tx.serializable())
: txEntry1;
 
-                    boolean read = serReadVer != null && txEntry1.op() == READ;
+                    boolean read = serOrder != null && txEntry1.op() == READ;
 
                     if (!entry1.tmLock(tx, timeout, serOrder, serReadVer, read)) {
                         // Unlock locks locked so far.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7df145c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 95750dc..4986c8a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -2469,28 +2469,106 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
 
             final AtomicInteger putKey = new AtomicInteger(1_000_000);
 
-            final int THREADS = 64;
+            ignite0.createCache(ccfg);
+
+            try {
+                checkNoReadLockConflict(ignite(0), ccfg.getName(), entry, putKey);
+
+                checkNoReadLockConflict(ignite(1), ccfg.getName(), entry, putKey);
+
+                checkNoReadLockConflict(ignite(SRVS), ccfg.getName(), entry, putKey);
+            }
+            finally {
+                destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     * @param entry If {@code true} then uses 'getEntry' to read value, otherwise uses 'get'.
+     * @param putKey Write key counter.
+     * @throws Exception If failed.
+     */
+    private void checkNoReadLockConflict(final Ignite ignite,
+        String cacheName,
+        final boolean entry,
+        final AtomicInteger putKey) throws Exception
+    {
+        final int THREADS = 64;
+
+        final IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+        List<Integer> readKeys = testKeys(cache);
+
+        for (final Integer readKey : readKeys) {
+            final CyclicBarrier barrier = new CyclicBarrier(THREADS);
+
+            cache.put(readKey, Integer.MIN_VALUE);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE))
{
+                        if (entry)
+                            cache.get(readKey);
+                        else
+                            cache.getEntry(readKey);
+
+                        barrier.await();
+
+                        cache.put(putKey.incrementAndGet(), 0);
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+            }, THREADS, "test-thread");
+
+            assertEquals((Integer)Integer.MIN_VALUE, cache.get(readKey));
+
+            cache.put(readKey, readKey);
+
+            assertEquals(readKey, cache.get(readKey));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoReadLockConflictMultiNode() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        for (final CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations())
{
+            logCacheInfo(ccfg);
+
+            final AtomicInteger putKey = new AtomicInteger(1_000_000);
 
             ignite0.createCache(ccfg);
 
             try {
-                final Ignite ignite = ignite0;
-                final IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
+                final int THREADS = 64;
 
-                List<Integer> readKeys = testKeys(cache);
+                IgniteCache<Integer, Integer> cache0 = ignite0.cache(ccfg.getName());
+
+                List<Integer> readKeys = testKeys(cache0);
 
                 for (final Integer readKey : readKeys) {
                     final CyclicBarrier barrier = new CyclicBarrier(THREADS);
 
-                    cache.put(readKey, 0);
+                    cache0.put(readKey, Integer.MIN_VALUE);
+
+                    final AtomicInteger idx = new AtomicInteger();
 
                     GridTestUtils.runMultiThreaded(new Callable<Void>() {
                         @Override public Void call() throws Exception {
+                            Ignite ignite = ignite(idx.incrementAndGet() % (CLIENTS + SRVS));
+
+                            IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
+
                             try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC,
SERIALIZABLE)) {
-                                if (entry)
-                                    cache.get(readKey);
-                                else
-                                    cache.getEntry(readKey);
+                                cache.get(readKey);
 
                                 barrier.await();
 
@@ -2502,6 +2580,12 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
                             return null;
                         }
                     }, THREADS, "test-thread");
+
+                    assertEquals((Integer)Integer.MIN_VALUE, cache0.get(readKey));
+
+                    cache0.put(readKey, readKey);
+
+                    assertEquals(readKey, cache0.get(readKey));
                 }
             }
             finally {
@@ -2513,29 +2597,178 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
     /**
      * @throws Exception If failed.
      */
+    @SuppressWarnings("UnnecessaryLocalVariable")
     public void testReadLockPessimisticTxConflict() throws Exception {
-        // TODO: no conflict for write, read conflict with pessimistic tx.
+        Ignite ignite0 = ignite(0);
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            ignite0.createCache(ccfg);
+
+            try {
+                Ignite ignite = ignite0;
+
+                IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
+
+                Integer writeKey = Integer.MAX_VALUE;
+
+                List<Integer> readKeys = testKeys(cache);
+
+                for (Integer readKey : readKeys) {
+                    CountDownLatch latch = new CountDownLatch(1);
+
+                    IgniteInternalFuture<?> fut = lockKey(latch, cache, readKey);
+
+                    try {
+                        // No conflict for write, conflict with pessimistic tx for read.
+                        try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE))
{
+                            cache.put(writeKey, writeKey);
+
+                            cache.get(readKey);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+                    finally {
+                        latch.countDown();
+                    }
+
+                    fut.get();
+                }
+            }
+            finally {
+                destroyCache(ccfg.getName());
+            }
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
+    @SuppressWarnings("UnnecessaryLocalVariable")
     public void testReadWriteTxConflict() throws Exception {
-        // TODO: no conflict for read, conflict for write.
+        Ignite ignite0 = ignite(0);
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            ignite0.createCache(ccfg);
+
+            try {
+                Ignite ignite = ignite0;
+
+                IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
+
+                Integer writeKey = Integer.MAX_VALUE;
+
+                List<Integer> readKeys = testKeys(cache);
+
+                for (Integer readKey : readKeys) {
+                    try {
+                        // No conflict for read, conflict for write.
+                        try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE))
{
+                            cache.getAndPut(writeKey, writeKey);
+
+                            cache.get(readKey);
+
+                            updateKey(cache, writeKey, writeKey + readKey);
+
+                            tx.commit();
+                        }
+
+                        fail();
+                    }
+                    catch (TransactionOptimisticException e) {
+                        log.info("Expected exception: " + e);
+                    }
+
+                    assertEquals((Integer)(writeKey + readKey), cache.get(writeKey));
+                    assertNull(cache.get(readKey));
+
+                    cache.put(readKey, readKey);
+
+                    assertEquals(readKey, cache.get(readKey));
+                }
+            }
+            finally {
+                destroyCache(ccfg.getName());
+            }
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testNoReadLockConflictMultiNode() throws Exception {
-        // TODO
+    public void testReadWriteTransactionsNoDeadlock() throws Exception {
+        checkReadWriteTransactionsNoDeadlock(false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testReadWriteTransactionsNoDeadlock() throws Exception {
-        // TODO
+    public void testReadWriteTransactionsNoDeadlockMultinode() throws Exception {
+        checkReadWriteTransactionsNoDeadlock(true);
+    }
+
+    /**
+     * @param multiNode Multi-node test flag.
+     * @throws Exception If failed.
+     */
+    private void checkReadWriteTransactionsNoDeadlock(final boolean multiNode) throws Exception
{
+        final Ignite ignite0 = ignite(0);
+
+        for (final CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations())
{
+            logCacheInfo(ccfg);
+
+            ignite0.createCache(ccfg);
+
+            try {
+                final long stopTime = U.currentTimeMillis() + 10_000;
+
+                final AtomicInteger idx = new AtomicInteger();
+
+                GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        Ignite ignite = multiNode ? ignite(idx.incrementAndGet() % (SRVS
+ CLIENTS)) : ignite0;
+
+                        IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName());
+
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        while (U.currentTimeMillis() < stopTime) {
+                            try {
+                                try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC,
SERIALIZABLE)) {
+                                    for (int i = 0; i < 10; i++) {
+                                        Integer key = rnd.nextInt(30);
+
+                                        if (rnd.nextBoolean())
+                                            cache.get(key);
+                                        else
+                                            cache.put(key, key);
+                                    }
+
+                                    tx.commit();
+                                }
+                            }
+                            catch (TransactionOptimisticException ignore) {
+                                // No-op.
+                            }
+                        }
+
+                        return null;
+                    }
+                }, 32, "test-thread");
+            }
+            finally {
+                destroyCache(ccfg.getName());
+            }
+        }
     }
 
     /**
@@ -4287,10 +4520,12 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
         if (ccfg.getCacheMode() == PARTITIONED)
             keys.add(nearKey(cache));
 
-        keys.add(primaryKey(cache));
+        if (!cache.unwrap(Ignite.class).configuration().isClientMode()) {
+            keys.add(primaryKey(cache));
 
-        if (ccfg.getBackups() != 0)
-            keys.add(backupKey(cache));
+            if (ccfg.getBackups() != 0)
+                keys.add(backupKey(cache));
+        }
 
         return keys;
     }


Mime
View raw message