ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-2630 Added CacheInterceptorEntry class with getPartitionCounter method. Added tests.
Date Mon, 28 Mar 2016 19:35:16 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2630 [created] 8a9a048c9


IGNITE-2630 Added CacheInterceptorEntry class with getPartitionCounter method. Added tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8a9a048c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8a9a048c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8a9a048c

Branch: refs/heads/ignite-2630
Commit: 8a9a048c92fb24eacd82a75646062e0c4aa2e406
Parents: 35fd10d
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Mon Mar 28 22:35:17 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Mon Mar 28 22:35:17 2016 +0300

----------------------------------------------------------------------
 .../ignite/cache/CacheInterceptorEntry.java     |  39 +
 .../processors/cache/CacheLazyEntry.java        |  46 +-
 .../processors/cache/GridCacheMapEntry.java     |  19 +-
 ...torPartitionCounterRandomOperationsTest.java | 967 +++++++++++++++++++
 .../IgniteCacheInterceptorSelfTestSuite.java    |   1 +
 5 files changed, 1062 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8a9a048c/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java
b/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java
new file mode 100644
index 0000000..a9bd8f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import javax.cache.Cache;
+
+/**
+ * A cache interceptor map entry.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of value
+ */
+public abstract class CacheInterceptorEntry<K, V> implements Cache.Entry<K, V>
{
+    /**
+     * Each cache update increases partition counter. The same cache updates have on the
same value of counter
+     * on primary and backup nodes. This value can be useful to communicate with external
applications.
+     * The value has sense only for entries get by {@link CacheInterceptor#onAfterPut(Cache.Entry)}
and
+     * {@link CacheInterceptor#onAfterRemove(Cache.Entry)} methods. For entries got by other
methods will return
+     * {@code 0}.
+     *
+     * @return Value of counter for this entry.
+     */
+    public abstract long getPartitionUpdateCounter();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8a9a048c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
index 6ec17c0..b7db62d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import javax.cache.Cache;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheInterceptorEntry;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  *
  */
-public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
+public class CacheLazyEntry<K, V> extends CacheInterceptorEntry<K, V> {
     /** Cache context. */
     protected GridCacheContext cctx;
 
@@ -46,6 +46,9 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V>
{
     /** Keep binary flag. */
     private boolean keepBinary;
 
+    /** Update counter. */
+    private Long updateCntr;
+
     /**
      * @param cctx Cache context.
      * @param keyObj Key cache object.
@@ -85,6 +88,31 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V>
{
         K key,
         CacheObject valObj,
         V val,
+        boolean keepBinary,
+        Long updateCntr
+    ) {
+        this.cctx = ctx;
+        this.keyObj = keyObj;
+        this.key = key;
+        this.valObj = valObj;
+        this.val = val;
+        this.keepBinary = keepBinary;
+        this.updateCntr = updateCntr;
+    }
+
+    /**
+     * @param ctx Cache context.
+     * @param keyObj Key cache object.
+     * @param key Key value.
+     * @param valObj Cache object
+     * @param keepBinary Keep binary flag.
+     * @param val Cache value.
+     */
+    public CacheLazyEntry(GridCacheContext<K, V> ctx,
+        KeyCacheObject keyObj,
+        K key,
+        CacheObject valObj,
+        V val,
         boolean keepBinary
     ) {
         this.cctx = ctx;
@@ -137,6 +165,20 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K,
V> {
     }
 
     /** {@inheritDoc} */
+    @Override public long getPartitionUpdateCounter() {
+        return updateCntr == null ? 0L : updateCntr;
+    }
+
+    /**
+     * Sets update counter.
+     *
+     * @param updateCntr Update counter.
+     */
+    public void updateCounter(Long updateCntr) {
+        this.updateCntr = updateCntr;
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <T> T unwrap(Class<T> cls) {
         if (cls.isAssignableFrom(Ignite.class))

http://git-wip-us.apache.org/repos/asf/ignite/blob/8a9a048c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c5df29b..58347f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1253,7 +1253,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
             cctx.store().put(tx, key, val, newVer);
 
         if (intercept)
-            cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0,
val, val0, keepBinary));
+            cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0,
val, val0, keepBinary, updateCntr0));
 
         return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateCntr0)
:
             new GridCacheUpdateTxResult(false, null);
@@ -1303,7 +1303,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
 
         IgniteBiTuple<Boolean, Object> interceptRes = null;
 
-        Cache.Entry entry0 = null;
+        CacheLazyEntry entry0 = null;
 
         Long updateCntr0;
 
@@ -1468,8 +1468,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
             onMarkedObsolete();
         }
 
-        if (intercept)
+        if (intercept) {
+            entry0.updateCounter(updateCntr0);
+
             cctx.config().getInterceptor().onAfterRemove(entry0);
+        }
 
         if (valid) {
             CacheObject ret;
@@ -1816,9 +1819,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
 
             if (intercept) {
                 if (op == GridCacheOperation.UPDATE)
-                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key,
key0, updated, updated0, keepBinary));
+                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key,
key0, updated, updated0, keepBinary, 0L));
                 else
-                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx,
key, key0, old, old0, keepBinary));
+                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx,
key, key0, old, old0, keepBinary, 0L));
             }
         }
 
@@ -2389,7 +2392,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
             else {
                 if (intercept) {
                     interceptRes = cctx.config().getInterceptor().onBeforeRemove(new CacheLazyEntry(cctx,
key, key0,
-                        oldVal, old0, keepBinary));
+                        oldVal, old0, keepBinary, updateCntr0));
 
                     if (cctx.cancelRemove(interceptRes))
                         return new GridCacheUpdateAtomicResult(false,
@@ -2492,9 +2495,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
 
             if (intercept) {
                 if (op == GridCacheOperation.UPDATE)
-                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key,
key0, updated, updated0, keepBinary));
+                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key,
key0, updated, updated0, keepBinary, updateCntr0));
                 else
-                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx,
key, key0, oldVal, old0, keepBinary));
+                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx,
key, key0, oldVal, old0, keepBinary, updateCntr0));
 
                 if (interceptRes != null)
                     oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/8a9a048c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
new file mode 100644
index 0000000..eaae7c5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheInterceptorAdapter;
+import org.apache.ignite.cache.CacheInterceptorEntry;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheInterceptorPartitionCounterRandomOperationsTest extends GridCommonAbstractTest
{
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private static final int KEYS = 50;
+
+    /** */
+    private static final int VALS = 10;
+
+    /** */
+    public static final int ITERATION_CNT = 100;
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static ConcurrentMap<UUID, BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>>
+        afterPutEvts = new ConcurrentHashMap<>();
+
+    /** */
+    private static ConcurrentMap<UUID, BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>>
+        afterRmvEvts = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        afterPutEvts.clear();
+        afterRmvEvts.clear();
+
+        for (int i = 0; i < NODES; i++) {
+            afterRmvEvts.put(grid(i).cluster().localNode().id(),
+                new BlockingArrayQueue<Cache.Entry<QueryTestKey, QueryTestValue>>());
+            afterPutEvts.put(grid(i).cluster().localNode().id(),
+                new BlockingArrayQueue<Cache.Entry<QueryTestKey, QueryTestValue>>());
+        }
+    }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValues() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_VALUES,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTiered() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            OFFHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackups() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValues() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_VALUES,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValuesExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_VALUES,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTiered() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTieredExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            OFFHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackups() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    protected void doTestPartitionCounterOperation(CacheConfiguration<Object, Object>
ccfg)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            log.info("Random seed: " + seed);
+
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+            Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                if (i % 20 == 0)
+                    log.info("Iteration: " + i);
+
+                for (int idx = 0; idx < NODES; idx++)
+                    randomUpdate(rnd, expData, partCntr, grid(idx).cache(ccfg.getName()));
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @param expData Expected cache data.
+     * @param partCntr Partition counter.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        ConcurrentMap<Object, Object> expData,
+        Map<Integer, Long> partCntr,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new QueryTestKey(rnd.nextInt(1));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key);
+
+        int op = rnd.nextInt(11);
+
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Transaction tx = null;
+
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL
&& rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+        try {
+            //log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+            switch (op) {
+                case 0: {
+                    cache.put(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal,
false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 1: {
+                    cache.getAndPut(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal,
false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 2: {
+                    cache.remove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, null, oldVal,
true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 3: {
+                    cache.getAndRemove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, null, oldVal,
true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 4: {
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal,
false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 5: {
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, null, oldVal,
true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 6: {
+                    cache.putIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal,
null, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+
+                    break;
+                }
+
+                case 7: {
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal,
null, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+
+                    break;
+                }
+
+                case 8: {
+                    cache.replace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal,
oldVal, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+
+                    break;
+                }
+
+                case 9: {
+                    cache.getAndReplace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal,
oldVal, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+
+                    break;
+                }
+
+                case 10: {
+                    if (oldVal != null) {
+                        Object replaceVal = value(rnd);
+
+                        boolean success = replaceVal.equals(oldVal);
+
+                        if (success) {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            updatePartitionCounter(cache, key, partCntr);
+
+                            waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal,
oldVal, false);
+
+                            expData.put(key, newVal);
+                        }
+                        else {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            checkNoEvent(getInterceptorQueues(cache, key, false));
+                        }
+                    }
+                    else {
+                        cache.replace(key, value(rnd), newVal);
+
+                        if (tx != null)
+                            tx.commit();
+
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+                    }
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
+            }
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param rmv Remove operation.
+     * @return Queues.
+     */
+    @NotNull private List<BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>>
getInterceptorQueues(
+        IgniteCache<Object, Object> cache,
+        Object key,
+        boolean rmv
+    ) {
+        Collection<ClusterNode> nodes =
+            Collections.singletonList(affinity(cache).mapKeyToPrimaryAndBackups(key).iterator().next());
+
+        List<BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>>
queues = new ArrayList<>();
+
+        for (ClusterNode node : nodes)
+            queues.add(rmv ? afterRmvEvts.get(node.id()) : afterPutEvts.get(node.id()));
+
+        return queues;
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        int val = rnd.nextInt(3);
+
+        if (val == 0)
+            return READ_COMMITTED;
+        else if (val == 1)
+            return REPEATABLE_READ;
+        else
+            return SERIALIZABLE;
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionConcurrency}.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key
+     * @param cntrs Partition counters.
+     */
+    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key,
Map<Integer, Long> cntrs) {
+        Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+        int part = aff.partition(key);
+
+        Long partCntr = cntrs.get(part);
+
+        if (partCntr == null)
+            partCntr = 0L;
+
+        cntrs.put(part, ++partCntr);
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        return new QueryTestValue(rnd.nextInt(VALS));
+    }
+
+    /**
+     * @param cache Ignite cache.
+     * @param partCntrs Partition counters.
+     * @param aff Affinity function.
+     * @param key Key.
+     * @param val Value.
+     * @param oldVal Old value.
+     * @param rmv Remove operation.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(IgniteCache cache,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        Object key,
+        Object val,
+        Object oldVal,
+        boolean rmv)
+        throws Exception {
+        Collection<BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>>>
entries = getInterceptorQueues(cache, key,
+            rmv);
+
+        if (val == null && oldVal == null) {
+            checkNoEvent(entries);
+
+            return;
+        }
+
+        for (BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> evtsQueue
: entries) {
+            Cache.Entry<QueryTestKey, QueryTestValue> entry = evtsQueue.poll(5, SECONDS);
+
+            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal="
+ oldVal + ']', entry);
+            assertEquals(key, entry.getKey());
+            assertEquals(rmv ? oldVal : val, entry.getValue());
+
+            long cntr = partCntrs.get(aff.partition(key));
+            CacheInterceptorEntry qryEntryEvt = entry.unwrap(CacheInterceptorEntry.class);
+
+            assertNotNull(cntr);
+            assertNotNull(qryEntryEvt);
+
+            assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+        }
+    }
+
+    /**
+     * @param evtsQueues Event queue.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(Collection<BlockingQueue<Cache.Entry<QueryTestKey,
QueryTestValue>>> evtsQueues)
+        throws Exception {
+        for (BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> evtsQueue
: evtsQueues) {
+            Cache.Entry<QueryTestKey, QueryTestValue> evt = evtsQueue.poll(50, MILLISECONDS);
+
+            assertTrue(evt == null || evt.getValue() == null);
+        }
+    }
+
+    /**
+     *
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @param store If {@code true} configures dummy cache store.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        boolean store) {
+        CacheConfiguration<QueryTestKey, QueryTestValue> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        if (store) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setReadThrough(true);
+            ccfg.setWriteThrough(true);
+        }
+
+        ccfg.setInterceptor(new TestInterceptor());
+
+        return (CacheConfiguration)ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>>
{
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException
{
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     *
+     */
+    public static class QueryTestKey implements Serializable, Comparable {
+        /** */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            return key - ((QueryTestKey)o).key;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestInterceptor extends CacheInterceptorAdapter<QueryTestKey,
QueryTestValue> {
+        /** {@inheritDoc} */
+        @Override public void onAfterPut(Cache.Entry<QueryTestKey, QueryTestValue>
e) {
+            e.getKey();
+            e.getValue();
+
+            UUID id = e.unwrap(Ignite.class).cluster().localNode().id();
+
+            BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> ents = afterPutEvts.get(id);
+
+            if (ents == null) {
+                ents = new BlockingArrayQueue<>();
+
+                BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> oldVal
= afterPutEvts.putIfAbsent(id, ents);
+
+                ents = oldVal == null ? ents : oldVal;
+            }
+
+            ents.add(e);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAfterRemove(Cache.Entry<QueryTestKey, QueryTestValue>
e) {
+            e.getKey();
+            e.getValue();
+
+            UUID id = e.unwrap(Ignite.class).cluster().localNode().id();
+
+            BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> ents = afterRmvEvts.get(id);
+
+            if (ents == null) {
+                ents = new BlockingArrayQueue<>();
+
+                BlockingQueue<Cache.Entry<QueryTestKey, QueryTestValue>> oldVal
= afterRmvEvts.putIfAbsent(id, ents);
+
+                ents = oldVal == null ? ents : oldVal;
+            }
+
+            ents.add(e);
+        }
+    }
+
+    /**
+     *
+     */
+    public static class QueryTestValue implements Serializable {
+        /** */
+        @GridToStringInclude
+        protected final Integer val1;
+
+        /** */
+        @GridToStringInclude
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestValue that = (QueryTestValue) o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object,
Object> {
+        /** */
+        private Object val;
+
+        /** */
+        private boolean retOld;
+
+        /**
+         * @param val Value to set.
+         * @param retOld Return old value flag.
+         */
+        public EntrySetValueProcessor(Object val, boolean retOld) {
+            this.val = val;
+            this.retOld = retOld;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> e, Object... args)
{
+            Object old = retOld ? e.getValue() : null;
+
+            if (val != null)
+                e.setValue(val);
+            else
+                e.remove();
+
+            return old;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EntrySetValueProcessor.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8a9a048c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
index 9b219b7..745fb90 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
@@ -55,6 +55,7 @@ public class IgniteCacheInterceptorSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheOnCopyFlagReplicatedSelfTest.class);
         suite.addTestSuite(GridCacheOnCopyFlagLocalSelfTest.class);
         suite.addTestSuite(GridCacheOnCopyFlagAtomicSelfTest.class);
+        suite.addTestSuite(CacheInterceptorPartitionCounterRandomOperationsTest.class);
 
         return suite;
     }


Mime
View raw message