ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [05/18] incubator-ignite git commit: # ignite-44
Date Tue, 30 Dec 2014 12:11:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java
index 844fc5c..3c2d32c 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java
@@ -9,12 +9,14 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
-import org.apache.ignite.lang.*;
+import org.apache.ignite.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.store.*;
 import org.gridgain.testframework.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
+import java.io.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
@@ -51,8 +53,8 @@ public abstract class GridCacheMultinodeUpdateAbstractSelfTest extends GridCache
     /**
      * @throws Exception If failed.
      */
-    public void testTransform() throws Exception {
-        GridCache<Integer, Integer> cache = grid(0).cache(null);
+    public void testInvoke() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
         final Integer key = primaryKey(cache);
 
@@ -72,16 +74,16 @@ public abstract class GridCacheMultinodeUpdateAbstractSelfTest extends GridCache
                 @Override public Void call() throws Exception {
                     int idx = gridIdx.incrementAndGet() - 1;
 
-                    final GridCache<Integer, Integer> cache = grid(idx).cache(null);
+                    final IgniteCache<Integer, Integer> cache = grid(idx).jcache(null);
 
                     for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++)
-                        cache.transform(key, new IncClosure());
+                        cache.invoke(key, new IncProcessor());
 
                     return null;
                 }
-            }, THREADS, "transform");
+            }, THREADS, "invoke");
 
-            assertFalse("Got null in transform.", failed);
+            assertFalse("Got null in processor.", failed);
 
             expVal += ITERATIONS_PER_THREAD * THREADS;
 
@@ -103,18 +105,22 @@ public abstract class GridCacheMultinodeUpdateAbstractSelfTest extends GridCache
     /**
      *
      */
-    protected static class IncClosure implements IgniteClosure<Integer, Integer> {
+    protected static class IncProcessor implements EntryProcessor<Integer, Integer, Void>, Serializable {
         /** {@inheritDoc} */
-        @Override public Integer apply(Integer val) {
+        @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+            Integer val = e.getValue();
+
             if (val == null) {
                 failed = true;
 
-                System.out.println(Thread.currentThread() + " got null in transform: " + val);
+                System.out.println(Thread.currentThread() + " got null in processor: " + val);
 
                 return null;
             }
 
-            return val + 1;
+            e.setValue(val + 1);
+
+            return null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java
index e1f5c5a..5a97b7f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java
@@ -9,10 +9,13 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
+import org.apache.ignite.*;
 import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.testframework.*;
 
+import javax.cache.processor.*;
+import java.io.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
@@ -76,7 +79,7 @@ public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extend
      * @throws Exception If failed.
      */
     private void testTransform(final Integer key) throws Exception {
-        final GridCache<Integer, Integer> cache = grid(0).cache(null);
+        final IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
         cache.put(key, 0);
 
@@ -89,7 +92,7 @@ public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extend
                     if (i % 500 == 0)
                         log.info("Iteration " + i);
 
-                    cache.transform(key, new IncClosure());
+                    cache.invoke(key, new IncProcessor());
                 }
 
                 return null;
@@ -339,23 +342,29 @@ public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extend
     }
 
     /**
+     *
      */
-    protected static class IncClosure implements IgniteClosure<Integer, Integer> {
+    protected static class IncProcessor implements EntryProcessor<Integer, Integer, Void>, Serializable {
         /** {@inheritDoc} */
-        @Override public Integer apply(Integer val) {
+        @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+            Integer val = e.getValue();
+
             if (val == null) {
                 failed = true;
 
-                System.out.println(Thread.currentThread() + " got null in transform: " + val);
+                System.out.println(Thread.currentThread() + " got null in processor: " + val);
 
                 return null;
             }
 
-            return val + 1;
+            e.setValue(val + 1);
+
+            return null;
         }
     }
 
     /**
+     *
      */
     protected static class TestFilter implements IgnitePredicate<GridCacheEntry<Integer, Integer>> {
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
index f84e9c7..baa03c2 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
+import org.apache.ignite.*;
 import org.apache.ignite.transactions.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.testframework.*;
@@ -48,7 +49,7 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea
      * @throws Exception If failed.
      */
     private void testTransformTx(final Integer key, final IgniteTxConcurrency txConcurrency) throws Exception {
-        final GridCache<Integer, Integer> cache = grid(0).cache(null);
+        final IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
         cache.put(key, 0);
 
@@ -57,12 +58,14 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea
 
         GridTestUtils.runMultiThreaded(new Callable<Void>() {
             @Override public Void call() throws Exception {
+                IgniteTransactions txs = ignite(0).transactions();
+
                 for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) {
                     if (i % 500 == 0)
                         log.info("Iteration " + i);
 
-                    try (IgniteTx tx = cache.txStart(txConcurrency, REPEATABLE_READ)) {
-                        cache.transform(key, new IncClosure());
+                    try (IgniteTx tx = txs.txStart(txConcurrency, REPEATABLE_READ)) {
+                        cache.invoke(key, new IncProcessor());
 
                         tx.commit();
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java
index 4456346..0799180 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
+import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.portables.*;
@@ -18,6 +19,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.jetbrains.annotations.*;
 import org.junit.*;
 
+import javax.cache.processor.*;
 import java.util.*;
 
 import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
@@ -92,48 +94,61 @@ public abstract class GridCacheOffHeapTieredAbstractSelfTest extends GridCacheAb
      * @throws Exception If failed.
      */
     private void checkTransform(Integer key) throws Exception {
-        GridCache<Integer, Integer> c = grid(0).cache(null);
+        IgniteCache<Integer, Integer> c = grid(0).jcache(null);
+
+        c.invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+            @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+                Integer val = e.getValue();
 
-        c.transform(key, new C1<Integer, Integer>() {
-            @Override public Integer apply(Integer val) {
                 assertNull("Unexpected value: " + val, val);
 
                 return null;
             }
         });
 
-        c.putx(key, 1);
+        c.put(key, 1);
+
+        c.invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+            @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+                Integer val = e.getValue();
 
-        c.transform(key, new C1<Integer, Integer>() {
-            @Override public Integer apply(Integer val) {
                 assertNotNull("Unexpected value: " + val, val);
 
                 assertEquals((Integer) 1, val);
 
-                return val + 1;
+                e.setValue(val + 1);
+
+                return null;
             }
         });
 
         assertEquals((Integer)2, c.get(key));
 
-        c.transform(key, new C1<Integer, Integer>() {
-            @Override public Integer apply(Integer val) {
+        c.invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+            @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+                Integer val = e.getValue();
+
                 assertNotNull("Unexpected value: " + val, val);
 
                 assertEquals((Integer)2, val);
 
-                return val;
+                e.setValue(val);
+
+                return null;
             }
         });
 
-        assertEquals((Integer) 2, c.get(key));
+        assertEquals((Integer)2, c.get(key));
+
+        c.invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+            @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+                Integer val = e.getValue();
 
-        c.transform(key, new C1<Integer, Integer>() {
-            @Override
-            public Integer apply(Integer val) {
                 assertNotNull("Unexpected value: " + val, val);
 
-                assertEquals((Integer) 2, val);
+                assertEquals((Integer)2, val);
+
+                e.remove();
 
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java
index b25fcbd..d4eb179 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java
@@ -9,14 +9,16 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
+import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
 import org.apache.ignite.portables.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.*;
 
+import javax.cache.processor.*;
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -171,7 +173,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
      * @throws Exception If failed.
      */
     public void testTransform() throws Exception {
-        final GridCache<Integer, Object> cache = grid(0).cache(null);
+        final IgniteCache<Integer, Object> cache = grid(0).jcache(null);
 
         GridTestUtils.runMultiThreaded(new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -182,9 +184,9 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
 
                     final TestValue val = vals.get(key % VAL_SIZE);
 
-                    TestClosure c = testClosure(val.val, false);
+                    TestProcessor c = testClosure(val.val, false);
 
-                    cache.transform(key, c);
+                    cache.invoke(key, c);
                 }
 
                 return null;
@@ -208,7 +210,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
      * @param acceptNull If {@code true} value can be null;
      * @return Predicate.
      */
-    private TestClosure testClosure(String expVal, boolean acceptNull) {
+    private TestProcessor testClosure(String expVal, boolean acceptNull) {
         return portableEnabled() ?
             new PortableValueClosure(expVal, acceptNull) :
             new TestValueClosure(expVal, acceptNull);
@@ -326,7 +328,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
     /**
      *
      */
-    protected abstract static class TestClosure implements IgniteClosure<Object, Object> {
+    protected abstract static class TestProcessor implements EntryProcessor<Integer, Object, Void>, Serializable {
         /** */
         protected String expVal;
 
@@ -337,23 +339,29 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
          * @param expVal Expected value.
          * @param acceptNull If {@code true} value can be null;
          */
-        protected TestClosure(String expVal, boolean acceptNull) {
+        protected TestProcessor(String expVal, boolean acceptNull) {
             this.expVal = expVal;
             this.acceptNull = acceptNull;
         }
 
         /** {@inheritDoc} */
-        @Override public final Object apply(Object val) {
+        @Override public Void process(MutableEntry<Integer, Object> e, Object... args) {
+            Object val = e.getValue();
+
             if (val == null) {
                 if (!acceptNull)
                     assertNotNull(val);
 
-                return true;
+                e.setValue(true);
+
+                return null;
             }
 
             checkValue(val);
 
-            return val;
+            e.setValue(val);
+
+            return null;
         }
 
         /**
@@ -366,7 +374,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
      *
      */
     @SuppressWarnings("PackageVisibleInnerClass")
-    static class PortableValueClosure extends TestClosure {
+    static class PortableValueClosure extends TestProcessor {
         /**
          * @param expVal Expected value.
          * @param acceptNull If {@code true} value can be null;
@@ -387,7 +395,7 @@ public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends Gri
      *
      */
     @SuppressWarnings("PackageVisibleInnerClass")
-    static class TestValueClosure extends TestClosure {
+    static class TestValueClosure extends TestProcessor {
         /**
          * @param expVal Expected value.
          * @param acceptNull If {@code true} value can be null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
index 4b88530..b65fcad 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java
@@ -9,11 +9,12 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
+import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.testframework.junits.common.*;
 
+import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
 
@@ -104,6 +105,9 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
     }
 
     /**
+     * @param mode Atomicity mode.
+     * @param order Atomic cache write order mode.
+     * @param b Number of backups.
      * @throws Exception If failed.
      */
     private void checkTransform(GridCacheAtomicityMode mode, GridCacheAtomicWriteOrderMode order, int b)
@@ -126,7 +130,7 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
             failDeserialization = false;
 
             // Get client grid.
-            GridCacheProjection<Integer, TestObject> cache = grid(2).cache(null);
+            IgniteCache<Integer, TestObject> cache = grid(2).jcache(null);
 
             if (backups > 0 && atomicityMode == ATOMIC)
                 cache = cache.flagsOn(FORCE_TRANSFORM_BACKUP);
@@ -138,17 +142,17 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
 
             info(">>>>>> Transforming");
 
-            // Transform (check non-existent keys also.
+            // Transform (check non-existent keys also).
             for (int i = 0; i < 200; i++)
-                cache.transform(i, new Transform());
+                cache.invoke(i, new Transform());
 
-            Map<Integer, Transform> transformMap = new HashMap<>();
+            Set<Integer> keys = new HashSet<>();
 
             // Check transformAll.
             for (int i = 0; i < 300; i++)
-                transformMap.put(i, new Transform());
+                keys.add(i);
 
-            cache.transformAll(transformMap);
+            cache.invokeAll(keys, new Transform());
 
             // Avoid errors during stop.
             failDeserialization = false;
@@ -158,10 +162,15 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
         }
     }
 
-    private static class Transform implements IgniteClosure<TestObject, TestObject> {
+    /**
+     *
+     */
+    private static class Transform implements EntryProcessor<Integer, TestObject, Void>, Serializable {
         /** {@inheritDoc} */
-        @Override public TestObject apply(TestObject testObject) {
-            return new TestObject();
+        @Override public Void process(MutableEntry<Integer, TestObject> entry, Object... args) {
+            entry.setValue(new TestObject());
+
+            return null;
         }
     }
 
@@ -169,7 +178,11 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest
      *
      */
     private static class TestObject implements Externalizable {
+        /**
+         *
+         */
         public TestObject() {
+            // No-op.
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
index 6f9d32c..af729a1 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
@@ -423,9 +423,10 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteBiTuple<Boolean, V> innerUpdateLocal(GridCacheVersion ver,
+    @Override public IgniteBiTuple<Boolean, Object> innerUpdateLocal(GridCacheVersion ver,
         GridCacheOperation op,
         @Nullable Object writeObj,
+        @Nullable Object[] invokeArgs,
         boolean writeThrough,
         boolean retval,
         @Nullable ExpiryPolicy expiryPlc,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
index 0be91f0..8cf48bf 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java
@@ -21,6 +21,8 @@ import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
 import org.gridgain.testframework.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
+import javax.cache.processor.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -417,17 +419,21 @@ public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstrac
 
         info("Going to transform: " + key);
 
-        GridTestUtils.assertThrows(log, new Callable<Void>() {
+        Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
-                grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() {
-                    @Override public Object apply(Object o) {
-                        return 2;
+                grid(0).<Integer, Integer>jcache(null).invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+                    @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+                        e.setValue(2);
+
+                        return null;
                     }
                 });
 
                 return null;
             }
-        }, IgniteTxHeuristicException.class, null);
+        }, CacheException.class, null);
+
+        assertTrue("Unexpected cause: "  +e, e.getCause() instanceof IgniteTxHeuristicException);
 
         checkEmpty(key);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
index 39eb728..a46353e 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java
@@ -21,6 +21,8 @@ import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
 import org.gridgain.testframework.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
+import javax.cache.processor.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -422,17 +424,21 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb
 
         info("Going to transform: " + key);
 
-        GridTestUtils.assertThrows(log, new Callable<Void>() {
+        Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
-                grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() {
-                    @Override public Object apply(Object o) {
-                        return 2;
+                grid(0).<Integer, Integer>jcache(null).invoke(key, new EntryProcessor<Integer, Integer, Void>() {
+                    @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+                        e.setValue(2);
+
+                        return null;
                     }
                 });
 
                 return null;
             }
-        }, IgniteTxRollbackException.class, null);
+        }, CacheException.class, null);
+
+        assertTrue("Unexpected cause: " + e, e.getCause() instanceof IgniteTxRollbackException);
 
         checkValue(key, putBefore);
     }
@@ -588,41 +594,48 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb
             this.fail = fail;
         }
 
-
+        /** {@inheritDoc} */
         @Nullable @Override public Object load(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException {
             return null;
         }
 
+        /** {@inheritDoc} */
         @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args)
             throws IgniteCheckedException {
             if (fail)
                 throw new IgniteCheckedException("Store exception");
         }
 
+        /** {@inheritDoc} */
         @Override public void loadAll(@Nullable IgniteTx tx, Collection<?> keys, IgniteBiInClosure<Object, Object> c)
             throws IgniteCheckedException {
         }
 
+        /** {@inheritDoc} */
         @Override public void put(@Nullable IgniteTx tx, Object key, Object val) throws IgniteCheckedException {
             if (fail)
                 throw new IgniteCheckedException("Store exception");
         }
 
+        /** {@inheritDoc} */
         @Override public void putAll(@Nullable IgniteTx tx, Map<?, ?> map) throws IgniteCheckedException {
             if (fail)
                 throw new IgniteCheckedException("Store exception");
         }
 
+        /** {@inheritDoc} */
         @Override public void remove(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException {
             if (fail)
                 throw new IgniteCheckedException("Store exception");
         }
 
+        /** {@inheritDoc} */
         @Override public void removeAll(@Nullable IgniteTx tx, Collection<?> keys) throws IgniteCheckedException {
             if (fail)
                 throw new IgniteCheckedException("Store exception");
         }
 
+        /** {@inheritDoc} */
         @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException {
             if (fail && commit)
                 throw new IgniteCheckedException("Store exception");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
index 5c7ba54..1ced531 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
@@ -19,9 +19,12 @@ import org.gridgain.grid.cache.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.grid.cache.affinity.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.testframework.junits.common.*;
 
+import javax.cache.processor.*;
+import java.io.*;
 import java.util.*;
 
 import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
@@ -68,7 +71,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
     private UUID[] ids;
 
     /** Caches. */
-    private GridCache<Integer, Integer>[] caches;
+    private IgniteCache<Integer, Integer>[] caches;
 
     /** Recorded events.*/
     private ConcurrentHashSet<IgniteCacheEvent> evts;
@@ -157,14 +160,14 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
 
         ignites = new Ignite[GRID_CNT];
         ids = new UUID[GRID_CNT];
-        caches = new GridCache[GRID_CNT];
+        caches = new IgniteCache[GRID_CNT];
 
         for (int i = 0; i < GRID_CNT; i++) {
             ignites[i] = grid(i);
 
             ids[i] = ignites[i].cluster().localNode().id();
 
-            caches[i] = ignites[i].cache(CACHE_NAME);
+            caches[i] = ignites[i].jcache(CACHE_NAME);
 
             ignites[i].events().localListen(new IgnitePredicate<IgniteEvent>() {
                 @Override public boolean apply(IgniteEvent evt) {
@@ -184,7 +187,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
         int key = 0;
 
         while (true) {
-            if (cacheMode != PARTITIONED || (caches[0].entry(key).primary() && caches[1].entry(key).backup())) {
+            if (cacheMode != PARTITIONED || (primary(0, key) && backup(1, key))) {
                 key1 = key++;
 
                 break;
@@ -194,7 +197,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
         }
 
         while (true) {
-            if (cacheMode != PARTITIONED || (caches[0].entry(key).primary() && caches[1].entry(key).backup())) {
+            if (cacheMode != PARTITIONED || (primary(0, key) && backup(1, key))) {
                 key2 = key;
 
                 break;
@@ -226,6 +229,28 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param gridIdx Grid index.
+     * @param key Key.
+     * @return {@code True} if grid is primary for given key.
+     */
+    private boolean primary(int gridIdx, Object key) {
+        GridCacheAffinity<Object> aff = grid(0).cache(CACHE_NAME).affinity();
+
+        return aff.isPrimary(grid(gridIdx).cluster().localNode(), key);
+    }
+
+    /**
+     * @param gridIdx Grid index.
+     * @param key Key.
+     * @return {@code True} if grid is backup for given key.
+     */
+    private boolean backup(int gridIdx, Object key) {
+        GridCacheAffinity<Object> aff = grid(0).cache(CACHE_NAME).affinity();
+
+        return aff.isBackup(grid(gridIdx).cluster().localNode(), key);
+    }
+
+    /**
      * Test TRANSACTIONAL LOCAL cache with OPTIMISTIC/REPEATABLE_READ transaction.
      *
      * @throws Exception If failed.
@@ -423,13 +448,13 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
     private void checkAtomic(GridCacheMode cacheMode) throws Exception {
         initialize(cacheMode, ATOMIC, null, null);
 
-        caches[0].transform(key1, new Transformer());
+        caches[0].invoke(key1, new Transformer());
 
         checkEventNodeIdsStrict(primaryIdsForKeys(key1));
 
         assert evts.isEmpty();
 
-        caches[0].transformAll(keys, new Transformer());
+        caches[0].invokeAll(keys, new Transformer());
 
         checkEventNodeIdsStrict(primaryIdsForKeys(key1, key2));
     }
@@ -449,7 +474,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
 
         System.out.println("BEFORE: " + evts.size());
 
-        caches[0].transform(key1, new Transformer());
+        caches[0].invoke(key1, new Transformer());
 
         System.out.println("AFTER: " + evts.size());
 
@@ -457,7 +482,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
 
         assert evts.isEmpty();
 
-        caches[0].transformAll(keys, new Transformer());
+        caches[0].invokeAll(keys, new Transformer());
 
         checkEventNodeIdsStrict(idsForKeys(key1, key2));
     }
@@ -500,9 +525,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
         else if (cacheMode == PARTITIONED) {
             for (int key : keys) {
                 for (int i = 0; i < GRID_CNT; i++) {
-                    GridCacheEntry<Integer, Integer> entry = caches[i].entry(key);
-
-                    if (entry.primary() || (!primaryOnly && entry.backup()))
+                    if (primary(i, key) || (!primaryOnly && backup(i, key)))
                         res.add(ids[i]);
                 }
             }
@@ -510,7 +533,7 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
         else if (cacheMode == REPLICATED) {
             for (int key : keys) {
                 if (primaryOnly)
-                    res.add(caches[0].affinity().mapKeyToNode(key).id());
+                    res.add(grid(0).cache(CACHE_NAME).affinity().mapKeyToNode(key).id());
                 else
                     res.addAll(Arrays.asList(ids));
             }
@@ -544,22 +567,19 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
                 }
 
                 if (foundEvt == null) {
-                    GridCache<Integer, Integer> affectedCache = null;
+                    int gridIdx = -1;
 
                     for (int i = 0; i < GRID_CNT; i++) {
                         if (F.eq(this.ids[i], id)) {
-                            affectedCache = caches[i];
+                            gridIdx = i;
 
                             break;
                         }
                     }
 
-                    GridCacheEntry<Integer, Integer> entry1 = affectedCache.entry(key1);
-                    GridCacheEntry<Integer, Integer> entry2 = affectedCache.entry(key2);
-
                     fail("Expected transform event was not triggered on the node [nodeId=" + id +
-                        ", key1Primary=" + entry1.primary() + ", key1Backup=" + entry1.backup() +
-                        ", key2Primary=" + entry2.primary() + ", key2Backup=" + entry2.backup() + ']');
+                        ", key1Primary=" + primary(gridIdx, key1) + ", key1Backup=" + backup(gridIdx, key1) +
+                        ", key2Primary=" + primary(gridIdx, key2) + ", key2Backup=" + backup(gridIdx, key2) + ']');
                 }
                 else
                     evts.remove(foundEvt);
@@ -570,10 +590,12 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
     /**
      * Transform closure.
      */
-    private static class Transformer implements IgniteClosure<Integer, Integer> {
+    private static class Transformer implements EntryProcessor<Integer, Integer, Void>, Serializable {
         /** {@inheritDoc} */
-        @Override public Integer apply(Integer val) {
-            return ++val;
+        @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+            e.setValue(e.getValue() + 1);
+
+            return null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
index 2227e56..2b1fdff 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
@@ -9,14 +9,15 @@
 
 package org.gridgain.grid.kernal.processors.cache.distributed;
 
+import org.apache.ignite.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.util.typedef.*;
 import org.gridgain.testframework.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -58,7 +59,7 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT
      * @throws Exception If failed.
      */
     public void testRemoteTxPreloading() throws Exception {
-        GridCache<String, Integer> cache = cache(0);
+        IgniteCache<String, Integer> cache = jcache(0);
 
         for (int i = 0; i < 10000; i++)
             cache.put(String.valueOf(i), 0);
@@ -86,12 +87,21 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT
         for (int i = 0; i < 10; i++)
             keys.add(String.valueOf(i * 1000));
 
-        cache.transformAll(keys, new C1<Integer, Integer>() {
-            @Override public Integer apply(Integer val) {
-                if (val == null)
+        cache.invokeAll(keys, new EntryProcessor<String, Integer, Void>() {
+            @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+                Integer val = e.getValue();
+
+                if (val == null) {
                     keyNotLoaded = true;
 
-                return val + 1;
+                    e.setValue(1);
+
+                    return null;
+                }
+
+                e.setValue(val + 1);
+
+                return null;
             }
         });
 
@@ -135,7 +145,7 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT
         for (int i = 0; i < 10000; i++)
             map.put(String.valueOf(i), 0);
 
-        GridCache<String, Integer> cache0 = cache(0);
+        IgniteCache<String, Integer> cache0 = jcache(0);
 
         cache0.putAll(map);
 
@@ -148,18 +158,26 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT
 
             startGrid(i);
 
-            GridCache<String, Integer> cache = cache(i);
+            IgniteCache<String, Integer> cache = jcache(i);
+
+            IgniteTransactions txs = ignite(i).transactions();
+
+            try (IgniteTx tx = txs.txStart(txConcurrency, IgniteTxIsolation.READ_COMMITTED)) {
+                cache.invoke(TX_KEY, new EntryProcessor<String, Integer, Void>() {
+                    @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+                        Integer val = e.getValue();
 
-            try (IgniteTx tx = cache.txStart(txConcurrency, IgniteTxIsolation.READ_COMMITTED)) {
-                cache.transform(TX_KEY, new C1<Integer, Integer>() {
-                    @Override public Integer apply(Integer val) {
                         if (val == null) {
                             keyNotLoaded = true;
 
-                            return 1;
+                            e.setValue(1);
+
+                            return null;
                         }
 
-                        return val + 1;
+                        e.setValue(val + 1);
+
+                        return null;
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
index 5635ac8..d4d3396 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
@@ -9,8 +9,9 @@
 
 package org.gridgain.grid.kernal.processors.cache.distributed.dht;
 
+import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.transactions.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.processors.cache.*;
@@ -19,6 +20,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.gridgain.testframework.junits.common.*;
 
+import javax.cache.processor.*;
 import java.util.*;
 
 import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
@@ -55,16 +57,23 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri
     /** IP finder. */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
-    /** Value increment closure. */
-    private static final IgniteClosure<Integer, Integer> INCR_CLOS = new IgniteClosure<Integer, Integer>() {
-        @Override public Integer apply(Integer src) {
-            return src == null ? 1 : src + 1;
+    /** Value increment processor. */
+    private static final EntryProcessor<String, Integer, Void> INCR_CLOS = new EntryProcessor<String, Integer, Void>() {
+        @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+            if (!e.exists())
+                e.setValue(1);
+            else
+                e.setValue(e.getValue() + 1);
+
+            return null;
         }
     };
 
-    /** Value remove closure. */
-    private static final IgniteClosure<Integer, Integer> RMV_CLOS = new IgniteClosure<Integer, Integer>() {
-        @Override public Integer apply(Integer src) {
+    /** Value remove processor. */
+    private static final EntryProcessor<String, Integer, Void> RMV_CLOS = new EntryProcessor<String, Integer, Void>() {
+        @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+            e.remove();
+
             return null;
         }
     };
@@ -82,6 +91,8 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
         TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
 
         discoSpi.setIpFinder(IP_FINDER);
@@ -221,7 +232,7 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri
      * @throws Exception If failed.
      */
     protected void checkTransform(IgniteTxConcurrency concurrency, int nodeType, int op) throws Exception {
-        GridCacheProjection<String, Integer> cache = cache(0);
+        IgniteCache<String, Integer> cache = jcache(0);
 
         Collection<String> keys = keysForType(nodeType);
 
@@ -233,18 +244,18 @@ public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends Gri
         nearStore.reset();
 
         for (String key : keys)
-            cache.clear(key);
+            cache(0).clear(key);
 
         info(">>> Starting transform transaction");
 
-        try (IgniteTx tx = cache.txStart(concurrency, READ_COMMITTED)) {
+        try (IgniteTx tx = ignite(0).transactions().txStart(concurrency, READ_COMMITTED)) {
             if (op == OP_UPDATE) {
                 for (String key : keys)
-                    cache.transform(key, INCR_CLOS);
+                    cache.invoke(key, INCR_CLOS);
             }
             else {
                 for (String key : keys)
-                    cache.transform(key, RMV_CLOS);
+                    cache.invoke(key, RMV_CLOS);
             }
 
             tx.commit();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
index f84d006..ab2024a 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
@@ -11,8 +11,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.affinity.*;
 import org.gridgain.grid.kernal.*;
@@ -23,6 +21,8 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.junits.common.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.processor.*;
+import java.io.*;
 import java.util.*;
 
 import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
@@ -255,9 +255,9 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
 
         Ignite ignite0 = grid(0);
 
-        GridCache<Integer, Integer> cache0 = ignite0.cache(null);
+        IgniteCache<Integer, Integer> cache0 = ignite0.jcache(null);
 
-        GridCacheAffinity<Integer> aff = cache0.affinity();
+        GridCacheAffinity<Object> aff = cache(0).affinity();
 
         UUID id0 = ignite0.cluster().localNode().id();
 
@@ -265,7 +265,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
 
         log.info("Transform from primary.");
 
-        cache0.transform(primaryKey, new TransformClosure(primaryKey));
+        cache0.invoke(primaryKey, new Processor(primaryKey));
 
         for (int i = 0; i < GRID_CNT; i++)
             checkEntry(grid(i), primaryKey, primaryKey, false);
@@ -275,7 +275,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
 
             log.info("Transform from backup.");
 
-            cache0.transform(backupKey, new TransformClosure(backupKey));
+            cache0.invoke(backupKey, new Processor(backupKey));
 
             for (int i = 0; i < GRID_CNT; i++)
                 checkEntry(grid(i), backupKey, backupKey, false);
@@ -285,7 +285,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
 
         log.info("Transform from near.");
 
-        cache0.transform(nearKey, new TransformClosure(nearKey));
+        cache0.invoke(nearKey, new Processor(nearKey));
 
         for (int i = 0; i < GRID_CNT; i++) {
             UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
@@ -302,11 +302,11 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
         for (int i = 0; i < GRID_CNT; i++) {
             delay();
 
-            GridCache<Integer, Integer> cache = grid(i).cache(null);
+            IgniteCache<Integer, Integer> cache = grid(i).jcache(null);
 
             log.info("Transform [grid=" + grid(i).name() + ", val=" + val + ']');
 
-            cache.transform(nearKey, new TransformClosure(val));
+            cache.invoke(nearKey, new Processor(val));
 
             if (!aff.isPrimaryOrBackup(grid(i).localNode(), nearKey))
                 readers.add(grid(i).localNode().id());
@@ -332,53 +332,53 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
 
         Ignite ignite0 = grid(0);
 
-        GridCache<Integer, Integer> cache0 = ignite0.cache(null);
+        IgniteCache<Integer, Integer> cache0 = ignite0.jcache(null);
 
-        GridCacheAffinity<Integer> aff = cache0.affinity();
+        GridCacheAffinity<Object> aff = ignite0.cache(null).affinity();
 
         UUID id0 = ignite0.cluster().localNode().id();
 
-        Map<Integer, TransformClosure> primaryKeys = new HashMap<>();
+        Set<Integer> primaryKeys = new HashSet<>();
 
         for (int i = 0; i < 10; i++)
-            primaryKeys.put(key(ignite0, PRIMARY), new TransformClosure(1));
+            primaryKeys.add(key(ignite0, PRIMARY));
 
         log.info("TransformAll from primary.");
 
-        cache0.transformAll(primaryKeys);
+        cache0.invokeAll(primaryKeys, new Processor(1));
 
         for (int i = 0; i < GRID_CNT; i++) {
-            for (Integer primaryKey : primaryKeys.keySet())
+            for (Integer primaryKey : primaryKeys)
                 checkEntry(grid(i), primaryKey, 1, false);
         }
 
         if (backups > 0) {
-            Map<Integer, TransformClosure> backupKeys = new HashMap<>();
+            Set<Integer> backupKeys = new HashSet<>();
 
             for (int i = 0; i < 10; i++)
-                backupKeys.put(key(ignite0, BACKUP), new TransformClosure(2));
+                backupKeys.add(key(ignite0, BACKUP));
 
             log.info("TransformAll from backup.");
 
-            cache0.transformAll(backupKeys);
+            cache0.invokeAll(backupKeys, new Processor(2));
 
             for (int i = 0; i < GRID_CNT; i++) {
-                for (Integer backupKey : backupKeys.keySet())
+                for (Integer backupKey : backupKeys)
                     checkEntry(grid(i), backupKey, 2, false);
             }
         }
 
-        Map<Integer, TransformClosure> nearKeys = new HashMap<>();
+        Set<Integer> nearKeys = new HashSet<>();
 
         for (int i = 0; i < 30; i++)
-            nearKeys.put(key(ignite0, NOT_PRIMARY_AND_BACKUP), new TransformClosure(3));
+            nearKeys.add(key(ignite0, NOT_PRIMARY_AND_BACKUP));
 
         log.info("TransformAll from near.");
 
-        cache0.transformAll(nearKeys);
+        cache0.invokeAll(nearKeys, new Processor(3));
 
         for (int i = 0; i < GRID_CNT; i++) {
-            for (Integer nearKey : nearKeys.keySet()) {
+            for (Integer nearKey : nearKeys) {
                 UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
 
                 checkEntry(grid(i), nearKey, 3, i == 0, expReaders);
@@ -387,7 +387,7 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
 
         Map<Integer, Collection<UUID>> readersMap = new HashMap<>();
 
-        for (Integer key : nearKeys.keySet())
+        for (Integer key : nearKeys)
             readersMap.put(key, new HashSet<UUID>());
 
         int val = 4;
@@ -395,22 +395,22 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
         for (int i = 0; i < GRID_CNT; i++) {
             delay();
 
-            GridCache<Integer, Integer> cache = grid(i).cache(null);
+            IgniteCache<Integer, Integer> cache = grid(i).jcache(null);
 
-            for (Integer key : nearKeys.keySet())
-                nearKeys.put(key, new TransformClosure(val));
+            for (Integer key : nearKeys)
+                nearKeys.add(key);
 
             log.info("TransformAll [grid=" + grid(i).name() + ", val=" + val + ']');
 
-            cache.transformAll(nearKeys);
+            cache.invokeAll(nearKeys, new Processor(val));
 
-            for (Integer key : nearKeys.keySet()) {
+            for (Integer key : nearKeys) {
                 if (!aff.isPrimaryOrBackup(grid(i).localNode(), key))
                     readersMap.get(key).add(grid(i).localNode().id());
             }
 
             for (int j = 0; j < GRID_CNT; j++) {
-                for (Integer key : nearKeys.keySet()) {
+                for (Integer key : nearKeys) {
                     boolean primaryNode = aff.isPrimary(grid(j).localNode(), key);
 
                     Collection<UUID> readers = readersMap.get(key);
@@ -789,21 +789,23 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     *
      */
-    private static class TransformClosure implements IgniteClosure<Integer, Integer> {
+    private static class Processor implements EntryProcessor<Integer, Integer, Void>, Serializable {
         /** */
         private final Integer newVal;
 
         /**
          * @param newVal New value.
          */
-        private TransformClosure(Integer newVal) {
+        private Processor(Integer newVal) {
             this.newVal = newVal;
         }
 
-        /** {@inheritDoc} */
-        @Override public Integer apply(Integer val) {
-            return newVal;
+        @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+            e.setValue(newVal);
+
+            return null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
index 252ac3a..402cb24 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
@@ -10,13 +10,14 @@
 package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic;
 
 import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.*;
 import org.jdk8.backport.*;
 
+import javax.cache.processor.*;
+import java.io.*;
 import java.util.concurrent.atomic.*;
 
 import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
@@ -62,14 +63,14 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi
                         for (int i = rangeStart; i < rangeStart + range; i++) {
                             int idx = ThreadLocalRandom8.current().nextInt(gridCount());
 
-                            GridCacheProjection<Integer, Integer> cache = grid(idx).cache(null);
+                            IgniteCache<Integer, Integer> cache = grid(idx).jcache(null);
 
                             cache = cache.flagsOn(GridCacheFlag.FORCE_TRANSFORM_BACKUP);
 
-                            cache.transform(i, new Transformer(i));
+                            cache.invoke(i, new Transformer(i));
                         }
                     }
-                    catch (IgniteCheckedException e) {
+                    catch (Exception e) {
                         throw new IgniteException(e);
                     }
                 }
@@ -102,20 +103,28 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi
     /**
      *
      */
-    private static class Transformer implements IgniteClosure<Integer, Integer> {
+    private static class Transformer implements EntryProcessor<Integer, Integer, Void>, Serializable {
+        /** */
         private int key;
 
+        /**
+         * @param key Key.
+         */
         private Transformer(int key) {
             this.key = key;
         }
 
         /** {@inheritDoc} */
-        @Override public Integer apply(Integer old) {
+        @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+            Integer old = e.getValue();
+
             if (key < 5)
                 System.err.println(Thread.currentThread().getName() + " <> Transforming value [key=" + key +
                     ", val=" + old + ']');
 
-            return old == null ? 1 : old + 1;
+            e.setValue(old == null ? 1 : old + 1);
+
+            return null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
index 3c56237..96ab5fb 100644
--- a/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/testframework/junits/GridAbstractTest.java
@@ -1402,7 +1402,8 @@ public abstract class GridAbstractTest extends TestCase {
                 int cnt = 0;
 
                 for (Method m : GridAbstractTest.this.getClass().getMethods())
-                    if (m.getDeclaringClass().getName().startsWith("org.gridgain")) {
+                    if (m.getDeclaringClass().getName().startsWith("org.gridgain") ||
+                        m.getDeclaringClass().getName().startsWith("org.apache.ignite")) {
                         if (m.getName().startsWith("test") && Modifier.isPublic(m.getModifiers()))
                             cnt++;
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
index 9fc3ff7..f40d941 100644
--- a/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/testframework/junits/common/GridCommonAbstractTest.java
@@ -61,6 +61,14 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
 
     /**
      * @param idx Grid index.
+     * @return Cache.
+     */
+    protected <K, V> IgniteCache<K, V> jcache(int idx) {
+        return grid(idx).jcache(null);
+    }
+
+    /**
+     * @param idx Grid index.
      * @param name Cache name.
      * @return Cache.
      */
@@ -275,7 +283,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
 
     /**
      * @param cache Cache.
-     * @return Collection of keys for which given cache is primary.
+     * @return Key for which given cache is primary.
      * @throws IgniteCheckedException If failed.
      */
     protected Integer primaryKey(GridCacheProjection<?, ?> cache)
@@ -327,7 +335,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
 
     /**
      * @param cache Cache.
-     * @return Collection of keys for which given cache is backup.
+     * @return Key for which given cache is backup.
      * @throws IgniteCheckedException If failed.
      */
     protected Integer backupKey(GridCacheProjection<?, ?> cache)
@@ -379,7 +387,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
 
     /**
      * @param cache Cache.
-     * @return Collection of keys for which given cache is neither primary nor backup.
+     * @return Key for which given cache is neither primary nor backup.
      * @throws IgniteCheckedException If failed.
      */
     protected Integer nearKey(GridCacheProjection<?, ?> cache)
@@ -472,6 +480,42 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     }
 
     /**
+     * @param cache Cache.
+     * @return Key for which given cache is primary.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected Integer primaryKey(IgniteCache<?, ?> cache)
+        throws IgniteCheckedException {
+        GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate");
+
+        return primaryKey(prj);
+    }
+
+    /**
+     * @param cache Cache.
+     * @return Key for which given cache is backup.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected Integer backupKey(IgniteCache<?, ?> cache)
+        throws IgniteCheckedException {
+        GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate");
+
+        return backupKey(prj);
+    }
+
+    /**
+     * @param cache Cache.
+     * @return Key for which given cache is neither primary nor backup.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected Integer nearKey(IgniteCache<?, ?> cache)
+        throws IgniteCheckedException {
+        GridCacheProjection<?, ?> prj = GridTestUtils.getFieldValue(cache, "delegate");
+
+        return nearKey(prj);
+    }
+
+    /**
      * @param comp Compute.
      * @param task Task.
      * @param arg Task argument.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
index e642753..d9a5e22 100644
--- a/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/testsuites/bamboo/GridDataGridTestSuite.java
@@ -10,6 +10,7 @@
 package org.gridgain.testsuites.bamboo;
 
 import junit.framework.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.expiry.*;
 import org.gridgain.grid.*;
 import org.gridgain.grid.cache.affinity.fair.*;
@@ -31,14 +32,24 @@ import org.gridgain.testsuites.*;
  */
 public class GridDataGridTestSuite extends TestSuite {
     /**
-     * @return GridGain TeamCity in-memory data grid test suite.
+     * @return IgniteCache test suite.
      * @throws Exception Thrown in case of the failure.
      */
     public static TestSuite suite() throws Exception {
-        TestSuite suite = new TestSuite("Gridgain In-Memory Data Grid Test Suite");
+        TestSuite suite = new TestSuite("IgniteCache Test Suite");
 
         suite.addTest(IgniteCacheExpiryPolicyTestSuite.suite());
 
+        suite.addTestSuite(IgniteCacheAtomicInvokeTest.class);
+        suite.addTestSuite(IgniteCacheAtomicNearEnabledInvokeTest.class);
+        suite.addTestSuite(IgniteCacheAtomicPrimaryWriteOrderInvokeTest.class);
+        suite.addTestSuite(IgniteCacheAtomicPrimaryWriteOrderWithStoreInvokeTest.class);
+        suite.addTestSuite(IgniteCacheAtomicLocalInvokeTest.class);
+        suite.addTestSuite(IgniteCacheAtomicLocalWithStoreInvokeTest.class);
+        suite.addTestSuite(IgniteCacheTxInvokeTest.class);
+        suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class);
+        suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class);
+
         // Affinity tests.
         suite.addTestSuite(GridCachePartitionFairAffinityNodesSelfTest.class);
         suite.addTestSuite(GridCacheAffinityBackupsSelfTest.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
index 75cc266..23d98c1 100644
--- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
@@ -29,6 +29,8 @@ import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
+import javax.cache.processor.*;
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -46,10 +48,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     private final GridMutex mux = new GridMutex();
 
     /** */
-    private volatile GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj;
+    private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj;
 
     /** Projection with expiry policy for finished job updates. */
-    private volatile GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj;
+    private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj;
 
     /** Map-reduce execution planner. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@@ -96,8 +98,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
      * @return Job meta projection.
      */
     @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-    private GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() {
-        GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj;
+    private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() {
+        GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj;
 
         if (prj == null) {
             synchronized (mux) {
@@ -118,7 +120,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                         throw new IllegalStateException(e);
                     }
 
-                    jobMetaPrj = prj = sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class);
+                    jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)
+                        sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class);
 
                     if (ctx.configuration().getFinishedJobInfoTtl() > 0) {
                         ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
@@ -139,8 +142,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     /**
      * @return Projection with expiry policy for finished job updates.
      */
-    private GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() {
-        GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj;
+    private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() {
+        GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj;
 
         if (prj == null) {
             jobMetaCache();
@@ -430,10 +433,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
             assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)):
                 "Missing local state for finished task [info=" + info + ", status=" + status + ']';
 
-            StackedClosure incrCntrs = null;
+            StackedProcessor incrCntrs = null;
 
             if (status.state() == COMPLETED)
-                incrCntrs = new IncrementCountersClosure(null, status.counters());
+                incrCntrs = new IncrementCountersProcessor(null, status.counters());
 
             switch (info.type()) {
                 case SETUP: {
@@ -462,9 +465,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
 
                 case COMMIT:
                 case ABORT: {
-                    GridCacheProjection<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache();
+                    GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache();
 
-                    cache.transformAsync(info.jobId(), new UpdatePhaseClosure(incrCntrs, PHASE_COMPLETE)).
+                    cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)).
                         listenAsync(failsLog);
 
                     break;
@@ -480,8 +483,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
      * @param jobId Job id.
      * @param c Closure of operation.
      */
-    private void transform(GridHadoopJobId jobId, IgniteClosure<GridHadoopJobMetadata, GridHadoopJobMetadata> c) {
-        jobMetaCache().transformAsync(jobId, c).listenAsync(failsLog);
+    private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void> c) {
+        jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog);
     }
 
     /**
@@ -493,7 +496,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
      */
     public void onExternalMappersInitialized(GridHadoopJobId jobId, Collection<Integer> reducers,
         GridHadoopProcessDescriptor desc) {
-        transform(jobId, new InitializeReducersClosure(null, reducers, desc));
+        transform(jobId, new InitializeReducersProcessor(null, reducers, desc));
     }
 
     /**
@@ -601,7 +604,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                         }
 
                         if (cancelSplits != null || cancelReducers != null)
-                            jobMetaCache().transform(meta.jobId(), new CancelJobClosure(null, new IgniteCheckedException(
+                            jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException(
                                 "One or more nodes participating in map-reduce job execution failed."), cancelSplits,
                                 cancelReducers));
                     }
@@ -615,8 +618,10 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
 
     /**
      * @param updated Updated cache entries.
+     * @throws IgniteCheckedException If failed.
      */
-    private void processJobMetadataUpdates(Iterable<Map.Entry<GridHadoopJobId, GridHadoopJobMetadata>> updated) throws IgniteCheckedException {
+    private void processJobMetadataUpdates(Iterable<Map.Entry<GridHadoopJobId, GridHadoopJobMetadata>> updated)
+        throws IgniteCheckedException {
         UUID locNodeId = ctx.localNodeId();
 
         for (Map.Entry<GridHadoopJobId, GridHadoopJobMetadata> entry : updated) {
@@ -637,7 +642,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                 U.error(log, "Failed to process job state changed callback (will fail the job) " +
                     "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e);
 
-                transform(jobId, new CancelJobClosure(null, e));
+                transform(jobId, new CancelJobProcessor(null, e));
 
                 continue;
             }
@@ -780,7 +785,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                     }
 
                     if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty())
-                        transform(jobId, new CancelJobClosure(null, cancelMappers, cancelReducers));
+                        transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers));
                 }
 
                 break;
@@ -1017,7 +1022,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
             if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) {
                 GridHadoopTaskCancelledException err = new GridHadoopTaskCancelledException("Job cancelled.");
 
-                jobMetaCache().transform(jobId, new CancelJobClosure(null, err));
+                jobMetaCache().invoke(jobId, new CancelJobProcessor(null, err));
             }
         }
         finally {
@@ -1146,13 +1151,13 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param status Task status.
          * @param prev Previous closure.
          */
-        private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedClosure prev) {
+        private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) {
             final GridHadoopJobId jobId = taskInfo.jobId();
 
             if (status.state() == FAILED || status.state() == CRASHED)
-                transform(jobId, new CancelJobClosure(prev, status.failCause()));
+                transform(jobId, new CancelJobProcessor(prev, status.failCause()));
             else
-                transform(jobId, new UpdatePhaseClosure(prev, PHASE_MAP));
+                transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP));
         }
 
         /**
@@ -1161,14 +1166,14 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param prev Previous closure.
          */
         private void onMapFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status,
-            final StackedClosure prev) {
+            final StackedProcessor prev) {
             final GridHadoopJobId jobId = taskInfo.jobId();
 
             boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size();
 
             if (status.state() == FAILED || status.state() == CRASHED) {
                 // Fail the whole job.
-                transform(jobId, new RemoveMappersClosure(prev, taskInfo.inputSplit(), status.failCause()));
+                transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause()));
 
                 return;
             }
@@ -1186,7 +1191,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                         }
                     }
 
-                    transform(jobId, new RemoveMappersClosure(prev, taskInfo.inputSplit(), err));
+                    transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err));
                 }
             };
 
@@ -1201,13 +1206,13 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param status Task status.
          * @param prev Previous closure.
          */
-        private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedClosure prev) {
+        private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) {
             GridHadoopJobId jobId = taskInfo.jobId();
             if (status.state() == FAILED || status.state() == CRASHED)
                 // Fail the whole job.
-                transform(jobId, new RemoveReducerClosure(prev, taskInfo.taskNumber(), status.failCause()));
+                transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause()));
             else
-                transform(jobId, new RemoveReducerClosure(prev, taskInfo.taskNumber()));
+                transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber()));
         }
 
         /**
@@ -1216,12 +1221,12 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param prev Previous closure.
          */
         private void onCombineFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status,
-            final StackedClosure prev) {
+            final StackedProcessor prev) {
             final GridHadoopJobId jobId = taskInfo.jobId();
 
             if (status.state() == FAILED || status.state() == CRASHED)
                 // Fail the whole job.
-                transform(jobId, new RemoveMappersClosure(prev, currMappers, status.failCause()));
+                transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause()));
             else {
                 ctx.shuffle().flush(jobId).listenAsync(new CIX1<IgniteFuture<?>>() {
                     @Override public void applyx(IgniteFuture<?> f) {
@@ -1236,7 +1241,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                             }
                         }
 
-                        transform(jobId, new RemoveMappersClosure(prev, currMappers, err));
+                        transform(jobId, new RemoveMappersProcessor(prev, currMappers, err));
                     }
                 });
             }
@@ -1272,7 +1277,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     /**
      * Update job phase transform closure.
      */
-    private static class UpdatePhaseClosure extends StackedClosure {
+    private static class UpdatePhaseProcessor extends StackedProcessor {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1283,7 +1288,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param prev Previous closure.
          * @param phase Phase to update.
          */
-        private UpdatePhaseClosure(@Nullable StackedClosure prev, GridHadoopJobPhase phase) {
+        private UpdatePhaseProcessor(@Nullable StackedProcessor prev, GridHadoopJobPhase phase) {
             super(prev);
 
             this.phase = phase;
@@ -1298,7 +1303,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     /**
      * Remove mapper transform closure.
      */
-    private static class RemoveMappersClosure extends StackedClosure {
+    private static class RemoveMappersProcessor extends StackedProcessor {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1313,7 +1318,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param split Mapper split to remove.
          * @param err Error.
          */
-        private RemoveMappersClosure(@Nullable StackedClosure prev, GridHadoopInputSplit split, Throwable err) {
+        private RemoveMappersProcessor(@Nullable StackedProcessor prev, GridHadoopInputSplit split, Throwable err) {
             this(prev, Collections.singletonList(split), err);
         }
 
@@ -1321,8 +1326,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param prev Previous closure.
          * @param splits Mapper splits to remove.
          */
-        private RemoveMappersClosure(@Nullable StackedClosure prev, Collection<GridHadoopInputSplit> splits,
-            Throwable err) {
+        private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<GridHadoopInputSplit> splits,
+                                       Throwable err) {
             super(prev);
 
             this.splits = splits;
@@ -1354,7 +1359,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     /**
      * Remove reducer transform closure.
      */
-    private static class RemoveReducerClosure extends StackedClosure {
+    private static class RemoveReducerProcessor extends StackedProcessor {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1368,7 +1373,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param prev Previous closure.
          * @param rdc Reducer to remove.
          */
-        private RemoveReducerClosure(@Nullable StackedClosure prev, int rdc) {
+        private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) {
             super(prev);
 
             this.rdc = rdc;
@@ -1378,7 +1383,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param prev Previous closure.
          * @param rdc Reducer to remove.
          */
-        private RemoveReducerClosure(@Nullable StackedClosure prev, int rdc, Throwable err) {
+        private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) {
             super(prev);
 
             this.rdc = rdc;
@@ -1403,7 +1408,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     /**
      * Initialize reducers.
      */
-    private static class InitializeReducersClosure extends StackedClosure {
+    private static class InitializeReducersProcessor extends StackedProcessor {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1418,7 +1423,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param rdc Reducers to initialize.
          * @param desc External process descriptor.
          */
-        private InitializeReducersClosure(@Nullable StackedClosure prev, Collection<Integer> rdc,
+        private InitializeReducersProcessor(@Nullable StackedProcessor prev,
+            Collection<Integer> rdc,
             GridHadoopProcessDescriptor desc) {
             super(prev);
 
@@ -1446,7 +1452,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     /**
      * Remove reducer transform closure.
      */
-    private static class CancelJobClosure extends StackedClosure {
+    private static class CancelJobProcessor extends StackedProcessor {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1463,7 +1469,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param prev Previous closure.
          * @param err Fail cause.
          */
-        private CancelJobClosure(@Nullable StackedClosure prev, Throwable err) {
+        private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) {
             this(prev, err, null, null);
         }
 
@@ -1472,7 +1478,8 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param splits Splits to remove.
          * @param rdc Reducers to remove.
          */
-        private CancelJobClosure(@Nullable StackedClosure prev, Collection<GridHadoopInputSplit> splits,
+        private CancelJobProcessor(@Nullable StackedProcessor prev,
+            Collection<GridHadoopInputSplit> splits,
             Collection<Integer> rdc) {
             this(prev, null, splits, rdc);
         }
@@ -1483,7 +1490,9 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param splits Splits to remove.
          * @param rdc Reducers to remove.
          */
-        private CancelJobClosure(@Nullable StackedClosure prev, Throwable err, Collection<GridHadoopInputSplit> splits,
+        private CancelJobProcessor(@Nullable StackedProcessor prev,
+            Throwable err,
+            Collection<GridHadoopInputSplit> splits,
             Collection<Integer> rdc) {
             super(prev);
 
@@ -1522,7 +1531,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     /**
      * Increment counter values closure.
      */
-    private static class IncrementCountersClosure extends StackedClosure {
+    private static class IncrementCountersProcessor extends StackedProcessor {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1533,7 +1542,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
          * @param prev Previous closure.
          * @param counters Task counters to add into job counters.
          */
-        private IncrementCountersClosure(@Nullable StackedClosure prev, GridHadoopCounters counters) {
+        private IncrementCountersProcessor(@Nullable StackedProcessor prev, GridHadoopCounters counters) {
             super(prev);
 
             assert counters != null;
@@ -1554,22 +1563,33 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     /**
      * Abstract stacked closure.
      */
-    private abstract static class StackedClosure implements IgniteClosure<GridHadoopJobMetadata, GridHadoopJobMetadata> {
+    private abstract static class StackedProcessor implements
+        EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void>, Serializable {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** */
-        private final StackedClosure prev;
+        private final StackedProcessor prev;
 
         /**
          * @param prev Previous closure.
          */
-        private StackedClosure(@Nullable StackedClosure prev) {
+        private StackedProcessor(@Nullable StackedProcessor prev) {
             this.prev = prev;
         }
 
         /** {@inheritDoc} */
-        @Override public final GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) {
+        @Override public Void process(MutableEntry<GridHadoopJobId, GridHadoopJobMetadata> e, Object... args) {
+            e.setValue(apply(e.getValue()));
+
+            return null;
+        }
+
+        /**
+         * @param meta Old value.
+         * @return New value.
+         */
+        private GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) {
             if (meta == null)
                 return null;
 


Mime
View raw message