ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject ignite git commit: IGNITE-264 - Addressing review comments.
Date Tue, 08 Sep 2015 22:36:26 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-264 1b737d702 -> 29ce3f94d


IGNITE-264 - Addressing review comments.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/29ce3f94
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/29ce3f94
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/29ce3f94

Branch: refs/heads/ignite-264
Commit: 29ce3f94db38c611cdf43eaaade75c2266f4777c
Parents: 1b737d7
Author: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Authored: Tue Sep 8 15:36:18 2015 -0700
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Tue Sep 8 15:36:18 2015 -0700

----------------------------------------------------------------------
 .../dht/GridDhtTransactionalCacheAdapter.java   | 504 +++++++++----------
 .../distributed/dht/GridDhtTxPrepareFuture.java |  24 +-
 .../cache/transactions/IgniteTxHandler.java     |  64 ++-
 3 files changed, 276 insertions(+), 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/29ce3f94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index d2c9613..b9514a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -709,325 +709,311 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         @Nullable final CacheEntryPredicate[] filter0) {
         final List<KeyCacheObject> keys = req.keys();
 
-        IgniteInternalFuture<Object> keyFut = null;
+        CacheEntryPredicate[] filter = filter0;
 
-        if (keyFut == null)
-            keyFut = new GridFinishedFuture<>();
+        // Set message into thread context.
+        GridDhtTxLocal tx = null;
 
-        return new GridEmbeddedFuture<>(keyFut,
-            new C2<Object, Exception, IgniteInternalFuture<GridNearLockResponse>>()
{
-                @Override public IgniteInternalFuture<GridNearLockResponse> apply(Object
o, Exception exx) {
-                    if (exx != null)
-                        return new GridDhtFinishedFuture<>(exx);
+        try {
+            int cnt = keys.size();
 
-                    CacheEntryPredicate[] filter = filter0;
+            if (req.inTx()) {
+                GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
 
-                    // Set message into thread context.
-                    GridDhtTxLocal tx = null;
+                if (dhtVer != null)
+                    tx = ctx.tm().tx(dhtVer);
+            }
 
-                    try {
-                        int cnt = keys.size();
+            final List<GridCacheEntryEx> entries = new ArrayList<>(cnt);
 
-                        if (req.inTx()) {
-                            GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
+            // Unmarshal filter first.
+            if (filter == null)
+                filter = req.filter();
 
-                            if (dhtVer != null)
-                                tx = ctx.tm().tx(dhtVer);
-                        }
+            GridDhtLockFuture fut = null;
 
-                        final List<GridCacheEntryEx> entries = new ArrayList<>(cnt);
+            if (!req.inTx()) {
+                GridDhtPartitionTopology top = null;
 
-                        // Unmarshal filter first.
-                        if (filter == null)
-                            filter = req.filter();
+                if (req.firstClientRequest()) {
+                    assert CU.clientNode(nearNode);
 
-                        GridDhtLockFuture fut = null;
+                    top = topology();
 
-                        if (!req.inTx()) {
-                            GridDhtPartitionTopology top = null;
+                    topology().readLock();
+                }
 
-                            if (req.firstClientRequest()) {
-                                assert CU.clientNode(nearNode);
+                try {
+                    if (top != null && needRemap(req.topologyVersion(), top.topologyVersion()))
{
+                        if (log.isDebugEnabled()) {
+                            log.debug("Client topology version mismatch, need remap lock
request [" +
+                                "reqTopVer=" + req.topologyVersion() +
+                                ", locTopVer=" + top.topologyVersion() +
+                                ", req=" + req + ']');
+                        }
 
-                                top = topology();
+                        GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+                            req,
+                            top.topologyVersion());
 
-                                topology().readLock();
-                            }
+                        return new GridFinishedFuture<>(res);
+                    }
 
-                            try {
-                                if (top != null && needRemap(req.topologyVersion(),
top.topologyVersion())) {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("Client topology version mismatch, need
remap lock request [" +
-                                            "reqTopVer=" + req.topologyVersion() +
-                                            ", locTopVer=" + top.topologyVersion() +
-                                            ", req=" + req + ']');
-                                    }
+                    fut = new GridDhtLockFuture(ctx,
+                        nearNode.id(),
+                        req.version(),
+                        req.topologyVersion(),
+                        cnt,
+                        req.txRead(),
+                        req.needReturnValue(),
+                        req.timeout(),
+                        tx,
+                        req.threadId(),
+                        req.accessTtl(),
+                        filter,
+                        req.skipStore());
+
+                    // Add before mapping.
+                    if (!ctx.mvcc().addFuture(fut))
+                        throw new IllegalStateException("Duplicate future ID: " + fut);
+                }
+                finally {
+                    if (top != null)
+                        top.readUnlock();
+                }
+            }
 
-                                    GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
-                                        req,
-                                        top.topologyVersion());
+            boolean timedout = false;
 
-                                    return new GridFinishedFuture<>(res);
-                                }
+            for (KeyCacheObject key : keys) {
+                if (timedout)
+                    break;
 
-                                fut = new GridDhtLockFuture(ctx,
-                                    nearNode.id(),
-                                    req.version(),
-                                    req.topologyVersion(),
-                                    cnt,
-                                    req.txRead(),
-                                    req.needReturnValue(),
-                                    req.timeout(),
-                                    tx,
-                                    req.threadId(),
-                                    req.accessTtl(),
-                                    filter,
-                                    req.skipStore());
+                while (true) {
+                    // Specify topology version to make sure containment is checked
+                    // based on the requested version, not the latest.
+                    GridDhtCacheEntry entry = entryExx(key, req.topologyVersion());
 
-                                // Add before mapping.
-                                if (!ctx.mvcc().addFuture(fut))
-                                    throw new IllegalStateException("Duplicate future ID:
" + fut);
-                            }
-                            finally {
-                                if (top != null)
-                                    top.readUnlock();
-                            }
-                        }
+                    try {
+                        if (fut != null) {
+                            // This method will add local candidate.
+                            // Entry cannot become obsolete after this method succeeded.
+                            fut.addEntry(key == null ? null : entry);
 
-                        boolean timedout = false;
+                            if (fut.isDone()) {
+                                timedout = true;
 
-                        for (KeyCacheObject key : keys) {
-                            if (timedout)
                                 break;
+                            }
+                        }
 
-                            while (true) {
-                                // Specify topology version to make sure containment is checked
-                                // based on the requested version, not the latest.
-                                GridDhtCacheEntry entry = entryExx(key, req.topologyVersion());
+                        entries.add(entry);
 
-                                try {
-                                    if (fut != null) {
-                                        // This method will add local candidate.
-                                        // Entry cannot become obsolete after this method
succeeded.
-                                        fut.addEntry(key == null ? null : entry);
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry when adding lock (will retry): "
+ entry);
+                    }
+                    catch (GridDistributedLockCancelledException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got lock request for cancelled lock (will ignore):
" +
+                                entry);
 
-                                        if (fut.isDone()) {
-                                            timedout = true;
+                        fut.onError(e);
 
-                                            break;
-                                        }
-                                    }
+                        return new GridDhtFinishedFuture<>(e);
+                    }
+                }
+            }
 
-                                    entries.add(entry);
+            // Handle implicit locks for pessimistic transactions.
+            if (req.inTx()) {
+                if (tx == null) {
+                    GridDhtPartitionTopology top = null;
 
-                                    break;
-                                }
-                                catch (GridCacheEntryRemovedException ignore) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got removed entry when adding lock (will
retry): " + entry);
-                                }
-                                catch (GridDistributedLockCancelledException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got lock request for cancelled lock (will
ignore): " +
-                                            entry);
+                    if (req.firstClientRequest()) {
+                        assert CU.clientNode(nearNode);
 
-                                    fut.onError(e);
+                        top = topology();
 
-                                    return new GridDhtFinishedFuture<>(e);
-                                }
+                        topology().readLock();
+                    }
+
+                    try {
+                        if (top != null && needRemap(req.topologyVersion(), top.topologyVersion()))
{
+                            if (log.isDebugEnabled()) {
+                                log.debug("Client topology version mismatch, need remap lock
request [" +
+                                    "reqTopVer=" + req.topologyVersion() +
+                                    ", locTopVer=" + top.topologyVersion() +
+                                    ", req=" + req + ']');
                             }
-                        }
 
-                        // Handle implicit locks for pessimistic transactions.
-                        if (req.inTx()) {
-                            if (tx == null) {
-                                GridDhtPartitionTopology top = null;
+                            GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
+                                req,
+                                top.topologyVersion());
 
-                                if (req.firstClientRequest()) {
-                                    assert CU.clientNode(nearNode);
+                            return new GridFinishedFuture<>(res);
+                        }
 
-                                    top = topology();
+                        tx = new GridDhtTxLocal(
+                            ctx.shared(),
+                            nearNode.id(),
+                            req.version(),
+                            req.futureId(),
+                            req.miniId(),
+                            req.threadId(),
+                            req.implicitTx(),
+                            req.implicitSingleTx(),
+                            ctx.systemTx(),
+                            false,
+                            ctx.ioPolicy(),
+                            PESSIMISTIC,
+                            req.isolation(),
+                            req.timeout(),
+                            req.isInvalidate(),
+                            !req.skipStore(),
+                            false,
+                            req.txSize(),
+                            null,
+                            req.subjectId(),
+                            req.taskNameHash());
 
-                                    topology().readLock();
-                                }
+                        tx.syncCommit(req.syncCommit());
 
-                                try {
-                                    if (top != null && needRemap(req.topologyVersion(),
top.topologyVersion())) {
-                                        if (log.isDebugEnabled()) {
-                                            log.debug("Client topology version mismatch,
need remap lock request [" +
-                                                "reqTopVer=" + req.topologyVersion() +
-                                                ", locTopVer=" + top.topologyVersion() +
-                                                ", req=" + req + ']');
-                                        }
+                        tx = ctx.tm().onCreated(null, tx);
 
-                                        GridNearLockResponse res = sendClientLockRemapResponse(nearNode,
-                                            req,
-                                            top.topologyVersion());
+                        if (tx == null || !tx.init()) {
+                            String msg = "Failed to acquire lock (transaction has been completed):
" +
+                                req.version();
 
-                                        return new GridFinishedFuture<>(res);
-                                    }
+                            U.warn(log, msg);
 
-                                    tx = new GridDhtTxLocal(
-                                        ctx.shared(),
-                                        nearNode.id(),
-                                        req.version(),
-                                        req.futureId(),
-                                        req.miniId(),
-                                        req.threadId(),
-                                        req.implicitTx(),
-                                        req.implicitSingleTx(),
-                                        ctx.systemTx(),
-                                        false,
-                                        ctx.ioPolicy(),
-                                        PESSIMISTIC,
-                                        req.isolation(),
-                                        req.timeout(),
-                                        req.isInvalidate(),
-                                        !req.skipStore(),
-                                        false,
-                                        req.txSize(),
-                                        null,
-                                        req.subjectId(),
-                                        req.taskNameHash());
-
-                                    tx.syncCommit(req.syncCommit());
-
-                                    tx = ctx.tm().onCreated(null, tx);
-
-                                    if (tx == null || !tx.init()) {
-                                        String msg = "Failed to acquire lock (transaction
has been completed): " +
-                                            req.version();
-
-                                        U.warn(log, msg);
-
-                                        if (tx != null)
-                                            tx.rollback();
-
-                                        return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
-                                    }
+                            if (tx != null)
+                                tx.rollback();
 
-                                    tx.topologyVersion(req.topologyVersion());
-                                }
-                                finally {
-                                    if (top != null)
-                                        top.readUnlock();
-                                }
-                            }
+                            return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
+                        }
 
-                            ctx.tm().txContext(tx);
+                        tx.topologyVersion(req.topologyVersion());
+                    }
+                    finally {
+                        if (top != null)
+                            top.readUnlock();
+                    }
+                }
 
-                            if (log.isDebugEnabled())
-                                log.debug("Performing DHT lock [tx=" + tx + ", entries="
+ entries + ']');
+                ctx.tm().txContext(tx);
 
-                            IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(
-                                cacheCtx,
+                if (log.isDebugEnabled())
+                    log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries +
']');
+
+                IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(
+                    cacheCtx,
+                    entries,
+                    req.messageId(),
+                    req.txRead(),
+                    req.needReturnValue(),
+                    req.accessTtl(),
+                    req.skipStore());
+
+                final GridDhtTxLocal t = tx;
+
+                return new GridDhtEmbeddedFuture(
+                    txFut,
+                    new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>()
{
+                        @Override public IgniteInternalFuture<GridNearLockResponse>
apply(
+                            GridCacheReturn o, Exception e) {
+                            if (e != null)
+                                e = U.unwrap(e);
+
+                            assert !t.empty();
+
+                            // Create response while holding locks.
+                            final GridNearLockResponse resp = createLockReply(nearNode,
                                 entries,
-                                req.messageId(),
-                                req.txRead(),
-                                req.needReturnValue(),
-                                req.accessTtl(),
-                                req.skipStore());
+                                req,
+                                t,
+                                t.xidVersion(),
+                                e);
+
+                            if (resp.error() == null && t.onePhaseCommit()) {
+                                assert t.implicit();
+
+                                return t.commitAsync().chain(
+                                    new C1<IgniteInternalFuture<IgniteInternalTx>,
GridNearLockResponse>() {
+                                        @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx>
f) {
+                                            try {
+                                                // Check for error.
+                                                f.get();
+                                            }
+                                            catch (IgniteCheckedException e1) {
+                                                resp.error(e1);
+                                            }
 
-                            final GridDhtTxLocal t = tx;
-
-                            return new GridDhtEmbeddedFuture(
-                                txFut,
-                                new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>()
{
-                                    @Override public IgniteInternalFuture<GridNearLockResponse>
apply(
-                                        GridCacheReturn o, Exception e) {
-                                        if (e != null)
-                                            e = U.unwrap(e);
-
-                                        assert !t.empty();
-
-                                        // Create response while holding locks.
-                                        final GridNearLockResponse resp = createLockReply(nearNode,
-                                            entries,
-                                            req,
-                                            t,
-                                            t.xidVersion(),
-                                            e);
-
-                                        if (resp.error() == null && t.onePhaseCommit())
{
-                                            assert t.implicit();
-
-                                            return t.commitAsync().chain(
-                                                new C1<IgniteInternalFuture<IgniteInternalTx>,
GridNearLockResponse>() {
-                                                    @Override public GridNearLockResponse
apply(IgniteInternalFuture<IgniteInternalTx> f) {
-                                                        try {
-                                                            // Check for error.
-                                                            f.get();
-                                                        }
-                                                        catch (IgniteCheckedException e1)
{
-                                                            resp.error(e1);
-                                                        }
-
-                                                        sendLockReply(nearNode, t, req, resp);
-
-                                                        return resp;
-                                                    }
-                                                });
-                                        }
-                                        else {
                                             sendLockReply(nearNode, t, req, resp);
 
-                                            return new GridFinishedFuture<>(resp);
+                                            return resp;
                                         }
-                                    }
-                                }
-                            );
+                                    });
+                            }
+                            else {
+                                sendLockReply(nearNode, t, req, resp);
+
+                                return new GridFinishedFuture<>(resp);
+                            }
                         }
-                        else {
-                            assert fut != null;
+                    }
+                );
+            }
+            else {
+                assert fut != null;
 
-                            // This will send remote messages.
-                            fut.map();
+                // This will send remote messages.
+                fut.map();
 
-                            final GridCacheVersion mappedVer = fut.version();
+                final GridCacheVersion mappedVer = fut.version();
 
-                            return new GridDhtEmbeddedFuture<>(
-                                new C2<Boolean, Exception, GridNearLockResponse>()
{
-                                    @Override public GridNearLockResponse apply(Boolean b,
Exception e) {
-                                        if (e != null)
-                                            e = U.unwrap(e);
-                                        else if (!b)
-                                            e = new GridCacheLockTimeoutException(req.version());
+                return new GridDhtEmbeddedFuture<>(
+                    new C2<Boolean, Exception, GridNearLockResponse>() {
+                        @Override public GridNearLockResponse apply(Boolean b, Exception
e) {
+                            if (e != null)
+                                e = U.unwrap(e);
+                            else if (!b)
+                                e = new GridCacheLockTimeoutException(req.version());
 
-                                        GridNearLockResponse res = createLockReply(nearNode,
-                                            entries,
-                                            req,
-                                            null,
-                                            mappedVer,
-                                            e);
+                            GridNearLockResponse res = createLockReply(nearNode,
+                                entries,
+                                req,
+                                null,
+                                mappedVer,
+                                e);
 
-                                        sendLockReply(nearNode, null, req, res);
+                            sendLockReply(nearNode, null, req, res);
 
-                                        return res;
-                                    }
-                                },
-                                fut);
+                            return res;
                         }
-                    }
-                    catch (IgniteCheckedException e) {
-                        String err = "Failed to unmarshal at least one of the keys for lock
request message: " + req;
-
-                        U.error(log, err, e);
+                    },
+                    fut);
+            }
+        }
+        catch (IgniteCheckedException | RuntimeException e) {
+            String err = "Failed to unmarshal at least one of the keys for lock request message: " + req;
 
-                        if (tx != null) {
-                            try {
-                                tx.rollback();
-                            }
-                            catch (IgniteCheckedException ex) {
-                                U.error(log, "Failed to rollback the transaction: " + tx,
ex);
-                            }
-                        }
+            U.error(log, err, e);
 
-                        return new GridDhtFinishedFuture<>(
-                            new IgniteCheckedException(err, e));
-                    }
+            if (tx != null) {
+                try {
+                    tx.rollback();
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(log, "Failed to rollback the transaction: " + tx, ex);
                 }
             }
-        );
+
+            return new GridDhtFinishedFuture<>(
+                new IgniteCheckedException(err, e));
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/29ce3f94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 5b05698..89fc0ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -339,15 +339,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
 
                 if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
-                    CacheObject val;
-
                     cached.unswap(retVal);
 
                     boolean readThrough = (retVal || hasFilters) &&
                         cacheCtx.config().isLoadPreviousValue() &&
                         !txEntry.skipStore();
 
-                    val = cached.innerGet(
+                    CacheObject val = cached.innerGet(
                         tx,
                         /*swap*/true,
                         readThrough,
@@ -688,26 +686,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /**
-     * Checks if transaction involves a near-enabled cache on originating node.
-     *
-     * @return {@code True} if originating node has a near cache enabled and that cache participates in
-     *      the transaction.
-     */
-    private boolean originatingNodeHasNearCache() {
-        ClusterNode node = cctx.discovery().node(tx.originatingNodeId());
-
-        if (node == null)
-            return false;
-
-        for (int cacheId : tx.activeCacheIds()) {
-            if (cctx.discovery().cacheNearNode(node, cctx.cacheContext(cacheId).name()))
-                return true;
-        }
-
-        return false;
-    }
-
-    /**
      * @param res Response being sent.
      */
     private void addDhtValues(GridNearTxPrepareResponse res) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/29ce3f94/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 5ef5629..9efa43a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -59,10 +59,8 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.typedef.C2;
+import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
@@ -213,42 +211,40 @@ public class IgniteTxHandler {
         final GridNearTxLocal locTx,
         final GridNearTxPrepareRequest req
     ) {
-        IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys.
-
-        return new GridEmbeddedFuture<>(
-            fut,
-            new C2<Object, Exception, IgniteInternalFuture<GridNearTxPrepareResponse>>() {
-                @Override public IgniteInternalFuture<GridNearTxPrepareResponse> apply(Object o, Exception ex) {
-                    if (ex != null)
-                        throw new GridClosureException(ex);
-
-                    IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
-                        req.reads(),
-                        req.writes(),
-                        req.transactionNodes(),
-                        req.last(),
-                        req.lastBackups());
-
-                    if (locTx.isRollbackOnly())
-                        locTx.rollbackAsync();
-
-                    return fut;
+        IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
+            req.reads(),
+            req.writes(),
+            req.transactionNodes(),
+            req.last(),
+            req.lastBackups());
+
+        if (locTx.isRollbackOnly())
+            locTx.rollbackAsync();
+
+        return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() {
+            @Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) {
+                try {
+                    return f.get();
                 }
-            },
-            new C2<GridNearTxPrepareResponse, Exception, GridNearTxPrepareResponse>() {
-                @Nullable @Override public GridNearTxPrepareResponse apply(GridNearTxPrepareResponse res, Exception e) {
-                    if (e != null) {
-                        locTx.setRollbackOnly(); // Just in case.
+                catch (Exception e) {
+                    locTx.setRollbackOnly(); // Just in case.
 
-                        if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
-                            !X.hasCause(e, IgniteFutureCancelledException.class))
-                            U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
-                    }
+                    if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
+                        !X.hasCause(e, IgniteFutureCancelledException.class))
+                        U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
 
-                    return res;
+                    return new GridNearTxPrepareResponse(
+                        req.version(),
+                        req.futureId(),
+                        req.miniId(),
+                        req.version(),
+                        req.version(),
+                        null,
+                        e,
+                        null);
                 }
             }
-        );
+        });
     }
 
     /**


Mime
View raw message