ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [1/2] ignite git commit: IGNITE-8610 Fixed checkpoint record search for WAL delta rebalancing - Fixes #4090.
Date Fri, 15 Jun 2018 10:06:22 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 8d43ee835 -> 10aa02aca


http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 7dfe20d..2b6a821 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -748,7 +748,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             int p = partCntrs.partitionAt(i);
             long initCntr = partCntrs.initialUpdateCounterAt(i);
 
-            FileWALPointer startPtr = (FileWALPointer)database.searchPartitionCounter(grp.groupId(), p, initCntr);
+            FileWALPointer startPtr = (FileWALPointer)database.checkpointHistory().searchPartitionCounter(
+                grp.groupId(), p, initCntr);
 
             if (startPtr == null)
                 throw new IgniteCheckedException("Could not find start pointer for partition [part=" + p + ", partCntrSince=" + initCntr + "]");
@@ -1009,7 +1010,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                             long from = partMap.initialUpdateCounterAt(idx);
                             long to = partMap.updateCounterAt(idx);
 
-                            if (entry.partitionCounter() >= from && entry.partitionCounter() <= to) {
+                            if (entry.partitionCounter() > from && entry.partitionCounter() <= to) {
                                 if (entry.partitionCounter() == to)
                                     reachedPartitionEnd = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 58dec23..4e59ad1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -769,14 +769,25 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
 
     /**
      * @param discoEvt Before exchange for the given discovery event.
+     *
+     * @return {@code True} if partitions have been restored from persistent storage; always {@code false} here.
      */
-    public void beforeExchange(GridDhtPartitionsExchangeFuture discoEvt) throws IgniteCheckedException {
-        // No-op.
+    public boolean beforeExchange(GridDhtPartitionsExchangeFuture discoEvt) throws IgniteCheckedException {
+        return false;
     }
 
     /**
-     * @param fut Partition exchange future.
+     * Called when all partitions have been fully restored and pre-created on node start. No-op in this implementation.
+     *
+     * @throws IgniteCheckedException If failed.
      */
+    public void onStateRestored() throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /**
+     * @param fut Partition exchange future.
+     */
     public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture fut) {
         // No-op.
     }
@@ -1097,6 +1108,15 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
     }
 
     /**
+     * Marks the last checkpoint as inapplicable for WAL rebalance of the given group {@code grpId}.
+     *
+     * @param grpId Group id.
+     */
+    public void lastCheckpointInapplicableForWalRebalance(int grpId) {
+        // No-op in this implementation.
+    }
+
+    /**
      * Warns on first eviction.
      * @param regCfg data region configuration.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
new file mode 100644
index 0000000..f6433e1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntry.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
+
+import java.lang.ref.SoftReference;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.CacheState;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class represents checkpoint state.
+ */
+public class CheckpointEntry {
+    /** Checkpoint timestamp. */
+    private final long cpTs;
+
+    /** Checkpoint end mark. */
+    private final WALPointer cpMark;
+
+    /** Checkpoint ID. */
+    private final UUID cpId;
+
+    /** State of groups and partitions snapshotted at the checkpoint begin. */
+    private volatile SoftReference<GroupStateLazyStore> grpStateLazyStore;
+
+    /**
+     * Checkpoint entry constructor.
+     *
+     * If {@code cacheGrpStates} is null then group states are initialized lazily from the WAL at {@code cpMark}.
+     *
+     * @param cpTs Checkpoint timestamp.
+     * @param cpMark Checkpoint mark pointer.
+     * @param cpId Checkpoint ID.
+     * @param cacheGrpStates Cache groups states.
+     */
+    public CheckpointEntry(
+        long cpTs,
+        WALPointer cpMark,
+        UUID cpId,
+        @Nullable Map<Integer, CacheState> cacheGrpStates
+    ) {
+        this.cpTs = cpTs;
+        this.cpMark = cpMark;
+        this.cpId = cpId;
+        this.grpStateLazyStore = new SoftReference<>(new GroupStateLazyStore(cacheGrpStates));
+    }
+
+    /**
+     * @return Checkpoint timestamp.
+     */
+    public long timestamp() {
+        return cpTs;
+    }
+
+    /**
+     * @return Checkpoint ID.
+     */
+    public UUID checkpointId() {
+        return cpId;
+    }
+
+    /**
+     * @return Checkpoint mark.
+     */
+    public WALPointer checkpointMark() {
+        return cpMark;
+    }
+
+    /**
+     * Returns states of cache groups captured at this checkpoint, reading them from WAL if not loaded yet.
+     *
+     * @param cctx Cache shared context.
+     * @return Map from cache group ID to its state (only {@code OWNING} partitions are included).
+     * @throws IgniteCheckedException If the checkpoint record could not be read from WAL.
+     */
+    public Map<Integer, GroupState> groupState(
+        GridCacheSharedContext cctx
+    ) throws IgniteCheckedException {
+        return initIfNeeded(cctx).grpStates;
+    }
+
+    /**
+     * Gets the group state store, re-creating it if the soft reference has been cleared, and initializes it.
+     *
+     * @param cctx Cache shared context.
+     * @return Initialized group state store.
+     * @throws IgniteCheckedException If failed to read the checkpoint record from WAL.
+     */
+    private GroupStateLazyStore initIfNeeded(GridCacheSharedContext cctx) throws IgniteCheckedException {
+        GroupStateLazyStore store = grpStateLazyStore.get();
+
+        if (store == null) {
+            store = new GroupStateLazyStore();
+
+            grpStateLazyStore = new SoftReference<>(store);
+        }
+
+        store.initIfNeeded(cctx, cpMark);
+
+        return store;
+    }
+
+    /**
+     * @param cctx Cache shared context.
+     * @param grpId Cache group ID.
+     * @param part Partition ID.
+     * @return Partition counter or {@code null} if not found or if the checkpoint record could not be read.
+     */
+    public Long partitionCounter(GridCacheSharedContext cctx, int grpId, int part) {
+        GroupStateLazyStore store;
+
+        try {
+            store = initIfNeeded(cctx);
+        }
+        catch (IgniteCheckedException ignored) {
+            // Best effort: an unreadable checkpoint record is reported the same way as a missing counter.
+            return null;
+        }
+
+        return store.partitionCounter(grpId, part);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "CheckpointEntry [id=" + cpId + ", timestamp=" + cpTs + ", ptr=" + cpMark + "]";
+    }
+
+    /**
+     * Update counters of a cache group's partitions, kept sorted by partition ID.
+     */
+    public static class GroupState {
+        /** Partition IDs in ascending order; only the first {@code idx} slots are used. */
+        private final int[] parts;
+
+        /** Update counters: {@code cnts[i]} belongs to partition {@code parts[i]}. */
+        private final long[] cnts;
+
+        /** Number of partitions added so far. */
+        private int idx;
+
+        /**
+         * @param partsCnt Partitions count.
+         */
+        private GroupState(int partsCnt) {
+            parts = new int[partsCnt];
+            cnts = new long[partsCnt];
+        }
+
+        /**
+         * @param partId Partition ID to add.
+         * @param cntr Partition counter.
+         */
+        public void addPartitionCounter(int partId, long cntr) {
+            if (idx == parts.length)
+                throw new IllegalStateException("Failed to add new partition to the partitions state " +
+                    "(no enough space reserved) [partId=" + partId + ", reserved=" + parts.length + ']');
+
+            if (idx > 0) {
+                if (parts[idx - 1] >= partId)
+                    throw new IllegalStateException("Adding partition in a wrong order [prev=" + parts[idx - 1] +
+                        ", cur=" + partId + ']');
+            }
+
+            parts[idx] = partId;
+
+            cnts[idx] = cntr;
+
+            idx++;
+        }
+
+        /**
+         * Gets partition counter by partition ID.
+         *
+         * @param partId Partition ID.
+         * @return Partition update counter (will return {@code -1} if partition is not present in the record).
+         */
+        public long counterByPartition(int partId) {
+            int idx = indexByPartition(partId);
+
+            return idx >= 0 ? cnts[idx] : -1;
+        }
+
+        /**
+         * @return Number of partitions added to this state.
+         */
+        public long size() {
+            return idx;
+        }
+
+        /**
+         * @param partId Partition ID to search.
+         * @return Non-negative index of partition if found or negative value if not found.
+         */
+        public int indexByPartition(int partId) {
+            return Arrays.binarySearch(parts, 0, idx, partId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "GroupState [cap=" + parts.length + ", size=" + idx + ']';
+        }
+    }
+
+    /**
+     * Group states of a checkpoint, loaded lazily from its WAL record at most once per store instance.
+     */
+    public static class GroupStateLazyStore {
+        /** Updater for {@link #initGuard}. */
+        private static final AtomicIntegerFieldUpdater<GroupStateLazyStore> initGuardUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(GroupStateLazyStore.class, "initGuard");
+
+        /** Group states by cache group ID. Initialized lazily. */
+        private volatile Map<Integer, GroupState> grpStates;
+
+        /** Counted down once initialization has finished, successfully or not. */
+        private final CountDownLatch latch;
+
+        /** Set to {@code 1} by the thread that claims initialization, or upfront when states are given. */
+        @SuppressWarnings("unused")
+        private volatile int initGuard;
+
+        /** Initialization exception. Written before {@link #latch} is counted down. */
+        private IgniteCheckedException initEx;
+
+        /**
+         * Creates a store that will be initialized lazily from WAL.
+         */
+        private GroupStateLazyStore() {
+            this(null);
+        }
+
+        /**
+         * @param cacheGrpStates Cache group states, or {@code null} to initialize lazily from WAL.
+         */
+        private GroupStateLazyStore(Map<Integer, CacheState> cacheGrpStates) {
+            if (cacheGrpStates != null) {
+                initGuard = 1;
+
+                latch = new CountDownLatch(0);
+            }
+            else
+                latch = new CountDownLatch(1);
+
+            grpStates = remap(cacheGrpStates);
+        }
+
+        /**
+         * Converts cache group states of a checkpoint record, keeping only {@code OWNING} partitions.
+         *
+         * @param stateRec Cache group states, may be {@code null}.
+         * @return Group states by group ID (empty map if {@code stateRec} is {@code null}).
+         */
+        private Map<Integer, GroupState> remap(Map<Integer, CacheState> stateRec) {
+            if (stateRec == null)
+                return Collections.emptyMap();
+
+            Map<Integer, GroupState> grpStates = new HashMap<>(stateRec.size());
+
+            for (Map.Entry<Integer, CacheState> e : stateRec.entrySet()) {
+                Integer grpId = e.getKey();
+                CacheState recState = e.getValue();
+
+                GroupState grpState = new GroupState(recState.size());
+
+                for (int i = 0; i < recState.size(); i++) {
+                    byte partState = recState.stateByIndex(i);
+
+                    if (GridDhtPartitionState.fromOrdinal(partState) != GridDhtPartitionState.OWNING)
+                        continue;
+
+                    grpState.addPartitionCounter(
+                        recState.partitionByIndex(i),
+                        recState.partitionCounterByIndex(i)
+                    );
+                }
+
+                grpStates.put(grpId, grpState);
+            }
+
+            return grpStates;
+        }
+
+        /**
+         * @param grpId Group id.
+         * @param part Partition id.
+         * @return Partition counter or {@code null} if it is unknown.
+         */
+        private Long partitionCounter(int grpId, int part) {
+            assert initGuard != 0 : initGuard;
+
+            if (initEx != null || grpStates == null)
+                return null;
+
+            GroupState state = grpStates.get(grpId);
+
+            if (state != null) {
+                long cntr = state.counterByPartition(part);
+
+                return cntr < 0 ? null : cntr;
+            }
+
+            return null;
+        }
+
+        /**
+         * Loads group states from the checkpoint record at {@code ptr} unless initialization was already claimed.
+         * Only the claiming caller reads WAL; other callers wait on the latch and rethrow its failure, if any.
+         *
+         * @param cctx Cache shared context.
+         * @param ptr Checkpoint WAL pointer.
+         * @throws IgniteCheckedException If failed to read the checkpoint record from WAL.
+         */
+        private void initIfNeeded(
+            GridCacheSharedContext cctx,
+            WALPointer ptr
+        ) throws IgniteCheckedException {
+            if (initGuardUpdater.compareAndSet(this, 0, 1)) {
+                try (WALIterator it = cctx.wal().replay(ptr)) {
+                    // Failures are thrown here so that the catch block below records them for every caller.
+                    if (!it.hasNextX())
+                        throw new IgniteCheckedException(
+                            "Failed to find checkpoint record at the given WAL pointer: " + ptr);
+
+                    IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
+
+                    WALRecord rec = tup.get2();
+
+                    if (!(rec instanceof CheckpointRecord))
+                        throw new IgniteCheckedException("Checkpoint record expected at the given WAL pointer [ptr=" +
+                            ptr + ", rec=" + (rec == null ? null : rec.getClass().getSimpleName()) + ']');
+
+                    grpStates = remap(((CheckpointRecord)rec).cacheGroupStates());
+                }
+                catch (IgniteCheckedException e) {
+                    initEx = e;
+
+                    throw e;
+                }
+                finally {
+                    latch.countDown();
+                }
+            }
+            else {
+                // Initialization was claimed by another call: wait for it and report its failure, if any.
+                U.await(latch);
+
+                if (initEx != null)
+                    throw initEx;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntryType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntryType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntryType.java
new file mode 100644
index 0000000..66619bd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointEntryType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
+
+/**
+ * Checkpoint entry types.
+ */
+public enum CheckpointEntryType {
+    /** Start of a checkpoint. */
+    START,
+
+    /** End of a checkpoint. */
+    END
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
new file mode 100644
index 0000000..d6cc297
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
+
+/**
+ * Checkpoint history. Holds a chronologically ordered map of {@link CheckpointEntry CheckpointEntries}.
+ * Data is loaded from corresponding checkpoint directory.
+ * This directory holds files for checkpoint start and end.
+ */
+public class CheckpointHistory {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /**
+     * Maps checkpoint's timestamp (from CP file name) to CP entry.
+     * Using TS provides historical order of CP entries in map (first is the oldest).
+     */
+    private final NavigableMap<Long, CheckpointEntry> histMap = new ConcurrentSkipListMap<>();
+
+    /** The maximal number of checkpoints held in memory. */
+    private final int maxCpHistMemSize;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    public CheckpointHistory(GridKernalContext ctx) {
+        cctx = ctx.cache().context();
+        log = ctx.log(getClass());
+
+        DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration();
+
+        maxCpHistMemSize = Math.min(dsCfg.getWalHistorySize(),
+            IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, 100));
+    }
+
+    /**
+     * @param checkpoints Checkpoints.
+     */
+    public void initialize(List<CheckpointEntry> checkpoints) {
+        for (CheckpointEntry e : checkpoints)
+            histMap.put(e.timestamp(), e);
+    }
+
+    /**
+     * @param cpTs Checkpoint timestamp.
+     * @return Initialized entry.
+     * @throws IgniteCheckedException If failed to initialize entry.
+     */
+    private CheckpointEntry entry(Long cpTs) throws IgniteCheckedException {
+        CheckpointEntry entry = histMap.get(cpTs);
+
+        if (entry == null)
+            throw new IgniteCheckedException("Checkpoint entry was removed: " + cpTs);
+
+        return entry;
+    }
+
+    /**
+     * @return First checkpoint entry if exists. Otherwise {@code null}.
+     */
+    public CheckpointEntry firstCheckpoint() {
+        Map.Entry<Long,CheckpointEntry> entry = histMap.firstEntry();
+
+        return entry != null ? entry.getValue() : null;
+    }
+
+    /**
+     * @return Last checkpoint entry if exists. Otherwise {@code null}.
+     */
+    public CheckpointEntry lastCheckpoint() {
+        Map.Entry<Long,CheckpointEntry> entry = histMap.lastEntry();
+
+        return entry != null ? entry.getValue() : null;
+    }
+
+    /**
+     * @return First checkpoint WAL pointer if exists. Otherwise {@code null}.
+     */
+    public WALPointer firstCheckpointPointer() {
+        CheckpointEntry entry = firstCheckpoint();
+
+        return entry != null ? entry.checkpointMark() : null;
+    }
+
+    /**
+     * @param descending {@code True} to order timestamps from the latest checkpoint to the earliest one.
+     * @return Collection of checkpoint timestamps (a live view backed by the history).
+     */
+    public Collection<Long> checkpoints(boolean descending) {
+        if (descending)
+            return histMap.descendingKeySet();
+
+        return histMap.keySet();
+    }
+
+    /**
+     * @return Checkpoint timestamps from the earliest checkpoint to the latest one.
+     */
+    public Collection<Long> checkpoints() {
+        return checkpoints(false);
+    }
+
+    /**
+     * Adds checkpoint entry after the corresponding WAL record has been written to WAL. The checkpoint itself
+     * is not finished yet.
+     *
+     * @param entry Entry to add.
+     */
+    public void addCheckpoint(CheckpointEntry entry) {
+        histMap.put(entry.timestamp(), entry);
+    }
+
+    /**
+     * @return {@code true} if there is space for next checkpoint.
+     */
+    public boolean hasSpace() {
+        return histMap.size() + 1 <= maxCpHistMemSize;
+    }
+
+    /**
+     * Clears checkpoint history after WAL truncation.
+     *
+     * @param ptr Truncation bound; checkpoints with marks before it are removed, oldest first, up to a reserved one.
+     * @return List of checkpoint entries removed from history.
+     */
+    public List<CheckpointEntry> onWalTruncated(WALPointer ptr) {
+        List<CheckpointEntry> removed = new ArrayList<>();
+
+        FileWALPointer highBound = (FileWALPointer)ptr;
+
+        for (CheckpointEntry cpEntry : histMap.values()) {
+            FileWALPointer cpPnt = (FileWALPointer)cpEntry.checkpointMark();
+
+            if (highBound.compareTo(cpPnt) <= 0)
+                break;
+
+            if (cctx.wal().reserved(cpEntry.checkpointMark())) {
+                U.warn(log, "Could not clear historyMap due to WAL reservation on cp: " + cpEntry +
+                    ", history map size is " + histMap.size());
+
+                break;
+            }
+
+            histMap.remove(cpEntry.timestamp());
+
+            removed.add(cpEntry);
+        }
+
+        return removed;
+    }
+
+    /**
+     * Clears checkpoint history after checkpoint finish.
+     *
+     * @param chp Finished checkpoint; it is told how many WAL files were deleted.
+     * @param truncateWal Whether WAL should be truncated up to the marks of removed checkpoints.
+     * @return List of checkpoints removed from history.
+     */
+    public List<CheckpointEntry> onCheckpointFinished(GridCacheDatabaseSharedManager.Checkpoint chp, boolean truncateWal) {
+        List<CheckpointEntry> removed = new ArrayList<>();
+
+        int deleted = 0;
+
+        while (histMap.size() > maxCpHistMemSize) {
+            Map.Entry<Long, CheckpointEntry> entry = histMap.firstEntry();
+
+            CheckpointEntry cpEntry = entry.getValue();
+
+            if (cctx.wal().reserved(cpEntry.checkpointMark())) {
+                U.warn(log, "Could not clear historyMap due to WAL reservation on cpEntry " + cpEntry.checkpointId() +
+                    ", history map size is " + histMap.size());
+
+                break;
+            }
+
+            if (truncateWal)
+                deleted += cctx.wal().truncate(null, cpEntry.checkpointMark());
+
+            histMap.remove(entry.getKey());
+
+            removed.add(cpEntry);
+        }
+
+        chp.walFilesDeleted(deleted);
+
+        return removed;
+    }
+
+    /**
+     * Tries to search for a WAL pointer for the given partition counter start.
+     *
+     * @param grpId Cache group ID.
+     * @param part Partition ID.
+     * @param partCntrSince Partition counter to start from.
+     * @return Checkpoint WAL pointer or {@code null} if failed to search.
+     */
+    @Nullable public WALPointer searchPartitionCounter(int grpId, int part, long partCntrSince) {
+        CheckpointEntry entry = searchCheckpointEntry(grpId, part, partCntrSince);
+
+        if (entry == null)
+            return null;
+
+        return entry.checkpointMark();
+    }
+
+    /**
+     * Searches for the latest checkpoint whose counter of the given partition does not exceed {@code partCntrSince}.
+     *
+     * @param grpId Cache group ID.
+     * @param part Partition ID.
+     * @param partCntrSince Partition counter to start from.
+     * @return Checkpoint entry or {@code null} if failed to search.
+     */
+    @Nullable public CheckpointEntry searchCheckpointEntry(int grpId, int part, long partCntrSince) {
+        for (Long cpTs : checkpoints(true)) {
+            try {
+                CheckpointEntry entry = entry(cpTs);
+
+                Long foundCntr = entry.partitionCounter(cctx, grpId, part);
+
+                if (foundCntr != null && foundCntr <= partCntrSince)
+                    return entry;
+            }
+            catch (IgniteCheckedException ignore) {
+                break;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Finds and reserves earliest valid checkpoint for each of given groups and partitions.
+     * On return the WAL reservation of the earliest processed checkpoint is kept; it is not released here.
+     *
+     * @param groupsAndPartitions Groups and partitions to find and reserve earliest valid checkpoint.
+     *      The map and its sets are modified: groups and partitions that became inapplicable are removed.
+     *
+     * @return Map (groupId, Map (partitionId, earliest valid checkpoint to history search)).
+     */
+    public Map<Integer, Map<Integer, CheckpointEntry>> searchAndReserveCheckpoints(
+        final Map<Integer, Set<Integer>> groupsAndPartitions
+    ) {
+        if (F.isEmpty(groupsAndPartitions))
+            return Collections.emptyMap();
+
+        final Map<Integer, Map<Integer, CheckpointEntry>> res = new HashMap<>();
+
+        CheckpointEntry prevReserved = null;
+
+        // Iterate over all possible checkpoints starting from latest and moving to earliest.
+        for (Long cpTs : checkpoints(true)) {
+            CheckpointEntry chpEntry = null;
+
+            boolean reserved = false;
+
+            try {
+                chpEntry = entry(cpTs);
+
+                reserved = cctx.wal().reserve(chpEntry.checkpointMark());
+
+                // If checkpoint WAL history can't be reserved, stop searching.
+                if (!reserved)
+                    break;
+
+                // Iterate over a copy of the keys: the map itself is modified inside the loop.
+                for (Integer grpId : new ArrayList<>(groupsAndPartitions.keySet()))
+                    if (!isCheckpointApplicableForGroup(grpId, chpEntry))
+                        groupsAndPartitions.remove(grpId);
+
+                // All groups are no more applicable, release history and stop searching.
+                if (groupsAndPartitions.isEmpty()) {
+                    cctx.wal().release(chpEntry.checkpointMark());
+
+                    break;
+                }
+
+                // Release previous checkpoint marker.
+                if (prevReserved != null)
+                    cctx.wal().release(prevReserved.checkpointMark());
+
+                prevReserved = chpEntry;
+
+                for (Map.Entry<Integer, CheckpointEntry.GroupState> state : chpEntry.groupState(cctx).entrySet()) {
+                    int grpId = state.getKey();
+                    CheckpointEntry.GroupState cpGroupState = state.getValue();
+
+                    Set<Integer> applicablePartitions = groupsAndPartitions.get(grpId);
+
+                    if (F.isEmpty(applicablePartitions))
+                        continue;
+
+                    Set<Integer> inapplicablePartitions = null;
+
+                    for (Integer partId : applicablePartitions) {
+                        int pIdx = cpGroupState.indexByPartition(partId);
+
+                        if (pIdx >= 0)
+                            res.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, chpEntry);
+                        else {
+                            if (inapplicablePartitions == null)
+                                inapplicablePartitions = new HashSet<>();
+
+                            // Partition is no more applicable for history search, exclude partition from searching.
+                            inapplicablePartitions.add(partId);
+                        }
+                    }
+
+                    if (!F.isEmpty(inapplicablePartitions))
+                        for (Integer partId : inapplicablePartitions)
+                            applicablePartitions.remove(partId);
+                }
+
+                // Remove groups from search with empty set of applicable partitions.
+                groupsAndPartitions.values().removeIf(Set::isEmpty);
+            }
+            catch (IgniteCheckedException ex) {
+                U.error(log, "Failed to process checkpoint: " + (chpEntry != null ? chpEntry : "none"), ex);
+
+                // Do not leak the reservation unless this checkpoint is already kept as the earliest found one.
+                if (reserved && chpEntry != prevReserved)
+                    cctx.wal().release(chpEntry.checkpointMark());
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Checkpoint is not applicable when:
+     * 1) WAL was disabled somewhere after given checkpoint.
+     * 2) Checkpoint doesn't contain specified {@code grpId}.
+     *
+     * @param grpId Group ID.
+     * @param cp Checkpoint.
+     * @return {@code True} if the checkpoint can be used for WAL rebalance of the group.
+     * @throws IgniteCheckedException If group states of the checkpoint could not be read.
+     */
+    private boolean isCheckpointApplicableForGroup(int grpId, CheckpointEntry cp) throws IgniteCheckedException {
+        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();
+
+        if (dbMgr.isCheckpointInapplicableForWalRebalance(cp.timestamp(), grpId))
+            return false;
+
+        if (!cp.groupState(cctx).containsKey(grpId))
+            return false;
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index c5f3793..cb80961 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -836,7 +836,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /** {@inheritDoc} */
-    @Override public void release(WALPointer start) throws IgniteCheckedException {
+    @Override public void release(WALPointer start) {
         assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start;
 
         if (mode == WALMode.NONE)
@@ -2001,14 +2001,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     Thread.currentThread().interrupt();
                 }
                 finally {
-                    try {
-                        if (currReservedSegment != -1)
-                            release(new FileWALPointer(currReservedSegment, 0, 0));
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Can't release raw WAL segment [idx=" + currReservedSegment +
-                            "] after compression", e);
-                    }
+                    if (currReservedSegment != -1)
+                        release(new FileWALPointer(currReservedSegment, 0, 0));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java
index 7edcea9..bdadf97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java
@@ -186,7 +186,7 @@ public class VisorWalTask extends VisorMultiNodeTask<VisorWalTaskArg, VisorWalTa
             GridCacheDatabaseSharedManager dbMgr,
             FileWriteAheadLogManager wal
         ) throws IgniteCheckedException{
-            WALPointer lowBoundForTruncate = dbMgr.checkpointHistory().lowCheckpointBound();
+            WALPointer lowBoundForTruncate = dbMgr.checkpointHistory().firstCheckpointPointer();
 
             if (lowBoundForTruncate == null)
                 return Collections.emptyList();
@@ -227,7 +227,7 @@ public class VisorWalTask extends VisorMultiNodeTask<VisorWalTaskArg, VisorWalTa
             GridCacheDatabaseSharedManager dbMgr,
             FileWriteAheadLogManager wal
         ) throws IgniteCheckedException {
-            WALPointer lowBoundForTruncate = dbMgr.checkpointHistory().lowCheckpointBound();
+            WALPointer lowBoundForTruncate = dbMgr.checkpointHistory().firstCheckpointPointer();
 
             if (lowBoundForTruncate == null)
                 return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
index a090381..f06494b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
@@ -18,12 +18,23 @@
 package org.apache.ignite.internal.processors.cache.persistence;
 
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceTest;
 
 /**
  *
  */
 public class IgnitePdsAtomicCacheHistoricalRebalancingTest extends IgnitePdsAtomicCacheRebalancingTest {
     /** {@inheritDoc */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // afterTest() queries this SPI (allRebalances()) to assert
+        // that WAL rebalance was actually invoked during the test.
+        return cfg.setCommunicationSpi(new IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi());
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         // Use rebalance from WAL if possible.
         System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
@@ -33,8 +44,16 @@ public class IgnitePdsAtomicCacheHistoricalRebalancingTest extends IgnitePdsAtom
 
     /** {@inheritDoc */
     @Override protected void afterTest() throws Exception {
+        boolean walRebalanceInvoked = !IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi.allRebalances()
+            .isEmpty();
+
+        IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi.cleanup();
+
         System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
 
         super.afterTest();
+
+        if (!walRebalanceInvoked)
+            throw new AssertionError("WAL rebalance hasn't been invoked.");
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
index 31980ef..347412d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.processors.cache.persistence;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,7 +49,6 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -66,12 +67,21 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
     /** */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
-    /** Cache name. */
-    private static final String cacheName = "cache";
+    /** Default cache. */
+    private static final String CACHE = "cache";
+
+    /** Cache with node filter. */
+    private static final String FILTERED_CACHE = "filtered";
+
+    /** Cache with enabled indexes. */
+    private static final String INDEXED_CACHE = "indexed";
 
     /** */
     protected boolean explicitTx;
 
+    /** Set to enable filtered cache on topology. */
+    private boolean filteredCacheEnabled;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -80,18 +90,19 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
 
         cfg.setRebalanceThreadPoolSize(2);
 
-        CacheConfiguration ccfg1 = cacheConfiguration(cacheName)
+        CacheConfiguration ccfg1 = cacheConfiguration(CACHE)
             .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE)
-            .setBackups(1)
+            .setBackups(2)
             .setRebalanceMode(CacheRebalanceMode.ASYNC)
             .setIndexedTypes(Integer.class, Integer.class)
             .setAffinity(new RendezvousAffinityFunction(false, 32))
             .setRebalanceBatchesPrefetchCount(2)
             .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 
-        CacheConfiguration ccfg2 = cacheConfiguration("indexed");
-        ccfg2.setBackups(1);
-        ccfg2.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        CacheConfiguration ccfg2 = cacheConfiguration(INDEXED_CACHE)
+            .setBackups(2)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 
         QueryEntity qryEntity = new QueryEntity(Integer.class.getName(), TestValue.class.getName());
 
@@ -108,36 +119,34 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
 
         ccfg2.setQueryEntities(Collections.singleton(qryEntity));
 
-        // Do not start filtered cache on coordinator.
-        if (gridName.endsWith("0")) {
-            cfg.setCacheConfiguration(ccfg1, ccfg2);
-        }
-        else {
-            CacheConfiguration ccfg3 = cacheConfiguration("filtered");
-            ccfg3.setPartitionLossPolicy(PartitionLossPolicy.READ_ONLY_SAFE);
-            ccfg3.setBackups(1);
-            ccfg3.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-            ccfg3.setNodeFilter(new CoordinatorNodeFilter());
-
-            cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3);
-        }
-
-        DataStorageConfiguration memCfg = new DataStorageConfiguration();
+        List<CacheConfiguration> cacheCfgs = new ArrayList<>();
+        cacheCfgs.add(ccfg1);
+        cacheCfgs.add(ccfg2);
 
-        memCfg.setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4);
-        memCfg.setPageSize(1024);
-        memCfg.setWalMode(WALMode.LOG_ONLY);
+        if (filteredCacheEnabled && !gridName.endsWith("0")) {
+            CacheConfiguration ccfg3 = cacheConfiguration(FILTERED_CACHE)
+                .setPartitionLossPolicy(PartitionLossPolicy.READ_ONLY_SAFE)
+                .setBackups(2)
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setNodeFilter(new CoordinatorNodeFilter());
 
-        DataRegionConfiguration memPlcCfg = new DataRegionConfiguration();
+            cacheCfgs.add(ccfg3);
+        }
 
-        memPlcCfg.setName("dfltDataRegion");
-        memPlcCfg.setMaxSize(150L * 1024 * 1024);
-        memPlcCfg.setInitialSize(100L * 1024 * 1024);
-        memPlcCfg.setPersistenceEnabled(true);
+        cfg.setCacheConfiguration(asArray(cacheCfgs));
 
-        memCfg.setDefaultDataRegionConfiguration(memPlcCfg);
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+            .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4)
+            .setPageSize(1024)
+            .setCheckpointFrequency(10 * 1000)
+            .setWalMode(WALMode.LOG_ONLY)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setName("dfltDataRegion")
+                .setPersistenceEnabled(true)
+                .setMaxSize(512 * 1024 * 1024)
+            );
 
-        cfg.setDataStorageConfiguration(memCfg);
+        cfg.setDataStorageConfiguration(dsCfg);
 
         cfg.setDiscoverySpi(
             new TcpDiscoverySpi()
@@ -147,6 +156,17 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
         return cfg;
     }
 
+    /**
+     * Converts a list of cache configurations to an array suitable for
+     * {@link IgniteConfiguration#setCacheConfiguration}.
+     *
+     * @param cacheCfgs Cache configurations.
+     * @return Array with the same configurations, in list order.
+     */
+    private static CacheConfiguration[] asArray(List<CacheConfiguration> cacheCfgs) {
+        return cacheCfgs.toArray(new CacheConfiguration[0]);
+    }
+
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return 20 * 60 * 1000;
@@ -185,15 +205,15 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
     public void testRebalancingOnRestart() throws Exception {
         Ignite ignite0 = startGrid(0);
 
-        ignite0.cluster().active(true);
-
         startGrid(1);
 
         IgniteEx ignite2 = startGrid(2);
 
+        ignite0.cluster().active(true);
+
         awaitPartitionMapExchange();
 
-        IgniteCache<Integer, Integer> cache1 = ignite0.cache(cacheName);
+        IgniteCache<Integer, Integer> cache1 = ignite0.cache(CACHE);
 
         for (int i = 0; i < 5000; i++)
             cache1.put(i, i);
@@ -222,7 +242,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
 
         awaitPartitionMapExchange();
 
-        IgniteCache<Integer, Integer> cache3 = ignite2.cache(cacheName);
+        IgniteCache<Integer, Integer> cache3 = ignite2.cache(CACHE);
 
         for (int i = 0; i < 100; i++)
             assertEquals(String.valueOf(i), (Integer)(i * 2), cache3.get(i));
@@ -243,28 +263,21 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
 
         ignite0.cluster().active(true);
 
-        ignite0.cache(cacheName).rebalance().get();
-        ignite1.cache(cacheName).rebalance().get();
-        ignite2.cache(cacheName).rebalance().get();
-        ignite3.cache(cacheName).rebalance().get();
-
         awaitPartitionMapExchange();
 
-        IgniteCache<Integer, Integer> cache1 = ignite0.cache(cacheName);
+        IgniteCache<Integer, Integer> cache1 = ignite0.cache(CACHE);
 
         for (int i = 0; i < 1000; i++)
             cache1.put(i, i);
 
-        ignite0.context().cache().context().database().waitForCheckpoint("test");
-        ignite1.context().cache().context().database().waitForCheckpoint("test");
+        forceCheckpoint(ignite0);
+        forceCheckpoint(ignite1);
 
         info("++++++++++ After checkpoint");
 
         ignite2.close();
         ignite3.close();
 
-        resetBaselineTopology();
-
         ignite0.resetLostPartitions(Collections.singletonList(cache1.getName()));
 
         assert cache1.lostPartitions().isEmpty();
@@ -281,116 +294,20 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
 
         info(">>> Done puts...");
 
-        ignite2 = startGrid(2);
-        ignite3 = startGrid(3);
-
-        ignite2.cache(cacheName).rebalance().get();
-        ignite3.cache(cacheName).rebalance().get();
-
-        IgniteCache<Integer, Integer> cache2 = ignite2.cache(cacheName);
-        IgniteCache<Integer, Integer> cache3 = ignite3.cache(cacheName);
-
-        for (int i = 0; i < 100; i++) {
-            assertEquals(String.valueOf(i), (Integer)(i * 2), cache2.get(i));
-            assertEquals(String.valueOf(i), (Integer)(i * 2), cache3.get(i));
-        }
-    }
-
-    /**
-     * Test that all data is correctly restored after non-graceful restart.
-     *
-     * @throws Exception If fails.
-     */
-    public void testDataCorrectnessAfterRestart() throws Exception {
-        IgniteEx ignite1 = (IgniteEx)G.start(getConfiguration("test1"));
-        IgniteEx ignite2 = (IgniteEx)G.start(getConfiguration("test2"));
-        IgniteEx ignite3 = (IgniteEx)G.start(getConfiguration("test3"));
-        IgniteEx ignite4 = (IgniteEx)G.start(getConfiguration("test4"));
-
-        ignite1.cluster().active(true);
+        startGrid(2);
+        startGrid(3);
 
         awaitPartitionMapExchange();
 
-        IgniteCache<Integer, Integer> cache1 = ignite1.cache(cacheName);
-
-        for (int i = 0; i < 100; i++)
-            cache1.put(i, i);
-
-        ignite1.close();
-        ignite2.close();
-        ignite3.close();
-        ignite4.close();
-
-        ignite1 = (IgniteEx)G.start(getConfiguration("test1"));
-        ignite2 = (IgniteEx)G.start(getConfiguration("test2"));
-        ignite3 = (IgniteEx)G.start(getConfiguration("test3"));
-        ignite4 = (IgniteEx)G.start(getConfiguration("test4"));
+        ignite2 = grid(2);
+        ignite3 = grid(3);
 
-        ignite1.cluster().active(true);
-
-        awaitPartitionMapExchange();
-
-        cache1 = ignite1.cache(cacheName);
-        IgniteCache<Integer, Integer> cache2 = ignite2.cache(cacheName);
-        IgniteCache<Integer, Integer> cache3 = ignite3.cache(cacheName);
-        IgniteCache<Integer, Integer> cache4 = ignite4.cache(cacheName);
+        IgniteCache<Integer, Integer> cache2 = ignite2.cache(CACHE);
+        IgniteCache<Integer, Integer> cache3 = ignite3.cache(CACHE);
 
         for (int i = 0; i < 100; i++) {
-            assert cache1.get(i).equals(i);
-            assert cache2.get(i).equals(i);
-            assert cache3.get(i).equals(i);
-            assert cache4.get(i).equals(i);
-        }
-    }
-
-    /**
-     * Test that partitions are marked as lost when all owners leave cluster, but recover after nodes rejoin.
-     *
-     * @throws Exception If fails.
-     */
-    public void testPartitionLossAndRecover() throws Exception {
-        Ignite ignite1 = startGrid(0);
-        Ignite ignite2 = startGrid(1);
-        Ignite ignite3 = startGrid(2);
-        Ignite ignite4 = startGrid(3);
-
-        ignite1.cluster().active(true);
-
-        awaitPartitionMapExchange();
-
-        IgniteCache<String, String> cache1 = ignite1.cache(cacheName);
-
-        final int offset = 10;
-
-        for (int i = 0; i < 100; i++)
-            cache1.put(String.valueOf(i), String.valueOf(i + offset));
-
-        ignite3.close();
-        ignite4.close();
-
-        awaitPartitionMapExchange();
-
-        assert !ignite1.cache(cacheName).lostPartitions().isEmpty();
-
-        ignite3 = startGrid(2);
-        ignite4 = startGrid(3);
-
-        ignite1.resetLostPartitions(Collections.singletonList(cacheName));
-
-        IgniteCache<String, String> cache2 = ignite2.cache(cacheName);
-        IgniteCache<String, String> cache3 = ignite3.cache(cacheName);
-        IgniteCache<String, String> cache4 = ignite4.cache(cacheName);
-
-        //Thread.sleep(5_000);
-
-        for (int i = 0; i < 100; i++) {
-            String key = String.valueOf(i);
-            String expected = String.valueOf(i + offset);
-
-            assertEquals(expected, cache1.get(key));
-            assertEquals(expected, cache2.get(key));
-            assertEquals(expected, cache3.get(key));
-            assertEquals(expected, cache4.get(key));
+            assertEquals(String.valueOf(i), (Integer)(i * 2), cache2.get(i));
+            assertEquals(String.valueOf(i), (Integer)(i * 2), cache3.get(i));
         }
     }
 
@@ -401,26 +318,26 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
         final long timeOut = U.currentTimeMillis() + 10 * 60 * 1000;
 
         final int entriesCnt = 10_000;
-        int maxNodesCount = 4;
-        int topChanges = 20;
-        final String cacheName = "indexed";
+        final int maxNodesCnt = 4;
+        final int topChanges = 50;
 
         final AtomicBoolean stop = new AtomicBoolean();
+        final AtomicBoolean suspend = new AtomicBoolean();
 
         final ConcurrentMap<Integer, TestValue> map = new ConcurrentHashMap<>();
 
-        Ignite ignite = startGrid(0);
+        Ignite ignite = startGridsMultiThreaded(4);
 
         ignite.cluster().active(true);
 
-        IgniteCache<Integer, TestValue> cache = ignite.cache(cacheName);
+        IgniteCache<Integer, TestValue> cache = ignite.cache(INDEXED_CACHE);
 
         for (int i = 0; i < entriesCnt; i++) {
             cache.put(i, new TestValue(i, i));
             map.put(i, new TestValue(i, i));
         }
 
-        final AtomicInteger nodesCnt = new AtomicInteger();
+        final AtomicInteger nodesCnt = new AtomicInteger(4);
 
         IgniteInternalFuture fut = runMultiThreadedAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
@@ -428,6 +345,12 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
                     if (stop.get())
                         return null;
 
+                    if (suspend.get()) {
+                        U.sleep(10);
+
+                        continue;
+                    }
+
                     int k = ThreadLocalRandom.current().nextInt(entriesCnt);
                     int v1 = ThreadLocalRandom.current().nextInt();
                     int v2 = ThreadLocalRandom.current().nextInt();
@@ -456,7 +379,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
                         tx = ignite.transactions().txStart();
 
                     try {
-                        ignite.cache(cacheName).put(k, new TestValue(v1, v2));
+                        ignite.cache(INDEXED_CACHE).put(k, new TestValue(v1, v2));
                     }
                     catch (Exception ignored) {
                         success = false;
@@ -478,8 +401,10 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
             }
         }, 1, "load-runner");
 
+        boolean[] changes = new boolean[] {false, false, true, true};
+
         try {
-            for (int i = 0; i < topChanges; i++) {
+            for (int it = 0; it < topChanges; it++) {
                 if (U.currentTimeMillis() > timeOut)
                     break;
 
@@ -487,21 +412,30 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
 
                 boolean add;
 
-                if (nodesCnt.get() <= maxNodesCount / 2)
+                if (it < changes.length)
+                    add = changes[it];
+                else if (nodesCnt.get() <= maxNodesCnt / 2)
                     add = true;
-                else if (nodesCnt.get() > maxNodesCount)
+                else if (nodesCnt.get() >= maxNodesCnt)
                     add = false;
                 else // More chance that node will be added
                     add = ThreadLocalRandom.current().nextInt(3) <= 1;
 
                 if (add)
-                    startGrid(nodesCnt.incrementAndGet());
+                    startGrid(nodesCnt.getAndIncrement());
                 else
-                    stopGrid(nodesCnt.getAndDecrement());
+                    stopGrid(nodesCnt.decrementAndGet());
 
                 awaitPartitionMapExchange();
 
-                cache.rebalance().get();
+                suspend.set(true);
+
+                U.sleep(200);
+
+                for (Map.Entry<Integer, TestValue> entry : map.entrySet())
+                    assertEquals(it + " " + Integer.toString(entry.getKey()), entry.getValue(), cache.get(entry.getKey()));
+
+                suspend.set(false);
             }
         }
         finally {
@@ -520,14 +454,21 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
      * @throws Exception If failed.
      */
     public void testForceRebalance() throws Exception {
-        testForceRebalance(cacheName);
+        testForceRebalance(CACHE);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testForceRebalanceClientTopology() throws Exception {
-        testForceRebalance("filtered");
+        filteredCacheEnabled = true;
+
+        try {
+            testForceRebalance(FILTERED_CACHE);
+        }
+        finally {
+            filteredCacheEnabled = false;
+        }
     }
 
     /**
@@ -579,63 +520,52 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
 
         ig.cluster().active(true);
 
-        int k = 0;
+        int keys = 0;
 
-        try (IgniteDataStreamer ds = ig.dataStreamer(cacheName)) {
+        try (IgniteDataStreamer<Object, Object> ds = ig.dataStreamer(CACHE)) {
             ds.allowOverwrite(true);
 
-            for (int k0 = k; k < k0 + 10_000; k++)
-                ds.addData(k, k);
+            for (; keys < 10_000; keys++)
+                ds.addData(keys, keys);
         }
 
-        for (int t = 0; t < 5; t++) {
-            int t0 = t;
+        for (int it = 0; it < 10; it++) {
+            final int it0 = it;
+
             IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
                 try {
                     stopGrid(3);
 
-                    forceCheckpoint();
-
                     U.sleep(500); // Wait for data load.
 
-                    IgniteEx ig0 = startGrid(3);
+                    startGrid(3);
 
-                    U.sleep(2000); // Wait for node join.
+                    U.sleep(500); // Wait for data load.
 
-                    if (t0 % 2 == 1) {
+                    if (it0 % 2 != 0) {
                         stopGrid(2);
 
-                        awaitPartitionMapExchange();
-
-                        forceCheckpoint();
+                        U.sleep(500); // Wait for data load.
 
                         startGrid(2);
-
-                        awaitPartitionMapExchange();
                     }
 
-                    ig0.cache(cacheName).rebalance().get();
+                    awaitPartitionMapExchange();
                 }
                 catch (Exception e) {
                     error("Unable to start/stop grid", e);
+
                     throw new RuntimeException(e);
                 }
             });
 
-            try (IgniteDataStreamer ds = ig.dataStreamer(cacheName)) {
-                ds.allowOverwrite(true);
-
-                while (!fut.isDone()) {
-                    int k0 = k;
+            IgniteCache<Object, Object> cache = ig.cache(CACHE);
 
-                    for (;k < k0 + 3; k++)
-                        ds.addData(k, k);
+            while (!fut.isDone()) {
+                int nextKeys = keys + 10;
 
-                    U.sleep(10);
-                }
-            }
-            catch (Exception e) {
-                log.error("Unable to write data", e);
+                for (;keys < nextKeys; keys++)
+                    cache.put(keys, keys);
             }
 
             fut.get();
@@ -647,18 +577,18 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
             for (int g = 0; g < 4; g++) {
                 IgniteEx ig0 = grid(g);
 
-                for (GridDhtLocalPartition part : ig0.cachex(cacheName).context().topology().currentLocalPartitions()) {
+                for (GridDhtLocalPartition part : ig0.cachex(CACHE).context().topology().currentLocalPartitions()) {
                     if (cntrs.containsKey(part.id()))
                         assertEquals(String.valueOf(part.id()), (long) cntrs.get(part.id()), part.updateCounter());
                     else
                         cntrs.put(part.id(), part.updateCounter());
                 }
 
-                for (int k0 = 0; k0 < k; k0++)
-                    assertEquals(String.valueOf(k0) + " " + g, k0, ig0.cache(cacheName).get(k0));
+                for (int k0 = 0; k0 < keys; k0++)
+                    assertEquals(String.valueOf(k0) + " " + g, k0, ig0.cache(CACHE).get(k0));
             }
 
-            assertEquals(ig.affinity(cacheName).partitions(), cntrs.size());
+            assertEquals(ig.affinity(CACHE).partitions(), cntrs.size());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java
deleted file mode 100644
index 179c8e0..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.persistence;
-
-import org.apache.ignite.IgniteSystemProperties;
-
-/**
- *
- */
-public class IgnitePdsTxCacheHistoricalRebalancingTest extends IgnitePdsTxCacheRebalancingTest {
-    /** {@inheritDoc */
-    @Override protected void beforeTest() throws Exception {
-        // Use rebalance from WAL if possible.
-        System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
-
-        super.beforeTest();
-    }
-
-    /** {@inheritDoc */
-    @Override protected void afterTest() throws Exception {
-        System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
-
-        super.afterTest();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
index 7b853db..aa6bb30 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -223,7 +224,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
 
         IgniteEx newIgnite = startGrid(3);
 
-        final GridCacheDatabaseSharedManager.CheckpointHistory cpHist =
+        final CheckpointHistory cpHist =
             ((GridCacheDatabaseSharedManager)newIgnite.context().cache().context().database()).checkpointHistory();
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsUnusedWalSegmentsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsUnusedWalSegmentsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsUnusedWalSegmentsTest.java
index 0f52254..06a9ec2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsUnusedWalSegmentsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsUnusedWalSegmentsTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -49,6 +50,8 @@ public class IgnitePdsUnusedWalSegmentsTest extends GridCommonAbstractTest {
 
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        cfg.setConsistentId(gridName);
+
         cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
 
         CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
@@ -102,7 +105,7 @@ public class IgnitePdsUnusedWalSegmentsTest extends GridCommonAbstractTest {
 
             assertTrue("Expected that at least resIdx greater than 0, real is " + resIdx, resIdx > 0);
 
-            FileWALPointer lowPtr = (FileWALPointer)dbMgr.checkpointHistory().lowCheckpointBound();
+            FileWALPointer lowPtr = (FileWALPointer)dbMgr.checkpointHistory().firstCheckpointPointer();
 
             assertTrue("Expected that dbMbr returns valid resIdx", lowPtr.index() == resIdx);
 
@@ -136,7 +139,7 @@ public class IgnitePdsUnusedWalSegmentsTest extends GridCommonAbstractTest {
 
             assertTrue("Expected that at least resIdx greater than 0, real is " + resIdx, resIdx > 0);
 
-            FileWALPointer lowPtr = (FileWALPointer) dbMgr.checkpointHistory().lowCheckpointBound();
+            FileWALPointer lowPtr = (FileWALPointer) dbMgr.checkpointHistory().firstCheckpointPointer();
 
             assertTrue("Expected that dbMbr returns valid resIdx", lowPtr.index() == resIdx);
 
@@ -186,17 +189,10 @@ public class IgnitePdsUnusedWalSegmentsTest extends GridCommonAbstractTest {
      * Get index of reserved WAL segment by checkpointer.
      *
      * @param dbMgr Database shared manager.
-     * @throws Exception If failed.
      */
-    private long getReservedWalSegmentIndex(GridCacheDatabaseSharedManager dbMgr) throws Exception{
-        GridCacheDatabaseSharedManager.CheckpointHistory cpHist = dbMgr.checkpointHistory();
-
-        Object histMap = GridTestUtils.getFieldValue(cpHist, "histMap");
-
-        Object cpEntry = GridTestUtils.getFieldValue(GridTestUtils.invoke(histMap, "firstEntry"), "value");
-
-        FileWALPointer walPtr = GridTestUtils.getFieldValue(cpEntry, "cpMark");
+    private long getReservedWalSegmentIndex(GridCacheDatabaseSharedManager dbMgr) {
+        CheckpointHistory hist = dbMgr.checkpointHistory();
 
-        return walPtr.index();
+        return ((FileWALPointer)hist.firstCheckpointPointer()).index();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 23dda26..28801cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -17,32 +17,52 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.db.wal;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
 
 /**
- * Historic WAL rebalance base test.
+ * Historical WAL rebalance base test.
  */
 public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     /** Cache name. */
     private static final String CACHE_NAME = "cache";
 
+    /** Partitions count. */
+    private static final int PARTS_CNT = 32;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); //to make all rebalance wal-based
@@ -51,25 +71,24 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
 
         cfg.setConsistentId(gridName);
 
-        CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME);
-
-        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
-
-        ccfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
-
-        ccfg.setCacheMode(CacheMode.REPLICATED);
-
-        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setRebalanceMode(CacheRebalanceMode.ASYNC)
+            .setCacheMode(CacheMode.REPLICATED)
+            .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
 
         cfg.setCacheConfiguration(ccfg);
 
         DataStorageConfiguration dbCfg = new DataStorageConfiguration()
                     .setWalHistorySize(Integer.MAX_VALUE)
                     .setWalMode(WALMode.LOG_ONLY)
+                    .setCheckpointFrequency(15 * 60 * 1000)
                     .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
 
         cfg.setDataStorageConfiguration(dbCfg);
 
+        cfg.setCommunicationSpi(new WalRebalanceCheckingCommunicationSpi());
+
         return cfg;
     }
 
@@ -83,10 +102,19 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         System.clearProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+        System.clearProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING);
+
+        // Every test in this class is expected to trigger at least one WAL (historical) rebalance.
+        boolean walRebalanceInvoked = !WalRebalanceCheckingCommunicationSpi.allRebalances().isEmpty();
+
+        WalRebalanceCheckingCommunicationSpi.cleanup();
 
         stopAllGrids();
 
         cleanPersistenceDir();
+
+        if (!walRebalanceInvoked)
+            throw new AssertionError("WAL rebalance hasn't been invoked.");
     }
 
     /**
@@ -97,7 +125,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     public void testSimple() throws Exception {
         IgniteEx ig0 = startGrid(0);
         IgniteEx ig1 = startGrid(1);
-        final int entryCnt = 10_000;
+
+        final int entryCnt = PARTS_CNT * 100;
 
         ig0.cluster().active(true);
 
@@ -135,7 +164,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     public void testRebalanceRemoves() throws Exception {
         IgniteEx ig0 = startGrid(0);
         IgniteEx ig1 = startGrid(1);
-        final int entryCnt = 10_000;
+
+        final int entryCnt = PARTS_CNT * 100;
 
         ig0.cluster().active(true);
 
@@ -174,6 +204,164 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that WAL rebalance is skipped when the only supplier's WAL history has gaps due to local WAL disabling.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWithLocalWalChange() throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, "true");
+
+        IgniteEx crd = (IgniteEx) startGrids(4);
+
+        crd.cluster().active(true);
+
+        final int entryCnt = PARTS_CNT * 10;
+
+        {
+            IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME);
+
+            for (int k = 0; k < entryCnt; k++)
+                cache.put(k, new IndexedObject(k - 1));
+        }
+
+        stopAllGrids();
+
+        IgniteEx ig0 = (IgniteEx) startGrids(2);
+
+        ig0.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig0.cache(CACHE_NAME);
+
+        int grpId = ig0.cachex(CACHE_NAME).context().groupId();
+
+        for (int k = 0; k < entryCnt; k++)
+            cache.put(k, new IndexedObject(k));
+
+        // Node 2 catches up from nodes 0 and 1 via WAL rebalance; WAL is disabled locally while it rebalances.
+        Ignite ignite = startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        Set<Long> topVers = ((WalRebalanceCheckingCommunicationSpi) ignite.configuration().getCommunicationSpi())
+            .walRebalanceVersions(grpId);
+
+        Assert.assertTrue(topVers.contains(ignite.cluster().topologyVersion()));
+
+        // Update keys with k % 3 == 0 and remove keys with k % 3 == 1; other keys keep their values.
+        for (int k = 0; k < entryCnt; k++) {
+            if (k % 3 == 0)
+                cache.put(k, new IndexedObject(k + 1));
+            else if (k % 3 == 1) // Spread removes across all partitions.
+                cache.remove(k);
+        }
+
+        // Stop nodes 0 and 1, whose WAL history has no gaps.
+        stopGrid(0);
+
+        stopGrid(1);
+
+        // Restart node 3 (it has data from the first phase): it must rebalance from node 2 without using WAL,
+        // because node 2's WAL history has a gap from the period when WAL was disabled during its rebalancing.
+        ignite = startGrid(3);
+
+        awaitPartitionMapExchange();
+
+        topVers = ((WalRebalanceCheckingCommunicationSpi) ignite.configuration().getCommunicationSpi())
+            .walRebalanceVersions(grpId);
+
+        Assert.assertFalse(topVers.contains(ignite.cluster().topologyVersion()));
+
+        // Every running node must see the final values: k % 3 == 0 updated, k % 3 == 1 removed, others unchanged.
+        for (Ignite ig : G.allGrids()) {
+            IgniteCache<Object, Object> cache1 = ig.cache(CACHE_NAME);
+
+            for (int k = 0; k < entryCnt; k++) {
+                if (k % 3 == 0)
+                    assertEquals(new IndexedObject(k + 1), cache1.get(k));
+                else if (k % 3 == 1)
+                    assertNull(cache1.get(k));
+                else
+                    assertEquals(new IndexedObject(k), cache1.get(k));
+            }
+        }
+    }
+
+    /**
+     * Checks that global WAL disabling prevents WAL rebalance until a new checkpoint is taken.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWithGlobalWalChange() throws Exception {
+        // Populate the cache on a 3-node cluster, then stop it.
+        IgniteEx crd = (IgniteEx) startGrids(3);
+
+        crd.cluster().active(true);
+
+        final int entryCnt = PARTS_CNT * 10;
+
+        {
+            IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME);
+
+            for (int k = 0; k < entryCnt; k++)
+                cache.put(k, new IndexedObject(k - 1));
+        }
+
+        stopAllGrids();
+
+        // Restart nodes 0 and 1 and rewrite all keys while WAL is disabled cluster-wide for the cache.
+        crd = (IgniteEx) startGrids(2);
+
+        crd.cluster().active(true);
+
+        crd.cluster().disableWal(CACHE_NAME);
+
+        IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME);
+
+        int grpId = crd.cachex(CACHE_NAME).context().groupId();
+
+        for (int k = 0; k < entryCnt; k++)
+            cache.put(k, new IndexedObject(k));
+
+        crd.cluster().enableWal(CACHE_NAME);
+
+        // Node 2 must not get WAL rebalance: the writes above were made on nodes 0 and 1 with WAL disabled.
+        IgniteEx ignite = startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        Set<Long> topVers = ((WalRebalanceCheckingCommunicationSpi) ignite.configuration().getCommunicationSpi())
+            .walRebalanceVersions(grpId);
+
+        Assert.assertFalse(topVers.contains(ignite.cluster().topologyVersion()));
+
+        stopGrid(2);
+
+        // Checkpoint the current state so the suppliers have a WAL start point to rebalance from.
+        forceCheckpoint();
+
+        // Rewrite all keys with WAL enabled: restarted node 2 should now receive this delta via WAL rebalance.
+        for (int k = 0; k < entryCnt; k++)
+            cache.put(k, new IndexedObject(k + 1));
+
+        ignite = startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        topVers = ((WalRebalanceCheckingCommunicationSpi) ignite.configuration().getCommunicationSpi())
+            .walRebalanceVersions(grpId);
+
+        Assert.assertTrue(topVers.contains(ignite.cluster().topologyVersion()));
+
+        // Every node must see the values written last, with WAL enabled.
+        for (Ignite ig : G.allGrids()) {
+            IgniteCache<Object, Object> cache1 = ig.cache(CACHE_NAME);
+
+            for (int k = 0; k < entryCnt; k++)
+                assertEquals(new IndexedObject(k + 1), cache1.get(k));
+        }
+    }
+
+    /**
      *
      */
     private static class IndexedObject {
@@ -214,4 +402,80 @@ public class IgniteWalRebalanceTest extends GridCommonAbstr
             return S.toString(IndexedObject.class, this);
         }
     }
+
+    /**
+     * Communication SPI that records, per cache group, the topology versions of outgoing
+     * {@link GridDhtPartitionDemandMessage}s which request historical (WAL) rebalance.
+     * <p>
+     * The history is kept in static fields, so it is shared by all SPI instances (all nodes) in this JVM.
+     */
+    public static class WalRebalanceCheckingCommunicationSpi extends TcpCommunicationSpi {
+        /** (Group ID, Set of topology versions). Guarded by {@link #mux}. */
+        private static final Map<Integer, Set<Long>> topVers = new HashMap<>();
+
+        /** Lock object. */
+        private static final Object mux = new Object();
+
+        /**
+         * Note: the history is JVM-wide, so the result is not limited to demands sent by this SPI's node.
+         *
+         * @param grpId Group ID.
+         * @return Snapshot of topology versions where WAL history has been used for rebalance.
+         */
+        Set<Long> walRebalanceVersions(int grpId) {
+            synchronized (mux) {
+                Set<Long> vers = topVers.getOrDefault(grpId, Collections.emptySet());
+
+                // Copy under the lock: the live set may be modified concurrently by sendMessage().
+                return Collections.unmodifiableSet(new HashSet<>(vers));
+            }
+        }
+
+        /**
+         * @return Snapshot of all topology versions for all groups where WAL rebalance has been used.
+         */
+        public static Map<Integer, Set<Long>> allRebalances() {
+            synchronized (mux) {
+                Map<Integer, Set<Long>> snapshot = new HashMap<>();
+
+                for (Map.Entry<Integer, Set<Long>> e : topVers.entrySet())
+                    snapshot.put(e.getKey(), Collections.unmodifiableSet(new HashSet<>(e.getValue())));
+
+                return Collections.unmodifiableMap(snapshot);
+            }
+        }
+
+        /**
+         * Cleans all rebalances history.
+         */
+        public static void cleanup() {
+            synchronized (mux) {
+                topVers.clear();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            // Unwrap only GridIoMessage envelopes instead of casting every outgoing message blindly.
+            Message payload = msg instanceof GridIoMessage ? ((GridIoMessage)msg).message() : null;
+
+            if (payload instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)payload;
+
+                IgniteDhtDemandedPartitionsMap map = demandMsg.partitions();
+
+                // A non-empty historical map is treated as a request for WAL-based rebalance.
+                if (!map.historicalMap().isEmpty()) {
+                    int grpId = demandMsg.groupId();
+                    long topVer = demandMsg.topologyVersion().topologyVersion();
+
+                    synchronized (mux) {
+                        topVers.computeIfAbsent(grpId, k -> new HashSet<>()).add(topVer);
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
index 4493b4e..5abcff4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryTxLogicalRecordsTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeListImpl;
@@ -500,7 +501,7 @@ public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
                 dbMgr.waitForCheckpoint("test");
             }
 
-            GridCacheDatabaseSharedManager.CheckpointHistory hist = dbMgr.checkpointHistory();
+            CheckpointHistory hist = dbMgr.checkpointHistory();
 
             assertTrue(hist.checkpoints().size() <= WAL_HIST_SIZE);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10aa02ac/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index 1cb777a..2d967cd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinaryMe
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinarySortObjectFieldsTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedIndexTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsMarshallerMappingRestoreOnNodeStartTest;
-import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheHistoricalRebalancingTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreCacheGroupsTest;
 import org.apache.ignite.internal.processors.cache.persistence.PersistenceDirectoryWarningLoggingTest;
@@ -67,7 +66,6 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite {
         suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class);
 
         suite.addTestSuite(IgnitePdsAtomicCacheHistoricalRebalancingTest.class);
-        suite.addTestSuite(IgnitePdsTxCacheHistoricalRebalancingTest.class);
         suite.addTestSuite(IgniteWalRebalanceTest.class);
 
         suite.addTestSuite(IgniteWalRecoveryPPCTest.class);


Mime
View raw message