ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ag...@apache.org
Subject ignite git commit: IGNITE-9280 WAL archive compaction latency is decreased
Date Fri, 17 Aug 2018 15:03:29 GMT
Repository: ignite
Updated Branches:
  refs/heads/master e827b600b -> 8170f2cb1


IGNITE-9280 WAL archive compaction latency is decreased

Signed-off-by: Andrey Gura <agura@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8170f2cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8170f2cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8170f2cb

Branch: refs/heads/master
Commit: 8170f2cb195357142eb720af395ae29c04cc6f73
Parents: e827b60
Author: Andrey Kuznetsov <stkuzma@gmail.com>
Authored: Fri Aug 17 18:03:09 2018 +0300
Committer: Andrey Gura <agura@apache.org>
Committed: Fri Aug 17 18:03:09 2018 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/events/EventType.java     | 15 ++++-
 .../ignite/events/WalSegmentArchivedEvent.java  | 13 +++-
 .../ignite/events/WalSegmentCompactedEvent.java | 46 +++++++++++++
 .../pagemem/wal/IgniteWriteAheadLogManager.java | 13 +++-
 .../GridCacheDatabaseSharedManager.java         |  4 +-
 .../wal/FileWriteAheadLogManager.java           | 33 +++++++---
 .../wal/FsyncModeFileWriteAheadLogManager.java  | 31 ++++++---
 .../db/wal/reader/IgniteWalReaderTest.java      | 69 ++++++++++++++------
 .../persistence/pagemem/NoOpWALManager.java     |  7 +-
 9 files changed, 184 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8170f2cb/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index a6ab962..485e567 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -841,6 +841,19 @@ public interface EventType {
     public static final int EVT_TX_RESUMED = 133;
 
     /**
+     * Built-in event type: WAL archive segment compaction is completed.
+     * <p>
+     * Fired for each WAL archive segment upon its compaction completion.
+     * <p>
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     *
+     * @see WalSegmentArchivedEvent
+     */
+    public static final int EVT_WAL_SEGMENT_COMPACTED = 134;
+
+    /**
      * All checkpoint events. This array can be directly passed into
      * {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to
      * subscribe to all checkpoint events.
@@ -1062,4 +1075,4 @@ public interface EventType {
      * All Ignite events (<b>excluding</b> metric update event).
      */
     public static final int[] EVTS_ALL_MINUS_METRIC_UPDATE = U.gridEvents(EVT_NODE_METRICS_UPDATED);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8170f2cb/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java
index 2fc1715..2425c5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java
@@ -22,7 +22,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * Event indicates there was movement of WAL segment file to archive has been completed
+ * Event indicates the completion of WAL segment file transition to archive.
  */
 public class WalSegmentArchivedEvent extends EventAdapter {
     /** */
@@ -45,7 +45,16 @@ public class WalSegmentArchivedEvent extends EventAdapter {
         @NotNull final ClusterNode node,
         final long absWalSegmentIdx,
         final File archiveFile) {
-        super(node, "", EventType.EVT_WAL_SEGMENT_ARCHIVED);
+        this(node, absWalSegmentIdx, archiveFile, EventType.EVT_WAL_SEGMENT_ARCHIVED);
+    }
+
+    /** */
+    protected WalSegmentArchivedEvent(
+        @NotNull final ClusterNode node,
+        final long absWalSegmentIdx,
+        final File archiveFile,
+        int evtType) {
+        super(node, "", evtType);
         this.absWalSegmentIdx = absWalSegmentIdx;
         this.archiveFile = archiveFile;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8170f2cb/modules/core/src/main/java/org/apache/ignite/events/WalSegmentCompactedEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/WalSegmentCompactedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/WalSegmentCompactedEvent.java
new file mode 100644
index 0000000..50422ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/events/WalSegmentCompactedEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import java.io.File;
+import org.apache.ignite.cluster.ClusterNode;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Event indicates the completion of WAL segment compaction.
+ * <p>
+ * {@link #getArchiveFile()} corresponds to compacted file.
+ */
+public class WalSegmentCompactedEvent extends WalSegmentArchivedEvent {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates WAL segment compaction event.
+     *
+     * @param node Node.
+     * @param absWalSegmentIdx Absolute wal segment index.
+     * @param archiveFile Compacted archive file.
+     */
+    public WalSegmentCompactedEvent(
+        @NotNull final ClusterNode node,
+        final long absWalSegmentIdx,
+        final File archiveFile) {
+        super(node, absWalSegmentIdx, archiveFile, EventType.EVT_WAL_SEGMENT_COMPACTED);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8170f2cb/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index 2b6358b..a43fd0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -109,12 +109,14 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
     public int truncate(WALPointer low, WALPointer high);
 
     /**
-     * Gives a hint to WAL manager to compact WAL until given pointer (exclusively).
-     * Compaction implies filtering out physical records and ZIP compression.
+     * Notifies {@code this} about latest checkpoint pointer.
+     * <p>
+     * Current implementations, in fact, react by keeping all WAL segments uncompacted starting from index prior to
+     * the index of {@code ptr}. Compaction implies filtering out physical records and ZIP compression.
      *
      * @param ptr Pointer for which it is safe to compact the log.
      */
-    public void allowCompressionUntil(WALPointer ptr);
+    public void notchLastCheckpointPtr(WALPointer ptr);
 
     /**
      * @return Total number of segments in the WAL archive.
@@ -127,6 +129,11 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
     public long lastArchivedSegment();
 
     /**
+     * @return Last compacted segment index.
+     */
+    public long lastCompactedSegment();
+
+    /**
      * Checks if WAL segment is under lock or reserved
      *
      * @param ptr Pointer to check.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8170f2cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 3e73629..ce914e4 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1916,7 +1916,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             cctx.pageStore().beginRecover();
         }
         else
-            cctx.wal().allowCompressionUntil(status.startPtr);
+            cctx.wal().notchLastCheckpointPtr(status.startPtr);
 
         long start = U.currentTimeMillis();
 
@@ -3646,7 +3646,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 writeCheckpointEntry(tmpWriteBuf, cp, CheckpointEntryType.END);
 
-                cctx.wal().allowCompressionUntil(chp.cpEntry.checkpointMark());
+                cctx.wal().notchLastCheckpointPtr(chp.cpEntry.checkpointMark());
             }
 
             List<CheckpointEntry> removedFromHistory = cpHistory.onCheckpointFinished(chp, truncateWalOnCpFinish);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8170f2cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index ed79e11..e3f5a52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -69,6 +69,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.events.WalSegmentArchivedEvent;
+import org.apache.ignite.events.WalSegmentCompactedEvent;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
@@ -128,6 +129,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_T
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
 import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
 import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
@@ -956,9 +958,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /** {@inheritDoc} */
-    @Override public void allowCompressionUntil(WALPointer ptr) {
+    @Override public void notchLastCheckpointPtr(WALPointer ptr) {
         if (compressor != null)
-            compressor.allowCompressionUntil(((FileWALPointer)ptr).index());
+            compressor.keepUncompressedIdxFrom(((FileWALPointer)ptr).index());
     }
 
     /** {@inheritDoc} */
@@ -981,6 +983,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /** {@inheritDoc} */
+    @Override public long lastCompactedSegment() {
+        return compressor != null ? compressor.lastCompressedIdx : -1L;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean reserved(WALPointer ptr) {
         FileWALPointer fPtr = (FileWALPointer)ptr;
 
@@ -1436,7 +1443,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             if (failureProcessor != null)
                 failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
-            
+
             throw ex;
         }
     }
@@ -1912,7 +1919,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         private volatile long lastCompressedIdx = -1L;
 
         /** All segments prior to this (inclusive) can be compressed. */
-        private volatile long lastAllowedToCompressIdx = -1L;
+        private volatile long minUncompressedIdxToKeep = -1L;
 
         /**
          *
@@ -1941,10 +1948,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @param lastCpStartIdx Segment index to allow compression until (exclusively).
+         * @param idx Minimum raw segment index that should be preserved from deletion.
          */
-        synchronized void allowCompressionUntil(long lastCpStartIdx) {
-            lastAllowedToCompressIdx = lastCpStartIdx - 1;
+        synchronized void keepUncompressedIdxFrom(long idx) {
+            minUncompressedIdxToKeep = idx;
 
             notify();
         }
@@ -1967,7 +1974,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 if (stopped)
                     return -1;
 
-                while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archivedMonitor.lastArchivedAbsoluteIndex())) {
+                while (segmentToCompress > archivedMonitor.lastArchivedAbsoluteIndex()) {
                     wait();
 
                     if (stopped)
@@ -2004,7 +2011,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 if (segmentReservedOrLocked(desc.idx))
                     return;
 
-                if (desc.idx < lastCompressedIdx && duplicateIndices.contains(desc.idx)) {
+                if (desc.idx < minUncompressedIdxToKeep && duplicateIndices.contains(desc.idx)) {
                     if (!desc.file.delete())
                         U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
                             desc.file.getAbsolutePath() + ", exists: " + desc.file.exists());
@@ -2042,6 +2049,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                         try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
                             f0.force();
                         }
+
+                        if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED)) {
+                            evt.record(new WalSegmentCompactedEvent(
+                                cctx.discovery().localNode(),
+                                currReservedSegment,
+                                zip.getAbsoluteFile())
+                            );
+                        }
                     }
 
                     lastCompressedIdx = currReservedSegment;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8170f2cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index 7521f73..c8224a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -66,6 +66,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.events.WalSegmentArchivedEvent;
+import org.apache.ignite.events.WalSegmentCompactedEvent;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -117,6 +118,7 @@ import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.READ;
 import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
@@ -848,9 +850,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
     }
 
     /** {@inheritDoc} */
-    @Override public void allowCompressionUntil(WALPointer ptr) {
+    @Override public void notchLastCheckpointPtr(WALPointer ptr) {
         if (compressor != null)
-            compressor.allowCompressionUntil(((FileWALPointer)ptr).index());
+            compressor.keepUncompressedIdxFrom(((FileWALPointer)ptr).index());
     }
 
     /** {@inheritDoc} */
@@ -873,6 +875,11 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
     }
 
     /** {@inheritDoc} */
+    @Override public long lastCompactedSegment() {
+        return compressor != null ? compressor.lastCompressedIdx : -1L;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean reserved(WALPointer ptr) {
         FileWALPointer fPtr = (FileWALPointer)ptr;
 
@@ -1739,7 +1746,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
         private volatile long lastCompressedIdx = -1L;
 
         /** All segments prior to this (inclusive) can be compressed. */
-        private volatile long lastAllowedToCompressIdx = -1L;
+        private volatile long minUncompressedIdxToKeep = -1L;
 
         /**
          *
@@ -1768,10 +1775,10 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
         }
 
         /**
-         * @param lastCpStartIdx Segment index to allow compression until (exclusively).
+         * @param idx Minimum raw segment index that should be preserved from deletion.
          */
-        synchronized void allowCompressionUntil(long lastCpStartIdx) {
-            lastAllowedToCompressIdx = lastCpStartIdx - 1;
+        synchronized void keepUncompressedIdxFrom(long idx) {
+            minUncompressedIdxToKeep = idx;
 
             notify();
         }
@@ -1794,7 +1801,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
                 if (stopped)
                     return -1;
 
-                while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archiver.lastArchivedAbsoluteIndex())) {
+                while (segmentToCompress > archiver.lastArchivedAbsoluteIndex()) {
                     wait();
 
                     if (stopped)
@@ -1833,7 +1840,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
                 if (archiver0 != null && archiver0.reserved(desc.idx))
                     return;
 
-                if (desc.idx < lastCompressedIdx && duplicateIndices.contains(desc.idx)) {
+                if (desc.idx < minUncompressedIdxToKeep && duplicateIndices.contains(desc.idx)) {
                     if (!desc.file.delete())
                         U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
                             desc.file.getAbsolutePath() + ", exists: " + desc.file.exists());
@@ -1871,6 +1878,14 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
                         try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
                             f0.force();
                         }
+
+                        if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED)) {
+                            evt.record(new WalSegmentCompactedEvent(
+                                cctx.discovery().localNode(),
+                                currReservedSegment,
+                                zip.getAbsoluteFile())
+                            );
+                        }
                     }
 
                     lastCompressedIdx = currReservedSegment;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8170f2cb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
index 7c54d62..c93f8bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
@@ -51,7 +51,6 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
-import org.apache.ignite.events.EventType;
 import org.apache.ignite.events.WalSegmentArchivedEvent;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
@@ -83,6 +82,7 @@ import org.junit.Assert;
 
 import static java.util.Arrays.fill;
 import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.TX_RECORD;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
@@ -119,10 +119,13 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
     private WALMode customWalMode;
 
     /** Clear properties in afterTest() method. */
-    private boolean clearProperties;
+    private boolean clearProps;
 
     /** Set WAL and Archive path to same value. */
-    private boolean setWalAndArchiveToSameValue;
+    private boolean setWalAndArchiveToSameVal;
+
+    /** Whether to enable WAL archive compaction. */
+    private boolean enableWalCompaction;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -139,7 +142,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
         cfg.setCacheConfiguration(ccfg);
 
-        cfg.setIncludeEventTypes(EventType.EVT_WAL_SEGMENT_ARCHIVED);
+        cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_ARCHIVED, EVT_WAL_SEGMENT_COMPACTED);
 
         DataStorageConfiguration dsCfg = new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(
@@ -149,7 +152,8 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
             .setWalHistorySize(1)
             .setWalSegmentSize(1024 * 1024)
             .setWalSegments(WAL_SEGMENTS)
-            .setWalMode(customWalMode != null ? customWalMode : WALMode.BACKGROUND);
+            .setWalMode(customWalMode != null ? customWalMode : WALMode.BACKGROUND)
+            .setWalCompactionEnabled(enableWalCompaction);
 
         if (archiveIncompleteSegmentAfterInactivityMs > 0)
             dsCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs);
@@ -158,7 +162,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
         File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false);
         File wal = new File(db, "wal");
 
-        if(setWalAndArchiveToSameValue) {
+        if(setWalAndArchiveToSameVal) {
             String walAbsPath = wal.getAbsolutePath();
 
             dsCfg.setWalPath(walAbsPath);
@@ -186,7 +190,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
         cleanPersistenceDir();
 
-        if (clearProperties)
+        if (clearProps)
             System.clearProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS);
     }
 
@@ -194,7 +198,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testFillWalAndReadRecords() throws Exception {
-        setWalAndArchiveToSameValue = false;
+        setWalAndArchiveToSameVal = false;
 
         Ignite ignite0 = startGrid();
 
@@ -285,6 +289,29 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testArchiveCompletedEventFired() throws Exception {
+        assertTrue(checkWhetherWALRelatedEventFired(EVT_WAL_SEGMENT_ARCHIVED));
+    }
+
+    /**
+     * Tests archive completed event is fired.
+     *
+     * @throws Exception if failed.
+     */
+    public void testArchiveCompactedEventFired() throws Exception {
+        boolean oldEnableWalCompaction = enableWalCompaction;
+
+        try {
+            enableWalCompaction = true;
+
+            assertTrue(checkWhetherWALRelatedEventFired(EVT_WAL_SEGMENT_COMPACTED));
+        }
+        finally {
+            enableWalCompaction = oldEnableWalCompaction;
+        }
+    }
+
+    /** */
+    private boolean checkWhetherWALRelatedEventFired(int evtType) throws Exception {
         AtomicBoolean evtRecorded = new AtomicBoolean();
 
         Ignite ignite = startGrid();
@@ -293,7 +320,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
         final IgniteEvents evts = ignite.events();
 
-        if (!evts.isEnabled(EVT_WAL_SEGMENT_ARCHIVED))
+        if (!evts.isEnabled(evtType))
             fail("nothing to test");
 
         evts.localListen(e -> {
@@ -301,19 +328,19 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
             long idx = archComplEvt.getAbsWalSegmentIdx();
 
-            log.info("Finished archive for segment [" +
+            log.info("Finished for segment [" +
                 idx + ", " + archComplEvt.getArchiveFile() + "]: [" + e + "]");
 
             evtRecorded.set(true);
 
             return true;
-        }, EVT_WAL_SEGMENT_ARCHIVED);
+        }, evtType);
 
         putDummyRecords(ignite, 500);
 
         stopGrid();
 
-        assertTrue(evtRecorded.get());
+        return evtRecorded.get();
     }
 
     /**
@@ -340,7 +367,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
             long idx = archComplEvt.getAbsWalSegmentIdx();
 
             log.info("Finished archive for segment [" + idx + ", " +
-                archComplEvt.getArchiveFile() + "]: [" + e + "]");
+                archComplEvt.getArchiveFile() + "]: [" + e + ']');
 
             if (waitingForEvt.get())
                 archiveSegmentForInactivity.countDown();
@@ -415,13 +442,13 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
         IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
 
-        IteratorParametersBuilder iteratorParametersBuilder = createIteratorParametersBuilder(workDir, subfolderName);
+        IteratorParametersBuilder iterParametersBuilder = createIteratorParametersBuilder(workDir, subfolderName);
 
-        iteratorParametersBuilder.filesOrDirs(workDir);
+        iterParametersBuilder.filesOrDirs(workDir);
 
         scanIterateAndCount(
             factory,
-            iteratorParametersBuilder,
+            iterParametersBuilder,
             totalEntries,
             0,
             null,
@@ -763,13 +790,13 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
         IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
 
-        IteratorParametersBuilder iteratorParametersBuilder =
+        IteratorParametersBuilder iterParametersBuilder =
             createIteratorParametersBuilder(workDir, subfolderName)
             .filesOrDirs(workDir);
 
         scanIterateAndCount(
             factory,
-            iteratorParametersBuilder,
+            iterParametersBuilder,
             0,
             0,
             null,
@@ -1001,7 +1028,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
             createsFound != null && createsFound > 0);
 
         assertTrue("Create operations count should be at least " + cntEntries + " in log: " + operationsFound,
-            createsFound != null && createsFound >= cntEntries);
+            createsFound >= cntEntries);
     }
 
     /**
@@ -1010,7 +1037,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testTxRecordsReadWoBinaryMeta() throws Exception {
-        clearProperties = true;
+        clearProps = true;
 
         System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, "true");
 
@@ -1281,7 +1308,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
          *
          * @param iVal I value.
          */
-        public TestExternalizable(int iVal) {
+        TestExternalizable(int iVal) {
             this.iVal = iVal;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8170f2cb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 0a240ea..ecc7b03 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -82,7 +82,7 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
     }
 
     /** {@inheritDoc} */
-    @Override public void allowCompressionUntil(WALPointer ptr) {
+    @Override public void notchLastCheckpointPtr(WALPointer ptr) {
         // No-op.
     }
 
@@ -155,4 +155,9 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
     @Override public long lastArchivedSegment() {
         return -1L;
     }
+
+    /** {@inheritDoc} */
+    @Override public long lastCompactedSegment() {
+        return -1L;
+    }
 }


Mime
View raw message