ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [28/36] ignite git commit: ignite-3484
Date Fri, 08 Sep 2017 12:36:00 GMT
ignite-3484


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/91b99117
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/91b99117
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/91b99117

Branch: refs/heads/ignite-6149
Commit: 91b9911731a387a3199ddbbc22704bc14af09995
Parents: c966451
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Sep 6 12:22:22 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Sep 6 17:34:10 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    |  10 +-
 .../mvcc/CacheCoordinatorsSharedManager.java    | 113 +++--
 .../processors/cache/tree/CacheDataTree.java    |   2 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 424 ++++++++++++++++++-
 4 files changed, 515 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/91b99117/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index ed52b85..9a4b17b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1552,18 +1552,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-            // TODO IGNITE-3484: need special findCeiling method.
+            // TODO IGNITE-3484: need special method.
 
             GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId,
key, topVer, mvccCntr),
-                null,
-                CacheDataRowAdapter.RowData.NO_KEY);
+                new MvccSearchRow(cacheId, key, topVer + 1, mvccCntr)/*,
+                CacheDataRowAdapter.RowData.NO_KEY*/);
 
             CacheDataRow row = null;
 
             if (cur.next())
                 row = cur.get();
 
-            afterRowFound(row, key);
+            assert row == null || key.equals(row.key());
+
+            //afterRowFound(row, key);
 
             return row;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/91b99117/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index 807d18a..f780922 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -34,6 +36,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -52,15 +55,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private final CoordinatorAssignmentHistory assignHist = new CoordinatorAssignmentHistory();
 
     /** */
-    private final AtomicLong mvccCntr = new AtomicLong(0L);
+    private final AtomicLong mvccCntr = new AtomicLong(1L);
 
     /** */
-    private final AtomicLong committedCntr = new AtomicLong(0L);
+    private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
 
     /** */
     private final ConcurrentHashMap<GridCacheVersion, Long> activeTxs = new ConcurrentHashMap<>();
 
     /** */
+    private final Map<Long, Integer> activeQueries = new HashMap<>();
+
+    /** */
     private final ConcurrentMap<Long, MvccCounterFuture> cntrFuts = new ConcurrentHashMap<>();
 
     /** */
@@ -210,21 +216,6 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         }
     }
 
-
-    /**
-     * @param txId Transaction ID.
-     * @return Counter.
-     */
-    private long assignTxCounter(GridCacheVersion txId) {
-        long nextCtr = mvccCntr.incrementAndGet();
-
-        Object old = activeTxs.put(txId, nextCtr);
-
-        assert old == null : txId;
-
-        return nextCtr;
-    }
-
     /**
      * @param nodeId Sender node ID.
      * @param msg Message.
@@ -322,7 +313,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param msg Message.
      */
     private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg)
{
-        activeTxs.remove(msg.txId());
+        onTxDone(msg.txId());
 
         if (!msg.skipResponse()) {
             try {
@@ -359,19 +350,93 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     }
 
     /**
+     * @param txId Transaction ID.
+     * @return Counter.
+     */
+    private synchronized long assignTxCounter(GridCacheVersion txId) {
+        long nextCtr = mvccCntr.getAndIncrement();
+
+        Object old = activeTxs.put(txId, nextCtr);
+
+        assert old == null : txId;
+
+        return nextCtr;
+    }
+
+    /**
+     * @param txId Transaction ID.
+     */
+    private synchronized void onTxDone(GridCacheVersion txId) {
+        Long cntr = activeTxs.remove(txId);
+
+        assert cntr != null;
+
+        committedCntr.setIfGreater(cntr);
+    }
+
+    /**
      * @param qryNodeId Node initiated query.
      * @return Counter for query.
      */
-    private long assignQueryCounter(UUID qryNodeId) {
-        // TODO IGNITE-3478
-        return 3;
+    private synchronized long assignQueryCounter(UUID qryNodeId) {
+        Long mvccCntr = committedCntr.get();
+
+        Long minActive = minActiveTx();
+
+        if (minActive != null && minActive < mvccCntr)
+            mvccCntr = minActive - 1;
+
+        Integer queries = activeQueries.get(mvccCntr);
+
+        if (queries != null)
+            activeQueries.put(mvccCntr, queries + 1);
+        else
+            activeQueries.put(mvccCntr, 1);
+
+        return mvccCntr;
     }
 
     /**
-     * @param cntr Query counter.
+     * @param mvccCntr Query counter.
      */
-    private void onQueryDone(long cntr) {
-        // TODO IGNITE-3478
+    private synchronized void onQueryDone(long mvccCntr) {
+        Integer queries = activeQueries.get(mvccCntr);
+
+        assert queries != null : mvccCntr;
+
+        int left = queries - 1;
+
+        assert left >= 0 : left;
+
+        if (left == 0)
+            activeQueries.remove(mvccCntr);
+    }
+
+    private synchronized long cleanupVersion() {
+        long cntr = committedCntr.get();
+
+        Long minActive = minActiveTx();
+
+        if (minActive != null && minActive < cntr)
+            cntr = minActive - 1;
+
+        for (Long qryCntr : activeQueries.keySet()) {
+            if (qryCntr <= cntr)
+                cntr = qryCntr - 1;
+        }
+
+        return cntr;
+    }
+
+    @Nullable private Long minActiveTx() {
+        Long min = null;
+
+        for (Map.Entry<GridCacheVersion, Long> e : activeTxs.entrySet()) {
+            if (min == null || e.getValue() < min)
+                min = e.getValue();
+        }
+
+        return min;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/91b99117/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index e846768..4b4860b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -163,7 +163,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow>
{
         cmp = Long.compare(row.mvccUpdateTopologyVersion(), mvccTopVer);
 
         if (cmp != 0)
-            return 0;
+            return cmp;
 
         long mvccCntr = io.getMvccUpdateCounter(pageAddr, idx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/91b99117/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 19f1dc7..4c6b206 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -18,21 +18,34 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -158,7 +171,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
         startGridsMultiThreaded(SRVS);
 
         try {
-            for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations())
{
+            for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) {
                 logCacheInfo(ccfg);
 
                 ignite(0).createCache(ccfg);
@@ -211,10 +224,383 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testSimplePutGetAll() throws Exception {
+        Ignite node = startGrid(0);
+
+        IgniteTransactions txs = node.transactions();
+
+        final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED,
FULL_SYNC, 0));
+
+        final int KEYS = 10_000;
+
+        Set<Integer> keys = new HashSet<>();
+
+        for (int k = 0; k < KEYS; k++)
+            keys.add(k);
+
+        Map<Object, Object> map = cache.getAll(keys);
+
+        assertTrue(map.isEmpty());
+
+        for (int v = 0; v < 3; v++) {
+            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                for (int k = 0; k < KEYS; k++) {
+                    if (k % 2 == 0)
+                        cache.put(k, v);
+                }
+
+                tx.commit();
+            }
+
+            map = cache.getAll(keys);
+
+            for (int k = 0; k < KEYS; k++) {
+                if (k % 2 == 0)
+                    assertEquals(v, map.get(k));
+                else
+                    assertNull(map.get(k));
+            }
+
+            assertEquals(KEYS / 2, map.size());
+
+            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                map = cache.getAll(keys);
+
+                for (int k = 0; k < KEYS; k++) {
+                    if (k % 2 == 0)
+                        assertEquals(v, map.get(k));
+                    else
+                        assertNull(map.get(k));
+                }
+
+                assertEquals(KEYS / 2, map.size());
+
+                tx.commit();
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll() throws Exception {
+        final int RANGE = 20;
+
+        final long time = 10_000;
+
+        final int writers = 4;
+
+        final int readers = 4;
+
+        GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean> writer
=
+            new GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean>()
{
+            @Override public void apply(Integer idx, IgniteCache<Object, Object> cache,
AtomicBoolean stop) {
+                final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                int min = idx * RANGE;
+                int max = min + RANGE;
+
+                info("Thread range [min=" + min + ", max=" + max + ']');
+
+                Map<Integer, Integer> map = new HashMap<>();
+
+                int v = idx * 1_000_000;
+
+                while (!stop.get()) {
+                    while (map.size() < RANGE)
+                        map.put(rnd.nextInt(min, max), v);
+
+                    try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        cache.putAll(map);
+
+                        tx.commit();
+                    }
+
+                    map.clear();
+
+                    v++;
+                }
+
+                info("Writer done, updates: " + v);
+            }
+        };
+
+        GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean> reader
=
+            new GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean>()
{
+                @Override public void apply(Integer idx, IgniteCache<Object, Object>
cache, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Set<Integer> keys = new LinkedHashSet<>();
+
+                    Map<Integer, Set<Integer>> uniqueReads = new HashMap<>();
+
+                    for (int i = 0; i < writers; i++)
+                        uniqueReads.put(i, new HashSet<Integer>());
+
+                    while (!stop.get()) {
+                        int range = rnd.nextInt(0, writers);
+
+                        int min = range * RANGE;
+                        int max = min + RANGE;
+
+                        while (keys.size() < RANGE)
+                            keys.add(rnd.nextInt(min, max));
+
+                        Map<Object, Object> map = cache.getAll(keys);
+
+                        assertTrue("Invalid map size: " + map.size(),
+                            map.isEmpty() || map.size() == RANGE);
+
+                        Integer val0 = null;
+
+                        for (Map.Entry<Object, Object> e: map.entrySet()) {
+                            Object val = e.getValue();
+
+                            assertNotNull(val);
+
+                            if (val0 == null) {
+                                uniqueReads.get(range).add((Integer)val);
+
+                                val0 = (Integer)val;
+                            }
+                            else {
+                                if (!F.eq(val0, val)) {
+                                    assertEquals("Unexpected value [range=" + range + ",
key=" + e.getKey() + ']',
+                                        val0,
+                                        val);
+                                }
+                            }
+                        }
+
+                        keys.clear();
+                    }
+
+                    info("Reader done, unique reads: ");
+
+                    for (Map.Entry<Integer, Set<Integer>> e : uniqueReads.entrySet())
+                        info("Range [idx=" + e.getKey() + ", uniqueReads=" + e.getValue().size()
+ ']');
+                }
+            };
+
+        readWriteTest(time, writers, readers, null, writer, reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsSumGetAll() throws Exception {
+        final int ACCOUNTS = 20;
+
+        final int ACCOUNT_START_VAL = 1000;
+
+        final long time = 10_000;
+
+        final int writers = 1;
+
+        final int readers = 1;
+
+        final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object,
Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                Map<Integer, Account> accounts = new HashMap<>();
+
+                for (int i = 0; i < ACCOUNTS; i++)
+                    accounts.put(i, new Account(ACCOUNT_START_VAL));
+
+                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.putAll(accounts);
+
+                    tx.commit();
+                }
+            }
+        };
+
+        GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean> writer
=
+            new GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean>()
{
+                @Override public void apply(Integer idx, IgniteCache<Object, Object>
cache, AtomicBoolean stop) {
+                    final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        cnt++;
+
+                        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ))
{
+                            Integer id1 = rnd.nextInt(ACCOUNTS);
+                            Integer id2 = rnd.nextInt(ACCOUNTS);
+
+                            if (id1.equals(id2))
+                                continue;
+
+                            Account a1;
+                            Account a2;
+
+                            TreeSet<Integer> keys = new TreeSet<>();
+
+                            keys.add(id1);
+                            keys.add(id2);
+
+                            Map<Object, Object> accounts = cache.getAll(keys);
+
+                            a1 = (Account)accounts.get(id1);
+                            a2 = (Account)accounts.get(id1);
+
+                            assertNotNull(a1);
+                            assertNotNull(a2);
+
+                            cache.put(id1, new Account(a1.val + 1));
+                            cache.put(id2, new Account(a2.val - 1));
+
+                            tx.commit();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean> reader
=
+            new GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean>()
{
+                @Override public void apply(Integer idx, IgniteCache<Object, Object>
cache, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Set<Integer> keys = new LinkedHashSet<>();
+
+                    while (!stop.get()) {
+                        while (keys.size() < ACCOUNTS)
+                            keys.add(rnd.nextInt(ACCOUNTS));
+
+                        Map<Object, Object> accounts = cache.getAll(keys);
+
+                        assertEquals(ACCOUNTS, accounts.size());
+
+                        int sum = 0;
+
+                        for (int i = 0; i < ACCOUNTS; i++) {
+                            Account account = (Account)accounts.get(i);
+
+                            assertNotNull(account);
+
+                            sum += account.val;
+                        }
+
+                        assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+                    }
+
+                    if (idx == 0) {
+                        Map<Object, Object> accounts = cache.getAll(keys);
+
+                        int sum = 0;
+
+                        for (int i = 0; i < ACCOUNTS; i++) {
+                            Account account = (Account)accounts.get(i);
+
+                            info("Account [id=" + i + ", val=" + account.val + ']');
+
+                            sum += account.val;
+                        }
+
+                        info("Sum: " + sum);
+                    }
+                }
+            };
+
+        readWriteTest(time, writers, readers, init, writer, reader);
+    }
+
+    /**
+     * @param time Test time.
+     * @param writers Number of writers.
+     * @param readers Number of readers.
+     * @param writer Writers threads closure.
+     * @param reader Readers threads closure.
+     * @throws Exception If failed.
+     */
+    private void readWriteTest(final long time,
+        final int writers,
+        final int readers,
+        IgniteInClosure<IgniteCache<Object, Object>> init,
+        final GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean>
writer,
+        final GridInClosure3<Integer, IgniteCache<Object, Object>, AtomicBoolean>
reader) throws Exception {
+        final Ignite ignite = startGrid(0);
+
+        final IgniteCache<Object, Object> cache = ignite.createCache(cacheConfiguration(PARTITIONED,
FULL_SYNC, 0));
+
+        if (init != null)
+            init.apply(cache);
+
+        final long stopTime = U.currentTimeMillis() + time;
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        try {
+            final AtomicInteger writerIdx = new AtomicInteger();
+
+            IgniteInternalFuture<?> writeFut = GridTestUtils.runMultiThreadedAsync(new
Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try {
+                        int idx = writerIdx.getAndIncrement();
+
+                        writer.apply(idx, cache, stop);
+                    }
+                    catch (Throwable e) {
+                        error("Unexpected error: " + e, e);
+
+                        stop.set(true);
+
+                        fail("Unexpected error: " + e);
+                    }
+
+                    return null;
+                }
+            }, writers, "writer");
+
+            final AtomicInteger readerIdx = new AtomicInteger();
+
+            IgniteInternalFuture<?> readFut = GridTestUtils.runMultiThreadedAsync(new
Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try {
+                        int idx = readerIdx.getAndIncrement();
+
+                        reader.apply(idx, cache, stop);
+                    }
+                    catch (Throwable e) {
+                        error("Unexpected error: " + e, e);
+
+                        stop.set(true);
+
+                        fail("Unexpected error: " + e);
+                    }
+
+                    return null;
+                }
+            }, readers, "reader");
+
+            while (System.currentTimeMillis() < stopTime && !stop.get())
+                Thread.sleep(1000);
+
+            stop.set(true);
+
+            writeFut.get();
+            readFut.get();
+        }
+        finally {
+            stop.set(true);
+        }
+    }
+
+    /**
      * @return Cache configurations.
      */
-    private List<CacheConfiguration<Integer, Integer>> cacheConfigurations()
{
-        List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>();
+    private List<CacheConfiguration<Object, Object>> cacheConfigurations() {
+        List<CacheConfiguration<Object, Object>> ccfgs = new ArrayList<>();
 
         ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0));
         ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1));
@@ -262,16 +648,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
      * @param backups Number of backups.
      * @return Cache configuration.
      */
-    private CacheConfiguration<Integer, Integer> cacheConfiguration(
+    private CacheConfiguration<Object, Object> cacheConfiguration(
         CacheMode cacheMode,
         CacheWriteSynchronizationMode syncMode,
         int backups) {
-        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
         ccfg.setCacheMode(cacheMode);
         ccfg.setAtomicityMode(TRANSACTIONAL);
         ccfg.setWriteSynchronizationMode(syncMode);
         ccfg.setMvccEnabled(true);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 1));
 
         if (cacheMode == PARTITIONED)
             ccfg.setBackups(backups);
@@ -299,4 +686,31 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
             assertTrue(ackFuts.isEmpty());
         }
     }
+
+    /**
+     *
+     */
+    static class Account {
+        /** */
+        private final int val;
+
+        /**
+         * @param val Value.
+         */
+        public Account(int val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public int value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Account.class, this);
+        }
+    }
 }


Mime
View raw message