Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BB7EC18E90 for ; Tue, 13 Oct 2015 02:25:47 +0000 (UTC) Received: (qmail 78966 invoked by uid 500); 13 Oct 2015 02:25:47 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 78887 invoked by uid 500); 13 Oct 2015 02:25:47 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 78523 invoked by uid 99); 13 Oct 2015 02:25:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Oct 2015 02:25:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B5BEDE0AA1; Tue, 13 Oct 2015 02:25:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: anovikov@apache.org To: commits@ignite.apache.org Date: Tue, 13 Oct 2015 02:25:56 -0000 Message-Id: <1788aba4a4fb42e6904ab60a8482a31d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/19] ignite git commit: ignite-1526: full support of IBM JDK by Ignite ignite-1526: full support of IBM JDK by Ignite Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fc682f1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fc682f1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fc682f1 Branch: refs/heads/ignite-843 Commit: 5fc682f11f43f61d14d6b70be5ccf949a9ae05ac Parents: 4f95be2 Author: Andrey Gura Authored: Fri Oct 9 13:54:56 2015 +0300 Committer: Denis Magda Committed: Fri Oct 9 14:22:34 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 17 +- .../internal/portable/PortableContext.java | 7 + .../portable/api/PortableMarshaller.java | 14 +- .../ignite/internal/util/GridJavaProcess.java | 12 +- .../ignite/internal/util/lang/GridFunc.java | 12 + .../apache/ignite/marshaller/Marshaller.java | 2 +- .../optimized/OptimizedMarshallerUtils.java | 6 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 486 ++++--- .../CacheNoValueClassOnServerNodeTest.java | 1 + ...tomicClientOnlyMultiNodeFullApiSelfTest.java | 71 +- ...achePartitionedMultiNodeFullApiSelfTest.java | 129 +- .../testframework/junits/GridAbstractTest.java | 116 +- .../junits/IgniteTestResources.java | 8 +- .../junits/common/GridCommonAbstractTest.java | 15 +- .../junits/multijvm/AffinityProcessProxy.java | 440 ++++-- .../multijvm/IgniteCacheProcessProxy.java | 1348 ++++++++++++++---- .../multijvm/IgniteClusterProcessProxy.java | 115 +- .../multijvm/IgniteEventsProcessProxy.java | 50 +- .../junits/multijvm/IgniteNodeRunner.java | 39 +- .../junits/multijvm/IgniteProcessProxy.java | 107 +- .../cache/CacheConfigurationP2PTest.java | 3 + 21 files changed, 2186 insertions(+), 812 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 1801b9c..cba06de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -987,19 +987,24 @@ public class GridDiscoveryManager extends GridManagerAdapter { boolean locP2pEnabled = locNode.attribute(ATTR_PEER_CLASSLOADING); - boolean warned = false; + boolean ipV4Warned = false; + + boolean jvmMajVerWarned = false; for (ClusterNode n : nodes) { int rmtJvmMajVer = nodeJavaMajorVersion(n); - if (locJvmMajVer != rmtJvmMajVer) - throw new IgniteCheckedException("Local node's java major version is different from remote node's one" + - " [locJvmMajVer=" + locJvmMajVer + ", rmtJvmMajVer=" + rmtJvmMajVer + "]"); + if (locJvmMajVer != rmtJvmMajVer && !jvmMajVerWarned) { + U.warn(log, "Local java version is different from remote [loc=" + + locJvmMajVer + ", rmt=" + rmtJvmMajVer + "]"); + + jvmMajVerWarned = true; + } String rmtPreferIpV4 = n.attribute("java.net.preferIPv4Stack"); if (!F.eq(rmtPreferIpV4, locPreferIpV4)) { - if (!warned) + if (!ipV4Warned) U.warn(log, "Local node's value of 'java.net.preferIPv4Stack' " + "system property differs from remote node's " + "(all nodes in topology should have identical value) " + @@ -1008,7 +1013,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { ", rmtAddrs=" + U.addressesAsString(n) + ']', "Local and remote 'java.net.preferIPv4Stack' system properties do not match."); - warned = true; + ipV4Warned = true; } // Daemon nodes are allowed to have any deployment they need. http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java index 2ee96b7..1ad42ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java @@ -967,6 +967,9 @@ public class PortableContext implements Externalizable { } } + /** + * Basic class ID mapper. + */ private static class BasicClassIdMapper implements PortableIdMapper { /** {@inheritDoc} */ @Override public int typeId(String clsName) { @@ -1121,6 +1124,10 @@ public class PortableContext implements Externalizable { /** Whether the following type is registered in a cache or not */ private final boolean registered; + /** + * @param id Id. + * @param registered Registered. + */ public Type(int id, boolean registered) { this.id = id; this.registered = registered; http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java index de0df8d..3dfbdf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/api/PortableMarshaller.java @@ -29,12 +29,6 @@ import org.apache.ignite.internal.portable.GridPortableMarshaller; import org.apache.ignite.internal.portable.PortableContext; import org.apache.ignite.marshaller.AbstractMarshaller; import org.apache.ignite.marshaller.MarshallerContext; -import org.apache.ignite.internal.portable.api.PortableException; -import org.apache.ignite.internal.portable.api.PortableIdMapper; -import org.apache.ignite.internal.portable.api.PortableObject; -import org.apache.ignite.internal.portable.api.PortableProtocolVersion; -import org.apache.ignite.internal.portable.api.PortableSerializer; -import org.apache.ignite.internal.portable.api.PortableTypeConfiguration; import org.jetbrains.annotations.Nullable; /** @@ -336,7 +330,7 @@ public class PortableMarshaller extends AbstractMarshaller { /** {@inheritDoc} */ @Override public T unmarshal(InputStream in, @Nullable ClassLoader clsLdr) throws IgniteCheckedException { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + ByteArrayOutputStream buf = new ByteArrayOutputStream(); byte[] arr = new byte[4096]; int cnt; @@ -345,11 +339,11 @@ public class PortableMarshaller extends AbstractMarshaller { // returns number of bytes remaining. try { while ((cnt = in.read(arr)) != -1) - buffer.write(arr, 0, cnt); + buf.write(arr, 0, cnt); - buffer.flush(); + buf.flush(); - return impl.deserialize(buffer.toByteArray(), clsLdr); + return impl.deserialize(buf.toByteArray(), clsLdr); } catch (IOException e) { throw new PortableException("Failed to unmarshal the object from InputStream", e); http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java index 92c20fe..3371eb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java @@ -89,7 +89,7 @@ public final class GridJavaProcess { */ public static GridJavaProcess exec(Class cls, String params, @Nullable IgniteLogger log, @Nullable IgniteInClosure printC, @Nullable GridAbsClosure procKilledC) throws Exception { - return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, null); + return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, null, null); } /** @@ -108,7 +108,7 @@ public final class GridJavaProcess { public static GridJavaProcess exec(Class cls, String params, @Nullable IgniteLogger log, @Nullable IgniteInClosure printC, @Nullable GridAbsClosure procKilledC, @Nullable Collection jvmArgs, @Nullable String cp) throws Exception { - return exec(cls.getCanonicalName(), params, log, printC, procKilledC, jvmArgs, cp); + return exec(cls.getCanonicalName(), params, log, printC, procKilledC, null, jvmArgs, cp); } /** @@ -116,9 +116,10 @@ public final class GridJavaProcess { * * @param clsName Class with main() method to be run. * @param params main() method parameters. + * @param log Log to use. * @param printC Optional closure to be called each time wrapped process prints line to system.out or system.err. * @param procKilledC Optional closure to be called when process termination is detected. - * @param log Log to use. + * @param javaHome Java home location. The process will be started under given JVM. * @param jvmArgs JVM arguments to use. * @param cp Additional classpath. * @return Wrapper around {@link Process} @@ -126,7 +127,7 @@ public final class GridJavaProcess { */ public static GridJavaProcess exec(String clsName, String params, @Nullable IgniteLogger log, @Nullable IgniteInClosure printC, @Nullable GridAbsClosure procKilledC, - @Nullable Collection jvmArgs, @Nullable String cp) throws Exception { + @Nullable String javaHome, @Nullable Collection jvmArgs, @Nullable String cp) throws Exception { if (!(U.isLinux() || U.isMacOs() || U.isWindows())) throw new Exception("Your OS is not supported."); @@ -140,7 +141,8 @@ public final class GridJavaProcess { List procCommands = new ArrayList<>(); - String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java"; + String javaBin = (javaHome == null ? System.getProperty("java.home") : javaHome) + + File.separator + "bin" + File.separator + "java"; procCommands.add(javaBin); procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs); http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index ffeeca0..43bc5f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -103,6 +103,9 @@ public class GridFunc { /** */ private static final IgniteClosure IDENTITY = new C1() { + /** */ + private static final long serialVersionUID = -6338573080046225172L; + @Override public Object apply(Object o) { return o; } @@ -1196,6 +1199,9 @@ public class GridFunc { A.notNull(nodeId, "nodeId"); return new P1() { + /** */ + private static final long serialVersionUID = -7082730222779476623L; + @Override public boolean apply(ClusterNode e) { return e.id().equals(nodeId); } @@ -1705,6 +1711,9 @@ public class GridFunc { assert c != null; return new GridSerializableList() { + /** */ + private static final long serialVersionUID = 3126625219739967068L; + @Override public T2 get(int idx) { return trans.apply(c.get(idx)); } @@ -1766,6 +1775,9 @@ public class GridFunc { assert m != null; return isEmpty(p) || isAlwaysTrue(p) ? m : new GridSerializableMap() { + /** */ + private static final long serialVersionUID = 5531745605372387948L; + /** Entry predicate. */ private IgnitePredicate> ep = new P1>() { @Override public boolean apply(Entry e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java index 3e815fd..a76daa8 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/Marshaller.java @@ -92,7 +92,7 @@ public interface Marshaller { public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException; /** - * Unmarshals object from the output stream using given class loader. + * Unmarshals object from the input stream using given class loader. * This method should not close given input stream. * * @param Type of unmarshalled object. http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java index 4abbd04..584083c 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java @@ -42,6 +42,10 @@ class OptimizedMarshallerUtils { /** */ private static final Unsafe UNSAFE = GridUnsafe.unsafe(); + /** Use default {@code serialVersionUid} for {@link Serializable} classes. */ + private static final boolean USE_DFLT_SUID = + Boolean.valueOf(System.getProperty("ignite.marsh.optimized.useDefaultSUID", Boolean.TRUE.toString())); + /** */ static final long HASH_SET_MAP_OFF; @@ -283,7 +287,7 @@ class OptimizedMarshallerUtils { */ @SuppressWarnings("ForLoopReplaceableByForEach") static short computeSerialVersionUid(Class cls, List fields) throws IOException { - if (Serializable.class.isAssignableFrom(cls) && !Enum.class.isAssignableFrom(cls)) + if (USE_DFLT_SUID && Serializable.class.isAssignableFrom(cls) && !Enum.class.isAssignableFrom(cls)) return (short)ObjectStreamClass.lookup(cls).getSerialVersionUID(); MessageDigest md; http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 2a64963..ec3ea0c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -49,6 +50,7 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMemoryMode; @@ -75,6 +77,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi; import org.apache.ignite.testframework.GridTestUtils; @@ -117,48 +120,37 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract /** */ public static final CacheEntryProcessor ERR_PROCESSOR = new CacheEntryProcessor() { + /** */ + private static final long serialVersionUID = 0L; + @Override public String process(MutableEntry e, Object... args) { throw new RuntimeException("Failed!"); } }; /** Increment processor for invoke operations. */ - public static final EntryProcessor INCR_PROCESSOR = new EntryProcessor() { - @Override public String process(MutableEntry e, Object... args) { - assertNotNull(e.getKey()); - - Integer old = e.getValue(); - - e.setValue(old == null ? 1 : old + 1); - - return String.valueOf(old); - } - }; + public static final EntryProcessor INCR_PROCESSOR = new IncrementEntryProcessor(); /** Increment processor for invoke operations with IgniteEntryProcessor. */ public static final CacheEntryProcessor INCR_IGNITE_PROCESSOR = new CacheEntryProcessor() { + /** */ + private static final long serialVersionUID = 0L; + @Override public String process(MutableEntry e, Object... args) { return INCR_PROCESSOR.process(e, args); } }; /** Increment processor for invoke operations. */ - public static final EntryProcessor RMV_PROCESSOR = new EntryProcessor() { - @Override public String process(MutableEntry e, Object... args) { - assertNotNull(e.getKey()); - - Integer old = e.getValue(); - - e.remove(); - - return String.valueOf(old); - } - }; + public static final EntryProcessor RMV_PROCESSOR = new RemoveEntryProcessor(); /** Increment processor for invoke operations with IgniteEntryProcessor. */ public static final CacheEntryProcessor RMV_IGNITE_PROCESSOR = new CacheEntryProcessor() { + /** */ + private static final long serialVersionUID = 0L; + @Override public String process(MutableEntry e, Object... args) { return RMV_PROCESSOR.process(e, args); } @@ -346,21 +338,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assert jcache(i).localSize() != 0 || F.isEmpty(keysCol); } - for (int i = 0; i < gridCount(); i++) { - executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() { - @Override public void run(int idx) throws Exception { - GridCacheContext ctx = context(idx); - - int sum = 0; - - for (String key : map.keySet()) - if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion()))) - sum++; - - assertEquals("Incorrect key size on cache #" + idx, sum, jcache(idx).localSize(ALL)); - } - }); - } + for (int i = 0; i < gridCount(); i++) + executeOnLocalOrRemoteJvm(i, new CheckCacheSizeTask(map)); for (int i = 0; i < gridCount(); i++) { Collection keysCol = mapped.get(grid(i).localNode()); @@ -1350,13 +1329,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertEquals((Integer)3, cache.get("k1")); - EntryProcessor c = new EntryProcessor() { - @Override public Integer process(MutableEntry e, Object... args) { - e.remove(); - - return null; - } - }; + EntryProcessor c = new RemoveAndReturnNullEntryProcessor(); assertNull(cache.invoke("k1", c)); assertNull(cache.get("k1")); @@ -1364,11 +1337,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract for (int i = 0; i < gridCount(); i++) assertNull(jcache(i).localPeek("k1", ONHEAP)); - final EntryProcessor errProcessor = new EntryProcessor() { - @Override public Integer process(MutableEntry e, Object... args) { - throw new EntryProcessorException("Test entry processor exception."); - } - }; + final EntryProcessor errProcessor = new FailedEntryProcessor(); GridTestUtils.assertThrows(log, new Callable() { @Override public Void call() throws Exception { @@ -2001,7 +1970,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertFalse(cacheAsync.future().get()); } - cache.localEvict(Arrays.asList("key2")); + cache.localEvict(Collections.singletonList("key2")); // Same checks inside tx. Transaction tx = inTx ? transactions().txStart() : null; @@ -2357,27 +2326,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract for (int i = 0; i < cnt; i++) cache.remove(String.valueOf(i)); - for (int g = 0; g < gridCount(); g++) { - executeOnLocalOrRemoteJvm(g, new TestIgniteIdxRunnable() { - @Override public void run(int idx) throws Exception { - for (int i = 0; i < cnt; i++) { - String key = String.valueOf(i); - - GridCacheContext cctx = context(idx); - - GridCacheEntryEx entry = cctx.isNear() ? cctx.near().dht().peekEx(key) : - cctx.cache().peekEx(key); - - if (grid(idx).affinity(null).mapKeyToPrimaryAndBackups(key).contains(grid(idx).localNode())) { - assertNotNull(entry); - assertTrue(entry.deleted()); - } - else - assertNull(entry); - } - } - }); - } + for (int g = 0; g < gridCount(); g++) + executeOnLocalOrRemoteJvm(g, new CheckEntriesDeletedTask(cnt)); } } @@ -2587,8 +2537,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract c.add(null); GridTestUtils.assertThrows(log, new Callable() { - @Override - public Void call() throws Exception { + @Override public Void call() throws Exception { cache.removeAll(c); return null; @@ -2725,7 +2674,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract public void testRemoveAfterClear() throws Exception { IgniteEx ignite = grid(0); - boolean affNode = ((IgniteKernal)ignite).context().cache().internalCache(null).context().affinityNode(); + boolean affNode = ignite.context().cache().internalCache(null).context().affinityNode(); if (!affNode) { if (gridCount() < 2) @@ -2766,13 +2715,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract } /** - * - */ - private void xxx() { - System.out.printf(""); - } - - /** * @throws Exception In case of error. */ public void testClear() throws Exception { @@ -3597,26 +3539,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract info("Local keys (primary + backup): " + locKeys); } - for (int i = 0; i < gridCount(); i++) { - grid(i).events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - info("Received event: " + evt); - - switch (evt.type()) { - case EVT_CACHE_OBJECT_SWAPPED: - swapEvts.incrementAndGet(); - - break; - case EVT_CACHE_OBJECT_UNSWAPPED: - unswapEvts.incrementAndGet(); - - break; - } - - return true; - } - }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED); - } + for (int i = 0; i < gridCount(); i++) + grid(i).events().localListen( + new SwapEvtsLocalListener(swapEvts, unswapEvts), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED); cache.localEvict(F.asList(k2, k3)); @@ -3934,7 +3859,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract map.put(entry.getKey(), entry.getValue()); } - assert map != null; assert map.size() == 2; assert map.get("key1") == 1; assert map.get("key2") == 2; @@ -3951,32 +3875,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract if (nearEnabled()) assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL)); else { - for (int i = 0; i < gridCount(); i++) { - executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() { - @Override public void run(int idx) throws Exception { - GridCacheContext ctx = context(idx); - - if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED) - return; - - int size = 0; - - for (String key : keys) { - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) { - GridCacheEntryEx e = - ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); - - assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']'; - assert !e.deleted() : "Entry is deleted: " + e; - - size++; - } - } - - assertEquals("Incorrect size on cache #" + idx, size, jcache(idx).localSize(ALL)); - } - }); - } + for (int i = 0; i < gridCount(); i++) + executeOnLocalOrRemoteJvm(i, new CheckEntriesTask(keys)); } } @@ -3989,21 +3889,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertEquals("Invalid key size: " + jcache().localSize(ALL), keys.size(), jcache().localSize(ALL)); else { - for (int i = 0; i < gridCount(); i++) { - executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() { - @Override public void run(int idx) throws Exception { - GridCacheContext ctx = context(idx); - - int size = 0; - - for (String key : keys) - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) - size++; - - assertEquals("Incorrect key size on cache #" + idx, size, jcache(idx).localSize(ALL)); - } - }); - } + for (int i = 0; i < gridCount(); i++) + executeOnLocalOrRemoteJvm(i, new CheckKeySizeTask(keys)); } } @@ -4061,27 +3948,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @param cnt Keys count. * @return Collection of keys for which given cache is primary. */ - protected List primaryKeysForCache(final IgniteCache cache, final int cnt, final int startFrom) { - return executeOnLocalOrRemoteJvm(cache, new TestCacheCallable>() { - @Override public List call(Ignite ignite, IgniteCache cache) throws Exception { - List found = new ArrayList<>(); - - Affinity affinity = ignite.affinity(cache.getName()); - - for (int i = startFrom; i < startFrom + 100_000; i++) { - String key = "key" + i; - - if (affinity.isPrimary(ignite.cluster().localNode(), key)) { - found.add(key); - - if (found.size() == cnt) - return found; - } - } - - throw new IgniteException("Unable to find " + cnt + " keys as primary for cache."); - } - }); + protected List primaryKeysForCache(IgniteCache cache, int cnt, int startFrom) { + return executeOnLocalOrRemoteJvm(cache, new CheckPrimaryKeysTask(startFrom, cnt)); } /** @@ -4272,18 +4140,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * Checks iterators are cleared. */ private void checkIteratorsCleared() { - for (int j = 0; j < gridCount(); j++) { - executeOnLocalOrRemoteJvm(j, new TestIgniteIdxRunnable() { - @Override public void run(int idx) throws Exception { - GridCacheQueryManager queries = context(idx).queries(); - - Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters"); - - for (Object obj : map.values()) - assertEquals("Iterators not removed for grid " + idx, 0, ((Map)obj).size()); - } - }); - } + for (int j = 0; j < gridCount(); j++) + executeOnLocalOrRemoteJvm(j, new CheckIteratorTask()); } /** @@ -5226,4 +5084,280 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract /** */ ONE_BY_ONE } + + /** + * + */ + private static class RemoveEntryProcessor implements EntryProcessor, Serializable { + /** {@inheritDoc} */ + @Override public String process(MutableEntry e, Object... args) { + assertNotNull(e.getKey()); + + Integer old = e.getValue(); + + e.remove(); + + return String.valueOf(old); + } + } + + /** + * + */ + private static class IncrementEntryProcessor implements EntryProcessor, Serializable { + /** {@inheritDoc} */ + @Override public String process(MutableEntry e, Object... args) { + assertNotNull(e.getKey()); + + Integer old = e.getValue(); + + e.setValue(old == null ? 1 : old + 1); + + return String.valueOf(old); + } + } + + /** + * + */ + private static class CheckEntriesTask extends TestIgniteIdxRunnable { + /** Keys. */ + private final Collection keys; + + /** + * @param keys Keys. + */ + public CheckEntriesTask(Collection keys) { + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public void run(int idx) throws Exception { + GridCacheContext ctx = ((IgniteKernal)ignite).internalCache().context(); + + if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED) + return; + + int size = 0; + + for (String key : keys) { + if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) { + GridCacheEntryEx e = + ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); + + assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']'; + assert !e.deleted() : "Entry is deleted: " + e; + + size++; + } + } + + assertEquals("Incorrect size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL)); + } + } + + /** + * + */ + private static class CheckCacheSizeTask extends TestIgniteIdxRunnable { + private final Map map; + + /** + * @param map Map. + */ + public CheckCacheSizeTask(Map map) { + this.map = map; + } + + /** {@inheritDoc} */ + @Override public void run(int idx) throws Exception { + GridCacheContext ctx = ((IgniteKernal)ignite).internalCache().context(); + + int size = 0; + + for (String key : map.keySet()) + if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion()))) + size++; + + assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL)); + } + } + + /** + * + */ + private static class CheckPrimaryKeysTask implements TestCacheCallable> { + /** Start from. */ + private final int startFrom; + + /** Count. */ + private final int cnt; + + /** + * @param startFrom Start from. + * @param cnt Count. + */ + public CheckPrimaryKeysTask(int startFrom, int cnt) { + this.startFrom = startFrom; + this.cnt = cnt; + } + + /** {@inheritDoc} */ + @Override public List call(Ignite ignite, IgniteCache cache) throws Exception { + List found = new ArrayList<>(); + + Affinity affinity = ignite.affinity(cache.getName()); + + for (int i = startFrom; i < startFrom + 100_000; i++) { + String key = "key" + i; + + if (affinity.isPrimary(ignite.cluster().localNode(), key)) { + found.add(key); + + if (found.size() == cnt) + return found; + } + } + + throw new IgniteException("Unable to find " + cnt + " keys as primary for cache."); + } + } + + /** + * + */ + private static class CheckIteratorTask extends TestIgniteIdxCallable { + /** + * @param idx Index. + */ + @Override public Void call(int idx) throws Exception { + GridCacheContext ctx = ((IgniteKernal)ignite).internalCache().context(); + GridCacheQueryManager queries = ctx.queries(); + + Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters"); + + for (Object obj : map.values()) + assertEquals("Iterators not removed for grid " + idx, 0, ((Map)obj).size()); + + return null; + } + } + + /** + * + */ + private static class RemoveAndReturnNullEntryProcessor implements + EntryProcessor, Serializable { + + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry e, Object... args) { + e.remove(); + + return null; + } + } + + /** + * + */ + private static class SwapEvtsLocalListener implements IgnitePredicate { + @LoggerResource + private IgniteLogger log; + + /** Swap events. */ + private final AtomicInteger swapEvts; + + /** Unswap events. */ + private final AtomicInteger unswapEvts; + + /** + * @param swapEvts Swap events. + * @param unswapEvts Unswap events. + */ + public SwapEvtsLocalListener(AtomicInteger swapEvts, AtomicInteger unswapEvts) { + this.swapEvts = swapEvts; + this.unswapEvts = unswapEvts; + } + + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + log.info("Received event: " + evt); + + switch (evt.type()) { + case EVT_CACHE_OBJECT_SWAPPED: + swapEvts.incrementAndGet(); + + break; + case EVT_CACHE_OBJECT_UNSWAPPED: + unswapEvts.incrementAndGet(); + + break; + } + + return true; + } + } + + private static class CheckEntriesDeletedTask extends TestIgniteIdxRunnable { + private final int cnt; + + public CheckEntriesDeletedTask(int cnt) { + this.cnt = cnt; + } + + @Override public void run(int idx) throws Exception { + for (int i = 0; i < cnt; i++) { + String key = String.valueOf(i); + + GridCacheContext ctx = ((IgniteKernal)ignite).internalCache().context(); + + GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); + + if (ignite.affinity(null).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) { + assertNotNull(entry); + assertTrue(entry.deleted()); + } + else + assertNull(entry); + } + } + } + + /** + * + */ + private static class CheckKeySizeTask extends TestIgniteIdxRunnable { + /** Keys. */ + private final Collection keys; + + /** + * @param keys Keys. + */ + public CheckKeySizeTask(Collection keys) { + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public void run(int idx) throws Exception { + GridCacheContext ctx = ((IgniteKernal)ignite).internalCache().context(); + + int size = 0; + + for (String key : keys) + if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) + size++; + + assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(null).localSize(ALL)); + } + } + + /** + * + */ + private static class FailedEntryProcessor implements EntryProcessor, Serializable { + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry e, Object... args) { + throw new EntryProcessorException("Test entry processor exception."); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java index da694b5..c6ce81e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheNoValueClassOnServerNodeTest.java @@ -111,6 +111,7 @@ public class CacheNoValueClassOnServerNodeTest extends GridCommonAbstractTest { } }, null, + null, jvmArgs, cp ); http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java index 1511c45..927ee62 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.cache.expiry.Duration; import javax.cache.expiry.TouchedExpiryPolicy; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.configuration.CacheConfiguration; @@ -37,6 +39,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.LoggerResource; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; @@ -106,9 +109,11 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache affinityNodes(); // Just to ack cache configuration to log.. - checkKeySize(map.keySet()); + Set keys = new LinkedHashSet<>(map.keySet()); - checkSize(map.keySet()); + checkKeySize(keys); + + checkSize(keys); int fullCacheSize = 0; @@ -317,24 +322,8 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache Collection locKeys = new HashSet<>(); for (int i = 0; i < gridCount(); i++) { - grid(i).events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - info("Received event: " + evt); - - switch (evt.type()) { - case EVT_CACHE_OBJECT_SWAPPED: - swapEvts.incrementAndGet(); - - break; - case EVT_CACHE_OBJECT_UNSWAPPED: - unswapEvts.incrementAndGet(); - - break; - } - - return true; - } - }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED); + grid(i).events().localListen( + new LocalListener(swapEvts, unswapEvts), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED); } cache.localEvict(Collections.singleton(k2)); @@ -416,4 +405,46 @@ public class GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest extends GridCache assertEquals(cnt, swapEvts.get()); assertEquals(cnt, unswapEvts.get()); } + + /** + * + */ + private static class LocalListener implements IgnitePredicate { + /** Logger. */ + @LoggerResource + private IgniteLogger log; + + /** Swap events. */ + private final AtomicInteger swapEvts; + + /** Unswap events. */ + private final AtomicInteger unswapEvts; + + /** + * @param swapEvts Swap events. + * @param unswapEvts Unswap events. + */ + public LocalListener(AtomicInteger swapEvts, AtomicInteger unswapEvts) { + this.swapEvts = swapEvts; + this.unswapEvts = unswapEvts; + } + + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + log.info("Received event: " + evt); + + switch (evt.type()) { + case EVT_CACHE_OBJECT_SWAPPED: + swapEvts.incrementAndGet(); + + break; + case EVT_CACHE_OBJECT_UNSWAPPED: + unswapEvts.incrementAndGet(); + + break; + } + + return true; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java index c04bf2e..a2440e2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java @@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; @@ -36,6 +39,8 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -93,7 +98,7 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti atomicClockModeDelay(c0); - c1.removeAll(putMap.keySet()); + c1.removeAll(new HashSet<>(putMap.keySet())); for (int i = 0; i < size; i++) { assertNull(c0.get(i)); @@ -159,22 +164,8 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti info("Finished putting value [i=" + i + ']'); } - for (int i = 0; i < gridCount(); i++) { - executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() { - @Override public void run(int idx) throws Exception { - assertEquals(0, context(idx).tm().idMapSize()); - - IgniteCache cache = grid(idx).cache(null); - ClusterNode node = grid(idx).localNode(); - - for (int k = 0; k < size; k++) { - if (affinity(cache).isPrimaryOrBackup(node, k)) - assertEquals("Check failed for node: " + node.id(), k, - cache.localPeek(k, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP)); - } - } - }); - } + for (int i = 0; i < gridCount(); i++) + executeOnLocalOrRemoteJvm(i, new CheckAffinityTask(size)); for (int i = 0; i < size; i++) { info("Putting value 2 [i=" + i + ']'); @@ -199,28 +190,9 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti final IgniteAtomicLong unswapEvts = grid(0).atomicLong("unswapEvts", 0, true); - for (int i = 0; i < gridCount(); i++) { - final int iCopy = i; - - grid(i).events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - info("Received event: " + evt); - - switch (evt.type()) { - case EVT_CACHE_OBJECT_SWAPPED: - grid(iCopy).atomicLong("swapEvts", 0, false).incrementAndGet(); - - break; - case EVT_CACHE_OBJECT_UNSWAPPED: - grid(iCopy).atomicLong("unswapEvts", 0, false).incrementAndGet(); - - break; - } - - return true; - } - }, EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED); - } + for (int i = 0; i < gridCount(); i++) + grid(i).events().localListen( + new SwapUnswapLocalListener(), EVT_CACHE_OBJECT_SWAPPED, EVT_CACHE_OBJECT_UNSWAPPED); jcache().put("key", 1); @@ -254,13 +226,8 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti boolean nearEnabled = nearEnabled(c); - if (nearEnabled) { - executeOnLocalOrRemoteJvm(i, new TestIgniteIdxRunnable() { - @Override public void run(int idx) throws Exception { - assertTrue(((IgniteKernal)ignite(idx)).internalCache().context().isNear()); - } - }); - } + if (nearEnabled) + executeOnLocalOrRemoteJvm(i, new IsNearTask()); Integer nearPeekVal = nearEnabled ? 1 : null; @@ -476,4 +443,74 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti assertFalse(affinity(cache).isPrimaryOrBackup(other, key)); } } + + /** + * + */ + private static class SwapUnswapLocalListener implements IgnitePredicate { + /** Logger. */ + @LoggerResource + private IgniteLogger log; + + /** Ignite. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + log.info("Received event: " + evt); + + switch (evt.type()) { + case EVT_CACHE_OBJECT_SWAPPED: + ignite.atomicLong("swapEvts", 0, false).incrementAndGet(); + + break; + case EVT_CACHE_OBJECT_UNSWAPPED: + ignite.atomicLong("unswapEvts", 0, false).incrementAndGet(); + + break; + } + + return true; + } + } + + /** + * + */ + private static class CheckAffinityTask extends TestIgniteIdxRunnable { + /** Size. */ + private final int size; + + /** + * @param size Size. + */ + public CheckAffinityTask(int size) { + this.size = size; + } + + /** {@inheritDoc} */ + @Override public void run(int idx) throws Exception { + assertEquals(0, ((IgniteKernal)ignite).internalCache().context().tm().idMapSize()); + + IgniteCache cache = ignite.cache(null); + ClusterNode node = ((IgniteKernal)ignite).localNode(); + + for (int k = 0; k < size; k++) { + if (affinity(cache).isPrimaryOrBackup(node, k)) + assertEquals("Check failed for node: " + node.id(), k, + cache.localPeek(k, CachePeekMode.ONHEAP, CachePeekMode.OFFHEAP)); + } + } + } + + /** + * + */ + private static class IsNearTask extends TestIgniteIdxRunnable { + /** {@inheritDoc} */ + @Override public void run(int idx) throws Exception { + assertTrue(((IgniteKernal)ignite).internalCache().context().isNear()); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index f54fe06..d133a84 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.MarshallerExclusions; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -1471,6 +1472,8 @@ public abstract class GridAbstractTest extends TestCase { if (!isMultiJvmObject(ignite)) try { + job.setIgnite(ignite); + return job.call(idx); } catch (Exception e) { @@ -1532,11 +1535,7 @@ public abstract class GridAbstractTest extends TestCase { IgniteProcessProxy proxy = (IgniteProcessProxy)ignite; - return proxy.remoteCompute().call(new IgniteCallable() { - @Override public R call() throws Exception { - return job.call(idx); - } - }); + return proxy.remoteCompute().call(new ExecuteRemotelyTask<>(job, idx)); } /** @@ -1546,15 +1545,7 @@ public abstract class GridAbstractTest extends TestCase { * @param job Job. */ public static R executeRemotely(IgniteProcessProxy proxy, final TestIgniteCallable job) { - final UUID id = proxy.getId(); - - return proxy.remoteCompute().call(new IgniteCallable() { - @Override public R call() throws Exception { - Ignite ignite = Ignition.ignite(id); - - return job.call(ignite); - } - }); + return proxy.remoteCompute().call(new TestRemoteTask<>(proxy.getId(), job)); } /** @@ -1571,6 +1562,8 @@ public abstract class GridAbstractTest extends TestCase { final String cacheName = cache.getName(); return proxy.remoteCompute().call(new IgniteCallable() { + private static final long serialVersionUID = -3868429485920845137L; + @Override public R call() throws Exception { Ignite ignite = Ignition.ignite(id); IgniteCache cache = ignite.cache(cacheName); @@ -1745,6 +1738,22 @@ public abstract class GridAbstractTest extends TestCase { } /** + * @param name Name. + * @param remote Remote. + * @param thisRemote This remote. + */ + public static IgniteEx grid(String name, boolean remote, boolean thisRemote) { + if (!remote) + return (IgniteEx)G.ignite(name); + else { + if (thisRemote) + return IgniteNodeRunner.startedInstance(); + else + return IgniteProcessProxy.ignite(name); + } + } + + /** * */ private static interface WriteReplaceOwner { @@ -1781,6 +1790,67 @@ public abstract class GridAbstractTest extends TestCase { } /** + * Remote computation task. + */ + private static class TestRemoteTask implements IgniteCallable { + /** */ + private static final long serialVersionUID = 0L; + + /** Node ID. */ + private final UUID id; + + /** Job. */ + private final TestIgniteCallable job; + + /** + * @param id Id. + * @param job Job. + */ + public TestRemoteTask(UUID id, TestIgniteCallable job) { + this.id = id; + this.job = job; + } + + /** {@inheritDoc} */ + @Override public R call() throws Exception { + Ignite ignite = Ignition.ignite(id); + + return job.call(ignite); + } + } + + /** + * + */ + private static class ExecuteRemotelyTask implements IgniteCallable { + /** Ignite. */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Job. */ + private final TestIgniteIdxCallable job; + + /** Index. */ + private final int idx; + + /** + * @param job Job. + * @param idx Index. + */ + public ExecuteRemotelyTask(TestIgniteIdxCallable job, int idx) { + this.job = job; + this.idx = idx; + } + + /** {@inheritDoc} */ + @Override public R call() throws Exception { + job.setIgnite(ignite); + + return job.call(idx); + } + } + + /** * Test counters. */ protected class TestCounters { @@ -1923,17 +1993,27 @@ public abstract class GridAbstractTest extends TestCase { } /** */ - public static interface TestIgniteIdxCallable extends Serializable { + public static abstract class TestIgniteIdxCallable implements Serializable { + @IgniteInstanceResource + protected Ignite ignite; + + /** + * @param ignite Ignite. + */ + public void setIgnite(Ignite ignite) { + this.ignite = ignite; + } + /** * @param idx Grid index. */ - R call(int idx) throws Exception; + protected abstract R call(int idx) throws Exception; } /** */ - public abstract static class TestIgniteIdxRunnable implements TestIgniteIdxCallable { + public abstract static class TestIgniteIdxRunnable extends TestIgniteIdxCallable { /** {@inheritDoc} */ - @Override public Object call(int idx) throws Exception { + @Override public Void call(int idx) throws Exception { run(idx); return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java index eb72252..406318f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java @@ -42,6 +42,9 @@ import org.jetbrains.annotations.Nullable; * Test resources for injection. */ public class IgniteTestResources { + /** Marshaller class name. */ + public static final String MARSH_CLASS_NAME = "test.marshaller.class"; + /** */ private static final IgniteLogger rootLog = new GridTestLog4jLogger(false); @@ -230,8 +233,9 @@ public class IgniteTestResources { * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - public synchronized Marshaller getMarshaller() throws IgniteCheckedException { - String marshallerName = GridTestProperties.getProperty(GridTestProperties.MARSH_CLASS_NAME); + public static synchronized Marshaller getMarshaller() throws IgniteCheckedException { + String marshallerName = + System.getProperty(MARSH_CLASS_NAME, GridTestProperties.getProperty(GridTestProperties.MARSH_CLASS_NAME)); Marshaller marsh; http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 4bcf51e..e4c2129 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -246,10 +246,12 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { protected static boolean nearEnabled(final IgniteCache cache) { CacheConfiguration cfg = GridAbstractTest.executeOnLocalOrRemoteJvm(cache, new TestCacheCallable() { - @Override public CacheConfiguration call(Ignite ignite, IgniteCache cache) throws Exception { - return ((IgniteKernal)ignite).internalCache(cache.getName()).context().config(); - } - }); + private static final long serialVersionUID = 0L; + + @Override public CacheConfiguration call(Ignite ignite, IgniteCache cache) throws Exception { + return ((IgniteKernal)ignite).internalCache(cache.getName()).context().config(); + } + }); return isNearEnabled(cfg); } @@ -285,10 +287,13 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { * @throws Exception If failed. */ @SuppressWarnings("unchecked") - protected static void loadAll(Cache cache, final Set keys, final boolean replaceExistingValues) throws Exception { + protected static void loadAll(Cache cache, final Set keys, final boolean replaceExistingValues) + throws Exception { IgniteCache cacheCp = (IgniteCache)cache; GridAbstractTest.executeOnLocalOrRemoteJvm(cacheCp, new TestCacheRunnable() { + private static final long serialVersionUID = -3030833765012500545L; + @Override public void run(Ignite ignite, IgniteCache cache) throws Exception { final AtomicReference ex = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5fc682f1/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java index e1959e5..57fbcfc 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/AffinityProcessProxy.java @@ -19,12 +19,12 @@ package org.apache.ignite.testframework.junits.multijvm; import java.util.Collection; import java.util.Map; -import java.util.UUID; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCompute; -import org.apache.ignite.Ignition; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; import org.jetbrains.annotations.Nullable; /** @@ -38,160 +38,388 @@ public class AffinityProcessProxy implements Affinity { /** Cache name. */ private final String cacheName; - /** Grid id. */ - private final UUID gridId; - /** * @param cacheName Cache name. - * @param proxy Ignite ptocess proxy. + * @param proxy Ignite process proxy. */ public AffinityProcessProxy(String cacheName, IgniteProcessProxy proxy) { this.cacheName = cacheName; - gridId = proxy.getId(); - compute = proxy.remoteCompute(); - } - - /** - * Returns cache instance. Method to be called from closure at another JVM. - * - * @return Cache. - */ - private Affinity affinity() { - return Ignition.ignite(gridId).affinity(cacheName); + this.compute = proxy.remoteCompute(); } /** {@inheritDoc} */ @Override public int partitions() { - return (int)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().partitions(); - } - }); + return compute.call(new PartitionsTask(cacheName)); } /** {@inheritDoc} */ - @Override public int partition(final K key) { - return (int)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().partition(key); - } - }); + @Override public int partition(K key) { + return compute.call(new PartitionTask<>(cacheName, key)); } /** {@inheritDoc} */ - @Override public boolean isPrimary(final ClusterNode n, final K key) { - return (boolean)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().isPrimary(n, key); - } - }); + @Override public boolean isPrimary(ClusterNode n, K key) { + return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, true, false)); } /** {@inheritDoc} */ - @Override public boolean isBackup(final ClusterNode n, final K key) { - return (boolean)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().isBackup(n, key); - } - }); + @Override public boolean isBackup(ClusterNode n, K key) { + return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, false, true)); } /** {@inheritDoc} */ - @Override public boolean isPrimaryOrBackup(final ClusterNode n, final K key) { - return (boolean)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().isPrimaryOrBackup(n, key); - } - }); + @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) { + return compute.call(new PrimaryOrBackupNodeTask<>(cacheName, key, n, true, true)); } /** {@inheritDoc} */ - @Override public int[] primaryPartitions(final ClusterNode n) { - return (int[])compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().primaryPartitions(n); - } - }); + @Override public int[] primaryPartitions(ClusterNode n) { + return compute.call(new GetPartitionsTask(cacheName, n, true, false)); } /** {@inheritDoc} */ - @Override public int[] backupPartitions(final ClusterNode n) { - return (int[])compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().backupPartitions(n); - } - }); + @Override public int[] backupPartitions(ClusterNode n) { + return compute.call(new GetPartitionsTask(cacheName, n, false, true)); } /** {@inheritDoc} */ - @Override public int[] allPartitions(final ClusterNode n) { - return (int[])compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().allPartitions(n); - } - }); + @Override public int[] allPartitions(ClusterNode n) { + return compute.call(new GetPartitionsTask(cacheName, n, true, true)); } /** {@inheritDoc} */ - @Override public Object affinityKey(final K key) { - return compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().affinityKey(key); - } - }); + @Override public Object affinityKey(K key) { + return compute.call(new AffinityKeyTask<>(cacheName, key)); } /** {@inheritDoc} */ - @Override public Map> mapKeysToNodes(final Collection keys) { - return (Map>)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().mapKeysToNodes(keys); - } - }); + @Override public Map> mapKeysToNodes(Collection keys) { + return compute.call(new MapKeysToNodesTask<>(cacheName, keys)); } /** {@inheritDoc} */ - @Nullable @Override public ClusterNode mapKeyToNode(final K key) { - return (ClusterNode)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().mapKeyToNode(key); - } - }); + @Nullable @Override public ClusterNode mapKeyToNode(K key) { + return compute.call(new MapKeyToNodeTask<>(cacheName, key)); } /** {@inheritDoc} */ - @Override public Collection mapKeyToPrimaryAndBackups(final K key) { - return (Collection)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().mapKeyToPrimaryAndBackups(key); - } - }); + @Override public Collection mapKeyToPrimaryAndBackups(K key) { + return compute.call(new MapKeyToPrimaryAndBackupsTask<>(cacheName, key)); } /** {@inheritDoc} */ - @Override public ClusterNode mapPartitionToNode(final int part) { - return (ClusterNode)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().mapPartitionToNode(part); - } - }); + @Override public ClusterNode mapPartitionToNode(int part) { + return compute.call(new MapPartitionToNode<>(cacheName, part)); } /** {@inheritDoc} */ - @Override public Map mapPartitionsToNodes(final Collection parts) { - return (Map)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().mapPartitionsToNodes(parts); - } - }); + @Override public Map mapPartitionsToNodes(Collection parts) { + return compute.call(new MapPartitionsToNodes<>(cacheName, parts)); } /** {@inheritDoc} */ - @Override public Collection mapPartitionToPrimaryAndBackups(final int part) { - return (Collection)compute.call(new IgniteCallable() { - @Override public Object call() throws Exception { - return affinity().mapPartitionToPrimaryAndBackups(part); - } - }); + @Override public Collection mapPartitionToPrimaryAndBackups(int part) { + return compute.call(new MapPartitionsToPrimaryAndBackupsTask<>(cacheName, part)); + } + + /** + * + */ + private static class PrimaryOrBackupNodeTask extends AffinityTaskAdapter { + /** Key. */ + private final K key; + + /** Node. */ + private final ClusterNode n; + + /** Primary. */ + private final boolean primary; + + /** Backup. */ + private final boolean backup; + + /** + * @param cacheName Cache name. + * @param key Key. + * @param n N. + */ + public PrimaryOrBackupNodeTask(String cacheName, K key, ClusterNode n, + boolean primary, boolean backup) { + super(cacheName); + this.key = key; + this.n = n; + this.primary = primary; + this.backup = backup; + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + if (primary && backup) + return affinity().isPrimaryOrBackup(n, key); + else if (primary) + return affinity().isPrimary(n, key); + else if (backup) + return affinity().isBackup(n, key); + else + throw new IllegalStateException("primary or backup or both flags should be switched on"); + } + } + + /** + * + */ + private static class MapKeyToPrimaryAndBackupsTask extends AffinityTaskAdapter> { + /** Key. */ + private final K key; + + /** + * @param cacheName Cache name. + * @param key Key. + */ + public MapKeyToPrimaryAndBackupsTask(String cacheName, K key) { + super(cacheName); + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Collection call() throws Exception { + return affinity().mapKeyToPrimaryAndBackups(key); + } + } + + /** + * + */ + private static class PartitionsTask extends AffinityTaskAdapter { + /** + * @param cacheName Cache name. + */ + public PartitionsTask(String cacheName) { + super(cacheName); + } + + /** {@inheritDoc} */ + @Override public Integer call() throws Exception { + return affinity().partitions(); + } + } + + /** + * + */ + private static class PartitionTask extends AffinityTaskAdapter { + /** Key. */ + private final K key; + + /** + * @param cacheName Cache name. + * @param key Key. + */ + public PartitionTask(String cacheName, K key) { + super(cacheName); + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Integer call() throws Exception { + return affinity().partition(key); + } + } + + /** + * + */ + private static class GetPartitionsTask extends AffinityTaskAdapter { + /** Node. */ + private final ClusterNode n; + + /** Primary. */ + private final boolean primary; + + /** Backup. */ + private final boolean backup; + + /** + * @param cacheName Cache name. + * @param n N. + * @param primary Primary. + * @param backup Backup. + */ + public GetPartitionsTask(String cacheName, ClusterNode n, boolean primary, boolean backup) { + super(cacheName); + this.n = n; + this.primary = primary; + this.backup = backup; + } + + /** {@inheritDoc} */ + @Override public int[] call() throws Exception { + if (primary && backup) + return affinity().allPartitions(n); + else if (primary) + return affinity().primaryPartitions(n); + else if (backup) + return affinity().backupPartitions(n); + else + throw new IllegalStateException("primary or backup or both flags should be switched on"); + } + } + + /** + * + */ + private static class AffinityKeyTask extends AffinityTaskAdapter { + /** Key. */ + private final K key; + + /** + * @param cacheName Cache name. + * @param key Key. + */ + public AffinityKeyTask(String cacheName, K key) { + super(cacheName); + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return affinity().affinityKey(key); + } + } + + /** + * @param + */ + private static class MapKeysToNodesTask extends AffinityTaskAdapter>> { + /** Keys. */ + private final Collection keys; + + /** + * @param cacheName Cache name. + * @param keys Keys. + */ + public MapKeysToNodesTask(String cacheName, Collection keys) { + super(cacheName); + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public Map> call() throws Exception { + return affinity().mapKeysToNodes(keys); + } + } + + /** + * + */ + private static class MapKeyToNodeTask extends AffinityTaskAdapter { + /** Key. */ + private final K key; + + /** + * @param cacheName Cache name. + * @param key Key. + */ + public MapKeyToNodeTask(String cacheName, K key) { + super(cacheName); + this.key = key; + } + + /** {@inheritDoc} */ + @Override public ClusterNode call() throws Exception { + return affinity().mapKeyToNode(key); + } + } + + /** + * + */ + private static class MapPartitionToNode extends AffinityTaskAdapter { + /** Partition. */ + private final int part; + + /** + * @param cacheName Cache name. + * @param part Partition. + */ + public MapPartitionToNode(String cacheName, int part) { + super(cacheName); + this.part = part; + } + + /** {@inheritDoc} */ + @Override public ClusterNode call() throws Exception { + return affinity().mapPartitionToNode(part); + } + } + + /** + * + */ + private static class MapPartitionsToNodes extends AffinityTaskAdapter> { + /** Parts. */ + private final Collection parts; + + /** + * @param cacheName Cache name. + * @param parts Parts. + */ + public MapPartitionsToNodes(String cacheName, Collection parts) { + super(cacheName); + this.parts = parts; + } + + /** {@inheritDoc} */ + @Override public Map call() throws Exception { + return affinity().mapPartitionsToNodes(parts); + } + } + + /** + * + */ + private static class MapPartitionsToPrimaryAndBackupsTask extends AffinityTaskAdapter> { + /** Partition. */ + private final int part; + + /** + * @param cacheName Cache name. + * @param part Partition. + */ + public MapPartitionsToPrimaryAndBackupsTask(String cacheName, int part) { + super(cacheName); + this.part = part; + } + + /** {@inheritDoc} */ + @Override public Collection call() throws Exception { + return affinity().mapPartitionToPrimaryAndBackups(part); + } + } + + /** + * + */ + private abstract static class AffinityTaskAdapter implements IgniteCallable { + /** Ignite. */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Cache name. */ + protected final String cacheName; + + /** + * @param cacheName Cache name. + */ + public AffinityTaskAdapter(String cacheName) { + this.cacheName = cacheName; + } + + /** + * @return Affinity. + */ + protected Affinity affinity() { + return ignite.affinity(cacheName); + } } } \ No newline at end of file