Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 646A3184BE for ; Thu, 18 Jun 2015 13:15:50 +0000 (UTC) Received: (qmail 67387 invoked by uid 500); 18 Jun 2015 13:15:50 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 67356 invoked by uid 500); 18 Jun 2015 13:15:50 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 67347 invoked by uid 99); 18 Jun 2015 13:15:50 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jun 2015 13:15:50 +0000 X-ASF-Spam-Status: No, hits=-2000.4 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 18 Jun 2015 13:13:32 +0000 Received: (qmail 64505 invoked by uid 99); 18 Jun 2015 13:15:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jun 2015 13:15:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7D292E3CA2; Thu, 18 Jun 2015 13:15:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 18 Jun 2015 13:15:37 -0000 Message-Id: <8ce284a5a3a34304b5892229b18f2a1a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [20/50] incubator-ignite git commit: ignite-484-1 - replicated 
client cache test added X-Virus-Checked: Checked by ClamAV on apache.org ignite-484-1 - replicated client cache test added Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/10febf28 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/10febf28 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/10febf28 Branch: refs/heads/ignite-1003-debug Commit: 10febf28fdf3966ffcb369c5725792b604be6c18 Parents: 1fe215e Author: S.Vladykin Authored: Wed Jun 17 17:11:00 2015 +0300 Committer: S.Vladykin Committed: Wed Jun 17 17:11:00 2015 +0300 ---------------------------------------------------------------------- ...lientQueryReplicatedNodeRestartSelfTest.java | 381 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 1 + 2 files changed, 382 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/10febf28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java new file mode 100644 index 0000000..23f44c0 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.rendezvous.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Test for distributed queries with replicated client cache and node restarts. 
+ */ +public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCommonAbstractTest { + /** SQL over the {@code pe}, {@code pr}, {@code co} and {@code pu} caches: joins Person, Purchase, Product and Company and returns the joined row count per company id, ordered by count descending, then by company id. */ + private static final String QRY = "select co.id, count(*) cnt\n" + + "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" + + "where pe.id = pu.personId and pu.productId = pr.id and pr.companyId = co.id \n" + + "group by co.id order by cnt desc, co.id"; + + /** Node filter set on every cache configuration built below: accepts a node unless its grid name attribute ends with the index of the last grid ({@code GRID_CNT - 1}). */ + private static final P1 DATA_NODES_FILTER = new P1() { + @Override public boolean apply(ClusterNode clusterNode) { + String gridName = clusterNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME); + + return !gridName.endsWith(String.valueOf(GRID_CNT - 1)); // The last one is client only. + } + }; + + /** Number of grids started by the test; the last one is rejected by the data node filter. */ + private static final int GRID_CNT = 5; + + /** Number of Person entries put into the {@code pe} cache. */ + private static final int PERS_CNT = 600; + + /** Number of Purchase entries put into the {@code pu} cache. */ + private static final int PURCHASE_CNT = 6000; + + /** Number of Company entries put into the {@code co} cache. */ + private static final int COMPANY_CNT = 25; + + /** Number of Product entries put into the {@code pr} cache. */ + private static final int PRODUCT_CNT = 100; + + /** IP finder set on the discovery SPI of every configuration built by {@code getConfiguration}. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} Additionally sets a TcpDiscoverySpi using the shared IP finder and configures four REPLICATED, TRANSACTIONAL, FULL_SYNC caches (co, pr, pe, pu), each with the data node filter and its own indexed types. */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + X.println("grid name: " + gridName); + + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + int i = 0; + + CacheConfiguration[] ccs = new CacheConfiguration[4]; + + for (String name : F.asList("co", "pr", "pe", "pu")) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setNodeFilter(DATA_NODES_FILTER); + cc.setName(name); + cc.setCacheMode(REPLICATED); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setRebalanceMode(SYNC); + cc.setAffinity(new RendezvousAffinityFunction(false, 50)); + + switch (name) { + case "co": + cc.setIndexedTypes( + Integer.class, Company.class + ); + + break; + + case "pr": + 
cc.setIndexedTypes( + Integer.class, Product.class + ); + + break; + + case "pe": + cc.setIndexedTypes( + Integer.class, Person.class + ); + + break; + + case "pu": + cc.setIndexedTypes( + AffinityKey.class, Purchase.class + ); + + break; + } + + ccs[i++] = cc; + } + + c.setCacheConfiguration(ccs); + + return c; + } + + /** + * Fills the co, pr, pe and pu caches through grid(0): COMPANY_CNT companies, PRODUCT_CNT products each with a random company id, PERS_CNT persons, and PURCHASE_CNT purchases each with a random person and product id, keyed by an AffinityKey built from the purchase index and the person id. + */ + private void fillCaches() { + IgniteCache co = grid(0).cache("co"); + + for (int i = 0; i < COMPANY_CNT; i++) + co.put(i, new Company(i)); + + IgniteCache pr = grid(0).cache("pr"); + + Random rnd = new GridRandom(); + + for (int i = 0; i < PRODUCT_CNT; i++) + pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT))); + + IgniteCache pe = grid(0).cache("pe"); + + for (int i = 0; i < PERS_CNT; i++) + pe.put(i, new Person(i)); + + IgniteCache, Purchase> pu = grid(0).cache("pu"); + + for (int i = 0; i < PURCHASE_CNT; i++) { + int persId = rnd.nextInt(PERS_CNT); + int prodId = rnd.nextInt(PRODUCT_CNT); + + pu.put(new AffinityKey<>(i, persId), new Purchase(persId, prodId)); + } + } + + /** + * @param c Cache to check; it is cast to IgniteCacheProxy to read the affinityNode() flag of its context. + * @param client If it must be a client cache, i.e. the flag must be false when this is true and true otherwise. + */ + private void assertClient(IgniteCache c, boolean client) { + assertTrue(((IgniteCacheProxy)c).context().affinityNode() == !client); + } + + /** + * Queries the pu cache from the last (client-only) grid in 5 threads for 90 seconds while 2 threads stop and start data nodes; every result must equal the one taken before the restarts, and a CacheException is accepted only with the small page size and a CacheException in the cause chain whose message starts with: Failed to fetch data from node: + * NOTE(review): no cleanup is visible in this class if a step throws before the done flags are set, and the started grids are not stopped here - confirm the base class or suite handles it. + * @throws Exception If failed. + */ + public void testRestarts() throws Exception { + int duration = 90 * 1000; + int qryThreadNum = 5; + int restartThreadsNum = 2; // 2 of 4 data nodes + final int nodeLifeTime = 2 * 1000; + final int logFreq = 10; + + startGridsMultiThreaded(GRID_CNT); + + final AtomicIntegerArray locks = new AtomicIntegerArray(GRID_CNT - 1); // The last is client only. 
+ + fillCaches(); + + final List> pRes = grid(0).cache("pu").query(new SqlFieldsQuery(QRY)).getAll(); + + Thread.sleep(3000); + + assertEquals(pRes, grid(0).cache("pu").query(new SqlFieldsQuery(QRY)).getAll()); + + assertFalse(pRes.isEmpty()); + + final AtomicInteger qryCnt = new AtomicInteger(); + final AtomicBoolean qrysDone = new AtomicBoolean(); + + for (int i = 0; i < GRID_CNT - 1; i++) { + for (String cacheName : F.asList("co", "pr", "pe", "pu")) + assertClient(grid(i).cache(cacheName), false); + } + + for (String cacheName : F.asList("co", "pr", "pe", "pu")) + assertClient(grid(GRID_CNT - 1).cache(cacheName), true); + + final IgniteCache clientCache = grid(GRID_CNT - 1).cache("pu"); + + IgniteInternalFuture fut1 = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + GridRandom rnd = new GridRandom(); + + while (!qrysDone.get()) { + SqlFieldsQuery qry = new SqlFieldsQuery(QRY); + + boolean smallPageSize = rnd.nextBoolean(); + + if (smallPageSize) + qry.setPageSize(3); + + try { + assertEquals(pRes, clientCache.query(qry).getAll()); + } + catch (CacheException e) { + assertTrue("On large page size must retry.", smallPageSize); + + boolean failedOnRemoteFetch = false; + + for (Throwable th = e; th != null; th = th.getCause()) { + if (!(th instanceof CacheException)) + continue; + + if (th.getMessage() != null && + th.getMessage().startsWith("Failed to fetch data from node:")) { + failedOnRemoteFetch = true; + + break; + } + } + + if (!failedOnRemoteFetch) { + e.printStackTrace(); + + fail("Must fail inside of GridResultPage.fetchNextPage or subclass."); + } + } + + int c = qryCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Executed queries: " + c); + } + } + }, qryThreadNum); + + final AtomicInteger restartCnt = new AtomicInteger(); + + final AtomicBoolean restartsDone = new AtomicBoolean(); + + IgniteInternalFuture fut2 = multithreadedAsync(new Callable() { + @SuppressWarnings({"BusyWait"}) + @Override 
public Object call() throws Exception { + GridRandom rnd = new GridRandom(); + + while (!restartsDone.get()) { + int g; + + do { + g = rnd.nextInt(locks.length()); + } + while (!locks.compareAndSet(g, 0, -1)); + + stopGrid(g); + + Thread.sleep(rnd.nextInt(nodeLifeTime)); + + startGrid(g); + + Thread.sleep(rnd.nextInt(nodeLifeTime)); + + locks.set(g, 0); + + int c = restartCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Node restarts: " + c); + } + + return true; + } + }, restartThreadsNum); + + Thread.sleep(duration); + + info("Stopping.."); + + restartsDone.set(true); + + fut2.get(); + + info("Restarts stopped."); + + qrysDone.set(true); + + fut1.get(); + + info("Queries stopped."); + } + + /** + * Person value stored in the pe cache; its id is an indexed SQL field. + */ + private static class Person implements Serializable { + @QuerySqlField(index = true) + int id; + + Person(int id) { + this.id = id; + } + } + + /** + * Purchase value stored in the pu cache; holds a person id and a product id, both indexed SQL fields. + */ + private static class Purchase implements Serializable { + @QuerySqlField(index = true) + int personId; + + @QuerySqlField(index = true) + int productId; + + Purchase(int personId, int productId) { + this.personId = personId; + this.productId = productId; + } + } + + /** + * Company value stored in the co cache; its id is an indexed SQL field. + */ + private static class Company implements Serializable { + @QuerySqlField(index = true) + int id; + + Company(int id) { + this.id = id; + } + } + + /** + * Product value stored in the pr cache; holds its own id and the id of its company, both indexed SQL fields. + */ + private static class Product implements Serializable { + @QuerySqlField(index = true) + int id; + + @QuerySqlField(index = true) + int companyId; + + Product(int id, int companyId) { + this.id = id; + this.companyId = companyId; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/10febf28/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index c5a2f15..dee3078 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -67,6 +67,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheClientQueryReplicatedNodeRestartSelfTest.class); suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class); suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class); suite.addTestSuite(GridCacheQuerySerializationSelfTest.class);