ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5192 For optimistic tx do not commit entries if filter do not pass
Date Fri, 23 Jun 2017 09:21:09 GMT
Repository: ignite
Updated Branches:
  refs/heads/master c098d75d4 -> 63debcdb1


ignite-5192 For optimistic tx do not commit entries if filter do not pass


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/63debcdb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/63debcdb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/63debcdb

Branch: refs/heads/master
Commit: 63debcdb1d6bfb5402ca65197d516f82df105574
Parents: c098d75
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jun 23 12:21:01 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jun 23 12:21:01 2017 +0300

----------------------------------------------------------------------
 .../cache/distributed/near/GridNearTxLocal.java |   5 +-
 ...cTransactionsWithFilterSingleServerTest.java |  28 ++
 ...cheOptimisticTransactionsWithFilterTest.java | 493 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite2.java       |   5 +
 4 files changed, 530 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/63debcdb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 55905b7..81e5ca8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -2325,8 +2325,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                             if (hasFilters) {
                                 success = isAll(e.context(), key, cacheVal, filter);
 
-                                if (!success)
+                                if (!success) {
                                     e.value(cacheVal, false, false);
+
+                                    e.op(READ);
+                                }
                             }
                             else
                                 success = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/63debcdb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOptimisticTransactionsWithFilterSingleServerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOptimisticTransactionsWithFilterSingleServerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOptimisticTransactionsWithFilterSingleServerTest.java
new file mode 100644
index 0000000..04e757a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOptimisticTransactionsWithFilterSingleServerTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ *
+ */
+public class CacheOptimisticTransactionsWithFilterSingleServerTest extends CacheOptimisticTransactionsWithFilterTest {
+    /** {@inheritDoc} */
+    @Override protected int serversNumber() {
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/63debcdb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOptimisticTransactionsWithFilterTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOptimisticTransactionsWithFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOptimisticTransactionsWithFilterTest.java
new file mode 100644
index 0000000..84035f6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOptimisticTransactionsWithFilterTest.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class CacheOptimisticTransactionsWithFilterTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static final TransactionIsolation[] ISOLATIONS = {REPEATABLE_READ, READ_COMMITTED};
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(serversNumber());
+
+        client = true;
+
+        startGrid(serversNumber());
+
+        startGrid(serversNumber() + 1);
+
+        client = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @return Number of server nodes. In addition 2 clients are started.
+     */
+    protected int serversNumber() {
+        return 4;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCasReplace() throws Exception {
+        executeTestForAllCaches(new TestClosure() {
+            @Override public void apply(Ignite ignite, String cacheName) throws Exception {
+                int nodeIdx = ThreadLocalRandom.current().nextInt(serversNumber() + 2);
+
+                final IgniteCache<Integer, Integer> otherCache = ignite(nodeIdx).cache(cacheName);
+
+                IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+                for (final Integer key : testKeys(cache)) {
+                    for (int i = 0; i < 3; i++) {
+                        for (TransactionIsolation isolation : ISOLATIONS) {
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.replace(key, 1, 2));
+
+                                assertNull(cache.get(key));
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, null), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.replace(key, 1, 2));
+
+                                assertNull(cache.get(key));
+
+                                tx.rollback();
+                            }
+
+                            checkCacheData(F.asMap(key, null), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.replace(key, 1, 2));
+
+                                GridTestUtils.runAsync(new Runnable() {
+                                    @Override public void run() {
+                                        otherCache.put(key, 1);
+                                    }
+                                }).get();
+
+                                assertNull(cache.get(key));
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, 1), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.replace(key, 2, 3));
+
+                                GridTestUtils.runAsync(new Runnable() {
+                                    @Override public void run() {
+                                        otherCache.put(key, 10);
+                                    }
+                                }).get();
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, 10), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertTrue(cache.replace(key, 10, 1));
+
+                                GridTestUtils.runAsync(new Runnable() {
+                                    @Override public void run() {
+                                        otherCache.put(key, 2);
+                                    }
+                                }).get();
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, 2), cacheName);
+
+                            cache.remove(key);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutIfAbsent() throws Exception {
+        executeTestForAllCaches(new TestClosure() {
+            @Override public void apply(Ignite ignite, String cacheName) throws Exception {
+                int nodeIdx = ThreadLocalRandom.current().nextInt(serversNumber() + 2);
+
+                final IgniteCache<Integer, Integer> otherCache = ignite(nodeIdx).cache(cacheName);
+
+                IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+                for (final Integer key : testKeys(cache)) {
+                    for (int i = 0; i < 3; i++) {
+                        for (TransactionIsolation isolation : ISOLATIONS) {
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertTrue(cache.putIfAbsent(key, 1));
+
+                                assertEquals((Integer)1, cache.get(key));
+
+                                tx.rollback();
+                            }
+
+                            checkCacheData(F.asMap(key, null), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertTrue(cache.putIfAbsent(key, 1));
+
+                                GridTestUtils.runAsync(new Runnable() {
+                                    @Override public void run() {
+                                        otherCache.put(key, 2);
+                                    }
+                                }).get();
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, 2), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.putIfAbsent(key, 3));
+
+                                GridTestUtils.runAsync(new Runnable() {
+                                    @Override public void run() {
+                                        otherCache.remove(key);
+                                    }
+                                }).get();
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, null), cacheName);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplace() throws Exception {
+        executeTestForAllCaches(new TestClosure() {
+            @Override public void apply(Ignite ignite, String cacheName) throws Exception {
+                int nodeIdx = ThreadLocalRandom.current().nextInt(serversNumber() + 2);
+
+                final IgniteCache<Integer, Integer> otherCache = ignite(nodeIdx).cache(cacheName);
+
+                IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+                for (final Integer key : testKeys(cache)) {
+                    for (int i = 0; i < 3; i++) {
+                        for (TransactionIsolation isolation : ISOLATIONS) {
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.replace(key, 1));
+
+                                assertNull(cache.get(key));
+
+                                tx.rollback();
+                            }
+
+                            assertNull(cache.get(key));
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.replace(key, 1));
+
+                                GridTestUtils.runAsync(new Runnable() {
+                                    @Override public void run() {
+                                        otherCache.put(key, 2);
+                                    }
+                                }).get();
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, 2), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertTrue(cache.replace(key, 3));
+
+                                GridTestUtils.runAsync(new Runnable() {
+                                    @Override public void run() {
+                                        otherCache.remove(key);
+                                    }
+                                }).get();
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, null), cacheName);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoveWithOldValue() throws Exception {
+        executeTestForAllCaches(new TestClosure() {
+            @Override public void apply(Ignite ignite, String cacheName) throws Exception {
+                int nodeIdx = ThreadLocalRandom.current().nextInt(serversNumber() + 2);
+
+                final IgniteCache<Integer, Integer> otherCache = ignite(nodeIdx).cache(cacheName);
+
+                IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+                for (final Integer key : testKeys(cache)) {
+                    for (int i = 0; i < 3; i++) {
+                        for (TransactionIsolation isolation : ISOLATIONS) {
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.remove(key, 1));
+
+                                assertNull(cache.get(key));
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, null), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.remove(key, 1));
+
+                                assertNull(cache.get(key));
+
+                                tx.rollback();
+                            }
+
+                            checkCacheData(F.asMap(key, null), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.remove(key, 1));
+
+                                GridTestUtils.runAsync(new Runnable() {
+                                    @Override public void run() {
+                                        otherCache.put(key, 1);
+                                    }
+                                }).get();
+
+                                assertNull(cache.get(key));
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, 1), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertFalse(cache.remove(key, 2));
+
+                                GridTestUtils.runAsync(new Runnable() {
+                                    @Override public void run() {
+                                        otherCache.put(key, 10);
+                                    }
+                                }).get();
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, 10), cacheName);
+
+                            try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation)) {
+                                assertTrue(cache.remove(key, 10));
+
+                                GridTestUtils.runAsync(new Runnable() {
+                                    @Override public void run() {
+                                        otherCache.put(key, 2);
+                                    }
+                                }).get();
+
+                                tx.commit();
+                            }
+
+                            checkCacheData(F.asMap(key, 2), cacheName);
+
+                            cache.remove(key);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * @param c Closure.
+     * @throws Exception If failed.
+     */
+    private void executeTestForAllCaches(TestClosure c) throws Exception {
+        for (CacheConfiguration ccfg : cacheConfigurations()) {
+            ignite(0).createCache(ccfg);
+
+            log.info("Run test for cache [cache=" + ccfg.getCacheMode() +
+                ", backups=" + ccfg.getBackups() +
+                ", near=" + (ccfg.getNearConfiguration() != null) + "]");
+
+            ignite(serversNumber() + 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+
+            try {
+                for (int i = 0; i < serversNumber() + 2; i++) {
+                    log.info("Run test for node [node=" + i + ", client=" + ignite(i).configuration().isClientMode() + ']');
+
+                    c.apply(ignite(i), ccfg.getName());
+                }
+            }
+            finally {
+                ignite(0).destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @return Test keys.
+     * @throws Exception If failed.
+     */
+    private List<Integer> testKeys(IgniteCache<Integer, Integer> cache) throws Exception {
+        CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        List<Integer> keys = new ArrayList<>();
+
+        if (!cache.unwrap(Ignite.class).configuration().isClientMode()) {
+            if (ccfg.getCacheMode() == PARTITIONED && serversNumber() > 1)
+                keys.add(nearKey(cache));
+
+            keys.add(primaryKey(cache));
+
+            if (ccfg.getBackups() != 0 && serversNumber() > 1)
+                keys.add(backupKey(cache));
+        }
+        else
+            keys.add(nearKey(cache));
+
+        return keys;
+    }
+
+    /**
+     * @return Cache configurations to test.
+     */
+    private List<CacheConfiguration> cacheConfigurations() {
+        List<CacheConfiguration> cfgs = new ArrayList<>();
+
+        cfgs.add(cacheConfiguration(PARTITIONED, 0, false));
+        cfgs.add(cacheConfiguration(PARTITIONED, 1, false));
+        cfgs.add(cacheConfiguration(PARTITIONED, 1, true));
+        cfgs.add(cacheConfiguration(REPLICATED, 0, false));
+
+        return cfgs;
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param nearCache If {@code true} near cache is enabled.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        boolean nearCache) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        if (nearCache)
+            ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    static interface TestClosure {
+        /**
+         * @param ignite Node.
+         * @param cacheName Cache name.
+         * @throws Exception If failed.
+         */
+        public void apply(Ignite ignite, String cacheName) throws Exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/63debcdb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index c69cac6..158b118 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -29,6 +29,8 @@ import org.apache.ignite.internal.processors.cache.CacheEnumOperationsSingleNode
 import org.apache.ignite.internal.processors.cache.CacheEnumOperationsTest;
 import org.apache.ignite.internal.processors.cache.CacheExchangeMessageDuplicatedStateTest;
 import org.apache.ignite.internal.processors.cache.CacheMemoryPolicyConfigurationTest;
+import org.apache.ignite.internal.processors.cache.CacheOptimisticTransactionsWithFilterSingleServerTest;
+import org.apache.ignite.internal.processors.cache.CacheOptimisticTransactionsWithFilterTest;
 import org.apache.ignite.internal.processors.cache.CrossCacheTxNearEnabledRandomOperationsTest;
 import org.apache.ignite.internal.processors.cache.CrossCacheTxRandomOperationsTest;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicMessageCountSelfTest;
@@ -277,6 +279,9 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new TestSuite(IgniteNearClientCacheCloseTest.class));
         suite.addTest(new TestSuite(IgniteClientCacheStartFailoverTest.class));
 
+        suite.addTest(new TestSuite(CacheOptimisticTransactionsWithFilterSingleServerTest.class));
+        suite.addTest(new TestSuite(CacheOptimisticTransactionsWithFilterTest.class));
+
         return suite;
     }
 }


Mime
View raw message