ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [4/5] incubator-ignite git commit: # IGNITE-283: Fixed TX stuff.
Date Fri, 20 Feb 2015 07:04:46 GMT
# IGNITE-283: Fixed TX stuff.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2945ee4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2945ee4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2945ee4f

Branch: refs/heads/ignite-283-tx
Commit: 2945ee4f5ddb5e8678b9e8baacdc7e9b27af7909
Parents: 43128cb
Author: vozerov <vozerov@gridgain.com>
Authored: Fri Feb 20 09:57:12 2015 +0300
Committer: vozerov <vozerov@gridgain.com>
Committed: Fri Feb 20 09:57:12 2015 +0300

----------------------------------------------------------------------
 .../GridDistributedTxRemoteAdapter.java         | 43 +++++++++-------
 .../cache/transactions/IgniteTxAdapter.java     | 52 +++++++++++++++++---
 .../cache/transactions/IgniteTxEntry.java       | 12 ++++-
 .../transactions/IgniteTxLocalAdapter.java      | 17 ++++---
 .../cache/version/GridCacheVersion.java         |  2 +-
 .../cache/version/GridCacheVersionEx.java       |  2 +-
 6 files changed, 91 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2945ee4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index e36947e..ce7a393 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -506,19 +506,23 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                                     GridCacheVersion explicitVer = txEntry.conflictVersion();
 
+                                    if (explicitVer == null)
+                                        explicitVer = writeVersion();
+
                                     if (txEntry.ttl() == CU.TTL_ZERO)
                                         op = DELETE;
 
                                     boolean drNeedResolve = cacheCtx.conflictNeedResolve();
 
+                                    GridCacheVersionConflictContext<K, V> drCtx = null;
+
                                     if (drNeedResolve) {
                                         IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
-                                            drRes = conflictResolve(op, txEntry.key(), val, valBytes,
-                                            txEntry.ttl(), txEntry.conflictExpireTime(), explicitVer, cached);
+                                            drRes = conflictResolve(op, txEntry, val, valBytes, explicitVer, cached);
 
                                         assert drRes != null;
 
-                                        GridCacheVersionConflictContext<K, V> drCtx = drRes.get2();
+                                        drCtx = drRes.get2();
 
                                         if (drCtx.isUseOld())
                                             op = NOOP;
@@ -610,26 +614,27 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
                                             "Transaction does not own lock for group lock entry during  commit [tx=" +
                                                 this + ", txEntry=" + txEntry + ']';
 
-                                        if (txEntry.ttl() != -1L)
-                                            cached.updateTtl(null, txEntry.ttl());
+                                        if (drCtx == null || !drCtx.isUseOld()) {
+                                            if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
+                                                cached.updateTtl(null, txEntry.ttl());
 
-                                        if (nearCached != null) {
-                                            V val0 = null;
-                                            byte[] valBytes0 = null;
+                                            if (nearCached != null) {
+                                                V val0 = null;
+                                                byte[] valBytes0 = null;
 
-                                            GridCacheValueBytes valBytesTuple = cached.valueBytes();
+                                                GridCacheValueBytes valBytesTuple = cached.valueBytes();
 
-                                            if (!valBytesTuple.isNull()) {
-                                                if (valBytesTuple.isPlain())
-                                                    val0 = (V)valBytesTuple.get();
-                                                else
-                                                    valBytes0 = valBytesTuple.get();
-                                            }
-                                            else
-                                                val0 = cached.rawGet();
+                                                if (!valBytesTuple.isNull()) {
+                                                    if (valBytesTuple.isPlain())
+                                                        val0 = (V) valBytesTuple.get();
+                                                    else
+                                                        valBytes0 = valBytesTuple.get();
+                                                } else
+                                                    val0 = cached.rawGet();
 
-                                            nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
-                                                cached.ttl(), nodeId);
+                                                nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
+                                                    cached.ttl(), nodeId);
+                                            }
                                         }
                                     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2945ee4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 93f4cf9..0dbd552 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1287,11 +1287,9 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
      * Resolve DR conflict.
      *
      * @param op Initially proposed operation.
-     * @param key Key.
+     * @param txEntry TX entry being updated.
      * @param newVal New value.
      * @param newValBytes New value bytes.
-     * @param newTtl New TTL.
-     * @param newDrExpireTime New explicit DR expire time.
      * @param newVer New version.
      * @param old Old entry.
      * @return Tuple with adjusted operation type and conflict context.
@@ -1299,9 +1297,49 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
      * @throws GridCacheEntryRemovedException If entry got removed.
      */
     protected IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>> conflictResolve(
-        GridCacheOperation op, K key, V newVal, byte[] newValBytes, long newTtl, long newDrExpireTime,
-        GridCacheVersion newVer, GridCacheEntryEx<K, V> old)
+        GridCacheOperation op, IgniteTxEntry txEntry, V newVal, byte[] newValBytes, GridCacheVersion newVer,
+        GridCacheEntryEx<K, V> old)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
+        assert newVer != null;
+
+        // 1. Convert TTL and expire time.
+        long newTtl = txEntry.ttl();
+        long newExpireTime = txEntry.conflictExpireTime();
+
+        if (newTtl == CU.TTL_NOT_CHANGED) {
+            assert op == DELETE || newVer.conflictVersion().dataCenterId() == cctx.dataCenterId() :
+                "TTL can be not-explicit only for local updates.";
+
+            ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry);
+
+            if (expiry == null) {
+                newTtl = CU.TTL_ETERNAL;
+                newExpireTime = CU.EXPIRE_TIME_ETERNAL;
+            }
+            else {
+                if (op == DELETE) {
+                    newTtl = CU.TTL_ETERNAL;
+                    newExpireTime = CU.EXPIRE_TIME_ETERNAL;
+                }
+                else if (op == CREATE)
+                    newTtl = CU.toTtl(expiry.getExpiryForUpdate());
+                else if (op == UPDATE)
+                    newTtl = CU.toTtl(expiry.getExpiryForUpdate());
+            }
+
+            if (newTtl == CU.TTL_ZERO) {
+                newTtl = CU.TTL_MINIMUM;
+                newExpireTime = CU.expireTimeInPast();
+            }
+        }
+        else if (newTtl == CU.TTL_ZERO) {
+            assert newVer.conflictVersion().dataCenterId() == cctx.dataCenterId() :
+                "TTL can be not-explicit only for local updates.";
+
+            newTtl = CU.TTL_MINIMUM;
+            newExpireTime = CU.expireTimeInPast();
+        }
+
         // Construct old entry info.
         GridCacheVersionedEntryEx<K, V> oldEntry = old.versionedEntry();
 
@@ -1309,10 +1347,8 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
         if (newVal == null && newValBytes != null)
             newVal = cctx.marshaller().unmarshal(newValBytes, cctx.deploy().globalLoader());
 
-        long newExpireTime = newDrExpireTime >= 0L ? newDrExpireTime : CU.toExpireTime(newTtl);
-
         GridCacheVersionedEntryEx<K, V> newEntry =
-            new GridCachePlainVersionedEntry<>(key, newVal, newTtl, newExpireTime, newVer);
+            new GridCachePlainVersionedEntry<>((K)txEntry.key(), newVal, newTtl, newExpireTime, newVer);
 
         GridCacheVersionConflictContext<K, V> ctx = old.context().conflictResolve(oldEntry, newEntry, false);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2945ee4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 1416d8c..0f1366b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -852,10 +852,17 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
         val.writeTo(out);
 
         out.writeLong(ttl);
-        out.writeLong(conflictExpireTime);
 
         CU.writeVersion(out, explicitVer);
         out.writeBoolean(grpLock);
+
+        if (conflictExpireTime != CU.EXPIRE_TIME_CALCULATE) {
+            out.writeBoolean(true);
+            out.writeLong(conflictExpireTime);
+        }
+        else
+            out.writeBoolean(false);
+
         CU.writeVersion(out, conflictVer);
 
         out.writeObject(transferExpiryPlc ? new IgniteExternalizableExpiryPolicy(expiryPlc) : null);
@@ -882,10 +889,11 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
         val.readFrom(in);
 
         ttl = in.readLong();
-        conflictExpireTime = in.readLong();
 
         explicitVer = CU.readVersion(in);
         grpLock = in.readBoolean();
+
+        conflictExpireTime = in.readBoolean() ? in.readLong() : CU.EXPIRE_TIME_CALCULATE;
         conflictVer = CU.readVersion(in);
 
         expiryPlc = (ExpiryPolicy)in.readObject();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2945ee4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 2da1bb7..aa78a53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -764,14 +764,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                                     boolean drNeedResolve = cacheCtx.conflictNeedResolve();
 
+                                    GridCacheVersionConflictContext<K, V> conflictCtx = null;
+
                                     if (drNeedResolve) {
                                         IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
-                                            drRes = conflictResolve(op, txEntry.key(), val, valBytes, txEntry.ttl(),
-                                                txEntry.conflictExpireTime(), explicitVer, cached);
+                                            drRes = conflictResolve(op, txEntry, val, valBytes, explicitVer, cached);
 
                                         assert drRes != null;
 
-                                        GridCacheVersionConflictContext<K, V> conflictCtx = drRes.get2();
+                                        conflictCtx = drRes.get2();
 
                                         if (conflictCtx.isUseOld())
                                             op = NOOP;
@@ -905,8 +906,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                             "Transaction does not own lock for group lock entry during  commit [tx=" +
                                                 this + ", txEntry=" + txEntry + ']';
 
-                                        if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
-                                            cached.updateTtl(null, txEntry.ttl());
+                                        if (conflictCtx == null || !conflictCtx.isUseOld()) {
+                                            if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
+                                                cached.updateTtl(null, txEntry.ttl());
+                                        }
 
                                         if (log.isDebugEnabled())
                                             log.debug("Ignoring NOOP entry when committing: " + txEntry);
@@ -3257,8 +3260,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
         IgniteTxEntry<K, V> e = entry(key);
 
-        if (e != null)
+        if (e != null) {
             e.expiry(expiryPlc);
+            e.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2945ee4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
index b706e92..b3dc710 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
@@ -151,7 +151,7 @@ public class GridCacheVersion extends MessageAdapter implements Comparable<GridC
     /**
      * @return Conflict version.
      */
-    @Nullable public GridCacheVersion conflictVersion() {
+    public GridCacheVersion conflictVersion() {
         return this; // Use current version.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2945ee4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
index 4f135ca..9a6cbd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
@@ -77,7 +77,7 @@ public class GridCacheVersionEx extends GridCacheVersion {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion conflictVersion() {
+    public GridCacheVersion conflictVersion() {
         return drVer;
     }
 


Mime
View raw message