Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B3B0B18423 for ; Fri, 21 Aug 2015 08:13:21 +0000 (UTC) Received: (qmail 46203 invoked by uid 500); 21 Aug 2015 08:13:21 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 46172 invoked by uid 500); 21 Aug 2015 08:13:21 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 46163 invoked by uid 99); 21 Aug 2015 08:13:21 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 08:13:21 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 29167182206 for ; Fri, 21 Aug 2015 08:13:21 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.775 X-Spam-Level: * X-Spam-Status: No, score=1.775 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id BuunDXHdb6BM for ; Fri, 21 Aug 2015 08:13:18 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 7705136934 for ; Fri, 21 Aug 2015 08:13:09 +0000 (UTC) Received: (qmail 45412 invoked by uid 99); 21 Aug 2015 08:13:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 08:13:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7CAA7E1144; Fri, 21 Aug 2015 08:13:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 21 Aug 2015 08:13:26 -0000 Message-Id: <2afb4fe185714428b85eb6712c22c38b@git.apache.org> In-Reply-To: <3a7fd2ad7c94497f8a0155751bbaffaf@git.apache.org> References: <3a7fd2ad7c94497f8a0155751bbaffaf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [20/34] incubator-ignite git commit: # ignite-1.3.3-p3 added test for cross cache transaction operations # ignite-1.3.3-p3 added test for cross cache transaction operations Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/05fda0cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/05fda0cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/05fda0cc Branch: refs/heads/master Commit: 05fda0cceda69ef569b554facca7c2397dbaaad4 Parents: ef14950 Author: sboikov Authored: Thu Aug 20 13:08:32 2015 +0300 Committer: sboikov Committed: Thu Aug 20 13:08:32 2015 +0300 ---------------------------------------------------------------------- .../cache/CrossCacheTxRandomOperationsTest.java | 490 +++++++++++++++++++ ...gniteCachePutRetryTransactionalSelfTest.java | 15 +- 2 files changed, 500 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05fda0cc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java new file mode 100644 index 0000000..f3159a3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.fair.*; +import org.apache.ignite.cache.affinity.rendezvous.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * + */ +public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE1 = "cache1"; + + /** */ + private static final String CACHE2 = "cache2"; + + /** */ + private static final int GRID_CNT = 5; + + /** */ + private static final int KEY_RANGE = 1000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + if (gridName.equals(getTestGridName(GRID_CNT - 1))) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperations() throws Exception { + txOperations(PARTITIONED, FULL_SYNC, false); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsPrimarySync() throws Exception { + txOperations(PARTITIONED, PRIMARY_SYNC, false); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsFairAffinity() throws Exception { + txOperations(PARTITIONED, FULL_SYNC, true); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsReplicated() throws Exception { + txOperations(REPLICATED, FULL_SYNC, false); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsReplicatedPrimarySync() throws Exception { + txOperations(REPLICATED, PRIMARY_SYNC, false); + } + + /** + * @param name Cache name. + * @param cacheMode Cache mode. + * @param writeSync Write synchronization mode. + * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String name, + CacheMode cacheMode, + CacheWriteSynchronizationMode writeSync, + boolean fairAff) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(writeSync); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(1); + + ccfg.setAffinity(fairAff ? new FairAffinityFunction() : new RendezvousAffinityFunction()); + + return ccfg; + } + + /** + * @param cacheMode Cache mode. + * @param writeSync Write synchronization mode. + * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}. + * @throws Exception If failed. + */ + private void txOperations(CacheMode cacheMode, + CacheWriteSynchronizationMode writeSync, + boolean fairAff) throws Exception { + Ignite ignite = ignite(0); + + try { + ignite.createCache(cacheConfiguration(CACHE1, cacheMode, writeSync, fairAff)); + ignite.createCache(cacheConfiguration(CACHE2, cacheMode, writeSync, fairAff)); + + txOperations(PESSIMISTIC, REPEATABLE_READ, false); + txOperations(PESSIMISTIC, REPEATABLE_READ, true); + + txOperations(OPTIMISTIC, REPEATABLE_READ, false); + txOperations(OPTIMISTIC, REPEATABLE_READ, true); + } + finally { + ignite.destroyCache(CACHE1); + ignite.destroyCache(CACHE2); + } + } + + /** + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param client If {@code true} uses client node. + */ + private void txOperations(TransactionConcurrency concurrency, + TransactionIsolation isolation, + boolean client) { + Map expData1 = new HashMap<>(); + Map expData2 = new HashMap<>(); + + Ignite ignite = client ? ignite(GRID_CNT - 1) : ignite(0); + + assertEquals(client, (boolean)ignite.configuration().isClientMode()); + + IgniteCache cache1 = ignite.cache(CACHE1); + IgniteCache cache2 = ignite.cache(CACHE2); + + assertNotNull(cache1); + assertNotNull(cache2); + assertNotSame(cache1, cache2); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + long seed = System.currentTimeMillis(); + + log.info("Test tx operations [concurrency=" + concurrency + + ", isolation=" + isolation + + ", client=" + client + + ", seed=" + seed + ']'); + + IgniteTransactions txs = ignite.transactions(); + + List keys = new ArrayList<>(); + + for (int i = 0; i < KEY_RANGE; i++) + keys.add(new TestKey(i)); + + for (int i = 0; i < 10_000; i++) { + if (i % 100 == 0) + log.info("Iteration: " + i); + + boolean rollback = i % 10 == 0; + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + cacheOperation(expData1, rnd, cache1, rollback); + cacheOperation(expData2, rnd, cache2, rollback); + + if (rollback) + tx.rollback(); + else + tx.commit(); + } + } + + List> caches1 = new ArrayList<>(); + List> caches2 = new ArrayList<>(); + + for (int i = 0; i < GRID_CNT; i++) { + caches1.add(ignite(i).cache(CACHE1)); + caches2.add(ignite(i).cache(CACHE2)); + } + + checkData(caches1, keys, expData1); + checkData(caches2, keys, expData2); + + cache1.removeAll(); + cache2.removeAll(); + } + + /** + * @param caches Caches. + * @param keys Keys. + * @param expData Expected data. + */ + private void checkData(List> caches, + List keys, Map expData) { + for (IgniteCache cache : caches) { + for (TestKey key : keys) { + TestValue val = cache.get(key); + TestValue expVal = expData.get(key); + + assertEquals(expVal, val); + } + } + } + + /** + * @param expData Expected cache data. + * @param rnd Random. + * @param cache Cache. + * @param willRollback {@code True} if will rollback transaction. + */ + private void cacheOperation( + Map expData, + ThreadLocalRandom rnd, + IgniteCache cache, + boolean willRollback) { + TestKey key = key(rnd); + TestValue val = new TestValue(rnd.nextLong()); + + switch (rnd.nextInt(8)) { + case 0: { + cache.put(key, val); + + if (!willRollback) + expData.put(key, val); + + break; + } + + case 1: { + TestValue oldVal = cache.getAndPut(key, val); + + TestValue expOld = expData.get(key); + + assertEquals(expOld, oldVal); + + if (!willRollback) + expData.put(key, val); + + break; + } + + case 2: { + boolean rmv = cache.remove(key); + + assertEquals(expData.containsKey(key), rmv); + + if (!willRollback) + expData.remove(key); + + break; + } + + case 3: { + TestValue oldVal = cache.getAndRemove(key); + + TestValue expOld = expData.get(key); + + assertEquals(expOld, oldVal); + + if (!willRollback) + expData.remove(key); + + break; + } + + case 4: { + boolean put = cache.putIfAbsent(key, val); + + boolean expPut = !expData.containsKey(key); + + assertEquals(expPut, put); + + if (expPut && !willRollback) + expData.put(key, val); + + break; + } + + case 5: { + TestValue oldVal = cache.invoke(key, new TestEntryProcessor(val.value())); + TestValue expOld = expData.get(key); + + assertEquals(expOld, oldVal); + + if (!willRollback) + expData.put(key, val); + + break; + } + + case 6: { + TestValue oldVal = cache.invoke(key, new TestEntryProcessor(null)); + TestValue expOld = expData.get(key); + + assertEquals(expOld, oldVal); + + break; + } + + case 7: { + TestValue oldVal = cache.get(key); + TestValue expOld = expData.get(key); + + assertEquals(expOld, oldVal); + + break; + } + + default: + assert false; + } + } + + /** + * @param rnd Random. + * @return Key. + */ + private TestKey key(ThreadLocalRandom rnd) { + return new TestKey(rnd.nextInt(KEY_RANGE)); + } + + /** + * + */ + private static class TestKey implements Serializable { + /** */ + private long key; + + /** + * @param key Key. + */ + public TestKey(long key) { + this.key = key; + } + + /** + * @return Key. + */ + public long key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey other = (TestKey)o; + + return key == other.key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(key ^ (key >>> 32)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestKey.class, this); + } + } + + /** + * + */ + private static class TestValue implements Serializable { + /** */ + private long val; + + /** + * @param val Value. + */ + public TestValue(long val) { + this.val = val; + } + + /** + * @return Value. + */ + public long value() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue other = (TestValue)o; + + return val == other.val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * + */ + private static class TestEntryProcessor implements CacheEntryProcessor { + /** */ + private Long val; + + /** + * @param val Value. + */ + public TestEntryProcessor(@Nullable Long val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public TestValue process(MutableEntry e, Object... args) { + TestValue old = e.getValue(); + + if (val != null) + e.setValue(new TestValue(val)); + + return old; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05fda0cc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 9a6bb31..9c4446d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -61,8 +61,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr IgniteAtomicLong atomic = ignite(0).atomicLong("TestAtomic", 0, true); IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { - @Override - public Object call() throws Exception { + @Override public Object call() throws Exception { while (!finished.get()) { stopGrid(3); @@ -157,7 +156,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr * @param ignite Ignite instance. * @param clo Closure. * @return Result of closure execution. - * @throws Exception + * @throws Exception If failed. */ private T doInTransaction(Ignite ignite, Callable clo) throws Exception { while (true) { @@ -213,10 +212,16 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public Void call() throws Exception { - ((IgniteCache)cache).put("key-" + base + "-" + i, "value-" + i); + String key1 = "key-" + base + "-" + i; + String key2 = "key-" + base; + + assert key1.compareTo(key2) > 0; + + ((IgniteCache)cache).put(key1, "value-" + i); - ((IgniteCache>)cache).invoke("key-" + base, new AddEntryProcessor("value-" + i)); + ((IgniteCache>)cache).invoke(key2, new AddEntryProcessor("value-" + i)); return null; }