ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ira...@apache.org
Subject [1/2] ignite git commit: IGNITE-5874 Store TTL expire times in B+ tree on per-partition basis - Fixes #3231.
Date Fri, 11 May 2018 15:49:57 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 01f60542e -> 89c775737
Updated Tags:  refs/tags/1.0.0-RELEASE-TEST-RC1 [created] a37da05fb
  refs/tags/1.0.0-RELEASE-TEST-RC2 [created] 16ba1421d
  refs/tags/1.0.0-RELEASE-TEST-RC3 [created] 1b3c8184d
  refs/tags/1.0.0-RELEASE-TEST-RC4 [created] d9ab06450
  refs/tags/1.0.1-RELEASE-TEST-RC5 [created] eee94fa28
  refs/tags/1.0.1-RELEASE-TEST-RC6 [created] 4de9e8dc5
  refs/tags/1.10.0.ea1 [created] 0dae7316d
  refs/tags/1.10.0.ea2 [created] ab4808545
  refs/tags/1.10.0.ea3 [created] 62df82653
  refs/tags/1.10.0.ea5 [created] 30460d2a5
  refs/tags/1.10.1.ea4 [created] 8c83d167d
  refs/tags/1.5.1-QAVS1901 [created] 5c8283571
  refs/tags/1.5.10 [created] 4bf9edd68
  refs/tags/1.5.11 [created] 460f0078f
  refs/tags/1.5.12 [created] fb675772d
  refs/tags/1.5.14 [created] 1362553b4
  refs/tags/1.5.15 [created] 78e671138
  refs/tags/1.5.16 [created] b39f54f21
  refs/tags/1.5.17 [created] d4de1cc0f
  refs/tags/1.5.18 [created] 67b8c1ae2
  refs/tags/1.5.19 [created] 953e9db0d
  refs/tags/1.5.20 [created] fca3c78c7
  refs/tags/1.5.21 [created] bd1916bfe
  refs/tags/1.5.22 [created] 597ea0cf5
  refs/tags/1.5.23 [created] f2832c6ff
  refs/tags/1.5.24 [created] 692cdf3e4
  refs/tags/1.5.25 [created] c1a354a7a
  refs/tags/1.5.26 [created] 0d4dbc416
  refs/tags/1.5.27 [created] 847b27814
  refs/tags/1.5.28 [created] 55dbc8a62
  refs/tags/1.5.29 [created] a1980c6f6
  refs/tags/1.5.30 [created] ae7e6cc95
  refs/tags/1.5.31 [created] 767f88bcf
  refs/tags/1.5.32 [created] f57631d98
  refs/tags/1.5.33 [created] 1986b93b2
  refs/tags/1.5.4 [created] db62c7a78
  refs/tags/1.5.5 [created] 8f3ae6bab
  refs/tags/1.5.5-QATEST [created] 6b4e4be3a
  refs/tags/1.5.6 [created] 75591a935
  refs/tags/1.5.7 [created] c9020c405
  refs/tags/1.5.7-QATEST [created] 80b21ebb8
  refs/tags/1.5.7-TEST [created] 57c19c20a
  refs/tags/1.5.8 [created] 210366eb7
  refs/tags/1.5.9 [created] cc595929c
  refs/tags/1.6.1 [created] b418cea04
  refs/tags/1.6.10 [created] f1c424bd2
  refs/tags/1.6.11 [created] babff41f0
  refs/tags/1.6.12 [created] a22010d32
  refs/tags/1.6.2 [created] 072e3b3ce
  refs/tags/1.6.3 [created] 9d0212018
  refs/tags/1.6.5 [created] 62c101cf3
  refs/tags/1.6.6 [created] f14c6fb25
  refs/tags/1.6.7 [created] 8e5ecdde0
  refs/tags/1.6.8 [created] 4ef6b1743
  refs/tags/1.6.8-QAVS1902 [created] 5739b6a1c
  refs/tags/1.6.9 [created] 93723542e
  refs/tags/1.7.1 [created] 3dd286282
  refs/tags/1.7.2 [created] 9e67197e3
  refs/tags/1.7.3 [created] 596479c91
  refs/tags/1.7.4 [created] 3fae2e313
  refs/tags/1.7.4-p1 [created] 0da3c2ed0
  refs/tags/1.7.5 [created] ba3cccc88
  refs/tags/1.8.0-b1 [created] 947266517
  refs/tags/1.8.0.b2 [created] 5b0cedfe4
  refs/tags/1.8.1 [created] 8fe1fc191
  refs/tags/1.8.2 [created] f255ff094
  refs/tags/2.2.4-test [created] e8e0d75d7
  refs/tags/2.2.5-test [created] 36cb161e9


http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java
new file mode 100644
index 0000000..6fa039d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.migration;
+
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IndexStorage;
+import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.util.IgniteTree;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree.WITHOUT_KEY;
+
+/**
+ * Ignite native persistence migration task that upgrades existing PendingTrees to a per-partition basis. It ignores
+ * possible assertion errors when a pointer to an entry exists in the tree but the entry itself was removed for some
+ * reason (e.g. when the partition was evicted after restart).
+ *
+ * The task goes through persistent cache groups and copies entries to the corresponding partitions.
+ */
+public class UpgradePendingTreeToPerPartitionTask implements IgniteCallable<Boolean> {
+    /** */
+    private static final String PENDING_ENTRIES_TREE_NAME = "PendingEntries";
+
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static final int BATCH_SIZE = 500;
+
+    /** */
+    @IgniteInstanceResource
+    private IgniteEx node;
+
+    /** */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override public Boolean call() throws IgniteException {
+        GridCacheSharedContext<Object, Object> sharedCtx = node.context().cache().context();
+
+        for (CacheGroupContext grp : sharedCtx.cache().cacheGroups()) {
+            if (!grp.persistenceEnabled() || !grp.affinityNode()) {
+                if (!grp.persistenceEnabled())
+                    log.info("Skip pending tree upgrade for non-persistent cache group: [grpId=" + grp.groupId() +
+                        ", grpName=" + grp.name() + ']');
+                else
+                    log.info("Skip pending tree upgrade on non-affinity node for cache group: [grpId=" + grp.groupId() +
+                        ", grpName=" + grp.name() + ']');
+
+                continue;
+            }
+
+            try {
+                processCacheGroup(grp);
+            }
+            catch (Exception ex) {
+                if (Thread.interrupted() || X.hasCause(ex, InterruptedException.class))
+                    log.info("Upgrade pending tree has been cancelled.");
+                else
+                    log.warning("Failed to upgrade pending tree for cache group:  [grpId=" + grp.groupId() +
+                        ", grpName=" + grp.name() + ']', ex);
+
+                return false;
+            }
+
+            if (Thread.interrupted()) {
+                log.info("Upgrade pending tree has been cancelled.");
+
+                return false;
+            }
+        }
+
+        log.info("All pending trees upgraded successfully.");
+
+        return true;
+    }
+
+    /**
+     * Converts the pending tree of the given cache group to per-partition basis: looks up the old tree by name,
+     * moves its rows via {@link #processPendingTree} and finally destroys the old tree.
+     * If no old tree existed, the freshly allocated root page is dropped and nothing else is done.
+     *
+     * @param grp Cache group.
+     * @throws IgniteCheckedException If error occurs.
+     */
+    private void processCacheGroup(CacheGroupContext grp) throws IgniteCheckedException {
+        assert grp.offheap() instanceof GridCacheOffheapManager;
+
+        final IgniteCacheDatabaseSharedManager dbMgr = grp.shared().database();
+
+        PendingEntriesTree legacyTree;
+
+        dbMgr.checkpointReadLock();
+
+        try {
+            GridCacheOffheapManager offheapMgr = (GridCacheOffheapManager)grp.offheap();
+
+            IndexStorage idxStorage = offheapMgr.getIndexStorage();
+
+            //TODO: IGNITE-5874: replace with some check-method to avoid unnecessary page allocation.
+            RootPage rootPage = idxStorage.getOrAllocateForTree(PENDING_ENTRIES_TREE_NAME);
+
+            // A just-allocated root means there was no old pending tree for this group.
+            if (rootPage.isAllocated()) {
+                log.info("No pending tree found for cache group: [grpId=" + grp.groupId() +
+                    ", grpName=" + grp.name() + ']');
+
+                // The new tree is empty, so simply drop the root page that was allocated by the lookup.
+                idxStorage.dropRootPage(PENDING_ENTRIES_TREE_NAME);
+
+                return;
+            }
+
+            PageMemory pageMem = grp.dataRegion().pageMemory();
+
+            long rootPageId = rootPage.pageId().pageId();
+
+            legacyTree = new PendingEntriesTree(
+                grp,
+                PENDING_ENTRIES_TREE_NAME,
+                pageMem,
+                rootPageId,
+                offheapMgr.reuseListForIndex(null),
+                false
+            );
+        }
+        finally {
+            dbMgr.checkpointReadUnlock();
+        }
+
+        processPendingTree(grp, legacyTree);
+
+        // On interruption the old tree is left in place (it is not destroyed).
+        if (!Thread.currentThread().isInterrupted()) {
+            dbMgr.checkpointReadLock();
+
+            try {
+                legacyTree.destroy();
+            }
+            finally {
+                dbMgr.checkpointReadUnlock();
+            }
+        }
+    }
+
+    /**
+     * Moves pending rows of the cache group from the old-style PendingTree to per-partition PendingTrees, batch by
+     * batch, holding the checkpoint read lock for one batch at a time. Rows that are invalid or cannot be moved are
+     * skipped; each row is removed from the old tree once handled. Stops early if the current thread is interrupted.
+     * @param grp Cache group.
+     * @param oldPendingEntries Old-style PendingTree.
+     * @throws IgniteCheckedException If error occurs.
+     */
+    private void processPendingTree(CacheGroupContext grp, PendingEntriesTree oldPendingEntries)
+        throws IgniteCheckedException {
+        final PageMemory pageMemory = grp.dataRegion().pageMemory();
+
+        final IgniteCacheDatabaseSharedManager db = grp.shared().database();
+
+        final Set<Integer> cacheIds = grp.cacheIds();
+
+        PendingRow row = null; // Last row read; passed as the first argument of find() when the next batch starts.
+
+        int processedEntriesCnt = 0;
+        int skippedEntries = 0;
+
+        // Process the tree in batches: the checkpoint read lock is released and re-acquired between batches.
+        while (!Thread.currentThread().isInterrupted()) {
+            int cnt = 0; // Counts inner-loop condition checks of this batch (incremented even when the check fails).
+
+            db.checkpointReadLock();
+            try {
+                GridCursor<PendingRow> cursor = oldPendingEntries.find(row, null, WITHOUT_KEY);
+
+                while (cnt++ < BATCH_SIZE && cursor.next()) {
+                    row = cursor.get();
+
+                    assert row.link != 0 && row.expireTime != 0 : row;
+
+                    GridCacheEntryEx entry;
+
+                    // Lost cache (cache ID is not in the group anymore) or lost entry (getEntry() returned null).
+                    if (!cacheIds.contains(row.cacheId) || (entry = getEntry(grp, row)) == null) {
+                        skippedEntries++;
+
+                        oldPendingEntries.removex(row);
+
+                        continue;
+                    }
+
+                    entry.lockEntry();
+                    try {
+                        if (processRow(pageMemory, grp, row))
+                            processedEntriesCnt++;
+                        else
+                            skippedEntries++;
+                    }
+                    finally {
+                        entry.unlockEntry();
+                    }
+
+                    oldPendingEntries.removex(row);
+                }
+
+                if (cnt < BATCH_SIZE) // Inner loop ended before the batch limit, i.e. the cursor is exhausted.
+                    break;
+            }
+            finally {
+                db.checkpointReadUnlock();
+            }
+        }
+
+        log.info("PendingTree upgraded: " +
+            "[grpId=" + grp.groupId() +
+            ", grpName=" + grp.name() +
+            ", processedEntries=" + processedEntriesCnt +
+            ", failedEntries=" + skippedEntries + // Skipped rows are reported under the "failedEntries" label.
+            ']');
+    }
+
+    /**
+     * Resolves the cache entry a pending row points to, so that the caller can lock it.
+     * The key is read from the data row referenced by the row link; any failure other than interruption
+     * is logged and reported as a missing entry.
+     *
+     * @param grp Cache group
+     * @param row Pending row.
+     * @return CacheEntry if found or null otherwise.
+     */
+    private GridCacheEntryEx getEntry(CacheGroupContext grp, PendingRow row) {
+        try {
+            CacheDataRowAdapter dataRow = new CacheDataRowAdapter(row.link);
+
+            // Only the key is needed to look the entry up.
+            dataRow.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY);
+
+            GridCacheContext cacheCtx = grp.shared().cacheContext(row.cacheId);
+
+            assert cacheCtx != null;
+
+            return cacheCtx.cache().entryEx(dataRow.key());
+        }
+        catch (Throwable e) {
+            boolean interrupted =
+                Thread.currentThread().isInterrupted() || X.hasCause(e, InterruptedException.class);
+
+            // Interruption is propagated to the caller instead of being treated as a lost entry.
+            if (interrupted)
+                throw new IgniteException(new InterruptedException());
+
+            log.warning("Failed to move old-version pending entry " +
+                "to per-partition PendingTree: key not found (skipping): " +
+                "[grpId=" + grp.groupId() +
+                ", grpName=" + grp.name() +
+                ", pendingRow=" + row + "]");
+
+            return null;
+        }
+    }
+
+    /**
+     * Validates PendingRow and adds it to the per-partition PendingTree of the partition encoded in the row link.
+     * The page referenced by the row link is acquired and read-locked while the row is inserted, and is
+     * unlocked and released afterwards. Failures not caused by interruption are logged and reported as
+     * {@code false}.
+     *
+     * @param pageMemory Page memory.
+     * @param grp Cache group.
+     * @param row Pending row.
+     * @return {@code True} if pending row successfully moved, {@code False} otherwise.
+     * @throws IgniteException If the thread was interrupted; the interrupt flag is set again before throwing.
+     */
+    private boolean processRow(PageMemory pageMemory, CacheGroupContext grp, PendingRow row) {
+        final long pageId = PageIdUtils.pageId(row.link);
+
+        final int partition = PageIdUtils.partId(pageId);
+
+        assert partition >= 0;
+
+        try {
+            final long page = pageMemory.acquirePage(grp.groupId(), pageId);
+
+            try {
+                long pageAddr = pageMemory.readLock(grp.groupId(), pageId, page);
+
+                try {
+                    // Sanity check that the address points to an initialized page.
+                    assert PageIO.getType(pageAddr) != 0;
+                    assert PageIO.getVersion(pageAddr) != 0;
+
+                    IgniteCacheOffheapManager.CacheDataStore store =
+                        ((GridCacheOffheapManager)grp.offheap()).dataStore(partition);
+
+                    if (store == null) {
+                        log.warning("Failed to move old-version pending entry " +
+                            "to per-partition PendingTree: Node has no partition anymore (skipping): " +
+                            "[grpId=" + grp.groupId() +
+                            ", grpName=" + grp.name() +
+                            ", partId=" + partition +
+                            ", pendingRow=" + row + "]");
+
+                        return false;
+                    }
+
+                    assert store instanceof GridCacheOffheapManager.GridCacheDataStore;
+                    assert store.pendingTree() != null;
+
+                    // Insert only if the row is not already present in the partition tree.
+                    store.pendingTree().invoke(row, WITHOUT_KEY, new PutIfAbsentClosure(row));
+                }
+                finally {
+                    pageMemory.readUnlock(grp.groupId(), pageId, page);
+                }
+            }
+            finally {
+                // Balance acquirePage(): previously the acquired page was never released.
+                pageMemory.releasePage(grp.groupId(), pageId, page);
+            }
+        }
+        catch (AssertionError | Exception ex) {
+            if (Thread.currentThread().isInterrupted() || X.hasCause(ex, InterruptedException.class)) {
+                Thread.currentThread().interrupt();
+
+                throw new IgniteException(ex);
+            }
+
+            String msg = "Unexpected error occurs while moving old-version pending entry " +
+                "to per-partition PendingTree. Seems page doesn't longer exists (skipping): " +
+                "[grpId=" + grp.groupId() +
+                ", grpName=" + grp.name() +
+                ", partId=" + partition +
+                ", pendingRow=" + row + ']';
+
+            // The stack trace is attached only when debug logging is enabled.
+            if (log.isDebugEnabled())
+                log.warning(msg, ex);
+            else
+                log.warning(msg);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Tree invoke closure that puts the given pending row when the tree reports no existing row
+     * and does nothing otherwise.
+     */
+    private static class PutIfAbsentClosure implements IgniteTree.InvokeClosure<PendingRow> {
+        /** Row to put if absent. */
+        private final PendingRow rowToPut;
+
+        /** Operation selected by the last {@link #call(PendingRow)} invocation. */
+        private IgniteTree.OperationType opType;
+
+        /**
+         * @param pendingRow Row to put if absent.
+         */
+        PutIfAbsentClosure(PendingRow pendingRow) {
+            rowToPut = pendingRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void call(@Nullable PendingRow oldRow) throws IgniteCheckedException {
+            if (oldRow != null)
+                opType = IgniteTree.OperationType.NOOP;
+            else
+                opType = IgniteTree.OperationType.PUT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public PendingRow newRow() {
+            return rowToPut;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteTree.OperationType operationType() {
+            return opType;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index a5236c2..c940c39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -191,7 +191,6 @@ public abstract class PageIO {
     /** */
     public static final short T_DATA_REF_METASTORAGE_LEAF = 23;
 
-
     /** Index for payload == 1. */
     public static final short T_H2_EX_REF_LEAF_START = 10000;
 
@@ -215,8 +214,8 @@ public abstract class PageIO {
      * @param ver Page format version.
      */
     protected PageIO(int type, int ver) {
-        assert ver > 0 && ver < 65535: ver;
-        assert type > 0 && type < 65535: type;
+        assert ver > 0 && ver < 65535 : ver;
+        assert type > 0 && type < 65535 : type;
 
         this.type = type;
         this.ver = ver;
@@ -245,7 +244,7 @@ public abstract class PageIO {
     public static void setType(long pageAddr, int type) {
         PageUtils.putShort(pageAddr, TYPE_OFF, (short)type);
 
-        assert getType(pageAddr) == type;
+        assert getType(pageAddr) == type : getType(pageAddr);
     }
 
     /**
@@ -268,7 +267,7 @@ public abstract class PageIO {
      * @param pageAddr Page address.
      * @param ver Version.
      */
-    private static void setVersion(long pageAddr, int ver) {
+    protected static void setVersion(long pageAddr, int ver) {
         PageUtils.putShort(pageAddr, VER_OFF, (short)ver);
 
         assert getVersion(pageAddr) == ver;
@@ -580,7 +579,7 @@ public abstract class PageIO {
      * @param pageSize Page size.
      * @param sb Sb.
      */
-    protected abstract void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException ;
+    protected abstract void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException;
 
     /**
      * @param addr Address.

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
index 3d79884..fe6b7a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
@@ -42,9 +42,13 @@ public class PagePartitionMetaIO extends PageMetaIO {
     /** */
     private static final int NEXT_PART_META_PAGE_OFF = PARTITION_STATE_OFF + 1;
 
+    /** End of page partition meta. */
+    static final int END_OF_PARTITION_PAGE_META = NEXT_PART_META_PAGE_OFF + 8;
+
     /** */
     public static final IOVersions<PagePartitionMetaIO> VERSIONS = new IOVersions<>(
-        new PagePartitionMetaIO(1)
+        new PagePartitionMetaIO(1),
+        new PagePartitionMetaIOV2(2)
     );
 
     /** {@inheritDoc} */
@@ -150,6 +154,7 @@ public class PagePartitionMetaIO extends PageMetaIO {
 
     /**
      * Returns partition counters page identifier, page with caches in cache group sizes.
+     *
      * @param pageAddr Partition metadata page address.
      * @return Next meta partial page ID or {@code 0} if it does not exist.
      */
@@ -167,19 +172,39 @@ public class PagePartitionMetaIO extends PageMetaIO {
         PageUtils.putLong(pageAddr, NEXT_PART_META_PAGE_OFF, cntrsPageId);
     }
 
+    /**
+     * Returns partition pending tree root. Pending tree is used to tracking expiring entries.
+     *
+     * @param pageAddr Page address.
+     * @return Pending Tree root page.
+     */
+    public long getPendingTreeRoot(long pageAddr) {
+        throw new UnsupportedOperationException("Per partition pending tree is not supported by " +
+            "this PagePartitionMetaIO version: ver=" + getVersion());
+    }
+
+    /**
+     * Sets new partition pending tree root.
+     *
+     * @param pageAddr Page address.
+     * @param treeRoot Pending Tree root
+     */
+    public void setPendingTreeRoot(long pageAddr, long treeRoot) {
+        throw new UnsupportedOperationException("Per partition pending tree is not supported by " +
+            "this PagePartitionMetaIO version: ver=" + getVersion());
+    }
+
     /** {@inheritDoc} */
     @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
         super.printPage(pageAddr, pageSize, sb);
 
         byte state = getPartitionState(pageAddr);
 
-        sb
-            .a(",\nPagePartitionMeta[\n\tsize=").a(getSize(pageAddr))
+        sb.a(",\nPagePartitionMeta[\n\tsize=").a(getSize(pageAddr))
             .a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr))
             .a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr))
             .a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")")
             .a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr))
-            .a("\n]")
-            ;
+            .a("\n]");
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
new file mode 100644
index 0000000..70556a1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.tree.io;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.util.GridStringBuilder;
+
+/**
+ * IO for partition metadata pages, version 2.
+ * In addition to the base partition metadata, the page stores the root of the partition's own PendingTree.
+ */
+public class PagePartitionMetaIOV2 extends PagePartitionMetaIO {
+    /** Offset of the pending tree root; starts where the base partition metadata ends. */
+    private static final int PENDING_TREE_ROOT_OFF = PagePartitionMetaIO.END_OF_PARTITION_PAGE_META;
+
+    /**
+     * @param ver Version.
+     */
+    public PagePartitionMetaIOV2(int ver) {
+        super(ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initNewPage(long pageAddr, long pageId, int pageSize) {
+        super.initNewPage(pageAddr, pageId, pageSize);
+
+        // A new page has no pending tree yet.
+        setPendingTreeRoot(pageAddr, 0L);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getPendingTreeRoot(long pageAddr) {
+        return PageUtils.getLong(pageAddr, PENDING_TREE_ROOT_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setPendingTreeRoot(long pageAddr, long treeRoot) {
+        PageUtils.putLong(pageAddr, PENDING_TREE_ROOT_OFF, treeRoot);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException {
+        byte state = getPartitionState(pageAddr);
+
+        // NOTE(review): the label says "treeRoot" but the value printed is the reuse list root - confirm intent.
+        sb.a("PagePartitionMeta[\n\ttreeRoot=").a(getReuseListRoot(pageAddr));
+        // Print the actual pending tree root (previously the last successful full snapshot ID was printed here).
+        sb.a(",\n\tpendingTreeRoot=").a(getPendingTreeRoot(pageAddr));
+        sb.a(",\n\tlastSuccessfulFullSnapshotId=").a(getLastSuccessfulFullSnapshotId(pageAddr));
+        sb.a(",\n\tlastSuccessfulSnapshotId=").a(getLastSuccessfulSnapshotId(pageAddr));
+        sb.a(",\n\tnextSnapshotTag=").a(getNextSnapshotTag(pageAddr));
+        sb.a(",\n\tlastSuccessfulSnapshotTag=").a(getLastSuccessfulSnapshotTag(pageAddr));
+        sb.a(",\n\tlastAllocatedPageCount=").a(getLastAllocatedPageCount(pageAddr));
+        sb.a(",\n\tcandidatePageCount=").a(getCandidatePageCount(pageAddr));
+        sb.a(",\n\tsize=").a(getSize(pageAddr));
+        sb.a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr));
+        sb.a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr));
+        sb.a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")");
+        sb.a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr));
+        sb.a("\n]");
+    }
+
+    /**
+     * Upgrades a partition metadata page of an older version to this IO version: writes this IO's version
+     * into the page and resets the pending tree root to {@code 0}.
+     *
+     * @param pageAddr Page address.
+     */
+    public void upgradePage(long pageAddr) {
+        assert PageIO.getType(pageAddr) == getType();
+        assert PageIO.getVersion(pageAddr) < 2;
+
+        PageIO.setVersion(pageAddr, getVersion());
+        setPendingTreeRoot(pageAddr, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
index 43a2303..ebe6f29 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
@@ -55,10 +55,11 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac
         Mockito.when(topologyMock.partitions()).thenReturn(3);
 
         List<GridDhtLocalPartition> localPartitions = Lists.newArrayList(
-                partitionMock(0, 1, 1),
-                partitionMock(1, 2, 2),
-                partitionMock(2, 3, 3)
+            partitionMock(0, 1, 1),
+            partitionMock(1, 2, 2),
+            partitionMock(2, 3, 3)
         );
+
         Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions);
         Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions);
     }
@@ -82,10 +83,13 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac
      */
     private GridDhtPartitionsSingleMessage from(@Nullable Map<Integer, T2<Long, Long>> countersMap, @Nullable Map<Integer, Long> sizesMap) {
         GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+
         if (countersMap != null)
             msg.addPartitionUpdateCounters(0, countersMap);
+
         if (sizesMap != null)
             msg.addPartitionSizes(0, sizesMap);
+
         return msg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
index 99614ed..a02ed11 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java
@@ -17,32 +17,35 @@
 
 package org.apache.ignite.internal.processors.cache.persistence;
 
+import java.io.Serializable;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
- *
+ * Caused by https://issues.apache.org/jira/browse/IGNITE-7278
  */
 public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
     /** */
@@ -52,7 +55,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
     private static final int ENTRIES_COUNT = 10_000;
 
     /** */
-    public static final String CACHE_NAME = "cache1";
+    protected static final String CACHE_NAME = "cache1";
 
     /** Checkpoint delay. */
     private volatile int checkpointDelay = -1;
@@ -79,21 +82,23 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
 
         DataStorageConfiguration memCfg = new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(
-                new DataRegionConfiguration().setMaxSize(400 * 1024 * 1024).setPersistenceEnabled(true))
+                new DataRegionConfiguration()
+                    .setMaxSize(400 * 1024 * 1024)
+                    .setPersistenceEnabled(true))
             .setWalMode(WALMode.LOG_ONLY)
             .setCheckpointFrequency(checkpointDelay);
 
         cfg.setDataStorageConfiguration(memCfg);
 
-        CacheConfiguration ccfg1 = new CacheConfiguration();
+        CacheConfiguration ccfg = new CacheConfiguration();
 
-        ccfg1.setName(CACHE_NAME);
-        ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
-        ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        ccfg1.setAffinity(new RendezvousAffinityFunction(false, 128));
-        ccfg1.setBackups(2);
+        ccfg.setName(CACHE_NAME);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
+        ccfg.setBackups(2);
 
-        cfg.setCacheConfiguration(ccfg1);
+        cfg.setCacheConfiguration(ccfg);
 
         return cfg;
     }
@@ -197,7 +202,6 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
      * @throws Exception if failed.
      */
     public void testRebalncingDuringLoad_10_10_1_1() throws Exception {
@@ -205,7 +209,6 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
      * @throws Exception if failed.
      */
     public void testRebalncingDuringLoad_10_500_8_16() throws Exception {
@@ -227,7 +230,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
 
         final Ignite load = ignite(0);
 
-        load.active(true);
+        load.cluster().active(true);
 
         try (IgniteDataStreamer<Object, Object> s = load.dataStreamer(CACHE_NAME)) {
             s.allowOverwrite(true);
@@ -245,10 +248,13 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
                 Random rnd = ThreadLocalRandom.current();
 
                 while (!done.get()) {
-                    Map<Integer, Integer> map = new TreeMap<>();
+                    Map<Integer, Person> map = new TreeMap<>();
 
-                    for (int i = 0; i < batch; i++)
-                        map.put(rnd.nextInt(ENTRIES_COUNT), rnd.nextInt());
+                    for (int i = 0; i < batch; i++) {
+                        int key = rnd.nextInt(ENTRIES_COUNT);
+
+                        map.put(key, new Person("fn" + key, "ln" + key));
+                    }
 
                     cache.putAll(map);
                 }
@@ -277,4 +283,51 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest {
 
         busyFut.get();
     }
+
+    /**
+     * Cache value used by the load threads: a first name and a last name, both SQL-indexed.
+     */
+    static class Person implements Serializable {
+        /** First name. */
+        @GridToStringInclude
+        @QuerySqlField(index = true, groups = "full_name")
+        private String fName;
+
+        /** Last name. */
+        @GridToStringInclude
+        @QuerySqlField(index = true, groups = "full_name")
+        private String lName;
+
+        /**
+         * @param fName First name.
+         * @param lName Last name.
+         */
+        public Person(String fName, String lName) {
+            this.fName = fName;
+            this.lName = lName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Person person = (Person)o; // Safe: the getClass() check above guarantees the same class.
+
+            return Objects.equals(fName, person.fName) && Objects.equals(lName, person.lName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(fName, lName);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java
deleted file mode 100644
index 66b2047..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence;
-
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- * Cause by https://issues.apache.org/jira/browse/IGNITE-7278
- */
-public class IgnitePdsContinuousRestartTest2 extends GridCommonAbstractTest {
-    /** */
-    private static final int GRID_CNT = 4;
-
-    /** */
-    private static final int ENTRIES_COUNT = 10_000;
-
-    /** */
-    public static final String CACHE_NAME = "cache1";
-
-    /** Checkpoint delay. */
-    private volatile int checkpointDelay = -1;
-
-    /** */
-    private boolean cancel;
-
-    /**
-     * Default constructor.
-     */
-    public IgnitePdsContinuousRestartTest2() {
-
-    }
-
-    /**
-     * @param cancel Cancel.
-     */
-    public IgnitePdsContinuousRestartTest2(boolean cancel) {
-        this.cancel = cancel;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        DataStorageConfiguration memCfg = new DataStorageConfiguration()
-            .setDefaultDataRegionConfiguration(
-                new DataRegionConfiguration()
-                    .setMaxSize(400 * 1024 * 1024)
-                    .setPersistenceEnabled(true))
-            .setWalMode(WALMode.LOG_ONLY)
-            .setCheckpointFrequency(checkpointDelay);
-
-        cfg.setDataStorageConfiguration(memCfg);
-
-        CacheConfiguration ccfg = new CacheConfiguration();
-
-        ccfg.setName(CACHE_NAME);
-        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
-        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
-        ccfg.setBackups(2);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_500_1_1() throws Exception {
-        checkRebalancingDuringLoad(1000, 500, 1, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_500_1_1() throws Exception {
-        checkRebalancingDuringLoad(8000, 500, 1, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_20000_1_1() throws Exception {
-        checkRebalancingDuringLoad(1000, 20000, 1, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_8000_1_1() throws Exception {
-        checkRebalancingDuringLoad(8000, 8000, 1, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_500_8_1() throws Exception {
-        checkRebalancingDuringLoad(1000, 500, 8, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_500_8_1() throws Exception {
-        checkRebalancingDuringLoad(8000, 500, 8, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_20000_8_1() throws Exception {
-        checkRebalancingDuringLoad(1000, 20000, 8, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_8000_8_1() throws Exception {
-        checkRebalancingDuringLoad(8000, 8000, 8, 1);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_500_8_16() throws Exception {
-        checkRebalancingDuringLoad(1000, 500, 8, 16);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_500_8_16() throws Exception {
-        checkRebalancingDuringLoad(8000, 500, 8, 16);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_1000_20000_8_16() throws Exception {
-        checkRebalancingDuringLoad(1000, 20000, 8, 16);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testRebalancingDuringLoad_8000_8000_8_16() throws Exception {
-        checkRebalancingDuringLoad(8000, 8000, 8, 16);
-    }
-
-    /**
-     *
-     * @throws Exception if failed.
-     */
-    public void testRebalncingDuringLoad_10_10_1_1() throws Exception {
-        checkRebalancingDuringLoad(10, 10, 1, 1);
-    }
-
-    /**
-     *
-     * @throws Exception if failed.
-     */
-    public void testRebalncingDuringLoad_10_500_8_16() throws Exception {
-        checkRebalancingDuringLoad(10, 500, 8, 16);
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    private void checkRebalancingDuringLoad(
-        int restartDelay,
-        int checkpointDelay,
-        int threads,
-        final int batch
-    ) throws Exception {
-        this.checkpointDelay = checkpointDelay;
-
-        startGrids(GRID_CNT);
-
-        final Ignite load = ignite(0);
-
-        load.cluster().active(true);
-
-        try (IgniteDataStreamer<Object, Object> s = load.dataStreamer(CACHE_NAME)) {
-            s.allowOverwrite(true);
-
-            for (int i = 0; i < ENTRIES_COUNT; i++)
-                s.addData(i, i);
-        }
-
-        final AtomicBoolean done = new AtomicBoolean(false);
-
-        IgniteInternalFuture<?> busyFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
-            /** {@inheritDoc} */
-            @Override public Object call() throws Exception {
-                IgniteCache<Object, Object> cache = load.cache(CACHE_NAME);
-                Random rnd = ThreadLocalRandom.current();
-
-                while (!done.get()) {
-                    Map<Integer, Integer> map = new TreeMap<>();
-
-                    for (int i = 0; i < batch; i++)
-                        map.put(rnd.nextInt(ENTRIES_COUNT), rnd.nextInt());
-
-                    cache.putAll(map);
-                }
-
-                return null;
-            }
-        }, threads, "updater");
-
-        long end = System.currentTimeMillis() + 90_000;
-
-        Random rnd = ThreadLocalRandom.current();
-
-        while (System.currentTimeMillis() < end) {
-            int idx = rnd.nextInt(GRID_CNT - 1) + 1;
-
-            stopGrid(idx, cancel);
-
-            U.sleep(restartDelay);
-
-            startGrid(idx);
-
-            U.sleep(restartDelay);
-        }
-
-        done.set(true);
-
-        busyFut.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java
new file mode 100644
index 0000000..d5b3f55
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+/**
+ * Caused by https://issues.apache.org/jira/browse/IGNITE-5879
+ */
+public class IgnitePdsContinuousRestartTestWithExpiryPolicy extends IgnitePdsContinuousRestartTest {
+    /** IP finder set on the discovery SPI of every node configuration. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * Default constructor; passes {@code false} to the parent constructor.
+     */
+    public IgnitePdsContinuousRestartTestWithExpiryPolicy() {
+        super(false);
+    }
+
+    /** {@inheritDoc} Replaces the cache configuration with one that has a 1-second created-expiry policy. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+        discoverySpi.setIpFinder(ipFinder);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setGroupName("Group1");
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
+        ccfg.setBackups(2);
+        ccfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1)));
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
index 1825666..03dc445 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java
@@ -33,7 +33,7 @@ public abstract class IgniteBaselineAbstractFullApiSelfTest extends GridCacheAbs
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(
                 new DataRegionConfiguration()
-                    .setMaxSize(200 * 1024 * 1024)
+                    .setMaxSize(256 * 1024 * 1024)
                     .setPersistenceEnabled(true))
             .setWalMode(WALMode.LOG_ONLY));
 
@@ -41,6 +41,13 @@ public abstract class IgniteBaselineAbstractFullApiSelfTest extends GridCacheAbs
     }
 
     /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 4;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
new file mode 100644
index 0000000..be09e70
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import com.google.common.base.Strings;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests TTL worker with persistence enabled.
+ */
+public class IgnitePdsWithTtlTest extends GridCommonAbstractTest {
+    /** Name of the cache configured with an expiry policy. */
+    public static final String CACHE = "expirableCache";
+
+    /** Duration passed to the expiry policy, in seconds. */
+    private static final int EXPIRATION_TIMEOUT = 10;
+
+    /** Number of entries put into the cache. */
+    public static final int ENTRIES = 7000;
+
+    /** IP finder set on the discovery SPI of every node configuration. */
+    private static final TcpDiscoveryVmIpFinder FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        // Protection in case the test failed to finish, e.g. because of an error.
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+        disco.setIpFinder(FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        final CacheConfiguration ccfg = new CacheConfiguration();
+        ccfg.setName(CACHE);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 128));
+        ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT)));
+        ccfg.setEagerTtl(true);
+        ccfg.setGroupName("group1");
+
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setMaxSize(256L * 1024 * 1024)
+                        .setPersistenceEnabled(true)
+                ).setWalMode(WALMode.DEFAULT));
+
+        cfg.setCacheConfiguration(ccfg);
+        return cfg;
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testTtlIsApplied() throws Exception {
+        loadAndWaitForCleanup(false);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testTtlIsAppliedAfterRestart() throws Exception {
+        loadAndWaitForCleanup(true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    private void loadAndWaitForCleanup(boolean restartGrid) throws Exception {
+        IgniteEx srv = startGrid(0);
+        srv.cluster().active(true);
+
+        fillCache(srv.cache(CACHE));
+
+        if (restartGrid) {
+            stopGrid(0);
+            srv = startGrid(0);
+            srv.cluster().active(true);
+        }
+
+        final IgniteCache<Integer, String> cache = srv.cache(CACHE);
+
+        printStatistics((IgniteCacheProxy)cache, "After restart from LFS");
+
+        waitAndCheckExpired(cache);
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRebalancingWithTtlExpirable() throws Exception {
+        IgniteEx srv = startGrid(0);
+        srv.cluster().active(true);
+
+        fillCache(srv.cache(CACHE));
+
+        // Starting a second node causes rebalancing to start.
+        srv = startGrid(1);
+
+        final IgniteCache<Integer, String> cache = srv.cache(CACHE);
+
+        printStatistics((IgniteCacheProxy)cache, "After rebalancing start");
+
+        waitAndCheckExpired(cache);
+
+        stopAllGrids();
+    }
+
+    /** Puts {@link #ENTRIES} values into the cache, reads each one back and prints statistics. */
+    protected void fillCache(IgniteCache<Integer, String> cache) {
+        cache.putAll(new TreeMap<Integer, String>() {{
+            for (int i = 0; i < ENTRIES; i++)
+                put(i, Strings.repeat("Some value " + i, 125));
+        }});
+
+        // Touch (read) every entry.
+        for (int i = 0; i < ENTRIES; i++)
+            cache.get(i);
+
+        printStatistics((IgniteCacheProxy)cache, "After cache puts");
+    }
+
+    /** Waits for the cache size to reach zero, then checks that every key reads as {@code null}. */
+    protected void waitAndCheckExpired(final IgniteCache<Integer, String> cache) throws IgniteInterruptedCheckedException {
+        GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return cache.size() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(EXPIRATION_TIMEOUT + 1));
+
+        printStatistics((IgniteCacheProxy)cache, "After timeout");
+
+        for (int i = 0; i < ENTRIES; i++)
+            assertNull(cache.get(i));
+    }
+
+    /** Prints the memory statistics of the cache context to standard output, framed by {@code msg}. */
+    private void printStatistics(IgniteCacheProxy cache, String msg) {
+        System.out.println(msg + " {{");
+        cache.context().printMemoryStats();
+        System.out.println("}} " + msg);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 1e32320..ab81d8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicC
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsDataRegionMetricsTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.DefaultPageSizeBackwardsCompatibilityTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsPageReplacementTest;
@@ -111,6 +112,7 @@ public class IgnitePdsTestSuite extends TestSuite {
         // TODO uncomment when https://issues.apache.org/jira/browse/IGNITE-7510 is fixed
         // suite.addTestSuite(IgnitePdsClientNearCachePutGetTest.class);
         suite.addTestSuite(IgniteDbPutGetWithCacheStoreTest.class);
+        suite.addTestSuite(IgnitePdsWithTtlTest.class);
 
         suite.addTestSuite(IgniteClusterActivateDeactivateTestWithPersistence.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 3f6f713..76cfe4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -20,7 +20,7 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest;
-import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest2;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedCacheDataTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedStoreTest;
@@ -90,7 +90,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         // Rebalancing test
         suite.addTestSuite(IgnitePdsContinuousRestartTest.class);
-        suite.addTestSuite(IgnitePdsContinuousRestartTest2.class);
+        suite.addTestSuite(IgnitePdsContinuousRestartTestWithExpiryPolicy.class);
 
         suite.addTestSuite(IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes.class);
 
@@ -115,7 +115,6 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         suite.addTestSuite(IgnitePdsWholeClusterRestartTest.class);
 
-
         // Rebalancing test
         suite.addTestSuite(IgniteWalHistoryReservationsTest.class);
 


Mime
View raw message