ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [4/5] ignite git commit: WIP
Date Mon, 05 Jun 2017 16:30:58 GMT
WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f4c76a8f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f4c76a8f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f4c76a8f

Branch: refs/heads/ignite-5375
Commit: f4c76a8f8e4330cea08b7f001063784fee0e7cb3
Parents: 887880d
Author: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Authored: Mon Jun 5 18:14:34 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Mon Jun 5 18:14:34 2017 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/MemoryMetrics.java   |  13 +-
 .../org/apache/ignite/PersistenceMetrics.java   |  36 ++-
 .../pagemem/wal/IgniteWriteAheadLogManager.java |   5 +
 .../cache/database/PersistenceMetricsImpl.java  | 200 ---------------
 .../cache/database/tree/io/PageIO.java          |   8 +
 .../ignite/mxbean/PersistenceMetricsMXBean.java |  16 +-
 .../GridCacheDatabaseSharedManager.java         | 111 +++++---
 .../cache/database/PersistenceMetricsImpl.java  | 253 +++++++++++++++++++
 .../pagemem/CheckpointMetricsTracker.java       | 183 ++++++++++++++
 .../cache/database/pagemem/PageMemoryEx.java    |   3 +-
 .../cache/database/pagemem/PageMemoryImpl.java  |   9 +-
 .../database/wal/FileWriteAheadLogManager.java  | 112 ++++++--
 12 files changed, 673 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/core/src/main/java/org/apache/ignite/MemoryMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/MemoryMetrics.java b/modules/core/src/main/java/org/apache/ignite/MemoryMetrics.java
index b12da46..21dad20 100644
--- a/modules/core/src/main/java/org/apache/ignite/MemoryMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/MemoryMetrics.java
@@ -91,20 +91,27 @@ public interface MemoryMetrics {
     public float getPagesFillFactor();
 
     /**
+     * Gets the number of dirty pages (pages which contents is different from the current persistent storage state).
+     * This metric is enabled only for Ignite nodes with enabled persistence.
      *
-     * @return
+     * @return Current number of dirty pages.
      */
     public long getDirtyPages();
 
     /**
+     * Gets rate (pages per second) at which pages get replaced with other pages from persistent storage.
+     * The rate effectively represents the rate at which pages get 'evicted' in favor of newly needed pages.
+     * This metric is enabled only for Ignite nodes with enabled persistence.
      *
-     * @return
+     * @return Pages per second replace rate.
      */
     public float getPagesReplaceRate();
 
     /**
+     * Gets total number of pages currently loaded to the RAM. When persistence is disabled, this metric is equal
+     * to {@link #getTotalAllocatedPages()}.
      *
-     * @return
+     * @return Total number of pages loaded to RAM.
      */
     public long getPhysicalMemoryPages();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/core/src/main/java/org/apache/ignite/PersistenceMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/PersistenceMetrics.java b/modules/core/src/main/java/org/apache/ignite/PersistenceMetrics.java
index 2769d65..3f00b01 100644
--- a/modules/core/src/main/java/org/apache/ignite/PersistenceMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/PersistenceMetrics.java
@@ -58,31 +58,57 @@ public interface PersistenceMetrics {
 
     /**
      * Gets the duration of the last checkpoint in milliseconds.
+     *
+     * @return Total checkpoint duration in milliseconds.
      */
     public long getLastCheckpointingDuration();
 
     /**
+     * Gets the duration of last checkpoint lock wait in milliseconds.
+     *
+     * @return Checkpoint lock wait time in milliseconds.
+     */
+    public long getLastCheckpointLockWaitDuration();
+
+    /**
+     * Gets the duration of last checkpoint mark phase in milliseconds.
+     *
+     * @return Checkpoint mark duration in milliseconds.
+     */
+    public long getLastCheckpointMarkDuration();
+
+    /**
+     * Gets the duration of last checkpoint pages write phase in milliseconds.
+     *
+     * @return Checkpoint pages write phase in milliseconds.
+     */
+    public long getLastCheckpointPagesWriteDuration();
+
+    /**
      * Gets the duration of the sync phase of the last checkpoint in milliseconds.
+     *
+     * @return Checkpoint fsync time in milliseconds.
      */
     public long getLastCheckpointFsyncDuration();
 
     /**
      * Gets the total number of pages written during the last checkpoint.
+     *
+     * @return Total number of pages written during the last checkpoint.
      */
     public long getLastCheckpointTotalPagesNumber();
 
     /**
      * Gets the number of data pages written during the last checkpoint.
+     *
+     * @return Total number of data pages written during the last checkpoint.
      */
     public long getLastCheckpointDataPagesNumber();
 
     /**
-     * Gets the number of index pages written during the last checkpoint.
-     */
-    public long getLastCheckpointIndexPagesNumber();
-
-    /**
      * Gets the number of pages copied to a temporary checkpoint buffer during the last checkpoint.
+     *
+     * @return Total number of pages copied to a temporary checkpoint buffer during the last checkpoint.
      */
     public long getLastCheckpointCopiedOnWritePagesNumber();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index ac785b6..187df8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -102,6 +102,11 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
     public int truncate(WALPointer ptr);
 
     /**
+     * @return Total number of segments in the WAL archive.
+     */
+    public int walArchiveSegments();
+
+    /**
      * @param ptr Pointer.
      * @return True if given pointer is located in reserved segment.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java
deleted file mode 100644
index d493ebf..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.database;
-
-import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics;
-import org.apache.ignite.mxbean.PersistenceMetricsMXBean;
-
-/**
- *
- */
-public class PersistenceMetricsImpl implements PersistenceMetricsMXBean {
-    /** */
-    private volatile HitRateMetrics walLoggingRate;
-
-    /** */
-    private volatile HitRateMetrics walWritingRate;
-
-    /** */
-    private volatile int walArchiveSegments;
-
-    /** */
-    private volatile long walFsyncTimeAvg;
-
-    /** */
-    private volatile long lastCpDuration;
-
-    /** */
-    private volatile long lastCpFsyncDuration;
-
-    /** */
-    private volatile long lastCpTotalPages;
-
-    /** */
-    private volatile long lastCpDataPages;
-
-    /** */
-    private volatile long lastCpIdxPages;
-
-    /** */
-    private volatile long lastCpCowPages;
-
-    /** */
-    private volatile long rateTimeInterval;
-
-    /** */
-    private volatile int subInts;
-
-    /** */
-    private volatile boolean metricsEnabled;
-
-    /**
-     * @param metricsEnabled Metrics enabled flag.
-     * @param rateTimeInterval Rate time interval.
-     * @param subInts Number of sub-intervals.
-     */
-    public PersistenceMetricsImpl(boolean metricsEnabled, long rateTimeInterval, int subInts) {
-        this.metricsEnabled = metricsEnabled;
-        this.rateTimeInterval = rateTimeInterval;
-        this.subInts = subInts;
-
-        resetRates();
-    }
-
-    /** {@inheritDoc} */
-    @Override public float getWalLoggingRate() {
-        if (!metricsEnabled)
-            return 0;
-
-        return ((float)walLoggingRate.getRate()) / rateTimeInterval;
-    }
-
-    /** {@inheritDoc} */
-    @Override public float getWalWritingRate() {
-        if (!metricsEnabled)
-            return 0;
-
-        return ((float)walWritingRate.getRate()) / rateTimeInterval;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getWalArchiveSegments() {
-        return 0;
-    }
-
-    @Override public float getWalFsyncTimeAverage() {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointingDuration() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpDuration;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointFsyncDuration() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpFsyncDuration;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointTotalPagesNumber() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpTotalPages;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointDataPagesNumber() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpDataPages;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointIndexPagesNumber() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpIdxPages;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLastCheckpointCopiedOnWritePagesNumber() {
-        if (!metricsEnabled)
-            return 0;
-
-        return lastCpCowPages;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void enableMetrics() {
-        metricsEnabled = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void disableMetrics() {
-        metricsEnabled = false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void rateTimeInterval(long rateTimeInterval) {
-        this.rateTimeInterval = rateTimeInterval;
-
-        resetRates();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void subIntervals(int subInts) {
-        this.subInts = subInts;
-
-        resetRates();
-    }
-
-    public void onCheckpoint(
-        long duration,
-        long fsyncDuration,
-        long totalPages,
-        long dataPages,
-        long idxPages,
-        long cowPages
-    ) {
-        if (metricsEnabled) {
-            lastCpDuration = duration;
-            lastCpFsyncDuration = fsyncDuration;
-            lastCpTotalPages = totalPages;
-            lastCpDataPages = dataPages;
-            lastCpIdxPages = idxPages;
-            lastCpCowPages = cowPages;
-        }
-    }
-
-    /**
-     *
-     */
-    private void resetRates() {
-        walLoggingRate = new HitRateMetrics((int)rateTimeInterval, subInts);
-        walWritingRate = new HitRateMetrics((int)rateTimeInterval, subInts);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
index e40ed11..dd6a503 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
@@ -537,4 +537,12 @@ public abstract class PageIO {
 
         throw new IgniteCheckedException("Unknown page IO type: " + type);
     }
+
+    /**
+     * @param type Type to test.
+     * @return {@code True} if data page.
+     */
+    public static boolean isDataPageType(int type) {
+        return type == T_DATA;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/core/src/main/java/org/apache/ignite/mxbean/PersistenceMetricsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/PersistenceMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/PersistenceMetricsMXBean.java
index da18958..0926460 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/PersistenceMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/PersistenceMetricsMXBean.java
@@ -45,6 +45,18 @@ public interface PersistenceMetricsMXBean extends PersistenceMetrics {
     @Override long getLastCheckpointingDuration();
 
     /** {@inheritDoc} */
+    @MXBeanDescription("Duration of the checkpoint lock wait in milliseconds.")
+    @Override long getLastCheckpointLockWaitDuration();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Duration of the checkpoint mark in milliseconds.")
+    @Override long getLastCheckpointMarkDuration();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Duration of the checkpoint pages write in milliseconds.")
+    @Override long getLastCheckpointPagesWriteDuration();
+
+    /** {@inheritDoc} */
     @MXBeanDescription("Duration of the sync phase of the last checkpoint in milliseconds")
     @Override long getLastCheckpointFsyncDuration();
 
@@ -57,10 +69,6 @@ public interface PersistenceMetricsMXBean extends PersistenceMetrics {
     @Override long getLastCheckpointDataPagesNumber();
 
     /** {@inheritDoc} */
-    @MXBeanDescription("Total number of index pages written during the last checkpoint")
-    @Override long getLastCheckpointIndexPagesNumber();
-
-    /** {@inheritDoc} */
     @MXBeanDescription("Number of pages copied to a temporary checkpoint buffer during the last checkpoint")
     @Override long getLastCheckpointCopiedOnWritePagesNumber();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
index 8408e1c..5c2e822 100755
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
@@ -57,6 +57,10 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMException;
+import javax.management.MBeanRegistrationException;
+import javax.management.ObjectName;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -100,6 +104,7 @@ import org.apache.ignite.internal.processors.cache.ClusterState;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.database.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.database.pagemem.CheckpointMetricsTracker;
 import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryEx;
 import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryImpl;
 import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
@@ -126,6 +131,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.mxbean.PersistenceMetricsMXBean;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
@@ -272,6 +278,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** */
     private PersistenceMetricsImpl persStoreMetrics;
 
+    /** */
+    private ObjectName persistenceMetricsMbeanName;
+
     /**
      * @param ctx Kernal context.
      */
@@ -300,6 +309,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         };
 
         persStoreMetrics = new PersistenceMetricsImpl(
+            cctx.wal(),
             persistenceCfg.isMetricsEnabled(),
             persistenceCfg.getRateTimeInterval(),
             persistenceCfg.getSubIntervals()
@@ -347,6 +357,19 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 throw new IgniteCheckedException("Could not create directory for checkpoint metadata: " + cpDir);
 
             fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log);
+
+            try {
+                persistenceMetricsMbeanName = U.registerMBean(
+                    cctx.kernalContext().config().getMBeanServer(),
+                    cctx.kernalContext().igniteInstanceName(),
+                    "Persistent Store",
+                    "PersistenceMetrics",
+                    persStoreMetrics,
+                    PersistenceMetricsMXBean.class);
+            }
+            catch (JMException e) {
+                throw new IgniteCheckedException("Failed to register persistence metrics MBean", e);
+            }
         }
     }
 
@@ -549,6 +572,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             fileLockHolder.close();
         }
+
+        if (persistenceMetricsMbeanName != null) {
+            try {
+                cctx.kernalContext().config().getMBeanServer().unregisterMBean(persistenceMetricsMbeanName);
+            }
+            catch (InstanceNotFoundException ignore) {
+                // No-op, nothing to unregister.
+            }
+            catch (MBeanRegistrationException e) {
+                U.error(log, "Failed to unregister persistence metrics MBean (will continue stop routine)", e);
+            }
+        }
     }
 
     /** {@inheritDoc} */
@@ -1086,8 +1121,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             cp.cancelOrWaitPartitionDestroy(grpId, partId);
     }
 
-
-
     /**
      * Tries to search for a WAL pointer for the given partition counter start.
      *
@@ -1668,7 +1701,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             for (FullPageId fullId : cpPages) {
                 tmpWriteBuf.rewind();
 
-                Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf);
+                Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf, null);
 
                 if (tag != null) {
                     tmpWriteBuf.rewind();
@@ -1982,14 +2015,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          */
         private void doCheckpoint() {
             try {
-                long start = U.currentTimeMillis();
+                CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
 
-                Checkpoint chp = markCheckpointBegin();
+                Checkpoint chp = markCheckpointBegin(tracker);
 
                 snapshotMgr.onCheckPointBegin();
 
-                long written, fsync, marked = U.currentTimeMillis();
-
                 int pages = chp.cpPages.size();
 
                 boolean interrupted = true;
@@ -2001,9 +2032,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     CountDownFuture doneWriteFut = new CountDownFuture(
                         asyncRunner == null ? 1 : chp.cpPages.collectionsSize());
 
+                    tracker.onPagesWriteStart();
+
                     if (asyncRunner != null) {
                         for (int i = 0; i < chp.cpPages.collectionsSize(); i++) {
                             Runnable write = new WriteCheckpointPages(
+                                tracker,
                                 chp.cpPages.innerCollection(i),
                                 updStores,
                                 doneWriteFut
@@ -2020,7 +2054,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     }
                     else {
                         // Single-threaded checkpoint.
-                        Runnable write = new WriteCheckpointPages(chp.cpPages, updStores, doneWriteFut);
+                        Runnable write = new WriteCheckpointPages(tracker, chp.cpPages, updStores, doneWriteFut);
 
                         write.run();
                     }
@@ -2035,7 +2069,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                     snapshotMgr.afterCheckpointPageWritten();
 
-                    written = U.currentTimeMillis();
+                    tracker.onFsyncStart();
 
                     if (!skipSync) {
                         for (PageStore updStore : updStores) {
@@ -2046,8 +2080,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         }
                     }
 
-                    fsync = U.currentTimeMillis();
-
                     // Must mark successful checkpoint only if there are no exceptions or interrupts.
                     interrupted = false;
                 }
@@ -2056,7 +2088,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         markCheckpointEnd(chp);
                 }
 
-                long fsyncEnd = U.currentTimeMillis();
+                tracker.onEnd();
 
                 // We finished this checkpoint, now it's time to clean up partitions.
                 PartitionDestroyQueue destroyQueue = chp.progress.destroyQueue;
@@ -2089,24 +2121,27 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 if (printCheckpointStats) {
                     if (log.isInfoEnabled())
                         log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
-                                "walSegmentsCleared=%d, markBegin=%dms, pagesWrite=%dms, fsync=%dms, markEnd=%dms, " +
+                                "walSegmentsCleared=%d, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " +
                                 "total=%dms]",
                             chp.cpEntry.checkpointId(),
                             pages,
                             chp.cpEntry.checkpointMark(),
                             chp.walFilesDeleted,
-                            marked - start,
-                            written - marked,
-                            fsync - written,
-                            fsyncEnd - fsync,
-                            fsyncEnd - start));
+                            tracker.markDuration(),
+                            tracker.pagesWriteDuration(),
+                            tracker.fsyncDuration(),
+                            tracker.totalDuration()));
                 }
 
                 persStoreMetrics.onCheckpoint(
-                    fsyncEnd - start,
-                    fsync - written,
+                    tracker.lockWaitDuration(),
+                    tracker.markDuration(),
+                    tracker.pagesWriteDuration(),
+                    tracker.fsyncDuration(),
+                    tracker.totalDuration(),
                     pages,
-                    0, 0, 0);
+                    tracker.dataPagesWritten(),
+                    tracker.cowPagesWritten());
             }
             catch (IgniteCheckedException e) {
                 // TODO-ignite-db how to handle exception?
@@ -2148,21 +2183,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          *
          */
         @SuppressWarnings("TooBroadScope")
-        private Checkpoint markCheckpointBegin() throws IgniteCheckedException {
+        private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws IgniteCheckedException {
             CheckpointRecord cpRec = new CheckpointRecord(null, false);
 
             WALPointer cpPtr;
 
             GridMultiCollectionWrapper<FullPageId> cpPages;
 
-            long lockAcquired, lockReleased, lockStart = U.currentTimeMillis();
-
             final CheckpointProgress curr;
 
+            tracker.onLockWaitStart();
+
             checkpointLock.writeLock().lock();
 
             try {
-                lockAcquired = U.currentTimeMillis();
+                tracker.onMarkStart();
 
                 synchronized (this) {
                     curr = scheduledCp;
@@ -2231,7 +2266,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             finally {
                 checkpointLock.writeLock().unlock();
 
-                lockReleased = U.currentTimeMillis();
+                tracker.onLockRelease();
             }
 
             curr.cpBeginFut.onDone();
@@ -2257,8 +2292,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                             "checkpointLockHoldTime=%dms, pages=%d, reason='%s']",
                         cpRec.checkpointId(),
                         cpPtr,
-                        lockAcquired - lockStart,
-                        lockReleased - lockAcquired,
+                        tracker.lockWaitDuration(),
+                        tracker.lockHoldDuration(),
                         cpPages.size(),
                         curr.reason)
                     );
@@ -2336,6 +2371,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      */
     private class WriteCheckpointPages implements Runnable {
         /** */
+        private CheckpointMetricsTracker tracker;
+
+        /** */
         private Collection<FullPageId> writePageIds;
 
         /** */
@@ -2348,10 +2386,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          * @param writePageIds Write page IDs.
          */
         private WriteCheckpointPages(
+            CheckpointMetricsTracker tracker,
             Collection<FullPageId> writePageIds,
             GridConcurrentHashSet<PageStore> updStores,
             CountDownFuture doneFut
         ) {
+            this.tracker = tracker;
             this.writePageIds = writePageIds;
             this.updStores = updStores;
             this.doneFut = doneFut;
@@ -2383,11 +2423,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                     PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
 
-                    Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf);
+                    Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null);
 
                     if (tag != null) {
                         tmpWriteBuf.rewind();
 
+                        if (persStoreMetrics.metricsEnabled()) {
+                            int pageType = PageIO.getType(tmpWriteBuf);
+
+                            if (PageIO.isDataPageType(pageType))
+                                tracker.onDataPageWritten();
+                        }
+
                         if (!skipCrc) {
                             PageIO.setCrc(writeAddr, PureJavaCrc32.calcCrc32(tmpWriteBuf, pageSize()));
 
@@ -2509,7 +2556,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /**
      *
      */
-    public static class CheckpointProgress {
+    private static class CheckpointProgress {
         /** */
         private volatile long nextCpTs;
 
@@ -2520,13 +2567,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         private GridFutureAdapter cpFinishFut = new GridFutureAdapter<>();
 
         /** */
-        public volatile boolean nextSnapshot;
+        private volatile boolean nextSnapshot;
 
         /** */
         private volatile boolean started;
 
         /** */
-        public volatile SnapshotOperation snapshotOperation;
+        private volatile SnapshotOperation snapshotOperation;
 
         /** Wakeup reason. */
         private String reason;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java
new file mode 100644
index 0000000..25ac74d
--- /dev/null
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/PersistenceMetricsImpl.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.database;
+
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics;
+import org.apache.ignite.mxbean.PersistenceMetricsMXBean;
+
+/**
+ *
+ */
+public class PersistenceMetricsImpl implements PersistenceMetricsMXBean {
+    /** */
+    private volatile HitRateMetrics walLoggingRate;
+
+    /** */
+    private volatile HitRateMetrics walWritingRate;
+
+    /** */
+    private volatile long walFsyncTimeAvg;
+
+    /** */
+    private volatile long lastCpLockWaitDuration;
+
+    /** */
+    private volatile long lastCpMarkDuration;
+
+    /** */
+    private volatile long lastCpPagesWriteDuration;
+
+    /** */
+    private volatile long lastCpDuration;
+
+    /** */
+    private volatile long lastCpFsyncDuration;
+
+    /** */
+    private volatile long lastCpTotalPages;
+
+    /** */
+    private volatile long lastCpDataPages;
+
+    /** */
+    private volatile long lastCpCowPages;
+
+    /** */
+    private volatile long rateTimeInterval;
+
+    /** */
+    private volatile int subInts;
+
+    /** */
+    private volatile boolean metricsEnabled;
+
+    /** */
+    private IgniteWriteAheadLogManager wal;
+
+    /**
+     * @param metricsEnabled Metrics enabled flag.
+     * @param rateTimeInterval Rate time interval.
+     * @param subInts Number of sub-intervals.
+     */
+    public PersistenceMetricsImpl(
+        IgniteWriteAheadLogManager wal,
+        boolean metricsEnabled,
+        long rateTimeInterval,
+        int subInts
+    ) {
+        this.wal = wal;
+        this.metricsEnabled = metricsEnabled;
+        this.rateTimeInterval = rateTimeInterval;
+        this.subInts = subInts;
+
+        resetRates();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getWalLoggingRate() {
+        if (!metricsEnabled)
+            return 0;
+
+        return ((float)walLoggingRate.getRate()) / rateTimeInterval;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getWalWritingRate() {
+        if (!metricsEnabled)
+            return 0;
+
+        return ((float)walWritingRate.getRate()) / rateTimeInterval;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getWalArchiveSegments() {
+        if (!metricsEnabled)
+            return 0;
+
+        return wal.walArchiveSegments();
+    }
+
+    @Override public float getWalFsyncTimeAverage() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLastCheckpointingDuration() {
+        if (!metricsEnabled)
+            return 0;
+
+        return lastCpDuration;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLastCheckpointLockWaitDuration() {
+        if (!metricsEnabled)
+            return 0;
+
+        return lastCpLockWaitDuration;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLastCheckpointMarkDuration() {
+        if (!metricsEnabled)
+            return 0;
+
+        return lastCpMarkDuration;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLastCheckpointPagesWriteDuration() {
+        if (!metricsEnabled)
+            return 0;
+
+        return lastCpPagesWriteDuration;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLastCheckpointFsyncDuration() {
+        if (!metricsEnabled)
+            return 0;
+
+        return lastCpFsyncDuration;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLastCheckpointTotalPagesNumber() {
+        if (!metricsEnabled)
+            return 0;
+
+        return lastCpTotalPages;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLastCheckpointDataPagesNumber() {
+        if (!metricsEnabled)
+            return 0;
+
+        return lastCpDataPages;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLastCheckpointCopiedOnWritePagesNumber() {
+        if (!metricsEnabled)
+            return 0;
+
+        return lastCpCowPages;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void enableMetrics() {
+        metricsEnabled = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void disableMetrics() {
+        metricsEnabled = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rateTimeInterval(long rateTimeInterval) {
+        this.rateTimeInterval = rateTimeInterval;
+
+        resetRates();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void subIntervals(int subInts) {
+        this.subInts = subInts;
+
+        resetRates();
+    }
+
+    /**
+     * @return Metrics enabled flag.
+     */
+    public boolean metricsEnabled() {
+        return metricsEnabled;
+    }
+
+    /**
+     * @param lockWaitDuration Lock wait duration.
+     * @param markDuration Mark duration.
+     * @param pagesWriteDuration Pages write duration.
+     * @param fsyncDuration Total checkpoint fsync duration.
+     * @param duration Total checkpoint duration.
+     * @param totalPages Total number of all pages in checkpoint.
+     * @param dataPages Total number of data pages in checkpoint.
+     * @param cowPages Total number of COW-ed pages in checkpoint.
+     */
+    public void onCheckpoint(
+        long lockWaitDuration,
+        long markDuration,
+        long pagesWriteDuration,
+        long fsyncDuration,
+        long duration,
+        long totalPages,
+        long dataPages,
+        long cowPages
+    ) {
+        if (metricsEnabled) {
+            lastCpLockWaitDuration = lockWaitDuration;
+            lastCpMarkDuration = markDuration;
+            lastCpPagesWriteDuration = pagesWriteDuration;
+            lastCpFsyncDuration = fsyncDuration;
+            lastCpDuration = duration;
+            lastCpTotalPages = totalPages;
+            lastCpDataPages = dataPages;
+            lastCpCowPages = cowPages;
+        }
+    }
+
+    /**
+     *
+     */
+    private void resetRates() {
+        walLoggingRate = new HitRateMetrics((int)rateTimeInterval, subInts);
+        walWritingRate = new HitRateMetrics((int)rateTimeInterval, subInts);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/CheckpointMetricsTracker.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/CheckpointMetricsTracker.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/CheckpointMetricsTracker.java
new file mode 100644
index 0000000..e6b88d3
--- /dev/null
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/CheckpointMetricsTracker.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.database.pagemem;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * Tracks various checkpoint phases and stats.
+ *
+ * Assumed sequence of events:
+ * <ol>
+ *     <li>Checkpoint start</li>
+ *     <li>CP Lock wait start</li>
+ *     <li>CP mark start</li>
+ *     <li>CP Lock release</li>
+ *     <li>Pages write start</li>
+ *     <li>fsync start</li>
+ *     <li>Checkpoint end</li>
+ * </ol>
+ */
+public class CheckpointMetricsTracker {
+    /** */
+    private static final AtomicIntegerFieldUpdater<CheckpointMetricsTracker> DATA_PAGES_UPDATER =
+        AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "dataPages");
+
+    /** */
+    private static final AtomicIntegerFieldUpdater<CheckpointMetricsTracker> COW_PAGES_UPDATER =
+        AtomicIntegerFieldUpdater.newUpdater(CheckpointMetricsTracker.class, "cowPages");
+
+    /** */
+    private volatile int dataPages;
+
+    /** */
+    private volatile int cowPages;
+
+    /** */
+    private long cpStart = System.currentTimeMillis();
+
+    /** */
+    private long cpLockWaitStart;
+
+    /** */
+    private long cpMarkStart;
+
+    /** */
+    private long cpLockRelease;
+
+    /** */
+    private long cpPagesWriteStart;
+
+    /** */
+    private long cpFsyncStart;
+
+    /** */
+    private long cpEnd;
+
+    /**
+     *
+     */
+    public void onCowPageWritten() {
+        COW_PAGES_UPDATER.incrementAndGet(this);
+    }
+
+    /**
+     *
+     */
+    public void onDataPageWritten() {
+        DATA_PAGES_UPDATER.incrementAndGet(this);
+    }
+
+    /**
+     * @return COW pages.
+     */
+    public int cowPagesWritten() {
+        return cowPages;
+    }
+
+    /**
+     * @return Data pages written.
+     */
+    public int dataPagesWritten() {
+        return dataPages;
+    }
+
+    /**
+     *
+     */
+    public void onLockWaitStart() {
+        cpLockWaitStart = System.currentTimeMillis();
+    }
+
+    /**
+     *
+     */
+    public void onMarkStart() {
+        cpMarkStart = System.currentTimeMillis();
+    }
+
+    /**
+     *
+     */
+    public void onLockRelease() {
+        cpLockRelease = System.currentTimeMillis();
+    }
+
+    /**
+     *
+     */
+    public void onPagesWriteStart() {
+        cpPagesWriteStart = System.currentTimeMillis();
+    }
+
+    /**
+     *
+     */
+    public void onFsyncStart() {
+        cpFsyncStart = System.currentTimeMillis();
+    }
+
+    /**
+     *
+     */
+    public void onEnd() {
+        cpEnd = System.currentTimeMillis();
+    }
+
+    /**
+     * @return Total checkpoint duration.
+     */
+    public long totalDuration() {
+        return cpEnd - cpStart;
+    }
+
+    /**
+     * @return Checkpoint lock wait duration.
+     */
+    public long lockWaitDuration() {
+        return cpMarkStart - cpLockWaitStart;
+    }
+
+    /**
+     * @return Checkpoint mark duration.
+     */
+    public long markDuration() {
+        return cpPagesWriteStart - cpMarkStart;
+    }
+
+    /**
+     * @return Checkpoint lock hold duration.
+     */
+    public long lockHoldDuration() {
+        return cpLockRelease - cpMarkStart;
+    }
+
+    /**
+     * @return Pages write duration.
+     */
+    public long pagesWriteDuration() {
+        return cpFsyncStart - cpPagesWriteStart;
+    }
+
+    /**
+     * @return Checkpoint fsync duration.
+     */
+    public long fsyncDuration() {
+        return cpEnd - cpFsyncStart;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java
index ef84d83..56d1be9 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java
@@ -116,10 +116,11 @@ public interface PageMemoryEx extends PageMemory {
      * @param pageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
      *      the {@link #beginCheckpoint()} method call.
      * @param tmpBuf Temporary buffer to write changes into.
+     * @param tracker Checkpoint metrics tracker.
      * @return {@code True} if data were read, {@code false} otherwise (data already saved to storage).
      * @throws IgniteException If failed to obtain page data.
      */
-    @Nullable public Integer getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf);
+    @Nullable public Integer getForCheckpoint(FullPageId pageId, ByteBuffer tmpBuf, CheckpointMetricsTracker tracker);
 
     /**
      * Marks partition as invalid / outdated.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java
index f357016..93ee411 100755
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java
@@ -791,7 +791,7 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Integer getForCheckpoint(FullPageId fullId, ByteBuffer tmpBuf) {
+    @Override public Integer getForCheckpoint(FullPageId fullId, ByteBuffer tmpBuf, CheckpointMetricsTracker tracker) {
         assert tmpBuf.remaining() == pageSize();
 
         Segment seg = segment(fullId.cacheId(), fullId.pageId());
@@ -869,7 +869,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         }
         else {
-            copyPageForCheckpoint(absPtr, fullId, tmpBuf);
+            copyPageForCheckpoint(absPtr, fullId, tmpBuf, tracker);
 
             return tag;
         }
@@ -880,7 +880,7 @@ public class PageMemoryImpl implements PageMemoryEx {
      * @param fullId Full id.
      * @param tmpBuf Tmp buffer.
      */
-    private void copyPageForCheckpoint(long absPtr, FullPageId fullId, ByteBuffer tmpBuf) {
+    private void copyPageForCheckpoint(long absPtr, FullPageId fullId, ByteBuffer tmpBuf, CheckpointMetricsTracker tracker) {
         assert absPtr != 0;
 
         long tmpRelPtr;
@@ -918,6 +918,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         PageHeader.dirty(tmpAbsPtr, false);
 
+        if (tracker != null)
+            tracker.onCowPageWritten();
+
         checkpointPool.releaseFreePage(tmpRelPtr);
 
         // We pinned the page when allocated the temp buffer, release it now.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c76a8f/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
index 8a113ba..9127fbb 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
@@ -147,6 +147,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** Current log segment handle */
     private volatile FileWriteHandle currentHnd;
 
+    /** */
+    private volatile long oldestArchiveSegmentIdx;
+
     /** Updater for {@link #currentHnd}, used for verify there are no concurrent update for current log segment handle */
     private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> currentHndUpd =
         AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd");
@@ -217,7 +220,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             checkOrPrepareFiles();
 
-            archiver = new FileArchiver();
+            IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
+
+            oldestArchiveSegmentIdx = tup == null ? 0 : tup.get1();
+
+            archiver = new FileArchiver(tup == null ? -1 : tup.get2());
 
             if (mode != Mode.DEFAULT) {
                 if (log.isInfoEnabled())
@@ -279,13 +286,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** {@inheritDoc} */
     @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
         if (log.isDebugEnabled())
-            log.debug("Activate file write ahead log [nodeId=" + cctx.localNodeId() +
+            log.debug("Activated file write ahead log manager [nodeId=" + cctx.localNodeId() +
                 " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
 
         start0();
 
         if (!cctx.kernalContext().clientNode()) {
-            archiver = new FileArchiver();
+            assert archiver != null;
 
             archiver.start();
         }
@@ -459,7 +466,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         if (inArchive)
             return true;
 
-        if (absIdx <= archiver.lastArchivedIndex())
+        if (absIdx <= lastArchivedIndex())
             return false;
 
         FileWriteHandle cur = currentHnd;
@@ -494,6 +501,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                         desc.file.getAbsolutePath());
                 else
                     deleted++;
+
+                // Bump up the oldest archive segment index.
+                if (oldestArchiveSegmentIdx < desc.idx)
+                    oldestArchiveSegmentIdx = desc.idx;
             }
         }
 
@@ -501,6 +512,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /** {@inheritDoc} */
+    @Override public int walArchiveSegments() {
+        long oldest = oldestArchiveSegmentIdx;
+
+        long lastArchived = archiver.lastArchivedAbsoluteIndex();
+
+        if (lastArchived == -1)
+            return 0;
+
+        int res = (int)(lastArchived - oldest);
+
+        return res >= 0 ? res : 0;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean reserved(WALPointer ptr) {
         FileWALPointer fPtr = (FileWALPointer)ptr;
 
@@ -510,6 +535,52 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
+     * Lists files in archive directory and returns the index of last archived file.
+     *
+     * @return The absolute index of last archived file.
+     */
+    private long lastArchivedIndex() {
+        long lastIdx = -1;
+
+        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)) {
+            try {
+                long idx = Long.parseLong(file.getName().substring(0, 16));
+
+                lastIdx = Math.max(lastIdx, idx);
+            }
+            catch (NumberFormatException | IndexOutOfBoundsException ignore) {
+
+            }
+        }
+
+        return lastIdx;
+    }
+
+    /**
+     * Lists files in archive directory and returns the index of last archived file.
+     *
+     * @return The absolute index of last archived file.
+     */
+    private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() {
+        long minIdx = Integer.MAX_VALUE;
+        long maxIdx = -1;
+
+        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)) {
+            try {
+                long idx = Long.parseLong(file.getName().substring(0, 16));
+
+                minIdx = Math.min(minIdx, idx);
+                maxIdx = Math.max(maxIdx, idx);
+            }
+            catch (NumberFormatException | IndexOutOfBoundsException ignore) {
+
+            }
+        }
+
+        return maxIdx == -1 ? null : F.t(minIdx, maxIdx);
+    }
+
+    /**
      * Creates a directory specified by the given arguments.
      *
      * @param cfg Configured directory path, may be {@code null}.
@@ -863,10 +934,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /**
          *
          */
-        private FileArchiver() {
+        private FileArchiver(long lastAbsArchivedIdx) {
             super("wal-file-archiver%" + cctx.igniteInstanceName());
 
-            lastAbsArchivedIdx = lastArchivedIndex();
+            this.lastAbsArchivedIdx = lastAbsArchivedIdx;
+        }
+
+        /**
+         * @return Last archived segment absolute index.
+         */
+        private synchronized long lastArchivedAbsoluteIndex() {
+            return lastAbsArchivedIdx;
         }
 
         /**
@@ -1128,28 +1206,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * Lists files in archive directory and returns the index of last archived file.
-         *
-         * @return The absolute index of last archived file.
-         */
-        private int lastArchivedIndex() {
-            int lastIdx = -1;
-
-            for (File file : walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)) {
-                try {
-                    int idx = Integer.parseInt(file.getName().substring(0, 16));
-
-                    lastIdx = Math.max(lastIdx, idx);
-                }
-                catch (NumberFormatException | IndexOutOfBoundsException ignore) {
-
-                }
-            }
-
-            return lastIdx;
-        }
-
-        /**
          *
          */
         private boolean checkStop() {


Mime
View raw message