Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DBE6018C3F for ; Tue, 9 Jun 2015 06:48:24 +0000 (UTC) Received: (qmail 37567 invoked by uid 500); 9 Jun 2015 06:48:24 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 37535 invoked by uid 500); 9 Jun 2015 06:48:24 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 37526 invoked by uid 99); 9 Jun 2015 06:48:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jun 2015 06:48:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 09 Jun 2015 06:46:06 +0000 Received: (qmail 34479 invoked by uid 99); 9 Jun 2015 06:47:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jun 2015 06:47:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4E060DFF16; Tue, 9 Jun 2015 06:47:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sergi@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 09 Jun 2015 06:48:33 -0000 Message-Id: <86f77b91f025450fa582538aa8bf0bb7@git.apache.org> In-Reply-To: <2f02124015af44e0a406305c0c682c57@git.apache.org> References: <2f02124015af44e0a406305c0c682c57@git.apache.org> X-Mailer: 
ASF-Git Admin Mailer Subject: [48/50] incubator-ignite git commit: ignite-484-1 - test X-Virus-Checked: Checked by ClamAV on apache.org ignite-484-1 - test Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e197640b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e197640b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e197640b Branch: refs/heads/ignite-484-1 Commit: e197640bbc9dcef4cf872552566fce6d6dbbd5f3 Parents: ae3279a Author: S.Vladykin Authored: Tue Jun 9 09:26:55 2015 +0300 Committer: S.Vladykin Committed: Tue Jun 9 09:26:55 2015 +0300 ---------------------------------------------------------------------- .../IgniteCacheQueryNodeRestartSelfTest2.java | 331 +++++++++++++++++++ 1 file changed, 331 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e197640b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java new file mode 100644 index 0000000..1f0a6e6 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.rendezvous.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Test for distributed queries with node restarts. 
+ */
+public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest {
+    /** Join over all four caches; returns purchase counts per company, most purchases first. */
+    private static final String PARTITIONED_QRY = "select co.id, count(*) cnt\n" +
+        "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
+        "where pe.id = pu.personId and pu.productId = pr.id and pr.companyId = co.id \n" +
+        "group by co.id order by cnt desc, co.id";
+
+    /** Join over the two replicated caches only ("pr" and "co"). */
+    private static final String REPLICATED_QRY = "select pr.id, co.id\n" +
+        "from \"pr\".Product pr, \"co\".Company co\n" +
+        "where pr.companyId = co.id\n" +
+        "order by co.id, pr.id ";
+
+    /** Number of nodes started by the test. */
+    private static final int GRID_CNT = 6;
+
+    /** Number of entries put into the "pe" cache. */
+    private static final int PERS_CNT = 600;
+
+    /** Number of entries put into the "pu" cache. */
+    private static final int PURCHASE_CNT = 6000;
+
+    /** Number of entries put into the "co" cache. */
+    private static final int COMPANY_CNT = 25;
+
+    /** Number of entries put into the "pr" cache. */
+    private static final int PRODUCT_CNT = 100;
+
+    /** IP finder shared by the discovery SPI of every node configured by this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        int i = 0;
+
+        CacheConfiguration[] ccs = new CacheConfiguration[4];
+
+        // Partitioned caches: persons and purchases.
+        for (String name : F.asList("pe", "pu")) {
+            CacheConfiguration cc = defaultCacheConfiguration();
+
+            cc.setName(name);
+            cc.setCacheMode(PARTITIONED);
+            cc.setBackups(2);
+            cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cc.setAtomicityMode(TRANSACTIONAL);
+            cc.setRebalanceMode(CacheRebalanceMode.SYNC);
+            // The two partitioned caches deliberately get different partition counts (50 vs 60).
+            cc.setAffinity(new RendezvousAffinityFunction(false, name.equals("pe") ? 50 : 60));
+
+            if (name.equals("pe")) {
+                cc.setIndexedTypes(
+                    Integer.class, Person.class
+                );
+            }
+            else if (name.equals("pu")) {
+                cc.setIndexedTypes(
+                    AffinityKey.class, Purchase.class
+                );
+            }
+
+            ccs[i++] = cc;
+        }
+
+        // Replicated caches: companies and products.
+        for (String name : F.asList("co", "pr")) {
+            CacheConfiguration cc = defaultCacheConfiguration();
+
+            cc.setName(name);
+            cc.setCacheMode(REPLICATED);
+            cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cc.setAtomicityMode(TRANSACTIONAL);
+            cc.setRebalanceMode(CacheRebalanceMode.SYNC);
+            cc.setAffinity(new RendezvousAffinityFunction(false, 50));
+
+            if (name.equals("co")) {
+                cc.setIndexedTypes(
+                    Integer.class, Company.class
+                );
+            }
+            else if (name.equals("pr")) {
+                cc.setIndexedTypes(
+                    Integer.class, Product.class
+                );
+            }
+
+            ccs[i++] = cc;
+        }
+
+        c.setCacheConfiguration(ccs);
+
+        return c;
+    }
+
+    /**
+     * Populates all four caches through node 0: companies, products that reference a random
+     * company, persons, and purchases that link a random person to a random product.
+     * Each purchase key uses the person ID as its affinity key.
+     */
+    private void fillCaches() {
+        IgniteCache<Integer, Company> co = grid(0).cache("co");
+
+        for (int i = 0; i < COMPANY_CNT; i++)
+            co.put(i, new Company(i));
+
+        IgniteCache<Integer, Product> pr = grid(0).cache("pr");
+
+        Random rnd = new GridRandom();
+
+        for (int i = 0; i < PRODUCT_CNT; i++)
+            pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT)));
+
+        IgniteCache<Integer, Person> pe = grid(0).cache("pe");
+
+        for (int i = 0; i < PERS_CNT; i++)
+            pe.put(i, new Person(i));
+
+        IgniteCache<AffinityKey<Integer>, Purchase> pu = grid(0).cache("pu");
+
+        for (int i = 0; i < PURCHASE_CNT; i++) {
+            int persId = rnd.nextInt(PERS_CNT);
+            int prodId = rnd.nextInt(PRODUCT_CNT);
+
+            pu.put(new AffinityKey<>(i, persId), new Purchase(persId, prodId));
+        }
+    }
+
+    /**
+     * Runs the partitioned query from several threads while another thread keeps stopping and
+     * starting nodes, and compares every result with the one obtained before the restarts began.
+     * <p>
+     * Workers coordinate through a per-node lock array: {@code 0} means the node is free,
+     * {@code 1} means a query thread is using it, {@code -1} means it is being restarted.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRestarts() throws Exception {
+        int duration = 150 * 1000;
+        int qryThreadNum = 4;
+        int restartThreadsNum = 1; // Each worker locks one node at a time: 4 + 1 = 5 of 6 nodes busy at most.
+        final int nodeLifeTime = 2 * 1000;
+        final int logFreq = 50;
+
+        try {
+            startGridsMultiThreaded(GRID_CNT);
+
+            final AtomicIntegerArray locks = new AtomicIntegerArray(GRID_CNT);
+
+            fillCaches();
+
+            final List<List<?>> pRes = grid(0).cache("pu").query(new SqlFieldsQuery(PARTITIONED_QRY)).getAll();
+
+            Thread.sleep(3000);
+
+            // Sanity check: the reference result must be stable before any node is restarted.
+            assertEquals(pRes, grid(0).cache("pu").query(new SqlFieldsQuery(PARTITIONED_QRY)).getAll());
+
+            // Reference result for the replicated query; only used by the currently disabled branch below.
+            final List<List<?>> rRes = grid(0).cache("co").query(new SqlFieldsQuery(REPLICATED_QRY)).getAll();
+
+            final AtomicInteger qryCnt = new AtomicInteger();
+
+            final AtomicBoolean done = new AtomicBoolean();
+
+            final AtomicInteger restartCnt = new AtomicInteger();
+
+            IgniteInternalFuture<?> fut1;
+            IgniteInternalFuture<?> fut2;
+
+            try {
+                fut1 = multithreadedAsync(new CAX() {
+                    @Override public void applyx() throws IgniteCheckedException {
+                        GridRandom rnd = new GridRandom();
+
+                        while (!done.get()) {
+                            int g;
+
+                            // Spin until a free node is claimed for querying.
+                            do {
+                                g = rnd.nextInt(locks.length());
+                            }
+                            while (!locks.compareAndSet(g, 0, 1));
+
+                            try {
+//                                if (rnd.nextBoolean()) { // Partitioned query.
+                                IgniteCache<?, ?> cache = grid(g).cache("pu");
+
+                                assertEquals(pRes, cache.query(new SqlFieldsQuery(PARTITIONED_QRY)).getAll());
+//                                }
+//                                else { // Replicated query.
+//                                    IgniteCache<?, ?> cache = grid(g).cache("co");
+//
+//                                    assertEquals(rRes, cache.query(new SqlFieldsQuery(REPLICATED_QRY)).getAll());
+//                                }
+                            }
+                            finally {
+                                // Release the node even if the query or the assertion failed.
+                                locks.set(g, 0);
+                            }
+
+                            int c = qryCnt.incrementAndGet();
+
+                            if (c % logFreq == 0)
+                                info("Executed queries: " + c);
+                        }
+                    }
+                }, qryThreadNum);
+
+                fut2 = multithreadedAsync(new Callable<Object>() {
+                    @SuppressWarnings({"BusyWait"})
+                    @Override public Object call() throws Exception {
+                        GridRandom rnd = new GridRandom();
+
+                        while (!done.get()) {
+                            int g;
+
+                            // Spin until a free node is claimed for restart.
+                            do {
+                                g = rnd.nextInt(locks.length());
+                            }
+                            while (!locks.compareAndSet(g, 0, -1));
+
+                            // If stop or start fails the slot stays locked, so query threads
+                            // never pick a node that may not be running.
+                            stopGrid(g);
+
+                            Thread.sleep(rnd.nextInt(nodeLifeTime));
+
+                            startGrid(g);
+
+                            Thread.sleep(rnd.nextInt(nodeLifeTime));
+
+                            locks.set(g, 0);
+
+                            int c = restartCnt.incrementAndGet();
+
+                            if (c % logFreq == 0)
+                                info("Node restarts: " + c);
+                        }
+
+                        return true;
+                    }
+                }, restartThreadsNum);
+
+                Thread.sleep(duration);
+
+                info("Stopping..");
+            }
+            finally {
+                // Always let the workers leave their loops, even if this thread failed or was interrupted.
+                done.set(true);
+            }
+
+            fut2.get();
+
+            info("Restarts stopped.");
+
+            fut1.get();
+
+            info("Queries stopped.");
+        }
+        finally {
+            // Do not leak running nodes into subsequent tests.
+            stopAllGrids();
+        }
+    }
+
+    /** Value type of the partitioned "pe" cache. */
+    private static class Person implements Serializable {
+        /** Person ID. */
+        @QuerySqlField(index = true)
+        int id;
+
+        /**
+         * @param id Person ID.
+         */
+        Person(int id) {
+            this.id = id;
+        }
+    }
+
+    /** Value type of the partitioned "pu" cache; links a person to a product. */
+    private static class Purchase implements Serializable {
+        /** ID of the person who made the purchase. */
+        @QuerySqlField(index = true)
+        int personId;
+
+        /** ID of the purchased product. */
+        @QuerySqlField(index = true)
+        int productId;
+
+        /**
+         * @param personId Person ID.
+         * @param productId Product ID.
+         */
+        Purchase(int personId, int productId) {
+            this.personId = personId;
+            this.productId = productId;
+        }
+    }
+
+    /** Value type of the replicated "co" cache. */
+    private static class Company implements Serializable {
+        /** Company ID. */
+        @QuerySqlField(index = true)
+        int id;
+
+        /**
+         * @param id Company ID.
+         */
+        Company(int id) {
+            this.id = id;
+        }
+    }
+
+    /** Value type of the replicated "pr" cache; references its company. */
+    private static class Product implements Serializable {
+        /** Product ID. */
+        @QuerySqlField(index = true)
+        int id;
+
+        /** ID of the company the product belongs to. */
+        @QuerySqlField(index = true)
+        int companyId;
+
+        /**
+         * @param id Product ID.
+         * @param companyId Company ID.
+         */
+        Product(int id, int companyId) {
+            this.id = id;
+            this.companyId = companyId;
+        }
+    }
+}