ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/8] ignite git commit: ignite-4932 When possible for cache 'get' read directly from offheap without entry creation.
Date Fri, 12 May 2017 07:13:52 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index e1d4484..56041ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -384,7 +385,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
 
         UUID subjId = ctx.subjectIdPerCall(null, opCtx);
 
-        Map<K, V> vals = new HashMap<>(keys.size(), 1.0f);
+        Map<K, V> vals = U.newHashMap(keys.size());
 
         if (keyCheck)
             validateCacheKeys(keys);
@@ -392,97 +393,142 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
         final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null);
 
         boolean success = true;
+        boolean readNoEntry = ctx.readNoEntry(expiry, false);
+        final boolean evt = !skipVals;
 
         for (K key : keys) {
             if (key == null)
                 throw new NullPointerException("Null key.");
 
-            GridCacheEntryEx entry = null;
-
             KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
 
-            while (true) {
-                try {
-                    entry = entryEx(cacheKey);
+            boolean skipEntry = readNoEntry;
 
-                    if (entry != null) {
-                        CacheObject v;
+            if (readNoEntry) {
+                CacheDataRow row = ctx.offheap().read(cacheKey);
 
-                        if (needVer) {
-                            EntryGetResult res = entry.innerGetVersioned(
-                                null,
-                                null,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                subjId,
-                                null,
-                                taskName,
-                                expiry,
-                                !deserializeBinary,
-                                null);
-
-                            if (res != null) {
-                                ctx.addResult(
-                                    vals,
-                                    cacheKey,
-                                    res,
-                                    skipVals,
-                                    false,
-                                    deserializeBinary,
-                                    true,
-                                    needVer);
-                            }
-                            else
-                                success = false;
-                        }
-                        else {
-                            v = entry.innerGet(
-                                null,
+                if (row != null) {
+                    long expireTime = row.expireTime();
+
+                    if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+                        ctx.addResult(vals,
+                            cacheKey,
+                            row.value(),
+                            skipVals,
+                            false,
+                            deserializeBinary,
+                            true,
+                            null,
+                            row.version(),
+                            0,
+                            0,
+                            needVer);
+
+                        if (configuration().isStatisticsEnabled() && !skipVals)
+                            metrics0().onRead(true);
+
+                        if (evt) {
+                            ctx.events().readEvent(cacheKey,
                                 null,
-                                /*read-through*/false,
-                                /**update-metrics*/true,
-                                /**event*/!skipVals,
+                                row.value(),
                                 subjId,
-                                null,
                                 taskName,
-                                expiry,
                                 !deserializeBinary);
+                        }
+                    }
+                    else
+                        skipEntry = false;
+                }
+                else
+                    success = false;
+            }
 
-                            if (v != null) {
-                                ctx.addResult(vals,
-                                    cacheKey,
-                                    v,
-                                    skipVals,
-                                    false,
-                                    deserializeBinary,
-                                    true,
+            if (!skipEntry) {
+                GridCacheEntryEx entry = null;
+
+                while (true) {
+                    try {
+                        entry = entryEx(cacheKey);
+
+                        if (entry != null) {
+                            CacheObject v;
+
+                            if (needVer) {
+                                EntryGetResult res = entry.innerGetVersioned(
                                     null,
-                                    0,
-                                    0);
+                                    null,
+                                    /*update-metrics*/false,
+                                    /*event*/evt,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiry,
+                                    !deserializeBinary,
+                                    null);
+
+                                if (res != null) {
+                                    ctx.addResult(
+                                        vals,
+                                        cacheKey,
+                                        res,
+                                        skipVals,
+                                        false,
+                                        deserializeBinary,
+                                        true,
+                                        needVer);
+                                }
+                                else
+                                    success = false;
+                            }
+                            else {
+                                v = entry.innerGet(
+                                    null,
+                                    null,
+                                    /*read-through*/false,
+                                    /*update-metrics*/true,
+                                    /*event*/evt,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiry,
+                                    !deserializeBinary);
+
+                                if (v != null) {
+                                    ctx.addResult(vals,
+                                        cacheKey,
+                                        v,
+                                        skipVals,
+                                        false,
+                                        deserializeBinary,
+                                        true,
+                                        null,
+                                        0,
+                                        0);
+                                }
+                                else
+                                    success = false;
                             }
-                            else
-                                success = false;
                         }
-                    }
-                    else {
-                        if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
-                            metrics0().onRead(false);
+                        else {
+                            if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals)
+                                metrics0().onRead(false);
 
-                        success = false;
+                            success = false;
+                        }
+
+                        break; // While.
+                    }
+                    catch (GridCacheEntryRemovedException ignored) {
+                        // No-op, retry.
+                    }
+                    finally {
+                        if (entry != null)
+                            ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
                     }
 
-                    break; // While.
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    // No-op, retry.
+                    if (!success && storeEnabled)
+                        break;
                 }
-                finally {
-                    if (entry != null)
-                        ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
-                }
-
-                if (!success && storeEnabled)
-                    break;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 663040d..5961b8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -4099,6 +4099,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
 
+                    assert dataPacket != null : msg;
+
                     if (dataPacket.hasJoiningNodeData())
                         spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
new file mode 100644
index 0000000..9250e0b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ModifiedExpiryPolicy;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static volatile CountDownLatch processorStartLatch;
+
+    /** */
+    private static volatile CountDownLatch hangLatch;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0);
+
+        client = true;
+
+        startGrid(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicGet() throws Exception {
+        getTest(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxGet() throws Exception {
+        getTest(TRANSACTIONAL);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void getTest(CacheAtomicityMode atomicityMode) throws Exception {
+        boolean getAll[] = {true, false};
+        boolean cfgExpiryPlc[] = {false};
+        boolean withExpiryPlc[] = {false};
+        boolean heapCache[] = {false};
+
+        for (boolean getAll0 : getAll) {
+            for (boolean expiryPlc0 : cfgExpiryPlc) {
+                for (boolean withExpiryPlc0 : withExpiryPlc) {
+                    for (boolean heapCache0 : heapCache)
+                        doGet(atomicityMode, heapCache0, getAll0, expiryPlc0, withExpiryPlc0);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @param heapCache Heap cache flag.
+     * @param getAll Test getAll flag.
+     * @param cfgExpiryPlc Configured expiry policy flag.
+     * @param withExpiryPlc Custom expiry policy flag.
+     * @throws Exception If failed.
+     */
+    private void doGet(CacheAtomicityMode atomicityMode,
+        boolean heapCache,
+        final boolean getAll,
+        final boolean cfgExpiryPlc,
+        final boolean withExpiryPlc) throws Exception {
+        log.info("Test get [getAll=" + getAll + ", cfgExpiryPlc=" + cfgExpiryPlc + ']');
+
+        Ignite srv = ignite(0);
+
+        Ignite client = ignite(1);
+
+        final IgniteCache cache = client.createCache(cacheConfiguration(atomicityMode, heapCache, cfgExpiryPlc));
+
+        final Map<Object, Object> data = new HashMap<>();
+
+        data.put(1, 1);
+        data.put(2, 2);
+
+        try {
+            // Get from compute closure.
+            {
+                cache.putAll(data);
+
+                hangLatch = new CountDownLatch(1);
+                processorStartLatch = new CountDownLatch(1);
+
+                IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        if (getAll)
+                            cache.invokeAll(data.keySet(), new HangEntryProcessor());
+                        else
+                            cache.invoke(1, new HangEntryProcessor());
+
+                        return null;
+                    }
+                });
+
+                try {
+                    boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS);
+
+                    assertTrue(wait);
+
+                    if (getAll) {
+                        assertEquals(data, client.compute().affinityCall(cache.getName(), 1,
+                            new GetAllClosure(data.keySet(), cache.getName(), withExpiryPlc)));
+                    }
+                    else {
+                        assertEquals(1, client.compute().affinityCall(cache.getName(), 1,
+                            new GetClosure(1, cache.getName(), withExpiryPlc)));
+                    }
+
+                    hangLatch.countDown();
+
+                    fut.get();
+                }
+                finally {
+                    hangLatch.countDown();
+                }
+            }
+
+            // Local get.
+            {
+                cache.putAll(data);
+
+                hangLatch = new CountDownLatch(1);
+                processorStartLatch = new CountDownLatch(1);
+
+                IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        if (getAll)
+                            cache.invokeAll(data.keySet(), new HangEntryProcessor());
+                        else
+                            cache.invoke(1, new HangEntryProcessor());
+
+                        return null;
+                    }
+                });
+
+                try {
+                    boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS);
+
+                    assertTrue(wait);
+
+                    IgniteCache srvCache = srv.cache(cache.getName());
+
+                    if (withExpiryPlc)
+                        srvCache = srvCache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+                    if (getAll) {
+                        assertEquals(data, srvCache.getAll(data.keySet()));
+                        assertEquals(data.size(), srvCache.getEntries(data.keySet()).size());
+                    }
+                    else {
+                        assertEquals(1, srvCache.get(1));
+                        assertEquals(1, srvCache.getEntry(1).getValue());
+                    }
+
+                    hangLatch.countDown();
+
+                    fut.get();
+                }
+                finally {
+                    hangLatch.countDown();
+                }
+            }
+        }
+        finally {
+            client.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @param atomicityMode Atomicity mode.
+     * @param heapCache Heap cache flag.
+     * @param expiryPlc Expiry policy flag.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode,
+        boolean heapCache,
+        boolean expiryPlc) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setOnheapCacheEnabled(heapCache);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setName("testCache");
+
+        if (expiryPlc)
+            ccfg.setExpiryPolicyFactory(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES));
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    static class HangEntryProcessor implements CacheEntryProcessor {
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry entry, Object... arguments) {
+            assert processorStartLatch != null;
+            assert hangLatch != null;
+
+            try {
+                processorStartLatch.countDown();
+
+                if (!hangLatch.await(60, TimeUnit.SECONDS))
+                    throw new RuntimeException("Failed to wait for latch");
+            }
+            catch (Exception e) {
+                System.out.println("Unexpected error: " + e);
+
+                throw new EntryProcessorException(e);
+            }
+
+            entry.setValue(U.currentTimeMillis());
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class GetClosure implements IgniteCallable<Object> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        private final int key;
+
+        /** */
+        private final String cacheName;
+
+        /** */
+        private final boolean withExpiryPlc;
+
+        /**
+         * @param key Key.
+         * @param cacheName Cache name.
+         * @param withExpiryPlc Custom expiry policy flag.
+         */
+        GetClosure(int key, String cacheName, boolean withExpiryPlc) {
+            this.key = key;
+            this.cacheName = cacheName;
+            this.withExpiryPlc = withExpiryPlc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            IgniteCache cache = ignite.cache(cacheName);
+
+            if (withExpiryPlc)
+                cache = cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+            Object val = cache.get(key);
+
+            CacheEntry e = cache.getEntry(key);
+
+            assertEquals(val, e.getValue());
+
+            return val;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class GetAllClosure implements IgniteCallable<Object> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        private final Set<Object> keys;
+
+        /** */
+        private final String cacheName;
+
+        /** */
+        private final boolean withExpiryPlc;
+
+        /**
+         * @param keys Keys.
+         * @param cacheName Cache name.
+         * @param withExpiryPlc Custom expiry policy flag.
+         */
+        GetAllClosure(Set<Object> keys, String cacheName, boolean withExpiryPlc) {
+            this.keys = keys;
+            this.cacheName = cacheName;
+            this.withExpiryPlc = withExpiryPlc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            IgniteCache cache = ignite.cache(cacheName);
+
+            if (withExpiryPlc)
+                cache = cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+            Map vals = cache.getAll(keys);
+
+            Collection<CacheEntry> entries = cache.getEntries(keys);
+
+            assertEquals(vals.size(), entries.size());
+
+            for (CacheEntry entry : entries) {
+                Object val = vals.get(entry.getKey());
+
+                assertEquals(val, entry.getValue());
+            }
+
+            return vals;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 3ff1bff..2b79367 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -1009,7 +1009,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         if (cacheMode() != PARTITIONED)
             return;
 
-        factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1));
+        factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 2));
 
         nearCache = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
index 5c12f84..7d4f90e 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java
@@ -79,10 +79,8 @@ public class GridHashMapLoadTest extends GridCommonAbstractTest {
 
         while (true) {
             Integer key = i++;
-            Integer val = i++;
 
-            map.put(key, new GridCacheMapEntry(ctx, ctx.toCacheKeyObject(key),
-                key.hashCode(), ctx.toCacheObject(val)) {
+            map.put(key, new GridCacheMapEntry(ctx, ctx.toCacheKeyObject(key)) {
                 @Override public boolean tmLock(IgniteInternalTx tx,
                     long timeout,
                     @Nullable GridCacheVersion serOrder,

http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 04a3753..943c5f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheVariableTopologySelf
 import org.apache.ignite.internal.processors.cache.IgniteAtomicCacheEntryProcessorNodeJoinTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNodeJoinTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest;
 import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
 import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest;
@@ -267,6 +268,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
 
         suite.addTest(new TestSuite(IgniteOnePhaseCommitInvokeTest.class));
 
+        suite.addTest(new TestSuite(IgniteCacheNoSyncForGetTest.class));
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java
new file mode 100644
index 0000000..83fe665
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetFromComputeBenchmark.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Benchmark created to verify that slow EntryProcessor does not affect 'get' performance.
+ */
+public class IgniteGetFromComputeBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
+    /** */
+    private static final String CACHE_NAME = "atomic";
+
+    /** */
+    private IgniteCompute compute;
+
+    /** */
+    private IgniteCache asyncCache;
+
+    /** */
+    private ThreadLocal<IgniteFuture> invokeFut = new ThreadLocal<>();
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        if (args.preloadAmount() > args.range())
+            throw new IllegalArgumentException("Preloading amount (\"-pa\", \"--preloadAmount\") " +
+                "must by less then the range (\"-r\", \"--range\").");
+
+        String cacheName = cache().getName();
+
+        println(cfg, "Loading data for cache: " + cacheName);
+
+        long start = System.nanoTime();
+
+        try (IgniteDataStreamer<Object, Object> dataLdr = ignite().dataStreamer(cacheName)) {
+            for (int i = 0; i < args.preloadAmount(); i++) {
+                dataLdr.addData(i, new SampleValue(i));
+
+                if (i % 100000 == 0) {
+                    if (Thread.currentThread().isInterrupted())
+                        break;
+
+                    println("Loaded entries: " + i);
+                }
+            }
+        }
+
+        println(cfg, "Finished populating data [time=" + ((System.nanoTime() - start) / 1_000_000) + "ms, " +
+            "amount=" + args.preloadAmount() + ']');
+
+        compute = ignite().compute();
+
+        asyncCache = cache().withAsync();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        IgniteFuture fut = invokeFut.get();
+
+        if (fut == null || fut.isDone()) {
+            Set<Integer> keys = new TreeSet<>();
+
+            for (int i = 0; i < 3; i++)
+                keys.add(nextRandom(args.range()));
+
+            asyncCache.invokeAll(keys, new SlowEntryProcessor(0));
+
+            invokeFut.set(asyncCache.future());
+        }
+
+        int key = nextRandom(args.range());
+
+        compute.affinityCall(CACHE_NAME, key, new GetClosure(key));
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteCache<Integer, Object> cache() {
+        return ignite().cache(CACHE_NAME);
+    }
+
+    /**
+     *
+     */
+    public static class GetClosure implements IgniteCallable<Object> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        private final int key;
+
+        /**
+         * @param key Key.
+         */
+        public GetClosure(int key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return ignite.cache(CACHE_NAME).get(key);
+        }
+    }
+
+    /**
+     *
+     */
+    public static class SlowEntryProcessor implements CacheEntryProcessor<Integer, Object, Object> {
+        /** */
+        private Object val;
+
+        /**
+         * @param val Value.
+         */
+        public SlowEntryProcessor(Object val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Integer, Object> entry, Object... args) {
+            try {
+                Thread.sleep(10);
+            }
+            catch (InterruptedException ignore) {
+                // No-op.
+            }
+
+            entry.setValue(val);
+
+            return null;
+        }
+    }
+}


Mime
View raw message