ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [46/50] [abbrv] incubator-ignite git commit: Merge branch sprint-1 into ignite-9655-merge
Date Thu, 12 Feb 2015 23:24:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1382064,6f75e32..08e2b94
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@@ -4262,9 -4570,9 +4570,9 @@@ public abstract class GridCacheAdapter<
                  true,
                  op.single(),
                  ctx.system(),
 -                PESSIMISTIC,
 +                OPTIMISTIC,
                  READ_COMMITTED,
-                 ctx.kernalContext().config().getTransactionsConfiguration().getDefaultTxTimeout(),
+                 ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),
                  ctx.hasFlag(INVALIDATE),
                  !ctx.hasFlag(SKIP_STORE),
                  0,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ada0c1c,0c8fa66..bd1f260
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -1213,97 -1221,120 +1210,94 @@@ public abstract class GridCacheMapEntry
  
          IgniteBiTuple<Boolean, V> interceptRes = null;
  
 -        try {
 -            synchronized (this) {
 -                checkObsolete();
 -
 -                if (tx != null && tx.groupLock() && cctx.kernalContext().config().isCacheSanityCheckEnabled())
 -                    groupLockSanityCheck(tx);
 -                else
 -                    assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
 -                        "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
 -
 -                boolean startVer = isStartVersion();
 -
 -                if (startVer) {
 -                    if (tx != null && !tx.local() && tx.onePhaseCommit())
 -                        // Must promote to check version for one-phase commit tx.
 -                        unswap(true, retval);
 -                    else
 -                        // Release swap.
 -                        releaseSwap();
 -                }
 +        synchronized (this) {
 +            checkObsolete();
  
 -                newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion();
 +            if (tx != null && tx.groupLock() && cctx.kernalContext().config().isCacheSanityCheckEnabled())
 +                groupLockSanityCheck(tx);
 +            else
 +                assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
 +                    "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
  
 -                if (tx != null && !tx.local() && tx.onePhaseCommit() && explicitVer == null) {
 -                    if (!startVer && ver.compareTo(newVer) > 0) {
 -                        if (log.isDebugEnabled())
 -                            log.debug("Skipping entry removal for one-phase commit since current entry version is " +
 -                                "greater than write version [entry=" + this + ", newVer=" + newVer + ']');
 +            boolean startVer = isStartVersion();
  
 -                        return new GridCacheUpdateTxResult<>(false, null);
 -                    }
 +            if (startVer) {
 +                // Release swap.
 +                releaseSwap();
 +            }
  
 -                    if (!detached())
 -                        enqueueVer = newVer;
 -                }
 +            newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion();
  
 -                old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val;
 +            old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val;
  
 -                if (intercept) {
 -                    interceptRes = cctx.config().<K, V>getInterceptor().onBeforeRemove(key, old);
 +            if (intercept) {
 +                interceptRes = cctx.config().<K, V>getInterceptor().onBeforeRemove(key, old);
  
 -                    if (cctx.cancelRemove(interceptRes))
 -                        return new GridCacheUpdateTxResult<>(false, cctx.<V>unwrapTemporary(interceptRes.get2()));
 -                }
 +                if (cctx.cancelRemove(interceptRes))
 +                    return new GridCacheUpdateTxResult<>(false, cctx.<V>unwrapTemporary(interceptRes.get2()));
 +            }
  
 -                GridCacheValueBytes oldBytes = valueBytesUnlocked();
 +            GridCacheValueBytes oldBytes = valueBytesUnlocked();
  
 -                if (old == null)
 -                    old = saveValueForIndexUnlocked();
 +            if (old == null)
 +                old = saveValueForIndexUnlocked();
  
 -                // Clear indexes inside of synchronization since indexes
 -                // can be updated without actually holding entry lock.
 -                clearIndex(old);
 +            // Clear indexes inside of synchronization since indexes
 +            // can be updated without actually holding entry lock.
 +            clearIndex(old);
  
 -                boolean hadValPtr = valPtr != 0;
 +            boolean hadValPtr = valPtr != 0;
  
 -                update(null, null, 0, 0, newVer);
 +            update(null, null, 0, 0, newVer);
  
 -                if (cctx.offheapTiered() && hadValPtr) {
 -                    boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes());
 +            if (cctx.offheapTiered() && hadValPtr) {
 +                boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes());
  
 -                    assert rmv;
 -                }
 +                assert rmv;
 +            }
  
 -                if (cctx.deferredDelete() && !detached() && !isInternal()) {
 -                    if (!deletedUnlocked()) {
 -                        deletedUnlocked(true);
 +            if (cctx.deferredDelete() && !detached() && !isInternal()) {
 +                if (!deletedUnlocked()) {
 +                    deletedUnlocked(true);
  
 -                        if (tx != null) {
 -                            GridCacheMvcc<K> mvcc = mvccExtras();
 +                    if (tx != null) {
 +                        GridCacheMvcc<K> mvcc = mvccExtras();
  
 -                            if (mvcc == null || mvcc.isEmpty(tx.xidVersion()))
 -                                clearReaders();
 -                            else
 -                                clearReader(tx.originatingNodeId());
 -                        }
 +                        if (mvcc == null || mvcc.isEmpty(tx.xidVersion()))
 +                            clearReaders();
 +                        else
 +                            clearReader(tx.originatingNodeId());
                      }
                  }
 +            }
  
 -                drReplicate(drType, null, null, newVer);
 +            drReplicate(drType, null, null, newVer);
  
 -                if (metrics && cctx.cache().configuration().isStatisticsEnabled())
 -                    cctx.cache().metrics0().onRemove();
 +            if (metrics && cctx.cache().configuration().isStatisticsEnabled())
 +                cctx.cache().metrics0().onRemove();
  
 -                if (tx == null)
 -                    obsoleteVer = newVer;
 -                else {
 -                    // Only delete entry if the lock is not explicit.
 -                    if (tx.groupLock() || lockedBy(tx.xidVersion()))
 -                        obsoleteVer = tx.xidVersion();
 -                    else if (log.isDebugEnabled())
 -                        log.debug("Obsolete version was not set because lock was explicit: " + this);
 -                }
 +            if (tx == null)
 +                obsoleteVer = newVer;
 +            else {
 +                // Only delete entry if the lock is not explicit.
 +                if (tx.groupLock() || lockedBy(tx.xidVersion()))
 +                    obsoleteVer = tx.xidVersion();
 +                else if (log.isDebugEnabled())
 +                    log.debug("Obsolete version was not set because lock was explicit: " + this);
 +            }
  
 -                if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
 -                    V evtOld = cctx.unwrapTemporary(old);
 +            if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
 +                V evtOld = cctx.unwrapTemporary(old);
  
 -                    cctx.events().addEvent(partition(), key, evtNodeId, tx == null ? null : tx.xid(), newVer,
 -                        EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hasValueUnlocked(), subjId,
 -                        null, taskName);
 -                }
 +                cctx.events().addEvent(partition(), key, evtNodeId, tx == null ? null : tx.xid(), newVer,
 +                    EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hasValueUnlocked(), subjId,
 +                    null, taskName);
 +            }
  
-             CacheMode mode = cctx.config().getCacheMode();
- 
-             if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED ||
-                 (tx != null && tx.local() && !isNear()))
-                 cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false);
+                 if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
+                     cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false);
  
 -                cctx.dataStructures().onEntryUpdated(key, true);
 -            }
 -        }
 -        finally {
 -            if (enqueueVer != null) {
 -                assert cctx.deferredDelete();
 -
 -                cctx.onDeferredDelete(this, enqueueVer);
 -            }
 +            cctx.dataStructures().onEntryUpdated(key, true);
          }
  
          // Persist outside of synchronization. The correctness of the

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a5d39ab,d7b1914..1d064e2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -279,12 -281,21 +281,28 @@@ public class GridCachePartitionExchange
      }
  
      /** {@inheritDoc} */
 +    @Override protected void stop0(boolean cancel) {
 +        super.stop0(cancel);
 +
 +        exchFuts = null;
 +    }
 +
++    /** {@inheritDoc} */
+     @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+     @Override protected void stop0(boolean cancel) {
+         super.stop0(cancel);
+ 
+         // Do not allow any activity in exchange manager after stop.
+         busyLock.writeLock().lock();
+ 
+         exchFuts = null;
+     }
+ 
+     /**
+      * @param cacheId Cache ID.
+      * @param exchId Exchange ID.
+      * @return Topology.
+      */
      public GridDhtPartitionTopology<K, V> clientTopology(int cacheId, GridDhtPartitionExchangeId exchId) {
          GridClientPartitionTopology<K, V> top = clientTops.get(cacheId);
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index c37be22,d8cc296..9b12d1f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@@ -20,12 -20,13 +20,12 @@@ package org.apache.ignite.internal.proc
  import org.apache.ignite.*;
  import org.apache.ignite.internal.*;
  import org.apache.ignite.internal.processors.cache.*;
- import org.apache.ignite.internal.processors.cache.version.*;
- import org.apache.ignite.lang.*;
  import org.apache.ignite.internal.processors.cache.transactions.*;
- import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
  import org.apache.ignite.internal.util.tostring.*;
 -import org.apache.ignite.internal.util.typedef.*;
  import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.plugin.extensions.communication.*;
  import org.jetbrains.annotations.*;
  
  import java.io.*;
@@@ -269,7 -367,11 +269,7 @@@ public class GridDistributedTxFinishReq
          _clone.commit = commit;
          _clone.syncCommit = syncCommit;
          _clone.syncRollback = syncRollback;
-         _clone.baseVer = baseVer;
+         _clone.baseVer = baseVer != null ? (GridCacheVersion)baseVer.clone() : null;
 -        _clone.writeEntries = writeEntries;
 -        _clone.writeEntriesBytes = writeEntriesBytes;
 -        _clone.recoveryWrites = recoveryWrites;
 -        _clone.recoveryWritesBytes = recoveryWritesBytes;
          _clone.txSize = txSize;
          _clone.grpLockKey = grpLockKey;
          _clone.grpLockKeyBytes = grpLockKeyBytes;
@@@ -371,102 -485,110 +383,112 @@@
          if (!super.readFrom(buf))
              return false;
  
-         switch (commState.idx) {
+         switch (state) {
              case 8:
-                 GridCacheVersion baseVer0 = commState.getCacheVersion();
+                 baseVer = reader.readMessage("baseVer");
  
-                 if (baseVer0 == CACHE_VER_NOT_READ)
+                 if (!reader.isLastRead())
                      return false;
  
-                 baseVer = baseVer0;
- 
-                 commState.idx++;
+                 state++;
  
              case 9:
-                 if (buf.remaining() < 1)
-                     return false;
+                 commit = reader.readBoolean("commit");
  
-                 commit = commState.getBoolean();
+                 if (!reader.isLastRead())
+                     return false;
  
-                 commState.idx++;
+                 state++;
  
              case 10:
-                 GridCacheVersion commitVer0 = commState.getCacheVersion();
+                 commitVer = reader.readMessage("commitVer");
  
-                 if (commitVer0 == CACHE_VER_NOT_READ)
+                 if (!reader.isLastRead())
                      return false;
  
-                 commitVer = commitVer0;
- 
-                 commState.idx++;
+                 state++;
  
              case 11:
-                 IgniteUuid futId0 = commState.getGridUuid();
+                 futId = reader.readIgniteUuid("futId");
  
-                 if (futId0 == GRID_UUID_NOT_READ)
+                 if (!reader.isLastRead())
                      return false;
  
-                 futId = futId0;
- 
-                 commState.idx++;
+                 state++;
  
              case 12:
-                 byte[] grpLockKeyBytes0 = commState.getByteArray();
+                 grpLockKeyBytes = reader.readByteArray("grpLockKeyBytes");
  
-                 if (grpLockKeyBytes0 == BYTE_ARR_NOT_READ)
+                 if (!reader.isLastRead())
                      return false;
  
-                 grpLockKeyBytes = grpLockKeyBytes0;
- 
-                 commState.idx++;
+                 state++;
  
              case 13:
-                 if (buf.remaining() < 1)
-                     return false;
+                 invalidate = reader.readBoolean("invalidate");
  
-                 invalidate = commState.getBoolean();
+                 if (!reader.isLastRead())
+                     return false;
  
-                 commState.idx++;
+                 state++;
  
              case 14:
-                 if (buf.remaining() < 1)
-                     return false;
+                 recoveryWritesBytes = reader.readCollection("recoveryWritesBytes", byte[].class);
  
-                 syncCommit = commState.getBoolean();
+                 if (!reader.isLastRead())
+                     return false;
  
-                 commState.idx++;
+                 state++;
  
              case 15:
-                 if (buf.remaining() < 1)
-                     return false;
+                 syncCommit = reader.readBoolean("syncCommit");
  
-                 syncRollback = commState.getBoolean();
+                 if (!reader.isLastRead())
+                     return false;
  
-                 commState.idx++;
+                 state++;
  
              case 16:
+                 syncRollback = reader.readBoolean("syncRollback");
 +                if (buf.remaining() < 1)
 +                    return false;
  
-                 sys = commState.getBoolean();
+                 if (!reader.isLastRead())
+                     return false;
  
-                 commState.idx++;
+                 state++;
  
              case 17:
-                 if (buf.remaining() < 8)
-                     return false;
+                 sys = reader.readBoolean("sys");
  
-                 threadId = commState.getLong();
+                 if (!reader.isLastRead())
+                     return false;
  
-                 commState.idx++;
+                 state++;
  
              case 18:
-                 if (buf.remaining() < 4)
+                 threadId = reader.readLong("threadId");
+ 
+                 if (!reader.isLastRead())
                      return false;
  
-                 txSize = commState.getInt();
+                 state++;
+ 
+             case 19:
+                 txSize = reader.readInt("txSize");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 state++;
+ 
+             case 20:
+                 writeEntriesBytes = reader.readCollection("writeEntriesBytes", byte[].class);
+ 
+                 if (!reader.isLastRead())
+                     return false;
  
-                 commState.idx++;
+                 state++;
  
          }
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 257a331,7e3da3b..8616472
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@@ -21,10 -21,9 +21,10 @@@ import org.apache.ignite.cluster.*
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.transactions.*;
  import org.apache.ignite.internal.processors.cache.version.*;
 +import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.tostring.*;
  import org.apache.ignite.internal.util.typedef.*;
  import org.apache.ignite.internal.util.typedef.internal.*;
- import org.apache.ignite.internal.util.tostring.*;
  import org.jetbrains.annotations.*;
  
  import java.io.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index ff31a72,54d7757..7478b88
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@@ -133,10 -130,9 +133,10 @@@ public class GridDistributedTxPrepareRe
       * @param grpLockKey Group lock key.
       * @param partLock {@code True} if preparing group-lock transaction with partition lock.
       * @param txNodes Transaction nodes mapping.
 +     * @param onePhaseCommit One phase commit flag.
       */
      public GridDistributedTxPrepareRequest(
-         IgniteTxEx<K, V> tx,
+         IgniteInternalTx<K, V> tx,
          @Nullable Collection<IgniteTxEntry<K, V>> reads,
          Collection<IgniteTxEntry<K, V>> writes,
          IgniteTxKey grpLockKey,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 9bf7255,4992ba8..8d1ce82
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@@ -505,42 -505,50 +505,45 @@@ public class GridDistributedTxRemoteAda
  
                                      GridCacheVersion explicitVer = txEntry.drVersion();
  
+                                     if (txEntry.ttl() == CU.TTL_ZERO)
+                                         op = DELETE;
+ 
 -                                    if (finalizationStatus() == FinalizationStatus.RECOVERY_FINISH || optimistic()) {
 -                                        // Primary node has left the grid so we have to process conflicts on backups.
 -                                        if (explicitVer == null)
 -                                            explicitVer = writeVersion(); // Force write version to be used.
  
 -                                        boolean drNeedResolve =
 -                                            cacheCtx.conflictNeedResolve(cached.version(), explicitVer);
 +                                    boolean drNeedResolve =
 +                                        cacheCtx.conflictNeedResolve(cached.version(), explicitVer);
  
-                                     if (drNeedResolve) {
-                                         IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContextImpl<K, V>>
-                                             drRes = conflictResolve(op, txEntry.key(), val, valBytes,
-                                             txEntry.ttl(), txEntry.drExpireTime(), explicitVer, cached);
+                                         if (drNeedResolve) {
+                                             IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
+                                                 drRes = conflictResolve(op, txEntry.key(), val, valBytes,
+                                                 txEntry.ttl(), txEntry.drExpireTime(), explicitVer, cached);
  
 -                                            assert drRes != null;
 +                                        assert drRes != null;
  
-                                         GridCacheVersionConflictContextImpl<K, V> drCtx = drRes.get2();
+                                             GridCacheVersionConflictContext<K, V> drCtx = drRes.get2();
  
 -                                            if (drCtx.isUseOld())
 -                                                op = NOOP;
 -                                            else if (drCtx.isUseNew()) {
 -                                                txEntry.ttl(drCtx.ttl());
 -
 -                                                if (drCtx.newEntry().dataCenterId() != cacheCtx.dataCenterId())
 -                                                    txEntry.drExpireTime(drCtx.expireTime());
 -                                                else
 -                                                    txEntry.drExpireTime(-1L);
 -                                            }
 -                                            else if (drCtx.isMerge()) {
 -                                                op = drRes.get1();
 -                                                val = drCtx.mergeValue();
 -                                                valBytes = null;
 -                                                explicitVer = writeVersion();
 -
 -                                                txEntry.ttl(drCtx.ttl());
 +                                        if (drCtx.isUseOld())
 +                                            op = NOOP;
 +                                        else if (drCtx.isUseNew()) {
 +                                            txEntry.ttl(drCtx.ttl());
 +
 +                                            if (drCtx.newEntry().dataCenterId() != cacheCtx.dataCenterId())
 +                                                txEntry.drExpireTime(drCtx.expireTime());
 +                                            else
                                                  txEntry.drExpireTime(-1L);
 -                                            }
                                          }
 -                                        else
 -                                            // Nullify explicit version so that innerSet/innerRemove will work as usual.
 -                                            explicitVer = null;
 +                                        else if (drCtx.isMerge()) {
 +                                            op = drRes.get1();
 +                                            val = drCtx.mergeValue();
 +                                            valBytes = null;
 +                                            explicitVer = writeVersion();
 +
 +                                            txEntry.ttl(drCtx.ttl());
 +                                            txEntry.drExpireTime(-1L);
 +                                        }
                                      }
 +                                    else
 +                                        // Nullify explicit version so that innerSet/innerRemove will work as usual.
 +                                        explicitVer = null;
  
                                      if (op == CREATE || op == UPDATE) {
                                          // Invalidate only for near nodes (backups cannot be invalidated).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index cc36335,b2fcc50..aaa562c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@@ -621,21 -621,14 +620,9 @@@ public class GridDhtCacheEntry<K, V> ex
  
          GridCacheMvcc<K> mvcc = mvccExtras();
  
 -        GridCacheMvccCandidate<K> cand = mvcc == null ? null : mvcc.candidate(ver);
 -
 -        if (cand != null)
 -            cand.mappedNodeIds(mappings);
 -
 -        return cand;
 +        return mvcc == null ? null : mvcc.candidate(ver);
      }
  
-     /** {@inheritDoc} */
-     @Override public CacheEntry<K, V> wrap(boolean prjAware) {
-         GridCacheContext<K, V> nearCtx = cctx.dht().near().context();
- 
-         GridCacheProjectionImpl<K, V> prjPerCall = nearCtx.projectionPerCall();
- 
-         if (prjPerCall != null && prjAware)
-             return new GridPartitionedCacheEntryImpl<>(prjPerCall, nearCtx, key, this);
- 
-         return new GridPartitionedCacheEntryImpl<>(null, nearCtx, key, this);
-     }
- 
      /**
       * @return Cache name.
       */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 922e644,a93bf7d..793066a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@@ -20,23 -20,25 +20,24 @@@ package org.apache.ignite.internal.proc
  import org.apache.ignite.*;
  import org.apache.ignite.cluster.*;
  import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
  import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.transactions.*;
 -import org.apache.ignite.internal.util.future.*;
 -import org.apache.ignite.internal.util.tostring.*;
 +import org.apache.ignite.lang.*;
- import org.apache.ignite.transactions.*;
  import org.apache.ignite.internal.util.typedef.*;
  import org.apache.ignite.internal.util.typedef.internal.*;
 -import org.apache.ignite.lang.*;
 +import org.apache.ignite.internal.util.future.*;
 +import org.apache.ignite.internal.util.tostring.*;
  import org.jetbrains.annotations.*;
  
  import java.io.*;
  import java.util.*;
  import java.util.concurrent.atomic.*;
  
 -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
  import static org.apache.ignite.transactions.IgniteTxState.*;
- import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
  
  /**
   *
@@@ -91,8 -93,10 +92,8 @@@ public final class GridDhtTxFinishFutur
       * @param commit Commit flag.
       */
      public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter<K, V> tx, boolean commit) {
-         super(cctx.kernalContext(), F.<IgniteTx>identityReducer(tx));
+         super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx));
  
 -        assert cctx != null;
 -
          this.cctx = cctx;
          this.tx = tx;
          this.commit = commit;
@@@ -329,8 -334,25 +330,8 @@@
                  tx.subjectId(),
                  tx.taskNameHash());
  
 -            if (!tx.pessimistic()) {
 -                int idx = 0;
 -
 -                for (IgniteTxEntry<K, V> e : dhtMapping.writes())
 -                    req.ttl(idx++, e.ttl());
 -
 -                if (nearMapping != null) {
 -                    idx = 0;
 -
 -                    for (IgniteTxEntry<K, V> e : nearMapping.writes())
 -                        req.nearTtl(idx++, e.ttl());
 -                }
 -            }
 -
 -            if (tx.onePhaseCommit())
 -                req.writeVersion(tx.writeVersion());
 -
              try {
-                 cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                 cctx.io().send(n, req, tx.ioPolicy());
  
                  if (sync)
                      res = true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 9c39c7c,9bdf4bd..8e39b11
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@@ -17,15 -17,18 +17,16 @@@
  
  package org.apache.ignite.internal.processors.cache.distributed.dht;
  
 -import org.apache.ignite.*;
  import org.apache.ignite.internal.*;
 -import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
  import org.apache.ignite.internal.processors.cache.version.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.transactions.*;
- import org.apache.ignite.internal.processors.cache.transactions.*;
- import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.*;
  import org.apache.ignite.internal.util.tostring.*;
  import org.apache.ignite.internal.util.typedef.internal.*;
 -import org.apache.ignite.lang.*;
+ import org.apache.ignite.plugin.extensions.communication.*;
 -import org.apache.ignite.transactions.*;
  import org.jetbrains.annotations.*;
  
  import java.io.*;
@@@ -66,10 -81,14 +67,8 @@@ public class GridDhtTxFinishRequest<K, 
      private UUID subjId;
  
      /** Task name hash. */
-     @GridDirectVersion(2)
      private int taskNameHash;
  
 -    /** TTLs for optimistic transaction. */
 -    private GridLongList ttls;
 -
 -    /** Near cache TTLs for optimistic transaction. */
 -    private GridLongList nearTtls;
 -
      /**
       * Empty constructor required for {@link Externalizable}.
       */
@@@ -247,9 -371,12 +246,13 @@@
          _clone.sysInvalidate = sysInvalidate;
          _clone.topVer = topVer;
          _clone.pendingVers = pendingVers;
+         _clone.onePhaseCommit = onePhaseCommit;
+         _clone.writeVer = writeVer != null ? (GridCacheVersion)writeVer.clone() : null;
 +        _clone.writeVer = writeVer;
          _clone.subjId = subjId;
          _clone.taskNameHash = taskNameHash;
+         _clone.ttls = ttls != null ? (GridLongList)ttls.clone() : null;
+         _clone.nearTtls = nearTtls != null ? (GridLongList)nearTtls.clone() : null;
      }
  
      /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 4077275,6f5284a..1e2a222
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@@ -363,8 -350,7 +364,8 @@@ public class GridDhtTxLocal<K, V> exten
       * @param lastBackups IDs of backup nodes receiving last prepare request.
       * @return Future that will be completed when locks are acquired.
       */
-     public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync(
 -    public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsync(@Nullable Iterable<IgniteTxEntry<K, V>> reads,
++    public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsync(
 +        @Nullable Iterable<IgniteTxEntry<K, V>> reads,
          @Nullable Iterable<IgniteTxEntry<K, V>> writes,
          Map<IgniteTxKey<K>, GridCacheVersion> verMap,
          long msgId,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ee05fa2,ba4fbbd..7d75f92
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@@ -20,31 -20,29 +20,33 @@@ package org.apache.ignite.internal.proc
  import org.apache.ignite.*;
  import org.apache.ignite.cluster.*;
  import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
- import org.apache.ignite.internal.processors.cache.version.*;
- import org.apache.ignite.internal.util.*;
- import org.apache.ignite.lang.*;
  import org.apache.ignite.internal.processors.cache.distributed.near.*;
  import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
  import org.apache.ignite.internal.processors.dr.*;
- import org.apache.ignite.internal.util.typedef.*;
- import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.*;
  import org.apache.ignite.internal.util.future.*;
  import org.apache.ignite.internal.util.tostring.*;
 +import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
  import org.jetbrains.annotations.*;
  
 +import javax.cache.expiry.*;
 +import javax.cache.processor.*;
  import java.io.*;
  import java.util.*;
  import java.util.concurrent.atomic.*;
  
 +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
  import static org.apache.ignite.transactions.IgniteTxState.*;
- import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.events.EventType.*;
  import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ import static org.apache.ignite.transactions.IgniteTxState.*;
  
  /**
   *
@@@ -142,18 -122,10 +144,18 @@@ public final class GridDhtTxPrepareFutu
       * @param last {@code True} if this is last prepare operation for node.
       * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
       */
 -    public GridDhtTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridDhtTxLocalAdapter<K, V> tx,
 -        IgniteUuid nearMiniId, Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap, boolean last, Collection<UUID> lastBackups) {
 +    public GridDhtTxPrepareFuture(
 +        GridCacheSharedContext<K, V> cctx,
 +        final GridDhtTxLocalAdapter<K, V> tx,
 +        IgniteUuid nearMiniId,
 +        Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap,
 +        boolean last,
 +        boolean retVal,
 +        Collection<UUID> lastBackups,
 +        IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb
 +    ) {
-         super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() {
-             @Override public boolean collect(IgniteTxEx<K, V> e) {
+         super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx<K, V>, IgniteInternalTx<K, V>>() {
+             @Override public boolean collect(IgniteInternalTx<K, V> e) {
                  return true;
              }
  
@@@ -515,91 -383,53 +517,91 @@@
  
          this.err.compareAndSet(null, err);
  
 -        if (replied.compareAndSet(false, true)) {
 -            try {
 -                // Must clear prepare future before response is sent or listeners are notified.
 -                if (tx.optimistic())
 -                    tx.clearPrepareFuture(this);
 +        // Must clear prepare future before response is sent or listeners are notified.
 +        if (tx.optimistic())
 +            tx.clearPrepareFuture(this);
  
 -                if (!tx.nearNodeId().equals(cctx.localNodeId())) {
 -                    // Send reply back to originating near node.
 -                    GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(tx.nearXidVersion(),
 -                        tx.nearFutureId(), nearMiniId, tx.xidVersion(), tx.invalidPartitions(), this.err.get());
 +        if (tx.onePhaseCommit()) {
 +            assert last;
  
 -                    addDhtValues(res);
 +            // Must create prepare response before transaction is committed to grab correct return value.
 +            final GridNearTxPrepareResponse<K, V> res = createPrepareResponse();
  
 -                    GridCacheVersion min = tx.minVersion();
 +            onComplete();
 +
 +            if (!tx.near()) {
 +                if (tx.markFinalizing(IgniteTxEx.FinalizationStatus.USER_FINISH)) {
 +                    IgniteInternalFuture<IgniteTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync();
  
 -                    res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min));
 +                    fut.listenAsync(new CIX1<IgniteInternalFuture<IgniteTx>>() {
 +                        @Override public void applyx(IgniteInternalFuture<IgniteTx> gridCacheTxGridFuture) {
 +                            try {
 +                                if (replied.compareAndSet(false, true))
 +                                    sendPrepareResponse(res);
 +                            }
 +                            catch (IgniteCheckedException e) {
 +                                U.error(log, "Failed to send prepare response for transaction: " + tx, e);
 +                            }
 +                        }
 +                    });
 +                }
 +            }
 +            else {
 +                try {
 +                    if (replied.compareAndSet(false, true))
 +                        sendPrepareResponse(res);
 +                }
 +                catch (IgniteCheckedException e) {
 +                    U.error(log, "Failed to send prepare response for transaction: " + tx, e);
 +                }
 +            }
  
 -                    res.pending(localDhtPendingVersions(tx.writeEntries(), min));
 +            return true;
 +        }
 +        else {
 +            if (replied.compareAndSet(false, true)) {
 +                try {
 +                    sendPrepareResponse(createPrepareResponse());
  
 -                    cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
 +                    return true;
                  }
 +                catch (IgniteCheckedException e) {
 +                    onError(e);
  
 -                return true;
 +                    return true;
 +                }
 +                finally {
 +                    // Will call super.onDone().
 +                    onComplete();
 +                }
              }
 -            catch (IgniteCheckedException e) {
 -                onError(e);
 +            else {
 +                // Other thread is completing future. Wait for it to complete.
 +                try {
 +                    get();
 +                }
 +                catch (IgniteInterruptedException e) {
 +                    onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e));
 +                }
 +                catch (IgniteCheckedException ignored) {
 +                    // No-op, get() was just synchronization.
 +                }
  
 -                return true;
 -            }
 -            finally {
 -                // Will call super.onDone().
 -                onComplete();
 +                return false;
              }
          }
 +    }
 +
 +    /**
 +     * @throws IgniteCheckedException If failed to send response.
 +     */
 +    private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) throws IgniteCheckedException {
 +        if (!tx.nearNodeId().equals(cctx.localNodeId()))
-             cctx.io().send(tx.nearNodeId(), res);
++            cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
          else {
 -            // Other thread is completing future. Wait for it to complete.
 -            try {
 -                get();
 -            }
 -            catch (IgniteInterruptedCheckedException e) {
 -                onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e));
 -            }
 -            catch (IgniteCheckedException ignored) {
 -                // No-op, get() was just synchronization.
 -            }
 +            assert completeCb != null;
  
 -            return false;
 +            completeCb.apply(res);
          }
      }
  
@@@ -899,36 -742,13 +901,36 @@@
                          catch (GridCacheEntryRemovedException ignore) {
                              assert false : "Got removed exception on entry with dht local candidate: " + entry;
                          }
 +
 +                        idx++;
 +                    }
 +
 +                    if (!F.isEmpty(nearWrites)) {
 +                        for (IgniteTxEntry<K, V> entry : nearWrites) {
 +                            try {
 +                                GridCacheMvccCandidate<K> added = entry.cached().candidate(version());
 +
 +                                assert added != null;
 +                                assert added.dhtLocal();
 +
 +                                if (added.ownerVersion() != null)
 +                                    req.owned(entry.txKey(), added.ownerVersion());
 +
 +                                break;
 +                            }
 +                            catch (GridCacheEntryRemovedException ignore) {
 +                                assert false : "Got removed exception on entry with dht local candidate: " + entry;
 +                            }
 +                        }
                      }
  
 +                    assert req.transactionNodes() != null;
 +
                      //noinspection TryWithIdenticalCatches
                      try {
-                         cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
 -                        cctx.io().send(nearMapping.node(), req, tx.ioPolicy());
++                        cctx.io().send(n, req, tx.ioPolicy());
                      }
-                     catch (ClusterTopologyException e) {
+                     catch (ClusterTopologyCheckedException e) {
                          fut.onResult(e);
                      }
                      catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 732703e,cd02ff6..0938525
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@@ -363,127 -369,85 +360,91 @@@ public class GridNearLockRequest<K, V> 
          if (!super.writeTo(buf))
              return false;
  
-         if (!commState.typeWritten) {
-             if (!commState.putByte(directType()))
+         if (!typeWritten) {
+             if (!writer.writeByte(null, directType()))
                  return false;
  
-             commState.typeWritten = true;
+             typeWritten = true;
          }
  
-         switch (commState.idx) {
-             case 22:
-                 if (!commState.putLong(accessTtl))
-                     return false;
- 
-                 commState.idx++;
- 
-             case 23:
-                 if (dhtVers != null) {
-                     if (commState.it == null) {
-                         if (!commState.putInt(dhtVers.length))
-                             return false;
- 
-                         commState.it = arrayIterator(dhtVers);
-                     }
- 
-                     while (commState.it.hasNext() || commState.cur != NULL) {
-                         if (commState.cur == NULL)
-                             commState.cur = commState.it.next();
- 
-                         if (!commState.putCacheVersion((GridCacheVersion)commState.cur))
-                             return false;
- 
-                         commState.cur = NULL;
-                     }
- 
-                     commState.it = null;
-                 } else {
-                     if (!commState.putInt(-1))
-                         return false;
-                 }
- 
-                 commState.idx++;
- 
+         switch (state) {
              case 24:
-                 if (filterBytes != null) {
-                     if (commState.it == null) {
-                         if (!commState.putInt(filterBytes.length))
-                             return false;
- 
-                         commState.it = arrayIterator(filterBytes);
-                     }
- 
-                     while (commState.it.hasNext() || commState.cur != NULL) {
-                         if (commState.cur == NULL)
-                             commState.cur = commState.it.next();
- 
-                         if (!commState.putByteArray((byte[])commState.cur))
-                             return false;
- 
-                         commState.cur = NULL;
-                     }
+                 if (!writer.writeLong("accessTtl", accessTtl))
+                     return false;
  
-                     commState.it = null;
-                 } else {
-                     if (!commState.putInt(-1))
-                         return false;
-                 }
- 
-                 commState.idx++;
+                 state++;
  
              case 25:
-                 if (!commState.putBoolean(implicitSingleTx))
+                 if (!writer.writeObjectArray("dhtVers", dhtVers, GridCacheVersion.class))
                      return false;
  
-                 commState.idx++;
+                 state++;
  
              case 26:
-                 if (!commState.putBoolean(implicitTx))
+                 if (!writer.writeObjectArray("filterBytes", filterBytes, byte[].class))
                      return false;
  
-                 commState.idx++;
+                 state++;
  
              case 27:
-                 if (!commState.putGridUuid(miniId))
+                 if (!writer.writeBoolean("hasTransforms", hasTransforms))
                      return false;
  
-                 commState.idx++;
+                 state++;
  
              case 28:
-                 if (!commState.putBoolean(onePhaseCommit))
+                 if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
                      return false;
  
-                 commState.idx++;
+                 state++;
+ 
+             case 29:
+                 if (!writer.writeBoolean("implicitTx", implicitTx))
+                     return false;
+ 
+                 state++;
  
 +            case 29:
 +                if (!commState.putBoolean(syncCommit))
 +                    return false;
 +
 +                commState.idx++;
 +
              case 30:
-                 if (!commState.putLong(topVer))
+                 if (!writer.writeIgniteUuid("miniId", miniId))
                      return false;
  
-                 commState.idx++;
+                 state++;
  
              case 31:
-                 if (!commState.putUuid(subjId))
+                 if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
                      return false;
  
-                 commState.idx++;
+                 state++;
  
              case 32:
-                 if (!commState.putInt(taskNameHash))
+                 if (!writer.writeUuid("subjId", subjId))
                      return false;
  
-                 commState.idx++;
+                 state++;
  
              case 33:
-                 if (!commState.putBoolean(hasTransforms))
+                 if (!writer.writeBoolean("syncCommit", syncCommit))
                      return false;
  
-                 commState.idx++;
+                 state++;
+ 
+             case 34:
+                 if (!writer.writeInt("taskNameHash", taskNameHash))
+                     return false;
+ 
+                 state++;
+ 
+             case 35:
+                 if (!writer.writeLong("topVer", topVer))
+                     return false;
+ 
+                 state++;
  
          }
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index e77689f,6fa2f6a..c5524f8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -688,8 -695,8 +689,8 @@@ public class GridNearTxLocal<K, V> exte
      }
  
      /** {@inheritDoc} */
-     @Override public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync() {
+     @Override public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsync() {
 -        IgniteInternalFuture<IgniteInternalTx<K, V>> fut = prepFut.get();
 +        GridNearTxPrepareFuture<K, V> fut = (GridNearTxPrepareFuture<K, V>)prepFut.get();
  
          if (fut == null) {
              // Future must be created before any exception can be thrown.
@@@ -710,14 -718,16 +711,14 @@@
              if (!state(PREPARING)) {
                  if (setRollbackOnly()) {
                      if (timedOut())
-                         fut.onError(new IgniteTxTimeoutException("Transaction timed out and was " +
 -                        pessimisticFut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was " +
++                        fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was " +
                              "rolled back: " + this));
                      else
 -                        pessimisticFut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" +
 +                        fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" +
                              state() + ", tx=" + this + ']'));
                  }
                  else
-                     fut.onError(new IgniteTxRollbackException("Invalid transaction state for prepare " +
 -                    pessimisticFut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " +
++                    fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " +
                          "[state=" + state() + ", tx=" + this + ']'));
  
                  return fut;
@@@ -873,13 -891,11 +874,13 @@@
       * @return Future that will be completed when locks are acquired.
       */
      @SuppressWarnings("TypeMayBeWeakened")
-     public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsyncLocal(
 -    public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsyncLocal(@Nullable Collection<IgniteTxEntry<K, V>> reads,
 -        @Nullable Collection<IgniteTxEntry<K, V>> writes, Map<UUID, Collection<UUID>> txNodes, boolean last,
 -        Collection<UUID> lastBackups) {
 -        assert optimistic();
 -
++    public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsyncLocal(
 +        @Nullable Collection<IgniteTxEntry<K, V>> reads,
 +        @Nullable Collection<IgniteTxEntry<K, V>> writes,
 +        Map<UUID, Collection<UUID>> txNodes, boolean last,
 +        Collection<UUID> lastBackups,
 +        IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb
 +    ) {
          if (state() != PREPARING) {
              if (timedOut())
                  return new GridFinishedFuture<>(cctx.kernalContext(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 6496ef6,3c90db9..cd4ea34
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@@ -18,18 -18,16 +18,21 @@@
  package org.apache.ignite.internal.processors.cache.distributed.near;
  
  import org.apache.ignite.*;
- import org.apache.ignite.client.util.*;
  import org.apache.ignite.cluster.*;
  import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.managers.discovery.*;
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
 +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
 +import org.apache.ignite.internal.processors.cache.version.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.transactions.*;
 +import org.apache.ignite.internal.managers.discovery.*;
  import org.apache.ignite.internal.processors.cache.distributed.dht.*;
  import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.transactions.*;
  import org.apache.ignite.internal.util.future.*;
  import org.apache.ignite.internal.util.lang.*;
  import org.apache.ignite.internal.util.tostring.*;
@@@ -311,69 -326,65 +315,69 @@@ public final class GridNearTxPrepareFut
       * Waits for topology exchange future to be ready and then prepares user transaction.
       */
      public void prepare() {
 -        GridDhtTopologyFuture topFut = topologyReadLock();
 +        if (tx.optimistic()) {
 +            GridDhtTopologyFuture topFut = topologyReadLock();
  
 -        try {
 -            if (topFut.isDone()) {
 -                try {
 -                    if (!tx.state(PREPARING)) {
 -                        if (tx.setRollbackOnly()) {
 -                            if (tx.timedOut())
 -                                onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
 -                                    "was rolled back: " + this));
 +            try {
 +                if (topFut.isDone()) {
 +                    try {
 +                        if (!tx.state(PREPARING)) {
 +                            if (tx.setRollbackOnly()) {
 +                                if (tx.timedOut())
-                                     onError(null, null, new IgniteTxTimeoutException("Transaction timed out and " +
++                                    onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
 +                                        "was rolled back: " + this));
 +                                else
 +                                    onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " +
 +                                        "[state=" + tx.state() + ", tx=" + this + ']'));
 +                            }
                              else
-                                 onError(null, null, new IgniteTxRollbackException("Invalid transaction state for " +
 -                                onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " +
 -                                    "[state=" + tx.state() + ", tx=" + this + ']'));
++                                onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
 +                                    "prepare [state=" + tx.state() + ", tx=" + this + ']'));
 +
 +                            return;
                          }
 -                        else
 -                            onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
 -                                "prepare [state=" + tx.state() + ", tx=" + this + ']'));
  
 -                        return;
 -                    }
 +                        GridDiscoveryTopologySnapshot snapshot = topFut.topologySnapshot();
  
 -                    GridDiscoveryTopologySnapshot snapshot = topFut.topologySnapshot();
 +                        tx.topologyVersion(snapshot.topologyVersion());
 +                        tx.topologySnapshot(snapshot);
  
 -                    tx.topologyVersion(snapshot.topologyVersion());
 -                    tx.topologySnapshot(snapshot);
 +                        // Make sure to add future before calling prepare.
 +                        cctx.mvcc().addFuture(this);
  
 -                    // Make sure to add future before calling prepare.
 -                    cctx.mvcc().addFuture(this);
 +                        prepare0();
 +                    }
 +                    catch (IgniteTxTimeoutException | IgniteTxOptimisticException e) {
 +                        onError(cctx.localNodeId(), null, e);
 +                    }
 +                    catch (IgniteCheckedException e) {
 +                        tx.setRollbackOnly();
  
 -                    prepare0();
 -                }
 -                catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) {
 -                    onError(cctx.localNodeId(), null, e);
 -                }
 -                catch (IgniteCheckedException e) {
 -                    tx.setRollbackOnly();
 +                        String msg = "Failed to prepare transaction (will attempt rollback): " + this;
  
 -                    String msg = "Failed to prepare transaction (will attempt rollback): " + this;
 +                        U.error(log, msg, e);
  
 -                    U.error(log, msg, e);
 +                        tx.rollbackAsync();
  
-                         onError(null, null, new IgniteTxRollbackException(msg, e));
 -                    tx.rollbackAsync();
++                        onError(null, null, new IgniteTxRollbackCheckedException(msg, e));
 +                    }
 +                }
 +                else {
 +                    topFut.syncNotify(false);
  
 -                    onError(null, null, new IgniteTxRollbackCheckedException(msg, e));
 +                    topFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() {
 +                        @Override public void apply(IgniteInternalFuture<Long> t) {
 +                            prepare();
 +                        }
 +                    });
                  }
              }
 -            else {
 -                topFut.syncNotify(false);
 -
 -                topFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() {
 -                    @Override public void apply(IgniteInternalFuture<Long> t) {
 -                        prepare();
 -                    }
 -                });
 +            finally {
 +                topologyReadUnlock();
              }
          }
 -        finally {
 -            topologyReadUnlock();
 -        }
 +        else
 +            preparePessimistic();
      }
  
      /**
@@@ -469,19 -491,6 +473,19 @@@
          ConcurrentLinkedDeque8<GridDistributedTxMapping<K, V>> mappings =
              new ConcurrentLinkedDeque8<>();
  
 +        if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
 +            for (int cacheId : tx.activeCacheIds()) {
 +                GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 +
 +                if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
-                     onDone(new ClusterTopologyException("Failed to map keys for cache (all " +
++                    onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " +
 +                        "partition nodes left the grid): " + cacheCtx.name()));
 +
 +                    return;
 +                }
 +            }
 +        }
 +
          // Assign keys to primary nodes.
          GridDistributedTxMapping<K, V> cur = null;
  
@@@ -730,8 -646,14 +734,8 @@@
              assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx +
                  ", nodeId=" + n.id() + ']';
  
 -            MiniFuture fut = new MiniFuture(m, mappings);
 -
 -            req.miniId(fut.futureId());
 -
 -            add(fut); // Append new future.
 -
              try {
-                 cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+                 cctx.io().send(n, req, tx.ioPolicy());
              }
              catch (IgniteCheckedException e) {
                  // Fail the whole thing.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 497ed5f,9d5f73d..e3cb897
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@@ -20,12 -20,11 +20,13 @@@ package org.apache.ignite.internal.proc
  import org.apache.ignite.internal.*;
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
 +import org.apache.ignite.internal.util.tostring.*;
 +import org.apache.ignite.lang.*;
  import org.apache.ignite.internal.processors.cache.transactions.*;
- import org.apache.ignite.internal.util.direct.*;
  import org.apache.ignite.internal.util.typedef.*;
  import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.plugin.extensions.communication.*;
  import org.jetbrains.annotations.*;
  
  import java.io.*;
@@@ -56,17 -55,9 +57,16 @@@ public class GridNearTxPrepareRequest<K
  
      /** IDs of backup nodes receiving last prepare request during this prepare. */
      @GridDirectCollection(UUID.class)
 +    @GridToStringInclude
      private Collection<UUID> lastBackups;
  
 +    /** Need return value flag. */
 +    private boolean retVal;
 +
 +    /** Implicit single flag. */
 +    private boolean implicitSingle;
 +
      /** Subject ID. */
-     @GridDirectVersion(1)
      private UUID subjId;
  
      /** Task name hash. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------


Mime
View raw message