ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [09/50] [abbrv] ignite git commit: Fixed "IGNITE-2630 Make 'updateCntr' available through CacheInterceptor API".
Date Fri, 22 Apr 2016 12:29:51 GMT
Fixed "IGNITE-2630 Make 'updateCntr' available through CacheInterceptor API".


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cead1e0d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cead1e0d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cead1e0d

Branch: refs/heads/ignite-db-x-10884
Commit: cead1e0d31479f3925bfea092c29a85c4f40469d
Parents: d3569f7
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Thu Apr 7 20:06:12 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Thu Apr 7 20:06:33 2016 +0300

----------------------------------------------------------------------
 .../ignite/cache/CacheInterceptorEntry.java     |   39 +
 .../cache/query/CacheQueryEntryEvent.java       |   12 +-
 .../processors/cache/CacheLazyEntry.java        |   47 +-
 .../processors/cache/GridCacheMapEntry.java     |   19 +-
 ...erceptorPartitionCounterLocalSanityTest.java |  687 ++++++++++++
 ...torPartitionCounterRandomOperationsTest.java | 1054 ++++++++++++++++++
 .../IgniteCacheInterceptorSelfTestSuite.java    |    2 +
 7 files changed, 1844 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java
new file mode 100644
index 0000000..61be00a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import javax.cache.Cache;
+
+/**
+ * A cache interceptor map entry.
+ *
+ * @param <K> The type of key.
+ * @param <V> The type of value.
+ */
+public abstract class CacheInterceptorEntry<K, V> implements Cache.Entry<K, V> {
+    /**
+     * Each cache update increases the partition counter. The same cache update has the same counter value
+     * on primary and backup nodes. This value can be useful for communicating with external applications.
+     * The value is meaningful only for entries passed to the {@link CacheInterceptor#onAfterPut(Cache.Entry)}
+     * and {@link CacheInterceptor#onAfterRemove(Cache.Entry)} methods. For entries obtained through other
+     * methods this method returns {@code 0}.
+     *
+     * @return Value of counter for this entry.
+     */
+    public abstract long getPartitionUpdateCounter();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
index 2c1c5e6..3a7994f 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
@@ -24,18 +24,18 @@ import javax.cache.event.EventType;
 /**
  * A Cache continuous query entry event.
  *
- * @param <K> the type of key
- * @param <V> the type of value
+ * @param <K> The type of key.
+ * @param <V> The type of value.
  */
 public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEvent<K, V> {
     /**
      * Constructs a cache entry event from a given cache as source.
      *
-     * @param source the cache that originated the event
-     * @param eventType Event type.
+     * @param src The cache that originated the event.
+     * @param evtType Event type.
      */
-    public CacheQueryEntryEvent(Cache source, EventType eventType) {
-        super(source, eventType);
+    public CacheQueryEntryEvent(Cache src, EventType evtType) {
+        super(src, evtType);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
index c1fcb77..c8cfc99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import javax.cache.Cache;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheInterceptorEntry;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  *
  */
-public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
+public class CacheLazyEntry<K, V> extends CacheInterceptorEntry<K, V> {
     /** Cache context. */
     protected GridCacheContext cctx;
 
@@ -46,6 +46,9 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
     /** Keep binary flag. */
     private boolean keepBinary;
 
+    /** Update counter. */
+    private Long updateCntr;
+
     /**
      * @param cctx Cache context.
      * @param keyObj Key cache object.
@@ -78,6 +81,32 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
      * @param key Key value.
      * @param valObj Cache object
      * @param keepBinary Keep binary flag.
+     * @param updateCntr Partition update counter.
+     * @param val Cache value.
+     */
+    public CacheLazyEntry(GridCacheContext ctx,
+        KeyCacheObject keyObj,
+        K key,
+        CacheObject valObj,
+        V val,
+        boolean keepBinary,
+        Long updateCntr
+    ) {
+        this.cctx = ctx;
+        this.keyObj = keyObj;
+        this.key = key;
+        this.valObj = valObj;
+        this.val = val;
+        this.keepBinary = keepBinary;
+        this.updateCntr = updateCntr;
+    }
+
+    /**
+     * @param ctx Cache context.
+     * @param keyObj Key cache object.
+     * @param key Key value.
+     * @param valObj Cache object
+     * @param keepBinary Keep binary flag.
      * @param val Cache value.
      */
     public CacheLazyEntry(GridCacheContext ctx,
@@ -144,6 +173,20 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public long getPartitionUpdateCounter() {
+        return updateCntr == null ? 0L : updateCntr;
+    }
+
+    /**
+     * Sets update counter.
+     *
+     * @param updateCntr Update counter.
+     */
+    public void updateCounter(Long updateCntr) {
+        this.updateCntr = updateCntr;
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <T> T unwrap(Class<T> cls) {
         if (cls.isAssignableFrom(Ignite.class))

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index a448307..1a052f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1258,7 +1258,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             cctx.store().put(tx, key, val, newVer);
 
         if (intercept)
-            cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary));
+            cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary, updateCntr0));
 
         return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateCntr0) :
             new GridCacheUpdateTxResult(false, null);
@@ -1308,7 +1308,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         IgniteBiTuple<Boolean, Object> interceptRes = null;
 
-        Cache.Entry entry0 = null;
+        CacheLazyEntry entry0 = null;
 
         Long updateCntr0;
 
@@ -1473,8 +1473,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             onMarkedObsolete();
         }
 
-        if (intercept)
+        if (intercept) {
+            entry0.updateCounter(updateCntr0);
+
             cctx.config().getInterceptor().onAfterRemove(entry0);
+        }
 
         if (valid) {
             CacheObject ret;
@@ -1820,9 +1823,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             if (intercept) {
                 if (op == GridCacheOperation.UPDATE)
-                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary));
+                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, 0L));
                 else
-                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary));
+                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary, 0L));
             }
         }
 
@@ -2394,7 +2397,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             else {
                 if (intercept) {
                     interceptRes = cctx.config().getInterceptor().onBeforeRemove(new CacheLazyEntry(cctx, key, key0,
-                        oldVal, old0, keepBinary));
+                        oldVal, old0, keepBinary, updateCntr0));
 
                     if (cctx.cancelRemove(interceptRes))
                         return new GridCacheUpdateAtomicResult(false,
@@ -2497,9 +2500,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             if (intercept) {
                 if (op == GridCacheOperation.UPDATE)
-                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary));
+                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, updateCntr0));
                 else
-                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary));
+                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary, updateCntr0));
 
                 if (interceptRes != null)
                     oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityTest.java
new file mode 100644
index 0000000..5db2781
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityTest.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheInterceptorAdapter;
+import org.apache.ignite.cache.CacheInterceptorEntry;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
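+ * Sanity test for the partition update counter exposed through {@link CacheInterceptorEntry} on LOCAL caches.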
+ */
+public class CacheInterceptorPartitionCounterLocalSanityTest extends GridCommonAbstractTest {
+    /** */
+    private static final int NODES = 1;
+
+    /** */
+    private static final int KEYS = 50;
+
+    /** */
+    private static final int VALS = 10;
+
+    /** */
+    private static final int ITERATION_CNT = 100;
+
+    /** */
+    private static BlockingQueue<Cache.Entry<TestKey, TestValue>> afterPutEvts;
+
+    /** */
+    private static BlockingQueue<Cache.Entry<TestKey, TestValue>> afterRmvEvts;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        afterPutEvts = new BlockingArrayQueue<>();
+        afterRmvEvts = new BlockingArrayQueue<>();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocal() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalWithStore() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(
+            ATOMIC,
+            ONHEAP_TIERED,
+            true);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalTxWithStore() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            true);
+
+        doTestPartitionCounterOperation(ccfg);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void doTestPartitionCounterOperation(CacheConfiguration<Object, Object> ccfg)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            log.info("Random seed: " + seed);
+
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                if (i % 20 == 0)
+                    log.info("Iteration: " + i);
+
+                randomUpdate(rnd, expData, grid(0).cache(ccfg.getName()));
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @param expData Expected cache data.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        ConcurrentMap<Object, Object> expData,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new TestKey(rnd.nextInt(KEYS));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key);
+
+        int op = rnd.nextInt(11);
+
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Transaction tx = null;
+
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+        try {
+            //log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+            switch (op) {
+                case 0: {
+                    cache.put(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, newVal, oldVal, false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 1: {
+                    cache.getAndPut(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, newVal, oldVal, false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 2: {
+                    cache.remove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, null, oldVal, true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 3: {
+                    cache.getAndRemove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, null, oldVal, true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 4: {
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, newVal, oldVal, false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 5: {
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    waitAndCheckEvent(key, null, oldVal, true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 6: {
+                    cache.putIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        waitAndCheckEvent(key, newVal, null, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(afterPutEvts);
+
+                    break;
+                }
+
+                case 7: {
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        waitAndCheckEvent(key, newVal, null, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(afterPutEvts);
+
+                    break;
+                }
+
+                case 8: {
+                    cache.replace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        waitAndCheckEvent(key, newVal, oldVal, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(afterPutEvts);
+
+                    break;
+                }
+
+                case 9: {
+                    cache.getAndReplace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        waitAndCheckEvent(key, newVal, oldVal, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(afterPutEvts);
+
+                    break;
+                }
+
+                case 10: {
+                    if (oldVal != null) {
+                        Object replaceVal = value(rnd);
+
+                        boolean success = replaceVal.equals(oldVal);
+
+                        if (success) {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            waitAndCheckEvent(key, newVal, oldVal, false);
+
+                            expData.put(key, newVal);
+                        }
+                        else {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            checkNoEvent(afterPutEvts);
+                        }
+                    }
+                    else {
+                        cache.replace(key, value(rnd), newVal);
+
+                        if (tx != null)
+                            tx.commit();
+
+                        checkNoEvent(afterPutEvts);
+                    }
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
+            }
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        int val = rnd.nextInt(3);
+
+        if (val == 0)
+            return READ_COMMITTED;
+        else if (val == 1)
+            return REPEATABLE_READ;
+        else
+            return SERIALIZABLE;
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionConcurrency}.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        return new TestValue(rnd.nextInt(VALS));
+    }
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @param oldVal Old value.
+     * @param rmv Remove operation.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(
+        Object key,
+        Object val,
+        Object oldVal,
+        boolean rmv)
+        throws Exception {
+        BlockingQueue<Cache.Entry<TestKey, TestValue>> evtsQueue = rmv ? afterRmvEvts : afterPutEvts;
+
+        if (val == null && oldVal == null) {
+            checkNoEvent(evtsQueue);
+
+            return;
+        }
+
+        Cache.Entry<TestKey, TestValue> entry = evtsQueue.poll(5, SECONDS);
+
+        assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', entry);
+        assertEquals(key, entry.getKey());
+        assertEquals(rmv ? oldVal : val, entry.getValue());
+
+        CacheInterceptorEntry interceptorEntry = entry.unwrap(CacheInterceptorEntry.class);
+
+        assertNotNull(interceptorEntry);
+
+        // For a local cache the partition counter is always zero.
+        assertEquals(0, interceptorEntry.getPartitionUpdateCounter());
+
+        assertNull(evtsQueue.peek());
+    }
+
+    /**
+     * @param evtsQueue Event queue.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(BlockingQueue<Cache.Entry<TestKey, TestValue>> evtsQueue) throws Exception {
+        Cache.Entry<TestKey, TestValue> evt = evtsQueue.poll(50, MILLISECONDS);
+
+        assertTrue(evt == null || evt.getValue() == null);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @param store If {@code true} configures dummy cache store.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        boolean store) {
+        CacheConfiguration<TestKey, TestValue> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setCacheMode(LOCAL);
+
+        if (store) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setReadThrough(true);
+            ccfg.setWriteThrough(true);
+        }
+
+        ccfg.setInterceptor(new TestInterceptor());
+
+        return (CacheConfiguration)ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     *
+     */
+    public static class TestKey implements Serializable, Comparable {
+        /** */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public TestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey that = (TestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestKey.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(@NotNull Object o) {
+            return key - ((TestKey)o).key;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestInterceptor extends CacheInterceptorAdapter<TestKey, TestValue> {
+        /** {@inheritDoc} */
+        @Override public void onAfterPut(Cache.Entry<TestKey, TestValue> e) {
+            e.getKey();
+            e.getValue();
+
+            afterPutEvts.add(e);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAfterRemove(Cache.Entry<TestKey, TestValue> e) {
+            e.getKey();
+            e.getValue();
+
+            afterRmvEvts.add(e);
+        }
+    }
+
+    /**
+     *
+     */
+    public static class TestValue implements Serializable {
+        /** */
+        @GridToStringInclude
+        protected final Integer val1;
+
+        /** */
+        @GridToStringInclude
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestValue that = (TestValue) o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+        /** */
+        private Object val;
+
+        /** */
+        private boolean retOld;
+
+        /**
+         * @param val Value to set.
+         * @param retOld Return old value flag.
+         */
+        EntrySetValueProcessor(Object val, boolean retOld) {
+            this.val = val;
+            this.retOld = retOld;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+            Object old = retOld ? e.getValue() : null;
+
+            if (val != null)
+                e.setValue(val);
+            else
+                e.remove();
+
+            return old;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EntrySetValueProcessor.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
new file mode 100644
index 0000000..055374d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java
@@ -0,0 +1,1054 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheInterceptorAdapter;
+import org.apache.ignite.cache.CacheInterceptorEntry;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.jetbrains.annotations.NotNull;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
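+/**
+ * Runs random cache operations and checks the partition update counter seen by the cache interceptor.
+ */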
+public class CacheInterceptorPartitionCounterRandomOperationsTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configuration. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Total number of started nodes; the last one is started in client mode. */
+    private static final int NODES = 5;
+
+    /** Exclusive upper bound of the random integers used to build test keys. */
+    private static final int KEYS = 50;
+
+    /** Exclusive upper bound of the random integers used to build test values. */
+    private static final int VALS = 10;
+
+    /** Number of outer iterations; each iteration runs one random update per node. */
+    public static final int ITERATION_CNT = 100;
+
+    /** Client mode flag applied to configurations created by {@code getConfiguration}. */
+    private boolean client;
+
+    /** Entries passed to the interceptor's {@code onAfterPut}, queued per local node ID. */
+    private static ConcurrentMap<UUID, BlockingQueue<Cache.Entry<TestKey, TestValue>>>
+        afterPutEvts = new ConcurrentHashMap<>();
+
+    /** Entries passed to the interceptor's {@code onAfterRemove}, queued per local node ID. */
+    private static ConcurrentMap<UUID, BlockingQueue<Cache.Entry<TestKey, TestValue>>>
+        afterRmvEvts = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        // Client mode follows the test-level flag.
+        igniteCfg.setClientMode(client);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops all grids before delegating to the parent implementation.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        afterPutEvts.clear();
+        afterRmvEvts.clear();
+
+        for (int i = 0; i < NODES; i++) {
+            afterRmvEvts.put(grid(i).cluster().localNode().id(),
+                new BlockingArrayQueue<Cache.Entry<TestKey, TestValue>>());
+            afterPutEvts.put(grid(i).cluster().localNode().id(),
+                new BlockingArrayQueue<Cache.Entry<TestKey, TestValue>>());
+        }
+    }
+
+    /**
+     * PARTITIONED ATOMIC on-heap cache, one backup, no store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * PARTITIONED ATOMIC on-heap cache, one backup, with the dummy store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicWithStore() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED, true));
+    }
+
+    /**
+     * REPLICATED ATOMIC on-heap cache, no store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(REPLICATED, 0, ATOMIC, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * REPLICATED ATOMIC on-heap cache, with the dummy store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedWithStore() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(REPLICATED, 0, ATOMIC, ONHEAP_TIERED, true));
+    }
+
+    /**
+     * PARTITIONED ATOMIC cache in OFFHEAP_VALUES memory mode, one backup, no store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValues() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_VALUES, false));
+    }
+
+    /**
+     * PARTITIONED ATOMIC cache in OFFHEAP_VALUES memory mode, one backup, with the dummy store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValuesWithStore() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_VALUES, true));
+    }
+
+    /**
+     * PARTITIONED ATOMIC cache in OFFHEAP_TIERED memory mode, one backup, no store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTiered() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED, false));
+    }
+
+    /**
+     * PARTITIONED ATOMIC on-heap cache, no backups, no store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackups() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL on-heap cache, one backup, no store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL on-heap cache, one backup, with the dummy store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxWithStore() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, true));
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL on-heap cache, one backup, no store.
+     * NOTE(review): the configuration is the same as in {@code testTx}; nothing here is specific to "explicit".
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxExplicit() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * REPLICATED TRANSACTIONAL on-heap cache, no store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * REPLICATED TRANSACTIONAL on-heap cache, with the dummy store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedWithStore() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, ONHEAP_TIERED, true));
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL cache in OFFHEAP_VALUES memory mode, one backup, no store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValues() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_VALUES, false));
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL cache in OFFHEAP_VALUES memory mode, one backup, no store.
+     * NOTE(review): the configuration is the same as in {@code testTxOffheapValues}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapValuesExplicit() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_VALUES, false));
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL cache in OFFHEAP_TIERED memory mode, one backup, no store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTiered() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED, false));
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL cache in OFFHEAP_TIERED memory mode, one backup, no store.
+     * NOTE(review): the configuration is the same as in {@code testTxOffheapTiered}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapTieredExplicit() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED, false));
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL on-heap cache, no backups, no store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackups() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL on-heap cache, no backups, with the dummy store.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsWithStore() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, true));
+    }
+
+    /**
+     * PARTITIONED TRANSACTIONAL on-heap cache, no backups, no store.
+     * NOTE(review): the configuration is the same as in {@code testTxNoBackups}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsExplicit() throws Exception {
+        doTestPartitionCounterOperation(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, false));
+    }
+
+    /**
+     * Creates the cache, runs {@code ITERATION_CNT} rounds of random updates (one update through each node
+     * per round) and destroys the cache afterwards.
+     *
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    protected void doTestPartitionCounterOperation(CacheConfiguration<Object, Object> ccfg)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            long seed = System.currentTimeMillis();
+
+            // Log the seed used by this run.
+            log.info("Random seed: " + seed);
+
+            Random rnd = new Random(seed);
+
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+            Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+            int iter = 0;
+
+            while (iter < ITERATION_CNT) {
+                if (iter % 20 == 0)
+                    log.info("Iteration: " + iter);
+
+                for (int nodeIdx = 0; nodeIdx < NODES; nodeIdx++)
+                    randomUpdate(rnd, expData, partCntr, grid(nodeIdx).cache(ccfg.getName()));
+
+                iter++;
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Performs one randomly chosen cache operation on a random key and checks the interceptor events it
+     * produces. For transactional caches the operation is randomly wrapped into an explicit transaction
+     * with random concurrency and isolation, committed right after the cache call.
+     * <p>
+     * Operation codes: 0 - put, 1 - getAndPut, 2 - remove, 3 - getAndRemove, 4 - invoke setting a value,
+     * 5 - invoke removing the entry, 6 - putIfAbsent, 7 - getAndPutIfAbsent, 8 - replace, 9 - getAndReplace,
+     * 10 - replace(key, oldVal, newVal). Operations expected to change the entry bump the expected
+     * partition counter and update {@code expData}; the others only check that no event with a value arrived.
+     *
+     * @param rnd Random generator.
+     * @param expData Expected cache data.
+     * @param partCntr Partition counter.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        ConcurrentMap<Object, Object> expData,
+        Map<Integer, Long> partCntr,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new TestKey(rnd.nextInt(KEYS));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key);
+
+        int op = rnd.nextInt(11);
+
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Transaction tx = null;
+
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+        try {
+            //log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+            switch (op) {
+                case 0: {
+                    cache.put(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 1: {
+                    cache.getAndPut(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 2: {
+                    cache.remove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, null, oldVal, true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 3: {
+                    cache.getAndRemove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, null, oldVal, true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 4: {
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 5: {
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(cache, partCntr, affinity(cache), key, null, oldVal, true);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 6: {
+                    cache.putIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, null, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+
+                    break;
+                }
+
+                case 7: {
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, null, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+
+                    break;
+                }
+
+                case 8: {
+                    cache.replace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+
+                    break;
+                }
+
+                case 9: {
+                    cache.getAndReplace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+
+                    break;
+                }
+
+                case 10: {
+                    if (oldVal != null) {
+                        Object replaceVal = value(rnd);
+
+                        boolean success = replaceVal.equals(oldVal);
+
+                        if (success) {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            updatePartitionCounter(cache, key, partCntr);
+
+                            waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false);
+
+                            expData.put(key, newVal);
+                        }
+                        else {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            checkNoEvent(getInterceptorQueues(cache, key, false));
+                        }
+                    }
+                    else {
+                        cache.replace(key, value(rnd), newVal);
+
+                        if (tx != null)
+                            tx.commit();
+
+                        checkNoEvent(getInterceptorQueues(cache, key, false));
+                    }
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
+            }
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * Collects the event queues of the nodes checked for interceptor callbacks on the key: primary and
+     * backup nodes for transactional caches, otherwise only the node returned by {@code mapKeyToNode}.
+     *
+     * @param cache Cache.
+     * @param key Key.
+     * @param rmv {@code true} to return remove event queues, {@code false} for put event queues.
+     * @return Queues.
+     */
+    @NotNull private List<BlockingQueue<Cache.Entry<TestKey, TestValue>>> getInterceptorQueues(
+        IgniteCache<Object, Object> cache,
+        Object key,
+        boolean rmv
+    ) {
+        boolean tx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL;
+
+        Collection<ClusterNode> nodes;
+
+        if (tx)
+            nodes = affinity(cache).mapKeyToPrimaryAndBackups(key);
+        else
+            nodes = Collections.singletonList(affinity(cache).mapKeyToNode(key));
+
+        assert !nodes.isEmpty();
+
+        ConcurrentMap<UUID, BlockingQueue<Cache.Entry<TestKey, TestValue>>> evts =
+            rmv ? afterRmvEvts : afterPutEvts;
+
+        List<BlockingQueue<Cache.Entry<TestKey, TestValue>>> queues = new ArrayList<>();
+
+        for (ClusterNode node : nodes)
+            queues.add(evts.get(node.id()));
+
+        return queues;
+    }
+
+    /**
+     * Picks one of the three transaction isolation levels at random.
+     *
+     * @param rnd {@link Random}.
+     * @return {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        switch (rnd.nextInt(3)) {
+            case 0:
+                return READ_COMMITTED;
+
+            case 1:
+                return REPEATABLE_READ;
+
+            default:
+                return SERIALIZABLE;
+        }
+    }
+
+    /**
+     * Picks optimistic or pessimistic transaction concurrency at random.
+     *
+     * @param rnd {@link Random}.
+     * @return {@link TransactionConcurrency}.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        if (rnd.nextBoolean())
+            return TransactionConcurrency.OPTIMISTIC;
+
+        return TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
+     * Increments the expected update counter of the partition the key belongs to.
+     *
+     * @param cache Cache.
+     * @param key Key
+     * @param cntrs Partition counters.
+     */
+    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Affinity<Object> aff = ignite.affinity(cache.getName());
+
+        int part = aff.partition(key);
+
+        Long prev = cntrs.get(part);
+
+        // A partition without a recorded counter starts from zero.
+        long next = (prev == null ? 0L : prev) + 1;
+
+        cntrs.put(part, next);
+    }
+
+    /**
+     * Builds a test value from a random integer below {@code VALS}.
+     *
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        int valId = rnd.nextInt(VALS);
+
+        return new TestValue(valId);
+    }
+
+    /**
+     * Waits for one interceptor event on every expected node and verifies its key, value and partition
+     * update counter. When both {@code val} and {@code oldVal} are {@code null}, only the absence of
+     * events carrying a value is checked.
+     *
+     * @param cache Ignite cache.
+     * @param partCntrs Partition counters.
+     * @param aff Affinity function.
+     * @param key Key.
+     * @param val Value.
+     * @param oldVal Old value.
+     * @param rmv Remove operation.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(IgniteCache<Object, Object> cache,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        Object key,
+        Object val,
+        Object oldVal,
+        boolean rmv)
+        throws Exception {
+        Collection<BlockingQueue<Cache.Entry<TestKey, TestValue>>> entries = getInterceptorQueues(cache, key,
+            rmv);
+
+        assert !entries.isEmpty();
+
+        if (val == null && oldVal == null) {
+            checkNoEvent(entries);
+
+            return;
+        }
+
+        // Keep the expected counter boxed: unboxing a missing counter would throw a
+        // NullPointerException before the assertion below could report the problem.
+        Long expCntr = partCntrs.get(aff.partition(key));
+
+        assertNotNull("Missing expected partition counter [key=" + key + ']', expCntr);
+
+        for (BlockingQueue<Cache.Entry<TestKey, TestValue>> evtsQueue : entries) {
+            Cache.Entry<TestKey, TestValue> entry = evtsQueue.poll(5, SECONDS);
+
+            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', entry);
+            assertEquals(key, entry.getKey());
+            assertEquals(rmv ? oldVal : val, entry.getValue());
+
+            CacheInterceptorEntry interceptorEntry = entry.unwrap(CacheInterceptorEntry.class);
+
+            assertNotNull(interceptorEntry);
+
+            assertEquals(expCntr.longValue(), interceptorEntry.getPartitionUpdateCounter());
+
+            // Exactly one event per operation is expected in each queue.
+            assertNull(evtsQueue.peek());
+        }
+    }
+
+    /**
+     * Polls every queue for a short time and fails if an event carrying a non-null value shows up.
+     *
+     * @param evtsQueues Event queue.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(Collection<BlockingQueue<Cache.Entry<TestKey, TestValue>>> evtsQueues)
+        throws Exception {
+        for (BlockingQueue<Cache.Entry<TestKey, TestValue>> queue : evtsQueues) {
+            Cache.Entry<TestKey, TestValue> polled = queue.poll(50, MILLISECONDS);
+
+            assertFalse(polled != null && polled.getValue() != null);
+        }
+    }
+
+    /**
+     * Builds a FULL_SYNC cache configuration with the test interceptor installed.
+     *
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups; applied only when the cache mode is PARTITIONED.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @param store If {@code true} configures dummy cache store.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        boolean store) {
+        CacheConfiguration<TestKey, TestValue> cfg = new CacheConfiguration<>();
+
+        cfg.setCacheMode(cacheMode);
+        cfg.setAtomicityMode(atomicityMode);
+        cfg.setAtomicWriteOrderMode(PRIMARY);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setMemoryMode(memoryMode);
+        cfg.setInterceptor(new TestInterceptor());
+
+        if (cacheMode == PARTITIONED)
+            cfg.setBackups(backups);
+
+        if (store) {
+            cfg.setCacheStoreFactory(new TestStoreFactory());
+            cfg.setReadThrough(true);
+            cfg.setWriteThrough(true);
+        }
+
+        return (CacheConfiguration)cfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     * Test cache key wrapping a single integer. Equality, hash code and ordering are all defined by
+     * the wrapped integer.
+     */
+    public static class TestKey implements Serializable, Comparable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public TestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey that = (TestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestKey.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            // Compare instead of subtracting: a difference of two ints can overflow and flip the sign.
+            return key.compareTo(((TestKey)o).key);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestInterceptor extends CacheInterceptorAdapter<TestKey, TestValue> {
+        /** {@inheritDoc} */
+        @Override public void onAfterPut(Cache.Entry<TestKey, TestValue> e) {
+            e.getKey();
+            e.getValue();
+
+            UUID id = e.unwrap(Ignite.class).cluster().localNode().id();
+
+            BlockingQueue<Cache.Entry<TestKey, TestValue>> ents = afterPutEvts.get(id);
+
+            if (ents == null) {
+                ents = new BlockingArrayQueue<>();
+
+                BlockingQueue<Cache.Entry<TestKey, TestValue>> oldVal = afterPutEvts.putIfAbsent(id, ents);
+
+                ents = oldVal == null ? ents : oldVal;
+            }
+
+            ents.add(e);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAfterRemove(Cache.Entry<TestKey, TestValue> e) {
+            e.getKey();
+            e.getValue();
+
+            UUID id = e.unwrap(Ignite.class).cluster().localNode().id();
+
+            BlockingQueue<Cache.Entry<TestKey, TestValue>> ents = afterRmvEvts.get(id);
+
+            if (ents == null) {
+                ents = new BlockingArrayQueue<>();
+
+                BlockingQueue<Cache.Entry<TestKey, TestValue>> oldVal = afterRmvEvts.putIfAbsent(id, ents);
+
+                ents = oldVal == null ? ents : oldVal;
+            }
+
+            ents.add(e);
+        }
+    }
+
+    /**
+     * Test cache value holding an integer and its string representation.
+     */
+    public static class TestValue implements Serializable {
+        /** Integer value. */
+        @GridToStringInclude
+        protected final Integer val1;
+
+        /** String form of the integer value. */
+        @GridToStringInclude
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(Integer val) {
+            val1 = val;
+            val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            TestValue other = (TestValue)o;
+
+            if (!val1.equals(other.val1))
+                return false;
+
+            return val2.equals(other.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * val1.hashCode() + val2.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+
+    /**
+     * Entry processor that stores the configured value into the entry or, when that value is
+     * {@code null}, removes the entry.
+     */
+    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+        /** Value to store; {@code null} means the entry is removed. */
+        private Object val;
+
+        /** Whether {@code process} returns the value read before the update. */
+        private boolean retOld;
+
+        /**
+         * @param val Value to set, or {@code null} to remove the entry.
+         * @param retOld Return old value flag.
+         */
+        public EntrySetValueProcessor(Object val, boolean retOld) {
+            this.retOld = retOld;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+            Object prev = null;
+
+            // The current value is read before the entry is modified.
+            if (retOld)
+                prev = e.getValue();
+
+            if (val == null)
+                e.remove();
+            else
+                e.setValue(val);
+
+            return prev;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EntrySetValueProcessor.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
index 9b219b7..d19ecd7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java
@@ -55,6 +55,8 @@ public class IgniteCacheInterceptorSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheOnCopyFlagReplicatedSelfTest.class);
         suite.addTestSuite(GridCacheOnCopyFlagLocalSelfTest.class);
         suite.addTestSuite(GridCacheOnCopyFlagAtomicSelfTest.class);
+        suite.addTestSuite(CacheInterceptorPartitionCounterRandomOperationsTest.class);
+        suite.addTestSuite(CacheInterceptorPartitionCounterLocalSanityTest.class);
 
         return suite;
     }


Mime
View raw message