ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [06/27] ignite git commit: IGNITE-1525 Return value for cache operation can be lost with onePhaseCommit
Date Fri, 23 Sep 2016 16:05:40 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e67e60f..a5b2202 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
@@ -175,6 +177,12 @@ public class IgniteTxHandler {
             }
         });
 
+        ctx.io().addHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage msg) {
+                processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg);
+            }
+        });
+
         ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
             @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                 processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg);
@@ -882,7 +890,7 @@ public class IgniteTxHandler {
      * @param nodeId Sender node ID.
      * @param req Request.
      */
-    protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest req) {
+    protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
         if (txPrepareMsgLog.isDebugEnabled()) {
             txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() +
                 ", dhtTxId=" + req.version() +
@@ -918,14 +926,15 @@ public class IgniteTxHandler {
 
                 if (dhtTx != null) {
                     dhtTx.onePhaseCommit(true);
+                    dhtTx.needReturnValue(req.needReturnValue());
 
-                    finish(nodeId, dhtTx, req);
+                    finish(dhtTx, req);
                 }
 
                 if (nearTx != null) {
                     nearTx.onePhaseCommit(true);
 
-                    finish(nodeId, nearTx, req);
+                    finish(nearTx, req);
                 }
             }
         }
@@ -950,38 +959,60 @@ public class IgniteTxHandler {
                 req.deployInfo() != null);
         }
 
-        try {
-            // Reply back to sender.
-            ctx.io().send(nodeId, res, req.policy());
+        if (req.onePhaseCommit()) {
+            IgniteInternalFuture completeFut;
 
-            if (txPrepareMsgLog.isDebugEnabled()) {
-                txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() +
-                    ", dhtTxId=" + req.version() +
-                    ", node=" + nodeId + ']');
-            }
-        }
-        catch (IgniteCheckedException e) {
-            if (e instanceof ClusterTopologyCheckedException) {
-                if (txPrepareMsgLog.isDebugEnabled()) {
-                    txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() +
-                        ", dhtTxId=" + req.version() +
-                        ", node=" + nodeId + ']');
-                }
-            }
-            else {
-                U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" +
-                    "txId=" + req.nearXidVersion() +
-                    ", dhtTxId=" + req.version() +
-                    ", node=" + nodeId +
-                    ", err=" + e.getMessage() + ']');
+            IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
+                null : dhtTx.done() ? null : dhtTx.finishFuture();
+
+            final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
+                null : nearTx.done() ? null : nearTx.finishFuture();
+
+            if (dhtFin != null && nearFin != null) {
+                GridCompoundFuture fut = new GridCompoundFuture();
+
+                fut.add(dhtFin);
+                fut.add(nearFin);
+
+                fut.markInitialized();
+
+                completeFut = fut;
             }
+            else
+                completeFut = dhtFin != null ? dhtFin : nearFin;
 
-            if (nearTx != null)
-                nearTx.rollback();
+            if (completeFut != null) {
+                final GridDhtTxPrepareResponse res0 = res;
+                final GridDhtTxRemote dhtTx0 = dhtTx;
+                final GridNearTxRemote nearTx0 = nearTx;
 
-            if (dhtTx != null)
-                dhtTx.rollback();
+                completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+                    @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                        sendReply(nodeId, req, res0, dhtTx0, nearTx0);
+                    }
+                });
+            }
+            else
+                sendReply(nodeId, req, res, dhtTx, nearTx);
         }
+        else
+            sendReply(nodeId, req, res, dhtTx, nearTx);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
+        final GridDhtTxOnePhaseCommitAckRequest req) {
+        assert nodeId != null;
+        assert req != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Processing dht tx one phase commit ack request [nodeId=" + nodeId + ", req=" + req + ']');
+
+        for (GridCacheVersion ver : req.versions())
+            ctx.tm().removeTxReturn(ver);
     }
 
     /**
@@ -1139,12 +1170,10 @@ public class IgniteTxHandler {
     }
 
     /**
-     * @param nodeId Node ID.
      * @param tx Transaction.
      * @param req Request.
      */
     protected void finish(
-        UUID nodeId,
         GridDistributedTxRemoteAdapter tx,
         GridDhtTxPrepareRequest req) throws IgniteTxHeuristicCheckedException {
         assert tx != null : "No transaction for one-phase commit prepare request: " + req;
@@ -1177,6 +1206,52 @@ public class IgniteTxHandler {
     }
 
     /**
+     * @param nodeId Node id.
+     * @param req Request.
+     * @param res Response.
+     * @param dhtTx Dht tx.
+     * @param nearTx Near tx.
+     */
+    protected void sendReply(UUID nodeId,
+        GridDhtTxPrepareRequest req,
+        GridDhtTxPrepareResponse res,
+        GridDhtTxRemote dhtTx,
+        GridNearTxRemote nearTx) {
+        try {
+            // Reply back to sender.
+            ctx.io().send(nodeId, res, req.policy());
+
+            if (txPrepareMsgLog.isDebugEnabled()) {
+                txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() +
+                    ", dhtTxId=" + req.version() +
+                    ", node=" + nodeId + ']');
+            }
+        }
+        catch (IgniteCheckedException e) {
+            if (e instanceof ClusterTopologyCheckedException) {
+                if (txPrepareMsgLog.isDebugEnabled()) {
+                    txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() +
+                        ", dhtTxId=" + req.version() +
+                        ", node=" + nodeId + ']');
+                }
+            }
+            else {
+                U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" +
+                    "txId=" + req.nearXidVersion() +
+                    ", dhtTxId=" + req.version() +
+                    ", node=" + nodeId +
+                    ", err=" + e.getMessage() + ']');
+            }
+
+            if (nearTx != null)
+                nearTx.rollback();
+
+            if (dhtTx != null)
+                dhtTx.rollback();
+        }
+    }
+
+    /**
      * Sends tx finish response to remote node, if response is requested.
      *
      * @param nodeId Node id that originated finish request.
@@ -1191,7 +1266,26 @@ public class IgniteTxHandler {
             if (req.checkCommitted()) {
                 res.checkCommitted(true);
 
-                if (!committed) {
+                if (committed) {
+                    if (req.needReturnValue()) {
+                        try {
+                            GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version());
+
+                            if (wrapper != null)
+                                res.returnValue(wrapper.fut().get());
+                            else
+                                assert !ctx.discovery().alive(nodeId) : nodeId;
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (txFinishMsgLog.isDebugEnabled()) {
+                                txFinishMsgLog.debug("Failed to gain entry processor return value. [txId=" + nearTxId +
+                                    ", dhtTxId=" + req.version() +
+                                    ", node=" + nodeId + ']');
+                            }
+                        }
+                    }
+                }
+                else {
                     ClusterTopologyCheckedException cause =
                         new ClusterTopologyCheckedException("Primary node left grid.");
 
@@ -1492,8 +1586,7 @@ public class IgniteTxHandler {
      * @param req Request.
      */
     protected void processCheckPreparedTxRequest(final UUID nodeId,
-        final GridCacheTxRecoveryRequest req)
-    {
+        final GridCacheTxRecoveryRequest req) {
         if (txRecoveryMsgLog.isDebugEnabled()) {
             txRecoveryMsgLog.debug("Received tx recovery request [txId=" + req.nearXidVersion() +
                 ", node=" + nodeId + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 637f322..fe69536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -151,9 +151,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     /** Commit error. */
     protected volatile Throwable commitErr;
 
-    /** Need return value. */
-    protected boolean needRetVal;
-
     /** Implicit transaction result. */
     protected GridCacheReturn implicitRes;
 
@@ -355,13 +352,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /**
-     * @return Flag indicating whether transaction needs return value.
-     */
-    public boolean needReturnValue() {
-        return needRetVal;
-    }
-
-    /**
      * @return {@code True} if transaction participates in a cache that has an interceptor configured.
      */
     public boolean hasInterceptor() {
@@ -369,13 +359,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /**
-     * @param needRetVal Need return value flag.
-     */
-    public void needReturnValue(boolean needRetVal) {
-        this.needRetVal = needRetVal;
-    }
-
-    /**
      * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent
      * to remote nodes.
      */
@@ -703,7 +686,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                         txEntry.cached().unswap(false);
 
                                     IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
-                                        true);
+                                        true, null);
 
                                     GridCacheVersion dhtVer = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index f9357f9..a1580a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -49,7 +50,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -57,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLo
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
@@ -87,8 +92,11 @@ import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
 import org.jsr166.ConcurrentLinkedHashMap;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
@@ -123,6 +131,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /** Tx salvage timeout (default 3s). */
     private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
 
+    /** One phase commit deferred ack request timeout. */
+    public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
+        Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT, 500);
+
+    /** One phase commit deferred ack request buffer size. */
+    private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
+        Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256);
+
     /** Version in which deadlock detection introduced. */
     public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
 
@@ -160,7 +176,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
 
     /** Committed local transactions. */
-    private final ConcurrentLinkedHashMap<GridCacheVersion, Boolean> completedVersHashMap =
+    private final ConcurrentLinkedHashMap<GridCacheVersion, Object> completedVersHashMap =
         new ConcurrentLinkedHashMap<>(
             Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
             0.75f,
@@ -168,6 +184,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT),
             PER_SEGMENT_Q);
 
+    /** Pending one phase commit ack requests sender. */
+    private GridDeferredAckMessageSender deferredAckMessageSender;
+
     /** Transaction finish synchronizer. */
     private GridCacheTxFinishSync txFinishSync;
 
@@ -209,6 +228,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
                     for (TxDeadlockFuture fut : deadlockDetectFuts.values())
                         fut.onNodeLeft(nodeId);
+
+                    for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
+                        Object obj = entry.getValue();
+
+                        if (obj instanceof GridCacheReturnCompletableWrapper &&
+                            nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
+                            removeTxReturn(entry.getKey());
+                    }
                 }
             },
             EVT_NODE_FAILED, EVT_NODE_LEFT);
@@ -237,6 +264,33 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         txFinishSync = new GridCacheTxFinishSync<>(cctx);
 
         txHnd = new IgniteTxHandler(cctx);
+
+        deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+            @Override public int getTimeout() {
+                return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
+            }
+
+            @Override public int getBufferSize() {
+                return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE;
+            }
+
+            @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
+                GridDhtTxOnePhaseCommitAckRequest ackReq = new GridDhtTxOnePhaseCommitAckRequest(vers);
+
+                cctx.kernalContext().gateway().readLock();
+
+                try {
+                    cctx.io().send(nodeId, ackReq, GridIoPolicy.SYSTEM_POOL);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to send one phase commit ack to backup node [backup=" +
+                        nodeId + ']', e);
+                }
+                finally {
+                    cctx.kernalContext().gateway().readUnlock();
+                }
+            }
+        };
     }
 
     /** {@inheritDoc} */
@@ -898,9 +952,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      */
     public void addCommittedTx(IgniteInternalTx tx) {
         addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion());
+    }
 
-        if (!tx.local() && !tx.near() && tx.onePhaseCommit())
-            addCommittedTx(tx, tx.nearXidVersion(), null);
+    /**
+     * @param tx Committed transaction.
+     */
+    public void addCommittedTxReturn(IgniteInternalTx tx, GridCacheReturnCompletableWrapper ret) {
+        addCommittedTxReturn(tx.nearXidVersion(), null, ret);
     }
 
     /**
@@ -925,7 +983,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         if (nearXidVer != null)
             xidVer = new CommittedVersion(xidVer, nearXidVer);
 
-        Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
+        Object committed0 = completedVersHashMap.putIfAbsent(xidVer, true);
 
         if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
             Boolean b = completedVersSorted.putIfAbsent(xidVer, true);
@@ -933,7 +991,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             assert b == null;
         }
 
-        return committed0 == null || committed0;
+        Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
+
+        return committed0 == null || committed;
+    }
+
+    /**
+     * @param xidVer Completed transaction version.
+     * @param nearXidVer Optional near transaction ID.
+     * @param retVal Invoke result.
+     */
+    private void addCommittedTxReturn(
+        GridCacheVersion xidVer,
+        @Nullable GridCacheVersion nearXidVer,
+        GridCacheReturnCompletableWrapper retVal
+    ) {
+        assert retVal != null;
+
+        if (nearXidVer != null)
+            xidVer = new CommittedVersion(xidVer, nearXidVer);
+
+        Object prev = completedVersHashMap.putIfAbsent(xidVer, retVal);
+
+        assert prev == null || Boolean.FALSE.equals(prev) : prev; // Can be rolled back.
     }
 
     /**
@@ -945,7 +1025,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         IgniteInternalTx tx,
         GridCacheVersion xidVer
     ) {
-        Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
+        Object committed0 = completedVersHashMap.putIfAbsent(xidVer, false);
 
         if (committed0 == null && (tx == null || tx.needsCompletedVersions())) {
             Boolean b = completedVersSorted.putIfAbsent(xidVer, false);
@@ -953,7 +1033,47 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             assert b == null;
         }
 
-        return committed0 == null || !committed0;
+        Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
+
+        return committed0 == null || !committed;
+    }
+
+    /**
+     * @param xidVer Completed transaction version.
+     * @return Tx result.
+     */
+    public GridCacheReturnCompletableWrapper getCommittedTxReturn(GridCacheVersion xidVer) {
+        Object retVal = completedVersHashMap.get(xidVer);
+
+        // Expected to be TRUE in the regular case or a GridCacheReturnCompletableWrapper in the onePhaseCommit case.
+        if (!Boolean.TRUE.equals(retVal)) {
+            assert !Boolean.FALSE.equals(retVal); // Must be called only after 'committed' has been checked.
+
+            GridCacheReturnCompletableWrapper res = (GridCacheReturnCompletableWrapper)retVal;
+
+            removeTxReturn(xidVer);
+
+            return res;
+        }
+        else
+            return null;
+    }
+
+    /**
+     * @param xidVer Completed transaction version.
+     */
+    public void removeTxReturn(GridCacheVersion xidVer) {
+        Object prev = completedVersHashMap.get(xidVer);
+
+        if (Boolean.FALSE.equals(prev)) // Tx can be rolled back.
+            return;
+
+        assert prev instanceof GridCacheReturnCompletableWrapper:
+            prev + " instead of GridCacheReturnCompletableWrapper";
+
+        boolean res = completedVersHashMap.replace(xidVer, prev, true);
+
+        assert res;
     }
 
     /**
@@ -1086,7 +1206,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
          * so we don't do it here.
          */
 
-        Boolean committed = completedVersHashMap.get(tx.xidVersion());
+        Object committed0 = completedVersHashMap.get(tx.xidVersion());
+
+        Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE);
 
         // 1. Make sure that committed version has been recorded.
         if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
@@ -1672,12 +1794,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         boolean committed = false;
 
-        for (Map.Entry<GridCacheVersion, Boolean> entry : completedVersHashMap.entrySet()) {
+        for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
             if (entry.getKey() instanceof CommittedVersion) {
                 CommittedVersion comm = (CommittedVersion)entry.getKey();
 
                 if (comm.nearVer.equals(xidVer)) {
-                    committed = entry.getValue();
+                    committed = !entry.getValue().equals(Boolean.FALSE);
 
                     break;
                 }
@@ -1809,8 +1931,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         // Not all transactions were found. Need to scan committed versions to check
         // if transaction was already committed.
-        for (Map.Entry<GridCacheVersion, Boolean> e : completedVersHashMap.entrySet()) {
-            if (!e.getValue())
+        for (Map.Entry<GridCacheVersion, Object> e : completedVersHashMap.entrySet()) {
+            if (e.getValue().equals(Boolean.FALSE))
                 continue;
 
             GridCacheVersion ver = e.getKey();
@@ -2137,6 +2259,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param nodeId Node ID to send message to.
+     * @param ver Version to ack.
+     */
+    public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) {
+        deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver);
+    }
+
+    /**
      * @return Collection of active transaction deadlock detection futures.
      */
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index e611723..c3d194b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,7 +31,6 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
-
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
@@ -43,8 +43,10 @@ import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -59,6 +61,7 @@ import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
 import static org.apache.ignite.testframework.GridTestUtils.TestMemoryMode;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 
@@ -70,7 +73,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final long DURATION = 60_000;
+    protected static final long DURATION = 60_000;
 
     /** */
     protected static final int GRID_CNT = 4;
@@ -78,8 +81,8 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
     /**
      * @return Keys count for the test.
      */
-    private int keysCount() {
-        return 10_000;
+    protected int keysCount() {
+        return 2_000;
     }
 
     /**
@@ -249,12 +252,17 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
 
         IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
+                Random rnd = new Random();
+
                 while (!finished.get()) {
                     stopGrid(3);
 
                     U.sleep(300);
 
                     startGrid(3);
+
+                    if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another.
+                        awaitPartitionMapExchange();
                 }
 
                 return null;
@@ -456,6 +464,29 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
 
             assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty());
         }
+
+        checkOnePhaseCommitReturnValuesCleaned();
+    }
+
+    /**
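+     * Checks that each node's {@code completedVersHashMap} holds only Boolean values after the deferred OPC ack timeout.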
+     */
+    protected void checkOnePhaseCommitReturnValuesCleaned() throws IgniteInterruptedCheckedException {
+        U.sleep(DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            IgniteKernal ignite = (IgniteKernal)grid(i);
+
+            IgniteTxManager tm = ignite.context().cache().context().tm();
+
+            Map completedVersHashMap = U.field(tm, "completedVersHashMap");
+
+            for (Object o : completedVersHashMap.values()) {
+                assertTrue("completedVersHashMap contains" + o.getClass() + " instead of boolean. " +
+                    "These values should be replaced by boolean after onePhaseCommit finished. " +
+                    "[node=" + i + "]", o instanceof Boolean);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 9204bc8..9bfde27 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.HashSet;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
@@ -88,16 +89,6 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public void testGetAndPut() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1525");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void testInvoke() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1525");
-    }
-
     /**
      * @throws Exception If failed.
      */
@@ -217,6 +208,70 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
     }
 
     /**
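+     * Restarts node 0 while entry processors are invoked through it, then checks one-phase-commit return values are cleaned.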
+     */
+    public void testOriginatingNodeFailureForcesOnePhaseCommitDataCleanup() throws Exception {
+        ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP, false));
+
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        final int keysCnt = keysCount();
+
+        IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Random rnd = new Random();
+
+                while (!finished.get()) {
+                    stopGrid(0);
+
+                    U.sleep(300);
+
+                    startGrid(0);
+
+                    if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another.
+                        awaitPartitionMapExchange();
+                }
+
+                return null;
+            }
+        });
+
+        IgniteInternalFuture<Object> fut2 = runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int iter = 0;
+
+                while (!finished.get()) {
+                    try {
+                        IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+
+                        Integer val = ++iter;
+
+                        for (int i = 0; i < keysCnt; i++)
+                            cache.invoke(i, new SetEntryProcessor(val));
+                    }
+                    catch (Exception e) {
+                        // No-op.
+                    }
+                }
+
+                return null;
+            }
+        });
+
+        try {
+            U.sleep(DURATION);
+        }
+        finally {
+            finished.set(true);
+
+            fut.get();
+            fut2.get();
+        }
+
+        checkOnePhaseCommitReturnValuesCleaned();
+    }
+
+    /**
      * Callable to process inside transaction.
      */
     private static class ProcessCallable implements Callable<Void> {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-client-mode.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-client-mode.properties b/modules/yardstick/config/benchmark-client-mode.properties
index ba5525f..f7c8347 100644
--- a/modules/yardstick/config/benchmark-client-mode.properties
+++ b/modules/yardstick/config/benchmark-client-mode.properties
@@ -70,6 +70,8 @@ CONFIGS="\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds ${ver}atomic-put-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds ${ver}atomic-put-get-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds ${ver}tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds ${ver}tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds ${ver}tx-invoke-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds ${ver}tx-put-get-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds ${ver}sql-query-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds ${ver}sql-query-join-1-backup,\

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-tx-win.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-tx-win.properties b/modules/yardstick/config/benchmark-tx-win.properties
index 73b857d..54a40b1 100644
--- a/modules/yardstick/config/benchmark-tx-win.properties
+++ b/modules/yardstick/config/benchmark-tx-win.properties
@@ -54,6 +54,8 @@ set DRIVER_HOSTS=localhost
 :: Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute).
 set CONFIGS=^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapBenchmark -sn IgniteNode -ds tx-put-offheap-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapValuesBenchmark -sn IgniteNode -ds tx-put-offheap-val-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,^

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-tx.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-tx.properties b/modules/yardstick/config/benchmark-tx.properties
index f3dbc24..0d5bb02 100644
--- a/modules/yardstick/config/benchmark-tx.properties
+++ b/modules/yardstick/config/benchmark-tx.properties
@@ -59,6 +59,8 @@ nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS}
 # Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute).
 CONFIGS="\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapBenchmark -sn IgniteNode -ds tx-put-offheap-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapValuesBenchmark -sn IgniteNode -ds tx-put-offheap-val-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,\

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-win.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-win.properties b/modules/yardstick/config/benchmark-win.properties
index b6ecd67..b75b5d6 100644
--- a/modules/yardstick/config/benchmark-win.properties
+++ b/modules/yardstick/config/benchmark-win.properties
@@ -59,6 +59,8 @@ set CONFIGS=^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds atomic-put-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds atomic-put-get-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,^
+-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds sql-query-1-backup,^
 -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds sql-query-join-1-backup,^

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark.properties b/modules/yardstick/config/benchmark.properties
index 67ef5ef..cfc1499 100644
--- a/modules/yardstick/config/benchmark.properties
+++ b/modules/yardstick/config/benchmark.properties
@@ -71,6 +71,8 @@ CONFIGS="\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds ${ver}atomic-put-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds ${ver}atomic-put-get-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds ${ver}tx-put-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds ${ver}tx-getAndPut-1-backup,\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds ${ver}tx-invoke-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds ${ver}tx-put-get-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds ${ver}sql-query-1-backup,\
 -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds ${ver}sql-query-join-1-backup,\

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
new file mode 100644
index 0000000..40e563c
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+
+/**
+ * Ignite benchmark that performs getAndPut operations.
+ */
+public class IgniteGetAndPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        int key = nextRandom(args.range());
+
+        cache.getAndPut(key, new SampleValue(key));
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<Integer, Object> cache() {
+        return ignite().cache("atomic");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
new file mode 100644
index 0000000..49ae985
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+
+/**
+ * Ignite benchmark that performs transactional getAndPut operations.
+ */
+public class IgniteGetAndPutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+    /** */
+    private IgniteTransactions txs;
+
+    /** */
+    private Callable<Void> clo;
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+            ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+
+        txs = ignite().transactions();
+
+        clo = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                int key = nextRandom(args.range());
+
+                cache.getAndPut(key, new SampleValue(key));
+
+                return null;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<Integer, Object> cache() {
+        return ignite().cache("tx");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
index 8f05598..64dc6b8 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java
@@ -17,12 +17,52 @@
 
 package org.apache.ignite.yardstick.cache;
 
+import java.util.Map;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.yardstick.IgniteBenchmarkUtils;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
 
 /**
  * Ignite benchmark that performs invoke operations.
  */
 public class IgniteInvokeTxBenchmark extends IgniteInvokeBenchmark {
+    /** */
+    private IgniteTransactions txs;
+
+    /** */
+    private Callable<Void> clo;
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK"))
+            ignite().compute().broadcast(new WaitMapExchangeFinishCallable());
+
+        txs = ignite().transactions();
+
+        clo = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                int key = nextRandom(args.range());
+
+                cache.invoke(key, new SetValueEntryProcessor(new SampleValue(key)));
+
+                return null;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo);
+
+        return true;
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteCache<Integer, Object> cache() {
         return ignite().cache("tx");


Mime
View raw message