Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7047218270 for ; Tue, 3 Nov 2015 10:28:16 +0000 (UTC) Received: (qmail 80642 invoked by uid 500); 3 Nov 2015 10:28:16 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 80525 invoked by uid 500); 3 Nov 2015 10:28:16 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 80449 invoked by uid 99); 3 Nov 2015 10:28:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Nov 2015 10:28:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 14225E00DE; Tue, 3 Nov 2015 10:28:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 03 Nov 2015 10:28:19 -0000 Message-Id: In-Reply-To: <2873b30100e44f639aad01aaccbfd958@git.apache.org> References: <2873b30100e44f639aad01aaccbfd958@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/16] ignite git commit: ignite-1397: Load/consistency tests. http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java new file mode 100644 index 0000000..f8a1689 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.yardstickframework.BenchmarkConfiguration; + +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * Invoke retry failover benchmark.

Each client maintains a local map that it updates together with cache. Client + * invokes an increment closure for all generated keys and atomically increments value for corresponding keys in the + * local map. To validate cache contents, all writes from the client are stopped, values in the local map are compared + * to the values in the cache. + */ +public class IgniteTransactionalInvokeRetryBenchmark extends IgniteFailoverAbstractBenchmark { + /** */ + private final ConcurrentMap map = new ConcurrentHashMap<>(); + + /** */ + private final ReadWriteLock rwl = new ReentrantReadWriteLock(true); + + /** */ + private volatile Exception ex; + + /** {@inheritDoc} */ + @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + + Thread thread = new Thread(new Runnable() { + @Override public void run() { + try { + final int timeout = args.cacheOperationTimeoutMillis(); + final int keysCnt = args.keysCount(); + + while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(args.cacheConsistencyCheckingPeriod() * 1000); + + rwl.writeLock().lock(); + + try { + println("Start cache validation."); + + long startTime = U.currentTimeMillis(); + + Map notEqualsCacheVals = new HashMap<>(); + Map notEqualsLocMapVals = new HashMap<>(); + + for (int k = 0; k < args.range(); k++) { + if (k % 10_000 == 0) + println("Start validation for keys like 'key-" + k + "-*'"); + + for (int i = 0; i < keysCnt; i++) { + String key = "key-" + k + "-" + cfg.memberId() + "-" + i; + + asyncCache.get(key); + Long cacheVal = asyncCache.future().get(timeout); + + AtomicLong aVal = map.get(key); + Long mapVal = aVal != null ? aVal.get() : null; + + if (!Objects.equals(cacheVal, mapVal)) { + notEqualsCacheVals.put(key, cacheVal); + notEqualsLocMapVals.put(key, mapVal); + } + } + } + + assert notEqualsCacheVals.size() == notEqualsLocMapVals.size() : "Invalid state " + + "[cacheMapVals=" + notEqualsCacheVals + ", mapVals=" + notEqualsLocMapVals + "]"; + + if (!notEqualsCacheVals.isEmpty()) { + // Print all usefull information and finish. + for (Map.Entry eLocMap : notEqualsLocMapVals.entrySet()) { + String key = eLocMap.getKey(); + Long mapVal = eLocMap.getValue(); + Long cacheVal = notEqualsCacheVals.get(key); + + println(cfg, "Got different values [key='" + key + + "', cacheVal=" + cacheVal + ", localMapVal=" + mapVal + "]"); + } + + println(cfg, "Local driver map contant:\n " + map); + + println(cfg, "Cache content:"); + + for (int k2 = 0; k2 < args.range(); k2++) { + for (int i2 = 0; i2 < keysCnt; i2++) { + String key2 = "key-" + k2 + "-" + cfg.memberId() + "-" + i2; + + asyncCache.get(key2); + Long val = asyncCache.future().get(timeout); + + if (val != null) + println(cfg, "Entry [key=" + key2 + ", val=" + val + "]"); + } + } + + throw new IllegalStateException("Cache and local map are in inconsistent state."); + } + + println("Cache validation successfully finished in " + + (U.currentTimeMillis() - startTime) / 1000 + " sec."); + } + finally { + rwl.writeLock().unlock(); + } + } + } + catch (Throwable e) { + ex = new Exception(e); + + println("Got exception: " + e); + + e.printStackTrace(); + + if (e instanceof Error) + throw (Error)e; + } + } + }, "cache-" + cacheName() + "-validator"); + + thread.setDaemon(true); + + thread.start(); + } + + /** {@inheritDoc} */ + @Override public boolean test(Map ctx) throws Exception { + final int k = nextRandom(args.range()); + + final String[] keys = new String[args.keysCount()]; + + assert keys.length > 0 : "Count of keys: " + keys.length; + + for (int i = 0; i < keys.length; i++) + keys[i] = "key-" + k + "-" + cfg.memberId() + "-" + i; + + for (String key : keys) { + rwl.readLock().lock(); + + try { + if (ex != null) + throw ex; + + asyncCache.invoke(key, new IncrementCacheEntryProcessor()); + asyncCache.future().get(args.cacheOperationTimeoutMillis()); + + AtomicLong prevVal = map.putIfAbsent(key, new AtomicLong(0)); + + if (prevVal != null) + prevVal.incrementAndGet(); + } + finally { + rwl.readLock().unlock(); + } + } + + if (ex != null) + throw ex; + + return true; + } + + /** {@inheritDoc} */ + @Override protected String cacheName() { + return "tx-invoke-retry"; + } + + /** + */ + private static class IncrementCacheEntryProcessor implements CacheEntryProcessor { + /** */ + private static final long serialVersionUID = 0; + + /** {@inheritDoc} */ + @Override public Long process(MutableEntry entry, + Object... arguments) throws EntryProcessorException { + long newVal = entry.getValue() == null ? 0 : entry.getValue() + 1; + + entry.setValue(newVal); + + return newVal; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java new file mode 100644 index 0000000..4cbcf67 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +/** + * Invoke retry failover benchmark. + *

+ * Each client maintains a local map that it updates together with cache. + * Client invokes an increment closure for all generated keys and atomically increments value for corresponding + * keys in the local map. To validate cache contents, all writes from the client are stopped, values in + * the local map are compared to the values in the cache. + */ +public class IgniteTransactionalOffHeapInvokeRetryBenchmark extends IgniteTransactionalInvokeRetryBenchmark { + /** {@inheritDoc} */ + @Override protected String cacheName() { + return "tx-offheap-invoke-retry"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java new file mode 100644 index 0000000..7fa2d1a --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +/** + * Transactional write invoke failover benchmark. + *

+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + 'master', + * 'key-' + K + '-1', 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction + * and randomly chooses between read and write scenarios: + *

    + *
  • Reads value associated with the master key and child keys. Values must be equal.
  • + *
  • Reads value associated with the master key, increments it by 1 and puts the value, then invokes increment + * closure on child keys. No validation is performed.
  • + *
+ */ +public class IgniteTransactionalOffHeapWriteInvokeBenchmark extends IgniteTransactionalWriteInvokeBenchmark { + /** {@inheritDoc} */ + @Override protected String cacheName() { + return "tx-offheap-write-invoke"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java new file mode 100644 index 0000000..bdecca7 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +/** + * Transactional write read failover benchmark. + *

+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-1', + * 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction, reads value associated with + * each key. Values must be equal. Client increments value by 1, commits the transaction. + */ +public class IgniteTransactionalOffHeapWriteReadBenchmark extends IgniteTransactionalWriteReadBenchmark { + /** {@inheritDoc} */ + @Override protected String cacheName() { + return "tx-offheap-write-read"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java new file mode 100644 index 0000000..1a8ee14 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import javax.cache.CacheException; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionRollbackException; + +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * Transactional write invoke failover benchmark. + *

+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + 'master', + * 'key-' + K + '-1', 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction + * and randomly chooses between read and write scenarios: + *

    + *
  • Reads value associated with the master key and child keys. Values must be equal.
  • + *
  • Reads value associated with the master key, increments it by 1 and puts the value, then invokes increment + * closure on child keys. No validation is performed.
  • + *
+ */ +public class IgniteTransactionalWriteInvokeBenchmark extends IgniteFailoverAbstractBenchmark { + /** {@inheritDoc} */ + @Override public boolean test(Map ctx) throws Exception { + final int k = nextRandom(args.range()); + + assert args.keysCount() > 0 : "Count of keys: " + args.keysCount(); + + final String[] keys = new String[args.keysCount()]; + + final String masterKey = "key-" + k + "-master"; + + for (int i = 0; i < keys.length; i++) + keys[i] = "key-" + k + "-" + i; + + final int scenario = nextRandom(2); + + return doInTransaction(ignite(), new Callable() { + @Override public Boolean call() throws Exception { + final int timeout = args.cacheOperationTimeoutMillis(); + + switch (scenario) { + case 0: // Read scenario. + Map map = new HashMap<>(); + + asyncCache.get(masterKey); + Long cacheVal = asyncCache.future().get(timeout); + + map.put(masterKey, cacheVal); + + for (String key : keys) { + asyncCache.get(key); + cacheVal = asyncCache.future().get(timeout); + + map.put(key, cacheVal); + } + + Set values = new HashSet<>(map.values()); + + if (values.size() != 1) { + // Print all usefull information and finish. + println(cfg, "Got different values for keys [map=" + map + "]"); + + println(cfg, "Cache content:"); + + for (int k = 0; k < args.range(); k++) { + for (int i = 0; i < args.keysCount(); i++) { + String key = "key-" + k + "-" + i; + + asyncCache.get(key); + Long val = asyncCache.future().get(timeout); + + if (val != null) + println(cfg, "Entry [key=" + key + ", val=" + val + "]"); + } + } + + throw new IllegalStateException("Found different values for keys (see above information)."); + } + + break; + case 1: // Invoke scenario. + asyncCache.get(masterKey); + Long val = asyncCache.future().get(timeout); + + asyncCache.put(masterKey, val == null ? 0 : val + 1); + asyncCache.future().get(timeout); + + for (String key : keys) { + asyncCache.invoke(key, new IncrementCacheEntryProcessor()); + asyncCache.future().get(timeout); + } + + break; + } + + return true; + } + }); + } + + /** + * @param ignite Ignite instance. + * @param clo Closure. + * @return Result of closure execution. + * @throws Exception + */ + public static T doInTransaction(Ignite ignite, Callable clo) throws Exception { + while (true) { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + T res = clo.call(); + + tx.commit(); + + return res; + } + catch (CacheException e) { + if (e.getCause() instanceof ClusterTopologyException) { + ClusterTopologyException topEx = (ClusterTopologyException)e.getCause(); + + topEx.retryReadyFuture().get(); + } + else + throw e; + } + catch (ClusterTopologyException e) { + e.retryReadyFuture().get(); + } + catch (TransactionRollbackException ignore) { + // Safe to retry right away. + } + } + } + + /** {@inheritDoc} */ + @Override protected String cacheName() { + return "tx-write-invoke"; + } + + /** + */ + private static class IncrementCacheEntryProcessor implements CacheEntryProcessor { + /** */ + private static final long serialVersionUID = 0; + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry entry, + Object... arguments) throws EntryProcessorException { + entry.setValue(entry.getValue() == null ? 0 : entry.getValue() + 1); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java new file mode 100644 index 0000000..c962749 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache.failover; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionRollbackException; + +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * Transactional write read failover benchmark. + *

+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-1', + * 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction, reads value associated with + * each key. Values must be equal. Client increments value by 1, commits the transaction. + */ +public class IgniteTransactionalWriteReadBenchmark extends IgniteFailoverAbstractBenchmark { + /** {@inheritDoc} */ + @Override public boolean test(Map ctx) throws Exception { + final int k = nextRandom(args.range()); + + assert args.keysCount() > 0 : "Count of keys: " + args.keysCount(); + + final String[] keys = new String[args.keysCount()]; + + for (int i = 0; i < keys.length; i++) + keys[i] = "key-" + k + "-" + i; + + return doInTransaction(ignite(), new Callable() { + @Override public Boolean call() throws Exception { + Map map = new HashMap<>(); + + final int timeout = args.cacheOperationTimeoutMillis(); + + for (String key : keys) { + asyncCache.get(key); + Long val = asyncCache.future().get(timeout); + + map.put(key, val); + } + + Set values = new HashSet<>(map.values()); + + if (values.size() != 1) { + // Print all usefull information and finish. + println(cfg, "Got different values for keys [map=" + map + "]"); + + println(cfg, "Cache content:"); + + for (int k = 0; k < args.range(); k++) { + for (int i = 0; i < args.keysCount(); i++) { + String key = "key-" + k + "-" + i; + + asyncCache.get(key); + Long val = asyncCache.future().get(timeout); + + if (val != null) + println(cfg, "Entry [key=" + key + ", val=" + val + "]"); + } + } + + throw new IllegalStateException("Found different values for keys (see above information)."); + } + + final Long oldVal = map.get(keys[0]); + + final Long newVal = oldVal == null ? 0 : oldVal + 1; + + for (String key : keys) { + asyncCache.put(key, newVal); + asyncCache.future().get(timeout); + } + + return true; + } + }); + } + + /** + * @param ignite Ignite instance. + * @param clo Closure. + * @return Result of closure execution. + * @throws Exception + */ + public static T doInTransaction(Ignite ignite, Callable clo) throws Exception { + while (true) { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + T res = clo.call(); + + tx.commit(); + + return res; + } + catch (CacheException e) { + if (e.getCause() instanceof ClusterTopologyException) { + ClusterTopologyException topEx = (ClusterTopologyException)e.getCause(); + + topEx.retryReadyFuture().get(); + } + else + throw e; + } + catch (ClusterTopologyException e) { + e.retryReadyFuture().get(); + } + catch (TransactionRollbackException ignore) { + // Safe to retry right away. + } + } + } + + /** {@inheritDoc} */ + @Override protected String cacheName() { + return "tx-write-read"; + } +}