Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 52136200C72 for ; Fri, 12 May 2017 09:13:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 50BBE160BA3; Fri, 12 May 2017 07:13:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D0BC7160BCB for ; Fri, 12 May 2017 09:13:50 +0200 (CEST) Received: (qmail 32720 invoked by uid 500); 12 May 2017 07:13:49 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 32585 invoked by uid 99); 12 May 2017 07:13:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 May 2017 07:13:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 378E6E080D; Fri, 12 May 2017 07:13:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 12 May 2017 07:13:52 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/8] ignite git commit: ignite-4932 When possible for cache 'get' read directly from offheap without entry creation. archived-at: Fri, 12 May 2017 07:13:52 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index e1d4484..56041ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -384,7 +385,7 @@ public class GridLocalAtomicCache extends GridLocalCache { UUID subjId = ctx.subjectIdPerCall(null, opCtx); - Map vals = new HashMap<>(keys.size(), 1.0f); + Map vals = U.newHashMap(keys.size()); if (keyCheck) validateCacheKeys(keys); @@ -392,97 +393,142 @@ public class GridLocalAtomicCache extends GridLocalCache { final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null); boolean success = true; + boolean readNoEntry = ctx.readNoEntry(expiry, false); + final boolean evt = !skipVals; for (K key : keys) { if (key == null) throw new NullPointerException("Null key."); - GridCacheEntryEx entry = null; - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - while (true) { - try { - entry = entryEx(cacheKey); + boolean skipEntry = readNoEntry; - if (entry != null) { - CacheObject v; + if (readNoEntry) { + CacheDataRow row = ctx.offheap().read(cacheKey); - if (needVer) { - EntryGetResult res = entry.innerGetVersioned( - null, - null, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiry, - !deserializeBinary, - null); - - if (res != null) { - ctx.addResult( - vals, - cacheKey, - res, - skipVals, - false, - deserializeBinary, - true, - needVer); - } - else - success = false; - } - else { - v = entry.innerGet( - null, + if (row != null) { + long expireTime = row.expireTime(); + + if (expireTime == 0 || expireTime > U.currentTimeMillis()) { + ctx.addResult(vals, + cacheKey, + row.value(), + skipVals, + false, + deserializeBinary, + true, + null, + row.version(), + 0, + 0, + needVer); + + if (configuration().isStatisticsEnabled() && !skipVals) + metrics0().onRead(true); + + if (evt) { + ctx.events().readEvent(cacheKey, null, - /*read-through*/false, - /**update-metrics*/true, - /**event*/!skipVals, + row.value(), subjId, - null, taskName, - expiry, !deserializeBinary); + } + } + else + skipEntry = false; + } + else + success = false; + } - if (v != null) { - ctx.addResult(vals, - cacheKey, - v, - skipVals, - false, - deserializeBinary, - true, + if (!skipEntry) { + GridCacheEntryEx entry = null; + + while (true) { + try { + entry = entryEx(cacheKey); + + if (entry != null) { + CacheObject v; + + if (needVer) { + EntryGetResult res = entry.innerGetVersioned( null, - 0, - 0); + null, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiry, + !deserializeBinary, + null); + + if (res != null) { + ctx.addResult( + vals, + cacheKey, + res, + skipVals, + false, + deserializeBinary, + true, + needVer); + } + else + success = false; + } + else { + v = entry.innerGet( + null, + null, + /*read-through*/false, + /*update-metrics*/true, + /*event*/evt, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + + if (v != null) { + ctx.addResult(vals, + cacheKey, + v, + skipVals, + false, + deserializeBinary, + true, + null, + 0, + 0); + } + else + success = false; } - else - success = false; } - } - else { - if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals) - metrics0().onRead(false); + else { + if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals) + metrics0().onRead(false); - success = false; + success = false; + } + + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, retry. + } + finally { + if (entry != null) + ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); } - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - // No-op, retry. + if (!success && storeEnabled) + break; } - finally { - if (entry != null) - ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); - } - - if (!success && storeEnabled) - break; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 663040d..5961b8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -4099,6 +4099,8 @@ class ServerImpl extends TcpDiscoveryImpl { DiscoveryDataPacket dataPacket = msg.gridDiscoveryData(); + assert dataPacket != null : msg; + if (dataPacket.hasJoiningNodeData()) spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration())); http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java new file mode 100644 index 0000000..9250e0b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ModifiedExpiryPolicy; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +@SuppressWarnings("unchecked") +public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static volatile CountDownLatch processorStartLatch; + + /** */ + private static volatile CountDownLatch hangLatch; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(0); + + client = true; + + startGrid(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicGet() throws Exception { + getTest(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testTxGet() throws Exception { + getTest(TRANSACTIONAL); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void getTest(CacheAtomicityMode atomicityMode) throws Exception { + boolean getAll[] = {true, false}; + boolean cfgExpiryPlc[] = {false}; + boolean withExpiryPlc[] = {false}; + boolean heapCache[] = {false}; + + for (boolean getAll0 : getAll) { + for (boolean expiryPlc0 : cfgExpiryPlc) { + for (boolean withExpiryPlc0 : withExpiryPlc) { + for (boolean heapCache0 : heapCache) + doGet(atomicityMode, heapCache0, getAll0, expiryPlc0, withExpiryPlc0); + } + } + } + } + + /** + * @param atomicityMode Cache atomicity mode. + * @param heapCache Heap cache flag. + * @param getAll Test getAll flag. + * @param cfgExpiryPlc Configured expiry policy flag. + * @param withExpiryPlc Custom expiry policy flag. + * @throws Exception If failed. + */ + private void doGet(CacheAtomicityMode atomicityMode, + boolean heapCache, + final boolean getAll, + final boolean cfgExpiryPlc, + final boolean withExpiryPlc) throws Exception { + log.info("Test get [getAll=" + getAll + ", cfgExpiryPlc=" + cfgExpiryPlc + ']'); + + Ignite srv = ignite(0); + + Ignite client = ignite(1); + + final IgniteCache cache = client.createCache(cacheConfiguration(atomicityMode, heapCache, cfgExpiryPlc)); + + final Map data = new HashMap<>(); + + data.put(1, 1); + data.put(2, 2); + + try { + // Get from compute closure. + { + cache.putAll(data); + + hangLatch = new CountDownLatch(1); + processorStartLatch = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + if (getAll) + cache.invokeAll(data.keySet(), new HangEntryProcessor()); + else + cache.invoke(1, new HangEntryProcessor()); + + return null; + } + }); + + try { + boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS); + + assertTrue(wait); + + if (getAll) { + assertEquals(data, client.compute().affinityCall(cache.getName(), 1, + new GetAllClosure(data.keySet(), cache.getName(), withExpiryPlc))); + } + else { + assertEquals(1, client.compute().affinityCall(cache.getName(), 1, + new GetClosure(1, cache.getName(), withExpiryPlc))); + } + + hangLatch.countDown(); + + fut.get(); + } + finally { + hangLatch.countDown(); + } + } + + // Local get. + { + cache.putAll(data); + + hangLatch = new CountDownLatch(1); + processorStartLatch = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + if (getAll) + cache.invokeAll(data.keySet(), new HangEntryProcessor()); + else + cache.invoke(1, new HangEntryProcessor()); + + return null; + } + }); + + try { + boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS); + + assertTrue(wait); + + IgniteCache srvCache = srv.cache(cache.getName()); + + if (withExpiryPlc) + srvCache = srvCache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create()); + + if (getAll) { + assertEquals(data, srvCache.getAll(data.keySet())); + assertEquals(data.size(), srvCache.getEntries(data.keySet()).size()); + } + else { + assertEquals(1, srvCache.get(1)); + assertEquals(1, srvCache.getEntry(1).getValue()); + } + + hangLatch.countDown(); + + fut.get(); + } + finally { + hangLatch.countDown(); + } + } + } + finally { + client.destroyCache(cache.getName()); + } + } + + /** + * @param atomicityMode Atomicity mode. + * @param heapCache Heap cache flag. + * @param expiryPlc Expiry policy flag. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode, + boolean heapCache, + boolean expiryPlc) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setOnheapCacheEnabled(heapCache); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setName("testCache"); + + if (expiryPlc) + ccfg.setExpiryPolicyFactory(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES)); + + return ccfg; + } + + /** + * + */ + static class HangEntryProcessor implements CacheEntryProcessor { + /** {@inheritDoc} */ + @Override public Object process(MutableEntry entry, Object... arguments) { + assert processorStartLatch != null; + assert hangLatch != null; + + try { + processorStartLatch.countDown(); + + if (!hangLatch.await(60, TimeUnit.SECONDS)) + throw new RuntimeException("Failed to wait for latch"); + } + catch (Exception e) { + System.out.println("Unexpected error: " + e); + + throw new EntryProcessorException(e); + } + + entry.setValue(U.currentTimeMillis()); + + return null; + } + } + + /** + * + */ + public static class GetClosure implements IgniteCallable { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private final int key; + + /** */ + private final String cacheName; + + /** */ + private final boolean withExpiryPlc; + + /** + * @param key Key. + * @param cacheName Cache name. + * @param withExpiryPlc Custom expiry policy flag. + */ + GetClosure(int key, String cacheName, boolean withExpiryPlc) { + this.key = key; + this.cacheName = cacheName; + this.withExpiryPlc = withExpiryPlc; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + IgniteCache cache = ignite.cache(cacheName); + + if (withExpiryPlc) + cache = cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create()); + + Object val = cache.get(key); + + CacheEntry e = cache.getEntry(key); + + assertEquals(val, e.getValue()); + + return val; + } + } + + /** + * + */ + public static class GetAllClosure implements IgniteCallable { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private final Set keys; + + /** */ + private final String cacheName; + + /** */ + private final boolean withExpiryPlc; + + /** + * @param keys Keys. + * @param cacheName Cache name. + * @param withExpiryPlc Custom expiry policy flag. + */ + GetAllClosure(Set keys, String cacheName, boolean withExpiryPlc) { + this.keys = keys; + this.cacheName = cacheName; + this.withExpiryPlc = withExpiryPlc; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + IgniteCache cache = ignite.cache(cacheName); + + if (withExpiryPlc) + cache = cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create()); + + Map vals = cache.getAll(keys); + + Collection entries = cache.getEntries(keys); + + assertEquals(vals.size(), entries.size()); + + for (CacheEntry entry : entries) { + Object val = vals.get(entry.getKey()); + + assertEquals(val, entry.getValue()); + } + + return vals; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index 3ff1bff..2b79367 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -1009,7 +1009,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs if (cacheMode() != PARTITIONED) return; - factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1)); + factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 2)); nearCache = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java index 5c12f84..7d4f90e 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java @@ -79,10 +79,8 @@ public class GridHashMapLoadTest extends GridCommonAbstractTest { while (true) { Integer key = i++; - Integer val = i++; - map.put(key, new GridCacheMapEntry(ctx, ctx.toCacheKeyObject(key), - key.hashCode(), ctx.toCacheObject(val)) { + map.put(key, new GridCacheMapEntry(ctx, ctx.toCacheKeyObject(key)) { @Override public boolean tmLock(IgniteInternalTx tx, long timeout, @Nullable GridCacheVersion serOrder, http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 04a3753..943c5f5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheVariableTopologySelf import org.apache.ignite.internal.processors.cache.IgniteAtomicCacheEntryProcessorNodeJoinTest; import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNodeJoinTest; import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest; import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop; import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest; @@ -267,6 +268,8 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(IgniteOnePhaseCommitInvokeTest.class)); + suite.addTest(new TestSuite(IgniteCacheNoSyncForGetTest.class)); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java new file mode 100644 index 0000000..83fe665 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.yardstick.cache.model.SampleValue; +import org.yardstickframework.BenchmarkConfiguration; + +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * Benchmark created to verify that slow EntryProcessor does not affect 'get' performance. + */ +public class IgniteGetFromComputeBenchmark extends IgniteCacheAbstractBenchmark { + /** */ + private static final String CACHE_NAME = "atomic"; + + /** */ + private IgniteCompute compute; + + /** */ + private IgniteCache asyncCache; + + /** */ + private ThreadLocal invokeFut = new ThreadLocal<>(); + + /** {@inheritDoc} */ + @Override public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + + if (args.preloadAmount() > args.range()) + throw new IllegalArgumentException("Preloading amount (\"-pa\", \"--preloadAmount\") " + + "must by less then the range (\"-r\", \"--range\")."); + + String cacheName = cache().getName(); + + println(cfg, "Loading data for cache: " + cacheName); + + long start = System.nanoTime(); + + try (IgniteDataStreamer dataLdr = ignite().dataStreamer(cacheName)) { + for (int i = 0; i < args.preloadAmount(); i++) { + dataLdr.addData(i, new SampleValue(i)); + + if (i % 100000 == 0) { + if (Thread.currentThread().isInterrupted()) + break; + + println("Loaded entries: " + i); + } + } + } + + println(cfg, "Finished populating data [time=" + ((System.nanoTime() - start) / 1_000_000) + "ms, " + + "amount=" + args.preloadAmount() + ']'); + + compute = ignite().compute(); + + asyncCache = cache().withAsync(); + } + + /** {@inheritDoc} */ + @Override public boolean test(Map ctx) throws Exception { + IgniteFuture fut = invokeFut.get(); + + if (fut == null || fut.isDone()) { + Set keys = new TreeSet<>(); + + for (int i = 0; i < 3; i++) + keys.add(nextRandom(args.range())); + + asyncCache.invokeAll(keys, new SlowEntryProcessor(0)); + + invokeFut.set(asyncCache.future()); + } + + int key = nextRandom(args.range()); + + compute.affinityCall(CACHE_NAME, key, new GetClosure(key)); + + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteCache cache() { + return ignite().cache(CACHE_NAME); + } + + /** + * + */ + public static class GetClosure implements IgniteCallable { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private final int key; + + /** + * @param key Key. + */ + public GetClosure(int key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return ignite.cache(CACHE_NAME).get(key); + } + } + + /** + * + */ + public static class SlowEntryProcessor implements CacheEntryProcessor { + /** */ + private Object val; + + /** + * @param val Value. + */ + public SlowEntryProcessor(Object val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry entry, Object... args) { + try { + Thread.sleep(10); + } + catch (InterruptedException ignore) { + // No-op. + } + + entry.setValue(val); + + return null; + } + } +}