ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amashen...@apache.org
Subject [ignite] 17/41: GG-17388 Fixed flaky test IgnitePdsStartWIthEmptyArchive
Date Tue, 07 May 2019 15:21:25 GMT
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch gg-18540
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit d074c3fc9d1b737b8115d47a3c0e958b3312db52
Author: Slava Koptilin <slava.koptilin@gmail.com>
AuthorDate: Fri Apr 26 13:06:14 2019 +0300

    GG-17388 Fixed flaky test IgnitePdsStartWIthEmptyArchive
---
 .../db/IgnitePdsStartWIthEmptyArchive.java         | 95 ++++++++++++++--------
 1 file changed, 60 insertions(+), 35 deletions(-)

diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java
index b9a883e..f96233d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java
@@ -18,20 +18,26 @@ package org.apache.ignite.internal.processors.cache.persistence.db;
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.events.WalSegmentArchivedEvent;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
 import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle;
-import org.apache.ignite.internal.util.future.CountDownFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,6 +49,9 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWr
  *
  */
 public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
+    /** Mapping of WAL segment idx to WalSegmentArchivedEvent. */
+    private final Map<Long, WalSegmentArchivedEvent> evts = new ConcurrentHashMap<>();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -63,9 +72,27 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
                 )
         );
 
+        Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
+
+        lsnrs.put((e) -> {
+            WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;
+
+            log.info("EVT_WAL_SEGMENT_ARCHIVED: " + archComplEvt.getAbsWalSegmentIdx());
+
+            evts.put(archComplEvt.getAbsWalSegmentIdx(), archComplEvt);
+
+            return true;
+        }, new int[] {EVT_WAL_SEGMENT_ARCHIVED});
+
+        cfg.setLocalEventListeners(lsnrs);
+
         return cfg;
     }
 
+    /**
+     * Executes initial steps before test execution.
+     * @throws Exception If failed.
+     */
     @Before
     public void before() throws Exception {
         stopAllGrids();
@@ -73,6 +100,19 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
         cleanPersistenceDir();
     }
 
+    /**
+     * Stops all nodes and cleans work dir after a test.
+     */
+    @After
+    public void cleanup() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     @Test
     public void test() throws Exception {
         IgniteEx ig = startGrid(0);
@@ -85,9 +125,8 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
         try (IgniteDataStreamer<Integer, byte[]> st = ig.dataStreamer(DEFAULT_CACHE_NAME)) {
             int entries = 1000;
 
-            for (int i = 0; i < entries; i++) {
+            for (int i = 0; i < entries; i++)
                 st.addData(i, new byte[1024 * 1024]);
-            }
         }
 
         File archiveDir = U.field(walMgr, "walArchiveDir");
@@ -96,7 +135,7 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
 
         SegmentAware beforeSaw = U.field(walMgr, "segmentAware");
 
-        long beforeLastArchivedAbsoluteIndex = beforeSaw.lastArchivedAbsoluteIndex();
+        long beforeLastArchivedAbsoluteIdx = beforeSaw.lastArchivedAbsoluteIndex();
 
         FileWriteHandle fhBefore = U.field(walMgr, "currHnd");
 
@@ -114,6 +153,8 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
 
         Assert.assertEquals(0, archiveDir.listFiles().length);
 
+        evts.clear();
+
         // Restart grid again after archive was removed.
         ig = startGrid(0);
 
@@ -126,10 +167,10 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
         int segments = ig.configuration().getDataStorageConfiguration().getWalSegments();
 
         Assert.assertTrue(
-            "lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIndex +
+            "lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIdx +
                 ", lastArchivedAfterIdx=" + afterLastArchivedAbsoluteIndex + ",  segments=" + segments,
             afterLastArchivedAbsoluteIndex >=
-            (beforeLastArchivedAbsoluteIndex - segments));
+            (beforeLastArchivedAbsoluteIdx - segments));
 
         ig.cluster().active(true);
 
@@ -140,38 +181,22 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
         long idxAfter = fhAfter.getSegmentId();
 
         Assert.assertEquals(idxBefore, idxAfter);
-        Assert.assertTrue(idxAfter >= beforeLastArchivedAbsoluteIndex);
+        Assert.assertTrue(idxAfter >= beforeLastArchivedAbsoluteIdx);
 
-        // Future for await all current available semgment will be archived.
-        CountDownFuture awaitAchviedSegmentsLatch = new CountDownFuture(
-            // One is a last archived, secod is a current write segment.
-            (int)(idxAfter - afterLastArchivedAbsoluteIndex - 2)
-        );
-
-        log.info("currentIdx=" + idxAfter + ", lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIndex +
+        log.info("currentIdx=" + idxAfter + ", lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIdx +
             ", lastArchivedAfteridx=" + afterLastArchivedAbsoluteIndex + ",  segments=" + segments);
 
-        ig.events().localListen(e -> {
-            WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;
-
-            log.info("EVT_WAL_SEGMENT_ARCHIVED:" + archComplEvt.getAbsWalSegmentIdx());
-
-            if (archComplEvt.getAbsWalSegmentIdx() > afterLastArchivedAbsoluteIndex){
-                awaitAchviedSegmentsLatch.onDone();
-
-                return true;
-            }
+        // One is the last archived segment, the second is the current write segment.
+        final long awaitAchviedSegments = idxAfter - afterLastArchivedAbsoluteIndex - 2;
 
-            if (archComplEvt.getAbsWalSegmentIdx() < afterLastArchivedAbsoluteIndex){
-                awaitAchviedSegmentsLatch.onDone(new IgniteException("Unexected segment for archivation. idx="
-                    + archComplEvt.getAbsWalSegmentIdx()));
-
-                return false;
-            }
-
-            return true;
-        }, EVT_WAL_SEGMENT_ARCHIVED);
+        // Await until all currently available segments are archived.
+        assertTrue(GridTestUtils.waitForCondition(
+            new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    long cut = evts.keySet().stream().filter(e -> e > afterLastArchivedAbsoluteIndex).count();
 
-        awaitAchviedSegmentsLatch.get();
+                    return cut >= awaitAchviedSegments;
+                }
+            }, 10_000));
     }
 }


Mime
View raw message