ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [35/49] ignite git commit: ignite-1607 Implemented deadlock-free optimistic serializable tx mode
Date Thu, 29 Oct 2015 09:02:43 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4074eee..c1e9202 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -25,12 +25,10 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
@@ -64,7 +62,6 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridFunc;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -78,7 +75,6 @@ import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
@@ -131,16 +127,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /** TX handler. */
     private IgniteTxHandler txHandler;
 
-    /** All transactions. */
-    private final Queue<IgniteInternalTx> committedQ = new ConcurrentLinkedDeque8<>();
-
-    /** Preparing transactions. */
-    private final Queue<IgniteInternalTx> prepareQ = new ConcurrentLinkedDeque8<>();
-
-    /** Minimum start version. */
-    private final ConcurrentNavigableMap<GridCacheVersion, AtomicInt> startVerCnts
=
-        new ConcurrentSkipListMap<>();
-
     /** Committed local transactions. */
     private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVers
=
         new GridBoundedConcurrentOrderedMap<>(Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT,
DFLT_MAX_COMPLETED_TX_CNT));
@@ -308,41 +294,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * USE ONLY FOR MEMORY PROFILING DURING TESTS.
      */
     @Override public void printMemoryStats() {
-        IgniteInternalTx firstTx = committedQ.peek();
-
-        int committedSize = committedQ.size();
-
-        Map.Entry<GridCacheVersion, AtomicInt> startVerEntry = startVerCnts.firstEntry();
-
-        GridCacheVersion minStartVer = null;
-        long dur = 0;
-
-        if (committedSize > 3000) {
-            minStartVer = new GridCacheVersion(Integer.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE,
Integer.MAX_VALUE, 0);
-
-            IgniteInternalTx stuck = null;
-
-            for (IgniteInternalTx tx : txs())
-                if (tx.startVersion().isLess(minStartVer)) {
-                    minStartVer = tx.startVersion();
-                    dur = U.currentTimeMillis() - tx.startTime();
-
-                    stuck = tx;
-                }
-
-            X.println("Stuck transaction: " + stuck);
-        }
-
         X.println(">>> ");
         X.println(">>> Transaction manager memory stats [grid=" + cctx.gridName()
+ ']');
         X.println(">>>   threadMapSize: " + threadMap.size());
-        X.println(">>>   idMap [size=" + idMap.size() + ", minStartVer=" + minStartVer
+ ", dur=" + dur + "ms]");
-        X.println(">>>   committedQueue [size=" + committedSize +
-            ", firstStartVersion=" + (firstTx == null ? "null" : firstTx.startVersion())
+
-            ", firstEndVersion=" + (firstTx == null ? "null" : firstTx.endVersion()) + ']');
-        X.println(">>>   prepareQueueSize: " + prepareQ.size());
-        X.println(">>>   startVerCntsSize [size=" + startVerCnts.size() +
-            ", firstVer=" + startVerEntry + ']');
+        X.println(">>>   idMap [size=" + idMap.size() + ']');
         X.println(">>>   completedVersSize: " + completedVers.size());
     }
 
@@ -361,27 +316,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @return Committed queue size.
-     */
-    public int commitQueueSize() {
-        return committedQ.size();
-    }
-
-    /**
-     * @return Prepare queue size.
-     */
-    public int prepareQueueSize() {
-        return prepareQ.size();
-    }
-
-    /**
-     * @return Start version counts.
-     */
-    public int startVersionCountsSize() {
-        return startVerCnts.size();
-    }
-
-    /**
      * @return Committed versions size.
      */
     public int completedVersionsSize() {
@@ -493,42 +427,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             return null;
         }
 
-        if (cctx.txConfig().isTxSerializableEnabled()) {
-            AtomicInt next = new AtomicInt(1);
-
-            boolean loop = true;
-
-            while (loop) {
-                AtomicInt prev = startVerCnts.putIfAbsent(tx.startVersion(), next);
-
-                if (prev == null)
-                    break; // Put succeeded - exit.
-
-                // Previous value was 0, which means that it will be deleted
-                // by another thread in "decrementStartVersionCount(..)" method.
-                // In that case, we delete here too, so we can safely try again.
-                for (;;) {
-                    int p = prev.get();
-
-                    assert p >= 0 : p;
-
-                    if (p == 0) {
-                        if (startVerCnts.remove(tx.startVersion(), prev))
-                            if (log.isDebugEnabled())
-                                log.debug("Removed count from onCreated callback: " + tx);
-
-                        break; // Retry outer loop.
-                    }
-
-                    if (prev.compareAndSet(p, p + 1)) {
-                        loop = false; // Increment succeeded - exit outer loop.
-
-                        break;
-                    }
-                }
-            }
-        }
-
         if (tx.timeout() > 0) {
             cctx.time().addTimeoutObject(tx);
 
@@ -822,117 +720,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
         }
 
-        boolean txSerEnabled = cctx.txConfig().isTxSerializableEnabled();
-
-        // Clean up committed transactions queue.
-        if (tx.pessimistic() && tx.local()) {
-            if (tx.enforceSerializable() && txSerEnabled) {
-                for (Iterator<IgniteInternalTx> it = committedQ.iterator(); it.hasNext();)
{
-                    IgniteInternalTx committedTx = it.next();
-
-                    assert committedTx != tx;
-
-                    // Clean up.
-                    if (isSafeToForget(committedTx))
-                        it.remove();
-                }
-            }
-
-            // Nothing else to do in pessimistic mode.
-            return;
-        }
-
-        if (txSerEnabled && tx.optimistic() && tx.enforceSerializable())
{
-            Set<IgniteTxKey> readSet = tx.readSet();
-            Set<IgniteTxKey> writeSet = tx.writeSet();
-
-            GridCacheVersion startTn = tx.startVersion();
-
-            GridCacheVersion finishTn = cctx.versions().last();
-
-            // Add future to prepare queue only on first prepare call.
-            if (tx.markPreparing())
-                prepareQ.offer(tx);
-
-            // Check that our read set does not intersect with write set
-            // of all transactions that completed their write phase
-            // while our transaction was in read phase.
-            for (Iterator<IgniteInternalTx> it = committedQ.iterator(); it.hasNext();)
{
-                IgniteInternalTx committedTx = it.next();
-
-                assert committedTx != tx;
-
-                // Clean up.
-                if (isSafeToForget(committedTx)) {
-                    it.remove();
-
-                    continue;
-                }
-
-                GridCacheVersion tn = committedTx.endVersion();
-
-                // We only care about transactions
-                // with tn > startTn and tn <= finishTn
-                if (tn.compareTo(startTn) <= 0 || tn.compareTo(finishTn) > 0)
-                    continue;
-
-                if (tx.serializable()) {
-                    if (GridFunc.intersects(committedTx.writeSet(), readSet)) {
-                        tx.setRollbackOnly();
-
-                        throw new IgniteTxOptimisticCheckedException("Failed to prepare transaction
" +
-                            "(committed vs. read-set conflict): " + tx);
-                    }
-                }
-            }
-
-            // Check that our read and write sets do not intersect with write
-            // sets of all active transactions.
-            for (Iterator<IgniteInternalTx> iter = prepareQ.iterator(); iter.hasNext();)
{
-                IgniteInternalTx prepareTx = iter.next();
-
-                if (prepareTx == tx)
-                    // Skip yourself.
-                    continue;
-
-                // Optimistically remove completed transactions.
-                if (prepareTx.done()) {
-                    iter.remove();
-
-                    if (log.isDebugEnabled())
-                        log.debug("Removed finished transaction from active queue: " + prepareTx);
-
-                    continue;
-                }
-
-                // Check if originating node left.
-                if (cctx.discovery().node(prepareTx.nodeId()) == null) {
-                    iter.remove();
-
-                    rollbackTx(prepareTx);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Removed and rolled back transaction because sender node
left grid: " +
-                            CU.txString(prepareTx));
-
-                    continue;
-                }
-
-                if (tx.serializable() && !prepareTx.isRollbackOnly()) {
-                    Set<IgniteTxKey> prepareWriteSet = prepareTx.writeSet();
-
-                    if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) {
-                        // Remove from active set.
-                        iter.remove();
-
-                        tx.setRollbackOnly();
-
-                        throw new IgniteTxOptimisticCheckedException(
-                            "Failed to prepare transaction (read-set/write-set conflict):
" + tx);
-                    }
-                }
-            }
-        }
+        if (tx.pessimistic() && tx.local())
+            return; // Nothing else to do in pessimistic mode.
 
         // Optimistic.
         assert tx.optimistic() || !tx.local();
@@ -945,40 +734,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @param tx Transaction to check.
-     * @return {@code True} if transaction can be discarded.
-     */
-    private boolean isSafeToForget(IgniteInternalTx tx) {
-        Map.Entry<GridCacheVersion, AtomicInt> e = startVerCnts.firstEntry();
-
-        if (e == null)
-            return true;
-
-        assert e.getValue().get() >= 0;
-
-        return tx.endVersion().compareTo(e.getKey()) <= 0;
-    }
-
-    /**
-     * Decrement start version count.
-     *
-     * @param tx Cache transaction.
-     */
-    private void decrementStartVersionCount(IgniteInternalTx tx) {
-        AtomicInt cnt = startVerCnts.get(tx.startVersion());
-
-        assert cnt != null : "Failed to find start version count for transaction [startVerCnts="
+ startVerCnts +
-            ", tx=" + tx + ']';
-
-        assert cnt.get() > 0;
-
-        if (cnt.decrementAndGet() == 0)
-            if (startVerCnts.remove(tx.startVersion(), cnt))
-                if (log.isDebugEnabled())
-                    log.debug("Removed start version for transaction: " + tx);
-    }
-
-    /**
      * @param tx Transaction.
      */
     private void removeObsolete(IgniteInternalTx tx) {
@@ -1237,6 +992,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param tx Transaction.
+     * @return {@code True} if transaction read entries should be unlocked.
+     */
+    private boolean unlockReadEntries(IgniteInternalTx tx) {
+        if (tx.pessimistic())
+            return !tx.readCommitted();
+        else
+            return tx.serializable();
+    }
+
+    /**
      * Commits a transaction.
      *
      * @param tx Transaction to commit.
@@ -1290,8 +1056,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             // 4. Unlock write resources.
             unlockMultiple(tx, tx.writeEntries());
 
-            // 5. For pessimistic transaction, unlock read resources if required.
-            if (tx.pessimistic() && !tx.readCommitted())
+            // 5. Unlock read resources if required.
+            if (unlockReadEntries(tx))
                 unlockMultiple(tx, tx.readEntries());
 
             // 6. Notify evictions.
@@ -1303,25 +1069,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter
{
             // 8. Assign transaction number at the end of transaction.
             tx.endVersion(cctx.versions().next(tx.topologyVersion()));
 
-            // 9. Clean start transaction number for this transaction.
-            if (cctx.txConfig().isTxSerializableEnabled())
-                decrementStartVersionCount(tx);
-
-            // 10. Add to committed queue only if it is possible
-            //    that this transaction can affect other ones.
-            if (cctx.txConfig().isTxSerializableEnabled() && tx.enforceSerializable()
&& !isSafeToForget(tx))
-                committedQ.add(tx);
-
-            // 11. Remove from per-thread storage.
+            // 9. Remove from per-thread storage.
             clearThreadMap(tx);
 
-            // 12. Unregister explicit locks.
+            // 10. Unregister explicit locks.
             if (!tx.alternateVersions().isEmpty()) {
                 for (GridCacheVersion ver : tx.alternateVersions())
                     idMap.remove(ver);
             }
 
-            // 13. Remove Near-2-DHT mappings.
+            // 11. Remove Near-2-DHT mappings.
             if (tx instanceof GridCacheMappedVersion) {
                 GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion();
 
@@ -1329,10 +1086,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter
{
                     mappedVers.remove(mapped);
             }
 
-            // 14. Clear context.
+            // 12. Clear context.
             resetContext();
 
-            // 15. Update metrics.
+            // 13. Update metrics.
             if (!tx.dht() && tx.local()) {
                 if (!tx.system())
                     cctx.txMetrics().onTxCommit();
@@ -1378,8 +1135,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             // 2. Unlock write resources.
             unlockMultiple(tx, tx.writeEntries());
 
-            // 3. For pessimistic transaction, unlock read resources if required.
-            if (tx.pessimistic() && !tx.readCommitted())
+            // 3. Unlock read resources if required.
+            if (unlockReadEntries(tx))
                 unlockMultiple(tx, tx.readEntries());
 
             // 4. Notify evictions.
@@ -1388,26 +1145,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter
{
             // 5. Remove obsolete entries.
             removeObsolete(tx);
 
-            // 6. Clean start transaction number for this transaction.
-            if (cctx.txConfig().isTxSerializableEnabled())
-                decrementStartVersionCount(tx);
-
-            // 7. Remove from per-thread storage.
+            // 6. Remove from per-thread storage.
             clearThreadMap(tx);
 
-            // 8. Unregister explicit locks.
+            // 7. Unregister explicit locks.
             if (!tx.alternateVersions().isEmpty())
                 for (GridCacheVersion ver : tx.alternateVersions())
                     idMap.remove(ver);
 
-            // 9. Remove Near-2-DHT mappings.
+            // 8. Remove Near-2-DHT mappings.
             if (tx instanceof GridCacheMappedVersion)
                 mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion());
 
-            // 10. Clear context.
+            // 9. Clear context.
             resetContext();
 
-            // 11. Update metrics.
+            // 10. Update metrics.
             if (!tx.dht() && tx.local()) {
                 if (!tx.system())
                     cctx.txMetrics().onTxRollback();
@@ -1445,30 +1198,26 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter
{
             // 1. Unlock write resources.
             unlockMultiple(tx, tx.writeEntries());
 
-            // 2. For pessimistic transaction, unlock read resources if required.
-            if (tx.pessimistic() && !tx.readCommitted())
+            // 2. Unlock read resources if required.
+            if (unlockReadEntries(tx))
                 unlockMultiple(tx, tx.readEntries());
 
             // 3. Notify evictions.
             notifyEvitions(tx);
 
-            // 4. Clean start transaction number for this transaction.
-            if (cctx.txConfig().isTxSerializableEnabled())
-                decrementStartVersionCount(tx);
-
-            // 5. Remove from per-thread storage.
+            // 4. Remove from per-thread storage.
             clearThreadMap(tx);
 
-            // 6. Unregister explicit locks.
+            // 5. Unregister explicit locks.
             if (!tx.alternateVersions().isEmpty())
                 for (GridCacheVersion ver : tx.alternateVersions())
                     idMap.remove(ver);
 
-            // 7. Remove Near-2-DHT mappings.
+            // 6. Remove Near-2-DHT mappings.
             if (tx instanceof GridCacheMappedVersion)
                 mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion());
 
-            // 8. Clear context.
+            // 7. Clear context.
             resetContext();
 
             if (log.isDebugEnabled())
@@ -1635,6 +1384,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         // we wait for the lock.
         long timeout = tx.timeout() == 0 ? 0 : remainingTime;
 
+        GridCacheVersion serOrder = (tx.serializable() && tx.optimistic()) ? tx.nearXidVersion()
: null;
+
         for (IgniteTxEntry txEntry1 : entries) {
             // Check if this entry was prepared before.
             if (!txEntry1.markPrepared() || txEntry1.explicitVersion() != null)
@@ -1649,7 +1400,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter
{
                     assert !entry1.detached() : "Expected non-detached entry for near transaction
" +
                         "[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']';
 
-                    if (!entry1.tmLock(tx, timeout)) {
+                    GridCacheVersion serReadVer = txEntry1.serializableReadVersion();
+
+                    assert serReadVer == null || (tx.optimistic() && tx.serializable())
: txEntry1;
+
+                    if (!entry1.tmLock(tx, timeout, serOrder, serReadVer)) {
                         // Unlock locks locked so far.
                         for (IgniteTxEntry txEntry2 : entries) {
                             if (txEntry2 == txEntry1)

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 36f1c36..68d03cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -53,9 +53,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter
{
     /** Last version. */
     private volatile GridCacheVersion last;
 
-    /** Serializable transaction flag. */
-    private boolean txSerEnabled;
-
     /** Data center ID. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private byte dataCenterId;
@@ -64,6 +61,9 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter
{
     private long gridStartTime;
 
     /** */
+    private GridCacheVersion ISOLATED_STREAMER_VER;
+
+    /** */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
             assert evt.type() == EVT_NODE_METRICS_UPDATED;
@@ -79,8 +79,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter
{
 
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
-        txSerEnabled = cctx.gridConfig().getTransactionConfiguration().isTxSerializableEnabled();
-
         last = new GridCacheVersion(0, 0, order.get(), 0, dataCenterId);
 
         cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED);
@@ -154,6 +152,27 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter
{
     }
 
     /**
+     * Version for entries loaded with isolated streamer; it should be less than any version generated
+     * for entry updates.
+     *
+     * @return Version for entries loaded with isolated streamer.
+     */
+    public GridCacheVersion isolatedStreamerVersion() {
+        if (ISOLATED_STREAMER_VER == null) {
+            long topVer = 1;
+
+            if (gridStartTime == 0)
+                gridStartTime = cctx.kernalContext().discovery().gridStartTime();
+
+            topVer += (gridStartTime - TOP_VER_BASE_TIME) / 1000;
+
+            ISOLATED_STREAMER_VER = new GridCacheVersion((int)topVer, 0, 0, 1, dataCenterId);
+        }
+
+        return ISOLATED_STREAMER_VER;
+    }
+
+    /**
      * @return Next version based on current topology.
      */
     public GridCacheVersion next() {
@@ -235,36 +254,18 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter
{
 
         int locNodeOrder = (int)cctx.localNode().order();
 
-        if (txSerEnabled) {
-            synchronized (this) {
-                long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet();
-
-                GridCacheVersion next = new GridCacheVersion(
-                    (int)topVer,
-                    globalTime,
-                    ord,
-                    locNodeOrder,
-                    dataCenterId);
-
-                last = next;
+        long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet();
 
-                return next;
-            }
-        }
-        else {
-            long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet();
-
-            GridCacheVersion next = new GridCacheVersion(
-                (int)topVer,
-                globalTime,
-                ord,
-                locNodeOrder,
-                dataCenterId);
+        GridCacheVersion next = new GridCacheVersion(
+            (int)topVer,
+            globalTime,
+            ord,
+            locNodeOrder,
+            dataCenterId);
 
-            last = next;
+        last = next;
 
-            return next;
-        }
+        return next;
     }
 
     /**
@@ -273,12 +274,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter
{
      * @return Last generated version.
      */
     public GridCacheVersion last() {
-        if (txSerEnabled) {
-            synchronized (this) {
-                return last;
-            }
-        }
-        else
-            return last;
+        return last;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index ab2a6e8..2190bf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1556,7 +1556,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
 
             AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
 
-            GridCacheVersion ver = cctx.versions().next(topVer);
+            GridCacheVersion ver = cctx.versions().isolatedStreamerVersion();
 
             long ttl = CU.TTL_ETERNAL;
             long expiryTime = CU.EXPIRE_TIME_ETERNAL;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
index a6e960d..6c4e894 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
@@ -54,7 +54,7 @@ import org.apache.ignite.lang.IgniteUuid;
  *  Read access with this level happens the same way as with {@link TransactionIsolation#REPEATABLE_READ} level.
  *  However, in {@link TransactionConcurrency#OPTIMISTIC} mode, if some transactions cannot be serially isolated
  *  from each other, then one winner will be picked and the other transactions in conflict will result in
- * {@link org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException} being thrown.
+ * {@link TransactionOptimisticException} being thrown.
  * </li>
  * </ul>
  * <p>

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
index d7671f0..c3be3c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
@@ -42,8 +42,7 @@ public enum TransactionIsolation {
      * @param ord Ordinal value.
      * @return Enumerated value or {@code null} if ordinal out of range.
      */
-    @Nullable
-    public static TransactionIsolation fromOrdinal(int ord) {
+    @Nullable public static TransactionIsolation fromOrdinal(int ord) {
         return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
new file mode 100644
index 0000000..c2f9fab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionOptimisticException;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
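+/**
+ * Checks the values seen by near-cache and plain readers of a key while other nodes update it concurrently.
+ */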
+public class CacheNearReaderUpdateTest extends GridCommonAbstractTest {
+    /** IP finder shared by the discovery SPI of every started node. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Client-mode flag copied into the configuration of each node started while it is set. */
+    private boolean client;
+
+    /** Number of server nodes. */
+    private static final int SRVS = 4;
+
+    /** Number of client nodes. */
+    private static final int CLIENTS = 3;
+
+    /** Backing map for stores created by {@link TestStoreFactory}; never reassigned, hence final. */
+    private static final Map<Integer, Integer> storeMap = new ConcurrentHashMap8<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        // Every node uses the same shared IP finder for discovery.
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        // Server or client, depending on the value of the flag at the moment the node is configured.
+        cfg.setClientMode(client);
+
+        // destroyCache() casts the configured SPI back to this type in order to clear it.
+        cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        // The client flag is still false here, so these SRVS nodes are configured as servers.
+        startGridsMultiThreaded(SRVS);
+
+        // getConfiguration() reads this flag, so nodes configured from here on are clients.
+        client = true;
+
+        // NOTE(review): arguments are presumably (first node index, node count) - confirm against the base class.
+        startGridsMultiThreaded(SRVS, CLIENTS);
+
+        // Back to the default so that any later node start is configured as a server.
+        client = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        // Counterpart of beforeTestsStarted(): shut the started nodes down after the last test.
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        // Ten minutes, expressed in milliseconds.
+        final long timeoutMs = 10 * 60_000;
+
+        return timeoutMs;
+    }
+
+    /**
+     * Runs the concurrent get/update scenario against several cache configurations
+     * (no backups, one backup, near cache, cache store, off-heap tiered memory): first
+     * without explicit transactions, then in each tested transaction mode.
+     *
+     * @throws Exception If failed.
+     */
+    public void testGetUpdateMultithreaded() throws Exception {
+        List<CacheConfiguration<Integer, Integer>> cfgs = new ArrayList<>();
+
+        cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false));
+        cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false));
+        cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true));
+        cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false));
+
+        {
+            CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
+
+            GridTestUtils.setMemoryMode(null, ccfg, GridTestUtils.TestMemoryMode.OFFHEAP_TIERED, 0, 0);
+
+            cfgs.add(ccfg);
+        }
+
+        // Every node except the one with the highest index performs updates.
+        final List<Ignite> putNodes = new ArrayList<>();
+
+        for (int i = 0; i < SRVS + CLIENTS - 1; i++)
+            putNodes.add(ignite(i));
+
+        // Reads are issued from the node with the highest index and from node 0.
+        final List<Ignite> getNodes = new ArrayList<>();
+
+        getNodes.add(ignite(SRVS + CLIENTS - 1));
+        getNodes.add(ignite(0));
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cfgs) {
+            logCacheInfo(ccfg);
+
+            // Null concurrency/isolation: updates are done without an explicit transaction.
+            getUpdateMultithreaded(ccfg, putNodes, getNodes, null, null);
+
+            if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+                getUpdateMultithreaded(ccfg, putNodes, getNodes, PESSIMISTIC, REPEATABLE_READ);
+
+                getUpdateMultithreaded(ccfg, putNodes, getNodes, OPTIMISTIC, REPEATABLE_READ);
+
+                getUpdateMultithreaded(ccfg, putNodes, getNodes, OPTIMISTIC, SERIALIZABLE);
+            }
+        }
+    }
+
+    /**
+     * For several keys in turn, updates the key from many threads while other threads read it,
+     * then checks that every node returns the same value for the key. The cache is created at
+     * the start and destroyed at the end, even on failure.
+     *
+     * @param ccfg Cache configuration.
+     * @param putNodes Nodes executing updates.
+     * @param getNodes Nodes executing gets.
+     * @param concurrency Transaction concurrency, or {@code null} to update without an explicit transaction.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void getUpdateMultithreaded(CacheConfiguration<Integer, Integer> ccfg,
+        final List<Ignite> putNodes,
+        final List<Ignite> getNodes,
+        final TransactionConcurrency concurrency,
+        final TransactionIsolation isolation) throws Exception {
+        log.info("Execute updates [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+        final Ignite ignite0 = ignite(0);
+
+        final String cacheName = ignite0.createCache(ccfg).getName();
+
+        try {
+            for (int i = 0; i < 5; i++) {
+                final Integer key = i;
+
+                final AtomicInteger putThreadIdx = new AtomicInteger();
+                final AtomicInteger getThreadIdx = new AtomicInteger();
+
+                final int PUT_THREADS = 20;
+                final int GET_THREADS = 20;
+
+                // Updaters and readers all wait on the barrier so that their loops start together.
+                final CyclicBarrier barrier = new CyclicBarrier(PUT_THREADS + GET_THREADS);
+
+                final IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        // Spread the updater threads over the update nodes round-robin.
+                        int idx = putThreadIdx.getAndIncrement() % putNodes.size();
+
+                        Ignite ignite = putNodes.get(idx);
+
+                        IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
+
+                        IgniteTransactions txs = ignite.transactions();
+
+                        Thread.currentThread().setName("update-thread-" + ignite.name());
+
+                        barrier.await();
+
+                        for (int iter = 0; iter < 100; iter++) {
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                            if (concurrency != null) {
+                                try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                                    cache.put(key, rnd.nextInt());
+
+                                    tx.commit();
+                                }
+                                catch (TransactionOptimisticException ignore) {
+                                    // A lost update is tolerated, but only in OPTIMISTIC SERIALIZABLE mode.
+                                    assertEquals(OPTIMISTIC, concurrency);
+                                    assertEquals(SERIALIZABLE, isolation);
+                                }
+                            }
+                            else
+                                cache.put(key, rnd.nextInt());
+                        }
+
+                        return null;
+                    }
+                }, PUT_THREADS, "update-thread");
+
+                IgniteInternalFuture<?> getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        // Spread the reader threads over the read nodes round-robin.
+                        int idx = getThreadIdx.getAndIncrement() % getNodes.size();
+
+                        Ignite ignite = getNodes.get(idx);
+
+                        IgniteCache<Integer, Integer> cache;
+
+                        // Client nodes read through a near cache created on the fly.
+                        if (ignite.configuration().isClientMode())
+                            cache = ignite.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>());
+                        else
+                            cache = ignite.cache(cacheName);
+
+                        Thread.currentThread().setName("get-thread-" + ignite.name());
+
+                        barrier.await();
+
+                        // Keep reading for as long as the updates are in progress.
+                        while (!updateFut.isDone())
+                            cache.get(key);
+
+                        return null;
+                    }
+                }, GET_THREADS, "get-thread");
+
+                updateFut.get();
+                getFut.get();
+
+                Integer val = (Integer)ignite0.cache(cacheName).get(key);
+
+                log.info("Iteration [iter=" + i + ", val=" + val + ']');
+
+                // Read nodes that are clients or have a near configuration must hold the key locally.
+                for (Ignite getNode : getNodes) {
+                    IgniteCache<Integer, Integer> cache = getNode.cache(cacheName);
+
+                    if (getNode.configuration().isClientMode() ||
+                        cache.getConfiguration(CacheConfiguration.class).getNearConfiguration() != null)
+                        assertNotNull(cache.localPeek(key));
+                }
+
+                checkValue(key, val, cacheName);
+
+                // Finally update the key from each node in turn and verify all nodes see each new value.
+                for (int n = 0; n < SRVS + CLIENTS; n++) {
+                    val = n;
+
+                    ignite(n).cache(cacheName).put(key, val);
+
+                    checkValue(key, val, cacheName);
+                }
+            }
+        }
+        finally {
+            destroyCache(ignite0, cacheName);
+        }
+    }
+
+    /**
+     * Asserts that each of the {@code SRVS + CLIENTS} nodes returns the expected value for the key.
+     *
+     * @param key Key.
+     * @param expVal Expected value.
+     * @param cacheName Cache name.
+     */
+    private void checkValue(Object key, Object expVal, String cacheName) {
+        final int nodes = SRVS + CLIENTS;
+
+        for (int nodeIdx = 0; nodeIdx < nodes; nodeIdx++) {
+            Object actual = ignite(nodeIdx).cache(cacheName).get(key);
+
+            assertEquals(expVal, actual);
+        }
+    }
+
+    /**
+     * Logs a one-line summary of the settings of the cache about to be tested.
+     *
+     * @param ccfg Cache configuration.
+     */
+    private void logCacheInfo(CacheConfiguration<?, ?> ccfg) {
+        StringBuilder msg = new StringBuilder("Test cache [mode=");
+
+        msg.append(ccfg.getCacheMode())
+            .append(", sync=").append(ccfg.getWriteSynchronizationMode())
+            .append(", backups=").append(ccfg.getBackups())
+            .append(", memMode=").append(ccfg.getMemoryMode())
+            .append(", near=").append(ccfg.getNearConfiguration() != null)
+            .append(", store=").append(ccfg.isWriteThrough())
+            .append(", evictPlc=").append(ccfg.getEvictionPolicy() != null)
+            .append(", swap=").append(ccfg.isSwapEnabled())
+            .append(", maxOffheap=").append(ccfg.getOffHeapMaxMemory())
+            .append(']');
+
+        log.info(msg.toString());
+    }
+
+    /**
+     * Clears the shared store map, destroys the cache and clears the swap space SPI of every grid.
+     *
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     */
+    private void destroyCache(Ignite ignite, String cacheName) {
+        storeMap.clear();
+
+        ignite.destroyCache(cacheName);
+
+        for (Ignite node : G.allGrids())
+            ((GridTestSwapSpaceSpi)node.configuration().getSwapSpaceSpi()).clearAll();
+    }
+
+    /**
+     * Builds a transactional cache configuration from the given options.
+     *
+     * @param cacheMode Cache mode.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups (applied only to {@code PARTITIONED} caches).
+     * @param storeEnabled If {@code true} adds cache store with read-through and write-through.
+     * @param nearCache If {@code true} near cache is enabled.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(
+        CacheMode cacheMode,
+        CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean storeEnabled,
+        boolean nearCache) {
+        final CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+        cfg.setCacheMode(cacheMode);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setWriteSynchronizationMode(syncMode);
+
+        if (PARTITIONED == cacheMode) {
+            cfg.setBackups(backups);
+        }
+
+        if (storeEnabled) {
+            final TestStoreFactory storeFactory = new TestStoreFactory();
+
+            cfg.setCacheStoreFactory(storeFactory);
+            cfg.setWriteThrough(true);
+            cfg.setReadThrough(true);
+        }
+
+        if (nearCache) {
+            final NearCacheConfiguration<Integer, Integer> nearCfg = new NearCacheConfiguration<Integer, Integer>();
+
+            cfg.setNearConfiguration(nearCfg);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * Factory of cache stores that load from, write to and delete from the shared {@code storeMap}.
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> {
+        /** {@inheritDoc} */
+        @Override public CacheStore<Integer, Integer> create() {
+            return new CacheStoreAdapter<Integer, Integer>() {
+                /** Returns the stored value, or {@code null} if the map has no mapping for the key. */
+                @Override public Integer load(Integer key) throws CacheLoaderException {
+                    return storeMap.get(key);
+                }
+
+                /** Stores the entry's value under the entry's key. */
+                @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
+                    storeMap.put(entry.getKey(), entry.getValue());
+                }
+
+                /** Removes the mapping for the key, if any. */
+                @Override public void delete(Object key) {
+                    storeMap.remove(key);
+                }
+            };
+        }
+    }
+}


Mime
View raw message