ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [08/43] ignite git commit: IGNITE-3057 - Optimization for transactions that do not acquire locks
Date Thu, 19 May 2016 09:37:41 GMT
IGNITE-3057 - Optimization for transactions that do not acquire locks


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b476fdf4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b476fdf4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b476fdf4

Branch: refs/heads/ignite-3163
Commit: b476fdf48151edd7be0b1911a4e004faad2aeb67
Parents: 7b0edfb
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Apr 27 14:04:24 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Apr 27 16:07:34 2016 +0300

----------------------------------------------------------------------
 .../cache/distributed/near/GridNearTxLocal.java |  32 +++
 .../cache/transactions/IgniteTxManager.java     |  40 +++
 .../processors/cache/CacheTxFastFinishTest.java | 253 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 +
 4 files changed, 328 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b476fdf4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ae4972e..515d284 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -71,6 +71,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
+import static org.apache.ignite.transactions.TransactionState.PREPARED;
 import static org.apache.ignite.transactions.TransactionState.PREPARING;
 import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
 import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
@@ -816,6 +817,18 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         if (log.isDebugEnabled())
             log.debug("Committing near local tx: " + this);
 
+        if (fastFinish()) {
+            state(PREPARING);
+            state(PREPARED);
+            state(COMMITTING);
+
+            cctx.tm().fastFinishTx(this, true);
+
+            state(COMMITTED);
+
+            return new GridFinishedFuture<>((IgniteInternalTx)this);
+        }
+
         prepareAsync();
 
         GridNearTxFinishFuture fut = commitFut.get();
@@ -860,6 +873,18 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         if (log.isDebugEnabled())
             log.debug("Rolling back near tx: " + this);
 
+        if (fastFinish()) {
+            state(PREPARING);
+            state(PREPARED);
+            state(ROLLING_BACK);
+
+            cctx.tm().fastFinishTx(this, false);
+
+            state(ROLLED_BACK);
+
+            return new GridFinishedFuture<>((IgniteInternalTx)this);
+        }
+
         GridNearTxFinishFuture fut = rollbackFut.get();
 
         if (fut != null)
@@ -908,6 +933,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /**
+     * @return {@code True} if 'fast finish' path can be used for transaction completion.
+     */
+    private boolean fastFinish() {
+        return writeMap().isEmpty() && ((optimistic() && !serializable())
|| readMap().isEmpty());
+    }
+
+    /**
      * Prepares next batch of entries in dht transaction.
      *
      * @param reads Read entries.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b476fdf4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index e96a472..5dcd53d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1199,6 +1199,46 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter
{
     }
 
     /**
+     * Fast finish transaction. Can be used only if no locks were acquired.
+     *
+     * @param tx Transaction to finish.
+     * @param commit {@code True} if transaction is committed, {@code false} if rolled back.
+     */
+    public void fastFinishTx(IgniteInternalTx tx, boolean commit) {
+        assert tx != null;
+        assert tx.writeMap().isEmpty();
+        assert tx.optimistic() || tx.readMap().isEmpty();
+
+        ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
+
+        if (txIdMap.remove(tx.xidVersion(), tx)) {
+            // 1. Notify evictions.
+            notifyEvitions(tx);
+
+            // 2. Remove obsolete entries.
+            removeObsolete(tx);
+
+            // 3. Remove from per-thread storage.
+            clearThreadMap(tx);
+
+            // 4. Clear context.
+            resetContext();
+
+            // 5. Update metrics.
+            if (!tx.dht() && tx.local()) {
+                if (!tx.system()) {
+                    if (commit)
+                        cctx.txMetrics().onTxCommit();
+                    else
+                        cctx.txMetrics().onTxRollback();
+                }
+
+                tx.txState().onTxEnd(cctx, tx, commit);
+            }
+        }
+    }
+
+    /**
      * Tries to minimize damage from partially-committed transaction.
      *
      * @param tx Tx to uncommit.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b476fdf4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java
new file mode 100644
index 0000000..35b1405
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheTxFastFinishTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private boolean nearCache;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(1);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        if (nearCache)
+            ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFastFinishTxNearCache() throws Exception {
+        nearCache = true;
+
+        fastFinishTx();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFastFinishTx() throws Exception {
+        fastFinishTx();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void fastFinishTx() throws Exception {
+        startGrid(0);
+
+        fastFinishTx(ignite(0));
+
+        client = true;
+
+        startGrid(1);
+
+        for (int i = 0; i < 2; i++)
+            fastFinishTx(ignite(i));
+
+        client = false;
+
+        startGrid(2);
+
+        for (int i = 0; i < 3; i++)
+            fastFinishTx(ignite(i));
+
+        startGrid(3);
+
+        for (int i = 0; i < 4; i++)
+            fastFinishTx(ignite(i));
+
+        stopGrid(1);
+
+        for (int i = 0; i < 4; i++) {
+            if (i != 1)
+                fastFinishTx(ignite(i));
+        }
+    }
+
+    /**
+     * @param ignite Node.
+     */
+    private void fastFinishTx(Ignite ignite) {
+        IgniteTransactions txs = ignite.transactions();
+
+        IgniteCache cache = ignite.cache(null);
+
+        for (boolean commit : new boolean[]{true, false}) {
+            for (TransactionConcurrency c : TransactionConcurrency.values()) {
+                for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                    try (Transaction tx = txs.txStart(c, isolation)) {
+                        checkFastTxFinish(tx, commit);
+                    }
+                }
+            }
+
+            for (int i = 0; i < 100; i++) {
+                try (Transaction tx = txs.txStart(OPTIMISTIC, REPEATABLE_READ)) {
+                    cache.get(i);
+
+                    checkFastTxFinish(tx, commit);
+                }
+
+                try (Transaction tx = txs.txStart(OPTIMISTIC, READ_COMMITTED)) {
+                    cache.get(i);
+
+                    checkFastTxFinish(tx, commit);
+                }
+            }
+
+            for (int i = 0; i < 100; i++) {
+                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.get(i);
+
+                    checkNormalTxFinish(tx, commit);
+                }
+
+                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.get(i);
+
+                    checkNormalTxFinish(tx, commit);
+                }
+            }
+
+            for (int i = 0; i < 100; i++) {
+                for (TransactionConcurrency c : TransactionConcurrency.values()) {
+                    for (TransactionIsolation isolation : TransactionIsolation.values())
{
+                        try (Transaction tx = txs.txStart(c, isolation)) {
+                            cache.put(i, i);
+
+                            checkNormalTxFinish(tx, commit);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param tx Transaction.
+     * @param commit Commit flag.
+     */
+    private void checkFastTxFinish(Transaction tx, boolean commit) {
+        if (commit)
+            tx.commit();
+        else
+            tx.rollback();
+
+        IgniteInternalTx tx0 = ((TransactionProxyImpl)tx).tx();
+
+        assertNull(fieldValue(tx0, "prepFut"));
+        assertNull(fieldValue(tx0, "commitFut"));
+        assertNull(fieldValue(tx0, "rollbackFut"));
+    }
+
+    /**
+     * @param tx Transaction.
+     * @param commit Commit flag.
+     */
+    private void checkNormalTxFinish(Transaction tx, boolean commit) {
+        IgniteInternalTx tx0 = ((TransactionProxyImpl)tx).tx();
+
+        if (commit) {
+            tx.commit();
+
+            assertNotNull(fieldValue(tx0, "prepFut"));
+            assertNotNull(fieldValue(tx0, "commitFut"));
+        }
+        else {
+            tx.rollback();
+
+            assertNotNull(fieldValue(tx0, "rollbackFut"));
+        }
+    }
+
+    /**
+     * @param obj Obejct.
+     * @param fieldName Field name.
+     * @return Field value.
+     */
+    private Object fieldValue(Object obj, String fieldName) {
+        Object val = GridTestUtils.getFieldValue(obj, fieldName);
+
+        if (val == null)
+            return null;
+
+        if (val instanceof AtomicReference)
+            return ((AtomicReference)val).get();
+
+        return val;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b476fdf4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 7f1d7df..003b12c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryProcessorCopySelfTe
 import org.apache.ignite.internal.processors.cache.CacheFutureExceptionSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheNamesSelfTest;
 import org.apache.ignite.internal.processors.cache.CachePutEventListenerErrorSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheTxFastFinishTest;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityApiSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityMapperSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityRoutingSelfTest;
@@ -298,6 +299,8 @@ public class IgniteCacheTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteTxConfigCacheSelfTest.class);
 
+        suite.addTestSuite(CacheTxFastFinishTest.class);
+
         return suite;
     }
 }


Mime
View raw message