ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dma...@apache.org
Subject [15/25] ignite git commit: ignite-1526: full support of IBM JDK by Ignite
Date Mon, 12 Oct 2015 17:53:40 GMT
ignite-1526: full support of IBM JDK by Ignite


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fc682f1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fc682f1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fc682f1

Branch: refs/heads/ignite-1272
Commit: 5fc682f11f43f61d14d6b70be5ccf949a9ae05ac
Parents: 4f95be2
Author: Andrey Gura <agura@gridgain.com>
Authored: Fri Oct 9 13:54:56 2015 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Fri Oct 9 14:22:34 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |   17 +-
 .../internal/portable/PortableContext.java      |    7 +
 .../portable/api/PortableMarshaller.java        |   14 +-
 .../ignite/internal/util/GridJavaProcess.java   |   12 +-
 .../ignite/internal/util/lang/GridFunc.java     |   12 +
 .../apache/ignite/marshaller/Marshaller.java    |    2 +-
 .../optimized/OptimizedMarshallerUtils.java     |    6 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  486 ++++---
 .../CacheNoValueClassOnServerNodeTest.java      |    1 +
 ...tomicClientOnlyMultiNodeFullApiSelfTest.java |   71 +-
 ...achePartitionedMultiNodeFullApiSelfTest.java |  129 +-
 .../testframework/junits/GridAbstractTest.java  |  116 +-
 .../junits/IgniteTestResources.java             |    8 +-
 .../junits/common/GridCommonAbstractTest.java   |   15 +-
 .../junits/multijvm/AffinityProcessProxy.java   |  440 ++++--
 .../multijvm/IgniteCacheProcessProxy.java       | 1348 ++++++++++++++----
 .../multijvm/IgniteClusterProcessProxy.java     |  115 +-
 .../multijvm/IgniteEventsProcessProxy.java      |   50 +-
 .../junits/multijvm/IgniteNodeRunner.java       |   39 +-
 .../junits/multijvm/IgniteProcessProxy.java     |  107 +-
 .../cache/CacheConfigurationP2PTest.java        |    3 +
 21 files changed, 2186 insertions(+), 812 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 1801b9c..cba06de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -987,19 +987,24 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
         boolean locP2pEnabled = locNode.attribute(ATTR_PEER_CLASSLOADING);
 
-        boolean warned = false;
+        boolean ipV4Warned = false;
+
+        boolean jvmMajVerWarned = false;
 
         for (ClusterNode n : nodes) {
             int rmtJvmMajVer = nodeJavaMajorVersion(n);
 
-            if (locJvmMajVer != rmtJvmMajVer)
-                throw new IgniteCheckedException("Local node's java major version is different from remote node's one" +
-                    " [locJvmMajVer=" + locJvmMajVer + ", rmtJvmMajVer=" + rmtJvmMajVer + "]");
+            if (locJvmMajVer != rmtJvmMajVer && !jvmMajVerWarned) {
+                U.warn(log, "Local java version is different from remote [loc=" +
+                    locJvmMajVer + ", rmt=" + rmtJvmMajVer + "]");
+
+                jvmMajVerWarned = true;
+            }
 
             String rmtPreferIpV4 = n.attribute("java.net.preferIPv4Stack");
 
             if (!F.eq(rmtPreferIpV4, locPreferIpV4)) {
-                if (!warned)
+                if (!ipV4Warned)
                     U.warn(log, "Local node's value of 'java.net.preferIPv4Stack' " +
                         "system property differs from remote node's " +
                         "(all nodes in topology should have identical value) " +
@@ -1008,7 +1013,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         ", rmtAddrs=" + U.addressesAsString(n) + ']',
                         "Local and remote 'java.net.preferIPv4Stack' system properties do not match.");
 
-                warned = true;
+                ipV4Warned = true;
             }
 
             // Daemon nodes are allowed to have any deployment they need.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index 2ee96b7..1ad42ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -967,6 +967,9 @@ public class PortableContext implements Externalizable {
         }
     }
 
+    /**
+     * Basic class ID mapper.
+     */
     private static class BasicClassIdMapper implements PortableIdMapper {
         /** {@inheritDoc} */
         @Override public int typeId(String clsName) {
@@ -1121,6 +1124,10 @@ public class PortableContext implements Externalizable {
         /** Whether the following type is registered in a cache or not */
         private final boolean registered;
 
+        /**
+         * @param id Id.
+         * @param registered Registered.
+         */
         public Type(int id, boolean registered) {
             this.id = id;
             this.registered = registered;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
index de0df8d..3dfbdf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java
@@ -29,12 +29,6 @@ import org.apache.ignite.internal.portable.GridPortableMarshaller;
 import org.apache.ignite.internal.portable.PortableContext;
 import org.apache.ignite.marshaller.AbstractMarshaller;
 import org.apache.ignite.marshaller.MarshallerContext;
-import org.apache.ignite.internal.portable.api.PortableException;
-import org.apache.ignite.internal.portable.api.PortableIdMapper;
-import org.apache.ignite.internal.portable.api.PortableObject;
-import org.apache.ignite.internal.portable.api.PortableProtocolVersion;
-import org.apache.ignite.internal.portable.api.PortableSerializer;
-import org.apache.ignite.internal.portable.api.PortableTypeConfiguration;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -336,7 +330,7 @@ public class PortableMarshaller extends AbstractMarshaller {
 
     /** {@inheritDoc} */
     @Override public <T> T unmarshal(InputStream in, @Nullable ClassLoader clsLdr) throws IgniteCheckedException {
-        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
 
         byte[] arr = new byte[4096];
         int cnt;
@@ -345,11 +339,11 @@ public class PortableMarshaller extends AbstractMarshaller {
         // returns number of bytes remaining.
         try {
             while ((cnt = in.read(arr)) != -1)
-                buffer.write(arr, 0, cnt);
+                buf.write(arr, 0, cnt);
 
-            buffer.flush();
+            buf.flush();
 
-            return impl.deserialize(buffer.toByteArray(), clsLdr);
+            return impl.deserialize(buf.toByteArray(), clsLdr);
         }
         catch (IOException e) {
             throw new PortableException("Failed to unmarshal the object from InputStream", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
index 92c20fe..3371eb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
@@ -89,7 +89,7 @@ public final class GridJavaProcess {
      */
     public static GridJavaProcess exec(Class cls, String params, @Nullable IgniteLogger log,
         @Nullable IgniteInClosure<String> printC, @Nullable GridAbsClosure procKilledC) throws Exception {
-        return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, null);
+        return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, null, null);
     }
 
     /**
@@ -108,7 +108,7 @@ public final class GridJavaProcess {
     public static GridJavaProcess exec(Class cls, String params, @Nullable IgniteLogger log,
         @Nullable IgniteInClosure<String> printC, @Nullable GridAbsClosure procKilledC,
         @Nullable Collection<String> jvmArgs, @Nullable String cp) throws Exception {
-        return exec(cls.getCanonicalName(), params, log, printC, procKilledC, jvmArgs, cp);
+        return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, jvmArgs, cp);
     }
 
     /**
@@ -116,9 +116,10 @@ public final class GridJavaProcess {
      *
      * @param clsName Class with main() method to be run.
      * @param params main() method parameters.
+     * @param log Log to use.
      * @param printC Optional closure to be called each time wrapped process prints line to system.out or system.err.
      * @param procKilledC Optional closure to be called when process termination is detected.
-     * @param log Log to use.
+     * @param javaHome Java home location. The process will be started under given JVM.
      * @param jvmArgs JVM arguments to use.
      * @param cp Additional classpath.
      * @return Wrapper around {@link Process}
@@ -126,7 +127,7 @@ public final class GridJavaProcess {
      */
     public static GridJavaProcess exec(String clsName, String params, @Nullable IgniteLogger log,
         @Nullable IgniteInClosure<String> printC, @Nullable GridAbsClosure procKilledC,
-        @Nullable Collection<String> jvmArgs, @Nullable String cp) throws Exception {
+        @Nullable String javaHome, @Nullable Collection<String> jvmArgs, @Nullable String cp) throws Exception {
         if (!(U.isLinux() || U.isMacOs() || U.isWindows()))
             throw new Exception("Your OS is not supported.");
 
@@ -140,7 +141,8 @@ public final class GridJavaProcess {
 
         List<String> procCommands = new ArrayList<>();
 
-        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
+        String javaBin = (javaHome == null ? System.getProperty("java.home") : javaHome) +
+            File.separator + "bin" + File.separator + "java";
 
         procCommands.add(javaBin);
         procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index ffeeca0..43bc5f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -103,6 +103,9 @@ public class GridFunc {
 
     /** */
     private static final IgniteClosure IDENTITY = new C1() {
+        /** */
+        private static final long serialVersionUID = -6338573080046225172L;
+
         @Override public Object apply(Object o) {
             return o;
         }
@@ -1196,6 +1199,9 @@ public class GridFunc {
         A.notNull(nodeId, "nodeId");
 
         return new P1<T>() {
+            /** */
+            private static final long serialVersionUID = -7082730222779476623L;
+
             @Override public boolean apply(ClusterNode e) {
                 return e.id().equals(nodeId);
             }
@@ -1705,6 +1711,9 @@ public class GridFunc {
         assert c != null;
 
         return new GridSerializableList<T2>() {
+            /** */
+            private static final long serialVersionUID = 3126625219739967068L;
+
             @Override public T2 get(int idx) {
                 return trans.apply(c.get(idx));
             }
@@ -1766,6 +1775,9 @@ public class GridFunc {
         assert m != null;
 
         return isEmpty(p) || isAlwaysTrue(p) ? m : new GridSerializableMap<K, V>() {
+            /** */
+            private static final long serialVersionUID = 5531745605372387948L;
+
             /** Entry predicate. */
             private IgnitePredicate<Entry<K, V>> ep = new P1<Map.Entry<K, V>>() {
                 @Override public boolean apply(Entry<K, V> e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
index 3e815fd..a76daa8 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java
@@ -92,7 +92,7 @@ public interface Marshaller {
     public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException;
 
     /**
-     * Unmarshals object from the output stream using given class loader.
+     * Unmarshals object from the input stream using given class loader.
      * This method should not close given input stream.
      *
      * @param <T> Type of unmarshalled object.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
index 4abbd04..584083c 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
@@ -42,6 +42,10 @@ class OptimizedMarshallerUtils {
     /** */
     private static final Unsafe UNSAFE = GridUnsafe.unsafe();
 
+    /** Use default {@code serialVersionUid} for {@link Serializable} classes. */
+    private static final boolean USE_DFLT_SUID =
+        Boolean.valueOf(System.getProperty("ignite.marsh.optimized.useDefaultSUID", Boolean.TRUE.toString()));
+
     /** */
     static final long HASH_SET_MAP_OFF;
 
@@ -283,7 +287,7 @@ class OptimizedMarshallerUtils {
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     static short computeSerialVersionUid(Class cls, List<Field> fields) throws IOException {
-        if (Serializable.class.isAssignableFrom(cls) && !Enum.class.isAssignableFrom(cls))
+        if (USE_DFLT_SUID && Serializable.class.isAssignableFrom(cls) && !Enum.class.isAssignableFrom(cls))
             return (short)ObjectStreamClass.lookup(cls).getSerialVersionUID();
 
         MessageDigest md;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 2a64963..ec3ea0c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -49,6 +50,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMemoryMode;
@@ -75,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -117,48 +120,37 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     /** */
     public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR =
         new CacheEntryProcessor<String, Integer, String>() {
+            /** */
+            private static final long serialVersionUID = 0L;
+
             @Override public String process(MutableEntry<String, Integer> e, Object... args) {
                 throw new RuntimeException("Failed!");
             }
         };
 
     /** Increment processor for invoke operations. */
-    public static final EntryProcessor<String, Integer, String> INCR_PROCESSOR = new EntryProcessor<String, Integer, String>() {
-        @Override public String process(MutableEntry<String, Integer> e, Object... args) {
-            assertNotNull(e.getKey());
-
-            Integer old = e.getValue();
-
-            e.setValue(old == null ? 1 : old + 1);
-
-            return String.valueOf(old);
-        }
-    };
+    public static final EntryProcessor<String, Integer, String> INCR_PROCESSOR = new IncrementEntryProcessor();
 
     /** Increment processor for invoke operations with IgniteEntryProcessor. */
     public static final CacheEntryProcessor<String, Integer, String> INCR_IGNITE_PROCESSOR =
         new CacheEntryProcessor<String, Integer, String>() {
+            /** */
+            private static final long serialVersionUID = 0L;
+
             @Override public String process(MutableEntry<String, Integer> e, Object... args) {
                 return INCR_PROCESSOR.process(e, args);
             }
         };
 
     /** Increment processor for invoke operations. */
-    public static final EntryProcessor<String, Integer, String> RMV_PROCESSOR = new EntryProcessor<String, Integer, String>() {
-        @Override public String process(MutableEntry<String, Integer> e, Object... args) {
-            assertNotNull(e.getKey());
-
-            Integer old = e.getValue();
-
-            e.remove();
-
-            return String.valueOf(old);
-        }
-    };
+    public static final EntryProcessor<String, Integer, String> RMV_PROCESSOR = new RemoveEntryProcessor();
 
     /** Increment processor for invoke operations with IgniteEntryProcessor. */
     public static final CacheEntryProcessor<String, Integer, String> RMV_IGNITE_PROCESSOR =
         new CacheEntryProcessor<String, Integer, String>() {
+            /** */
+            private static final long serialVersionUID = 0L;
+
             @Override public String process(MutableEntry<String, Integer> e, Object... args) {
                 return RMV_PROCESSOR.process(e, args);
             }
@@ -346,21 +338,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             assert jcache(i).localSize() != 0 || F.isEmpty(keysCol);
         }
 
-        for (int i = 0; i < gridCount(); i++) {
-            executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
-                @Override public void run(int idx) throws Exception {
-                    GridCacheContext<String, Integer> ctx = context(idx);
-
-                    int sum = 0;
-
-                    for (String key : map.keySet())
-                        if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
-                            sum++;
-
-                    assertEquals("Incorrect key size on cache #" + idx, sum, jcache(idx).localSize(ALL));
-                }
-            });
-        }
+        for (int i = 0; i < gridCount(); i++)
+            executeOnLocalOrRemoteJvm(i, new CheckCacheSizeTask(map));
 
         for (int i = 0; i < gridCount(); i++) {
             Collection<String> keysCol = mapped.get(grid(i).localNode());
@@ -1350,13 +1329,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         assertEquals((Integer)3, cache.get("k1"));
 
-        EntryProcessor<String, Integer, Integer> c = new EntryProcessor<String, Integer, Integer>() {
-            @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
-                e.remove();
-
-                return null;
-            }
-        };
+        EntryProcessor<String, Integer, Integer> c = new RemoveAndReturnNullEntryProcessor();
 
         assertNull(cache.invoke("k1", c));
         assertNull(cache.get("k1"));
@@ -1364,11 +1337,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         for (int i = 0; i < gridCount(); i++)
             assertNull(jcache(i).localPeek("k1", ONHEAP));
 
-        final EntryProcessor<String, Integer, Integer> errProcessor = new EntryProcessor<String, Integer, Integer>() {
-            @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
-                throw new EntryProcessorException("Test entry processor exception.");
-            }
-        };
+        final EntryProcessor<String, Integer, Integer> errProcessor = new FailedEntryProcessor();
 
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -2001,7 +1970,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             assertFalse(cacheAsync.<Boolean>future().get());
         }
 
-        cache.localEvict(Arrays.asList("key2"));
+        cache.localEvict(Collections.singletonList("key2"));
 
         // Same checks inside tx.
         Transaction tx = inTx ? transactions().txStart() : null;
@@ -2357,27 +2326,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             for (int i = 0; i < cnt; i++)
                 cache.remove(String.valueOf(i));
 
-            for (int g = 0; g < gridCount(); g++) {
-                executeOnLocalOrRemoteJvm(g, new TestIgniteIdxRunnable() {
-                    @Override public void run(int idx) throws Exception {
-                        for (int i = 0; i < cnt; i++) {
-                            String key = String.valueOf(i);
-
-                            GridCacheContext<String, Integer> cctx = context(idx);
-
-                            GridCacheEntryEx entry = cctx.isNear() ? cctx.near().dht().peekEx(key) :
-                                cctx.cache().peekEx(key);
-
-                            if (grid(idx).affinity(null).mapKeyToPrimaryAndBackups(key).contains(grid(idx).localNode())) {
-                                assertNotNull(entry);
-                                assertTrue(entry.deleted());
-                            }
-                            else
-                                assertNull(entry);
-                        }
-                    }
-                });
-            }
+            for (int g = 0; g < gridCount(); g++)
+                executeOnLocalOrRemoteJvm(g, new CheckEntriesDeletedTask(cnt));
         }
     }
 
@@ -2587,8 +2537,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         c.add(null);
 
         GridTestUtils.assertThrows(log, new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
+            @Override public Void call() throws Exception {
                 cache.removeAll(c);
 
                 return null;
@@ -2725,7 +2674,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     public void testRemoveAfterClear() throws Exception {
         IgniteEx ignite = grid(0);
 
-        boolean affNode = ((IgniteKernal)ignite).context().cache().internalCache(null).context().affinityNode();
+        boolean affNode = ignite.context().cache().internalCache(null).context().affinityNode();
 
         if (!affNode) {
             if (gridCount() < 2)
@@ -2766,13 +2715,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     }
 
     /**
-     *
-     */
-    private void xxx() {
-        System.out.printf("");
-    }
-
-    /**
      * @throws Exception In case of error.
      */
     public void testClear() throws Exception {
@@ -3597,26 +3539,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             info("Local keys (primary + backup): " + locKeys);
         }
 
-        for (int i = 0; i < gridCount(); i++) {
-            grid(i).events().localListen(new IgnitePredicate<Event>() {
-                @Override public boolean apply(Event evt) {
-                    info("Received event: " + evt);
-
-                    switch (evt.type()) {
-                        case EVT_CACHE_OBJECT_SWAPPED:
-                            swapEvts.incrementAndGet();
-
-                            break;
-                        case EVT_CACHE_OBJECT_UNSWAPPED:
-                            unswapEvts.incrementAndGet();
-
-                            break;
-                    }
-
-                    return true;
-                }
-            }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
-        }
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).events().localListen(
+                new SwapEvtsLocalListener(swapEvts, unswapEvts), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
 
         cache.localEvict(F.asList(k2, k3));
 
@@ -3934,7 +3859,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
                 map.put(entry.getKey(), entry.getValue());
         }
 
-        assert map != null;
         assert map.size() == 2;
         assert map.get("key1") == 1;
         assert map.get("key2") == 2;
@@ -3951,32 +3875,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         if (nearEnabled())
             assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL));
         else {
-            for (int i = 0; i < gridCount(); i++) {
-                executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
-                    @Override public void run(int idx) throws Exception {
-                        GridCacheContext<String, Integer> ctx = context(idx);
-
-                        if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
-                            return;
-
-                        int size = 0;
-
-                        for (String key : keys) {
-                            if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
-                                GridCacheEntryEx e =
-                                    ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
-
-                                assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
-                                assert !e.deleted() : "Entry is deleted: " + e;
-
-                                size++;
-                            }
-                        }
-
-                        assertEquals("Incorrect size on cache #" + idx, size, jcache(idx).localSize(ALL));
-                    }
-                });
-            }
+            for (int i = 0; i < gridCount(); i++)
+                executeOnLocalOrRemoteJvm(i, new CheckEntriesTask(keys));
         }
     }
 
@@ -3989,21 +3889,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             assertEquals("Invalid key size: " + jcache().localSize(ALL),
                 keys.size(), jcache().localSize(ALL));
         else {
-            for (int i = 0; i < gridCount(); i++) {
-                executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
-                    @Override public void run(int idx) throws Exception {
-                        GridCacheContext<String, Integer> ctx = context(idx);
-
-                        int size = 0;
-
-                        for (String key : keys)
-                            if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx()))
-                                size++;
-
-                        assertEquals("Incorrect key size on cache #" + idx, size, jcache(idx).localSize(ALL));
-                    }
-                });
-            }
+            for (int i = 0; i < gridCount(); i++)
+                executeOnLocalOrRemoteJvm(i, new CheckKeySizeTask(keys));
         }
     }
 
@@ -4061,27 +3948,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @param cnt Keys count.
      * @return Collection of keys for which given cache is primary.
      */
-    protected List<String> primaryKeysForCache(final IgniteCache<String, Integer> cache, final int cnt, final int startFrom) {
-        return executeOnLocalOrRemoteJvm(cache, new TestCacheCallable<String, Integer, List<String>>() {
-            @Override public List<String> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
-                List<String> found = new ArrayList<>();
-
-                Affinity<Object> affinity = ignite.affinity(cache.getName());
-
-                for (int i = startFrom; i < startFrom + 100_000; i++) {
-                    String key = "key" + i;
-
-                    if (affinity.isPrimary(ignite.cluster().localNode(), key)) {
-                        found.add(key);
-
-                        if (found.size() == cnt)
-                            return found;
-                    }
-                }
-
-                throw new IgniteException("Unable to find " + cnt + " keys as primary for cache.");
-            }
-        });
+    protected List<String> primaryKeysForCache(IgniteCache<String, Integer> cache, int cnt, int startFrom) {
+        return executeOnLocalOrRemoteJvm(cache, new CheckPrimaryKeysTask(startFrom, cnt));
     }
 
     /**
@@ -4272,18 +4140,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * Checks iterators are cleared.
      */
     private void checkIteratorsCleared() {
-        for (int j = 0; j < gridCount(); j++) {
-            executeOnLocalOrRemoteJvm(j, new TestIgniteIdxRunnable() {
-                @Override public void run(int idx) throws Exception {
-                    GridCacheQueryManager queries = context(idx).queries();
-
-                    Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters");
-
-                    for (Object obj : map.values())
-                        assertEquals("Iterators not removed for grid " + idx, 0, ((Map)obj).size());
-                }
-            });
-        }
+        for (int j = 0; j < gridCount(); j++)
+            executeOnLocalOrRemoteJvm(j, new CheckIteratorTask());
     }
 
     /**
@@ -5226,4 +5084,280 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         /** */
         ONE_BY_ONE
     }
+
+    /**
+     * Entry processor that asserts the key is non-null, removes the entry and returns the old value as a string.
+     */
+    private static class RemoveEntryProcessor implements EntryProcessor<String, Integer, String>, Serializable {
+        /** {@inheritDoc} */
+        @Override public String process(MutableEntry<String, Integer> e, Object... args) {
+            assertNotNull(e.getKey());
+
+            Integer old = e.getValue();
+
+            e.remove();
+
+            return String.valueOf(old);
+        }
+    }
+
+    /**
+     * Entry processor that sets a missing value to 1 or increments an existing one; returns the old value as a string.
+     */
+    private static class IncrementEntryProcessor implements EntryProcessor<String, Integer, String>, Serializable {
+        /** {@inheritDoc} */
+        @Override public String process(MutableEntry<String, Integer> e, Object... args) {
+            assertNotNull(e.getKey());
+
+            Integer old = e.getValue();
+
+            e.setValue(old == null ? 1 : old + 1);
+
+            return String.valueOf(old);
+        }
+    }
+
+    /**
+     * Verifies entries for locally mapped keys exist, are not deleted and match {@code localSize(ALL)}; no-op for OFFHEAP_TIERED.
+     */
+    private static class CheckEntriesTask extends TestIgniteIdxRunnable {
+        /** Keys to check. */
+        private final Collection<String> keys;
+
+        /**
+         * @param keys Keys to check.
+         */
+        public CheckEntriesTask(Collection<String> keys) {
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(int idx) throws Exception {
+            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+            if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
+                return;
+
+            int size = 0;
+
+            for (String key : keys) {
+                if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+                    GridCacheEntryEx e =
+                        ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+
+                    assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
+                    assert !e.deleted() : "Entry is deleted: " + e;
+
+                    size++;
+                }
+            }
+
+            assertEquals("Incorrect size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL));
+        }
+    }
+
+    /**
+     * Compares the number of map keys that affinity maps to the local node with {@code localSize(ALL)} of the cache.
+     */
+    private static class CheckCacheSizeTask extends TestIgniteIdxRunnable {
+        private final Map<String, Integer> map;
+
+        /**
+         * @param map Map whose keys are checked (values are not read by this task).
+         */
+        public CheckCacheSizeTask(Map<String, Integer> map) {
+            this.map = map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(int idx) throws Exception {
+            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+            int size = 0;
+
+            for (String key : map.keySet())
+                if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
+                    size++;
+
+            assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL));
+        }
+    }
+
+    /**
+     * Collects {@code cnt} keys of the form {@code "key" + i} for which the local node is primary, or throws if not found.
+     */
+    private static class CheckPrimaryKeysTask implements TestCacheCallable<String, Integer, List<String>> {
+        /** Index the key scan starts from. */
+        private final int startFrom;
+
+        /** Number of primary keys to find. */
+        private final int cnt;
+
+        /**
+         * @param startFrom Index the key scan starts from (100_000 consecutive indexes are tried).
+         * @param cnt Number of primary keys to find.
+         */
+        public CheckPrimaryKeysTask(int startFrom, int cnt) {
+            this.startFrom = startFrom;
+            this.cnt = cnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<String> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
+            List<String> found = new ArrayList<>();
+
+            Affinity<Object> affinity = ignite.affinity(cache.getName());
+
+            for (int i = startFrom; i < startFrom + 100_000; i++) {
+                String key = "key" + i;
+
+                if (affinity.isPrimary(ignite.cluster().localNode(), key)) {
+                    found.add(key);
+
+                    if (found.size() == cnt)
+                        return found;
+                }
+            }
+
+            throw new IgniteException("Unable to find " + cnt + " keys as primary for cache.");
+        }
+    }
+
+    /**
+     * Asserts that every map stored in the {@code qryIters} field of the cache query manager is empty.
+     */
+    private static class CheckIteratorTask extends TestIgniteIdxCallable<Void> {
+        /**
+         * @param idx Grid index; used only in the assertion message.
+         */
+        @Override public Void call(int idx) throws Exception {
+            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+            GridCacheQueryManager queries = ctx.queries();
+
+            Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters");
+
+            for (Object obj : map.values())
+                assertEquals("Iterators not removed for grid " + idx, 0, ((Map)obj).size());
+
+            return null;
+        }
+    }
+
+    /**
+     * Entry processor that removes the entry and always returns {@code null}.
+     */
+    private static class RemoveAndReturnNullEntryProcessor implements
+        EntryProcessor<String, Integer, Integer>, Serializable {
+
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+            e.remove();
+
+            return null;
+        }
+    }
+
+    /**
+     * Event listener that logs every event and counts swap and unswap events in the supplied counters.
+     */
+    private static class SwapEvtsLocalListener implements IgnitePredicate<Event> {
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Counter incremented on each {@code EVT_CACHE_OBJECT_SWAPPED} event. */
+        private final AtomicInteger swapEvts;
+
+        /** Counter incremented on each {@code EVT_CACHE_OBJECT_UNSWAPPED} event. */
+        private final AtomicInteger unswapEvts;
+
+        /**
+         * @param swapEvts Counter for swap events.
+         * @param unswapEvts Counter for unswap events.
+         */
+        public SwapEvtsLocalListener(AtomicInteger swapEvts, AtomicInteger unswapEvts) {
+            this.swapEvts = swapEvts;
+            this.unswapEvts = unswapEvts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Event evt) {
+            log.info("Received event: " + evt);
+
+            switch (evt.type()) {
+                case EVT_CACHE_OBJECT_SWAPPED:
+                    swapEvts.incrementAndGet();
+
+                    break;
+                case EVT_CACHE_OBJECT_UNSWAPPED:
+                    unswapEvts.incrementAndGet();
+
+                    break;
+            }
+
+            return true;
+        }
+    }
+
+    private static class CheckEntriesDeletedTask extends TestIgniteIdxRunnable { // Expects deleted entries on owners, none elsewhere.
+        private final int cnt; // Number of keys to check; keys are String.valueOf(i) for i in [0, cnt).
+
+        public CheckEntriesDeletedTask(int cnt) {
+            this.cnt = cnt;
+        }
+
+        @Override public void run(int idx) throws Exception {
+            for (int i = 0; i < cnt; i++) {
+                String key = String.valueOf(i);
+
+                GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+                GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+
+                if (ignite.affinity(null).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) {
+                    assertNotNull(entry);
+                    assertTrue(entry.deleted());
+                }
+                else
+                    assertNull(entry);
+            }
+        }
+    }
+
+    /**
+     * Compares the number of given keys that affinity maps to the local node with {@code localSize(ALL)} of cache {@code null}.
+     */
+    private static class CheckKeySizeTask extends TestIgniteIdxRunnable {
+        /** Keys to count. */
+        private final Collection<String> keys;
+
+        /**
+         * @param keys Keys to count.
+         */
+        public CheckKeySizeTask(Collection<String> keys) {
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(int idx) throws Exception {
+            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context();
+
+            int size = 0;
+
+            for (String key : keys)
+                if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx()))
+                    size++;
+
+            assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(null).localSize(ALL));
+        }
+    }
+
+    /**
+     * Entry processor that always throws {@link EntryProcessorException}.
+     */
+    private static class FailedEntryProcessor implements EntryProcessor<String, Integer, Integer>, Serializable {
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
+            throw new EntryProcessorException("Test entry processor exception.");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
index da694b5..c6ce81e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java
@@ -111,6 +111,7 @@ public class CacheNoValueClassOnServerNodeTest extends GridCommonAbstractTest {
                         }
                     },
                     null,
+                    null,
                     jvmArgs,
                     cp
                 );

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
index 1511c45..927ee62 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.TouchedExpiryPolicy;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -37,6 +39,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -106,9 +109,11 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
 
         affinityNodes(); // Just to ack cache configuration to log..
 
-        checkKeySize(map.keySet());
+        Set<String> keys = new LinkedHashSet<>(map.keySet());
 
-        checkSize(map.keySet());
+        checkKeySize(keys);
+
+        checkSize(keys);
 
         int fullCacheSize = 0;
 
@@ -317,24 +322,8 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
         Collection<String> locKeys = new HashSet<>();
 
         for (int i = 0; i < gridCount(); i++) {
-            grid(i).events().localListen(new IgnitePredicate<Event>() {
-                @Override public boolean apply(Event evt) {
-                    info("Received event: " + evt);
-
-                    switch (evt.type()) {
-                        case EVT_CACHE_OBJECT_SWAPPED:
-                            swapEvts.incrementAndGet();
-
-                            break;
-                        case EVT_CACHE_OBJECT_UNSWAPPED:
-                            unswapEvts.incrementAndGet();
-
-                            break;
-                    }
-
-                    return true;
-                }
-            }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
+            grid(i).events().localListen(
+                new LocalListener(swapEvts, unswapEvts), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
         }
 
         cache.localEvict(Collections.singleton(k2));
@@ -416,4 +405,46 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache
         assertEquals(cnt, swapEvts.get());
         assertEquals(cnt, unswapEvts.get());
     }
+
+    /**
+     * Event listener that logs every event and counts swap and unswap events in the supplied counters.
+     */
+    private static class LocalListener implements IgnitePredicate<Event> {
+        /** Logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Counter incremented on each {@code EVT_CACHE_OBJECT_SWAPPED} event. */
+        private final AtomicInteger swapEvts;
+
+        /** Counter incremented on each {@code EVT_CACHE_OBJECT_UNSWAPPED} event. */
+        private final AtomicInteger unswapEvts;
+
+        /**
+         * @param swapEvts Counter for swap events.
+         * @param unswapEvts Counter for unswap events.
+         */
+        public LocalListener(AtomicInteger swapEvts, AtomicInteger unswapEvts) {
+            this.swapEvts = swapEvts;
+            this.unswapEvts = unswapEvts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Event evt) {
+            log.info("Received event: " + evt);
+
+            switch (evt.type()) {
+                case EVT_CACHE_OBJECT_SWAPPED:
+                    swapEvts.incrementAndGet();
+
+                    break;
+                case EVT_CACHE_OBJECT_UNSWAPPED:
+                    unswapEvts.incrementAndGet();
+
+                    break;
+            }
+
+            return true;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
index c04bf2e..a2440e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -36,6 +39,8 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -93,7 +98,7 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
 
         atomicClockModeDelay(c0);
 
-        c1.removeAll(putMap.keySet());
+        c1.removeAll(new HashSet<>(putMap.keySet()));
 
         for (int i = 0; i < size; i++) {
             assertNull(c0.get(i));
@@ -159,22 +164,8 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
             info("Finished putting value [i=" + i + ']');
         }
 
-        for (int i = 0; i < gridCount(); i++) {
-            executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
-                @Override public void run(int idx) throws Exception {
-                    assertEquals(0, context(idx).tm().idMapSize());
-
-                    IgniteCache<Object, Object> cache = grid(idx).cache(null);
-                    ClusterNode node = grid(idx).localNode();
-
-                    for (int k = 0; k < size; k++) {
-                        if (affinity(cache).isPrimaryOrBackup(node, k))
-                            assertEquals("Check failed for node: " + node.id(), k,
-                                cache.localPeek(k, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP));
-                    }
-                }
-            });
-        }
+        for (int i = 0; i < gridCount(); i++)
+            executeOnLocalOrRemoteJvm(i, new CheckAffinityTask(size));
 
         for (int i = 0; i < size; i++) {
             info("Putting value 2 [i=" + i + ']');
@@ -199,28 +190,9 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
 
         final IgniteAtomicLong unswapEvts = grid(0).atomicLong("unswapEvts", 0, true);
 
-        for (int i = 0; i < gridCount(); i++) {
-            final int iCopy = i;
-
-            grid(i).events().localListen(new IgnitePredicate<Event>() {
-                @Override public boolean apply(Event evt) {
-                    info("Received event: " + evt);
-
-                    switch (evt.type()) {
-                        case EVT_CACHE_OBJECT_SWAPPED:
-                            grid(iCopy).atomicLong("swapEvts", 0, false).incrementAndGet();
-
-                            break;
-                        case EVT_CACHE_OBJECT_UNSWAPPED:
-                            grid(iCopy).atomicLong("unswapEvts", 0, false).incrementAndGet();
-
-                            break;
-                    }
-
-                    return true;
-                }
-            }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
-        }
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).events().localListen(
+                    new SwapUnswapLocalListener(), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED);
 
         jcache().put("key", 1);
 
@@ -254,13 +226,8 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
 
             boolean nearEnabled = nearEnabled(c);
 
-            if (nearEnabled) {
-                executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() {
-                    @Override public void run(int idx) throws Exception {
-                        assertTrue(((IgniteKernal)ignite(idx)).internalCache().context().isNear());
-                    }
-                });
-            }
+            if (nearEnabled)
+                executeOnLocalOrRemoteJvm(i, new IsNearTask());
 
             Integer nearPeekVal = nearEnabled ? 1 : null;
 
@@ -476,4 +443,74 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
             assertFalse(affinity(cache).isPrimaryOrBackup(other, key));
         }
     }
+
+    /**
+     * Event listener that logs every event and increments the {@code swapEvts} / {@code unswapEvts} atomic longs.
+     */
+    private static class SwapUnswapLocalListener implements IgnitePredicate<Event> {
+        /** Logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Ignite instance used to look up the atomic long counters. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Event evt) {
+            log.info("Received event: " + evt);
+
+            switch (evt.type()) {
+                case EVT_CACHE_OBJECT_SWAPPED:
+                    ignite.atomicLong("swapEvts", 0, false).incrementAndGet();
+
+                    break;
+                case EVT_CACHE_OBJECT_UNSWAPPED:
+                    ignite.atomicLong("unswapEvts", 0, false).incrementAndGet();
+
+                    break;
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     * Asserts {@code tm().idMapSize()} is 0 and that keys in [0, size) owned by the local node peek locally to themselves.
+     */
+    private static class CheckAffinityTask extends TestIgniteIdxRunnable {
+        /** Number of integer keys to check, starting from 0. */
+        private final int size;
+
+        /**
+         * @param size Number of integer keys to check, starting from 0.
+         */
+        public CheckAffinityTask(int size) {
+            this.size = size;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(int idx) throws Exception {
+            assertEquals(0, ((IgniteKernal)ignite).<String, Integer>internalCache().context().tm().idMapSize());
+
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+            ClusterNode node = ((IgniteKernal)ignite).localNode();
+
+            for (int k = 0; k < size; k++) {
+                if (affinity(cache).isPrimaryOrBackup(node, k))
+                    assertEquals("Check failed for node: " + node.id(), k,
+                        cache.localPeek(k, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP));
+            }
+        }
+    }
+
+    /**
+     * Asserts that the internal cache context of the {@code ignite} instance reports {@code isNear()}.
+     */
+    private static class IsNearTask extends TestIgniteIdxRunnable {
+        /** {@inheritDoc} */
+        @Override public void run(int idx) throws Exception {
+            assertTrue(((IgniteKernal)ignite).internalCache().context().isNear());
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index f54fe06..d133a84 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerExclusions;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -1471,6 +1472,8 @@ public abstract class GridAbstractTest extends TestCase {
 
         if (!isMultiJvmObject(ignite))
             try {
+                job.setIgnite(ignite);
+
                 return job.call(idx);
             }
             catch (Exception e) {
@@ -1532,11 +1535,7 @@ public abstract class GridAbstractTest extends TestCase {
 
         IgniteProcessProxy proxy = (IgniteProcessProxy)ignite;
 
-        return proxy.remoteCompute().call(new IgniteCallable<R>() {
-            @Override public R call() throws Exception {
-                return job.call(idx);
-            }
-        });
+        return proxy.remoteCompute().call(new ExecuteRemotelyTask<>(job, idx));
     }
 
     /**
@@ -1546,15 +1545,7 @@ public abstract class GridAbstractTest extends TestCase {
      * @param job Job.
      */
     public static <R> R executeRemotely(IgniteProcessProxy proxy, final TestIgniteCallable<R> job) {
-        final UUID id = proxy.getId();
-
-        return proxy.remoteCompute().call(new IgniteCallable<R>() {
-            @Override public R call() throws Exception {
-                Ignite ignite = Ignition.ignite(id);
-
-                return job.call(ignite);
-            }
-        });
+        return proxy.remoteCompute().call(new TestRemoteTask<>(proxy.getId(), job));
     }
 
     /**
@@ -1571,6 +1562,8 @@ public abstract class GridAbstractTest extends TestCase {
         final String cacheName = cache.getName();
 
         return proxy.remoteCompute().call(new IgniteCallable<R>() {
+            private static final long serialVersionUID = -3868429485920845137L;
+
             @Override public R call() throws Exception {
                 Ignite ignite = Ignition.ignite(id);
                 IgniteCache<K,V> cache = ignite.cache(cacheName);
@@ -1745,6 +1738,22 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * @param name Grid name.
+     * @param remote If {@code false}, the instance is looked up with {@code G.ignite(name)}.
+     * @param thisRemote If {@code remote}: {@code true} returns the node runner's started instance, else a process proxy.
+     */
+    public static IgniteEx grid(String name, boolean remote, boolean thisRemote) {
+        if (!remote)
+            return (IgniteEx)G.ignite(name);
+        else {
+            if (thisRemote)
+                return IgniteNodeRunner.startedInstance();
+            else
+                return IgniteProcessProxy.ignite(name);
+        }
+    }
+
+    /**
      *
      */
     private static interface WriteReplaceOwner {
@@ -1781,6 +1790,67 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * Remote computation task that resolves the Ignite instance by node ID and passes it to the wrapped job.
+     */
+    private static class TestRemoteTask<R> implements IgniteCallable<R> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** ID passed to {@code Ignition.ignite(id)} to resolve the instance. */
+        private final UUID id;
+
+        /** Job to call with the resolved instance. */
+        private final TestIgniteCallable<R> job;
+
+        /**
+         * @param id ID used to resolve the Ignite instance.
+         * @param job Job to call with the resolved instance.
+         */
+        public TestRemoteTask(UUID id, TestIgniteCallable<R> job) {
+            this.id = id;
+            this.job = job;
+        }
+
+        /** {@inheritDoc} */
+        @Override public R call() throws Exception {
+            Ignite ignite = Ignition.ignite(id);
+
+            return job.call(ignite);
+        }
+    }
+
+    /**
+     * Remote computation task that hands its {@code ignite} field to the wrapped job and calls it with the grid index.
+     */
+    private static class ExecuteRemotelyTask<R> implements IgniteCallable<R> {
+        /** Ignite instance handed to the job before it is called. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /** Job to call. */
+        private final TestIgniteIdxCallable<R> job;
+
+        /** Grid index passed to the job. */
+        private final int idx;
+
+        /**
+         * @param job Job to call.
+         * @param idx Grid index passed to the job.
+         */
+        public ExecuteRemotelyTask(TestIgniteIdxCallable<R> job, int idx) {
+            this.job = job;
+            this.idx = idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public R call() throws Exception {
+            job.setIgnite(ignite);
+
+            return job.call(idx);
+        }
+    }
+
+    /**
      * Test counters.
      */
     protected class TestCounters {
@@ -1923,17 +1993,27 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /** */
-    public static interface TestIgniteIdxCallable<R> extends Serializable {
+    public static abstract class TestIgniteIdxCallable<R> implements Serializable {
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /**
+         * @param ignite Ignite.
+         */
+        public void setIgnite(Ignite ignite) {
+            this.ignite = ignite;
+        }
+
         /**
          * @param idx Grid index.
          */
-        R call(int idx) throws Exception;
+        protected abstract R call(int idx) throws Exception;
     }
 
     /** */
-    public abstract static class TestIgniteIdxRunnable implements TestIgniteIdxCallable<Object> {
+    public abstract static class TestIgniteIdxRunnable extends TestIgniteIdxCallable<Void> {
         /** {@inheritDoc} */
-        @Override public Object call(int idx) throws Exception {
+        @Override public Void call(int idx) throws Exception {
             run(idx);
 
             return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
index eb72252..406318f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
@@ -42,6 +42,9 @@ import org.jetbrains.annotations.Nullable;
  * Test resources for injection.
  */
 public class IgniteTestResources {
+    /** Marshaller class name. */
+    public static final String MARSH_CLASS_NAME = "test.marshaller.class";
+
     /** */
     private static final IgniteLogger rootLog = new GridTestLog4jLogger(false);
 
@@ -230,8 +233,9 @@ public class IgniteTestResources {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    public synchronized Marshaller getMarshaller() throws IgniteCheckedException {
-        String marshallerName = GridTestProperties.getProperty(GridTestProperties.MARSH_CLASS_NAME);
+    public static synchronized Marshaller getMarshaller() throws IgniteCheckedException {
+        String marshallerName =
+            System.getProperty(MARSH_CLASS_NAME, GridTestProperties.getProperty(GridTestProperties.MARSH_CLASS_NAME));
 
         Marshaller marsh;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 4bcf51e..e4c2129 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -246,10 +246,12 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     protected static <K, V> boolean nearEnabled(final IgniteCache<K,V> cache) {
         CacheConfiguration cfg = GridAbstractTest.executeOnLocalOrRemoteJvm(cache,
             new TestCacheCallable<K, V, CacheConfiguration>() {
-            @Override public CacheConfiguration call(Ignite ignite, IgniteCache<K, V> cache) throws Exception {
-                return ((IgniteKernal)ignite).<K, V>internalCache(cache.getName()).context().config();
-            }
-        });
+                private static final long serialVersionUID = 0L;
+
+                @Override public CacheConfiguration call(Ignite ignite, IgniteCache<K, V> cache) throws Exception {
+                    return ((IgniteKernal)ignite).<K, V>internalCache(cache.getName()).context().config();
+                }
+            });
 
         return isNearEnabled(cfg);
     }
@@ -285,10 +287,13 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      * @throws Exception If failed.
      */
     @SuppressWarnings("unchecked")
-    protected static <K> void loadAll(Cache<K, ?> cache, final Set<K> keys, final boolean replaceExistingValues) throws Exception {
+    protected static <K> void loadAll(Cache<K, ?> cache, final Set<K> keys, final boolean replaceExistingValues)
+        throws Exception {
         IgniteCache<K, Object> cacheCp = (IgniteCache<K, Object>)cache;
 
         GridAbstractTest.executeOnLocalOrRemoteJvm(cacheCp, new TestCacheRunnable<K, Object>() {
+            private static final long serialVersionUID = -3030833765012500545L;
+
             @Override public void run(Ignite ignite, IgniteCache<K, Object> cache) throws Exception {
                 final AtomicReference<Exception> ex = new AtomicReference<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
index e1959e5..57fbcfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java
@@ -19,12 +19,12 @@ package org.apache.ignite.testframework.junits.multijvm;
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.UUID;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -38,160 +38,388 @@ public class AffinityProcessProxy<K> implements Affinity<K> {
     /** Cache name. */
     private final String cacheName;
 
-    /** Grid id. */
-    private final UUID gridId;
-
     /**
      * @param cacheName Cache name.
-     * @param proxy Ignite ptocess proxy.
+     * @param proxy Ignite process proxy.
      */
     public AffinityProcessProxy(String cacheName, IgniteProcessProxy proxy) {
         this.cacheName = cacheName;
-        gridId = proxy.getId();
-        compute = proxy.remoteCompute();
-    }
-
-    /**
-     * Returns cache instance. Method to be called from closure at another JVM.
-     *
-     * @return Cache.
-     */
-    private Affinity<Object> affinity() {
-        return Ignition.ignite(gridId).affinity(cacheName);
+        this.compute = proxy.remoteCompute();
     }
 
     /** {@inheritDoc} */
     @Override public int partitions() {
-        return (int)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().partitions();
-            }
-        });
+        return compute.call(new PartitionsTask(cacheName));
     }
 
     /** {@inheritDoc} */
-    @Override public int partition(final K key) {
-        return (int)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().partition(key);
-            }
-        });
+    @Override public int partition(K key) {
+        return compute.call(new PartitionTask<>(cacheName, key));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isPrimary(final ClusterNode n, final K key) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().isPrimary(n, key);
-            }
-        });
+    @Override public boolean isPrimary(ClusterNode n, K key) {
+        return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, true, false));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isBackup(final ClusterNode n, final K key) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().isBackup(n, key);
-            }
-        });
+    @Override public boolean isBackup(ClusterNode n, K key) {
+        return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, false, true));
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isPrimaryOrBackup(final ClusterNode n, final K key) {
-        return (boolean)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().isPrimaryOrBackup(n, key);
-            }
-        });
+    @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
+        return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, true, true));
     }
 
     /** {@inheritDoc} */
-    @Override public int[] primaryPartitions(final ClusterNode n) {
-        return (int[])compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().primaryPartitions(n);
-            }
-        });
+    @Override public int[] primaryPartitions(ClusterNode n) {
+        return compute.call(new GetPartitionsTask(cacheName, n, true, false));
     }
 
     /** {@inheritDoc} */
-    @Override public int[] backupPartitions(final ClusterNode n) {
-        return (int[])compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().backupPartitions(n);
-            }
-        });
+    @Override public int[] backupPartitions(ClusterNode n) {
+        return compute.call(new GetPartitionsTask(cacheName, n, false, true));
     }
 
     /** {@inheritDoc} */
-    @Override public int[] allPartitions(final ClusterNode n) {
-        return (int[])compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().allPartitions(n);
-            }
-        });
+    @Override public int[] allPartitions(ClusterNode n) {
+        return compute.call(new GetPartitionsTask(cacheName, n, true, true));
     }
 
     /** {@inheritDoc} */
-    @Override public Object affinityKey(final K key) {
-        return compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().affinityKey(key);
-            }
-        });
+    @Override public Object affinityKey(K key) {
+        return compute.call(new AffinityKeyTask<>(cacheName, key));
     }
 
     /** {@inheritDoc} */
-    @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(final Collection<? extends K> keys) {
-        return (Map<ClusterNode, Collection<K>>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapKeysToNodes(keys);
-            }
-        });
+    @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(Collection<? extends K> keys) {
+        return compute.call(new MapKeysToNodesTask<>(cacheName, keys));
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public ClusterNode mapKeyToNode(final K key) {
-        return (ClusterNode)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapKeyToNode(key);
-            }
-        });
+    @Nullable @Override public ClusterNode mapKeyToNode(K key) {
+        return compute.call(new MapKeyToNodeTask<>(cacheName, key));
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(final K key) {
-        return (Collection<ClusterNode>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapKeyToPrimaryAndBackups(key);
-            }
-        });
+    @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
+        return compute.call(new MapKeyToPrimaryAndBackupsTask<>(cacheName, key));
     }
 
     /** {@inheritDoc} */
-    @Override public ClusterNode mapPartitionToNode(final int part) {
-        return (ClusterNode)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapPartitionToNode(part);
-            }
-        });
+    @Override public ClusterNode mapPartitionToNode(int part) {
+        return compute.call(new MapPartitionToNode<>(cacheName, part));
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(final Collection<Integer> parts) {
-        return (Map<Integer, ClusterNode>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapPartitionsToNodes(parts);
-            }
-        });
+    @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
+        return compute.call(new MapPartitionsToNodes<>(cacheName, parts));
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(final int part) {
-        return (Collection<ClusterNode>)compute.call(new IgniteCallable<Object>() {
-            @Override public Object call() throws Exception {
-                return affinity().mapPartitionToPrimaryAndBackups(part);
-            }
-        });
+    @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
+        return compute.call(new MapPartitionsToPrimaryAndBackupsTask<>(cacheName, part));
+    }
+
+    /**
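+     * Task that checks whether a node is primary, backup, or either of them for a key, as selected by the flags.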
+     */
+    private static class PrimaryOrBackupNodeTask<K> extends AffinityTaskAdapter<K, Boolean> {
+        /** Key. */
+        private final K key;
+
+        /** Node. */
+        private final ClusterNode n;
+
+        /** Primary. */
+        private final boolean primary;
+
+        /** Backup. */
+        private final boolean backup;
+
+        /**
+         * @param cacheName Cache name.
+         * @param key Key.
+         * @param n Node.
+         */
+        public PrimaryOrBackupNodeTask(String cacheName, K key, ClusterNode n,
+            boolean primary, boolean backup) {
+            super(cacheName);
+            this.key = key;
+            this.n = n;
+            this.primary = primary;
+            this.backup = backup;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            if (primary && backup)
+                return affinity().isPrimaryOrBackup(n, key);
+            else if (primary)
+                return affinity().isPrimary(n, key);
+            else if (backup)
+                return affinity().isBackup(n, key);
+            else
+                throw new IllegalStateException("primary or backup or both flags should be switched on");
+        }
+    }
+
+    /**
+     * Task that maps a key to its primary and backup nodes.
+     */
+    private static class MapKeyToPrimaryAndBackupsTask<K> extends AffinityTaskAdapter<K, Collection<ClusterNode>> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param key Key.
+         */
+        public MapKeyToPrimaryAndBackupsTask(String cacheName, K key) {
+            super(cacheName);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> call() throws Exception {
+            return affinity().mapKeyToPrimaryAndBackups(key);
+        }
+    }
+
+    /**
+     * Task that returns the result of {@code partitions()} of the cache affinity.
+     */
+    private static class PartitionsTask extends AffinityTaskAdapter<Void, Integer> {
+        /**
+         * @param cacheName Cache name.
+         */
+        public PartitionsTask(String cacheName) {
+            super(cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            return affinity().partitions();
+        }
+    }
+
+    /**
+     * Task that returns the affinity partition for a key.
+     */
+    private static class PartitionTask<K> extends AffinityTaskAdapter<K, Integer> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param key Key.
+         */
+        public PartitionTask(String cacheName, K key) {
+            super(cacheName);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            return affinity().partition(key);
+        }
+    }
+
+    /**
+     * Task that returns primary, backup, or all partitions of a node, as selected by the flags.
+     */
+    private static class GetPartitionsTask extends AffinityTaskAdapter<Void, int[]> {
+        /** Node. */
+        private final ClusterNode n;
+
+        /** Primary. */
+        private final boolean primary;
+
+        /** Backup. */
+        private final boolean backup;
+
+        /**
+         * @param cacheName Cache name.
+         * @param n Node.
+         * @param primary Primary.
+         * @param backup Backup.
+         */
+        public GetPartitionsTask(String cacheName, ClusterNode n, boolean primary, boolean backup) {
+            super(cacheName);
+            this.n = n;
+            this.primary = primary;
+            this.backup = backup;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int[] call() throws Exception {
+            if (primary && backup)
+                return affinity().allPartitions(n);
+            else if (primary)
+                return affinity().primaryPartitions(n);
+            else if (backup)
+                return affinity().backupPartitions(n);
+            else
+                throw new IllegalStateException("primary or backup or both flags should be switched on");
+        }
+    }
+
+    /**
+     * Task that returns the affinity key for a key.
+     */
+    private static class AffinityKeyTask<K> extends AffinityTaskAdapter<K, Object> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param key Key.
+         */
+        public AffinityKeyTask(String cacheName, K key) {
+            super(cacheName);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return affinity().affinityKey(key);
+        }
+    }
+
+    /**
+     * @param <K> Key type.
+     */
+    private static class MapKeysToNodesTask<K> extends AffinityTaskAdapter<K, Map<ClusterNode, Collection<K>>> {
+        /** Keys. */
+        private final Collection<? extends K> keys;
+
+        /**
+         * @param cacheName Cache name.
+         * @param keys Keys.
+         */
+        public MapKeysToNodesTask(String cacheName, Collection<? extends K> keys) {
+            super(cacheName);
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<ClusterNode, Collection<K>> call() throws Exception {
+            return affinity().mapKeysToNodes(keys);
+        }
+    }
+
+    /**
+     * Task that maps a key to a node.
+     */
+    private static class MapKeyToNodeTask<K> extends AffinityTaskAdapter<K, ClusterNode> {
+        /** Key. */
+        private final K key;
+
+        /**
+         * @param cacheName Cache name.
+         * @param key Key.
+         */
+        public MapKeyToNodeTask(String cacheName, K key) {
+            super(cacheName);
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode call() throws Exception {
+            return affinity().mapKeyToNode(key);
+        }
+    }
+
+    /**
+     * Task that maps a partition to a node.
+     */
+    private static class MapPartitionToNode<K> extends AffinityTaskAdapter<K, ClusterNode> {
+        /** Partition. */
+        private final int part;
+
+        /**
+         * @param cacheName Cache name.
+         * @param part Partition.
+         */
+        public MapPartitionToNode(String cacheName, int part) {
+            super(cacheName);
+            this.part = part;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode call() throws Exception {
+            return affinity().mapPartitionToNode(part);
+        }
+    }
+
+    /**
+     * Task that maps partitions to nodes.
+     */
+    private static class MapPartitionsToNodes<K> extends AffinityTaskAdapter<K, Map<Integer, ClusterNode>> {
+        /** Parts. */
+        private final Collection<Integer> parts;
+
+        /**
+         * @param cacheName Cache name.
+         * @param parts Parts.
+         */
+        public MapPartitionsToNodes(String cacheName, Collection<Integer> parts) {
+            super(cacheName);
+            this.parts = parts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<Integer, ClusterNode> call() throws Exception {
+            return affinity().mapPartitionsToNodes(parts);
+        }
+    }
+
+    /**
+     * Task that maps a partition to its primary and backup nodes.
+     */
+    private static class MapPartitionsToPrimaryAndBackupsTask<K> extends AffinityTaskAdapter<K, Collection<ClusterNode>> {
+        /** Partition. */
+        private final int part;
+
+        /**
+         * @param cacheName Cache name.
+         * @param part Partition.
+         */
+        public MapPartitionsToPrimaryAndBackupsTask(String cacheName, int part) {
+            super(cacheName);
+            this.part = part;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> call() throws Exception {
+            return affinity().mapPartitionToPrimaryAndBackups(part);
+        }
+    }
+
+    /**
+     * Base callable that obtains the cache affinity from the injected Ignite instance.
+     */
+    private abstract static class AffinityTaskAdapter<K, R> implements IgniteCallable<R> {
+        /** Ignite. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /** Cache name. */
+        protected final String cacheName;
+
+        /**
+         * @param cacheName Cache name.
+         */
+        public AffinityTaskAdapter(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /**
+         * @return Affinity.
+         */
+        protected Affinity<K> affinity() {
+            return ignite.affinity(cacheName);
+        }
     }
 }
\ No newline at end of file


Mime
View raw message