ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject ignite git commit: IGNITE-9976 Fixed flaky BinaryMetadataUpdatesFlowTest.testFlowNoConflicts - Fixes #5056.
Date Fri, 09 Nov 2018 09:40:19 GMT
Repository: ignite
Updated Branches:
  refs/heads/master b74441389 -> a1786bea9


IGNITE-9976 Fixed flaky BinaryMetadataUpdatesFlowTest.testFlowNoConflicts - Fixes #5056.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1786bea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1786bea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1786bea

Branch: refs/heads/master
Commit: a1786bea93e25cea928b66ee6c96ca8f807104e2
Parents: b744413
Author: pereslegin-pa <xxtern@gmail.com>
Authored: Fri Nov 9 12:38:07 2018 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Fri Nov 9 12:39:30 2018 +0300

----------------------------------------------------------------------
 .../binary/BinaryMetadataUpdatesFlowTest.java   | 531 ++++++++-----------
 1 file changed, 223 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a1786bea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
index eab7cfb..59a1e33 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
@@ -19,32 +19,28 @@ package org.apache.ignite.internal.processors.cache.binary;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.cache.event.CacheEntryListenerException;
-import javax.cache.event.CacheEntryUpdatedListener;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.query.CacheQueryEntryEvent;
-import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -53,10 +49,10 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.junit.Assert.assertArrayEquals;
 
 /**
  *
@@ -69,72 +65,61 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest
{
     protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private volatile boolean clientMode;
-
-    /** */
-    private volatile boolean applyDiscoveryHook;
-
-    /** */
     private volatile DiscoveryHook discoveryHook;
 
     /** */
-    private static final int UPDATES_COUNT = 5_000;
-
-    /** */
-    private static final int RESTART_DELAY = 3_000;
-
-    /** */
-    private final Queue<BinaryUpdateDescription> updatesQueue = new LinkedBlockingDeque<>(UPDATES_COUNT);
-
-    /** */
-    private static volatile BlockingDeque<Integer> srvResurrectQueue = new LinkedBlockingDeque<>(1);
-
-    /** */
-    private static final CountDownLatch START_LATCH = new CountDownLatch(1);
+    private static final int UPDATES_COUNT = 1_000;
 
     /** */
-    private static final CountDownLatch FINISH_LATCH_NO_CLIENTS = new CountDownLatch(5);
+    private static final int RESTART_DELAY = 1_000;
 
     /** */
-    private static volatile AtomicBoolean stopFlag0 = new AtomicBoolean(false);
+    private static final int GRID_CNT = 5;
 
     /** */
-    private static volatile AtomicBoolean stopFlag1 = new AtomicBoolean(false);
+    private static final String BINARY_TYPE_NAME = "TestBinaryType";
 
     /** */
-    private static volatile AtomicBoolean stopFlag2 = new AtomicBoolean(false);
+    private static final int BINARY_TYPE_ID = 708045005;
 
     /** */
-    private static volatile AtomicBoolean stopFlag3 = new AtomicBoolean(false);
+    private final Queue<BinaryUpdateDescription> updatesQueue = new ConcurrentLinkedQueue<>();
 
     /** */
-    private static volatile AtomicBoolean stopFlag4 = new AtomicBoolean(false);
+    private final List<BinaryUpdateDescription> updatesList = new ArrayList<>(UPDATES_COUNT);
 
     /** */
-    private static final String BINARY_TYPE_NAME = "TestBinaryType";
+    private final CountDownLatch startLatch = new CountDownLatch(1);
 
-    /** */
-    private static final int BINARY_TYPE_ID = 708045005;
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         for (int i = 0; i < UPDATES_COUNT; i++) {
             FieldType fType = null;
+            Object fVal = null;
+
             switch (i % 4) {
                 case 0:
                     fType = FieldType.NUMBER;
+                    fVal = getNumberFieldVal();
                     break;
                 case 1:
                     fType = FieldType.STRING;
+                    fVal = getStringFieldVal();
                     break;
                 case 2:
                     fType = FieldType.ARRAY;
+                    fVal = getArrayFieldVal();
                     break;
                 case 3:
                     fType = FieldType.OBJECT;
+                    fVal = new Object();
             }
 
-            updatesQueue.add(new BinaryUpdateDescription(i, "f" + (i + 1), fType));
+            BinaryUpdateDescription desc = new BinaryUpdateDescription(i, "f" + (i + 1),
fType, fVal);
+
+            updatesQueue.add(desc);
+            updatesList.add(desc);
         }
     }
 
@@ -144,12 +129,10 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest
{
 
         cfg.setPeerClassLoadingEnabled(false);
 
-        if (applyDiscoveryHook) {
-            final DiscoveryHook hook = discoveryHook != null ? discoveryHook : new DiscoveryHook();
-
+        if (discoveryHook != null) {
             TcpDiscoverySpi discoSpi = new TcpDiscoverySpi() {
                 @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
-                    super.setListener(GridTestUtils.DiscoverySpiListenerWrapper.wrap(lsnr,
hook));
+                    super.setListener(GridTestUtils.DiscoverySpiListenerWrapper.wrap(lsnr,
discoveryHook));
                 }
             };
 
@@ -162,7 +145,7 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest
{
 
         cfg.setMarshaller(new BinaryMarshaller());
 
-        cfg.setClientMode(clientMode);
+        cfg.setClientMode("client".equals(gridName) || getTestIgniteInstanceIndex(gridName)
>= GRID_CNT);
 
         CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
 
@@ -181,182 +164,135 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest
{
     }
 
     /**
-     * Starts new ignite node and submits computation job to it.
-     * @param idx Index.
-     * @param stopFlag Stop flag.
-     * @throws Exception If failed.
+     * Starts computation job.
+     *
+     * @param idx Grid index on which computation job should start.
+     * @param restartIdx The index of the node to be restarted.
+     * @param workersCntr The current number of computation threads.
      */
-    private void startComputation(int idx, AtomicBoolean stopFlag) throws Exception {
-        clientMode = false;
+    private void startComputation(int idx, AtomicInteger restartIdx, AtomicInteger workersCntr)
{
+        Ignite ignite = grid(idx);
 
-        final IgniteEx ignite0 = startGrid(idx);
+        ClusterGroup cg = ignite.cluster().forLocal();
 
-        ClusterGroup cg = ignite0.cluster().forNodeId(ignite0.localNode().id());
-
-        ignite0.compute(cg).withAsync().call(new BinaryObjectAdder(ignite0, updatesQueue,
30, stopFlag));
+        ignite.compute(cg).callAsync(new BinaryObjectAdder(startLatch, idx, updatesQueue,
restartIdx, workersCntr));
     }
 
     /**
-     * @param idx Index.
-     * @param deafClient Deaf client.
-     * @param observedIds Observed ids.
      * @throws Exception If failed.
      */
-    private void startListening(int idx, boolean deafClient, Set<Integer> observedIds)
throws Exception {
-        clientMode = true;
+    public void testFlowNoConflicts() throws Exception {
+        startGridsMultiThreaded(GRID_CNT);
 
-        ContinuousQuery qry = new ContinuousQuery();
+        doTestFlowNoConflicts();
 
-        qry.setLocalListener(new CQListener(observedIds));
+        awaitPartitionMapExchange();
 
-        if (deafClient) {
-            applyDiscoveryHook = true;
-            discoveryHook = new DiscoveryHook() {
-                @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg)
{
-                    DiscoveryCustomMessage customMsg = msg == null ? null
-                            : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");
+        Ignite randomNode = G.allGrids().get(0);
 
-                    if (customMsg instanceof MetadataUpdateProposedMessage) {
-                        if (((MetadataUpdateProposedMessage) customMsg).typeId() == BINARY_TYPE_ID)
-                            GridTestUtils.setFieldValue(customMsg, "typeId", 1);
-                    }
-                    else if (customMsg instanceof MetadataUpdateAcceptedMessage) {
-                        if (((MetadataUpdateAcceptedMessage) customMsg).typeId() == BINARY_TYPE_ID)
-                            GridTestUtils.setFieldValue(customMsg, "typeId", 1);
-                    }
-                }
-            };
+        IgniteCache<Object, Object> cache = randomNode.cache(DEFAULT_CACHE_NAME);
 
-            IgniteEx client = startGrid(idx);
+        int cacheEntries = cache.size(CachePeekMode.PRIMARY);
 
-            client.cache(DEFAULT_CACHE_NAME).withKeepBinary().query(qry);
-        }
-        else {
-            applyDiscoveryHook = false;
+        assertTrue("Cache cannot contain more entries than were put in it;", cacheEntries
<= UPDATES_COUNT);
 
-            IgniteEx client = startGrid(idx);
+        assertEquals("There are less than expected entries, data loss occurred;", UPDATES_COUNT,
cacheEntries);
 
-            client.cache(DEFAULT_CACHE_NAME).withKeepBinary().query(qry);
-        }
+        validateCache(randomNode);
     }
 
     /**
-     *
+     * @throws Exception If failed.
      */
-    private static class CQListener implements CacheEntryUpdatedListener {
-        /** */
-        private final Set<Integer> observedIds;
-
-        /**
-         * @param observedIds
-         */
-        CQListener(Set<Integer> observedIds) {
-            this.observedIds = observedIds;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException
{
-            for (Object o : iterable) {
-                if (o instanceof CacheQueryEntryEvent) {
-                    CacheQueryEntryEvent e = (CacheQueryEntryEvent) o;
+    public void testFlowNoConflictsWithClients() throws Exception {
+        startGridsMultiThreaded(GRID_CNT);
 
-                    BinaryObjectImpl val = (BinaryObjectImpl) e.getValue();
+        if (!tcpDiscovery())
+            return;
 
-                    Integer seqNum = val.field(SEQ_NUM_FLD);
+        discoveryHook = new DiscoveryHook() {
+            @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+                DiscoveryCustomMessage customMsg = msg == null ? null
+                    : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");
 
-                    observedIds.add(seqNum);
+                if (customMsg instanceof MetadataUpdateProposedMessage) {
+                    if (((MetadataUpdateProposedMessage) customMsg).typeId() == BINARY_TYPE_ID)
+                        GridTestUtils.setFieldValue(customMsg, "typeId", 1);
+                }
+                else if (customMsg instanceof MetadataUpdateAcceptedMessage) {
+                    if (((MetadataUpdateAcceptedMessage) customMsg).typeId() == BINARY_TYPE_ID)
+                        GridTestUtils.setFieldValue(customMsg, "typeId", 1);
                 }
             }
-        }
-    }
+        };
 
-    /**
-     * @throws Exception If failed.
-     */
-    public void testFlowNoConflicts() throws Exception {
-        startComputation(0, stopFlag0);
-
-        startComputation(1, stopFlag1);
+        Ignite deafClient = startGrid(GRID_CNT);
 
-        startComputation(2, stopFlag2);
+        discoveryHook = null;
 
-        startComputation(3, stopFlag3);
+        Ignite regClient = startGrid(GRID_CNT + 1);
 
-        startComputation(4, stopFlag4);
+        doTestFlowNoConflicts();
 
-        Thread killer = new Thread(new ServerNodeKiller());
-        Thread resurrection = new Thread(new ServerNodeResurrection());
-        killer.setName("node-killer-thread");
-        killer.start();
-        resurrection.setName("node-resurrection-thread");
-        resurrection.start();
+        awaitPartitionMapExchange();
 
-        START_LATCH.countDown();
-
-        while (!updatesQueue.isEmpty())
-            Thread.sleep(1000);
+        validateCache(deafClient);
+        validateCache(regClient);
+    }
 
-        FINISH_LATCH_NO_CLIENTS.await();
 
-        IgniteEx ignite0 = grid(0);
+    /**
+     * Validates that all updates are readable on the specified node.
+     *
+     * @param ignite Ignite instance.
+     */
+    private void validateCache(Ignite ignite) {
+        String name = ignite.name();
 
-        IgniteCache<Object, Object> cache0 = ignite0.cache(DEFAULT_CACHE_NAME);
+        for (Cache.Entry entry : ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary()) {
+            BinaryObject binObj = (BinaryObject)entry.getValue();
 
-        int cacheEntries = cache0.size(CachePeekMode.PRIMARY);
+            Integer idx = binObj.field(SEQ_NUM_FLD);
 
-        assertTrue("Cache cannot contain more entries than were put in it;", cacheEntries
<= UPDATES_COUNT);
+            BinaryUpdateDescription desc = updatesList.get(idx - 1);
 
-        assertEquals("There are less than expected entries, data loss occurred;", UPDATES_COUNT,
cacheEntries);
+            Object val = binObj.field(desc.fieldName);
 
-        killer.interrupt();
-        resurrection.interrupt();
+            String errMsg = "Field " + desc.fieldName + " has unexpeted value (index=" +
idx + ", node=" + name + ")";
 
-        killer.join();
-        resurrection.join();
+            if (desc.fieldType == FieldType.OBJECT)
+                assertTrue(errMsg, val instanceof BinaryObject);
+            else if (desc.fieldType == FieldType.ARRAY)
+                assertArrayEquals(errMsg, (byte[])desc.val, (byte[])val);
+            else
+                assertEquals(errMsg, desc.val, binObj.field(desc.fieldName));
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testFlowNoConflictsWithClients() throws Exception {
-        startComputation(0, stopFlag0);
-
-        if (!tcpDiscovery())
-            return;
+    private void doTestFlowNoConflicts() throws Exception {
+        final AtomicBoolean stopFlag = new AtomicBoolean();
+        final AtomicInteger restartIdx = new AtomicInteger(-1);
+        final AtomicInteger workersCntr = new AtomicInteger(0);
 
-        startComputation(1, stopFlag1);
+        try {
+            for (int i = 0; i < GRID_CNT; i++)
+                startComputation(i, restartIdx, workersCntr);
 
-        startComputation(2, stopFlag2);
+            IgniteInternalFuture fut =
+                GridTestUtils.runAsync(new NodeRestarter(stopFlag, restartIdx, workersCntr),
"worker");
 
-        startComputation(3, stopFlag3);
+            startLatch.countDown();
 
-        startComputation(4, stopFlag4);
-
-        final Set<Integer> deafClientObservedIds = new ConcurrentHashSet<>();
-
-        startListening(5, true, deafClientObservedIds);
-
-        final Set<Integer> regClientObservedIds = new ConcurrentHashSet<>();
-
-        startListening(6, false, regClientObservedIds);
-
-        START_LATCH.countDown();
-
-        Thread killer = new Thread(new ServerNodeKiller());
-        Thread resurrection = new Thread(new ServerNodeResurrection());
-        killer.setName("node-killer-thread");
-        killer.start();
-        resurrection.setName("node-resurrection-thread");
-        resurrection.start();
-
-        while (!updatesQueue.isEmpty())
-            Thread.sleep(1000);
-
-        killer.interrupt();
-        resurrection.interrupt();
+            fut.get();
 
-        killer.join();
-        resurrection.join();
+            GridTestUtils.waitForCondition(() -> workersCntr.get() == 0, 5_000);
+        }
+        finally {
+            stopFlag.set(true);
+        }
     }
 
     /**
@@ -365,7 +301,7 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest
{
     public void testConcurrentMetadataUpdates() throws Exception {
         startGrid(0);
 
-        final Ignite client = startGrid(getConfiguration("client").setClientMode(true));
+        final Ignite client = startGrid(getConfiguration("client"));
 
         final IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME).withKeepBinary();
 
@@ -402,101 +338,6 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest
{
     }
 
     /**
-     * Runnable responsible for stopping (gracefully) server nodes during metadata updates
process.
-     */
-    private final class ServerNodeKiller implements Runnable {
-        /** {@inheritDoc} */
-        @Override public void run() {
-            Thread curr = Thread.currentThread();
-            try {
-                START_LATCH.await();
-
-                while (!curr.isInterrupted()) {
-                    int idx = ThreadLocalRandom.current().nextInt(5);
-
-                    AtomicBoolean stopFlag;
-
-                    switch (idx) {
-                        case 0:
-                            stopFlag = stopFlag0;
-                            break;
-                        case 1:
-                            stopFlag = stopFlag1;
-                            break;
-                        case 2:
-                            stopFlag = stopFlag2;
-                            break;
-                        case 3:
-                            stopFlag = stopFlag3;
-                            break;
-                        default:
-                            stopFlag = stopFlag4;
-                    }
-
-                    stopFlag.set(true);
-
-                    while (stopFlag.get())
-                        Thread.sleep(10);
-
-                    stopGrid(idx);
-
-                    srvResurrectQueue.put(idx);
-
-                    Thread.sleep(RESTART_DELAY);
-                }
-            }
-            catch (Exception ignored) {
-                // No-op.
-            }
-        }
-    }
-
-    /**
-     * {@link Runnable} object to restart nodes killed by {@link ServerNodeKiller}.
-     */
-    private final class ServerNodeResurrection implements Runnable {
-        /** {@inheritDoc} */
-        @Override public void run() {
-            Thread curr = Thread.currentThread();
-
-            try {
-                START_LATCH.await();
-
-                while (!curr.isInterrupted()) {
-                    Integer idx = srvResurrectQueue.takeFirst();
-
-                    AtomicBoolean stopFlag;
-
-                    switch (idx) {
-                        case 0:
-                            stopFlag = stopFlag0;
-                            break;
-                        case 1:
-                            stopFlag = stopFlag1;
-                            break;
-                        case 2:
-                            stopFlag = stopFlag2;
-                            break;
-                        case 3:
-                            stopFlag = stopFlag3;
-                            break;
-                        default:
-                            stopFlag = stopFlag4;
-                    }
-
-                    clientMode = false;
-                    applyDiscoveryHook = false;
-
-                    startComputation(idx, stopFlag);
-                }
-            }
-            catch (Exception ignored) {
-                // No-op.
-            }
-        }
-    }
-
-    /**
      * Instruction for node to perform <b>add new binary object</b> action on
cache in <b>keepBinary</b> mode.
      *
      * Instruction includes id the object should be added under, new field to add to binary
schema
@@ -512,15 +353,20 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest
{
         /** */
         private FieldType fieldType;
 
+        /** */
+        private Object val;
+
         /**
          * @param itemId Item id.
          * @param fieldName Field name.
          * @param fieldType Field type.
+         * @param val Field value.
          */
-        private BinaryUpdateDescription(int itemId, String fieldName, FieldType fieldType)
{
+        private BinaryUpdateDescription(int itemId, String fieldName, FieldType fieldType,
Object val) {
             this.itemId = itemId;
             this.fieldName = fieldName;
             this.fieldType = fieldType;
+            this.val = val;
         }
     }
 
@@ -571,20 +417,7 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest
{
      */
     private static BinaryObject newBinaryObject(BinaryObjectBuilder builder, BinaryUpdateDescription
desc) {
         builder.setField(SEQ_NUM_FLD, desc.itemId + 1);
-
-        switch (desc.fieldType) {
-            case NUMBER:
-                builder.setField(desc.fieldName, getNumberFieldVal());
-                break;
-            case STRING:
-                builder.setField(desc.fieldName, getStringFieldVal());
-                break;
-            case ARRAY:
-                builder.setField(desc.fieldName, getArrayFieldVal());
-                break;
-            case OBJECT:
-                builder.setField(desc.fieldName, new Object());
-        }
+        builder.setField(desc.fieldName, desc.val);
 
         return builder.build();
     }
@@ -595,60 +428,142 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest
{
      */
     private static final class BinaryObjectAdder implements IgniteCallable<Object>
{
         /** */
-        private final IgniteEx ignite;
+        private final CountDownLatch startLatch;
+
+        /** */
+        private final int idx;
 
         /** */
         private final Queue<BinaryUpdateDescription> updatesQueue;
 
         /** */
-        private final long timeout;
+        private final AtomicInteger restartIdx;
 
         /** */
-        private final AtomicBoolean stopFlag;
+        private final AtomicInteger workersCntr;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
 
         /**
-         * @param ignite Ignite.
+         * @param startLatch Startup latch.
+         * @param idx Ignite instance index.
          * @param updatesQueue Updates queue.
-         * @param timeout Timeout.
-         * @param stopFlag Stop flag.
+         * @param restartIdx The index of the node to be restarted.
+         * @param workersCntr The number of active computation threads.
          */
-        BinaryObjectAdder(IgniteEx ignite, Queue<BinaryUpdateDescription> updatesQueue,
long timeout, AtomicBoolean stopFlag) {
-            this.ignite = ignite;
+        BinaryObjectAdder(
+            CountDownLatch startLatch,
+            int idx,
+            Queue<BinaryUpdateDescription> updatesQueue,
+            AtomicInteger restartIdx,
+            AtomicInteger workersCntr
+        ) {
+            this.startLatch = startLatch;
+            this.idx = idx;
             this.updatesQueue = updatesQueue;
-            this.timeout = timeout;
-            this.stopFlag = stopFlag;
+            this.restartIdx = restartIdx;
+            this.workersCntr = workersCntr;
         }
 
         /** {@inheritDoc} */
         @Override public Object call() throws Exception {
-            START_LATCH.await();
+            startLatch.await();
 
             IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary();
 
-            while (!updatesQueue.isEmpty()) {
-                BinaryUpdateDescription desc = updatesQueue.poll();
+            workersCntr.incrementAndGet();
 
-                if (desc == null)
-                    break;
+            try {
+                while (!updatesQueue.isEmpty()) {
+                    BinaryUpdateDescription desc = updatesQueue.poll();
 
-                BinaryObjectBuilder builder = ignite.binary().builder(BINARY_TYPE_NAME);
+                    if (desc == null)
+                        break;
 
-                BinaryObject bo = newBinaryObject(builder, desc);
+                    BinaryObjectBuilder builder = ignite.binary().builder(BINARY_TYPE_NAME);
 
-                cache.put(desc.itemId, bo);
+                    BinaryObject bo = newBinaryObject(builder, desc);
 
-                if (stopFlag.get())
-                    break;
-                else
-                    Thread.sleep(timeout);
-            }
+                    cache.put(desc.itemId, bo);
 
-            if (updatesQueue.isEmpty())
-                FINISH_LATCH_NO_CLIENTS.countDown();
+                    if (restartIdx.get() == idx)
+                        break;
+                }
+            }
+            finally {
+                workersCntr.decrementAndGet();
 
-            stopFlag.set(false);
+                if (restartIdx.get() == idx)
+                    restartIdx.set(-1);
+            }
 
             return null;
         }
     }
+
+    /**
+     * Restarts random server node and computation job.
+     */
+    private final class NodeRestarter implements Runnable {
+        /** Stop thread flag. */
+        private final AtomicBoolean stopFlag;
+
+        /** The index of the node to be restarted. */
+        private final AtomicInteger restartIdx;
+
+        /** The current number of computation threads. */
+        private final AtomicInteger workersCntr;
+
+        /**
+         * @param stopFlag Stop thread flag.
+         * @param restartIdx The index of the node to be restarted.
+         * @param workersCntr The current number of computation threads.
+         */
+        NodeRestarter(AtomicBoolean stopFlag, AtomicInteger restartIdx, AtomicInteger workersCntr)
{
+            this.stopFlag = stopFlag;
+            this.restartIdx = restartIdx;
+            this.workersCntr = workersCntr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                startLatch.await();
+
+                while (!shouldStop()) {
+                    int idx = ThreadLocalRandom.current().nextInt(5);
+
+                    restartIdx.set(idx);
+
+                    while (restartIdx.get() != -1) {
+                        if (shouldStop())
+                            return;
+
+                        Thread.sleep(10);
+                    }
+
+                    stopGrid(idx);
+
+                    if (shouldStop())
+                        return;
+
+                    startGrid(idx);
+
+                    startComputation(idx, restartIdx, workersCntr);
+
+                    Thread.sleep(RESTART_DELAY);
+                }
+            }
+            catch (Exception ignore) {
+                // No-op.
+            }
+        }
+
+        /** */
+        private boolean shouldStop() {
+            return updatesQueue.isEmpty() || stopFlag.get() || Thread.currentThread().isInterrupted();
+        }
+    }
 }


Mime
View raw message