ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5937
Date Tue, 10 Oct 2017 15:27:08 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5937 bb969db04 -> 3255ce228


ignite-5937


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3255ce22
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3255ce22
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3255ce22

Branch: refs/heads/ignite-5937
Commit: 3255ce22813ffc056001ab78df4bf8a2a747365f
Parents: bb969db
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Oct 10 16:33:37 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Oct 10 18:21:14 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManager.java        |  43 +++++--
 .../cache/IgniteCacheOffheapManagerImpl.java    |  32 ++++-
 .../distributed/dht/GridDhtCacheEntry.java      |   5 +-
 .../persistence/GridCacheOffheapManager.java    |   7 ++
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 125 ++++++++++++++-----
 5 files changed, 168 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3255ce22/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9e3d0fb..2c070fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -189,6 +189,21 @@ public interface IgniteCacheOffheapManager {
         throws IgniteCheckedException;
 
     /**
+     * @param entry Entry.
+     * @param val Value.
+     * @param ver Version.
+     * @param mvccVer Mvcc update version.
+     * @return {@code True} if value was inserted.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean mvccInitialValue(
+        GridCacheMapEntry entry,
+        @Nullable CacheObject val,
+        GridCacheVersion ver,
+        MvccCoordinatorVersion mvccVer
+    ) throws IgniteCheckedException;
+
+    /**
      * @param primary {@code True} if on primary node.
      * @param entry Entry.
      * @param val Value.
@@ -220,18 +235,10 @@ public interface IgniteCacheOffheapManager {
 
     /**
      * @param entry Entry.
-     * @param val Value.
-     * @param ver Version.
-     * @param mvccVer Mvcc update version.
-     * @return {@code True} if value was inserted.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean mvccInitialValue(
-        GridCacheMapEntry entry,
-        @Nullable CacheObject val,
-        GridCacheVersion ver,
-        MvccCoordinatorVersion mvccVer
-    ) throws IgniteCheckedException;
+    public void mvccRemoveAll(GridCacheMapEntry entry)
+        throws IgniteCheckedException;
 
     /**
      * @param cctx Cache context.
@@ -542,6 +549,7 @@ public interface IgniteCacheOffheapManager {
 
         /**
          * @param cctx Cache context.
+         * @param primary {@code True} if update is executed on primary node.
          * @param key Key.
          * @param val Value.
          * @param ver Version.
@@ -557,6 +565,14 @@ public interface IgniteCacheOffheapManager {
             GridCacheVersion ver,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
 
+        /**
+         * @param cctx Cache context.
+         * @param primary {@code True} if update is executed on primary node.
+         * @param key Key.
+         * @param mvccVer Mvcc version.
+         * @return List of transactions to wait for.
+         * @throws IgniteCheckedException If failed.
+         */
         @Nullable GridLongList mvccRemove(
             GridCacheContext cctx,
             boolean primary,
@@ -568,6 +584,13 @@ public interface IgniteCacheOffheapManager {
          * @param key Key.
          * @throws IgniteCheckedException If failed.
          */
+        void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
+
+        /**
+         * @param cctx Cache context.
+         * @param key Key.
+         * @throws IgniteCheckedException If failed.
+         */
         void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3255ce22/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 4fb5bfd..2bff203 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -416,8 +416,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         boolean primary,
         GridCacheMapEntry entry,
         MvccCoordinatorVersion mvccVer
-    )
-        throws IgniteCheckedException {
+    ) throws IgniteCheckedException {
         return dataStore(entry.localPartition()).mvccRemove(entry.context(),
             primary,
             entry.key(),
@@ -425,6 +424,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
+    @Override public void mvccRemoveAll(GridCacheMapEntry entry) throws IgniteCheckedException {
+        dataStore(entry.localPartition()).mvccRemoveAll(entry.context(), entry.key());
+    }
+
+    /** {@inheritDoc} */
     @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part)
         throws IgniteCheckedException {
         dataStore(part).updateIndexes(cctx, key);
@@ -1558,6 +1562,30 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             }
         }
 
+        /** {@inheritDoc} */
+        @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
+            key.valueBytes(cctx.cacheObjectContext());
+
+            int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+            GridCursor<CacheDataRow> cur = dataTree.find(
+                new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
+                new MvccSearchRow(cacheId, key, 1, 1),
+                CacheDataRowAdapter.RowData.KEY_ONLY);
+
+            while (cur.next()) {
+                CacheDataRow row = cur.get();
+
+                assert row.link() != 0;
+
+                boolean rmvd = dataTree.removex(row);
+
+                assert rmvd;
+
+                rowStore.removeRow(row.link());
+            }
+        }
+
         /**
          * @param cleanupRows Rows to cleanup.
          * @param findRmv {@code True} if need keep removed row entry.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3255ce22/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 77cc642..a3309a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -585,7 +585,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
                         ']');
                 }
 
-                removeValue();
+                if (cctx.mvccEnabled())
+                    cctx.offheap().mvccRemoveAll(this);
+                else
+                    removeValue();
 
                 // Give to GC.
                 update(null, 0L, 0L, ver, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3255ce22/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 45b78d4..e5a9736 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1294,6 +1294,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
+        @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
+            CacheDataStore delegate = init0(false);
+
+            delegate.mvccRemoveAll(cctx, key);
+        }
+
+        /** {@inheritDoc} */
         @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
             CacheDataStore delegate = init0(false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3255ce22/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 1abc116..35ceed1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1296,42 +1296,49 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_SingleNode() throws Exception {
-        accountsTxGetAll(1, 0, 0, 64, ReadMode.GET_ALL);
+        accountsTxGetAll(1, 0, 0, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_SingleNode_SinglePartition() throws Exception {
-        accountsTxGetAll(1, 0, 0, 1, ReadMode.GET_ALL);
+        accountsTxGetAll(1, 0, 0, 1, false, ReadMode.GET_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxGetAll_WithRemoves_SingleNode_SinglePartition() throws Exception {
+        accountsTxGetAll(1, 0, 0, 1, true, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups0() throws Exception {
-        accountsTxGetAll(4, 2, 0, 64, ReadMode.GET_ALL);
+        accountsTxGetAll(4, 2, 0, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups1() throws Exception {
-        accountsTxGetAll(4, 2, 1, 64, ReadMode.GET_ALL);
+        accountsTxGetAll(4, 2, 1, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups2() throws Exception {
-        accountsTxGetAll(4, 2, 2, 64, ReadMode.GET_ALL);
+        accountsTxGetAll(4, 2, 2, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxScan_SingleNode_SinglePartition() throws Exception {
-        accountsTxGetAll(1, 0, 0, 1, ReadMode.SCAN);
+        accountsTxGetAll(1, 0, 0, 1, false, ReadMode.SCAN);
     }
 
     /**
@@ -1339,6 +1346,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
      * @param cacheParts Number of cache partitions.
+     * @param withRmvs If {@code true} then in addition to puts tests also executes removes.
      * @param readMode Read mode.
      * @throws Exception If failed.
      */
@@ -1347,6 +1355,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         final int clients,
         int cacheBackups,
         int cacheParts,
+        final boolean withRmvs,
         final ReadMode readMode
     )
         throws Exception
@@ -1376,6 +1385,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             }
         };
 
+        final Set<Integer> rmvdIds = new HashSet<>();
+
         GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer =
             new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
                 @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) {
@@ -1400,8 +1411,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                         keys.add(id1);
                         keys.add(id2);
 
-                        Integer cntr1;
-                        Integer cntr2;
+                        Integer cntr1 = null;
+                        Integer cntr2 = null;
 
                         try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
                             MvccTestAccount a1;
@@ -1412,28 +1423,74 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                             a1 = accounts.get(id1);
                             a2 = accounts.get(id2);
 
-                            assertNotNull(a1);
-                            assertNotNull(a2);
+                            if (!withRmvs) {
+                                assertNotNull(a1);
+                                assertNotNull(a2);
 
-                            cntr1 = a1.updateCnt + 1;
-                            cntr2 = a2.updateCnt + 1;
+                                cntr1 = a1.updateCnt + 1;
+                                cntr2 = a2.updateCnt + 1;
 
-                            cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
-                            cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
+                                cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
+                                cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
+                            }
+                            else {
+                                if (a1 != null || a2 != null) {
+                                    if (a1 != null && a2 != null) {
+                                        Integer rmvd = null;
+
+                                        if (rnd.nextInt(10) == 0) {
+                                            synchronized (rmvdIds) {
+                                                if (rmvdIds.size() < 10) {
+                                                    rmvd = rnd.nextBoolean() ? id1 : id2;
+
+                                                    assertTrue(rmvdIds.add(rmvd));
+                                                }
+                                            }
+                                        }
+
+                                        if (rmvd != null) {
+                                            cache.remove(rmvd);
+
+                                            cache.put(rmvd.equals(id1) ? id2 : id1,
+                                                new MvccTestAccount(a1.val + a2.val, 1));
+                                        }
+                                        else {
+                                            cache.put(id1, new MvccTestAccount(a1.val + 1, 1));
+                                            cache.put(id2, new MvccTestAccount(a2.val - 1, 1));
+                                        }
+                                    }
+                                    else {
+                                        if (a1 == null) {
+                                            cache.put(id1, new MvccTestAccount(100, 1));
+                                            cache.put(id2, new MvccTestAccount(a2.val - 100, 1));
+
+                                            assertTrue(rmvdIds.remove(id1));
+                                        }
+                                        else {
+                                            cache.put(id1, new MvccTestAccount(a1.val - 100, 1));
+                                            cache.put(id2, new MvccTestAccount(100, 1));
+
+                                            assertTrue(rmvdIds.remove(id2));
+                                        }
+                                    }
+                                }
+                            }
 
                             tx.commit();
                         }
 
-                        Map<Integer, MvccTestAccount> accounts = cache.getAll(keys);
+                        if (!withRmvs) {
+                            Map<Integer, MvccTestAccount> accounts = cache.getAll(keys);
 
-                        MvccTestAccount a1 = accounts.get(id1);
-                        MvccTestAccount a2 = accounts.get(id2);
+                            MvccTestAccount a1 = accounts.get(id1);
+                            MvccTestAccount a2 = accounts.get(id2);
 
-                        assertNotNull(a1);
-                        assertNotNull(a2);
+                            assertNotNull(a1);
+                            assertNotNull(a2);
 
-                        assertTrue(a1.updateCnt >= cntr1);
-                        assertTrue(a2.updateCnt >= cntr2);
+                            assertTrue(a1.updateCnt >= cntr1);
+                            assertTrue(a2.updateCnt >= cntr2);
+                        }
                     }
 
                     info("Writer finished, updates: " + cnt);
@@ -1469,23 +1526,26 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                         else
                             accounts = cache.getAll(keys);
 
-                        assertEquals(ACCOUNTS, accounts.size());
+                        if (!withRmvs)
+                            assertEquals(ACCOUNTS, accounts.size());
 
                         int sum = 0;
 
                         for (int i = 0; i < ACCOUNTS; i++) {
                             MvccTestAccount account = accounts.get(i);
 
-                            assertNotNull(account);
-
-                            sum += account.val;
+                            if (account != null) {
+                                sum += account.val;
 
-                            Integer cntr = lastUpdateCntrs.get(i);
+                                Integer cntr = lastUpdateCntrs.get(i);
 
-                            if (cntr != null)
-                                assertTrue(cntr <= account.updateCnt);
+                                if (cntr != null)
+                                    assertTrue(cntr <= account.updateCnt);
 
-                            lastUpdateCntrs.put(i, cntr);
+                                lastUpdateCntrs.put(i, cntr);
+                            }
+                            else
+                                assertTrue(withRmvs);
                         }
 
                         assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
@@ -1501,9 +1561,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                         for (int i = 0; i < ACCOUNTS; i++) {
                             MvccTestAccount account = accounts.get(i);
 
-                            info("Account [id=" + i + ", val=" + account.val + ']');
+                            assertTrue(account != null || withRmvs);
+
+                            info("Account [id=" + i + ", val=" + (account != null ? account.val : null) + ']');
 
-                            sum += account.val;
+                            if (account != null)
+                                sum += account.val;
                         }
 
                         info("Sum: " + sum);


Mime
View raw message