ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] incubator-ignite git commit: # ignite-44
Date Wed, 24 Dec 2014 14:45:55 GMT
# ignite-44


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/928aa3d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/928aa3d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/928aa3d4

Branch: refs/heads/ignite-44
Commit: 928aa3d48c7a29dc101c10866a8c6bde66492953
Parents: 71ee2ee
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Dec 24 17:45:42 2014 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Dec 24 17:45:42 2014 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheInvokeResult.java     |   2 +-
 .../processors/cache/GridCacheAdapter.java      | 108 +++++-
 .../processors/cache/GridCacheReturn.java       |  57 ++--
 .../GridDistributedLockResponse.java            |  19 +-
 .../GridDistributedTxRemoteAdapter.java         |   6 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   9 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  13 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |   8 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  21 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  68 ++--
 .../distributed/near/GridNearLockResponse.java  |   1 +
 .../cache/transactions/IgniteTxAdapter.java     |  16 +-
 .../cache/transactions/IgniteTxEntry.java       |  77 +++--
 .../cache/transactions/IgniteTxHandler.java     |   2 +-
 .../transactions/IgniteTxLocalAdapter.java      | 339 +++++++++++++------
 .../cache/transactions/IgniteTxLocalEx.java     |  20 +-
 .../cache/IgniteCacheInvokeAbstractTest.java    | 163 ++++++++-
 .../cache/IgniteCacheTxInvokeTest.java          |  41 +++
 .../cache/GridCacheAbstractFullApiSelfTest.java | 115 ++++---
 .../cache/GridCacheAbstractSelfTest.java        |  15 +
 21 files changed, 817 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
index 50af119..5f472d7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
@@ -25,7 +25,7 @@ public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externaliz
     private static final long serialVersionUID = 0L;
 
     /** */
-    @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
+    @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "JavaAbbreviationUsage", "UnusedDeclaration"})
     private static Object GG_CLASS_ID;
 
     /** */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index b3e567c..62daeb9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -2194,22 +2194,97 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
     }
 
     /** {@inheritDoc} */
-    @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
+    @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(
+        final K key,
+        final EntryProcessor<K, V, T> entryProcessor,
+        final Object... args)
         throws EntryProcessorException {
-        // TODO IGNITE-44.
-        return null;
+        A.notNull(key, "key", entryProcessor, "entryProcessor");
+
+        if (keyCheck)
+            validateCacheKey(key);
+
+        ctx.denyOnLocalRead();
+
+        IgniteFuture<?> fut = asyncOp(new AsyncInOp(key) {
+            @Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) {
+                Map<? extends K, EntryProcessor> invokeMap =
+                    Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+
+                return tx.invokeAsync(ctx, false, invokeMap, args);
+            }
+
+            @Override public String toString() {
+                return "invokeAsync [key=" + key + ", entryProcessor=" + entryProcessor + ']';
+            }
+        });
+
+        IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut0 =
+            (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)fut;
+
+        return fut0.chain(new CX1<IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>, EntryProcessorResult<T>>() {
+            @Override public EntryProcessorResult<T> applyx(IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut)
+                throws IgniteCheckedException {
+                GridCacheReturn<Map<K, EntryProcessorResult<T>>> ret = fut.get();
+
+                Map<K, EntryProcessorResult<T>> resMap = ret.value();
+
+                assert resMap != null;
+                assert resMap.size() == 1 : resMap.size();
+
+                return resMap.values().iterator().next();
+            }
+        });
     }
 
     /** {@inheritDoc} */
-    @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
-        EntryProcessor<K, V, T> entryProcessor,
-        Object... args) {
-        // TODO IGNITE-44.
-        return null;
+    @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(
+        final Set<? extends K> keys,
+        final EntryProcessor<K, V, T> entryProcessor,
+        final Object... args) {
+        A.notNull(entryProcessor, "entryProcessor");
+
+        if (keyCheck)
+            validateCacheKeys(keys);
+
+        ctx.denyOnLocalRead();
+
+        IgniteFuture<?> fut = asyncOp(new AsyncInOp(keys) {
+            @Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) {
+                Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+                    @Override public EntryProcessor apply(K k) {
+                        return entryProcessor;
+                    }
+                });
+
+                return tx.invokeAsync(ctx, false, invokeMap, args);
+            }
+
+            @Override public String toString() {
+                return "invokeAllAsync [keys=" + keys + ", entryProcessor=" + entryProcessor + ']';
+            }
+        });
+
+        IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut0 =
+            (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)fut;
+
+        return fut0.chain(new CX1<IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>, Map<K, EntryProcessorResult<T>>>() {
+            @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut)
+                throws IgniteCheckedException {
+                GridCacheReturn<Map<K, EntryProcessorResult<T>>> ret = fut.get();
+
+                assert ret != null;
+
+                return ret.value();
+            }
+        });
     }
 
     /** {@inheritDoc} */
     @Override public void transform(final K key, final IgniteClosure<V, V> transformer) throws IgniteCheckedException {
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
+        /*
         A.notNull(key, "key", transformer, "valTransform");
 
         if (keyCheck)
@@ -2226,11 +2301,15 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
                 return "transform [key=" + key + ", valTransform=" + transformer + ']';
             }
         });
+        */
     }
 
     /** {@inheritDoc} */
     @Override public <R> R transformAndCompute(final K key, final IgniteClosure<V, IgniteBiTuple<V, R>> transformer)
         throws IgniteCheckedException {
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
+        /*
         A.notNull(key, "key", transformer, "transformer");
 
         if (keyCheck)
@@ -2250,6 +2329,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
                 return "transformAndCompute [key=" + key + ", valTransform=" + transformer + ']';
             }
         });
+        */
     }
 
     /** {@inheritDoc} */
@@ -2291,6 +2371,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> transformAsync(final K key, final IgniteClosure<V, V> transformer,
         @Nullable final GridCacheEntryEx<K, V> entry, final long ttl) {
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
+        /*
         A.notNull(key, "key", transformer, "transformer");
 
         if (keyCheck)
@@ -2307,6 +2390,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
                 return "transformAsync [key=" + key + ", valTransform=" + transformer + ']';
             }
         });
+        */
     }
 
     /** {@inheritDoc} */
@@ -2581,6 +2665,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
     /** {@inheritDoc} */
     @Override public void transformAll(@Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> m)
         throws IgniteCheckedException {
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
+        /*
         if (F.isEmpty(m))
             return;
 
@@ -2598,6 +2685,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
                 return "transformAll [map=" + m + ']';
             }
         });
+        */
     }
 
     /** {@inheritDoc} */
@@ -2640,6 +2728,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> transformAllAsync(@Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> m) {
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
+        /*
         if (F.isEmpty(m))
             return new GridFinishedFuture<>(ctx.kernalContext());
 
@@ -2657,6 +2748,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
                 return "transformAllAsync [map=" + m + ']';
             }
         });
+        */
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java
index e9c476a..ab05b34 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturn.java
@@ -14,7 +14,9 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.grid.util.tostring.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
 import java.io.*;
+import java.util.*;
 
 /**
  * Return value for cases where both, value and success flag need to be returned.
@@ -24,7 +26,7 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
     private static final long serialVersionUID = 0L;
 
     /** */
-    @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
+    @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "JavaAbbreviationUsage", "UnusedDeclaration"})
     private static Object GG_CLASS_ID;
 
     /** Value. */
@@ -42,13 +44,6 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
     }
 
     /**
-     * @param v Value.
-     */
-    public GridCacheReturn(V v) {
-        this.v = v;
-    }
-
-    /**
      *
      * @param success Success flag.
      */
@@ -93,17 +88,6 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
     }
 
     /**
-     * @param v Value.
-     * @return This instance for chaining.
-     */
-    public GridCacheReturn<V> valueIfNull(V v) {
-        if (this.v == null)
-            this.v = v;
-
-        return this;
-    }
-
-    /**
      * @return Success flag.
      */
     public boolean success() {
@@ -123,27 +107,34 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
     }
 
     /**
-     * @param v Value.
      * @param success Success flag.
      * @return This instance for chaining.
      */
-    public GridCacheReturn<V> setIfNull(V v, boolean success) {
-        if (this.v == null) {
-            this.v = v;
-            this.success = success;
-        }
+    public GridCacheReturn<V> success(boolean success) {
+        this.success = success;
 
         return this;
     }
 
     /**
-     * @param success Success flag.
-     * @return This instance for chaining.
+     * @param key Key.
+     * @param res Result.
      */
-    public GridCacheReturn<V> success(boolean success) {
-        this.success = success;
+    @SuppressWarnings("unchecked")
+    public synchronized void addEntryProcessResult(Object key, EntryProcessorResult<?> res) {
+        assert v == null || v instanceof Map : v;
+        assert key != null;
+        assert res != null;
 
-        return this;
+        HashMap<Object, EntryProcessorResult> resMap = (HashMap<Object, EntryProcessorResult>)v;
+
+        if (resMap == null) {
+            resMap = new HashMap<>();
+
+            v = (V)resMap;
+        }
+
+        resMap.put(key, res);
     }
 
     /** {@inheritDoc} */
@@ -157,11 +148,15 @@ public class GridCacheReturn<V> implements Externalizable, IgniteOptimizedMarsha
         out.writeObject(v);
     }
 
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         success = in.readBoolean();
         v = (V)in.readObject();
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() { return S.toString(GridCacheReturn.class, this); }
+    @Override public String toString() {
+        return S.toString(GridCacheReturn.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java
index 8edfc7c..76fd449 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -57,11 +57,15 @@ public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessag
     }
 
     /**
+     * @param cacheId Cache ID.
      * @param lockVer Lock version.
      * @param futId Future ID.
      * @param cnt Key count.
      */
-    public GridDistributedLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, int cnt) {
+    public GridDistributedLockResponse(int cacheId,
+        GridCacheVersion lockVer,
+        IgniteUuid futId,
+        int cnt) {
         super(lockVer, cnt);
 
         assert futId != null;
@@ -74,11 +78,15 @@ public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessag
     }
 
     /**
+     * @param cacheId Cache ID.
      * @param lockVer Lock ID.
      * @param futId Future ID.
      * @param err Error.
      */
-    public GridDistributedLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, Throwable err) {
+    public GridDistributedLockResponse(int cacheId,
+        GridCacheVersion lockVer,
+        IgniteUuid futId,
+        Throwable err) {
         super(lockVer, 0);
 
         assert futId != null;
@@ -89,12 +97,17 @@ public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessag
     }
 
     /**
+     * @param cacheId Cache ID.
      * @param lockVer Lock ID.
      * @param futId Future ID.
      * @param cnt Count.
      * @param err Error.
      */
-    public GridDistributedLockResponse(int cacheId, GridCacheVersion lockVer, IgniteUuid futId, int cnt, Throwable err) {
+    public GridDistributedLockResponse(int cacheId,
+        GridCacheVersion lockVer,
+        IgniteUuid futId,
+        int cnt,
+        Throwable err) {
         super(lockVer, cnt);
 
         assert futId != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 4a93646..d507d0d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -83,6 +83,8 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param timeout Timeout.
      * @param txSize Expected transaction size.
      * @param grpLockKey Group lock key if this is a group-lock transaction.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
      */
     public GridDistributedTxRemoteAdapter(
         GridCacheSharedContext<K, V> ctx,
@@ -325,7 +327,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
         else {
             // Copy values.
             entry.value(e.value(), e.hasWriteValue(), e.hasReadValue());
-            entry.transformClosures(e.transformClosures());
+            entry.entryProcessors(e.entryProcessors());
             entry.valueBytes(e.valueBytes());
             entry.op(e.op());
             entry.ttl(e.ttl());
@@ -481,7 +483,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     if (updateNearCache(cacheCtx, txEntry.key(), topVer))
                                         nearCached = cacheCtx.dht().near().peekExx(txEntry.key());
 
-                                    if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters()))
+                                    if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
                                         txEntry.cached().unswap(true, false);
 
                                     GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index dc8ddcb..b7ff63e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -215,7 +215,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                 req.keyBytes() != null ? req.keyBytes().get(i) : null,
                                 writeEntry == null ? null : writeEntry.value(),
                                 writeEntry == null ? null : writeEntry.valueBytes(),
-                                writeEntry == null ? null : writeEntry.transformClosures(),
+                                writeEntry == null ? null : writeEntry.entryProcessors(),
                                 drVer,
                                 req.accessTtl());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 0f11ecc..32e43a7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -340,8 +340,13 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
      * @return Future that will be completed when locks are acquired.
      */
     public IgniteFuture<IgniteTxEx<K, V>> prepareAsync(@Nullable Iterable<IgniteTxEntry<K, V>> reads,
-        @Nullable Iterable<IgniteTxEntry<K, V>> writes, Map<IgniteTxKey<K>, GridCacheVersion> verMap, long msgId,
-        IgniteUuid nearMiniId, Map<UUID, Collection<UUID>> txNodes, boolean last, Collection<UUID> lastBackups) {
+        @Nullable Iterable<IgniteTxEntry<K, V>> writes,
+        Map<IgniteTxKey<K>, GridCacheVersion> verMap,
+        long msgId,
+        IgniteUuid nearMiniId,
+        Map<UUID, Collection<UUID>> txNodes,
+        boolean last,
+        Collection<UUID> lastBackups) {
         assert optimistic();
 
         // In optimistic mode prepare still can be called explicitly from salvageTx.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index b752178..55d8f7a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -406,7 +406,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
             if (entry != null) {
                 entry.op(e.op()); // Absolutely must set operation, as default is DELETE.
                 entry.value(e.value(), e.hasWriteValue(), e.hasReadValue());
-                entry.transformClosures(e.transformClosures());
+                entry.entryProcessors(e.entryProcessors());
                 entry.valueBytes(e.valueBytes());
                 entry.ttl(e.ttl());
                 entry.filters(e.filters());
@@ -525,11 +525,13 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
 
                     cached.unswap(!read, read);
 
-                    IgniteTxEntry<K, V> w = writeEntries == null ? null : writeEntries.get(idx++);
+                    IgniteTxEntry<K, V>
+                        w = writeEntries == null ? null : writeEntries.get(idx++);
 
                     txEntry = addEntry(NOOP,
                         null,
                         null,
+                        null,
                         cached,
                         null,
                         CU.<K, V>empty(),
@@ -545,7 +547,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
                         txEntry.value(w.value(), w.hasWriteValue(), w.hasReadValue());
                         txEntry.valueBytes(w.valueBytes());
                         txEntry.drVersion(w.drVersion());
-                        txEntry.transformClosures(w.transformClosures());
+                        txEntry.entryProcessors(w.entryProcessors());
                         txEntry.ttl(w.ttl());
                         txEntry.filters(w.filters());
                         txEntry.drExpireTime(w.drExpireTime());
@@ -635,14 +637,13 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
                     postLockWrite(cacheCtx,
                         passedKeys,
                         skipped,
-                        null,
-                        null,
                         ret,
                         /*remove*/false,
                         /*retval*/false,
                         /*read*/read,
                         accessTtl,
-                        filter == null ? CU.<K, V>empty() : filter);
+                        filter == null ? CU.<K, V>empty() : filter,
+                        /**computeInvoke*/false);
 
                     return ret;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 97ec1af..2b4491f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -16,9 +16,11 @@ import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.*;
 import org.gridgain.grid.kernal.processors.cache.transactions.*;
 import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 
@@ -280,7 +282,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
      * @param val Value.
      * @param valBytes Value bytes.
      * @param drVer Data center replication version.
-     * @param clos Transform closures.
+     * @param entryProcessors Entry processors.
      * @param ttl TTL.
      */
     public void addWrite(GridCacheContext<K, V> cacheCtx,
@@ -289,7 +291,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
         byte[] keyBytes,
         @Nullable V val,
         @Nullable byte[] valBytes,
-        @Nullable Collection<IgniteClosure<V, V>> clos,
+        @Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors,
         @Nullable GridCacheVersion drVer,
         long ttl) {
         checkInternal(key);
@@ -310,7 +312,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
 
         txEntry.keyBytes(keyBytes);
         txEntry.valueBytes(valBytes);
-        txEntry.transformClosures(clos);
+        txEntry.entryProcessors(entryProcessors);
 
         writeMap.put(key, txEntry);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fec59b2..78d92f8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -315,7 +315,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             true,
             false,
             entry,
-            ttl,
             filter);
     }
 
@@ -331,7 +330,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             false,
             entry,
-            ttl,
             filter);
     }
 
@@ -412,7 +410,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             true,
             true,
             null,
-            0,
             ctx.equalsPeekArray(oldVal));
     }
 
@@ -433,7 +430,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             false,
             null,
-            0,
             filter);
     }
 
@@ -454,7 +450,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             false,
             null,
-            0,
             null);
     }
 
@@ -648,18 +643,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         ctx.denyOnLocalRead();
 
-        Map<? extends K, EntryProcessor> transformMap =
+        Map<? extends K, EntryProcessor> invokeMap =
             Collections.singletonMap(key, (EntryProcessor)entryProcessor);
 
         IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
-            transformMap,
+            invokeMap,
             args,
             null,
             null,
             true,
             false,
             null,
-            -1L,
             null);
 
         return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
@@ -687,24 +681,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         ctx.denyOnLocalRead();
 
-        Map<? extends K, EntryProcessor> transformMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+        Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
             @Override public EntryProcessor apply(K k) {
                 return entryProcessor;
             }
         });
 
-        IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
-            transformMap,
+        return updateAllAsync0(null,
+            invokeMap,
             args,
             null,
             null,
             true,
             false,
             null,
-            -1L,
             null);
-
-        return fut;
     }
 
     /**
@@ -718,7 +709,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param retval Return value required flag.
      * @param rawRetval Return {@code GridCacheReturn} instance.
      * @param cached Cached cache entry for key. May be passed if and only if map size is {@code 1}.
-     * @param ttl Entry time-to-live.
      * @param filter Cache entry filter for atomic updates.
      * @return Completion future.
      */
@@ -731,7 +721,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final boolean retval,
         final boolean rawRetval,
         @Nullable GridCacheEntryEx<K, V> cached,
-        long ttl,
         @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
     ) {
         if (map != null && keyCheck)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 8c8a8e5..e6a4eb7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -164,15 +164,14 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      * @return Participating nodes.
      */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return
-            F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() {
-                @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) {
-                    if (isMini(f))
-                        return ((MiniFuture)f).node();
+        return F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() {
+            @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) {
+                if (isMini(f))
+                    return ((MiniFuture)f).node();
 
-                    return cctx.discovery().localNode();
-                }
-            });
+                return cctx.discovery().localNode();
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -272,18 +271,38 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             }
             else {
                 // Check transaction entries (corresponding tx entries must be enlisted in transaction).
-                cand = new GridCacheMvccCandidate<>(entry, cctx.localNodeId(),
-                    null, null, threadId, lockVer, timeout, true, tx.entry(entry.txKey()).locked(), inTx(),
-                    inTx() && tx.implicitSingle(), false, false);
+                cand = new GridCacheMvccCandidate<>(entry,
+                    cctx.localNodeId(),
+                    null,
+                    null,
+                    threadId,
+                    lockVer,
+                    timeout,
+                    true,
+                    tx.entry(entry.txKey()).locked(),
+                    inTx(),
+                    inTx() && tx.implicitSingle(),
+                    false,
+                    false);
 
                 cand.topologyVersion(topSnapshot.get().topologyVersion());
             }
         }
         else {
             if (cand == null) {
-                cand = new GridCacheMvccCandidate<>(entry, cctx.localNodeId(),
-                    null, null, threadId, lockVer, timeout, true, false, inTx(),
-                    inTx() && tx.implicitSingle(), false, false);
+                cand = new GridCacheMvccCandidate<>(entry,
+                    cctx.localNodeId(),
+                    null,
+                    null,
+                    threadId,
+                    lockVer,
+                    timeout,
+                    true,
+                    false,
+                    inTx(),
+                    inTx() && tx.implicitSingle(),
+                    false,
+                    false);
 
                 cand.topologyVersion(topSnapshot.get().topologyVersion());
             }
@@ -611,8 +630,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
             if (mapAsPrimary(keys, topVer))
                 return;
 
-            ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings =
-                new ConcurrentLinkedDeque8<>();
+            ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings = new ConcurrentLinkedDeque8<>();
 
             // Assign keys to primary nodes.
             GridNearLockMapping<K, V> map = null;
@@ -1270,10 +1288,20 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                     else
                         cctx.mvcc().markExplicitOwner(k, threadId);
 
-                    if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
-                        cctx.events().addEvent(cctx.affinity().partition(k), k, tx, null,
-                            EVT_CACHE_OBJECT_READ, newVal, newVal != null || newBytes != null,
-                            null, false, CU.subjectId(tx, cctx.shared()), null, tx == null ? null : tx.resolveTaskName());
+                    if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                        cctx.events().addEvent(cctx.affinity().partition(k),
+                            k,
+                            tx,
+                            null,
+                            EVT_CACHE_OBJECT_READ,
+                            newVal,
+                            newVal != null || newBytes != null,
+                            null,
+                            false,
+                            CU.subjectId(tx, cctx.shared()),
+                            null,
+                            tx == null ? null : tx.resolveTaskName());
+                    }
 
                     i++;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
index 7a0c2fd..7711470 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -57,6 +57,7 @@ public class GridNearLockResponse<K, V> extends GridDistributedLockResponse<K, V
     }
 
     /**
+     * @param cacheId Cache ID.
      * @param lockVer Lock ID.
      * @param futId Future ID.
      * @param miniId Mini future ID.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
index 29e33b8..6fb77a1 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1169,7 +1169,8 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
      * @throws IgniteCheckedException If failed to get previous value for transform.
      * @throws GridCacheEntryRemovedException If entry was concurrently deleted.
      */
-    protected GridTuple3<GridCacheOperation, V, byte[]> applyTransformClosures(IgniteTxEntry<K, V> txEntry,
+    protected GridTuple3<GridCacheOperation, V, byte[]> applyTransformClosures(
+        IgniteTxEntry<K, V> txEntry,
         boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
         GridCacheContext cacheCtx = txEntry.context();
 
@@ -1177,7 +1178,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
 
         if (isSystemInvalidate())
             return F.t(cacheCtx.isStoreEnabled() ? RELOAD : DELETE, null, null);
-        if (F.isEmpty(txEntry.transformClosures()))
+        if (F.isEmpty(txEntry.entryProcessors()))
             return F.t(txEntry.op(), txEntry.value(), txEntry.valueBytes());
         else {
             try {
@@ -1193,19 +1194,12 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
                         /*event*/recordEvt,
                         /*temporary*/true,
                         /*subjId*/subjId,
-                        /**closure name */recordEvt ? F.first(txEntry.transformClosures()) : null,
+                        /**closure name */recordEvt ? F.first(txEntry.entryProcessors()) : null,
                         resolveTaskName(),
                         CU.<K, V>empty(),
                         null);
 
-                try {
-                    for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                        val = clos.apply(val);
-                }
-                catch (Throwable e) {
-                    throw new IgniteException("Transform closure must not throw any exceptions " +
-                        "(transaction will be invalidated)", e);
-                }
+                val = txEntry.applyEntryProcessors(val);
 
                 GridCacheOperation op = val == null ? DELETE : UPDATE;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
index 17b153d..73d17b5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java
@@ -22,6 +22,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
@@ -71,7 +72,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
 
     /** Transform. */
     @GridToStringInclude
-    private Collection<IgniteClosure<V, V>> transformClosCol;
+    private Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol;
 
     /** Transform closure bytes. */
     @GridToStringExclude
@@ -192,7 +193,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
      * @param tx Owning transaction.
      * @param op Operation.
      * @param val Value.
-     * @param transformClos Transform closure.
+     * @param entryProcessor Entry processor.
+     * @param invokeArgs Optional arguments for EntryProcessor.
      * @param ttl Time to live.
      * @param entry Cache entry.
      * @param filters Put filters.
@@ -202,9 +204,10 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
         IgniteTxEx<K, V> tx,
         GridCacheOperation op,
         V val,
-        IgniteClosure<V, V> transformClos,
+        EntryProcessor<K, V, ?> entryProcessor,
+        Object[] invokeArgs,
         long ttl,
-        GridCacheEntryEx<K,V> entry,
+        GridCacheEntryEx<K, V> entry,
         IgnitePredicate<GridCacheEntry<K, V>>[] filters,
         GridCacheVersion drVer) {
         assert ctx != null;
@@ -220,8 +223,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
         this.filters = filters;
         this.drVer = drVer;
 
-        if (transformClos != null)
-            addTransformClosure(transformClos);
+        if (entryProcessor != null)
+            addEntryProcessor(entryProcessor, invokeArgs);
 
         key = entry.key();
         keyBytes = entry.keyBytes();
@@ -299,7 +302,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
         cp.filters = filters;
         cp.val.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
         cp.val.valueBytes(val.valueBytes());
-        cp.transformClosCol = transformClosCol;
+        cp.entryProcessorsCol = entryProcessorsCol;
         cp.ttl = ttl;
         cp.drExpireTime = drExpireTime;
         cp.explicitVer = explicitVer;
@@ -605,13 +608,14 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
     }
 
     /**
-     * @param transformClos Transform closure.
+     * @param entryProcessor Entry processor.
+     * @param invokeArgs Optional arguments for EntryProcessor.
      */
-    public void addTransformClosure(IgniteClosure<V, V> transformClos) {
-        if (transformClosCol  == null)
-            transformClosCol = new LinkedList<>();
+    public void addEntryProcessor(EntryProcessor<K, V, ?> entryProcessor, Object[] invokeArgs) {
+        if (entryProcessorsCol == null)
+            entryProcessorsCol = new LinkedList<>();
 
-        transformClosCol.add(transformClos);
+        entryProcessorsCol.add(new T2<EntryProcessor<K, V, ?>, Object[]>(entryProcessor, invokeArgs));
 
         // Must clear transform closure bytes since collection has changed.
         transformClosBytes = null;
@@ -620,17 +624,41 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
     }
 
     /**
-     * @return Collection of transform closures.
+     * @return Collection of entry processors.
      */
-    public Collection<IgniteClosure<V, V>> transformClosures() {
-        return transformClosCol;
+    public Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessors() {
+        return entryProcessorsCol;
+    }
+
+    /**
+     * @param val Value.
+     * @return New value.
+     */
+    @SuppressWarnings("unchecked")
+    public V applyEntryProcessors(V val) {
+        for (T2<EntryProcessor<K, V, ?>, Object[]> t : entryProcessors()) {
+            try {
+                CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(key, val);
+
+                EntryProcessor processor = t.get1();
+
+                processor.process(invokeEntry, t.get2());
+
+                val = invokeEntry.getValue();
+            }
+            catch (Exception ignore) {
+                // No-op.
+            }
+        }
+
+        return val;
     }
 
     /**
-     * @param transformClosCol Collection of transform closures.
+     * @param entryProcessorsCol Collection of entry processors.
      */
-    public void transformClosures(@Nullable Collection<IgniteClosure<V, V>> transformClosCol) {
-        this.transformClosCol = transformClosCol;
+    public void entryProcessors(@Nullable Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol) {
+        this.entryProcessorsCol = entryProcessorsCol;
 
         // Must clear transform closure bytes since collection has changed.
         transformClosBytes = null;
@@ -740,8 +768,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
             if (keyBytes == null)
                 keyBytes = entry.getOrMarshalKeyBytes();
 
-            if (transformClosBytes == null && transformClosCol != null)
-                transformClosBytes = CU.marshal(ctx, transformClosCol);
+            if (transformClosBytes == null && entryProcessorsCol != null)
+                transformClosBytes = CU.marshal(ctx, entryProcessorsCol);
 
             if (F.isEmptyOrNulls(filters))
                 filterBytes = null;
@@ -781,8 +809,8 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
                 key = ctx.marshaller().unmarshal(keyBytes, clsLdr);
 
             // Unmarshal transform closure anyway if it exists.
-            if (transformClosBytes != null && transformClosCol == null)
-                transformClosCol = ctx.marshaller().unmarshal(transformClosBytes, clsLdr);
+            if (transformClosBytes != null && entryProcessorsCol == null)
+                entryProcessorsCol = ctx.marshaller().unmarshal(transformClosBytes, clsLdr);
 
             if (filters == null && filterBytes != null) {
                 filters = ctx.marshaller().unmarshal(filterBytes, clsLdr);
@@ -820,7 +848,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
         }
         else {
             out.writeObject(key);
-            U.writeCollection(out, transformClosCol);
+            U.writeCollection(out, entryProcessorsCol);
             U.writeArray(out, filters);
         }
 
@@ -850,7 +878,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
         }
         else {
             key = (K)in.readObject();
-            transformClosCol = U.readCollection(in);
+            entryProcessorsCol = U.readCollection(in);
             filters = U.readEntryFilterArray(in);
         }
 
@@ -1022,6 +1050,7 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
         }
 
         /**
+         * @param sharedCtx Shared cache context.
          * @param ctx Cache context.
          * @param depEnabled Deployment enabled flag.
          * @throws IgniteCheckedException If marshaling failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
index 1d4b5d7..7284161 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
@@ -1155,7 +1155,7 @@ public class IgniteTxHandler<K, V> {
                             txEntry.keyBytes(),
                             txEntry.value(),
                             txEntry.valueBytes(),
-                            txEntry.transformClosures(),
+                            txEntry.entryProcessors(),
                             txEntry.drVersion(),
                             txEntry.ttl());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/928aa3d4/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 84ef5b7..34938d5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -28,6 +28,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
@@ -456,7 +457,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                         boolean intercept = e.context().config().getInterceptor() != null;
 
-                        if (intercept || !F.isEmpty(e.transformClosures()))
+                        if (intercept || !F.isEmpty(e.entryProcessors()))
                             e.cached().unswap(true, false);
 
                         GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false);
@@ -645,7 +646,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     if (cacheCtx.isNear())
                                         ((GridNearCacheEntry<K, V>)cached).recordDhtVersion(txEntry.dhtVersion());
 
-                                    if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters()))
+                                    if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
                                         txEntry.cached().unswap(true, false);
 
                                     GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
@@ -702,7 +703,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                         txEntry.value(val, true, false);
                                         txEntry.valueBytes(valBytes);
                                         txEntry.op(op);
-                                        txEntry.transformClosures(null);
+                                        txEntry.entryProcessors(null);
                                         txEntry.drVersion(explicitVer);
                                     }
 
@@ -1061,10 +1062,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                     // Read value from locked entry in group-lock transaction as well.
                     if (txEntry.hasValue()) {
-                        if (!F.isEmpty(txEntry.transformClosures())) {
-                            for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                                val = clos.apply(val);
-                        }
+                        if (!F.isEmpty(txEntry.entryProcessors()))
+                            val = txEntry.applyEntryProcessors(val);
 
                         if (val != null) {
                             V val0 = val;
@@ -1082,7 +1081,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             try {
                                 Object transformClo =
                                     (txEntry.op() == TRANSFORM  && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
-                                        F.first(txEntry.transformClosures()) : null;
+                                        F.first(txEntry.entryProcessors()) : null;
 
                                 val = txEntry.cached().innerGet(this,
                                     /*swap*/true,
@@ -1102,10 +1101,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     if (!readCommitted())
                                         txEntry.readValue(val);
 
-                                    if (!F.isEmpty(txEntry.transformClosures())) {
-                                        for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                                            val = clos.apply(val);
-                                    }
+                                    if (!F.isEmpty(txEntry.entryProcessors()))
+                                        val = txEntry.applyEntryProcessors(val);
 
                                     V val0 = val;
 
@@ -1195,6 +1192,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             txEntry = addEntry(READ,
                                 val,
                                 null,
+                                null,
                                 entry,
                                 expiryPlc,
                                 filter,
@@ -1229,6 +1227,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             txEntry = addEntry(READ,
                                 val,
                                 null,
+                                null,
                                 entry,
                                 expiryPlc,
                                 CU.<K, V>empty(),
@@ -1344,10 +1343,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         if (!readCommitted())
                             txEntry.readValue(val);
 
-                        if (!F.isEmpty(txEntry.transformClosures())) {
-                            for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                                visibleVal = clos.apply(visibleVal);
-                        }
+                        if (!F.isEmpty(txEntry.entryProcessors()))
+                            visibleVal = txEntry.applyEntryProcessors(visibleVal);
                     }
 
                     // In pessimistic mode we hold the lock, so filter validation
@@ -1560,9 +1557,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                                 try {
                                     Object transformClo =
-                                        (!F.isEmpty(txEntry.transformClosures()) &&
+                                        (!F.isEmpty(txEntry.entryProcessors()) &&
                                             cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
-                                            F.first(txEntry.transformClosures()) : null;
+                                            F.first(txEntry.entryProcessors()) : null;
 
                                     V val = cached.innerGet(IgniteTxLocalAdapter.this,
                                         cacheCtx.isSwapOrOffheapEnabled(),
@@ -1584,10 +1581,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                                         txEntry.setAndMarkValid(val);
 
-                                        if (!F.isEmpty(txEntry.transformClosures())) {
-                                            for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                                                val = clos.apply(val);
-                                        }
+                                        if (!F.isEmpty(txEntry.entryProcessors()))
+                                            val = txEntry.applyEntryProcessors(val);
 
                                         if (cacheCtx.portableEnabled())
                                             val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
@@ -1711,10 +1706,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     if (!readCommitted())
                                         txEntry.readValue(val);
 
-                                    if (!F.isEmpty(txEntry.transformClosures())) {
-                                        for (IgniteClosure<V, V> clos : txEntry.transformClosures())
-                                            val = clos.apply(val);
-                                    }
+                                    if (!F.isEmpty(txEntry.entryProcessors()))
+                                        val = txEntry.applyEntryProcessors(val);
 
                                     retMap.put(entry.getKey(), val);
                                 }
@@ -1736,6 +1729,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public IgniteFuture<GridCacheReturn<V>> putAllAsync(
         GridCacheContext<K, V> cacheCtx,
         Map<? extends K, ? extends V> map,
@@ -1744,7 +1738,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         long ttl,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter
     ) {
-        return putAllAsync0(cacheCtx, map, null, null, retval, cached, ttl, filter);
+        return (IgniteFuture<GridCacheReturn<V>>)putAllAsync0(cacheCtx,
+            map,
+            null,
+            null,
+            null,
+            retval,
+            cached,
+            filter);
     }
 
     /** {@inheritDoc} */
@@ -1752,18 +1753,32 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         GridCacheContext<K, V> cacheCtx,
         Map<? extends K, GridCacheDrInfo<V>> drMap
     ) {
-        return putAllAsync0(cacheCtx, null, null, drMap, false, null, -1, null);
+        return putAllAsync0(cacheCtx,
+            null,
+            null,
+            null,
+            drMap,
+            false,
+            null,
+            null);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<GridCacheReturn<V>> transformAllAsync(
+    @SuppressWarnings("unchecked")
+    @Override public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync(
         GridCacheContext<K, V> cacheCtx,
-        @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> map,
         boolean retval,
-        @Nullable GridCacheEntryEx<K, V> cached,
-        long ttl
+        @Nullable Map<? extends K, EntryProcessor> map,
+        Object... invokeArgs
     ) {
-        return putAllAsync0(cacheCtx, null, map, null, retval, null, -1, null);
+        return (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)putAllAsync0(cacheCtx,
+            null,
+            map,
+            invokeArgs,
+            null,
+            retval,
+            null,
+            null);
     }
 
     /** {@inheritDoc} */
@@ -1796,7 +1811,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param expiryPlc Explicitly specified expiry policy for entry.
      * @param implicit Implicit flag.
      * @param lookup Value lookup map ({@code null} for remove).
-     * @param transformMap Map with transform closures if this is a transform operation.
+     * @param invokeMap Map with entry processors for invoke operation.
+     * @param invokeArgs Optional arguments for EntryProcessor.
      * @param retval Flag indicating whether a value should be returned.
      * @param lockOnly If {@code true}, then entry will be enlisted as noop.
      * @param filter User filters.
@@ -1807,13 +1823,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
      */
     protected IgniteFuture<Set<K>> enlistWrite(
-        GridCacheContext<K, V> cacheCtx,
+        final GridCacheContext<K, V> cacheCtx,
         Collection<? extends K> keys,
         @Nullable GridCacheEntryEx<K, V> cached,
         @Nullable ExpiryPolicy expiryPlc,
         boolean implicit,
         @Nullable Map<? extends K, ? extends V> lookup,
-        @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+        @Nullable Map<? extends K, EntryProcessor> invokeMap,
+        @Nullable Object[] invokeArgs,
         boolean retval,
         boolean lockOnly,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -1834,18 +1851,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
         Set<K> skipped = null;
 
-        boolean rmv = lookup == null && transformMap == null;
+        Set<K> missedForInvoke = null;
+
+        boolean rmv = lookup == null && invokeMap == null;
 
         try {
             // Set transform flag for transaction.
-            if (transformMap != null)
+            if (invokeMap != null)
                 transform = true;
 
             groupLockSanityCheck(cacheCtx, keys);
 
             for (K key : keys) {
                 V val = rmv || lookup == null ? null : lookup.get(key);
-                IgniteClosure<V, V> transformClo = transformMap == null ? null : transformMap.get(key);
+                EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
 
                 GridCacheVersion drVer;
                 long drTtl;
@@ -1876,7 +1895,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 if (key == null)
                     continue;
 
-                if (!rmv && val == null && transformClo == null) {
+                if (!rmv && val == null && entryProcessor == null) {
                     skipped = skip(skipped, key);
 
                     continue;
@@ -1930,7 +1949,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                         /*events*/retval,
                                         /*temporary*/false,
                                         CU.subjectId(this, cctx),
-                                        transformClo,
+                                        entryProcessor,
                                         resolveTaskName(),
                                         CU.<K, V>empty(),
                                         null);
@@ -1952,7 +1971,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 if (!readCommitted() && old != null) {
                                     // Enlist failed filters as reads for non-read-committed mode,
                                     // so future ops will get the same values.
-                                    txEntry = addEntry(READ, old, null, entry, null, CU.<K, V>empty(), false, -1L, -1L,
+                                    txEntry = addEntry(READ,
+                                        old,
+                                        null,
+                                        null,
+                                        entry,
+                                        null,
+                                        CU.<K, V>empty(),
+                                        false,
+                                        -1L,
+                                        -1L,
                                         null);
 
                                     txEntry.markValid();
@@ -1964,9 +1992,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 break; // While.
                             }
 
-                            txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM :
-                                old != null ? UPDATE : CREATE, val, transformClo, entry, expiryPlc, filter, true, drTtl,
-                                drExpireTime, drVer);
+                            GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+                                entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+
+                            txEntry = addEntry(op,
+                                val,
+                                entryProcessor,
+                                invokeArgs,
+                                entry,
+                                expiryPlc,
+                                filter,
+                                true,
+                                drTtl,
+                                drExpireTime,
+                                drVer);
 
                             if (!implicit() && readCommitted())
                                 cacheCtx.evicts().touch(entry, topologyVersion());
@@ -2013,15 +2052,39 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                             }
                                         );
                                     }
-                                    else
-                                        ret.set(null, true);
+                                    else {
+                                        if (retval)
+                                            ret.set(null, true);
+                                        else {
+                                            if (txEntry.op() == TRANSFORM) {
+                                                if (missedForInvoke == null)
+                                                    missedForInvoke = new HashSet<>();
+
+                                                missedForInvoke.add(key);
+                                            }
+                                            else
+                                                ret.success(true);
+                                        }
+                                    }
+                                }
+                                else {
+                                    if (retval)
+                                        ret.set(old, true);
+                                    else {
+                                        if (txEntry.op() == TRANSFORM)
+                                            addInvokeResult(txEntry, old, ret);
+                                        else
+                                            ret.success(true);
+                                    }
                                 }
-                                else
-                                    ret.set(old, true);
                             }
                             // Pessimistic.
-                            else
-                                ret.set(old, true);
+                            else {
+                                if (retval)
+                                    ret.set(old, true);
+                                else
+                                    ret.success(true);
+                            }
 
                             break; // While.
                         }
@@ -2032,7 +2095,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     }
                 }
                 else {
-                    if (transformClo == null && txEntry.op() == TRANSFORM)
+                    if (entryProcessor == null && txEntry.op() == TRANSFORM)
                         throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
                             "transaction after transform closure is applied): " + key);
 
@@ -2051,9 +2114,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             continue;
                         }
 
-                        txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM :
-                            v != null ? UPDATE : CREATE, val, transformClo, entry, expiryPlc, filter, true, drTtl,
-                            drExpireTime, drVer);
+                        GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
+                            v != null ? UPDATE : CREATE;
+
+                        txEntry = addEntry(op,
+                            val,
+                            entryProcessor,
+                            invokeArgs,
+                            entry,
+                            expiryPlc,
+                            filter,
+                            true,
+                            drTtl,
+                            drExpireTime,
+                            drVer);
 
                         enlisted.add(key);
                     }
@@ -2061,8 +2135,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     if (!pessimistic()) {
                         txEntry.markValid();
 
-                        // Set tx entry and return values.
-                        ret.set(v, true);
+                        if (retval)
+                            ret.set(v, true);
                     }
                 }
             }
@@ -2071,6 +2145,38 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             return new GridFinishedFuture<>(cctx.kernalContext(), e);
         }
 
+        if (missedForInvoke != null) {
+            assert optimistic();
+            assert invokeMap != null;
+
+            IgniteFuture<Boolean> fut = loadMissing(
+                cacheCtx,
+                true,
+                missedForInvoke,
+                deserializePortables(cacheCtx),
+                new CI2<K, V>() {
+                    @Override public void apply(K k, V v) {
+                        if (log.isDebugEnabled())
+                            log.debug("Loaded value from remote node [key=" + k + ", val=" + v + ']');
+
+                        addInvokeResult(entry(new IgniteTxKey<>(k, cacheCtx.cacheId())), v, ret);
+                    }
+                });
+
+            return new GridEmbeddedFuture<>(
+                cctx.kernalContext(),
+                fut,
+                new C2<Boolean, Exception, Set<K>>() {
+                    @Override public Set<K> apply(Boolean b, Exception e) {
+                        if (e != null)
+                            throw new GridClosureException(e);
+
+                        return Collections.emptySet();
+                    }
+                }
+            );
+        }
+
         return new GridFinishedFuture<>(cctx.kernalContext(), skipped);
     }
 
@@ -2080,8 +2186,6 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param cacheCtx Context.
      * @param keys Keys.
      * @param failed Collection of potentially failed keys (need to populate in this method).
-     * @param transformed Output map where transformed values will be placed.
-     * @param transformMap Transform map.
      * @param ret Return value.
      * @param rmv {@code True} if remove.
      * @param retval Flag to return value or not.
@@ -2090,19 +2194,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param filter Filter to check entries.
      * @return Failed keys.
      * @throws IgniteCheckedException If error.
+     * @param computeInvoke If {@code true} computes return value for invoke operation.
      */
+    @SuppressWarnings("unchecked")
     protected Set<K> postLockWrite(
         GridCacheContext<K, V> cacheCtx,
         Iterable<? extends K> keys,
         Set<K> failed,
-        @Nullable Map<K, V> transformed,
-        @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
-        GridCacheReturn<V> ret,
+        GridCacheReturn ret,
         boolean rmv,
         boolean retval,
         boolean read,
         long accessTtl,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        boolean computeInvoke
     ) throws IgniteCheckedException {
         for (K k : keys) {
             IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(k));
@@ -2132,7 +2237,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
                         retval = true;
 
-                    if (retval) {
+                    if (retval || txEntry.op() == TRANSFORM) {
                         if (!cacheCtx.isNear()) {
                             try {
                                 if (!hasPrevVal)
@@ -2161,7 +2266,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 v = retval ? cached.rawGetOrUnmarshal(false) : cached.rawGet();
                         }
 
-                        ret.value(v);
+                        if (txEntry.op() == TRANSFORM) {
+                            if (computeInvoke)
+                                addInvokeResult(txEntry, v, ret);
+                        }
+                        else
+                            ret.value(v);
                     }
 
                     boolean pass = cacheCtx.isAll(cached, filter);
@@ -2185,7 +2295,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         failed = skip(failed, k);
 
                         // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
-                        txEntry.setAndMarkValid(txEntry.previousOperation(), ret.value());
+                        txEntry.setAndMarkValid(txEntry.previousOperation(), (V)ret.value());
                         txEntry.filters(CU.<K, V>empty());
                         txEntry.filtersSet(false);
 
@@ -2222,33 +2332,58 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     }
 
     /**
+     * @param txEntry Transaction entry whose entry processors are invoked; only the last processor's result is kept.
+     * @param val Value passed to each entry processor via a new {@code CacheInvokeEntry}.
+     * @param ret Return value to update: receives the invoke result or the exception thrown while processing.
+     */
+    private void addInvokeResult(IgniteTxEntry<K, V> txEntry, V val, GridCacheReturn ret) {
+        try {
+            Object res = null;
+
+            for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) {
+                CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), val);
+
+                EntryProcessor<K, V, ?> entryProcessor = t.get1();
+
+                res = entryProcessor.process(invokeEntry, t.get2());
+            }
+
+            ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult<>(res));
+        }
+        catch (Exception e) {
+            ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult(e));
+        }
+    }
+
+    /**
      * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
      * maps must be non-null.
      *
      * @param cacheCtx Context.
      * @param map Key-value map to store.
-     * @param transformMap Transform map.
+     * @param invokeMap Invoke map.
+     * @param invokeArgs Optional arguments for EntryProcessor.
      * @param drMap DR map.
      * @param retval Key-transform value map to store.
      * @param cached Cached entry, if any.
-     * @param ttl Time to live.
      * @param filter Filter.
      * @return Operation future.
      */
-    private IgniteFuture<GridCacheReturn<V>> putAllAsync0(
+    @SuppressWarnings("unchecked")
+    private IgniteFuture putAllAsync0(
         final GridCacheContext<K, V> cacheCtx,
         @Nullable Map<? extends K, ? extends V> map,
-        @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+        @Nullable Map<? extends K, EntryProcessor> invokeMap,
+        @Nullable final Object[] invokeArgs,
         @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap,
         final boolean retval,
         @Nullable GridCacheEntryEx<K, V> cached,
-        long ttl,
         @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT);
 
         // Cached entry may be passed only from entry wrapper.
         final Map<K, V> map0;
-        final Map<K, IgniteClosure<V, V>> transformMap0;
+        final Map<K, EntryProcessor> invokeMap0;
 
         if (drMap != null) {
             assert map == null;
@@ -2259,7 +2394,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 }
             });
 
-            transformMap0 = null;
+            invokeMap0 = null;
         }
         else if (cacheCtx.portableEnabled()) {
             if (map != null) {
@@ -2280,14 +2415,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             else
                 map0 = null;
 
-            if (transformMap != null) {
-                transformMap0 = U.newHashMap(transformMap.size());
+            if (invokeMap != null) {
+                invokeMap0 = U.newHashMap(invokeMap.size());
 
                 try {
-                    for (Map.Entry<? extends K, ? extends IgniteClosure<V, V>> e : transformMap.entrySet()) {
+                    for (Map.Entry<? extends K, EntryProcessor> e : invokeMap.entrySet()) {
                         K key = (K)cacheCtx.marshalToPortable(e.getKey());
 
-                        transformMap0.put(key, e.getValue());
+                        invokeMap0.put(key, e.getValue());
                     }
                 }
                 catch (PortableException e) {
@@ -2295,19 +2430,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 }
             }
             else
-                transformMap0 = null;
+                invokeMap0 = null;
         }
         else {
             map0 = (Map<K, V>)map;
-            transformMap0 = (Map<K, IgniteClosure<V, V>>)transformMap;
+            invokeMap0 = (Map<K, EntryProcessor>)invokeMap;
         }
 
         if (log.isDebugEnabled())
             log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
 
-        assert map0 != null || transformMap0 != null;
+        assert map0 != null || invokeMap0 != null;
         assert cached == null ||
-            (map0 != null && map0.size() == 1) || (transformMap0 != null && transformMap0.size() == 1);
+            (map0 != null && map0.size() == 1) || (invokeMap0 != null && invokeMap0.size() == 1);
 
         try {
             checkValid();
@@ -2320,7 +2455,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
         final GridCacheReturn<V> ret = new GridCacheReturn<>(false);
 
-        if (F.isEmpty(map0) && F.isEmpty(transformMap0)) {
+        if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
             if (implicit())
                 try {
                     commit();
@@ -2333,7 +2468,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         }
 
         try {
-            Set<? extends K> keySet = map0 != null ? map0.keySet() : transformMap0.keySet();
+            Set<? extends K> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
 
             Collection<K> enlisted = new LinkedList<>();
 
@@ -2346,7 +2481,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 prj != null ? prj.expiry() : null,
                 implicit,
                 map0,
-                transformMap0,
+                invokeMap0,
+                invokeArgs,
                 retval,
                 false,
                 filter,
@@ -2390,19 +2526,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         if (log.isDebugEnabled())
                             log.debug("Acquired transaction lock for put on keys: " + keys);
 
-                        Map<K, V> transformed = null;
-
                         postLockWrite(cacheCtx,
                             keys,
                             loaded,
-                            transformed,
-                            transformMap0,
                             ret,
                             /*remove*/false,
                             retval,
                             /*read*/false,
                             -1L,
-                            filter);
+                            filter,
+                            /*computeInvoke*/true);
 
                         return ret;
                     }
@@ -2554,7 +2687,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 plc,
                 implicit,
                 /** lookup map */null,
-                /** transform map */null,
+                /** invoke map */null,
+                /** invoke arguments */null,
                 retval,
                 /** lock only */false,
                 filter,
@@ -2595,14 +2729,13 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         postLockWrite(cacheCtx,
                             passedKeys,
                             loadFut.get(),
-                            null,
-                            null,
                             ret,
                             /*remove*/true,
                             retval,
                             /*read*/false,
                             -1L,
-                            filter);
+                            filter,
+                            /*computeInvoke*/false);
 
                         return ret;
                     }
@@ -2650,6 +2783,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     /**
      * Checks if portable values should be deserialized.
      *
+     * @param cacheCtx Cache context.
      * @return {@code True} if portables should be deserialized, {@code false} otherwise.
      */
     private boolean deserializePortables(GridCacheContext<K, V> cacheCtx) {
@@ -2670,6 +2804,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     /**
      * Checks that affinity keys are enlisted in group transaction on start.
      *
+     * @param cacheCtx Cache context.
      * @param keys Keys to check.
      * @throws IgniteCheckedException If sanity check failed.
      */
@@ -2721,7 +2856,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 /** expiry - leave unchanged */null,
                 /** implicit */false,
                 /** lookup map */null,
-                /** transform map */null,
+                /** invoke map */null,
+                /** invoke arguments */null,
                 /** retval */false,
                 /** lock only */true,
                 CU.<K, V>empty(),
@@ -2842,7 +2978,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param op Cache operation.
      * @param val Value.
      * @param expiryPlc Explicitly specified expiry policy.
-     * @param transformClos Transform closure.
+     * @param invokeArgs Optional arguments for EntryProcessor.
+     * @param entryProcessor Entry processor.
      * @param entry Cache entry.
      * @param filter Filter.
      * @param filtersSet {@code True} if filter should be marked as set.
@@ -2853,7 +2990,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      */
     protected final IgniteTxEntry<K, V> addEntry(GridCacheOperation op,
         @Nullable V val,
-        @Nullable IgniteClosure<V, V> transformClos,
+        @Nullable EntryProcessor entryProcessor,
+        Object[] invokeArgs,
         GridCacheEntryEx<K, V> entry,
         @Nullable ExpiryPolicy expiryPlc,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -2861,6 +2999,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         long drTtl,
         long drExpireTime,
         @Nullable GridCacheVersion drVer) {
+        assert invokeArgs == null || op == TRANSFORM;
+
         IgniteTxKey<K> key = entry.txKey();
 
         checkInternal(key);
@@ -2883,12 +3023,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         IgniteTxEntry<K, V> txEntry;
 
         if (old != null) {
-            if (transformClos != null) {
+            if (entryProcessor != null) {
                 assert val == null;
                 assert op == TRANSFORM;
 
                 // Will change the op.
-                old.addTransformClosure(transformClos);
+                old.addEntryProcessor(entryProcessor, invokeArgs);
             }
             else {
                 assert old.op() != TRANSFORM;
@@ -2922,7 +3062,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 this,
                 op,
                 val,
-                transformClos,
+                entryProcessor,
+                invokeArgs,
                 hasDrTtl ? drTtl : -1L,
                 entry,
                 filter,


Mime
View raw message