ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [06/50] [abbrv] incubator-ignite git commit: # ignite-44
Date Mon, 19 Jan 2015 14:20:43 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index eaf0173..650f0ab 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -25,6 +25,7 @@ import org.jetbrains.annotations.*;
 import sun.misc.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -106,6 +107,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (V)updateAllInternal(UPDATE,
             Collections.singleton(key),
             Collections.singleton(val),
+            null,
             expiryPerCall(),
             true,
             false,
@@ -127,6 +129,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (Boolean)updateAllInternal(UPDATE,
             Collections.singleton(key),
             Collections.singleton(val),
+            null,
             expiryPerCall(),
             false,
             false,
@@ -145,6 +148,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (Boolean)updateAllInternal(UPDATE,
             Collections.singleton(key),
             Collections.singleton(val),
+            null,
             expiryPerCall(),
             false,
             false,
@@ -163,7 +167,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
         ctx.denyOnLocalRead();
 
-        return updateAllAsync0(F0.asMap(key, val), null, true, false, ttl, filter);
+        return updateAllAsync0(F0.asMap(key, val),
+            null,
+            null,
+            true,
+            false,
+            filter);
     }
 
     /** {@inheritDoc} */
@@ -177,7 +186,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
         ctx.denyOnLocalRead();
 
-        return updateAllAsync0(F0.asMap(key, val), null, false, false, ttl, filter);
+        return updateAllAsync0(F0.asMap(key, val),
+            null,
+            null,
+            false,
+            false,
+            filter);
     }
 
     /** {@inheritDoc} */
@@ -242,6 +256,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (GridCacheReturn<V>)updateAllInternal(UPDATE,
             Collections.singleton(key),
             Collections.singleton(newVal),
+            null,
             expiryPerCall(),
             true,
             true,
@@ -259,6 +274,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (GridCacheReturn<V>)updateAllInternal(DELETE,
             Collections.singleton(key),
             null,
+            null,
             expiryPerCall(),
             true,
             true,
@@ -283,7 +299,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
         ctx.denyOnLocalRead();
 
-        return updateAllAsync0(F.asMap(key, newVal), null, true, true, 0,
+        return updateAllAsync0(F.asMap(key, newVal),
+            null,
+            null,
+            true,
+            true,
             ctx.equalsPeekArray(oldVal));
     }
 
@@ -295,6 +315,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         updateAllInternal(UPDATE,
             m.keySet(),
             m.values(),
+            null,
             expiryPerCall(),
             false,
             false,
@@ -307,11 +328,17 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         ctx.denyOnLocalRead();
 
-        return updateAllAsync0(m, null, false, false, 0, filter);
+        return updateAllAsync0(m,
+            null,
+            null,
+            false,
+            false,
+            filter);
     }
 
     /** {@inheritDoc} */
     @Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException {
+        /*
         ctx.denyOnLocalRead();
 
         updateAllInternal(TRANSFORM,
@@ -322,12 +349,16 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
             false,
             null,
             ctx.isStoreEnabled());
+            */
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer)
         throws IgniteCheckedException {
+        /*
         return (R)updateAllInternal(TRANSFORM,
             Collections.singleton(key),
             Collections.singleton(new GridCacheTransformComputeClosure<>(transformer)),
@@ -336,6 +367,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
             false,
             null,
             ctx.isStoreEnabled());
+            */
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
@@ -343,14 +377,19 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         IgniteClosure<V, V> transformer,
         @Nullable GridCacheEntryEx<K, V> entry,
         long ttl) {
+        /*
         ctx.denyOnLocalRead();
 
         return updateAllAsync0(null, Collections.singletonMap(key, transformer), false, false, ttl, null);
+        */
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("ConstantConditions")
     @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException {
+        /*
         ctx.denyOnLocalRead();
 
         if (F.isEmpty(m))
@@ -364,16 +403,23 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
             false,
             null,
             ctx.isStoreEnabled());
+        */
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) {
+        /*
         ctx.denyOnLocalRead();
 
         if (F.isEmpty(m))
             return new GridFinishedFuture<Object>(ctx.kernalContext());
 
         return updateAllAsync0(null, m, false, false, 0, null);
+        */
+        // TODO IGNITE-44.
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
@@ -386,6 +432,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (V)updateAllInternal(DELETE,
             Collections.singleton(key),
             null,
+            null,
             expiryPerCall(),
             true,
             false,
@@ -412,6 +459,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         updateAllInternal(DELETE,
             keys,
             null,
+            null,
             expiryPerCall(),
             false,
             false,
@@ -439,6 +487,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (Boolean)updateAllInternal(DELETE,
             Collections.singleton(key),
             null,
+            null,
             expiryPerCall(),
             false,
             false,
@@ -467,6 +516,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return (Boolean)updateAllInternal(DELETE,
             Collections.singleton(key),
             null,
+            null,
             expiryPerCall(),
             false,
             false,
@@ -669,31 +719,107 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         return getAllAsync(keys, null, false, subjId, taskName, deserializePortable, false, expiry, filter).get();
     }
 
+    /** {@inheritDoc} */
+    @Override public <T> EntryProcessorResult<T> invoke(K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) throws IgniteCheckedException {
+        return invokeAsync(key, entryProcessor, args).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) throws IgniteCheckedException {
+        return invokeAllAsync(keys, entryProcessor, args).get();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args) throws EntryProcessorException {
+        A.notNull(key, "key", entryProcessor, "entryProcessor");
+
+        if (keyCheck)
+            validateCacheKey(key);
+
+        ctx.denyOnLocalRead();
+
+        Map<? extends K, EntryProcessor> invokeMap =
+            Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+
+        IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
+            invokeMap,
+            args,
+            true,
+            false,
+            null);
+
+        return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
+            @Override public EntryProcessorResult<T> applyx(IgniteFuture<Map<K, EntryProcessorResult<T>>> fut)
+                throws IgniteCheckedException {
+                Map<K, EntryProcessorResult<T>> resMap = fut.get();
+
+                assert resMap != null;
+                assert resMap.size() == 1 : resMap.size();
+
+                return resMap.values().iterator().next();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
+        Set<? extends K> keys,
+        final EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        A.notNull(keys, "keys", entryProcessor, "entryProcessor");
+
+        if (keyCheck)
+            validateCacheKeys(keys);
+
+        ctx.denyOnLocalRead();
+
+        Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+            @Override public EntryProcessor apply(K k) {
+                return entryProcessor;
+            }
+        });
+
+        return updateAllAsync0(null,
+            invokeMap,
+            args,
+            true,
+            false,
+            null);
+    }
+
     /**
      * Entry point for public API update methods.
      *
-     * @param map Put map. Either {@code map} or {@code transformMap} should be passed.
-     * @param transformMap Transform map. Either {@code map} or {@code transformMap} should be passed.
+     * @param map Put map. Either {@code map} or {@code invokeMap} should be passed.
+     * @param invokeMap Transform map. Either {@code map} or {@code invokeMap} should be passed.
+     * @param invokeArgs Optional arguments for EntryProcessor.
      * @param retval Return value required flag.
      * @param rawRetval Return {@code GridCacheReturn} instance.
-     * @param ttl Entry time-to-live.
      * @param filter Cache entry filter for atomic updates.
      * @return Completion future.
      */
     private IgniteFuture updateAllAsync0(
         @Nullable final Map<? extends K, ? extends V> map,
-        @Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+        @Nullable final Map<? extends K, EntryProcessor> invokeMap,
+        @Nullable final Object[] invokeArgs,
         final boolean retval,
         final boolean rawRetval,
-        final long ttl,
         @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
     ) {
-        final GridCacheOperation op = transformMap != null ? TRANSFORM : UPDATE;
+        final GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE;
 
         final Collection<? extends K> keys =
-            map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : null;
+            map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : null;
 
-        final Collection<?> vals = map != null ? map.values() : transformMap != null ? transformMap.values() : null;
+        final Collection<?> vals = map != null ? map.values() : invokeMap != null ? invokeMap.values() : null;
 
         final boolean storeEnabled = ctx.isStoreEnabled();
 
@@ -704,6 +830,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                 return updateAllInternal(op,
                     keys,
                     vals,
+                    invokeArgs,
                     expiry,
                     retval,
                     rawRetval,
@@ -737,6 +864,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                 return updateAllInternal(DELETE,
                     keys,
                     null,
+                    null,
                     expiryPlc,
                     retval,
                     rawRetval,
@@ -747,11 +875,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     }
 
     /**
-     * Entry point for all public update methods (put, remove, transform).
+     * Entry point for all public update methods (put, remove, invoke).
      *
      * @param op Operation.
      * @param keys Keys.
      * @param vals Values.
+     * @param invokeArgs Optional arguments for EntryProcessor.
      * @param expiryPlc Expiry policy.
      * @param retval Return value required flag.
      * @param rawRetval Return {@code GridCacheReturn} instance.
@@ -764,6 +893,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     private Object updateAllInternal(GridCacheOperation op,
         Collection<? extends K> keys,
         @Nullable Iterable<?> vals,
+        @Nullable Object[] invokeArgs,
         @Nullable ExpiryPolicy expiryPlc,
         boolean retval,
         boolean rawRetval,
@@ -784,9 +914,15 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         UUID subjId = ctx.subjectIdPerCall(null);
 
         if (storeEnabled && keys.size() > 1) {
-            updateWithBatch(op, keys, vals, expiryPlc, ver, filter, subjId, taskName);
-
-            return null;
+            return updateWithBatch(op,
+                keys,
+                vals,
+                invokeArgs,
+                expiryPlc,
+                ver,
+                filter,
+                subjId,
+                taskName);
         }
 
         Iterator<?> valsIter = vals != null ? vals.iterator() : null;
@@ -809,10 +945,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                 try {
                     entry = entryEx(key);
 
-                    IgniteBiTuple<Boolean, V> t = entry.innerUpdateLocal(
+                    IgniteBiTuple<Boolean, Object> t = entry.innerUpdateLocal(
                         ver,
                         val == null ? DELETE : op,
                         val,
+                        invokeArgs,
                         storeEnabled,
                         retval,
                         expiryPlc,
@@ -823,16 +960,23 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                         subjId,
                         taskName);
 
-                    if (res == null) {
-                        if (op == TRANSFORM && val instanceof GridCacheTransformComputeClosure) {
-                            assert retval;
+                    if (op == TRANSFORM) {
+                        assert t.get2() instanceof EntryProcessorResult : t.get2();
+
+                        Map<K, EntryProcessorResult> computedMap;
 
-                            res = new IgniteBiTuple<>(t.get1(),
-                                ((GridCacheTransformComputeClosure<V, ?>)val).returnValue());
+                        if (res == null) {
+                            computedMap = U.newHashMap(keys.size());
+
+                            res = new IgniteBiTuple<>(true, computedMap);
                         }
                         else
-                            res = t;
+                            computedMap = (Map<K, EntryProcessorResult>)res.get2();
+
+                        computedMap.put(key, (EntryProcessorResult)t.getValue());
                     }
+                    else if (res == null)
+                        res = t;
 
                     break; // While.
                 }
@@ -872,18 +1016,21 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
      * @param op Operation.
      * @param keys Keys.
      * @param vals Values.
+     * @param invokeArgs Optional arguments for EntryProcessor.
      * @param expiryPlc Expiry policy.
      * @param ver Cache version.
      * @param filter Optional filter.
      * @param subjId Subject ID.
      * @param taskName Task name.
      * @throws GridCachePartialUpdateException If update failed.
+     * @return Results map for invoke operation.
      */
     @SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"})
-    private void updateWithBatch(
+    private Map<K, EntryProcessorResult> updateWithBatch(
         GridCacheOperation op,
         Collection<? extends K> keys,
         @Nullable Iterable<?> vals,
+        @Nullable Object[] invokeArgs,
         @Nullable ExpiryPolicy expiryPlc,
         GridCacheVersion ver,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
@@ -896,7 +1043,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
             int size = locked.size();
 
             Map<K, V> putMap = null;
+
             Collection<K> rmvKeys = null;
+
+            Map<K, EntryProcessorResult> invokeResMap =
+                op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : null;
+
             List<GridCacheEntryEx<K, V>> filtered = new ArrayList<>(size);
 
             GridCachePartialUpdateException err = null;
@@ -933,7 +1085,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                     }
 
                     if (op == TRANSFORM) {
-                        IgniteClosure<V, V> transform = (IgniteClosure<V, V>)val;
+                        EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)val;
 
                         V old = entry.innerGet(null,
                             /*swap*/true,
@@ -944,12 +1096,30 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                             /**event*/true,
                             /**temporary*/true,
                             subjId,
-                            transform,
+                            entryProcessor,
                             taskName,
                             CU.<K, V>empty(),
                             null);
 
-                        V updated = transform.apply(old);
+                        CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old);
+
+                        V updated;
+                        CacheInvokeResult invokeRes;
+
+                        try {
+                            Object computed = entryProcessor.process(invokeEntry, invokeArgs);
+
+                            updated = ctx.unwrapTemporary(invokeEntry.getValue());
+
+                            invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed));
+                        }
+                        catch (Exception e) {
+                            invokeRes = new CacheInvokeResult<>(e);
+
+                            updated = old;
+                        }
+
+                        invokeResMap.put(entry.key(), invokeRes);
 
                         if (updated == null) {
                             if (intercept) {
@@ -1107,6 +1277,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
             if (err != null)
                 throw err;
+
+            return invokeResMap;
         }
         finally {
             unlockEntries(locked);
@@ -1179,10 +1351,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
                 assert writeVal != null || op == DELETE : "null write value found.";
 
-                IgniteBiTuple<Boolean, V> t = entry.innerUpdateLocal(
+                IgniteBiTuple<Boolean, Object> t = entry.innerUpdateLocal(
                     ver,
                     op,
                     writeVal,
+                    null,
                     false,
                     false,
                     expiryPlc,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
index 6fb77a1..bfd9359 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1194,7 +1194,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
                         /*event*/recordEvt,
                         /*temporary*/true,
                         /*subjId*/subjId,
-                        /**closure name */recordEvt ? F.first(txEntry.entryProcessors()) : null,
+                        /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null,
                         resolveTaskName(),
                         CU.<K, V>empty(),
                         null);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 34938d5..1680724 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1768,7 +1768,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     @Override public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync(
         GridCacheContext<K, V> cacheCtx,
         boolean retval,
-        @Nullable Map<? extends K, EntryProcessor> map,
+        @Nullable Map<? extends K, EntryProcessor<K, V, Object>> map,
         Object... invokeArgs
     ) {
         return (IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>>)putAllAsync0(cacheCtx,
@@ -1829,7 +1829,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         @Nullable ExpiryPolicy expiryPlc,
         boolean implicit,
         @Nullable Map<? extends K, ? extends V> lookup,
-        @Nullable Map<? extends K, EntryProcessor> invokeMap,
+        @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap,
         @Nullable Object[] invokeArgs,
         boolean retval,
         boolean lockOnly,
@@ -1992,7 +1992,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 break; // While.
                             }
 
-                            GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+                            final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
                                 entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
 
                             txEntry = addEntry(op,
@@ -2019,7 +2019,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 txEntry.markValid();
 
                                 if (old == null) {
-                                    if (retval && !readThrough) {
+                                    boolean load = retval && !readThrough;
+
+                                    // Check for transform here to avoid map creation.
+                                    load |= (op == TRANSFORM && keys.size() == 1);
+
+                                    if (load) {
                                         // If return value is required, then we know for sure that there is only
                                         // one key in the keys collection.
                                         assert keys.size() == 1;
@@ -2035,7 +2040,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                                         log.debug("Loaded value from remote node [key=" + k + ", val=" +
                                                             v + ']');
 
-                                                    ret.set(v, true);
+                                                    if (op == TRANSFORM) {
+                                                        IgniteTxEntry<K, V> e =
+                                                            entry(new IgniteTxKey<>(k, cacheCtx.cacheId()));
+
+                                                        assert e != null && e.op() == TRANSFORM : e;
+
+                                                        addInvokeResult(e, v, ret);
+                                                    }
+                                                    else
+                                                        ret.set(v, true);
                                                 }
                                             });
 
@@ -2130,6 +2144,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             drVer);
 
                         enlisted.add(key);
+
+                        if (txEntry.op() == TRANSFORM)
+                            addInvokeResult(txEntry, txEntry.value(), ret);
                     }
 
                     if (!pessimistic()) {
@@ -2137,6 +2154,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                         if (retval)
                             ret.set(v, true);
+                        else
+                            ret.success(true);
                     }
                 }
             }
@@ -2155,11 +2174,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 missedForInvoke,
                 deserializePortables(cacheCtx),
                 new CI2<K, V>() {
-                    @Override public void apply(K k, V v) {
+                    @Override public void apply(K key, V val) {
                         if (log.isDebugEnabled())
-                            log.debug("Loaded value from remote node [key=" + k + ", val=" + v + ']');
+                            log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
+
+                        IgniteTxEntry<K, V> e = entry(new IgniteTxKey<>(key, cacheCtx.cacheId()));
 
-                        addInvokeResult(entry(new IgniteTxKey<>(k, cacheCtx.cacheId())), v, ret);
+                        assert e != null && e.op() == TRANSFORM : e;
+
+                        addInvokeResult(e, val, ret);
                     }
                 });
 
@@ -2373,17 +2396,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     private IgniteFuture putAllAsync0(
         final GridCacheContext<K, V> cacheCtx,
         @Nullable Map<? extends K, ? extends V> map,
-        @Nullable Map<? extends K, EntryProcessor> invokeMap,
+        @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap,
         @Nullable final Object[] invokeArgs,
         @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap,
         final boolean retval,
         @Nullable GridCacheEntryEx<K, V> cached,
         @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        assert filter == null || invokeMap == null;
+
         cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT);
 
         // Cached entry may be passed only from entry wrapper.
         final Map<K, V> map0;
-        final Map<K, EntryProcessor> invokeMap0;
+        final Map<K, EntryProcessor<K, V, Object>> invokeMap0;
 
         if (drMap != null) {
             assert map == null;
@@ -2419,7 +2444,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 invokeMap0 = U.newHashMap(invokeMap.size());
 
                 try {
-                    for (Map.Entry<? extends K, EntryProcessor> e : invokeMap.entrySet()) {
+                    for (Map.Entry<? extends K, EntryProcessor<K, V, Object>> e : invokeMap.entrySet()) {
                         K key = (K)cacheCtx.marshalToPortable(e.getKey());
 
                         invokeMap0.put(key, e.getValue());
@@ -2434,7 +2459,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         }
         else {
             map0 = (Map<K, V>)map;
-            invokeMap0 = (Map<K, EntryProcessor>)invokeMap;
+            invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
         }
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
index 8a485b6..12680f3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -96,7 +96,7 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> {
     public <T> IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync(
         GridCacheContext<K, V> cacheCtx,
         boolean retval,
-        Map<? extends K, EntryProcessor> map,
+        Map<? extends K, EntryProcessor<K, V, Object>> map,
         Object... invokeArgs);
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
index a21c8fc..89bcbe6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDataManager.java
@@ -34,6 +34,7 @@ import org.gridgain.grid.util.worker.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
 import java.io.*;
 import java.nio.*;
 import java.util.*;
@@ -1101,7 +1102,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
 
         // No affinity key present, just concat and return.
         if (colocatedKey.affinityKey() == null) {
-            dataCachePrj.transform(colocatedKey, new UpdateClosure(startOff, data));
+            dataCachePrj.invoke(colocatedKey, new UpdateProcessor(startOff, data));
 
             return;
         }
@@ -1125,16 +1126,16 @@ public class GridGgfsDataManager extends GridGgfsManager {
 
             boolean hasVal = false;
 
-            UpdateClosure transformClos = new UpdateClosure(startOff, data);
+            UpdateProcessor transformClos = new UpdateProcessor(startOff, data);
 
             if (vals.get(colocatedKey) != null) {
-                dataCachePrj.transform(colocatedKey, transformClos);
+                dataCachePrj.invoke(colocatedKey, transformClos);
 
                 hasVal = true;
             }
 
             if (vals.get(key) != null) {
-                dataCachePrj.transform(key, transformClos);
+                dataCachePrj.invoke(key, transformClos);
 
                 hasVal = true;
             }
@@ -1570,7 +1571,8 @@ public class GridGgfsDataManager extends GridGgfsManager {
      * Helper closure to update data in cache.
      */
     @GridInternal
-    private static final class UpdateClosure implements IgniteClosure<byte[], byte[]>, Externalizable {
+    private static final class UpdateProcessor implements EntryProcessor<GridGgfsBlockKey, byte[], Void>,
+        Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1584,7 +1586,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
          * Empty constructor required for {@link Externalizable}.
          *
          */
-        public UpdateClosure() {
+        public UpdateProcessor() {
             // No-op.
         }
 
@@ -1594,7 +1596,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
          * @param start Start position in the block to write new data from.
          * @param data Data block to write into cache.
          */
-        private UpdateClosure(int start, byte[] data) {
+        private UpdateProcessor(int start, byte[] data) {
             assert start >= 0;
             assert data != null;
             assert start + data.length >= 0 : "Too much data [start=" + start + ", data.length=" + data.length + ']';
@@ -1604,7 +1606,9 @@ public class GridGgfsDataManager extends GridGgfsManager {
         }
 
         /** {@inheritDoc} */
-        @Override public byte[] apply(byte[] e) {
+        @Override public Void process(MutableEntry<GridGgfsBlockKey, byte[]> entry, Object... args) {
+            byte[] e = entry.getValue();
+
             final int size = data.length;
 
             if (e == null || e.length == 0)
@@ -1621,7 +1625,9 @@ public class GridGgfsDataManager extends GridGgfsManager {
             // Copy data into entry.
             U.arrayCopy(data, 0, e, start, size);
 
-            return e;
+            entry.setValue(e);
+
+            return null;
         }
 
         /** {@inheritDoc} */
@@ -1638,7 +1644,7 @@ public class GridGgfsDataManager extends GridGgfsManager {
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(UpdateClosure.class, this, "start", start, "data.length", data.length);
+            return S.toString(UpdateProcessor.class, this, "start", start, "data.length", data.length);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
index eb0a728..87e09b9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetaManager.java
@@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.grid.util.lang.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 
@@ -751,7 +752,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
 
         assert metaCache.get(parentId) != null;
 
-        id2InfoPrj.transform(parentId, new UpdateListing(fileName, new GridGgfsListingEntry(newFileInfo), false));
+        id2InfoPrj.invoke(parentId, new UpdateListing(fileName, new GridGgfsListingEntry(newFileInfo), false));
 
         return null;
     }
@@ -868,10 +869,10 @@ public class GridGgfsMetaManager extends GridGgfsManager {
         assert metaCache.get(destParentId) != null;
 
         // Remove listing entry from the source parent listing.
-        id2InfoPrj.transform(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
+        id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true));
 
         // Add listing entry into the destination parent listing.
-        id2InfoPrj.transform(destParentId, new UpdateListing(destFileName, srcEntry, false));
+        id2InfoPrj.invoke(destParentId, new UpdateListing(destFileName, srcEntry, false));
     }
 
     /**
@@ -987,7 +988,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
 
         // Update a file info of the removed file with a file path,
         // which will be used by delete worker for event notifications.
-        id2InfoPrj.transform(fileId, new UpdatePath(path));
+        id2InfoPrj.invoke(fileId, new UpdatePath(path));
 
         return GridGgfsFileInfo.builder(fileInfo).path(path).build();
     }
@@ -1086,12 +1087,12 @@ public class GridGgfsMetaManager extends GridGgfsManager {
                 id2InfoPrj.put(newInfo.id(), newInfo);
 
                 // Add new info to trash listing.
-                id2InfoPrj.transform(TRASH_ID, new UpdateListing(newInfo.id().toString(),
+                id2InfoPrj.invoke(TRASH_ID, new UpdateListing(newInfo.id().toString(),
                     new GridGgfsListingEntry(newInfo), false));
 
                 // Remove listing entries from root.
                 for (Map.Entry<String, GridGgfsListingEntry> entry : transferListing.entrySet())
-                    id2InfoPrj.transform(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true));
+                    id2InfoPrj.invoke(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true));
 
                 resId = newInfo.id();
             }
@@ -1228,7 +1229,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
                         GridGgfsListingEntry listingEntry = parentInfo.listing().get(name);
 
                         if (listingEntry != null)
-                            id2InfoPrj.transform(parentId, new UpdateListing(name, listingEntry, true));
+                            id2InfoPrj.invoke(parentId, new UpdateListing(name, listingEntry, true));
 
                         id2InfoPrj.remove(id);
 
@@ -1359,7 +1360,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
 
                 assert metaCache.get(parentId) != null;
 
-                id2InfoPrj.transform(parentId, new UpdateListing(fileName, entry, false));
+                id2InfoPrj.invoke(parentId, new UpdateListing(fileName, entry, false));
             }
 
             return newInfo;
@@ -1424,7 +1425,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
 
                 assert validTxState(false);
 
-                id2InfoPrj.transformAsync(parentId, new UpdateListingEntry(fileId, fileName, lenDelta, 0,
+                id2InfoPrj.invokeAsync(parentId, new UpdateListingEntry(fileId, fileName, lenDelta, 0,
                     modificationTime));
             }
             finally {
@@ -1659,9 +1660,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
                                 id2InfoPrj.removex(oldId); // Remove the old one.
                                 id2InfoPrj.putx(newInfo.id(), newInfo); // Put the new one.
 
-                                id2InfoPrj.transform(parentInfo.id(),
+                                id2InfoPrj.invoke(parentInfo.id(),
                                     new UpdateListing(path.name(), parentInfo.listing().get(path.name()), true));
-                                id2InfoPrj.transform(parentInfo.id(),
+                                id2InfoPrj.invoke(parentInfo.id(),
                                     new UpdateListing(path.name(), new GridGgfsListingEntry(newInfo), false));
 
                                 IgniteFuture<?> delFut = ggfsCtx.data().delete(oldInfo);
@@ -2150,7 +2151,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
                         }
 
                         // Update the deleted file info with path information for delete worker.
-                        id2InfoPrj.transform(info.id(), new UpdatePath(path));
+                        id2InfoPrj.invoke(info.id(), new UpdatePath(path));
 
                         return true; // No additional handling is required.
                     }
@@ -2606,7 +2607,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
 
                     id2InfoPrj.putx(fileId, updated);
 
-                    id2InfoPrj.transform(parentId, new UpdateListingEntry(fileId, fileName, 0, accessTime,
+                    id2InfoPrj.invoke(parentId, new UpdateListingEntry(fileId, fileName, 0, accessTime,
                         modificationTime));
 
                     tx.commit();
@@ -2741,7 +2742,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
     /**
      * Updates file length information in parent listing.
      */
-    private static final class UpdateListingEntry implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
+    private static final class UpdateListingEntry implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>,
         Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
@@ -2775,8 +2776,11 @@ public class GridGgfsMetaManager extends GridGgfsManager {
          * @param accessTime Last access time.
          * @param modificationTime Last modification time.
          */
-        private UpdateListingEntry(IgniteUuid fileId, String fileName, long lenDelta,
-            long accessTime, long modificationTime) {
+        private UpdateListingEntry(IgniteUuid fileId,
+            String fileName,
+            long lenDelta,
+            long accessTime,
+            long modificationTime) {
             this.fileId = fileId;
             this.fileName = fileName;
             this.lenDelta = lenDelta;
@@ -2785,13 +2789,18 @@ public class GridGgfsMetaManager extends GridGgfsManager {
         }
 
         /** {@inheritDoc} */
-        @Override public GridGgfsFileInfo apply(GridGgfsFileInfo fileInfo) {
+        @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) {
+            GridGgfsFileInfo fileInfo = e.getValue();
+
             Map<String, GridGgfsListingEntry> listing = fileInfo.listing();
 
             GridGgfsListingEntry entry = listing.get(fileName);
 
-            if (entry == null || !entry.fileId().equals(fileId))
-                return fileInfo;
+            if (entry == null || !entry.fileId().equals(fileId)) {
+                e.setValue(fileInfo);
+
+                return null;
+            }
 
             entry = new GridGgfsListingEntry(entry, entry.length() + lenDelta,
                 accessTime == -1 ? entry.accessTime() : accessTime,
@@ -2803,7 +2812,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
             // Modify listing map in-place since map is serialization-safe.
             listing.put(fileName, entry);
 
-            return new GridGgfsFileInfo(listing, fileInfo);
+            e.setValue(new GridGgfsFileInfo(listing, fileInfo));
+
+            return null;
         }
 
         /** {@inheritDoc} */
@@ -2829,7 +2840,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
      * Update directory listing closure.
      */
     @GridInternal
-    private static final class UpdateListing implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
+    private static final class UpdateListing implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>,
         Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
@@ -2868,7 +2879,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
         }
 
         /** {@inheritDoc} */
-        @Override @Nullable public GridGgfsFileInfo apply(GridGgfsFileInfo fileInfo) {
+        @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) {
+            GridGgfsFileInfo fileInfo = e.getValue();
+
             assert fileInfo != null : "File info not found for the child: " + entry.fileId();
             assert fileInfo.isDirectory();
 
@@ -2897,7 +2910,9 @@ public class GridGgfsMetaManager extends GridGgfsManager {
                         ", oldEntry=" + oldEntry + ']');
             }
 
-            return new GridGgfsFileInfo(listing, fileInfo);
+            e.setValue(new GridGgfsFileInfo(listing, fileInfo));
+
+            return null;
         }
 
         /** {@inheritDoc} */
@@ -2924,7 +2939,7 @@ public class GridGgfsMetaManager extends GridGgfsManager {
      * Update path closure.
      */
     @GridInternal
-    private static final class UpdatePath implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
+    private static final class UpdatePath implements EntryProcessor<IgniteUuid, GridGgfsFileInfo, Void>,
         Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
@@ -2943,11 +2958,16 @@ public class GridGgfsMetaManager extends GridGgfsManager {
          * Default constructor (required by Externalizable).
          */
         public UpdatePath() {
+            // No-op.
         }
 
         /** {@inheritDoc} */
-        @Override public GridGgfsFileInfo apply(GridGgfsFileInfo info) {
-            return GridGgfsFileInfo.builder(info).path(path).build();
+        @Override public Void process(MutableEntry<IgniteUuid, GridGgfsFileInfo> e, Object... args) {
+            GridGgfsFileInfo info = e.getValue();
+
+            e.setValue(GridGgfsFileInfo.builder(info).path(path).build());
+
+            return null;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java
new file mode 100644
index 0000000..6a97b19
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicInvokeTest.java
@@ -0,0 +1,47 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicInvokeTest extends IgniteCacheInvokeAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return CLOCK;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java
new file mode 100644
index 0000000..7048f18
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalInvokeTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicLocalInvokeTest extends IgniteCacheInvokeAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return LOCAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java
new file mode 100644
index 0000000..2ff0468
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicLocalWithStoreInvokeTest.java
@@ -0,0 +1,22 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.store.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicLocalWithStoreInvokeTest extends IgniteCacheAtomicLocalInvokeTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheStore<?, ?> cacheStore() {
+        return new TestStore();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java
new file mode 100644
index 0000000..8f4d71c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicNearEnabledInvokeTest.java
@@ -0,0 +1,24 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicNearEnabledInvokeTest extends IgniteCacheAtomicInvokeTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return NEAR_PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
index 8731306..068476c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
@@ -39,7 +39,7 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
     public void testInvoke() throws Exception {
         // TODO IGNITE41 test with forceTransformBackups.
 
-        invoke(null);
+       invoke(null);
 
         if (atomicityMode() == TRANSACTIONAL) {
             invoke(PESSIMISTIC);
@@ -165,19 +165,21 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
 
         invokeAll(cache, new HashSet<>(primaryKeys(cache, 3, 0)), txMode);
 
-        invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0)), txMode);
+        if (gridCount() > 1) {
+            invokeAll(cache, new HashSet<>(backupKeys(cache, 3, 0)), txMode);
 
-        invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0)), txMode);
+            invokeAll(cache, new HashSet<>(nearKeys(cache, 3, 0)), txMode);
 
-        Set<Integer> keys = new HashSet<>();
+            Set<Integer> keys = new HashSet<>();
 
-        keys.addAll(primaryKeys(jcache(0), 3, 0));
-        keys.addAll(primaryKeys(jcache(1), 3, 0));
-        keys.addAll(primaryKeys(jcache(2), 3, 0));
+            keys.addAll(primaryKeys(jcache(0), 3, 0));
+            keys.addAll(primaryKeys(jcache(1), 3, 0));
+            keys.addAll(primaryKeys(jcache(2), 3, 0));
 
-        invokeAll(cache, keys, txMode);
+            invokeAll(cache, keys, txMode);
+        }
 
-        keys = new HashSet<>();
+        Set<Integer> keys = new HashSet<>();
 
         for (int i = 0; i < 1000; i++)
             keys.add(i);
@@ -415,7 +417,6 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
         /** {@inheritDoc} */
         @Override public Integer process(MutableEntry<Integer, Integer> e,
             Object... arguments) throws EntryProcessorException {
-            System.out.println(Thread.currentThread() + " compute, old=" + e.getValue());
             if (e.exists()) {
                 Integer val = e.getValue();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java
new file mode 100644
index 0000000..20576ab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxLocalInvokeTest.java
@@ -0,0 +1,41 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheTxLocalInvokeTest extends IgniteCacheInvokeAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return LOCAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java
new file mode 100644
index 0000000..ffc50ff
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTxNearEnabledInvokeTest.java
@@ -0,0 +1,24 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheTxNearEnabledInvokeTest extends IgniteCacheTxInvokeTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return NEAR_PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index a57da71..0d3b54f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -363,12 +363,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         createUpdatePutAll(null);
 
         if (atomicityMode() == TRANSACTIONAL) {
-            IgniteTxConcurrency[] txModes;
-
-            if (cacheMode() == LOCAL)
-                txModes= new IgniteTxConcurrency[]{PESSIMISTIC};
-            else
-                txModes= new IgniteTxConcurrency[]{PESSIMISTIC, OPTIMISTIC};
+            IgniteTxConcurrency[] txModes = new IgniteTxConcurrency[]{PESSIMISTIC, OPTIMISTIC};
 
             for (IgniteTxConcurrency tx : txModes) {
                 for (final Integer key : keys()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index fd3751c..9d56c7f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -891,6 +891,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             assertEquals("3", res.get("key3").get());
 
             assertEquals(3, res.size());
+
+            cache.remove("key1");
+            cache.put("key2", 1);
+            cache.put("key3", 3);
         }
 
         Map<String, EntryProcessorResult<String>> res = cache.invokeAll(F.asSet("key1", "key2", "key3"), RMV_PROCESSOR);
@@ -901,9 +905,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             assertNull(cache(i).peek("key3"));
         }
 
-        assertEquals("1", res.get("key1").get());
-        assertEquals("2", res.get("key2").get());
-        assertEquals("4", res.get("key3").get());
+        assertEquals("null", res.get("key1").get());
+        assertEquals("1", res.get("key2").get());
+        assertEquals("3", res.get("key3").get());
 
         assertEquals(3, res.size());
 
@@ -928,9 +932,23 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @throws Exception If failed.
      */
     public void testTransformAllWithNulls() throws Exception {
-        final GridCacheProjection<String, Integer> cache = cache();
+        final IgniteCache<String, Integer> cache = jcache();
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                cache.invokeAll(null, INCR_PROCESSOR);
+
+                return null;
+            }
+        }, NullPointerException.class, null);
 
-        cache.transformAll(null); // This should be no-op.
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                cache.invokeAll(F.asSet("key1"), null);
+
+                return null;
+            }
+        }, NullPointerException.class, null);
 
         {
             Map<String, Integer> m = new HashMap<>(2);
@@ -944,38 +962,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         }
 
         {
-            Map<String, IgniteClosure<Integer, Integer>> tm = new HashMap<>(2);
-
-            tm.put("key1", INCR_PROCESSOR);
-            tm.put(null, INCR_PROCESSOR);
-
-            // WARN: F.asMap() doesn't work here, because it will throw NPE.
-
-            cache.transformAll(tm);
-        }
-
-        {
-            Map<String, IgniteClosure<Integer, Integer>> tm = new HashMap<>(2);
-
-            tm.put("key1", INCR_PROCESSOR);
-            tm.put("key2", null);
-
-            // WARN: F.asMap() doesn't work here, because it will throw NPE.
-
-            cache.transformAll(tm);
-        }
-
-        cache.transformAll(null, INCR_PROCESSOR); // This should be no-op.
-
-        {
-            Set<String> ts = new HashSet<>(3);
+            Set<String> keys = new HashSet<>(2);
 
-            ts.add("key1");
-            ts.add(null);
+            keys.add("key1");
+            keys.add(null);
 
             // WARN: F.asSet() doesn't work here, because it will throw NPE.
 
-            cache.transformAll(ts, INCR_PROCESSOR);
+            cache.invokeAll(keys, INCR_PROCESSOR);
         }
     }
 
@@ -1014,17 +1008,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      */
     private void checkTransformSequential0(boolean startVal, IgniteTxConcurrency concurrency)
         throws Exception {
-        GridCacheProjection<String, Integer> cache = cache();
+        IgniteCache<String, Integer> cache = jcache();
 
-        IgniteTx tx = txEnabled() ? cache.txStart(concurrency, READ_COMMITTED) : null;
+        IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null;
 
         try {
             if (startVal)
                 cache.put("key", 2);
 
-            cache.transform("key", INCR_PROCESSOR);
-            cache.transform("key", INCR_PROCESSOR);
-            cache.transform("key", INCR_PROCESSOR);
+            cache.invoke("key", INCR_PROCESSOR);
+            cache.invoke("key", INCR_PROCESSOR);
+            cache.invoke("key", INCR_PROCESSOR);
 
             if (tx != null)
                 tx.commit();
@@ -1063,18 +1057,18 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @throws Exception If failed.
      */
     private void checkTransformAfterRemove(IgniteTxConcurrency concurrency) throws Exception {
-        GridCacheProjection<String, Integer> cache = cache();
+        IgniteCache<String, Integer> cache = jcache();
 
         cache.put("key", 4);
 
-        IgniteTx tx = txEnabled() ? cache.txStart(concurrency, READ_COMMITTED) : null;
+        IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null;
 
         try {
             cache.remove("key");
 
-            cache.transform("key", INCR_PROCESSOR);
-            cache.transform("key", INCR_PROCESSOR);
-            cache.transform("key", INCR_PROCESSOR);
+            cache.invoke("key", INCR_PROCESSOR);
+            cache.invoke("key", INCR_PROCESSOR);
+            cache.invoke("key", INCR_PROCESSOR);
 
             if (tx != null)
                 tx.commit();
@@ -1128,20 +1122,23 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @param isolation Isolation.
      * @throws Exception If failed.
      */
-    private void checkTransformReturnValue(boolean put, IgniteTxConcurrency concurrency,
-        IgniteTxIsolation isolation) throws Exception {
-        GridCacheProjection<String, Integer> cache = cache();
+    private void checkTransformReturnValue(boolean put,
+        IgniteTxConcurrency concurrency,
+        IgniteTxIsolation isolation)
+        throws Exception
+    {
+        IgniteCache<String, Integer> cache = jcache();
 
         if (!put)
             cache.put("key", 1);
 
-        IgniteTx tx = txEnabled() ? cache.txStart(concurrency, isolation) : null;
+        IgniteTx tx = txEnabled() ? ignite(0).transactions().txStart(concurrency, isolation) : null;
 
         try {
             if (put)
                 cache.put("key", 1);
 
-            cache.transform("key", INCR_PROCESSOR);
+            cache.invoke("key", INCR_PROCESSOR);
 
             assertEquals((Integer)2, cache.get("key"));
 
@@ -1211,33 +1208,25 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     /**
      * @throws Exception If failed.
      */
-    public void testTransformEntry() throws Exception {
-        GridCacheEntry<String, Integer> entry = cache().entry("test");
+    public void testInvokeAsync() throws Exception {
+        IgniteCache<String, Integer> cache = jcache();
 
-        entry.setValue(1);
+        cache.put("key2", 1);
+        cache.put("key3", 3);
 
-        // Make user entry capture cache entry.
-        entry.version();
+        cache = cache.enableAsync();
 
-        assertEquals((Integer)1, entry.getValue());
+        assertNull(cache.invoke("key1", INCR_PROCESSOR));
 
-        entry.transform(INCR_PROCESSOR);
+        IgniteFuture<?> fut0 = cache.future();
 
-        assertEquals((Integer)2, entry.getValue());
-    }
+        assertNull(cache.invoke("key2", INCR_PROCESSOR));
 
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTransformAsync() throws Exception {
-        GridCacheProjection<String, Integer> cache = cache();
+        IgniteFuture<?> fut1 = cache.future();
 
-        cache.put("key2", 1);
-        cache.put("key3", 3);
+        assertNull(cache.invoke("key3", RMV_PROCESSOR));
 
-        IgniteFuture<?> fut0 = cache.transformAsync("key1", INCR_PROCESSOR);
-        IgniteFuture<?> fut1 = cache.transformAsync("key2", INCR_PROCESSOR);
-        IgniteFuture<?> fut2 = cache.transformAsync("key3", RMV_PROCESSOR);
+        IgniteFuture<?> fut2 = cache.future();
 
         fut0.get();
         fut1.get();
@@ -1254,46 +1243,54 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     /**
      * @throws Exception If failed.
      */
-    public void testTransformCompute() throws Exception {
-        GridCacheProjection<String, Integer> cache = cache();
-
-        IgniteClosure<Integer, IgniteBiTuple<Integer, String>> c;
-
-        c = new IgniteClosure<Integer, IgniteBiTuple<Integer, String>>() {
-            @Override public IgniteBiTuple<Integer, String> apply(Integer val) {
-                return val == null ? new IgniteBiTuple<>(0, "null") : new IgniteBiTuple<>(val + 1, String.valueOf(val));
-            }
-        };
+    public void testInvoke() throws Exception {
+        final IgniteCache<String, Integer> cache = jcache();
 
-        assertEquals("null", cache.transformAndCompute("k0", c));
+        assertEquals("null", cache.invoke("k0", INCR_PROCESSOR));
 
-        assertEquals((Integer)0, cache.get("k0"));
+        assertEquals((Integer)1, cache.get("k0"));
 
-        assertEquals("0", cache.transformAndCompute("k0", c));
+        assertEquals("1", cache.invoke("k0", INCR_PROCESSOR));
 
-        assertEquals((Integer)1, cache.get("k0"));
+        assertEquals((Integer)2, cache.get("k0"));
 
         cache.put("k1", 1);
 
-        assertEquals("1", cache.transformAndCompute("k1", c));
+        assertEquals("1", cache.invoke("k1", INCR_PROCESSOR));
 
         assertEquals((Integer)2, cache.get("k1"));
 
-        assertEquals("2", cache.transformAndCompute("k1", c));
+        assertEquals("2", cache.invoke("k1", INCR_PROCESSOR));
 
         assertEquals((Integer)3, cache.get("k1"));
 
-        c = new IgniteClosure<Integer, IgniteBiTuple<Integer, String>>() {
-            @Override public IgniteBiTuple<Integer, String> apply(Integer integer) {
-                return new IgniteBiTuple<>(null, null);
+        EntryProcessor<String, Integer, Integer> c = new EntryProcessor<String, Integer, Integer>() {
+            @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+                e.remove();
+
+                return null;
             }
         };
 
-        assertNull(cache.transformAndCompute("k1", c));
+        assertNull(cache.invoke("k1", c));
         assertNull(cache.get("k1"));
 
         for (int i = 0; i < gridCount(); i++)
             assertNull(cache(i).peek("k1"));
+
+        final EntryProcessor<String, Integer, Integer> errProcessor = new EntryProcessor<String, Integer, Integer>() {
+            @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+                throw new EntryProcessorException("Test entry processor exception.");
+            }
+        };
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                cache.invoke("k1", errProcessor);
+
+                return null;
+            }
+        }, EntryProcessorException.class, "Test entry processor exception.");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
index 8a929bf..c204e00 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
+import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
@@ -17,6 +18,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.gridgain.testframework.junits.common.*;
 
+import javax.cache.processor.*;
+import java.io.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
@@ -108,19 +111,21 @@ public abstract class GridCacheGetAndTransformStoreAbstractTest extends GridComm
             startGrid(1);
             startGrid(2);
 
-            final IgniteClosure<String, String> trans = new TransformClosure();
+            final Processor entryProcessor = new Processor();
 
             IgniteFuture<?> fut = multithreadedAsync(
                 new Callable<Object>() {
                     @Override public Object call() throws Exception {
-                        GridCache<Integer, String> c = cache(ThreadLocalRandom.current().nextInt(3));
+                        IgniteCache<Integer, String> c = jcache(ThreadLocalRandom.current().nextInt(3));
 
                         while (!finish.get() && !Thread.currentThread().isInterrupted()) {
                             c.get(ThreadLocalRandom.current().nextInt(100));
+
                             c.put(ThreadLocalRandom.current().nextInt(100), "s");
-                            c.transform(
+
+                            c.invoke(
                                 ThreadLocalRandom.current().nextInt(100),
-                                trans);
+                                entryProcessor);
                         }
 
                         return null;
@@ -147,10 +152,12 @@ public abstract class GridCacheGetAndTransformStoreAbstractTest extends GridComm
     /**
      *
      */
-    private static class TransformClosure implements IgniteClosure<String, String> {
+    private static class Processor implements EntryProcessor<Integer, String, Void>, Serializable {
         /** {@inheritDoc} */
-        @Override public String apply(String s) {
-            return "str";
+        @Override public Void process(MutableEntry<Integer, String> e, Object... args) {
+            e.setValue("str");
+
+            return null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
index 44b8618..0e8602c 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java
@@ -10,16 +10,17 @@
 package org.gridgain.grid.kernal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.grid.util.typedef.*;
 import org.gridgain.testframework.*;
 import org.gridgain.testframework.junits.common.*;
 
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
@@ -162,7 +163,7 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest {
                 ignite = restarts ? grids.getAndSet(idx, null) : grid(idx);
             }
 
-            GridCache <String, TestObject> cache = ignite.cache(null);
+            IgniteCache<String, TestObject> cache = ignite.jcache(null);
 
             assertNotNull(cache);
 
@@ -173,11 +174,11 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest {
 
             while (true) {
                 try {
-                    cache.transform("key", new Transformer());
+                    cache.invoke("key", new Processor());
 
                     break;
                 }
-                catch (GridCachePartialUpdateException ignored) {
+                catch (CachePartialUpdateException ignored) {
                     // Need to re-check if update actually succeeded.
                     TestObject updated = cache.get("key");
 
@@ -210,12 +211,16 @@ public class GridCacheIncrementTransformTest extends GridCommonAbstractTest {
     }
 
     /** */
-    private static class Transformer implements C1<TestObject, TestObject> {
+    private static class Processor implements EntryProcessor<String, TestObject, Void>, Serializable {
         /** {@inheritDoc} */
-        @Override public TestObject apply(TestObject obj) {
+        @Override public Void process(MutableEntry<String, TestObject> e, Object... args) {
+            TestObject obj = e.getValue();
+
             assert obj != null;
 
-            return new TestObject(obj.val + 1);
+            e.setValue(new TestObject(obj.val + 1));
+
+            return null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
index b42406d..66abdc6 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
+import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
@@ -18,6 +19,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
 
@@ -1172,27 +1174,28 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
     private void cacheUpdate(int grid, boolean rmv, Operation op, String key, final Integer val,
         @Nullable final Integer expOld, @Nullable final Integer expRmvRet)
         throws Exception {
-        GridCache<String, Integer> cache = cache(grid);
+        IgniteCache<String, Integer> cache = jcache(grid);
 
         if (rmv) {
             assertNull(val);
 
             switch (op) {
                 case UPDATE: {
-                    assertEquals(expRmvRet, cache.remove(key));
+                    assertEquals(expRmvRet, cache.getAndRemove(key));
 
                     break;
                 }
 
                 case UPDATEX: {
-                    cache.removex(key);
+                    cache.remove(key);
 
                     break;
                 }
 
                 case UPDATE_FILTER: {
-                    Object old = cache.remove(key, new IgnitePredicate<GridCacheEntry<String, Integer>>() {
-                        @Override public boolean apply(GridCacheEntry<String, Integer> entry) {
+                    Object old = cache.getAndRemoveIf(key, new IgnitePredicate<GridCacheEntry<String, Integer>>() {
+                        @Override
+                        public boolean apply(GridCacheEntry<String, Integer> entry) {
                             return true;
                         }
                     });
@@ -1203,10 +1206,15 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
                 }
 
                 case TRANSFORM: {
-                    cache.transform(key, new IgniteClosure<Integer, Integer>() {
-                        @Nullable @Override public Integer apply(Integer old) {
+                    cache.invoke(key, new EntryProcessor<String, Integer, Void>() {
+                        @Override
+                        public Void process(MutableEntry<String, Integer> e, Object... args) {
+                            Integer old = e.getValue();
+
                             assertEquals(expOld, old);
 
+                            e.remove();
+
                             return null;
                         }
                     });
@@ -1221,20 +1229,21 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
         else {
             switch (op) {
                 case UPDATE: {
-                    assertEquals(expOld, cache.put(key, val));
+                    assertEquals(expOld, cache.getAndPut(key, val));
 
                     break;
                 }
 
                 case UPDATEX: {
-                    cache.putx(key, val);
+                    cache.put(key, val);
 
                     break;
                 }
 
                 case UPDATE_FILTER: {
-                    Object old = cache.put(key, val, new P1<GridCacheEntry<String, Integer>>() {
-                        @Override public boolean apply(GridCacheEntry<String, Integer> entry) {
+                    Object old = cache.getAndPutIf(key, val, new P1<GridCacheEntry<String, Integer>>() {
+                        @Override
+                        public boolean apply(GridCacheEntry<String, Integer> entry) {
                             return true;
                         }
                     });
@@ -1245,11 +1254,16 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
                 }
 
                 case TRANSFORM: {
-                    cache.transform(key, new IgniteClosure<Integer, Integer>() {
-                        @Override public Integer apply(Integer old) {
+                    cache.invoke(key, new EntryProcessor<String, Integer, Void>() {
+                        @Override
+                        public Void process(MutableEntry<String, Integer> e, Object... args) {
+                            Integer old = e.getValue();
+
                             assertEquals(expOld, old);
 
-                            return val;
+                            e.setValue(val);
+
+                            return null;
                         }
                     });
 
@@ -1294,7 +1308,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
     @SuppressWarnings("unchecked")
     private void cacheBatchUpdate(int grid, boolean rmv, Operation op, final Map<String, Integer> map)
         throws Exception {
-        GridCache<String, Integer> cache = cache(grid);
+        IgniteCache<String, Integer> cache = jcache(grid);
 
         if (rmv) {
             switch (op) {
@@ -1305,8 +1319,10 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
                 }
 
                 case TRANSFORM: {
-                    cache.transformAll(map.keySet(), new IgniteClosure<Integer, Integer>() {
-                        @Nullable @Override public Integer apply(Integer old) {
+                    cache.invokeAll(map.keySet(), new EntryProcessor<String, Integer, Void>() {
+                        @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+                            e.remove();
+
                             return null;
                         }
                     });
@@ -1327,17 +1343,13 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst
                 }
 
                 case TRANSFORM: {
-                    Map<String, IgniteClosure<Integer, Integer>> m = new HashMap<>();
-
-                    for (final String key : map.keySet()) {
-                        m.put(key, new IgniteClosure<Integer, Integer>() {
-                            @Override public Integer apply(Integer old) {
-                                return map.get(key);
-                            }
-                        });
-                    }
+                    cache.invokeAll(map.keySet(), new EntryProcessor<String, Integer, Void>() {
+                        @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+                            e.setValue(map.get(e.getKey()));
 
-                    cache.transformAll(m);
+                            return null;
+                        }
+                    });
 
                     break;
                 }


Mime
View raw message