ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [22/28] ignite git commit: IGNITE-2559 Fixed Transaction hangs if entry processor is not serializable. This closes #951.
Date Mon, 22 Aug 2016 03:26:54 GMT
IGNITE-2559 Fixed Transaction hangs if entry processor is not serializable. This closes #951.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8aa534a6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8aa534a6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8aa534a6

Branch: refs/heads/master
Commit: 8aa534a6124c066801e6627f36179934653fe59f
Parents: 97d1a6f
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Thu Aug 18 18:21:22 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Thu Aug 18 18:21:22 2016 +0300

----------------------------------------------------------------------
 .../GridNearPessimisticTxPrepareFuture.java     |   2 +
 .../near/GridNearTxFinishFuture.java            |  28 +-
 .../cache/distributed/near/GridNearTxLocal.java |  10 +-
 .../cache/transactions/IgniteTxHandler.java     |   1 -
 .../CacheEntryProcessorNonSerializableTest.java | 410 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 6 files changed, 437 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 5d347d7..ef2edc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -291,6 +291,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                     }
 
                     fut.onError(e);
+
+                    break;
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 39f3ff3..adde63c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -97,7 +97,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     @GridToStringInclude
     private GridNearTxLocal tx;
 
-    /** Commit flag. */
+    /** Commit flag. This flag used only for one-phase commit transaction. */
     private boolean commit;
 
     /** Node mappings. */
@@ -313,6 +313,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 }
             }
 
+            if (commit && tx.commitError() != null)
+                err = tx.commitError();
+
             if (initialized() || err != null) {
                 if (tx.needCheckBackup()) {
                     assert tx.onePhaseCommit();
@@ -386,9 +389,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
     /**
      * Initializes future.
+     *
+     * @param commit Commit flag.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    void finish() {
+    void finish(boolean commit) {
         if (tx.onNeedCheckBackup()) {
             assert tx.onePhaseCommit();
 
@@ -403,15 +408,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
         try {
             if (tx.finish(commit) || (!commit && tx.state() == UNKNOWN)) {
-                if ((tx.onePhaseCommit() && needFinishOnePhase()) || (!tx.onePhaseCommit()
&& mappings != null)) {
+                if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || (!tx.onePhaseCommit()
&& mappings != null)) {
                     if (mappings.single()) {
                         GridDistributedTxMapping mapping = mappings.singleMapping();
 
                         if (mapping != null)
-                            finish(mapping);
+                            finish(mapping, commit);
                     }
                     else
-                        finish(mappings.mappings());
+                        finish(mappings.mappings(), commit);
                 }
 
                 markInitialized();
@@ -543,13 +548,14 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
+     * @param commit Commit flag.
      * @return {@code True} if need to send finish request for one phase commit transaction.
      */
-    private boolean needFinishOnePhase() {
+    private boolean needFinishOnePhase(boolean commit) {
         if (tx.mappings().empty())
             return false;
 
-        boolean finish = tx.txState().hasNearCache(cctx);
+        boolean finish = tx.txState().hasNearCache(cctx) || !commit;
 
         if (finish) {
             GridDistributedTxMapping mapping = tx.mappings().singleMapping();
@@ -605,17 +611,19 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
     /**
      * @param mappings Mappings.
+     * @param commit Commit flag.
      */
-    private void finish(Iterable<GridDistributedTxMapping> mappings) {
+    private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit)
{
         // Create mini futures.
         for (GridDistributedTxMapping m : mappings)
-            finish(m);
+            finish(m, commit);
     }
 
     /**
      * @param m Mapping.
+     * @param commit Commit flag.
      */
-    private void finish(GridDistributedTxMapping m) {
+    private void finish(GridDistributedTxMapping m, boolean commit) {
         ClusterNode n = m.node();
 
         assert !m.empty();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 62cf74b..28c60d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -857,19 +857,19 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                     // Make sure that here are no exceptions.
                     prepareFut.get();
 
-                    fut0.finish();
+                    fut0.finish(true);
                 }
                 catch (Error | RuntimeException e) {
                     COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
 
-                    fut0.onDone(e);
+                    fut0.finish(false);
 
                     throw e;
                 }
                 catch (IgniteCheckedException e) {
                     COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
 
-                    fut0.onDone(e);
+                    fut0.finish(false);
                 }
             }
         });
@@ -917,7 +917,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                     log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']');
             }
 
-            fut.finish();
+            fut.finish(false);
         }
         else {
             prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -933,7 +933,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
                     GridNearTxFinishFuture fut0 = rollbackFut;
 
-                    fut0.finish();
+                    fut0.finish(false);
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ba30e10..7c3c206 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -639,7 +639,6 @@ public class IgniteTxHandler {
                     ", node=" + nodeId + ']');
             }
 
-
             fut.onResult(nodeId, res);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorNonSerializableTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorNonSerializableTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorNonSerializableTest.java
new file mode 100644
index 0000000..79aa34f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorNonSerializableTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.NotSerializableException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheEntryProcessorNonSerializableTest extends GridCommonAbstractTest {
+    /** */
+    private static final int EXPECTED_VALUE = 42;
+
+    /** */
+    private static final int WRONG_VALUE = -1;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 3;
+
+    /** */
+    public static final int ITERATION_CNT = 1;
+
+    /** */
+    public static final int KEYS = 10;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setClientMode(client);
+        cfg.setMarshaller(new OptimizedMarshaller());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(getServerNodeCount());
+
+        client = true;
+
+        startGrid(getServerNodeCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @return Server nodes.
+     */
+    private int getServerNodeCount() {
+        return NODES;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticOnePhaseCommit() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 1);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticOnePhaseCommitWithNearCache() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 1)
+            .setNearConfiguration(new NearCacheConfiguration());
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticOnePhaseCommitFullSync() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticOnePhaseCommitFullSyncWithNearCache() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1)
+            .setNearConfiguration(new NearCacheConfiguration());
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimistic() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 2);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticWithNearCache() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 2)
+            .setNearConfiguration(new NearCacheConfiguration());
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticFullSync() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 2);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticOnePhaseCommit() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 1);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticOnePhaseCommitFullSync() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticOnePhaseCommitFullSyncWithNearCache() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1)
+            .setNearConfiguration(new NearCacheConfiguration());
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimistic() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 2);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticFullSync() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 2);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticFullSyncWithNearCache() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 2);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void doTestInvokeTest(CacheConfiguration ccfg, TransactionConcurrency txConcurrency,
+        TransactionIsolation txIsolation) throws Exception {
+        IgniteEx cln = grid(getServerNodeCount());
+
+        grid(0).createCache(ccfg);
+
+        IgniteCache clnCache;
+
+        if (ccfg.getNearConfiguration() != null)
+            clnCache = cln.createNearCache(ccfg.getName(), ccfg.getNearConfiguration());
+        else
+            clnCache = cln.cache(ccfg.getName());
+
+        putKeys(clnCache, EXPECTED_VALUE);
+
+        try {
+            // Explicit tx.
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                try (final Transaction tx = cln.transactions().txStart(txConcurrency, txIsolation))
{
+                    putKeys(clnCache, WRONG_VALUE);
+
+                    clnCache.invoke(KEYS, new NonSerialazibleEntryProcessor());
+
+                    GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+                        @Override public Object call() throws Exception {
+                            tx.commit();
+
+                            return null;
+                        }
+                    }, NotSerializableException.class);
+                }
+
+                checkKeys(clnCache, EXPECTED_VALUE);
+            }
+
+            // From affinity node.
+            Ignite grid = grid(ThreadLocalRandom.current().nextInt(NODES));
+
+            final IgniteCache cache = grid.cache(ccfg.getName());
+
+            // Explicit tx.
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                try (final Transaction tx = grid.transactions().txStart(txConcurrency, txIsolation))
{
+                    putKeys(cache, WRONG_VALUE);
+
+                    cache.invoke(KEYS, new NonSerialazibleEntryProcessor());
+
+                    GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+                        @Override public Object call() throws Exception {
+                            tx.commit();
+
+                            return null;
+                        }
+                    }, NotSerializableException.class);
+                }
+
+                checkKeys(cache, EXPECTED_VALUE);
+            }
+
+            final IgniteCache clnCache0 = clnCache;
+
+            // Implicit tx.
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        clnCache0.invoke(KEYS, new NonSerialazibleEntryProcessor());
+
+                        return null;
+                    }
+                }, NotSerializableException.class);
+            }
+
+            checkKeys(clnCache, EXPECTED_VALUE);
+        }
+        finally {
+            grid(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param val Value.
+     */
+    private void putKeys(IgniteCache cache, int val) {
+        cache.put(KEYS, val);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param expVal Expected value.
+     */
+    private void checkKeys(IgniteCache cache, int expVal) {
+        assertEquals(expVal, cache.get(KEYS));
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(CacheWriteSynchronizationMode wrMode, int
backup) {
+        return new CacheConfiguration("test-cache-" + wrMode + "-" + backup)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setBackups(backup);
+    }
+
+    /**
+     *
+     */
+    private static class NonSerialazibleEntryProcessor implements EntryProcessor<Integer,
Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<Integer, Integer> entry, Object...
arguments)
+            throws EntryProcessorException {
+            entry.setValue(42);
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8aa534a6/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 8c3f4de..84e1502 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -137,6 +137,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheGet
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxExceptionSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxExceptionSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxExceptionSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorNonSerializableTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultiThreadedSelfTest;
@@ -186,6 +187,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheAtomicLocalInvokeTest.class);
         suite.addTestSuite(IgniteCacheAtomicLocalWithStoreInvokeTest.class);
         suite.addTestSuite(IgniteCacheTxInvokeTest.class);
+        suite.addTestSuite(CacheEntryProcessorNonSerializableTest.class);
         suite.addTestSuite(IgniteCacheEntryProcessorCallTest.class);
         GridTestUtils.addTestIfNeeded(suite, CacheEntryProcessorCopySelfTest.class, ignoredTests);
         suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class);


Mime
View raw message