ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [14/18] ignite git commit: IGNITE-1537 Fixed near optimistic TX future to avoid early completion.
Date Fri, 27 Nov 2015 08:03:15 GMT
IGNITE-1537 Fixed near optimistic TX future to avoid early completion.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/de08cd55
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/de08cd55
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/de08cd55

Branch: refs/heads/ignite-1956
Commit: de08cd554e75db0df06a5438da5012ebf6c7ad09
Parents: 895760e
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Nov 27 08:40:16 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Nov 27 08:40:16 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtTxPrepareFuture.java |  2 +-
 ...arOptimisticSerializableTxPrepareFuture.java | 92 ++++----------------
 .../near/GridNearOptimisticTxPrepareFuture.java | 57 ++++++------
 ...ridNearOptimisticTxPrepareFutureAdapter.java | 70 +++++++++++++++
 .../cache/local/GridLocalCacheEntry.java        |  6 ++
 .../transactions/IgniteTxLocalAdapter.java      |  2 +-
 .../util/future/GridCompoundFuture.java         |  2 +-
 7 files changed, 123 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index c55bead..1d418ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -417,7 +417,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 U.error(log, "Failed to get result value for cache entry: " + cached, e);
             }
             catch (GridCacheEntryRemovedException e) {
-                assert false : "Got entry removed exception while holding transactional lock on entry: " + e;
+                assert false : "Got entry removed exception while holding transactional lock on entry [e=" + e + ", cached=" + cached + ']';
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 144070c..916c7ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -39,12 +39,10 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -76,7 +74,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
     /** */
     @GridToStringExclude
-    private KeyLockFuture keyLockFut = new KeyLockFuture();
+    private KeyLockFuture keyLockFut;
 
     /** */
     @GridToStringExclude
@@ -134,7 +132,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                     }
                 }
 
-                keyLockFut.onKeyLocked(entry.txKey());
+                if (keyLockFut != null)
+                    keyLockFut.onKeyLocked(entry.txKey());
 
                 return true;
             }
@@ -189,7 +188,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         err.compareAndSet(null, e);
 
-        keyLockFut.onDone(e);
+        if (keyLockFut != null)
+            keyLockFut.onDone(e);
     }
 
     /** {@inheritDoc} */
@@ -210,7 +210,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         if (err != null) {
             this.err.compareAndSet(null, err);
 
-            keyLockFut.onDone(err);
+            if (keyLockFut != null)
+                keyLockFut.onDone(err);
         }
 
         return onComplete();
@@ -335,10 +336,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
         for (IgniteTxEntry read : reads)
             map(read, topVer, mappings, remap, topLocked);
 
-        keyLockFut.onAllKeysAdded();
-
-        if (!remap)
-            add(keyLockFut);
+        if (keyLockFut != null)
+            keyLockFut.onAllKeysAdded();
 
         if (isDone()) {
             if (log.isDebugEnabled())
@@ -535,8 +534,15 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
 
         if (!remap && (cacheCtx.isNear() || cacheCtx.isLocal())) {
-            if (entry.explicitVersion() == null)
+            if (entry.explicitVersion() == null) {
+                if (keyLockFut == null) {
+                    keyLockFut = new KeyLockFuture();
+
+                    add(keyLockFut);
+                }
+
                 keyLockFut.addLockKey(entry.txKey());
+            }
         }
 
         IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear());
@@ -854,68 +860,4 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
         }
     }
-
-    /**
-     * Keys lock future.
-     */
-    private class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
-        /** */
-        @GridToStringInclude
-        private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
-
-        /** */
-        private volatile boolean allKeysAdded;
-
-        /**
-         * @param key Key to track for locking.
-         */
-        private void addLockKey(IgniteTxKey key) {
-            assert !allKeysAdded;
-
-            lockKeys.add(key);
-        }
-
-        /**
-         * @param key Locked keys.
-         */
-        private void onKeyLocked(IgniteTxKey key) {
-            lockKeys.remove(key);
-
-            checkLocks();
-        }
-
-        /**
-         * Moves future to the ready state.
-         */
-        private void onAllKeysAdded() {
-            allKeysAdded = true;
-
-            checkLocks();
-        }
-
-        /**
-         * @return {@code True} if all locks are owned.
-         */
-        private boolean checkLocks() {
-            boolean locked = lockKeys.isEmpty();
-
-            if (locked && allKeysAdded) {
-                if (log.isDebugEnabled())
-                    log.debug("All locks are acquired for near prepare future: " + this);
-
-                onDone((GridNearTxPrepareResponse)null);
-            }
-            else {
-                if (log.isDebugEnabled())
-                    log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
-            }
-
-            return locked;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(KeyLockFuture.class, this, super.toString());
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index e70e574..ca1d36c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -40,15 +40,15 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -65,8 +65,8 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
  */
 public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
     /** */
-    @GridToStringInclude
-    private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+    @GridToStringExclude
+    private KeyLockFuture keyLockFut;
 
     /**
      * @param cctx Context.
@@ -84,10 +84,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             log.debug("Transaction future received owner changed callback: " + entry);
 
         if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) {
-            lockKeys.remove(entry.txKey());
-
-            // This will check for locks.
-            onDone();
+            if (keyLockFut != null)
+                keyLockFut.onKeyLocked(entry.txKey());
 
             return true;
         }
@@ -151,24 +149,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
         }
     }
 
-    /**
-     * @return {@code True} if all locks are owned.
-     */
-    private boolean checkLocks() {
-        boolean locked = lockKeys.isEmpty();
-
-        if (locked) {
-            if (log.isDebugEnabled())
-                log.debug("All locks are acquired for near prepare future: " + this);
-        }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
-        }
-
-        return locked;
-    }
-
     /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
         if (!isDone()) {
@@ -215,8 +195,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
     /** {@inheritDoc} */
     @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
-        // If locks were not acquired yet, delay completion.
-        if (isDone() || (err == null && !checkLocks()))
+        if (isDone())
             return false;
 
         this.err.compareAndSet(null, err);
@@ -320,6 +299,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             return;
         }
 
+        if (keyLockFut != null)
+            keyLockFut.onAllKeysAdded();
+
         tx.addSingleEntryMapping(mapping, write);
 
         cctx.mvcc().recheckPendingLocks();
@@ -385,6 +367,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             return;
         }
 
+        if (keyLockFut != null)
+            keyLockFut.onAllKeysAdded();
+
         tx.addEntryMapping(mappings);
 
         cctx.mvcc().recheckPendingLocks();
@@ -543,8 +528,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
 
         if (cacheCtx.isNear() || cacheCtx.isLocal()) {
-            if (entry.explicitVersion() == null)
-                lockKeys.add(entry.txKey());
+            if (entry.explicitVersion() == null) {
+                if (keyLockFut == null) {
+                    keyLockFut = new KeyLockFuture();
+
+                    add(keyLockFut);
+                }
+
+                keyLockFut.addLockKey(entry.txKey());
+            }
         }
 
         if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
@@ -594,10 +586,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                     ", loc=" + ((MiniFuture)f).node().isLocal() +
                     ", done=" + f.isDone() + "]";
             }
+        }, new P1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+            @Override public boolean apply(IgniteInternalFuture<GridNearTxPrepareResponse> fut) {
+                return isMini(fut);
+            }
         });
 
         return S.toString(GridNearOptimisticTxPrepareFuture.class, this,
             "innerFuts", futs,
+            "keyLockFut", keyLockFut,
             "tx", tx,
             "super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 6b7244a..5c7553f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -17,14 +17,20 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import java.util.Collection;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -157,4 +163,68 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
      * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      */
     protected abstract void prepare0(boolean remap, boolean topLocked);
+
+    /**
+     * Keys lock future.
+     */
+    protected static class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+        /** */
+        @GridToStringInclude
+        private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+
+        /** */
+        private volatile boolean allKeysAdded;
+
+        /**
+         * @param key Key to track for locking.
+         */
+        protected void addLockKey(IgniteTxKey key) {
+            assert !allKeysAdded;
+
+            lockKeys.add(key);
+        }
+
+        /**
+         * @param key Locked keys.
+         */
+        protected void onKeyLocked(IgniteTxKey key) {
+            lockKeys.remove(key);
+
+            checkLocks();
+        }
+
+        /**
+         * Moves future to the ready state.
+         */
+        protected void onAllKeysAdded() {
+            allKeysAdded = true;
+
+            checkLocks();
+        }
+
+        /**
+         * @return {@code True} if all locks are owned.
+         */
+        private boolean checkLocks() {
+            boolean locked = lockKeys.isEmpty();
+
+            if (locked && allKeysAdded) {
+                if (log.isDebugEnabled())
+                    log.debug("All locks are acquired for near prepare future: " + this);
+
+                onDone((GridNearTxPrepareResponse)null);
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
+            }
+
+            return locked;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(KeyLockFuture.class, this, super.toString());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index 0ceae20..76bfc46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_LOCKED;
@@ -434,4 +435,9 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
     @Override protected void offHeapPointer(long valPtr) {
         this.valPtr = valPtr;
     }
+
+    /** {@inheritDoc} */
+    @Override public synchronized String toString() {
+        return S.toString(GridLocalCacheEntry.class, this, super.toString());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index b3ff3a6..f13cff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -598,7 +598,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      * @param entry Cache entry to check.
      */
     private void checkCommitLocks(GridCacheEntryEx entry) {
-        assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode [entry=" + entry +
+        assert ownsLockUnsafe(entry) : "Lock is not owned for commit [entry=" + entry +
             ", tx=" + this + ']';
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/de08cd55/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index cc296e6..4b2461e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -423,7 +423,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return "Compound future listener: " + GridCompoundFuture.this;
+            return "Compound future listener []";
         }
     }
 }


Mime
View raw message