ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [6/8] incubator-ignite git commit: Revert "ignite-471: fixed NPE in PortableMarshaller"
Date Sat, 16 May 2015 07:07:41 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 54b59b8..08fcaf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
@@ -94,6 +93,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param isolation Isolation.
      * @param timeout Timeout.
      * @param txSize Expected transaction size.
+     * @param grpLockKey Group lock key if this is a group-lock transaction.
+     * @param partLock If this is a group-lock transaction and the whole partition should be locked.
      */
     protected GridDhtTxLocalAdapter(
         GridCacheSharedContext cctx,
@@ -109,11 +110,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         boolean invalidate,
         boolean storeEnabled,
         int txSize,
+        @Nullable IgniteTxKey grpLockKey,
+        boolean partLock,
         @Nullable UUID subjId,
         int taskNameHash
     ) {
         super(cctx, xidVer, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, invalidate,
-            storeEnabled, txSize, subjId, taskNameHash);
+            storeEnabled, txSize, grpLockKey, partLock, subjId, taskNameHash);
 
         assert cctx != null;
 
@@ -729,6 +732,68 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override protected void addGroupTxMapping(Collection<IgniteTxKey> keys) {
+        assert groupLock();
+
+        for (GridDistributedTxMapping mapping : dhtMap.values())
+            mapping.entries(Collections.unmodifiableCollection(txMap.values()), true);
+
+        // Here we know that affinity key for all given keys is our group lock key.
+        // Just add entries to dht mapping.
+        // Add near readers. If near cache is disabled on all nodes, do nothing.
+        Collection<UUID> backupIds = dhtMap.keySet();
+
+        Map<ClusterNode, List<GridDhtCacheEntry>> locNearMap = null;
+
+        for (IgniteTxKey key : keys) {
+            IgniteTxEntry txEntry = entry(key);
+
+            if (!txEntry.groupLockEntry() || txEntry.context().isNear())
+                continue;
+
+            assert txEntry.cached() instanceof GridDhtCacheEntry : "Invalid entry type: " + txEntry.cached();
+
+            while (true) {
+                try {
+                    GridDhtCacheEntry entry = (GridDhtCacheEntry)txEntry.cached();
+
+                    Collection<UUID> readers = entry.readers();
+
+                    if (!F.isEmpty(readers)) {
+                        Collection<ClusterNode> nearNodes = cctx.discovery().nodes(readers, F0.notEqualTo(nearNodeId()),
+                            F.notIn(backupIds));
+
+                        if (log.isDebugEnabled())
+                            log.debug("Mapping entry to near nodes [nodes=" + U.nodeIds(nearNodes) + ", entry=" +
+                                entry + ']');
+
+                        for (ClusterNode n : nearNodes) {
+                            if (locNearMap == null)
+                                locNearMap = new HashMap<>();
+
+                            List<GridDhtCacheEntry> entries = locNearMap.get(n);
+
+                            if (entries == null)
+                                locNearMap.put(n, entries = new LinkedList<>());
+
+                            entries.add(entry);
+                        }
+                    }
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    // Retry.
+                    txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()));
+                }
+            }
+        }
+
+        if (locNearMap != null)
+            addNearNodeEntryMapping(locNearMap);
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
     @Override public boolean finish(boolean commit) throws IgniteCheckedException {
         if (log.isDebugEnabled())
@@ -820,32 +885,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      */
     protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut);
 
-    /**
-     * @return {@code True} if transaction is finished on prepare step.
-     */
-    protected final boolean commitOnPrepare() {
-        return onePhaseCommit() && !near();
-    }
-
-    /**
-     * @param prepFut Prepare future.
-     * @return If transaction if finished on prepare step returns future which is completed after transaction finish.
-     */
-    protected final IgniteInternalFuture<GridNearTxPrepareResponse> chainOnePhasePrepare(
-        final GridDhtTxPrepareFuture prepFut) {
-        if (commitOnPrepare()) {
-            return finishFuture().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridNearTxPrepareResponse>() {
-                @Override public GridNearTxPrepareResponse applyx(IgniteInternalFuture<IgniteInternalTx> finishFut)
-                    throws IgniteCheckedException
-                {
-                    return prepFut.get();
-                }
-            });
-        }
-
-        return prepFut;
-    }
-
     /** {@inheritDoc} */
     @Override public void rollback() throws IgniteCheckedException {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
index ba2c35f..d207d76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
@@ -28,7 +28,7 @@ import java.util.*;
 /**
  * DHT transaction mapping.
  */
-public class GridDhtTxMapping {
+public class GridDhtTxMapping<K, V> {
     /** Transaction nodes mapping (primary node -> related backup nodes). */
     private final Map<UUID, Collection<UUID>> txNodes = new GridLeanMap<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 3056ae5..3a1a80a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -50,32 +50,19 @@ import static org.apache.ignite.transactions.TransactionState.*;
  *
  */
 @SuppressWarnings("unchecked")
-public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
-    implements GridCacheMvccFuture<GridNearTxPrepareResponse> {
+public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
+    implements GridCacheMvccFuture<IgniteInternalTx> {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
-    /** */
-    private static final IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse> REDUCER =
-        new IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse>() {
-            @Override public boolean collect(IgniteInternalTx e) {
-                return true;
-            }
-
-            @Override public GridNearTxPrepareResponse reduce() {
-                // Nothing to aggregate.
-                return null;
-            }
-        };
-
     /** Logger. */
     private static IgniteLogger log;
 
     /** Context. */
-    private GridCacheSharedContext<?, ?> cctx;
+    private GridCacheSharedContext<K, V> cctx;
 
     /** Future ID. */
     private IgniteUuid futId;
@@ -141,13 +128,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     /** */
     private boolean invoke;
 
+    /** */
+    private IgniteInClosure<GridNearTxPrepareResponse> completeCb;
+
     /**
      * @param cctx Context.
      * @param tx Transaction.
      * @param nearMiniId Near mini future id.
      * @param dhtVerMap DHT versions map.
      * @param last {@code True} if this is last prepare operation for node.
-     * @param retVal Return value flag.
      * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
      */
     public GridDhtTxPrepareFuture(
@@ -157,9 +146,19 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         Map<IgniteTxKey, GridCacheVersion> dhtVerMap,
         boolean last,
         boolean retVal,
-        Collection<UUID> lastBackups
+        Collection<UUID> lastBackups,
+        IgniteInClosure<GridNearTxPrepareResponse> completeCb
     ) {
-        super(REDUCER);
+        super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
+            @Override public boolean collect(IgniteInternalTx e) {
+                return true;
+            }
+
+            @Override public IgniteInternalTx reduce() {
+                // Nothing to aggregate.
+                return tx;
+            }
+        });
 
         this.cctx = cctx;
         this.tx = tx;
@@ -179,6 +178,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
         this.retVal = retVal;
 
+        this.completeCb = completeCb;
+
         assert dhtMap != null;
         assert nearMap != null;
     }
@@ -381,7 +382,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param t Error.
      */
     public void onError(Throwable t) {
-        onDone(null, t);
+        onDone(tx, t);
     }
 
     /**
@@ -414,7 +415,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         if (log.isDebugEnabled())
             log.debug("Marking all local candidates as ready: " + this);
 
-        Iterable<IgniteTxEntry> checkEntries = writes;
+        Iterable<IgniteTxEntry> checkEntries = tx.groupLock() ?
+            Collections.singletonList(tx.groupLockEntry()) : writes;
 
         for (IgniteTxEntry txEntry : checkEntries) {
             GridCacheContext cacheCtx = txEntry.context();
@@ -430,8 +432,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 txEntry.cached(entry);
             }
 
-            if (tx.optimistic() && txEntry.explicitVersion() == null)
-                lockKeys.add(txEntry.txKey());
+            if (tx.optimistic() && txEntry.explicitVersion() == null) {
+                if (!tx.groupLock() || tx.groupLockKey().equals(entry.txKey()))
+                    lockKeys.add(txEntry.txKey());
+            }
 
             while (true) {
                 try {
@@ -475,7 +479,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(GridNearTxPrepareResponse res0, Throwable err) {
+    @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
         assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " +
             "pending mini futures: " + this;
 
@@ -491,15 +495,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             // Must create prepare response before transaction is committed to grab correct return value.
             final GridNearTxPrepareResponse res = createPrepareResponse();
 
-            onComplete(res);
+            onComplete();
 
-            if (tx.commitOnPrepare()) {
+            if (!tx.near()) {
                 if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
                     IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ?
                         tx.commitAsync() : tx.rollbackAsync();
 
                     fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
-                        @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
+                        @Override
+                        public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
                             try {
                                 if (replied.compareAndSet(false, true))
                                     sendPrepareResponse(res);
@@ -525,17 +530,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         }
         else {
             if (replied.compareAndSet(false, true)) {
-                GridNearTxPrepareResponse res = createPrepareResponse();
-
                 try {
-                    sendPrepareResponse(res);
+                    sendPrepareResponse(createPrepareResponse());
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send prepare response for transaction: " + tx, e);
                 }
                 finally {
                     // Will call super.onDone().
-                    onComplete(res);
+                    onComplete();
                 }
 
                 return true;
@@ -559,12 +562,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /**
-     * @param res Response.
      * @throws IgniteCheckedException If failed to send response.
      */
     private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
         if (!tx.nearNodeId().equals(cctx.localNodeId()))
             cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
+        else {
+            assert completeCb != null;
+
+            completeCb.apply(res);
+        }
     }
 
     /**
@@ -609,10 +616,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             for (IgniteTxEntry e : writes) {
                 IgniteTxEntry txEntry = tx.entry(e.txKey());
 
-                assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
-
                 GridCacheContext cacheCtx = txEntry.context();
 
+                assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
+
                 while (true) {
                     try {
                         GridCacheEntryEx entry = txEntry.cached();
@@ -675,14 +682,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     /**
      * Completeness callback.
      *
-     * @param res Response.
      * @return {@code True} if {@code done} flag was changed as a result of this call.
      */
-    private boolean onComplete(@Nullable GridNearTxPrepareResponse res) {
+    private boolean onComplete() {
         if (last || tx.isSystemInvalidate())
             tx.state(PREPARED);
 
-        if (super.onDone(res, err.get())) {
+        if (super.onDone(tx, err.get())) {
             // Don't forget to clean up.
             cctx.mvcc().removeFuture(this);
 
@@ -696,7 +702,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * Completes this future.
      */
     public void complete() {
-        onComplete(null);
+        onComplete();
     }
 
     /**
@@ -711,7 +717,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         if (tx.empty()) {
             tx.setRollbackOnly();
 
-            onDone((GridNearTxPrepareResponse)null);
+            onDone(tx);
         }
 
         this.reads = reads;
@@ -800,6 +806,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         tx,
                         dhtWrites,
                         nearWrites,
+                        tx.groupLockKey(),
+                        tx.partitionLock(),
                         txNodes,
                         tx.nearXidVersion(),
                         true,
@@ -813,11 +821,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         try {
                             GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
 
-                            GridCacheContext<?, ?> cacheCtx = cached.context();
+                            GridCacheContext<K, V> cacheCtx = cached.context();
 
                             if (entry.explicitVersion() == null) {
                                 GridCacheMvccCandidate added = cached.candidate(version());
 
+                                assert added != null || entry.groupLockEntry() :
+                                    "Null candidate for non-group-lock entry " +
+                                        "[added=" + added + ", entry=" + entry + ']';
                                 assert added == null || added.dhtLocal() :
                                     "Got non-dht-local candidate for prepare future " +
                                         "[added=" + added + ", entry=" + entry + ']';
@@ -898,6 +909,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                             tx,
                             null,
                             nearMapping.writes(),
+                            tx.groupLockKey(),
+                            tx.partitionLock(),
                             tx.transactionNodes(),
                             tx.nearXidVersion(),
                             true,
@@ -910,6 +923,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                                 if (entry.explicitVersion() == null) {
                                     GridCacheMvccCandidate added = entry.cached().candidate(version());
 
+                                    assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " +
+                                        "[added=" + added + ", entry=" + entry + ']';
                                     assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
                                         "[added=" + added + ", entry=" + entry + ']';
 
@@ -962,7 +977,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
         GridCacheContext cacheCtx = entry.context();
 
-        GridDhtCacheAdapter<?, ?> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
+        GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
 
         ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(entry);
 
@@ -1219,7 +1234,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED);
 
                 for (GridCacheEntryInfo info : res.preloadEntries()) {
-                    GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(info.cacheId());
+                    GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId());
 
                     while (true) {
                         GridCacheEntryEx entry = cacheCtx.cache().entryEx(info.key());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 73f86fd..c033273 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -104,6 +104,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
      * @param tx Transaction.
      * @param dhtWrites DHT writes.
      * @param nearWrites Near writes.
+     * @param grpLockKey Group lock key if preparing group-lock transaction.
+     * @param partLock {@code True} if group-lock transaction locks partition.
      * @param txNodes Transaction nodes mapping.
      * @param nearXidVer Near transaction ID.
      * @param last {@code True} if this is last prepare request for node.
@@ -116,13 +118,15 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         GridDhtTxLocalAdapter tx,
         Collection<IgniteTxEntry> dhtWrites,
         Collection<IgniteTxEntry> nearWrites,
+        IgniteTxKey grpLockKey,
+        boolean partLock,
         Map<UUID, Collection<UUID>> txNodes,
         GridCacheVersion nearXidVer,
         boolean last,
         boolean onePhaseCommit,
         UUID subjId,
         int taskNameHash) {
-        super(tx, null, dhtWrites, txNodes, onePhaseCommit);
+        super(tx, null, dhtWrites, grpLockKey, partLock, txNodes, onePhaseCommit);
 
         assert futId != null;
         assert miniId != null;
@@ -333,79 +337,79 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         }
 
         switch (writer.state()) {
-            case 23:
+            case 25:
                 if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 24:
+            case 26:
                 if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
                     return false;
 
                 writer.incrementState();
 
-            case 25:
+            case 27:
                 if (!writer.writeBoolean("last", last))
                     return false;
 
                 writer.incrementState();
 
-            case 26:
+            case 28:
                 if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 27:
+            case 29:
                 if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
-            case 28:
+            case 30:
                 if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 29:
+            case 31:
                 if (!writer.writeMessage("nearXidVer", nearXidVer))
                     return false;
 
                 writer.incrementState();
 
-            case 30:
+            case 32:
                 if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 31:
+            case 33:
                 if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 32:
+            case 34:
                 if (!writer.writeBitSet("preloadKeys", preloadKeys))
                     return false;
 
                 writer.incrementState();
 
-            case 33:
+            case 35:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 34:
+            case 36:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 35:
+            case 37:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -427,7 +431,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
             return false;
 
         switch (reader.state()) {
-            case 23:
+            case 25:
                 futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
@@ -435,7 +439,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 24:
+            case 26:
                 invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
 
                 if (!reader.isLastRead())
@@ -443,7 +447,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 25:
+            case 27:
                 last = reader.readBoolean("last");
 
                 if (!reader.isLastRead())
@@ -451,7 +455,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 26:
+            case 28:
                 miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
@@ -459,7 +463,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 27:
+            case 29:
                 nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
@@ -467,7 +471,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 28:
+            case 30:
                 nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -475,7 +479,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 29:
+            case 31:
                 nearXidVer = reader.readMessage("nearXidVer");
 
                 if (!reader.isLastRead())
@@ -483,7 +487,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 30:
+            case 32:
                 ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -491,7 +495,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 31:
+            case 33:
                 ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -499,7 +503,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 32:
+            case 34:
                 preloadKeys = reader.readBitSet("preloadKeys");
 
                 if (!reader.isLastRead())
@@ -507,7 +511,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 33:
+            case 35:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -515,7 +519,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 34:
+            case 36:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -523,7 +527,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
-            case 35:
+            case 37:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -543,6 +547,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 36;
+        return 38;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 0a69910..30464a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -77,6 +77,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
      * @param timeout Timeout.
      * @param ctx Cache context.
      * @param txSize Expected transaction size.
+     * @param grpLockKey Group lock key if this is a group-lock transaction.
      * @param nearXidVer Near transaction ID.
      * @param txNodes Transaction nodes mapping.
      */
@@ -96,13 +97,14 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         boolean invalidate,
         long timeout,
         int txSize,
+        @Nullable IgniteTxKey grpLockKey,
         GridCacheVersion nearXidVer,
         Map<UUID, Collection<UUID>> txNodes,
         @Nullable UUID subjId,
         int taskNameHash
     ) {
         super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
-            txSize, subjId, taskNameHash);
+            txSize, grpLockKey, subjId, taskNameHash);
 
         assert nearNodeId != null;
         assert rmtFutId != null;
@@ -137,6 +139,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
      * @param timeout Timeout.
      * @param ctx Cache context.
      * @param txSize Expected transaction size.
+     * @param grpLockKey Group lock key if transaction is group-lock.
      */
     public GridDhtTxRemote(
         GridCacheSharedContext ctx,
@@ -155,11 +158,12 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         boolean invalidate,
         long timeout,
         int txSize,
+        @Nullable IgniteTxKey grpLockKey,
         @Nullable UUID subjId,
         int taskNameHash
     ) {
         super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
-            txSize, subjId, taskNameHash);
+            txSize, grpLockKey, subjId, taskNameHash);
 
         assert nearNodeId != null;
         assert rmtFutId != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
index 098ec97..8da4da5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
@@ -86,9 +86,7 @@ public class GridNoStorageCacheMap extends GridCacheConcurrentMap {
         boolean create)
     {
         if (create) {
-            GridCacheMapEntry entry = ctx.useOffheapEntry() ?
-                new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0) :
-                new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0);
+            GridCacheMapEntry entry = new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0);
 
             return new GridTriple<>(entry, null, null);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 19d88e0..905f7bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -124,9 +124,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 GridCacheMapEntry next,
                 int hdrId)
             {
-                if (ctx.useOffheapEntry())
-                    return new GridDhtAtomicOffHeapCacheEntry(ctx, topVer, key, hash, val, next, hdrId);
-
                 return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val, next, hdrId);
             }
         });

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
deleted file mode 100644
index 91a8e65..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-
-/**
- * DHT atomic cache entry for off-heap tiered or off-heap values modes.
- */
-public class GridDhtAtomicOffHeapCacheEntry extends GridDhtAtomicCacheEntry {
-    /** Off-heap value pointer. */
-    private long valPtr;
-
-    /**
-     * @param ctx    Cache context.
-     * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
-     * @param key    Cache key.
-     * @param hash   Key hash value.
-     * @param val    Entry value.
-     * @param next   Next entry in the linked list.
-     * @param hdrId  Header id.
-     */
-    public GridDhtAtomicOffHeapCacheEntry(GridCacheContext ctx,
-        AffinityTopologyVersion topVer,
-        KeyCacheObject key,
-        int hash,
-        CacheObject val,
-        GridCacheMapEntry next,
-        int hdrId) {
-        super(ctx, topVer, key, hash, val, next, hdrId);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean hasOffHeapPointer() {
-        return valPtr != 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long offHeapPointer() {
-        return valPtr;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void offHeapPointer(long valPtr) {
-        this.valPtr = valPtr;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 05b3c7b..c92d9ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -86,9 +86,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                 GridCacheMapEntry next,
                 int hdrId)
             {
-                if (ctx.useOffheapEntry())
-                    return new GridDhtColocatedOffHeapCacheEntry(ctx, topVer, key, hash, val, next, hdrId);
-
                 return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val, next, hdrId);
             }
         });
@@ -129,7 +126,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         boolean allowDetached
     ) {
         return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ?
-            createEntry(key) : entryExx(key, topVer);
+            new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0) : entryExx(key, topVer);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index e6a4eaf..f10baa3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -754,6 +754,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                                         mappedKeys.size(),
                                         inTx() ? tx.size() : mappedKeys.size(),
                                         inTx() && tx.syncCommit(),
+                                        inTx() ? tx.groupLockKey() : null,
+                                        inTx() && tx.partitionLock(),
                                         inTx() ? tx.subjectId() : null,
                                         inTx() ? tx.taskNameHash() : 0,
                                         read ? accessTtl : -1L,
@@ -1088,6 +1090,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             // If primary node left the grid before lock acquisition, fail the whole future.
             throw newTopologyException(null, primary.id());
 
+        if (inTx() && tx.groupLock() && !primary.isLocal())
+            throw new IgniteCheckedException("Failed to start group lock transaction (local node is not primary for " +
+                " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']');
+
         if (mapping == null || !primary.id().equals(mapping.node().id()))
             mapping = new GridNearLockMapping(primary, key);
         else
@@ -1275,18 +1281,25 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
                         GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
 
-                        if (res.dhtVersion(i) == null) {
-                            onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
-                                "(will fail the lock): " + res));
+                        try {
+                            if (res.dhtVersion(i) == null) {
+                                onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+                                    "(will fail the lock): " + res));
 
-                            return;
-                        }
+                                return;
+                            }
 
-                        // Set value to detached entry.
-                        entry.resetFromPrimary(newVal, dhtVer);
+                            // Set value to detached entry.
+                            entry.resetFromPrimary(newVal, dhtVer);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+
+                            return;
+                        }
                     }
                     else
                         cctx.mvcc().markExplicitOwner(k, threadId);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java
deleted file mode 100644
index ed842ad..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
-
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-
-/**
- * Cache entry for colocated cache for off-heap tiered or off-heap values modes.
- */
-public class GridDhtColocatedOffHeapCacheEntry extends GridDhtColocatedCacheEntry {
-    /** Off-heap value pointer. */
-    private long valPtr;
-
-    /**
-     * @param ctx    Cache context.
-     * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
-     * @param key    Cache key.
-     * @param hash   Key hash value.
-     * @param val    Entry value.
-     * @param next   Next entry in the linked list.
-     * @param hdrId  Header id.
-     */
-    public GridDhtColocatedOffHeapCacheEntry(GridCacheContext ctx,
-        AffinityTopologyVersion topVer,
-        KeyCacheObject key,
-        int hash,
-        CacheObject val,
-        GridCacheMapEntry next,
-        int hdrId) {
-        super(ctx, topVer, key, hash, val, next, hdrId);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean hasOffHeapPointer() {
-        return valPtr != 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long offHeapPointer() {
-        return valPtr;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void offHeapPointer(long valPtr) {
-        this.valPtr = valPtr;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 2c84bd4..5c4dd13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -46,8 +46,10 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
      *
      * @param val Value.
      * @param ver Version.
+     * @throws IgniteCheckedException If value unmarshalling failed.
      */
-    public void resetFromPrimary(CacheObject val, GridCacheVersion ver) {
+    public void resetFromPrimary(CacheObject val, GridCacheVersion ver)
+        throws IgniteCheckedException {
         value(val);
 
         this.ver = ver;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4b8db00..45d332c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1252,7 +1252,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 cctx.kernalContext().timeout().removeTimeoutObject(old);
 
             GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter(
-                cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) {
+                cctx.gridConfig().getNetworkTimeout() * cctx.gridConfig().getCacheConfiguration().length) {
                 @Override public void onTimeout() {
                     if (isDone())
                         return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 8258b14..145e980 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -76,9 +76,6 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
             ) {
                 // Can't hold any locks here - this method is invoked when
                 // holding write-lock on the whole cache map.
-                if (ctx.useOffheapEntry())
-                    return new GridNearOffHeapCacheEntry(ctx, key, hash, val, next, hdrId);
-
                 return new GridNearCacheEntry(ctx, key, hash, val, next, hdrId);
             }
         });

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 797fd32..c7fa4ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -204,13 +204,15 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
      * @param topVer Topology version.
      * @return {@code True} if reset was done.
      * @throws GridCacheEntryRemovedException If obsolete.
+     * @throws IgniteCheckedException If failed.
      */
+    @SuppressWarnings( {"RedundantTypeArguments"})
     public boolean resetFromPrimary(CacheObject val,
         GridCacheVersion ver,
         GridCacheVersion dhtVer,
         UUID primaryNodeId,
         AffinityTopologyVersion topVer)
-        throws GridCacheEntryRemovedException
+        throws GridCacheEntryRemovedException, IgniteCheckedException
     {
         assert dhtVer != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 0ffb4e5..a427b65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -890,6 +890,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                                             mappedKeys.size(),
                                             inTx() ? tx.size() : mappedKeys.size(),
                                             inTx() && tx.syncCommit(),
+                                            inTx() ? tx.groupLockKey() : null,
+                                            inTx() && tx.partitionLock(),
                                             inTx() ? tx.subjectId() : null,
                                             inTx() ? tx.taskNameHash() : 0,
                                             read ? accessTtl : -1L,
@@ -1186,6 +1188,10 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
             // If primary node left the grid before lock acquisition, fail the whole future.
             throw newTopologyException(null, primary.id());
 
+        if (inTx() && tx.groupLock() && !primary.isLocal())
+            throw new IgniteCheckedException("Failed to start group lock transaction (local node is not primary for " +
+                " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']');
+
         if (mapping == null || !primary.id().equals(mapping.node().id()))
             mapping = new GridNearLockMapping(primary, key);
         else
@@ -1444,6 +1450,11 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                             // Replace old entry with new one.
                             entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
                         }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+
+                            return;
+                        }
                     }
 
                     i++;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index e71dd65..1ba4bfe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -104,6 +105,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
      * @param keyCnt Number of keys.
      * @param txSize Expected transaction size.
      * @param syncCommit Synchronous commit flag.
+     * @param grpLockKey Group lock key if this is a group-lock transaction.
+     * @param partLock If partition is locked.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
      * @param accessTtl TTL for read operation.
@@ -127,6 +130,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         int keyCnt,
         int txSize,
         boolean syncCommit,
+        @Nullable IgniteTxKey grpLockKey,
+        boolean partLock,
         @Nullable UUID subjId,
         int taskNameHash,
         long accessTtl,
@@ -146,6 +151,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
             timeout,
             keyCnt,
             txSize,
+            grpLockKey,
+            partLock,
             skipStore);
 
         assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
@@ -349,79 +356,79 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         }
 
         switch (writer.state()) {
-            case 21:
+            case 23:
                 if (!writer.writeLong("accessTtl", accessTtl))
                     return false;
 
                 writer.incrementState();
 
-            case 22:
+            case 24:
                 if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 23:
+            case 25:
                 if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 24:
+            case 26:
                 if (!writer.writeBoolean("hasTransforms", hasTransforms))
                     return false;
 
                 writer.incrementState();
 
-            case 25:
+            case 27:
                 if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
                     return false;
 
                 writer.incrementState();
 
-            case 26:
+            case 28:
                 if (!writer.writeBoolean("implicitTx", implicitTx))
                     return false;
 
                 writer.incrementState();
 
-            case 27:
+            case 29:
                 if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 28:
+            case 30:
                 if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
                     return false;
 
                 writer.incrementState();
 
-            case 29:
+            case 31:
                 if (!writer.writeBoolean("retVal", retVal))
                     return false;
 
                 writer.incrementState();
 
-            case 30:
+            case 32:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 31:
+            case 33:
                 if (!writer.writeBoolean("syncCommit", syncCommit))
                     return false;
 
                 writer.incrementState();
 
-            case 32:
+            case 34:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 33:
+            case 35:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -443,7 +450,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
             return false;
 
         switch (reader.state()) {
-            case 21:
+            case 23:
                 accessTtl = reader.readLong("accessTtl");
 
                 if (!reader.isLastRead())
@@ -451,7 +458,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 22:
+            case 24:
                 dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
 
                 if (!reader.isLastRead())
@@ -459,7 +466,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 23:
+            case 25:
                 filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
 
                 if (!reader.isLastRead())
@@ -467,7 +474,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 24:
+            case 26:
                 hasTransforms = reader.readBoolean("hasTransforms");
 
                 if (!reader.isLastRead())
@@ -475,7 +482,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 25:
+            case 27:
                 implicitSingleTx = reader.readBoolean("implicitSingleTx");
 
                 if (!reader.isLastRead())
@@ -483,7 +490,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 26:
+            case 28:
                 implicitTx = reader.readBoolean("implicitTx");
 
                 if (!reader.isLastRead())
@@ -491,7 +498,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 27:
+            case 29:
                 miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
@@ -499,7 +506,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 28:
+            case 30:
                 onePhaseCommit = reader.readBoolean("onePhaseCommit");
 
                 if (!reader.isLastRead())
@@ -507,7 +514,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 29:
+            case 31:
                 retVal = reader.readBoolean("retVal");
 
                 if (!reader.isLastRead())
@@ -515,7 +522,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 30:
+            case 32:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -523,7 +530,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 31:
+            case 33:
                 syncCommit = reader.readBoolean("syncCommit");
 
                 if (!reader.isLastRead())
@@ -531,7 +538,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 32:
+            case 34:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -539,7 +546,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
                 reader.incrementState();
 
-            case 33:
+            case 35:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -559,7 +566,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 34;
+        return 36;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a6dd007/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java
deleted file mode 100644
index 25eb869..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.internal.processors.cache.*;
-
-/**
- * Near cache entry for off-heap tiered or off-heap values modes.
- */
-public class GridNearOffHeapCacheEntry extends GridNearCacheEntry {
-    /** Off-heap value pointer. */
-    private long valPtr;
-
-    /**
-     * @param ctx   Cache context.
-     * @param key   Cache key.
-     * @param hash  Key hash value.
-     * @param val   Entry value.
-     * @param next  Next entry in the linked list.
-     * @param hdrId Header id.
-     */
-    public GridNearOffHeapCacheEntry(GridCacheContext ctx,
-        KeyCacheObject key,
-        int hash,
-        CacheObject val,
-        GridCacheMapEntry next,
-        int hdrId) {
-        super(ctx, key, hash, val, next, hdrId);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean hasOffHeapPointer() {
-        return valPtr != 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long offHeapPointer() {
-        return valPtr;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void offHeapPointer(long valPtr) {
-        this.valPtr = valPtr;
-    }
-}


Mime
View raw message