ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [02/18] incubator-ignite git commit: # ignite-44
Date Tue, 30 Dec 2014 12:11:47 GMT
# ignite-44


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/71ee2ee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/71ee2ee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/71ee2ee1

Branch: refs/heads/ignite-1
Commit: 71ee2ee194aea90f81a7b4299bb9e7163a59f143
Parents: 982d441
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Dec 23 17:05:07 2014 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Dec 23 17:30:04 2014 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |  32 +-
 .../processors/cache/CacheInvokeEntry.java      |  72 ++++
 .../processors/cache/CacheInvokeResult.java     |  95 +++++
 .../processors/cache/GridCacheAdapter.java      |  18 +-
 .../processors/cache/GridCacheEntryEx.java      |   2 +
 .../processors/cache/GridCacheMapEntry.java     |  58 ++-
 .../processors/cache/GridCacheMessage.java      |  54 +++
 .../processors/cache/GridCacheProjectionEx.java |  21 ++
 .../cache/GridCacheProjectionImpl.java          |  15 +
 .../processors/cache/GridCacheProxyImpl.java    |  29 ++
 .../cache/GridCacheUpdateAtomicResult.java      |  18 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 279 +++++++++++---
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  19 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 121 +++---
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  36 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java | 185 +++++++---
 .../distributed/near/GridNearAtomicCache.java   |   9 +-
 ...eCacheAtomicPrimaryWriteOrderInvokeTest.java |  47 +++
 ...micPrimaryWriteOrderWithStoreInvokeTest.java |  23 ++
 .../cache/IgniteCacheInvokeAbstractTest.java    | 367 +++++++++++++++++++
 .../processors/cache/GridCacheTestEntryEx.java  |  11 +-
 .../junits/common/GridCommonAbstractTest.java   |  43 +++
 22 files changed, 1383 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 3945234..410fb9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -494,16 +494,40 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
         throws EntryProcessorException {
-        // TODO IGNITE-1.
-        throw new UnsupportedOperationException();
+        try {
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+            try {
+                EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args).get();
+
+                return res.get();
+            }
+            finally {
+                gate.leave(prev);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheException(e);
+        }
     }
 
     /** {@inheritDoc} */
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
         EntryProcessor<K, V, T> entryProcessor,
         Object... args) {
-        // TODO IGNITE-1.
-        throw new UnsupportedOperationException();
+        try {
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+            try {
+                return delegate.invokeAll(keys, entryProcessor, args).get();
+            }
+            finally {
+                gate.leave(prev);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheException(e);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
new file mode 100644
index 0000000..1f3900d
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
@@ -0,0 +1,72 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import javax.cache.processor.*;
+
+/**
+ * Implementation of {@link MutableEntry} passed to the {@link EntryProcessor#process(MutableEntry, Object...)}.
+ */
+public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> {
+    /** */
+    @GridToStringInclude
+    private final K key;
+
+    /** */
+    @GridToStringInclude
+    private V val;
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     */
+    public CacheInvokeEntry(K key, V val) {
+        this.key = key;
+        this.val = val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists() {
+        return val != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove() {
+        val = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setValue(V val) {
+        this.val = val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public K getKey() {
+        return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getValue() {
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T unwrap(Class<T> clazz) {
+        throw new IllegalArgumentException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheInvokeEntry.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
new file mode 100644
index 0000000..50af119
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
@@ -0,0 +1,95 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.marshaller.optimized.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+
+/**
+ * Implementation of {@link EntryProcessorResult}.
+ */
+public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externalizable, IgniteOptimizedMarshallable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
+    private static Object GG_CLASS_ID;
+
+    /** */
+    @GridToStringInclude
+    private T res;
+
+    /** */
+    private Exception err;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public CacheInvokeResult() {
+        // No-op.
+    }
+
+    /**
+     * @param res Computed result.
+     */
+    public CacheInvokeResult(@Nullable T res) {
+        this.res = res;
+    }
+
+    /**
+     * @param err Exception thrown by {@link EntryProcessor#process(MutableEntry, Object...)}.
+     */
+    public CacheInvokeResult(Exception err) {
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object ggClassId() {
+        return GG_CLASS_ID;
+    }
+
+    /** {@inheritDoc} */
+    @Override public T get() throws EntryProcessorException {
+        if (err != null) {
+            if (err instanceof EntryProcessorException)
+                throw (EntryProcessorException)err;
+
+            throw new EntryProcessorException(err);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(res);
+
+        out.writeObject(err);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        res = (T)in.readObject();
+
+        err = (Exception)in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheInvokeResult.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index f55ee0e..b3e567c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -43,6 +43,7 @@ import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -2193,6 +2194,21 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
     }
 
     /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
+        throws EntryProcessorException {
+        // TODO IGNITE-44.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        // TODO IGNITE-44.
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void transform(final K key, final IgniteClosure<V, V> transformer) throws IgniteCheckedException {
         A.notNull(key, "key", transformer, "valTransform");
 
@@ -4516,7 +4532,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
      * @param key Cache key.
      * @throws IllegalArgumentException If validation fails.
      */
-    private void validateCacheKey(Object key) {
+    protected void validateCacheKey(Object key) {
         if (keyCheck) {
             CU.validateCacheKey(log, key);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index 5fb0b95..1b71eec 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -392,6 +392,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
      * @param op Update operation.
      * @param val Value. Type depends on operation.
      * @param valBytes Value bytes. Can be non-null only if operation is UPDATE.
+     * @param invokeArgs Optional arguments for entry processor.
      * @param writeThrough Write through flag.
      * @param retval Return value flag.
      * @param expiryPlc Expiry policy.
@@ -424,6 +425,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
         GridCacheOperation op,
         @Nullable Object val,
         @Nullable byte[] valBytes,
+        @Nullable Object[] invokeArgs,
         boolean writeThrough,
         boolean retval,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 0fd64de..f14bba5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -29,6 +29,7 @@ import org.jetbrains.annotations.*;
 import sun.misc.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.io.*;
 import java.nio.*;
 import java.util.*;
@@ -1586,6 +1587,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         GridCacheOperation op,
         @Nullable Object writeObj,
         @Nullable byte[] valBytes,
+        @Nullable Object[] invokeArgs,
         boolean writeThrough,
         boolean retval,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
@@ -1615,6 +1617,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
         GridDrResolveResult<V> drRes = null;
 
+        EntryProcessorResult<?> invokeRes = null;
+
         long newTtl = -1L;
         long newExpireTime = 0L;
         long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node.
@@ -1644,7 +1648,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     if (drRes.isUseOld()) {
                         old = retval ? rawGetOrUnmarshalUnlocked(false) : val;
 
-                        return new GridCacheUpdateAtomicResult<>(false, old, null, -1L, -1L, null, null, false);
+                        return new GridCacheUpdateAtomicResult<>(false,
+                            old,
+                            null,
+                            invokeRes,
+                            -1L,
+                            -1L,
+                            null,
+                            null,
+                            false);
                     }
 
                     newTtl = drRes.newTtl();
@@ -1692,7 +1704,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                         old = retval ? rawGetOrUnmarshalUnlocked(false) : val;
 
-                        return new GridCacheUpdateAtomicResult<>(false, old, null, -1L, -1L, null, null, false);
+                        return new GridCacheUpdateAtomicResult<>(false,
+                            old,
+                            null,
+                            invokeRes,
+                            -1L,
+                            -1L,
+                            null,
+                            null,
+                            false);
                     }
                 }
                 else
@@ -1744,6 +1764,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     return new GridCacheUpdateAtomicResult<>(false,
                         retval ? old : null,
                         null,
+                        invokeRes,
                         -1L,
                         -1L,
                         null,
@@ -1760,11 +1781,26 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             if (op == GridCacheOperation.TRANSFORM) {
                 transformClo = writeObj;
 
-                IgniteClosure<V, V> transform = (IgniteClosure<V, V>)writeObj;
+                EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj;
 
-                updated = cctx.unwrapTemporary(transform.apply(old));
+                CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(key, old);
+
+                try {
+                    Object computed = entryProcessor.process(entry, invokeArgs);
+
+                    updated = cctx.unwrapTemporary(entry.getValue());
+
+                    invokeRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed));
 
-                valBytes = null;
+                    valBytes = null;
+                }
+                catch (Exception e) {
+                    invokeRes = new CacheInvokeResult<>(e);
+
+                    updated = old;
+
+                    valBytes = oldBytes.getIfMarshaled();
+                }
             }
             else
                 updated = (V)writeObj;
@@ -1794,6 +1830,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                         return new GridCacheUpdateAtomicResult<>(false,
                             retval ? old : null,
                             null,
+                            invokeRes,
                             -1L,
                             -1L,
                             null,
@@ -1899,6 +1936,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                         return new GridCacheUpdateAtomicResult<>(false,
                             cctx.<V>unwrapTemporary(interceptRes.get2()),
                             null,
+                            invokeRes,
                             -1L,
                             -1L,
                             null,
@@ -2001,7 +2039,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         if (log.isDebugEnabled())
             log.debug("Updated cache entry [val=" + val + ", old=" + old + ", entry=" + this + ']');
 
-        return new GridCacheUpdateAtomicResult<>(res, old, updated, newTtl, newDrExpireTime, enqueueVer, drRes, true);
+        return new GridCacheUpdateAtomicResult<>(res,
+            old,
+            updated,
+            invokeRes,
+            newTtl,
+            newDrExpireTime,
+            enqueueVer,
+            drRes,
+            true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
index ab98dcb..45eda5f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
@@ -372,6 +372,60 @@ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessage
     }
 
     /**
+     * @param args Arguments to marshal.
+     * @param ctx Context.
+     * @return Marshalled arguments, or {@code null} if {@code args} is {@code null} or empty.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable protected final byte[][] marshalInvokeArguments(@Nullable Object[] args,
+        GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+        assert ctx != null;
+
+        if (args == null || args.length == 0)
+            return null;
+
+        byte[][] argsBytes = new byte[args.length][];
+
+        for (int i = 0; i < args.length; i++) {
+            Object arg = args[i];
+
+            if (ctx.deploymentEnabled())
+                prepareObject(arg, ctx);
+
+            argsBytes[i] = arg == null ? null : CU.marshal(ctx, arg);
+        }
+
+        return argsBytes;
+    }
+
+
+    /**
+     * @param byteCol Marshalled arguments to unmarshal.
+     * @param ctx Context.
+     * @param ldr Loader.
+     * @return Unmarshalled arguments, or {@code null} if {@code byteCol} is {@code null}.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable protected final Object[] unmarshalInvokeArguments(@Nullable byte[][] byteCol,
+        GridCacheSharedContext<K, V> ctx,
+        ClassLoader ldr) throws IgniteCheckedException {
+        assert ldr != null;
+        assert ctx != null;
+
+        if (byteCol == null)
+            return null;
+
+        Object[] args = new Object[byteCol.length];
+
+        IgniteMarshaller marsh = ctx.marshaller();
+
+        for (int i = 0; i < byteCol.length; i++)
+            args[i] = byteCol[i] == null ? null : marsh.unmarshal(byteCol[i], ldr);
+
+        return args;
+    }
+
+    /**
      * @param filter Collection to marshal.
      * @param ctx Context.
      * @return Marshalled collection.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
index 2362f57..1a98192 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
@@ -18,6 +18,7 @@ import org.gridgain.grid.kernal.processors.cache.dr.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.util.*;
 
 /**
@@ -393,4 +394,24 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> {
      * @return New projection based on this one, but with the specified expiry policy.
      */
     public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc);
+
+    /**
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param args Arguments.
+     * @return Future.
+     */
+    public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args);
+
+    /**
+     * @param keys Keys.
+     * @param entryProcessor Entry processor.
+     * @param args Arguments.
+     * @return Future.
+     */
+    public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
index 5bd973c..ad5cde3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
@@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -788,6 +789,20 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        return cache.invoke(key, entryProcessor, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        return cache.invokeAll(keys, entryProcessor, args);
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> putxAsync(K key, V val,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         return putxAsync(key, val, null, -1, filter);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
index 136e078..90aeb0b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
@@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -737,6 +738,34 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return delegate.invoke(key, entryProcessor, args);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return delegate.invokeAll(keys, entryProcessor, args);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> putxAsync(K key, V val,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
index 43ca819..34dbe52 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -9,11 +9,12 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
-import org.gridgain.grid.kernal.processors.dr.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.grid.util.tostring.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
+
 /**
  * Cache entry atomic update result.
  */
@@ -46,14 +47,18 @@ public class GridCacheUpdateAtomicResult<K, V> {
     /** Whether update should be propagated to DHT node. */
     private final boolean sndToDht;
 
+    /** Value computed by entry processor. */
+    private EntryProcessorResult<?> res;
+
     /**
      * Constructor.
      *
      * @param success Success flag.
      * @param oldVal Old value.
      * @param newVal New value.
+     * @param res Value computed by the {@link EntryProcessor}.
      * @param newTtl New TTL.
-     * @param drExpireTime Explict DR expire time (if any).
+     * @param drExpireTime Explicit DR expire time (if any).
      * @param rmvVer Version for deferred delete.
      * @param drRes DR resolution result.
      * @param sndToDht Whether update should be propagated to DHT node.
@@ -61,6 +66,7 @@ public class GridCacheUpdateAtomicResult<K, V> {
     public GridCacheUpdateAtomicResult(boolean success,
         @Nullable V oldVal,
         @Nullable V newVal,
+        @Nullable EntryProcessorResult<?> res,
         long newTtl,
         long drExpireTime,
         @Nullable GridCacheVersion rmvVer,
@@ -69,6 +75,7 @@ public class GridCacheUpdateAtomicResult<K, V> {
         this.success = success;
         this.oldVal = oldVal;
         this.newVal = newVal;
+        this.res = res;
         this.newTtl = newTtl;
         this.drExpireTime = drExpireTime;
         this.rmvVer = rmvVer;
@@ -77,6 +84,13 @@ public class GridCacheUpdateAtomicResult<K, V> {
     }
 
     /**
+     * @return Value computed by the {@link EntryProcessor}.
+     */
+    @Nullable public EntryProcessorResult<?> computedResult() {
+        return res;
+    }
+
+    /**
      * @return Success flag.
      */
     public boolean success() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2c988e7..fec59b2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -34,6 +34,7 @@ import org.jetbrains.annotations.*;
 import sun.misc.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -306,14 +307,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @SuppressWarnings("unchecked")
     @Override public IgniteFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry,
         long ttl, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
-        return updateAllAsync0(F0.asMap(key, val), null, null, null, true, false, entry, ttl, filter);
+        return updateAllAsync0(F0.asMap(key, val),
+            null,
+            null,
+            null,
+            null,
+            true,
+            false,
+            entry,
+            ttl,
+            filter);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, long ttl,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
-        return updateAllAsync0(F0.asMap(key, val), null, null, null, false, false, entry, ttl, filter);
+        return updateAllAsync0(F0.asMap(key, val),
+            null,
+            null,
+            null,
+            null,
+            false,
+            false,
+            entry,
+            ttl,
+            filter);
     }
 
     /** {@inheritDoc} */
@@ -385,7 +404,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public IgniteFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) {
-        return updateAllAsync0(F.asMap(key, newVal), null, null, null, true, true, null, 0,
+        return updateAllAsync0(F.asMap(key, newVal),
+            null,
+            null,
+            null,
+            null,
+            true,
+            true,
+            null,
+            0,
             ctx.equalsPeekArray(oldVal));
     }
 
@@ -398,7 +425,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> putAllAsync(Map<? extends K, ? extends V> m,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
-        return updateAllAsync0(m, null, null, null, false, false, null, 0, filter);
+        return updateAllAsync0(m,
+            null,
+            null,
+            null,
+            null,
+            false,
+            false,
+            null,
+            0,
+            filter);
     }
 
     /** {@inheritDoc} */
@@ -410,7 +446,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override public IgniteFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) {
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
-        return updateAllAsync0(null, null, drMap, null, false, false, null, 0, null);
+        return updateAllAsync0(null,
+            null,
+            null,
+            drMap,
+            null,
+            false,
+            false,
+            null,
+            0,
+            null);
     }
 
     /** {@inheritDoc} */
@@ -421,16 +466,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /** {@inheritDoc} */
     @Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer)
         throws IgniteCheckedException {
+        /*
         return (R)updateAllAsync0(null,
             Collections.singletonMap(key, new GridCacheTransformComputeClosure<>(transformer)), null, null, true,
             false, null, 0, null).get();
+        */
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer,
         @Nullable GridCacheEntryEx<K, V> entry, long ttl) {
+        /*
         return updateAllAsync0(null, Collections.singletonMap(key, transformer), null, null, false, false, entry, ttl,
             null);
+        */
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
@@ -440,10 +493,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) {
+        /*
         if (F.isEmpty(m))
             return new GridFinishedFuture<Object>(ctx.kernalContext());
 
         return updateAllAsync0(null, m, null, null, false, false, null, 0, null);
+        */
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
@@ -579,11 +636,83 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             "GridCacheAtomicityMode.ATOMIC mode (use GridCacheAtomicityMode.TRANSACTIONAL instead)"));
     }
 
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        A.notNull(key, "key", entryProcessor, "entryProcessor");
+
+        if (keyCheck)
+            validateCacheKey(key);
+
+        ctx.denyOnLocalRead();
+
+        Map<? extends K, EntryProcessor> transformMap =
+            Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+
+        IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
+            transformMap,
+            args,
+            null,
+            null,
+            true,
+            false,
+            null,
+            -1L,
+            null);
+
+        return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
+            @Override public EntryProcessorResult<T> applyx(IgniteFuture<Map<K, EntryProcessorResult<T>>> fut)
+                throws IgniteCheckedException {
+                Map<K, EntryProcessorResult<T>> resMap = fut.get();
+
+                assert resMap != null;
+                assert resMap.size() == 1 : resMap.size();
+
+                return resMap.values().iterator().next();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+        final EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        A.notNull(keys, "keys", entryProcessor, "entryProcessor");
+
+        if (keyCheck)
+            validateCacheKeys(keys);
+
+        ctx.denyOnLocalRead();
+
+        Map<? extends K, EntryProcessor> transformMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+            @Override public EntryProcessor apply(K k) {
+                return entryProcessor;
+            }
+        });
+
+        IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
+            transformMap,
+            args,
+            null,
+            null,
+            true,
+            false,
+            null,
+            -1L,
+            null);
+
+        return fut;
+    }
+
     /**
      * Entry point for all public API put/transform methods.
      *
      * @param map Put map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed.
      * @param transformMap Transform map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed.
+     * @param invokeArgs Optional arguments for EntryProcessor.
      * @param drPutMap DR put map.
      * @param drRmvMap DR remove map.
      * @param retval Return value required flag.
@@ -595,7 +724,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private IgniteFuture updateAllAsync0(
         @Nullable final Map<? extends K, ? extends V> map,
-        @Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+        @Nullable final Map<? extends K, EntryProcessor> transformMap,
+        @Nullable Object[] invokeArgs,
         @Nullable final Map<? extends K, GridCacheDrInfo<V>> drPutMap,
         @Nullable final Map<? extends K, GridCacheVersion> drRmvMap,
         final boolean retval,
@@ -623,6 +753,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : drPutMap != null ?
                 drPutMap.keySet() : drRmvMap.keySet(),
             map != null ? map.values() : transformMap != null ? transformMap.values() : null,
+            invokeArgs,
             drPutMap != null ? drPutMap.values() : null,
             drRmvMap != null ? drRmvMap.values() : null,
             retval,
@@ -682,6 +813,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             keys != null ? keys : drMap.keySet(),
             null,
             null,
+            null,
             keys != null ? null : drMap.values(),
             retval,
             rawRetval,
@@ -692,8 +824,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             taskNameHash);
 
         return asyncOp(new CO<IgniteFuture<Object>>() {
-            @Override
-            public IgniteFuture<Object> apply() {
+            @Override public IgniteFuture<Object> apply() {
                 updateFut.map();
 
                 return updateFut;
@@ -858,11 +989,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         IgniteFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
 
         if (forceFut.isDone())
-            updateAllAsyncInternal0(nodeId, req, cached, completionCb);
+            updateAllAsyncInternal0(nodeId, req, completionCb);
         else {
             forceFut.listenAsync(new CI1<IgniteFuture<Object>>() {
                 @Override public void apply(IgniteFuture<Object> t) {
-                    updateAllAsyncInternal0(nodeId, req, cached, completionCb);
+                    updateAllAsyncInternal0(nodeId, req, completionCb);
                 }
             });
         }
@@ -873,13 +1004,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *
      * @param nodeId Node ID.
      * @param req Update request.
-     * @param cached Cached entry if updating single local entry.
      * @param completionCb Completion callback.
      */
     public void updateAllAsyncInternal0(
         UUID nodeId,
         GridNearAtomicUpdateRequest<K, V> req,
-        @Nullable GridCacheEntryEx<K, V> cached,
         CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb
     ) {
         GridNearAtomicUpdateResponse<K, V> res = new GridNearAtomicUpdateResponse<>(ctx.cacheId(), nodeId,
@@ -887,7 +1016,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         List<K> keys = req.keys();
 
-        assert !req.returnValue() || keys.size() == 1;
+        assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1);
 
         GridDhtAtomicUpdateFuture<K, V> dhtFut = null;
 
@@ -966,6 +1095,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                             deleted = updRes.deleted();
                             dhtFut = updRes.dhtFuture();
+
+                            if (req.operation() == TRANSFORM)
+                                retVal = new GridCacheReturn<>((Object)updRes.invokeResults(), true);
                         }
                         else {
                             UpdateSingleResult<K, V> updRes = updateSingle(node,
@@ -1075,8 +1207,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry
     ) throws GridCacheEntryRemovedException {
-        // Cannot update in batches during DR due to possible conflicts.
-        assert !req.returnValue(); // Should not request return values for putAll.
+        assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
+        assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
 
         if (!F.isEmpty(req.filter())) {
             try {
@@ -1092,11 +1224,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         int size = req.keys().size();
 
         Map<K, V> putMap = null;
-        Map<K, IgniteClosure<V, V>> transformMap = null;
+        Map<K, EntryProcessor<K, V, ?>> entryProcessorMap = null;
         Collection<K> rmvKeys = null;
         UpdateBatchResult<K, V> updRes = new UpdateBatchResult<>();
         List<GridDhtCacheEntry<K, V>> filtered = new ArrayList<>(size);
         GridCacheOperation op = req.operation();
+        Map<Object, Object> invokeResMap = op == TRANSFORM ? U.newHashMap(size) : null;
 
         int firstEntryIdx = 0;
 
@@ -1136,7 +1269,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (op == TRANSFORM) {
-                    IgniteClosure<V, V> transform = req.transformClosure(i);
+                    EntryProcessor<K, V, ?> entryProcessor = req.entryProcessor(i);
 
                     V old = entry.innerGet(
                         null,
@@ -1148,17 +1281,35 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         /*event*/true,
                         /*temporary*/true,
                         req.subjectId(),
-                        transform,
+                        entryProcessor,
                         taskName,
                         CU.<K, V>empty(),
                         null);
 
-                    if (transformMap == null)
-                        transformMap = new HashMap<>();
+                    if (entryProcessorMap == null)
+                        entryProcessorMap = new HashMap<>();
+
+                    entryProcessorMap.put(entry.key(), entryProcessor);
+
+                    CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old);
 
-                    transformMap.put(entry.key(), transform);
+                    V updated;
+                    CacheInvokeResult invokeRes;
 
-                    V updated = transform.apply(old);
+                    try {
+                        Object computed = entryProcessor.process(invokeEntry, req.invokeArguments());
+
+                        updated = ctx.unwrapTemporary(invokeEntry.getValue());
+
+                        invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed));
+                    }
+                    catch (Exception e) {
+                        invokeRes = new CacheInvokeResult<>(e);
+
+                        updated = old;
+                    }
+
+                    invokeResMap.put(entry.key(), invokeRes);
 
                     if (updated == null) {
                         if (intercept) {
@@ -1179,7 +1330,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 node,
                                 putMap,
                                 null,
-                                transformMap,
+                                entryProcessorMap,
                                 dhtFut,
                                 completionCb,
                                 req,
@@ -1192,7 +1343,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             firstEntryIdx = i + 1;
 
                             putMap = null;
-                            transformMap = null;
+                            entryProcessorMap = null;
 
                             filtered = new ArrayList<>();
                         }
@@ -1221,7 +1372,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 node,
                                 null,
                                 rmvKeys,
-                                transformMap,
+                                entryProcessorMap,
                                 dhtFut,
                                 completionCb,
                                 req,
@@ -1234,7 +1385,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             firstEntryIdx = i + 1;
 
                             rmvKeys = null;
-                            transformMap = null;
+                            entryProcessorMap = null;
 
                             filtered = new ArrayList<>();
                         }
@@ -1331,7 +1482,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 node,
                 putMap,
                 rmvKeys,
-                transformMap,
+                entryProcessorMap,
                 dhtFut,
                 completionCb,
                 req,
@@ -1346,6 +1497,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         updRes.dhtFuture(dhtFut);
 
+        updRes.invokeResult(invokeResMap);
+
         return updRes;
     }
 
@@ -1442,6 +1595,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
+        Map<K, EntryProcessorResult<?>> computedMap = null;
+
         // Avoid iterator creation.
         for (int i = 0; i < keys.size(); i++) {
             K k = keys.get(i);
@@ -1487,6 +1642,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     op,
                     writeVal,
                     newValBytes,
+                    req.invokeArguments(),
                     primary && storeEnabled(),
                     req.returnValue(),
                     expiry,
@@ -1524,16 +1680,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             newValBytes = null; // Value has been changed.
                         }
 
-                        IgniteClosure<V, V> transformC = null;
+                        EntryProcessor<K, V, ?> entryProcessor = null;
 
                         if (req.forceTransformBackups() && op == TRANSFORM)
-                            transformC = (IgniteClosure<V, V>)writeVal;
+                            entryProcessor = (EntryProcessor<K, V, ?>)writeVal;
 
                         if (!readersOnly) {
                             dhtFut.addWriteEntry(entry,
                                 updRes.newValue(),
                                 newValBytes,
-                                transformC,
+                                entryProcessor,
                                 updRes.newTtl(),
                                 expireTime,
                                 newDrVer);
@@ -1544,7 +1700,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 entry,
                                 updRes.newValue(),
                                 newValBytes,
-                                transformC,
+                                entryProcessor,
                                 ttl,
                                 expireTime);
                     }
@@ -1599,17 +1755,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     deleted.add(F.t(entry, updRes.removeVersion()));
                 }
 
-                // Create only once.
-                if (retVal == null) {
-                    Object ret = updRes.oldValue();
+                if (op == TRANSFORM) {
+                    assert req.returnValue();
 
-                    if (op == TRANSFORM && writeVal instanceof GridCacheTransformComputeClosure) {
-                        assert req.returnValue();
+                    if (retVal == null) {
+                        computedMap = U.newHashMap(keys.size());
 
-                        ret = ((GridCacheTransformComputeClosure<V, ?>)writeVal).returnValue();
+                        retVal = new GridCacheReturn<>((Object)computedMap, updRes.success());
                     }
 
-                    retVal = new GridCacheReturn<>(req.returnValue() ? ret : null, updRes.success());
+                    computedMap.put(k, updRes.computedResult());
+                }
+                else {
+                    // Create only once.
+                    if (retVal == null) {
+                        Object ret = updRes.oldValue();
+
+                        retVal = new GridCacheReturn<>(req.returnValue() ? ret : null, updRes.success());
+                    }
                 }
             }
             catch (IgniteCheckedException e) {
@@ -1628,7 +1791,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param node Originating node.
      * @param putMap Values to put.
      * @param rmvKeys Keys to remove.
-     * @param transformMap Transform closures.
+     * @param entryProcessorMap Entry processors.
      * @param dhtFut DHT update future if has backups.
      * @param completionCb Completion callback to invoke when DHT future is completed.
      * @param req Request.
@@ -1648,7 +1811,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         ClusterNode node,
         @Nullable Map<K, V> putMap,
         @Nullable Collection<K> rmvKeys,
-        @Nullable Map<K, IgniteClosure<V, V>> transformMap,
+        @Nullable Map<K, EntryProcessor<K, V, ?>> entryProcessorMap,
         @Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut,
         CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb,
         final GridNearAtomicUpdateRequest<K, V> req,
@@ -1740,6 +1903,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         op,
                         writeVal,
                         null,
+                        null,
                         false,
                         false,
                         expiry,
@@ -1784,13 +1948,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         byte[] valBytes = valBytesTuple.getIfMarshaled();
 
-                        IgniteClosure<V, V> transformC = transformMap == null ? null : transformMap.get(entry.key());
+                        EntryProcessor<K, V, ?> entryProcessor =
+                            entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
                         if (!batchRes.readersOnly())
                             dhtFut.addWriteEntry(entry,
                                 writeVal,
                                 valBytes,
-                                transformC,
+                                entryProcessor,
                                 updRes.newTtl(),
                                 -1,
                                 null);
@@ -1800,7 +1965,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 entry,
                                 writeVal,
                                 valBytes,
-                                transformC,
+                                entryProcessor,
                                 updRes.newTtl(),
                                 -1);
                     }
@@ -2086,6 +2251,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             req.operation(),
             req.keys(),
             vals,
+            req.invokeArguments(),
             drPutVals,
             drRmvVals,
             req.returnValue(),
@@ -2229,9 +2395,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         V val = req.value(i);
                         byte[] valBytes = req.valueBytes(i);
-                        IgniteClosure<V, V> transform = req.transformClosure(i);
+                        EntryProcessor<K, V, ?> entryProcessor = req.entryProcessor(i);
 
-                        GridCacheOperation op = transform != null ? TRANSFORM :
+                        GridCacheOperation op = entryProcessor != null ? TRANSFORM :
                             (val != null || valBytes != null) ?
                                 UPDATE :
                                 DELETE;
@@ -2247,8 +2413,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             nodeId,
                             nodeId,
                             op,
-                            op == TRANSFORM ? transform : val,
+                            op == TRANSFORM ? entryProcessor : val,
                             valBytes,
+                            op == TRANSFORM ? req.invokeArguments() : null,
                             /*write-through*/false,
                             /*retval*/false,
                             /*expiry policy*/null,
@@ -2487,12 +2654,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /** */
         private boolean readersOnly;
 
+        /** */
+        private Map<Object, Object> invokeRes;
+
         /**
          * @param entry Entry.
          * @param updRes Entry update result.
          * @param entries All entries.
          */
-        private void addDeleted(GridDhtCacheEntry<K, V> entry, GridCacheUpdateAtomicResult<K, V> updRes,
+        private void addDeleted(GridDhtCacheEntry<K, V> entry,
+            GridCacheUpdateAtomicResult<K, V> updRes,
             Collection<GridDhtCacheEntry<K, V>> entries) {
             if (updRes.removeVersion() != null) {
                 if (deleted == null)
@@ -2517,6 +2688,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
 
         /**
+         * @param invokeRes Result for invoke operation.
+         */
+        private void invokeResult(Map<Object, Object> invokeRes) {
+            this.invokeRes = invokeRes;
+        }
+
+        /**
+         * @return Result for invoke operation.
+         */
+        Map<Object, Object> invokeResults() {
+            return invokeRes;
+        }
+
+        /**
          * @param dhtFut DHT future.
          */
         private void dhtFuture(@Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 8602ae3..88cc8f6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -22,6 +22,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -197,7 +198,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
      * @param entry Entry to map.
      * @param val Value to write.
      * @param valBytes Value bytes.
-     * @param transformC Transform closure.
+     * @param entryProcessor Entry processor.
      * @param ttl TTL (optional).
      * @param drExpireTime DR expire time (optional).
      * @param drVer DR version (optional).
@@ -205,7 +206,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
     public void addWriteEntry(GridDhtCacheEntry<K, V> entry,
         @Nullable V val,
         @Nullable byte[] valBytes,
-        IgniteClosure<V, V> transformC,
+        EntryProcessor<K, V, ?> entryProcessor,
         long ttl,
         long drExpireTime,
         @Nullable GridCacheVersion drVer) {
@@ -236,7 +237,8 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
                         topVer,
                         forceTransformBackups,
                         this.updateReq.subjectId(),
-                        this.updateReq.taskNameHash());
+                        this.updateReq.taskNameHash(),
+                        forceTransformBackups ? this.updateReq.invokeArguments() : null);
 
                     mappings.put(nodeId, updateReq);
                 }
@@ -245,7 +247,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
                     entry.keyBytes(),
                     val,
                     valBytes,
-                    transformC,
+                    entryProcessor,
                     ttl,
                     drExpireTime,
                     drVer);
@@ -258,7 +260,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
      * @param entry Entry.
      * @param val Value.
      * @param valBytes Value bytes.
-     * @param transformC Transform closure.
+     * @param entryProcessor Entry processor.
      * @param ttl TTL for near cache update (optional).
      * @param expireTime Expire time for near cache update (optional).
      */
@@ -266,7 +268,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
         GridDhtCacheEntry<K, V> entry,
         @Nullable V val,
         @Nullable byte[] valBytes,
-        IgniteClosure<V, V> transformC,
+        EntryProcessor<K, V, ?> entryProcessor,
         long ttl,
         long expireTime) {
         GridCacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
@@ -294,7 +296,8 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
                     topVer,
                     forceTransformBackups,
                     this.updateReq.subjectId(),
-                    this.updateReq.taskNameHash());
+                    this.updateReq.taskNameHash(),
+                    forceTransformBackups ? this.updateReq.invokeArguments() : null);
 
                 mappings.put(nodeId, updateReq);
             }
@@ -308,7 +311,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
                 entry.keyBytes(),
                 val,
                 valBytes,
-                transformC,
+                entryProcessor,
                 ttl,
                 expireTime);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 2b68734..9441bd3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -10,7 +10,6 @@
 package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic;
 
 import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
@@ -20,6 +19,7 @@ import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
 import java.io.*;
 import java.nio.*;
 import java.util.*;
@@ -111,23 +111,30 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
     @GridDirectVersion(2)
     private boolean forceTransformBackups;
 
-    /** Transform closures. */
+    /** Entry processors. */
     @GridDirectTransient
-    private List<IgniteClosure<V, V>> transformClos;
+    private List<EntryProcessor<K, V, ?>> entryProcessors;
 
-    /** Transform closure bytes. */
+    /** Entry processors bytes. */
     @GridDirectCollection(byte[].class)
     @GridDirectVersion(2)
-    private List<byte[]> transformClosBytes;
+    private List<byte[]> entryProcessorsBytes;
 
     /** Near transform closures. */
     @GridDirectTransient
-    private List<IgniteClosure<V, V>> nearTransformClos;
+    private List<EntryProcessor<K, V, ?>> nearEntryProcessors;
 
     /** Near transform closures bytes. */
     @GridDirectCollection(byte[].class)
     @GridDirectVersion(2)
-    private List<byte[]> nearTransformClosBytes;
+    private List<byte[]> nearEntryProcessorsBytes;
+
+    /** Optional arguments for entry processor. */
+    @GridDirectTransient
+    private Object[] invokeArgs;
+
+    /** Serialized optional arguments for entry processor. */
+    private byte[][] invokeArgsBytes;
 
     /** Subject ID. */
     @GridDirectVersion(3)
@@ -151,6 +158,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
      * @param nodeId Node ID.
      * @param futVer Future version.
      * @param writeVer Write version for cache values.
+     * @param invokeArgs Optional arguments for entry processor.
      * @param syncMode Cache write synchronization mode.
      * @param topVer Topology version.
      * @param forceTransformBackups Force transform backups flag.
@@ -166,8 +174,11 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         long topVer,
         boolean forceTransformBackups,
         UUID subjId,
-        int taskNameHash
+        int taskNameHash,
+        Object[] invokeArgs
     ) {
+        assert invokeArgs == null || forceTransformBackups;
+
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futVer = futVer;
@@ -177,13 +188,14 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         this.forceTransformBackups = forceTransformBackups;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+        this.invokeArgs = invokeArgs;
 
         keys = new ArrayList<>();
         keyBytes = new ArrayList<>();
 
         if (forceTransformBackups) {
-            transformClos = new ArrayList<>();
-            transformClosBytes = new ArrayList<>();
+            entryProcessors = new ArrayList<>();
+            entryProcessorsBytes = new ArrayList<>();
         }
         else {
             vals = new ArrayList<>();
@@ -203,7 +215,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
      * @param keyBytes Key bytes, if key was already serialized.
      * @param val Value, {@code null} if should be removed.
      * @param valBytes Value bytes, {@code null} if should be removed.
-     * @param transformC Transform closure.
+     * @param entryProcessor Entry processor.
      * @param ttl TTL (optional).
      * @param drExpireTime DR expire time (optional).
      * @param drVer DR version (optional).
@@ -212,15 +224,15 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         @Nullable byte[] keyBytes,
         @Nullable V val,
         @Nullable byte[] valBytes,
-        IgniteClosure<V, V> transformC,
+        EntryProcessor<K, V, ?> entryProcessor,
         long ttl,
         long drExpireTime,
         @Nullable GridCacheVersion drVer) {
         keys.add(key);
         this.keyBytes.add(keyBytes);
 
-        if (forceTransformBackups && transformC != null)
-            transformClos.add(transformC);
+        if (forceTransformBackups && entryProcessor != null)
+            entryProcessors.add(entryProcessor);
         else {
             vals.add(val);
             this.valBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null);
@@ -270,7 +282,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
      * @param keyBytes Key bytes, if key was already serialized.
      * @param val Value, {@code null} if should be removed.
      * @param valBytes Value bytes, {@code null} if should be removed.
-     * @param transformC Transform closure.
+     * @param entryProcessor Entry processor.
      * @param ttl TTL.
      * @param expireTime Expire time.
      */
@@ -278,7 +290,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         @Nullable byte[] keyBytes,
         @Nullable V val,
         @Nullable byte[] valBytes,
-        IgniteClosure<V, V> transformC,
+        EntryProcessor<K, V, ?> entryProcessor,
         long ttl,
         long expireTime)
     {
@@ -287,8 +299,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
             nearKeyBytes = new ArrayList<>();
 
             if (forceTransformBackups) {
-                nearTransformClos = new ArrayList<>();
-                nearTransformClosBytes = new ArrayList<>();
+                nearEntryProcessors = new ArrayList<>();
+                nearEntryProcessorsBytes = new ArrayList<>();
             }
             else {
                 nearVals = new ArrayList<>();
@@ -300,9 +312,9 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         nearKeyBytes.add(keyBytes);
 
         if (forceTransformBackups) {
-            assert transformC != null;
+            assert entryProcessor != null;
 
-            nearTransformClos.add(transformC);
+            nearEntryProcessors.add(entryProcessor);
         }
         else {
             nearVals.add(val);
@@ -465,10 +477,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
     /**
      * @param idx Key index.
-     * @return Transform closure.
+     * @return Entry processor.
      */
-    @Nullable public IgniteClosure<V, V> transformClosure(int idx) {
-        return transformClos == null ? null : transformClos.get(idx);
+    @Nullable public EntryProcessor<K, V, ?> entryProcessor(int idx) {
+        return entryProcessors == null ? null : entryProcessors.get(idx);
     }
 
     /**
@@ -497,8 +509,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
      * @param idx Key index.
      * @return Transform closure.
      */
-    @Nullable public IgniteClosure<V, V> nearTransformClosure(int idx) {
-        return nearTransformClos == null ? null : nearTransformClos.get(idx);
+    @Nullable public EntryProcessor<K, V, ?> nearEntryProcessor(int idx) {
+        return nearEntryProcessors == null ? null : nearEntryProcessors.get(idx);
     }
 
     /**
@@ -615,6 +627,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         return -1L;
     }
 
+    /**
+     * @return Optional arguments for entry processor.
+     */
+    @Nullable public Object[] invokeArguments() {
+        return invokeArgs;
+    }
+
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
@@ -623,14 +642,17 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         keyBytes = marshalCollection(keys, ctx);
         valBytes = marshalValuesCollection(vals, ctx);
 
-        if (forceTransformBackups)
-            transformClosBytes = marshalCollection(transformClos, ctx);
+        if (forceTransformBackups) {
+            invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx);
+
+            entryProcessorsBytes = marshalCollection(entryProcessors, ctx);
+        }
 
         nearKeyBytes = marshalCollection(nearKeys, ctx);
         nearValBytes = marshalValuesCollection(nearVals, ctx);
 
         if (forceTransformBackups)
-            nearTransformClosBytes = marshalCollection(nearTransformClos, ctx);
+            nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, ctx);
     }
 
     /** {@inheritDoc} */
@@ -640,14 +662,17 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         keys = unmarshalCollection(keyBytes, ctx, ldr);
         vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
 
-        if (forceTransformBackups)
-            transformClos = unmarshalCollection(transformClosBytes, ctx, ldr);
+        if (forceTransformBackups) {
+            entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+            invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+        }
 
         nearKeys = unmarshalCollection(nearKeyBytes, ctx, ldr);
         nearVals = unmarshalValueBytesCollection(nearValBytes, ctx, ldr);
 
         if (forceTransformBackups)
-            nearTransformClos = unmarshalCollection(nearTransformClosBytes, ctx, ldr);
+            nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
     }
 
     /** {@inheritDoc} */
@@ -683,10 +708,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         _clone.nearVals = nearVals;
         _clone.nearValBytes = nearValBytes;
         _clone.forceTransformBackups = forceTransformBackups;
-        _clone.transformClos = transformClos;
-        _clone.transformClosBytes = transformClosBytes;
-        _clone.nearTransformClos = nearTransformClos;
-        _clone.nearTransformClosBytes = nearTransformClosBytes;
+        _clone.entryProcessors = entryProcessors;
+        _clone.entryProcessorsBytes = entryProcessorsBytes;
+        _clone.nearEntryProcessors = nearEntryProcessors;
+        _clone.nearEntryProcessorsBytes = nearEntryProcessorsBytes;
         _clone.nearExpireTimes = nearExpireTimes;
         _clone.nearTtls = nearTtls;
         _clone.subjId = subjId;
@@ -872,7 +897,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                         if (commState.cur == NULL)
                             commState.cur = commState.it.next();
 
-                        if (!commState.putValueBytes((GridCacheValueBytes)commState.cur))
+                        if (!commState.putValueBytes((GridCacheValueBytes) commState.cur))
                             return false;
 
                         commState.cur = NULL;
@@ -893,12 +918,12 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                 commState.idx++;
 
             case 16:
-                if (nearTransformClosBytes != null) {
+                if (nearEntryProcessorsBytes != null) {
                     if (commState.it == null) {
-                        if (!commState.putInt(nearTransformClosBytes.size()))
+                        if (!commState.putInt(nearEntryProcessorsBytes.size()))
                             return false;
 
-                        commState.it = nearTransformClosBytes.iterator();
+                        commState.it = nearEntryProcessorsBytes.iterator();
                     }
 
                     while (commState.it.hasNext() || commState.cur != NULL) {
@@ -920,12 +945,12 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                 commState.idx++;
 
             case 17:
-                if (transformClosBytes != null) {
+                if (entryProcessorsBytes != null) {
                     if (commState.it == null) {
-                        if (!commState.putInt(transformClosBytes.size()))
+                        if (!commState.putInt(entryProcessorsBytes.size()))
                             return false;
 
-                        commState.it = transformClosBytes.iterator();
+                        commState.it = entryProcessorsBytes.iterator();
                     }
 
                     while (commState.it.hasNext() || commState.cur != NULL) {
@@ -1213,8 +1238,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                 }
 
                 if (commState.readSize >= 0) {
-                    if (nearTransformClosBytes == null)
-                        nearTransformClosBytes = new ArrayList<>(commState.readSize);
+                    if (nearEntryProcessorsBytes == null)
+                        nearEntryProcessorsBytes = new ArrayList<>(commState.readSize);
 
                     for (int i = commState.readItems; i < commState.readSize; i++) {
                         byte[] _val = commState.getByteArray();
@@ -1222,7 +1247,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                         if (_val == BYTE_ARR_NOT_READ)
                             return false;
 
-                        nearTransformClosBytes.add((byte[])_val);
+                        nearEntryProcessorsBytes.add((byte[]) _val);
 
                         commState.readItems++;
                     }
@@ -1242,8 +1267,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                 }
 
                 if (commState.readSize >= 0) {
-                    if (transformClosBytes == null)
-                        transformClosBytes = new ArrayList<>(commState.readSize);
+                    if (entryProcessorsBytes == null)
+                        entryProcessorsBytes = new ArrayList<>(commState.readSize);
 
                     for (int i = commState.readItems; i < commState.readSize; i++) {
                         byte[] _val = commState.getByteArray();
@@ -1251,7 +1276,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                         if (_val == BYTE_ARR_NOT_READ)
                             return false;
 
-                        transformClosBytes.add((byte[])_val);
+                        entryProcessorsBytes.add((byte[])_val);
 
                         commState.readItems++;
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 6b233e7..1bc013a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -70,6 +70,9 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Collection<?> vals;
 
+    /** Optional arguments for entry processor. */
+    private Object[] invokeArgs;
+
     /** DR put values. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Collection<GridCacheDrInfo<V>> drPutVals;
@@ -158,6 +161,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
      * @param syncMode Write synchronization mode.
      * @param keys Keys to update.
      * @param vals Values or transform closure.
+     * @param invokeArgs Optional arguments for entry processor.
      * @param drPutVals DR put values (optional).
      * @param drRmvVals DR remove values (optional).
      * @param retval Return value require flag.
@@ -175,6 +179,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         GridCacheOperation op,
         Collection<? extends K> keys,
         @Nullable Collection<?> vals,
+        @Nullable Object[] invokeArgs,
         @Nullable Collection<GridCacheDrInfo<V>> drPutVals,
         @Nullable Collection<GridCacheVersion> drRmvVals,
         final boolean retval,
@@ -186,6 +191,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         int taskNameHash
     ) {
         super(cctx.kernalContext());
+
         this.rawRetval = rawRetval;
 
         assert vals == null || vals.size() == keys.size();
@@ -200,6 +206,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         this.op = op;
         this.keys = keys;
         this.vals = vals;
+        this.invokeArgs = invokeArgs;
         this.drPutVals = drPutVals;
         this.drRmvVals = drRmvVals;
         this.retval = retval;
@@ -366,7 +373,12 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                 if (res.error() != null)
                     addFailedKeys(req.keys(), res.error());
                 else {
-                    if (req.fastMap() && req.hasPrimary())
+                    if (op == TRANSFORM) {
+                        assert !req.fastMap();
+
+                        addInvokeResults(res.returnValue());
+                    }
+                    else if (req.fastMap() && req.hasPrimary())
                         opRes = res.returnValue();
                 }
 
@@ -464,7 +476,9 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
      * @param remap Flag indicating if this is partial remap for this future.
      * @param oldNodeId Old node ID if was remap.
      */
-    private void map0(GridDiscoveryTopologySnapshot topSnapshot, Collection<? extends K> keys, boolean remap,
+    private void map0(GridDiscoveryTopologySnapshot topSnapshot,
+        Collection<? extends K> keys,
+        boolean remap,
         @Nullable UUID oldNodeId) {
         assert oldNodeId == null || remap;
 
@@ -560,6 +574,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                 retval,
                 op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP),
                 expiryPlc,
+                invokeArgs,
                 filter,
                 subjId,
                 taskNameHash);
@@ -666,6 +681,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                             retval,
                             op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP),
                             expiryPlc,
+                            invokeArgs,
                             filter,
                             subjId,
                             taskNameHash);
@@ -820,6 +836,22 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
     }
 
     /**
+     * @param ret Result from single node.
+     */
+    private synchronized void addInvokeResults(GridCacheReturn<Object> ret) {
+        assert op == TRANSFORM : op;
+        assert ret.value() instanceof Map : ret.value();
+
+        if (opRes != null) {
+            Map<Object, Object> map = (Map<Object, Object>)opRes.value();
+
+            map.putAll((Map<Object, Object>)ret.value());
+        }
+        else
+            opRes = ret;
+    }
+
+    /**
      * @param failedKeys Failed keys.
      * @param err Error cause.
      * @return Root {@link GridCachePartialUpdateException}.


Mime
View raw message