ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-2630 Added sanity tests for local cache.
Date Thu, 07 Apr 2016 11:07:51 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2630 b4d997dae -> 668895563


IGNITE-2630 Added sanity tests for local cache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/66889556
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/66889556
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/66889556

Branch: refs/heads/ignite-2630
Commit: 668895563d9b6be8449d6e571dbf093120d858b9
Parents: b4d997d
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Thu Apr 7 14:07:46 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Thu Apr 7 14:07:46 2016 +0300

----------------------------------------------------------------------
 ...nCounterLocalSanityRandomOperationsTest.java | 687 +++++++++++++++++++
 .../IgniteCacheInterceptorSelfTestSuite.java    |   1 +
 2 files changed, 688 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/66889556/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityRandomOperationsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityRandomOperationsTest.java
new file mode 100644
index 0000000..6633b8e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityRandomOperationsTest.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheInterceptorAdapter;
+import org.apache.ignite.cache.CacheInterceptorEntry;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
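+/**
+ * Sanity test: applies random operations to LOCAL caches and checks the events seen by the cache interceptor.
+ */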
+public class CacheInterceptorPartitionCounterLocalSanityRandomOperationsTest extends GridCommonAbstractTest
{
+    /** Node count; the test node is started with index {@code NODES - 1}. */
+    private static final int NODES = 1;
+
+    /** Exclusive upper bound for the numbers wrapped into random {@link TestKey}s. */
+    private static final int KEYS = 50;
+
+    /** Exclusive upper bound for the numbers wrapped into random {@link TestValue}s. */
+    private static final int VALS = 10;
+
+    /** Number of random operations performed by each test. */
+    private static final int ITERATION_CNT = 100;
+
+    /** Entries passed to {@link TestInterceptor#onAfterPut}; replaced by a fresh queue before every test. */
+    private static BlockingQueue<Cache.Entry<TestKey, TestValue>> afterPutEvts;
+
+    /** Entries passed to {@link TestInterceptor#onAfterRemove}; replaced by a fresh queue before every test. */
+    private static BlockingQueue<Cache.Entry<TestKey, TestValue>> afterRmvEvts;
+
+    /** Extends the base configuration by installing a {@link TcpDiscoveryVmIpFinder} on the discovery SPI. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(NODES - 1);
+    }
+
+    /** Stops all grids and then runs the base class teardown. */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+        // The base class teardown is invoked only after the grids have been stopped.
+        super.afterTestsStopped();
+    }
+
+    /** Installs fresh, empty interceptor event queues before every test method. */
+    @Override protected void beforeTest() throws Exception {
+        afterRmvEvts = new BlockingArrayQueue<>();
+        afterPutEvts = new BlockingArrayQueue<>();
+    }
+
+    /**
+     * ATOMIC cache, ONHEAP_TIERED memory mode, no cache store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLocal() throws Exception {
+        // Third argument: whether a cache store is configured.
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration(ATOMIC, ONHEAP_TIERED, false);
+
+        doTestPartitionCounterOperation(cacheCfg);
+    }
+
+    /**
+     * ATOMIC cache, ONHEAP_TIERED memory mode, cache store configured.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLocalWithStore() throws Exception {
+        // Third argument: whether a cache store is configured.
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration(ATOMIC, ONHEAP_TIERED, true);
+
+        doTestPartitionCounterOperation(cacheCfg);
+    }
+
+    /**
+     * TRANSACTIONAL cache, ONHEAP_TIERED memory mode, no cache store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLocalTx() throws Exception {
+        // Third argument: whether a cache store is configured.
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration(TRANSACTIONAL, ONHEAP_TIERED, false);
+
+        doTestPartitionCounterOperation(cacheCfg);
+    }
+
+    /**
+     * TRANSACTIONAL cache, ONHEAP_TIERED memory mode, cache store configured.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLocalTxWithStore() throws Exception {
+        // Third argument: whether a cache store is configured.
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration(TRANSACTIONAL, ONHEAP_TIERED, true);
+
+        doTestPartitionCounterOperation(cacheCfg);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void doTestPartitionCounterOperation(CacheConfiguration<Object, Object>
ccfg)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            log.info("Random seed: " + seed);
+
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                if (i % 20 == 0)
+                    log.info("Iteration: " + i);
+
+                randomUpdate(rnd, expData, grid(0).cache(ccfg.getName()));
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param rnd Random generator; selects the key, the new value, the operation and (TRANSACTIONAL only) the tx.
+     * @param expData Expected cache data; updated alongside every operation that is expected to modify the cache.
+     * @param cache Cache to apply one random operation to.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        ConcurrentMap<Object, Object> expData,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new TestKey(rnd.nextInt(KEYS));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key); // Value the cache is expected to hold for the key right now.
+
+        int op = rnd.nextInt(11); // Index of one of the 11 operations in the switch below.
+
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Transaction tx = null; // Stays null unless an explicit transaction is started below.
+
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL
&& rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+        try {
+            //log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+            switch (op) {
+                case 0: {
+                    cache.put(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, newVal, oldVal, false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 1: {
+                    cache.getAndPut(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, newVal, oldVal, false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 2: {
+                    cache.remove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, null, oldVal, true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 3: {
+                    cache.getAndRemove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, null, oldVal, true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 4: {
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, newVal, oldVal, false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 5: { // A processor created with a null value removes the entry, hence the remove checks.
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, null, oldVal, true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 6: {
+                    cache.putIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) { // Key expected to be absent, so the put should apply.
+                        waitAndCheckEvent(key, newVal, null, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(afterPutEvts);
+
+                    break;
+                }
+
+                case 7: {
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        waitAndCheckEvent(key, newVal, null, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(afterPutEvts);
+
+                    break;
+                }
+
+                case 8: {
+                    cache.replace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) { // Key expected to be present, so the replace should apply.
+                        waitAndCheckEvent(key, newVal, oldVal, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(afterPutEvts);
+
+                    break;
+                }
+
+                case 9: {
+                    cache.getAndReplace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        waitAndCheckEvent(key, newVal, oldVal, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(afterPutEvts);
+
+                    break;
+                }
+
+                case 10: {
+                    if (oldVal != null) {
+                        Object replaceVal = value(rnd);
+
+                        boolean success = replaceVal.equals(oldVal); // Replace should apply only on a match.
+
+                        if (success) {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            waitAndCheckEvent(key, newVal, oldVal, false);
+
+                            expData.put(key, newVal);
+                        }
+                        else {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            checkNoEvent(afterPutEvts);
+                        }
+                    }
+                    else {
+                        cache.replace(key, value(rnd), newVal);
+
+                        if (tx != null)
+                            tx.commit();
+
+                        checkNoEvent(afterPutEvts);
+                    }
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
+            }
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * @param rnd {@link Random}; a single {@code nextInt(3)} draw is consumed.
+     * @return {@link TransactionIsolation} selected by that draw.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        switch (rnd.nextInt(3)) {
+            case 0:
+                return READ_COMMITTED;
+            case 1:
+                return REPEATABLE_READ;
+            default:
+                return SERIALIZABLE;
+        }
+    }
+
+    /** Returns OPTIMISTIC when one {@code rnd.nextBoolean()} draw is {@code true}, PESSIMISTIC otherwise. */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        if (rnd.nextBoolean())
+            return TransactionConcurrency.OPTIMISTIC;
+
+        return TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /** Creates a {@link TestValue} wrapping one {@code rnd.nextInt(VALS)} draw. */
+    private static Object value(Random rnd) {
+        // nextInt(VALS) yields a number in [0, VALS).
+        int drawn = rnd.nextInt(VALS);
+
+        return new TestValue(drawn);
+    }
+
+    /**
+     * @param key Key the event is expected for.
+     * @param val Value the operation was called with ({@code null} for removals).
+     * @param oldVal Value expected to be mapped to the key before the operation.
+     * @param rmv {@code true} to check the after-remove queue, {@code false} for the after-put queue.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(Object key, Object val, Object oldVal, boolean rmv) throws Exception {
+        BlockingQueue<Cache.Entry<TestKey, TestValue>> queue = rmv ? afterRmvEvts : afterPutEvts;
+
+        // Neither a new nor an old value: only verify that no event carrying a value shows up.
+        if (val == null && oldVal == null) {
+            checkNoEvent(queue);
+
+            return;
+        }
+
+        Cache.Entry<TestKey, TestValue> evt = queue.poll(5, SECONDS);
+
+        Object expVal = rmv ? oldVal : val;
+        String failMsg = "Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']';
+
+        assertNotNull(failMsg, evt);
+        assertEquals(key, evt.getKey());
+        assertEquals(expVal, evt.getValue());
+
+        CacheInterceptorEntry interceptorEntry = evt.unwrap(CacheInterceptorEntry.class);
+
+        assertNotNull(interceptorEntry);
+
+        // For local cache partition counter always zero.
+        assertEquals(0, interceptorEntry.getPartitionUpdateCounter());
+
+        // No further event may be pending in the queue.
+        assertNull(queue.peek());
+    }
+
+    /**
+     * @param evtsQueue Event queue; polled once with a 50 ms timeout, an entry passes only with a {@code null} value.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(BlockingQueue<Cache.Entry<TestKey, TestValue>> evtsQueue) throws Exception {
+        Cache.Entry<TestKey, TestValue> polled = evtsQueue.poll(50, MILLISECONDS);
+        boolean noVal = polled == null || polled.getValue() == null;
+        assertTrue(noVal);
+    }
+
+    /**
+     * Builds a LOCAL cache configuration with a {@link TestInterceptor} installed.
+     *
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @param store If {@code true} configures {@link TestStoreFactory} with read-through and write-through.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheAtomicityMode atomicityMode, CacheMemoryMode memoryMode, boolean store) {
+        CacheConfiguration<TestKey, TestValue> cfg = new CacheConfiguration<>();
+
+        cfg.setCacheMode(LOCAL);
+        cfg.setAtomicityMode(atomicityMode);
+        cfg.setMemoryMode(memoryMode);
+        cfg.setInterceptor(new TestInterceptor());
+
+        if (store) {
+            cfg.setWriteThrough(true);
+            cfg.setReadThrough(true);
+            cfg.setCacheStoreFactory(new TestStoreFactory());
+        }
+
+        // Raw cast: the test works with the configuration as CacheConfiguration<Object, Object>.
+        return (CacheConfiguration)cfg;
+    }
+
+    /**
+     * Factory of a dummy store: loads always return {@code null}, writes and deletes do nothing.
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // Writes are discarded.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // Deletes are discarded.
+                }
+
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+            };
+        }
+    }
+
+    /**
+     * Cache key wrapping an {@link Integer}; equality, hashing and ordering all delegate to the wrapped number.
+     */
+    public static class TestKey implements Serializable, Comparable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public TestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey that = (TestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestKey.class, this);
+        }
+
+        /** Orders keys via {@link Integer#compareTo}, which, unlike int subtraction, cannot overflow. */
+        @Override public int compareTo(@NotNull Object o) {
+            return key.compareTo(((TestKey)o).key);
+        }
+    }
+
+    /**
+     * Interceptor that records every entry it is notified about in the static put / remove event queues.
+     */
+    private static class TestInterceptor extends CacheInterceptorAdapter<TestKey, TestValue> {
+        /** {@inheritDoc} */
+        @Override public void onAfterRemove(Cache.Entry<TestKey, TestValue> entry) {
+            // The accessor results are not used; both calls happen before the entry is queued.
+            entry.getKey();
+            entry.getValue();
+            afterRmvEvts.add(entry);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAfterPut(Cache.Entry<TestKey, TestValue> entry) {
+            // The accessor results are not used; both calls happen before the entry is queued.
+            entry.getKey();
+            entry.getValue();
+            afterPutEvts.add(entry);
+        }
+    }
+
+    /**
+     * Cache value holding a number together with its {@link String#valueOf(Object)} representation.
+     */
+    public static class TestValue implements Serializable {
+        /** Numeric value passed to the constructor. */
+        @GridToStringInclude
+        protected final Integer val1;
+
+        /** String form of {@link #val1}, computed once in the constructor. */
+        @GridToStringInclude
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(Integer val) {
+            val1 = val;
+            val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            // Only instances of exactly the same class can be equal.
+            boolean sameCls = o != null && o.getClass() == getClass();
+
+            if (!sameCls)
+                return false;
+
+            TestValue other = (TestValue)o;
+
+            return val1.equals(other.val1) && val2.equals(other.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            // Standard 31-multiplier combination of the two field hashes.
+            return 31 * val1.hashCode() + val2.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+
+    /**
+     * Entry processor that sets the configured value, or removes the entry when that value is {@code null}.
+     */
+    private static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+        /** Value to set; {@code null} requests removal of the entry. */
+        private Object val;
+
+        /** Whether {@link #process} returns the entry's previous value instead of {@code null}. */
+        private boolean retOld;
+
+        /**
+         * @param val Value to set.
+         * @param retOld Return old value flag.
+         */
+        EntrySetValueProcessor(Object val, boolean retOld) {
+            this.retOld = retOld;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> entry, Object... args) {
+            Object prev = retOld ? entry.getValue() : null;
+
+            if (val == null)
+                entry.remove();
+            else
+                entry.setValue(val);
+
+            return prev;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EntrySetValueProcessor.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/66889556/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
index 745fb90..e94b968 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
@@ -56,6 +56,7 @@ public class IgniteCacheInterceptorSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheOnCopyFlagLocalSelfTest.class);
         suite.addTestSuite(GridCacheOnCopyFlagAtomicSelfTest.class);
         suite.addTestSuite(CacheInterceptorPartitionCounterRandomOperationsTest.class);
+        suite.addTestSuite(CacheInterceptorPartitionCounterLocalSanityRandomOperationsTest.class);
 
         return suite;
     }


Mime
View raw message