Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 58B21200C8C for ; Mon, 22 May 2017 17:12:35 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 57A61160BA5; Mon, 22 May 2017 15:12:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C2D22160BE1 for ; Mon, 22 May 2017 17:12:32 +0200 (CEST) Received: (qmail 52559 invoked by uid 500); 22 May 2017 15:12:31 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 51304 invoked by uid 99); 22 May 2017 15:12:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 May 2017 15:12:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7B12EE9641; Mon, 22 May 2017 15:12:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 22 May 2017 15:12:59 -0000 Message-Id: <033cbc68a440479a9d651321a0bafdcb@git.apache.org> In-Reply-To: <9fa8b040ac584bb7a9fddc61f3a941b7@git.apache.org> References: <9fa8b040ac584bb7a9fddc61f3a941b7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [31/50] [abbrv] ignite git commit: Ignite-5075 pending archived-at: Mon, 22 May 2017 15:12:35 -0000 Ignite-5075 pending Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4bbf3af4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4bbf3af4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4bbf3af4 Branch: refs/heads/ignite-5075-pds Commit: 4bbf3af412637969c6fff93d1f3890bcb0cf903c Parents: 70a2fe8 Author: Igor Seliverstov Authored: Fri May 19 12:04:04 2017 +0300 Committer: Igor Seliverstov Committed: Fri May 19 12:33:16 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheGroupsTest.java | 159 ++++++++++++++++--- 1 file changed, 141 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4bbf3af4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 403973b..3e3d3be 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -39,6 +39,7 @@ import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryProcessor; @@ -51,6 +52,7 @@ import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory; @@ -59,6 +61,7 @@ import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -1501,12 +1504,38 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { int[] backups = cacheMode == REPLICATED ? new int[]{Integer.MAX_VALUE} : new int[]{0, 1, 2, 3}; for (int backups0 : backups) - cacheApiTest(cacheMode, atomicityMode, backups0, false); + cacheApiTest(cacheMode, atomicityMode, backups0, false, false, false); int backups0 = cacheMode == REPLICATED ? Integer.MAX_VALUE : backups[ThreadLocalRandom.current().nextInt(backups.length)]; - cacheApiTest(cacheMode, atomicityMode, backups0, true); + cacheApiTest(cacheMode, atomicityMode, backups0, true, false, false); + + if (cacheMode == PARTITIONED) { + // Hire the f variable is used as a bit set where 2 last bits + // determine whether a near cache is used on server/client side. + // The case without near cache is already tested at this point. + for (int f : new int[]{1, 2, 3}) { + cacheApiTest(cacheMode, atomicityMode, backups0, false, nearSrv(f), nearClient(f)); + cacheApiTest(cacheMode, atomicityMode, backups0, true, nearSrv(f), nearClient(f)); + } + } + } + + /** + * @param flag Flag. + * @return {@code True} if near cache should be used on a client side. + */ + private boolean nearClient(int flag) { + return (flag & 0b01) == 0b01; + } + + /** + * @param flag Flag. + * @return {@code True} if near cache should be used on a server side. + */ + private boolean nearSrv(int flag) { + return (flag & 0b10) == 0b10; } /** @@ -1514,20 +1543,37 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { * @param atomicityMode Atomicity mode. * @param backups Number of backups. * @param heapCache On heap cache flag. - */ - private void cacheApiTest(final CacheMode cacheMode, - final CacheAtomicityMode atomicityMode, - final int backups, - final boolean heapCache) throws Exception { + * @param nearSrv {@code True} if near cache should be used on a server side. + * @param nearClient {@code True} if near cache should be used on a client side. + */ + private void cacheApiTest(CacheMode cacheMode, + CacheAtomicityMode atomicityMode, + int backups, + boolean heapCache, + boolean nearSrv, + boolean nearClient) throws Exception { Ignite srv0 = ignite(0); - srv0.createCache(cacheConfiguration(GROUP1, "cache-0", cacheMode, atomicityMode, backups, heapCache)); + NearCacheConfiguration nearCfg = nearSrv ? new NearCacheConfiguration() : null; + + srv0.createCache(cacheConfiguration(GROUP1, "cache-0", cacheMode, atomicityMode, backups, heapCache) + .setNearConfiguration(nearCfg)); + srv0.createCache(cacheConfiguration(GROUP1, "cache-1", cacheMode, atomicityMode, backups, heapCache)); - srv0.createCache(cacheConfiguration(GROUP2, "cache-2", cacheMode, atomicityMode, backups, heapCache)); + srv0.createCache(cacheConfiguration(GROUP2, "cache-2", cacheMode, atomicityMode, backups, heapCache) + .setNearConfiguration(nearCfg)); + srv0.createCache(cacheConfiguration(null, "cache-3", cacheMode, atomicityMode, backups, heapCache)); awaitPartitionMapExchange(); + if(nearClient) { + Ignite clientNode = ignite(4); + + clientNode.createNearCache("cache-0", new NearCacheConfiguration()); + clientNode.createNearCache("cache-2", new NearCacheConfiguration()); + } + try { for (final Ignite node : Ignition.allGrids()) { List> ops = new ArrayList<>(); @@ -1535,10 +1581,6 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { for (int i = 0; i < 4; i++) ops.add(testSet(node.cache("cache-" + i), cacheMode, atomicityMode, backups, heapCache, node)); - // sync operations - for (Callable op : ops) - op.call(); - // async operations GridTestUtils.runMultiThreaded(ops, "cacheApiTest"); } @@ -1585,7 +1627,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { /** * @param cache Cache. */ - private void cacheApiTest(IgniteCache cache) { + private void cacheApiTest(IgniteCache cache) throws Exception { cachePutAllGetAll(cache); cachePutRemove(cache); cachePutGet(cache); @@ -1593,15 +1635,77 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { cacheQuery(cache); cacheInvokeAll(cache); cacheInvoke(cache); + cacheDataStreamer(cache); } + /** + * @param cache Cache. + */ private void tearDown(IgniteCache cache) { cache.clear(); cache.removeAll(); } + /** + * @param cache Cache. + */ + private void cacheDataStreamer(final IgniteCache cache) throws Exception { + final int keys = 1000; + final int loaders = 4; + + final Integer[] data = generateData(keys * loaders); + + // stream through a client node + Ignite clientNode = ignite(4); + + List> cls = new ArrayList<>(loaders); + + for (final int i : sequence(loaders)) { + final IgniteDataStreamer ldr = clientNode.dataStreamer(cache.getName()); + ldr.autoFlushFrequency(0); + + cls.add(new Callable() { + @Override + public Void call() throws Exception { + List futs = new ArrayList<>(keys); + + for (int j = 0, size = keys * loaders; j < size; j++) { + if (j % loaders == i) + futs.add(ldr.addData(j, data[j])); + + if(j % (100 * loaders) == 0) + ldr.flush(); + } + + ldr.flush(); + + for (IgniteFuture fut : futs) + fut.get(); + + return null; + } + }); + } + + GridTestUtils.runMultiThreaded(cls, "loaders"); + + Set keysSet = sequence(data.length); + + for (Cache.Entry entry : (IgniteCache)cache) { + assertTrue(keysSet.remove(entry.getKey())); + assertEquals(data[entry.getKey()], entry.getValue()); + } + + assertTrue(keysSet.isEmpty()); + + tearDown(cache); + } + + /** + * @param cache Cache. + */ private void cachePutAllGetAll(IgniteCache cache) { - Map data = generateDataMap(10000); + Map data = generateDataMap(1000); cache.putAll(data); @@ -1616,6 +1720,9 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { tearDown(cache); } + /** + * @param cache Cache. + */ private void cachePutRemove(IgniteCache cache) { Random rnd = ThreadLocalRandom.current(); @@ -1631,6 +1738,9 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { tearDown(cache); } + /** + * @param cache Cache. + */ private void cachePutGet(IgniteCache cache) { Random rnd = ThreadLocalRandom.current(); @@ -1646,6 +1756,9 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { tearDown(cache); } + /** + * @param cache Cache. + */ private void cachePutGetAndPut(IgniteCache cache) { Random rnd = ThreadLocalRandom.current(); @@ -1666,8 +1779,11 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { tearDown(cache); } + /** + * @param cache Cache. + */ private void cacheQuery(IgniteCache cache) { - Map data = generateDataMap(10000); + Map data = generateDataMap(1000); cache.putAll(data); @@ -1689,8 +1805,12 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { tearDown(cache); } + /** + * @param cache Cache. + */ private void cacheInvokeAll(IgniteCache cache) { - Map data = generateDataMap(10000); + int keys = 1000; + Map data = generateDataMap(keys); cache.putAll(data); @@ -1710,12 +1830,15 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { } }, data, one, two); - assertEquals(10000, res.size()); + assertEquals(keys, res.size()); assertEquals(one + two, (Object)res.get(0).get()); tearDown(cache); } + /** + * @param cache Cache. + */ private void cacheInvoke(IgniteCache cache) { Random rnd = ThreadLocalRandom.current();