ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1607 WIP
Date Mon, 12 Oct 2015 11:53:13 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1607-read 190310da0 -> b1d07144f


ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1d07144
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1d07144
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1d07144

Branch: refs/heads/ignite-1607-read
Commit: b1d07144f9725dea579c16c47805009c9e4f60f1
Parents: 190310d
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Oct 12 09:58:34 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 12 14:27:34 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  10 +-
 ...arOptimisticSerializableTxPrepareFuture.java | 187 ++++++++++++-------
 .../near/GridNearTxFinishFuture.java            |  19 +-
 .../CacheSerializableTransactionsTest.java      |  98 ++++++++++
 .../processors/cache/IgniteTxAbstractTest.java  |  42 +----
 .../IgniteTxMultiThreadedAbstractTest.java      |  15 +-
 ...CachePartitionedTxMultiThreadedSelfTest.java |   8 -
 7 files changed, 263 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9378017..fd0fb92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -756,9 +756,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
             // Cache version for optimistic check.
             startVer = ver;
 
-            if (retVer)
-                resVer = isNear() ? ((GridNearCacheEntry)this).dhtVersion() : startVer;
-
             GridCacheMvcc mvcc = mvccExtras();
 
             owner = mvcc == null ? null : mvcc.anyOwner();
@@ -876,6 +873,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
 
             if (ret != null && expiryPlc != null)
                 updateTtl(expiryPlc);
+
+            if (retVer) {
+                resVer = isNear() ? ((GridNearCacheEntry)this).dhtVersion() : startVer;
+
+                if (resVer == null)
+                    ret = null;
+            }
         }
 
         if (ret != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 04c4851..6bd21b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -76,8 +77,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
 public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPrepareFutureAdapter
     implements GridCacheMvccFuture<IgniteInternalTx> {
     /** */
-    @GridToStringInclude
-    private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+    private KeyLockFuture keyLockFut = new KeyLockFuture();
 
     /** */
     private final AtomicReference<ClientRemapFuture> remapFutRef;
@@ -103,10 +103,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
             log.debug("Transaction future received owner changed callback: " + entry);
 
         if ((entry.context().isNear() || entry.context().isLocal()) && owner != null
&& tx.hasWriteKey(entry.txKey())) {
-            lockKeys.remove(entry.txKey());
-
-            // This will check for locks.
-            onDone();
+            keyLockFut.onKeyLocked(entry.txKey());
 
             return true;
         }
@@ -151,10 +148,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
     }
 
     /**
-     * @param nodeId Failed node ID.
+     * @param m Failed mapping.
      * @param e Error.
      */
-    void onError(@Nullable UUID nodeId, Throwable e) {
+    void onError(@Nullable GridDistributedTxMapping m, Throwable e) {
         if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class))
{
             if (tx.onePhaseCommit()) {
                 tx.markForBackupCheck();
@@ -165,28 +162,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
             }
         }
 
-        if (e instanceof IgniteTxOptimisticCheckedException && nodeId != null)
-            tx.removeMapping(nodeId);
+        if (e instanceof IgniteTxOptimisticCheckedException && m != null)
+            tx.removeMapping(m.node().id());
 
         err.compareAndSet(null, e);
-    }
-
-    /**
-     * @return {@code True} if all locks are owned.
-     */
-    private boolean checkLocks() {
-        boolean locked = lockKeys.isEmpty();
 
-        if (locked) {
-            if (log.isDebugEnabled())
-                log.debug("All locks are acquired for near prepare future: " + this);
-        }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys +
']');
-        }
-
-        return locked;
+        keyLockFut.onDone(e);
     }
 
     /** {@inheritDoc} */
@@ -199,7 +180,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
                     if (f.futureId().equals(res.miniId())) {
                         assert f.node().id().equals(nodeId);
 
-                        f.onResult(nodeId, res);
+                        f.onResult(res);
                     }
                 }
             }
@@ -208,14 +189,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
 
     /** {@inheritDoc} */
     @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
-        this.err.compareAndSet(null, err);
-
-        err = this.err.get();
-
-        // If locks were not acquired yet, delay completion.
-        if (isDone() || (err == null && !checkLocks()))
+        if (isDone())
             return false;
 
+        this.err.compareAndSet(null, err);
+
         return onComplete();
     }
 
@@ -490,10 +468,15 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings
= new HashMap<>();
 
         for (IgniteTxEntry read : reads)
-            map(read, topVer, mappings, false);
+            map(read, topVer, mappings, false, remap);
 
         for (IgniteTxEntry write : writes)
-            map(write, topVer, mappings, true);
+            map(write, topVer, mappings, true, remap);
+
+        keyLockFut.onAllKeysAdded();
+
+        if (!remap)
+            add(keyLockFut);
 
         if (isDone()) {
             if (log.isDebugEnabled())
@@ -518,28 +501,32 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
             add(fut);
         }
 
-        Collection<MiniFuture> futs = (Collection)futures();
+        Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
 
-        Iterator<MiniFuture> it = futs.iterator();
+        Iterator<IgniteInternalFuture<?>> it = futs.iterator();
 
         while (it.hasNext()) {
-            MiniFuture fut = it.next();
+            IgniteInternalFuture<?> fut0 = it.next();
 
-            if (skipFuture(remap, fut))
+            if (skipFuture(remap, fut0))
                 continue;
 
+            MiniFuture fut = (MiniFuture)fut0;
+
             IgniteCheckedException err = prepare(fut);
 
             if (err != null) {
                 while (it.hasNext()) {
-                    fut = it.next();
+                    fut0 = it.next();
 
-                    if (skipFuture(remap, fut))
+                    if (skipFuture(remap, fut0))
                         continue;
 
+                    fut = (MiniFuture)fut0;
+
                     tx.removeMapping(fut.mapping().node().id());
 
-                    fut.onResult(err);
+                    fut.onResult(new IgniteCheckedException("Failed to prepare transaction.",
err));
                 }
 
                 break;
@@ -554,8 +541,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
      * @param fut Future.
      * @return {@code True} if skip future during remap.
      */
-    private boolean skipFuture(boolean remap, MiniFuture fut) {
-        return remap && fut.rcvRes.get();
+    private boolean skipFuture(boolean remap, IgniteInternalFuture<?> fut) {
+        return !(isMini(fut)) || (remap && ((MiniFuture)fut).rcvRes.get());
     }
 
     /**
@@ -613,7 +600,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
             prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>()
{
                 @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse>
prepFut) {
                     try {
-                        fut.onResult(n.id(), prepFut.get());
+                        fut.onResult(prepFut.get());
                     }
                     catch (IgniteCheckedException e) {
                         fut.onResult(e);
@@ -647,12 +634,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
      * @param topVer Topology version.
      * @param curMapping Current mapping.
      * @param waitLock Wait lock flag.
+     * @param remap Remap flag.
      */
     private void map(
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
-        boolean waitLock
+        boolean waitLock,
+        boolean remap
     ) {
         GridCacheContext cacheCtx = entry.context();
 
@@ -678,9 +667,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         else
             entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
 
-        if (cacheCtx.isNear() || cacheCtx.isLocal()) {
+        if (!remap && (cacheCtx.isNear() || cacheCtx.isLocal())) {
             if (waitLock && entry.explicitVersion() == null)
-                lockKeys.add(entry.txKey());
+                keyLockFut.addLockKey(entry.txKey());
         }
 
         IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear());
@@ -735,16 +724,23 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>,
String>() {
-            @Override public String apply(IgniteInternalFuture<?> f) {
-                return "[node=" + ((MiniFuture)f).node().id() +
-                    ", loc=" + ((MiniFuture)f).node().isLocal() +
-                    ", done=" + f.isDone() + "]";
-            }
-        });
+        Collection<String> futs = F.viewReadOnly(futures(),
+            new C1<IgniteInternalFuture<?>, String>() {
+                @Override public String apply(IgniteInternalFuture<?> f) {
+                    return "[node=" + ((MiniFuture)f).node().id() +
+                        ", loc=" + ((MiniFuture)f).node().isLocal() +
+                        ", done=" + f.isDone() + "]";
+                }
+            },
+            new P1<IgniteInternalFuture<?>>() {
+                @Override public boolean apply(IgniteInternalFuture<?> f) {
+                    return isMini(f);
+                }
+            });
 
         return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this,
             "innerFuts", futs,
+            "keyLockFut", keyLockFut,
             "tx", tx,
             "super", super.toString());
     }
@@ -829,7 +825,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
          */
         void onResult(Throwable e) {
             if (rcvRes.compareAndSet(false, true)) {
-                onError(m.node().id(), e);
+                onError(m, e);
 
                 if (log.isDebugEnabled())
                     log.debug("Failed to get future result [fut=" + this + ", err=" + e +
']');
@@ -860,17 +856,16 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
         }
 
         /**
-         * @param nodeId Failed node ID.
          * @param res Result callback.
          */
-        void onResult(UUID nodeId, final GridNearTxPrepareResponse res) {
+        void onResult(final GridNearTxPrepareResponse res) {
             if (isDone())
                 return;
 
             if (rcvRes.compareAndSet(false, true)) {
                 if (res.error() != null) {
                     // Fail the whole compound future.
-                    onError(nodeId, res.error());
+                    onError(m, res.error());
 
                     onDone(res.error());
                 }
@@ -884,11 +879,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
                         ClientRemapFuture remapFut = new ClientRemapFuture();
 
                         if (remapFutRef.compareAndSet(null, remapFut)) {
-                            Collection<MiniFuture> futs = (Collection)futures();
+                            Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
 
-                            for (MiniFuture fut : futs) {
-                                if (fut != this)
-                                    remapFut.add(fut);
+                            for (IgniteInternalFuture<?> fut : futs) {
+                                if (isMini(fut) && fut != this)
+                                    remapFut.add((MiniFuture)fut);
                             }
 
                             remapFut.markInitialized();
@@ -975,4 +970,68 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre
             return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(),
"err", error());
         }
     }
+
+    /**
+     * Keys lock future.
+     */
+    private class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse>
{
+        /** */
+        @GridToStringInclude
+        private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+
+        /** */
+        private volatile boolean allKeysAdded;
+
+        /**
+         * @param key Key to track for locking.
+         */
+        private void addLockKey(IgniteTxKey key) {
+            assert !allKeysAdded;
+
+            lockKeys.add(key);
+        }
+
+        /**
+         * @param key Locked keys.
+         */
+        private void onKeyLocked(IgniteTxKey key) {
+            lockKeys.remove(key);
+
+            checkLocks();
+        }
+
+        /**
+         * Moves future to the ready state.
+         */
+        private void onAllKeysAdded() {
+            allKeysAdded = true;
+
+            checkLocks();
+        }
+
+        /**
+         * @return {@code True} if all locks are owned.
+         */
+        private boolean checkLocks() {
+            boolean locked = lockKeys.isEmpty();
+
+            if (locked && allKeysAdded) {
+                if (log.isDebugEnabled())
+                    log.debug("All locks are acquired for near prepare future: " + this);
+
+                onDone((GridNearTxPrepareResponse)null);
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys
+ ']');
+            }
+
+            return locked;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(KeyLockFuture.class, this, super.toString());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 85311cc..2c1a79f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -266,9 +266,18 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             }
 
             if (tx.onePhaseCommit()) {
-                finishOnePhase();
+                try {
+                    boolean commit = this.commit && err == null;
+
+                    tx.finish(commit);
+
+                    finishOnePhase(commit);
 
-                tx.tmFinish(commit && err == null);
+                    tx.tmFinish(commit);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to finish transaction: " + tx, e);
+                }
             }
 
             Throwable th = this.err.get();
@@ -509,9 +518,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
-     *
+     * @param commit Commit flag.
      */
-    private void finishOnePhase() {
+    private void finishOnePhase(boolean commit) {
         // No need to send messages as transaction was already committed on remote node.
         // Finish local mapping only as we need send commit message to backups.
         for (GridDistributedTxMapping m : mappings.values()) {
@@ -521,6 +530,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 // Add new future.
                 if (fut != null)
                     add(fut);
+
+                break;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 0285016..1302698 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -381,6 +381,73 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
     /**
      * @throws Exception If failed.
      */
+    public void testTxRollback() throws Exception {
+        Ignite ignite0 = ignite(0);
+        Ignite ignite1 = ignite(1);
+
+        final IgniteTransactions txs0 = ignite0.transactions();
+        final IgniteTransactions txs1 = ignite1.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg);
+                IgniteCache<Integer, Integer> cache1 = ignite1.cache(ccfg.getName());
+
+                List<Integer> keys = testKeys(cache0);
+
+                for (Integer key : keys) {
+                    log.info("Test key: " + key);
+
+                    Integer expVal = null;
+
+                    for (int i = 0; i < 100; i++) {
+                        try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Integer val = cache0.get(key);
+
+                            assertEquals(expVal, val);
+
+                            cache0.put(key, i);
+
+                            tx.rollback();
+                        }
+
+                        try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Integer val = cache0.get(key);
+
+                            assertEquals(expVal, val);
+
+                            cache0.put(key, i);
+
+                            tx.commit();
+
+                            expVal = i;
+                        }
+
+                        try (Transaction tx = txs1.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            Integer val = cache1.get(key);
+
+                            assertEquals(expVal, val);
+
+                            cache1.put(key, val);
+
+                            tx.commit();
+                        }
+                    }
+
+                    checkValue(key, expVal, cache0.getName());
+                }
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testTxCommitReadOnlyGetAll() throws Exception {
         Ignite ignite0 = ignite(0);
 
@@ -428,6 +495,37 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest
{
     /**
      * @throws Exception If failed.
      */
+    public void testTxCommitReadWriteTwoNodes() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            logCacheInfo(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg);
+
+                Integer key0 = primaryKey(ignite(0).cache(null));
+                Integer key1 = primaryKey(ignite(1).cache(null));
+
+                try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.put(key0, key0);
+
+                    cache.get(key1);
+
+                    tx.commit();
+                }
+            }
+            finally {
+                destroyCache(ignite0, ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testTxConflictRead1() throws Exception {
         txConflictRead(true);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java
index fcf46cf..dff0344 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java
@@ -175,9 +175,7 @@ abstract class IgniteTxAbstractTest extends GridCommonAbstractTest {
         for (int i = 0; i < iterations(); i++) {
             IgniteCache<Integer, String> cache = jcache(gridIdx);
 
-            Transaction tx = ignite(gridIdx).transactions().txStart(concurrency, isolation,
0, 0);
-
-            try {
+            try (Transaction tx = ignite(gridIdx).transactions().txStart(concurrency, isolation,
0, 0)) {
                 int prevKey = -1;
 
                 for (Integer key : getKeys()) {
@@ -236,46 +234,22 @@ abstract class IgniteTxAbstractTest extends GridCommonAbstractTest {
                     debug("Committed transaction [i=" + i + ", tx=" + tx + ']');
             }
             catch (TransactionOptimisticException e) {
-                if (concurrency != OPTIMISTIC || isolation != SERIALIZABLE) {
-                    error("Received invalid optimistic failure.", e);
+                if (!(concurrency == OPTIMISTIC && isolation == SERIALIZABLE)) {
+                    log.error("Unexpected error: " + e, e);
 
                     throw e;
                 }
-
-                if (isTestDebug())
-                    info("Optimistic transaction failure (will rollback) [i=" + i + ", msg="
+ e.getMessage() +
-                        ", tx=" + tx.xid() + ']');
-
-                try {
-                    tx.rollback();
-                }
-                catch (IgniteException ex) {
-                    error("Failed to rollback optimistic failure: " + tx, ex);
-
-                    throw ex;
-                }
             }
-            catch (Exception e) {
-                error("Transaction failed (will rollback): " + tx, e);
-
-                tx.rollback();
+            catch (Throwable e) {
+                log.error("Unexpected error: " + e, e);
 
                 throw e;
             }
-            catch (Error e) {
-                error("Error when executing transaction (will rollback): " + tx, e);
-
-                tx.rollback();
+        }
 
-                throw e;
-            }
-            finally {
-                Transaction t = ignite(gridIdx).transactions().tx();
+        Transaction tx = ignite(gridIdx).transactions().tx();
 
-                assert t == null : "Thread should not have transaction upon completion ['t==tx'="
+ (t == tx) +
-                    ", t=" + t + (t != tx ? "tx=" + tx : "tx=''") + ']';
-            }
-        }
+        assertNull("Thread should not have transaction upon completion", tx);
 
         if (printMemoryStats()) {
             if (cntr.getAndIncrement() % 100 == 0)

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
index 191feb6..f13ba8c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
@@ -250,8 +250,8 @@ public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstract
 
                                     break;
                                 }
-                                catch(TransactionOptimisticException e) {
-                                    log.info("Got error, will retry: " + e);
+                                catch (TransactionOptimisticException e) {
+                                    // Retry.
                                 }
                             }
                         }
@@ -300,8 +300,17 @@ public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstract
 
             assertEquals((long)THREADS * ITERATIONS, total);
 
+            // Try to update one more time to make sure cache is in consistent state.
+            try (Transaction tx = grid(0).transactions().txStart(OPTIMISTIC, SERIALIZABLE))
{
+                long val = cache.get(key);
+
+                cache.put(key, val);
+
+                tx.commit();
+            }
+
             for (int i = 0; i < gridCount(); i++)
-                assertEquals(total, (Object)cache.get(key));
+                assertEquals(total, grid(i).cache(null).get(key));
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java
index 346bd34..f76361a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java
@@ -40,11 +40,6 @@ public class GridCachePartitionedTxMultiThreadedSelfTest extends IgniteTxMultiTh
     private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** {@inheritDoc} */
-    @Override public void testOptimisticSerializableCommitMultithreaded() throws Exception
{
-        fail("https://issues.apache.org/jira/browse/IGNITE-806");
-    }
-
-    /** {@inheritDoc} */
     @SuppressWarnings({"ConstantConditions"})
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration c = super.getConfiguration(gridName);
@@ -53,9 +48,6 @@ public class GridCachePartitionedTxMultiThreadedSelfTest extends IgniteTxMultiTh
 
         CacheConfiguration cc = defaultCacheConfiguration();
 
-        // TODO IGNITE-1607 add test with near cache.
-        cc.setNearConfiguration(null);
-
         cc.setCacheMode(PARTITIONED);
         cc.setBackups(1);
 


Mime
View raw message