Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 354B7185C3 for ; Fri, 21 Aug 2015 09:05:05 +0000 (UTC) Received: (qmail 69191 invoked by uid 500); 21 Aug 2015 09:05:05 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 69152 invoked by uid 500); 21 Aug 2015 09:05:05 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 69138 invoked by uid 99); 21 Aug 2015 09:05:05 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 09:05:05 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id A4BE218237C for ; Fri, 21 Aug 2015 09:05:04 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.794 X-Spam-Level: * X-Spam-Status: No, score=1.794 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.006] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id wXEav1hf4XC8 for ; Fri, 21 Aug 2015 09:04:49 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 07C3425CF4 for ; Fri, 21 Aug 2015 09:04:47 +0000 (UTC) Received: (qmail 66864 invoked by uid 99); 21 Aug 2015 09:04:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 09:04:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1288DE1144; Fri, 21 Aug 2015 09:04:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 21 Aug 2015 09:04:49 -0000 Message-Id: In-Reply-To: <779fb856b3ed494ea4064fff6d37aa26@git.apache.org> References: <779fb856b3ed494ea4064fff6d37aa26@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/44] incubator-ignite git commit: IGNITE-1265 - EntryProcessorTest when nodes joining topology. IGNITE-1265 - EntryProcessorTest when nodes joining topology. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ccaa2b20 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ccaa2b20 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ccaa2b20 Branch: refs/heads/ignite-gg-9615-1 Commit: ccaa2b20dab5438603471796b7155f309261a41f Parents: 013d707 Author: Alexey Goncharuk Authored: Tue Aug 18 18:38:36 2015 -0700 Committer: Alexey Goncharuk Committed: Tue Aug 18 18:38:36 2015 -0700 ---------------------------------------------------------------------- .../IgniteCacheEntryProcessorNodeJoinTest.java | 216 +++++++++++++++++++ .../IgniteCacheEntryProcessorRestartTest.java | 185 ---------------- 2 files changed, 216 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ccaa2b20/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java new file mode 100644 index 0000000..9c17ebd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Tests cache in-place modification logic with iterative value increment. + */ +public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of nodes to test on. */ + private static final int GRID_CNT = 2; + + /** Number of increment iterations. */ + private static final int NUM_SETS = 50; + + /** Helper for excluding stopped node from iteration logic. */ + private AtomicReferenceArray grids; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cache = new CacheConfiguration(); + + cache.setCacheMode(PARTITIONED); + cache.setAtomicityMode(TRANSACTIONAL); + cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setBackups(1); + cache.setRebalanceMode(SYNC); + + cfg.setCacheConfiguration(cache); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(GRID_CNT); + + grids = new AtomicReferenceArray<>(GRID_CNT); + + for (int i = 0; i < GRID_CNT; i++) + grids.set(i, grid(i)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + grids = null; + } + + /** + * @throws Exception If failed. + */ + public void testSingleEntryProcessorNodeJoin() throws Exception { + checkEntryProcessorNodeJoin(false); + } + + /** + * @throws Exception If failed. + */ + public void testAllEntryProcessorNodeJoin() throws Exception { + checkEntryProcessorNodeJoin(true); + } + + /** + * @throws Exception If failed. + */ + private void checkEntryProcessorNodeJoin(boolean invokeAll) throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + final AtomicReference error = new AtomicReference<>(); + final int started = 6; + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + for (int i = 0; i < started; i++) { + U.sleep(1_000); + + startGrid(GRID_CNT + i); + } + } + catch (Exception e) { + error.compareAndSet(null, e); + } + } + }, 1, "starter"); + + try { + checkIncrement(invokeAll); + } + finally { + stop.set(true); + + fut.get(getTestTimeout()); + } + + for (int i = 0; i < NUM_SETS; i++) { + for (int g = 0; g < GRID_CNT + started; g++) { + Set vals = ignite(g).>cache(null).get("set-" + i); + + assertNotNull(vals); + assertEquals(100, vals.size()); + } + } + } + + /** + * @throws Exception If failed. + */ + private void checkIncrement(boolean invokeAll) throws Exception { + for (int k = 0; k < 100; k++) { + if (invokeAll) { + IgniteCache> cache = ignite(0).cache(null); + + Map procs = new LinkedHashMap<>(); + + for (int i = 0; i < NUM_SETS; i++) { + String key = "set-" + i; + + String val = "value-" + k; + + cache.invoke(key, new Processor(val)); + } + + cache.invokeAll(procs); + } + else { + for (int i = 0; i < NUM_SETS; i++) { + String key = "set-" + i; + + String val = "value-" + k; + + IgniteCache> cache = ignite(0).cache(null); + + cache.invoke(key, new Processor(val)); + } + } + } + } + + /** */ + private static class Processor implements EntryProcessor, Void>, Serializable { + /** */ + private String val; + + private Processor(String val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry> e, Object... args) { + Set vals = e.getValue(); + + if (vals == null) + vals = new HashSet<>(); + + vals.add(val); + + e.setValue(vals); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ccaa2b20/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java deleted file mode 100644 index c027ee4..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorRestartTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; - -import javax.cache.processor.*; -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheRebalanceMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; - -/** - * Tests cache in-place modification logic with iterative value increment. - */ -public class IgniteCacheEntryProcessorRestartTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of nodes to test on. */ - private static final int GRID_CNT = 2; - - /** Number of increment iterations. */ - private static final int NUM_SETS = 50; - - /** Helper for excluding stopped node from iteration logic. */ - private AtomicReferenceArray grids; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration cache = new CacheConfiguration(); - - cache.setCacheMode(PARTITIONED); - cache.setAtomicityMode(TRANSACTIONAL); - cache.setWriteSynchronizationMode(FULL_SYNC); - cache.setBackups(1); - cache.setRebalanceMode(SYNC); - - cfg.setCacheConfiguration(cache); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(IP_FINDER); - - TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); - - commSpi.setSharedMemoryPort(-1); - - cfg.setCommunicationSpi(commSpi); - - cfg.setDiscoverySpi(disco); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(GRID_CNT); - - grids = new AtomicReferenceArray<>(GRID_CNT); - - for (int i = 0; i < GRID_CNT; i++) - grids.set(i, grid(i)); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - grids = null; - } - - /** - * @throws Exception If failed. - */ - public void testEntryProcessorRestart() throws Exception { - final AtomicBoolean stop = new AtomicBoolean(); - final AtomicReference error = new AtomicReference<>(); - final int started = 6; - - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { - @Override public void run() { - try { - for (int i = 0; i < started; i++) { - U.sleep(1_000); - - startGrid(GRID_CNT + i); - } - } - catch (Exception e) { - error.compareAndSet(null, e); - } - } - }, 1, "starter"); - - try { - checkIncrement(); - } - finally { - stop.set(true); - - fut.get(getTestTimeout()); - } - - for (int i = 0; i < NUM_SETS; i++) { - for (int g = 0; g < GRID_CNT + started; g++) { - Set vals = ignite(g).>cache(null).get("set-" + i); - - assertNotNull(vals); - assertEquals(100, vals.size()); - } - } - } - - /** - * @throws Exception If failed. - */ - private void checkIncrement() throws Exception { - for (int k = 0; k < 100; k++) { - for (int i = 0; i < NUM_SETS; i++) { - String key = "set-" + i; - - String val = "value-" + k; - - IgniteCache> cache = ignite(0).cache(null); - - cache.invoke(key, new Processor(val)); - } - } - } - - /** */ - private static class Processor implements EntryProcessor, Void>, Serializable { - /** */ - private String val; - - private Processor(String val) { - this.val = val; - } - - /** {@inheritDoc} */ - @Override public Void process(MutableEntry> e, Object... args) { - Set vals = e.getValue(); - - if (vals == null) - vals = new HashSet<>(); - - vals.add(val); - - e.setValue(vals); - - return null; - } - } -}