ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [1/3] ignite git commit: IGNITE-3897 Test transactional behaviour of caches during rolling restart
Date Fri, 30 Sep 2016 17:01:42 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3897-pr1059 [created] c75f6fc14


IGNITE-3897 Test transactional behaviour of caches during rolling restart


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a853ee32
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a853ee32
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a853ee32

Branch: refs/heads/ignite-3897-pr1059
Commit: a853ee325dd93e5c2adfe32ae1e89fa687aee9b2
Parents: 3db0a3d
Author: John Levey <john.levey@workday.com>
Authored: Wed Aug 3 11:32:22 2016 +0100
Committer: John Levey <john.levey@workday.com>
Committed: Fri Sep 16 10:42:48 2016 +0200

----------------------------------------------------------------------
 .../rebalancing/CacheNodeSafeAssertion.java     | 118 -------
 .../GridCacheRebalancingAggregationTest.java    | 263 ++++++++++++++
 ...cheRebalancingPartitionDistributionTest.java |   1 +
 .../GridCacheRebalancingTransactionTest.java    | 339 +++++++++++++++++++
 .../testframework/assertions/Assertion.java     |   2 +-
 .../assertions/CacheNodeSafeAssertion.java      | 117 +++++++
 .../assertions/EventualAssertion.java           |  99 ++++++
 .../IgniteCacheFailoverTestSuite.java           |   4 +
 8 files changed, 824 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a853ee32/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheNodeSafeAssertion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheNodeSafeAssertion.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheNodeSafeAssertion.java
deleted file mode 100644
index bf6b63f..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/CacheNodeSafeAssertion.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
-
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.testframework.assertions.Assertion;
-
-/**
- * {@link Assertion} that checks that the primary and backup partitions are distributed such that we won't lose any data
- * if we lose a single node. This implies that the cache in question was configured with a backup count of at least one
- * and that all partitions are backed up to a different node from the primary.
- */
-public class CacheNodeSafeAssertion implements Assertion {
-    /** The {@link Ignite} instance. */
-    private final Ignite ignite;
-
-    /** The cache name. */
-    private final String cacheName;
-
-    /**
-     * Construct a new {@link CacheNodeSafeAssertion} for the given {@code cacheName}.
-     *
-     * @param ignite The Ignite instance.
-     * @param cacheName The cache name.
-     */
-    public CacheNodeSafeAssertion(Ignite ignite, String cacheName) {
-        this.ignite = ignite;
-        this.cacheName = cacheName;
-    }
-
-    /**
-     * @return Ignite instance.
-     */
-    protected Ignite ignite() {
-        return ignite;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void test() throws AssertionError {
-        Affinity<?> affinity = ignite.affinity(cacheName);
-
-        int partCnt = affinity.partitions();
-
-        boolean hostSafe = true;
-
-        boolean nodeSafe = true;
-
-        for (int x = 0; x < partCnt; ++x) {
-            // Results are returned with the primary node first and backups after. We want to ensure that there is at
-            // least one backup on a different host.
-            Collection<ClusterNode> results = affinity.mapPartitionToPrimaryAndBackups(x);
-
-            Iterator<ClusterNode> nodes = results.iterator();
-
-            boolean newHostSafe = false;
-
-            boolean newNodeSafe = false;
-
-            if (nodes.hasNext()) {
-                ClusterNode primary = nodes.next();
-
-                // For host safety, get all nodes on the same host as the primary node and ensure at least one of the
-                // backups is on a different host. For node safety, make sure at least of of the backups is not the
-                // primary.
-                Collection<ClusterNode> neighbors = hostSafe ? ignite.cluster().forHost(primary).nodes() : null;
-
-                while (nodes.hasNext()) {
-                    ClusterNode backup = nodes.next();
-
-                    if (hostSafe) {
-                        if (!neighbors.contains(backup))
-                            newHostSafe = true;
-                    }
-
-                    if (nodeSafe) {
-                        if (!backup.equals(primary))
-                            newNodeSafe = true;
-                    }
-                }
-            }
-
-            hostSafe = newHostSafe;
-
-            nodeSafe = newNodeSafe;
-
-            if (!hostSafe && !nodeSafe)
-                break;
-        }
-
-        if (hostSafe)
-            return;
-
-        if (nodeSafe)
-            return;
-
-        throw new AssertionError("Cache " + cacheName + " is endangered!");
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a853ee32/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAggregationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAggregationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAggregationTest.java
new file mode 100644
index 0000000..1d566e9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAggregationTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import javax.cache.Cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.assertions.Assertion;
+import org.apache.ignite.testframework.assertions.CacheNodeSafeAssertion;
+import org.apache.ignite.testframework.junits.common.GridRollingRestartAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+/**
+ * Test running a continuous aggregate sum against a running cluster that is undergoing a rolling restart.
+ */
+public class GridCacheRebalancingAggregationTest extends GridRollingRestartAbstractTest {
+    /** The number of entries to put to the test cache. */
+    private static final int ENTRY_COUNT = 100;
+
+    /** The maximum number of times to restart nodes in the cluster. */
+    private static final int MAX_RESTARTS = 5;
+
+    /** The key of the cache entry that contains the current upper range of the aggregation. */
+    private static final int CEILING_KEY = Integer.MAX_VALUE;
+
+    /** Test cache name. */
+    private static final String CACHE_NAME = "AGGREGATION_TEST";
+
+    /** Keys of entries under test. */
+    private static final Set<Integer> KEYS;
+    static {
+        final Set<Integer> keys = new LinkedHashSet<>();
+        for (int i = 0; i < ENTRY_COUNT; i++) {
+            keys.add(i);
+        }
+        KEYS = Collections.unmodifiableSet(keys);
+    }
+
+    /** Thread responsible for writing to the cache.*/
+    private UpdateCacheThread updateThread;
+
+    /** The number of aggregations since the last node restart.*/
+    private volatile int aggregationCount;
+
+    /** Total number of times we updated the cache.*/
+    private volatile int totalUpdateCount;
+
+    /** {@inheritDoc} */
+    @Override
+    public int serverCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int getMaxRestarts() {
+        return MAX_RESTARTS;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected long getTestTimeout() {
+        return 1000 * 60 * 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration<?, ?> getCacheConfiguration() {
+        return new CacheConfiguration<Integer, Integer>(CACHE_NAME)
+                        .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                        .setCacheMode(CacheMode.PARTITIONED)
+                        .setBackups(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgnitePredicate<Ignite> getRestartCheck() {
+        return new IgnitePredicate<Ignite>() {
+            @Override public boolean apply(final Ignite ignite) {
+                return serverCount() >= ignite.cluster().forServers().nodes().size()
+                        && GridCacheRebalancingAggregationTest.this.aggregationCount > 0;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public Assertion getRestartAssertion() {
+        return new CacheNodeSafeAssertion(grid(0), CACHE_NAME);
+    }
+
+    /**
+     * This test has three threads; the writer thread updates a monotonically increasing set of values for the keys,
+     * while the reader thread is performing a checksum on the contents of the cache while the restart thread is
+     * killing nodes. This test will fail if we lose any data.
+     */
+    public void testRollingRestartAggregation()
+        throws InterruptedException {
+
+        startUpdateThread();
+        RollingRestartThread restartThread = super.rollingRestartThread;
+
+        int totalAggregationCount = 0;
+        int checksumFailures = 0;
+        do {
+            if (!checksumCache()) {
+                checksumFailures++;
+            }
+
+            totalAggregationCount++;
+            this.aggregationCount++;
+        } while (restartThread.isAlive());
+
+        grid(0).log().info("Total successful aggregations during the test were: " + totalAggregationCount);
+        grid(0).log().info("Total successful updates during the test were: " + this.totalUpdateCount);
+        grid(0).log().info("Total node restarts during the test were: " + restartThread.getRestartTotal());
+
+        this.updateThread.join();
+
+        assertEquals(0, checksumFailures);
+        assertEquals(getMaxRestarts(), restartThread.getRestartTotal());
+    }
+
+    /**
+     * Create and start the {@link UpdateCacheThread}.
+     */
+    private void startUpdateThread() {
+        UpdateCacheThread updateThread = new UpdateCacheThread();
+        updateThread.start();
+        this.updateThread = updateThread;
+    }
+
+    /**
+     * Check that the values in the cache are equal to the sum of values between the floor and ceiling (key in the cache
+     * signifying the highest value that has been written).
+     *
+     * @return true if the cache values sum to the expected value
+     */
+    private boolean checksumCache() {
+        final Ignite ignite = grid(0);
+        Cache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
+        Throwable e = null;
+        for (int i = 0, c = 20; i <= c; ++i) {
+            try (Transaction tx = ignite.transactions().txStart()) {
+                tx.timeout(2*60*1000l);
+
+                Integer ceiling = cache.get(CEILING_KEY);
+                if (ceiling == null) {
+                    return Boolean.TRUE;
+                }
+                int floor = ceiling - ENTRY_COUNT + 1;
+                Map<Integer, Integer> results = cache.getAll(KEYS);
+                tx.commit();
+
+                int sum = 0;
+                for (Integer j : results.values()) {
+                    sum += j;
+                }
+
+                if (sum == calculateSum(floor, ceiling)) {
+                    return Boolean.TRUE;
+                }
+
+                grid(0).log().info("Failed testing that sum of " + sum + " equals sum(" + floor + "," + ceiling + ")");
+                return Boolean.FALSE;
+            }
+            catch (Throwable ee) {
+                e = ee;
+            }
+        }
+        throw new RuntimeException("Failed to commit transaction after " + 20 + " retries", e);
+    }
+
+    /**
+     * Update the values in the cache.
+     */
+    private void updateCache() {
+        final Ignite ignite = grid(0);
+        Cache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
+        Throwable e = null;
+        for (int i = 0, c = 20; i <= c; ++i) {
+            try (Transaction tx = ignite.transactions().txStart()) {
+                tx.timeout(2*60*1000l);
+
+                Integer currentCeiling = cache.get(CEILING_KEY);
+                int newCeiling = currentCeiling == null ? ENTRY_COUNT - 1 : currentCeiling + ENTRY_COUNT;
+                int value = currentCeiling == null ? 0 : currentCeiling + 1;
+                Map<Integer, Integer> entries = new TreeMap<>();
+                for (Integer key : KEYS) {
+                    entries.put(key, value++);
+                }
+                cache.putAll(entries);
+                cache.put(CEILING_KEY, newCeiling);
+
+                GridCacheRebalancingAggregationTest.this.totalUpdateCount++;
+                 if (!tx.isRollbackOnly()) {
+                     tx.commit();
+                 }
+                return;
+            }
+            catch (Throwable ee) {
+                e = ee;
+            }
+        }
+        throw new RuntimeException("Failed to commit transaction after " + 20 + " retries", e);
+    }
+
+    /**
+     * Calculate the sums of a range of values from floor to ceiling. This expects 0 gaps in the range.
+     *
+     * @param f The floor of the range.
+     * @param c The ceiling of the range.
+     *
+     * @return The sum.
+     */
+    private int calculateSum(long f, long c) {
+        return (int) ((c * (c + 1)) / 2 - ((f - 1) * ((f - 1) + 1)) / 2);
+    }
+
+    /**
+     * Update cache thread.
+     */
+    private class UpdateCacheThread extends Thread {
+        /**
+         * Default Constructor sets this thread to run as a daemon.
+         */
+        public UpdateCacheThread() {
+            setDaemon(true);
+            setName(UpdateCacheThread.class.getSimpleName());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            RollingRestartThread restartThread =  GridCacheRebalancingAggregationTest.this.rollingRestartThread;
+            do {
+                GridCacheRebalancingAggregationTest.this.updateCache();
+            }
+            while (restartThread.isAlive());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a853ee32/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
index 61ee9ea..cc1f83e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingPartitionDistributionTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.assertions.Assertion;
+import org.apache.ignite.testframework.assertions.CacheNodeSafeAssertion;
 import org.apache.ignite.testframework.junits.common.GridRollingRestartAbstractTest;
 
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a853ee32/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingTransactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingTransactionTest.java
new file mode 100644
index 0000000..b74fb5c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingTransactionTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.cache.Cache;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.assertions.Assertion;
+import org.apache.ignite.testframework.assertions.CacheNodeSafeAssertion;
+import org.apache.ignite.testframework.assertions.EventualAssertion;
+import org.apache.ignite.testframework.junits.common.GridRollingRestartAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+/**
+ * Test running transactional gets and puts against a running cluster that is undergoing a rolling restart.
+ */
+public class GridCacheRebalancingTransactionTest extends GridRollingRestartAbstractTest {
+    /** The number of entries to put to the test cache. */
+    private static final int ENTRY_COUNT = 100;
+
+    /** Test cache name. */
+    private static final String CACHE_NAME = "TRANSACTION_TEST_NEAR";
+
+    /** Near Cache Configuration. */
+    private static final NearCacheConfiguration<Integer, Integer> NEAR_CACHE_CONFIGURATION = new NearCacheConfiguration<Integer, Integer>()
+            .setNearEvictionPolicy(new LruEvictionPolicy<Integer, Integer>(Integer.MAX_VALUE));
+
+    /** The {@link CacheConfiguration} used by this test. */
+    @Override
+    protected CacheConfiguration<?, ?> getCacheConfiguration() {
+        return new CacheConfiguration<Integer, Integer>(CACHE_NAME)
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(true /* machine-safe */, 271))
+                .setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.CLOCK)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setRebalanceMode(CacheRebalanceMode.SYNC)
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setEvictionPolicy(NEAR_CACHE_CONFIGURATION.getNearEvictionPolicy())
+                .setCopyOnRead(false)
+                .setNearConfiguration(NEAR_CACHE_CONFIGURATION);
+    }
+
+    /** Keys of entries being read and updated. */
+    private static final Set<Integer> KEYS;
+    static {
+        final Set<Integer> keys = new LinkedHashSet<>();
+        for (int i = 0; i < ENTRY_COUNT; i++) {
+            keys.add(i);
+        }
+        KEYS = Collections.unmodifiableSet(keys);
+    }
+
+    /** The number of transactions since the last node restart. */
+    private volatile int transactionCount;
+
+    /** {@inheritDoc} */
+    @Override public int serverCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int getMaxRestarts() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgnitePredicate<Ignite> getRestartCheck() {
+        return new IgnitePredicate<Ignite>() {
+            @Override
+            public boolean apply(Ignite ignite) {
+                return serverCount() <= grid(0).cluster().forServers().nodes().size()
+                        && GridCacheRebalancingTransactionTest.this.transactionCount > 0;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public Assertion getRestartAssertion() {
+        return new EventualAssertion(5000, new CacheNodeSafeAssertion(grid(0), CACHE_NAME));
+    }
+
+    /**
+     * This test performs a transactional update and checksum of the cache, as well as validates that the cache is
+     * not changed when a transaction is rolled back.
+     */
+    public void testRollingRestartTransaction()
+            throws InterruptedException {
+
+        Cache<Integer, Integer> cache = (Cache<Integer, Integer>) grid(0).getOrCreateCache(getCacheConfiguration());
+
+        boolean invoke = true;
+        int currentValue = 0;
+        int checksumFailures = 0;
+        int transactionCount = 0;
+        Map<Integer, Integer> lastChecksum = null;
+        do {
+            updateCache(cache, currentValue, invoke, false);
+            Map<Integer, Integer> checksum = checksumCache(cache, currentValue, lastChecksum);
+            if (checksum == null) {
+                grid(0).log().info(
+                        "Checksum failure after commit (expected " + currentValue + ", invoke " + invoke + ").");
+                checksumFailures++;
+            }
+            else {
+                lastChecksum = checksum;
+                updateCache(cache, currentValue + 1, invoke, true);
+                checksum = checksumCache(cache, currentValue, lastChecksum);
+                if (checksum == null) {
+                    checksumFailures++;
+                    grid(0).log().info(
+                            "Checksum failure after rollback (expected " + currentValue + ", invoke " + invoke + ").");
+                }
+                else {
+                    lastChecksum = checksum;
+                }
+            }
+
+            invoke = !invoke;
+            currentValue++;
+            transactionCount++;
+            this.transactionCount = transactionCount;
+        } while (this.rollingRestartThread.isAlive());
+
+        grid(0).log().info("Total checksum failures during the test were: " + checksumFailures);
+        grid(0).log().info("Total transactions during the test were: " + transactionCount);
+        grid(0).log().info("Total node restarts during the test were: " + this.rollingRestartThread.getRestartTotal());
+
+        assertEquals(getMaxRestarts(), this.rollingRestartThread.getRestartTotal());
+        assertEquals(0, checksumFailures);
+    }
+
+    /**
+     * Updates the cache and either commit or rollback the update.
+     *
+     * @param cache the cache
+     * @param newValue the new value to update the entries with
+     * @param invoke whether to use invokeAll() or putAll()
+     * @param rollback whether to rollback the changes or commit
+     */
+    private void updateCache(final Cache<Integer, Integer> cache, final int newValue, final boolean invoke, final boolean rollback) {
+        final TransactionConfiguration txConfig = new TransactionConfiguration();
+        transact(new TransactionalTask<Object>() {
+            @Override
+            public Object run(Transaction tx) {
+                if (invoke) {
+                    for (EntryProcessorResult<Boolean> result : cache.invokeAll(KEYS, new IntegerSetValue(newValue)).values()) {
+                        if (!Boolean.TRUE.equals(result.get())) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                }
+                else {
+                    Map<Integer, Integer> entries = new HashMap<>(ENTRY_COUNT);
+                    for (Integer key : KEYS) {
+                        entries.put(key, newValue);
+                    }
+                    cache.putAll(entries);
+                }
+
+                if (rollback) {
+                    tx.rollback();
+                }
+                else {
+                    tx.commit();
+                }
+                return null;
+            }
+        }, txConfig);
+    }
+
+    /**
+     * Validate that all entries in the cache have the same value.
+     *
+     * @param cache The cache.
+     * @param currentValue The value to validate.
+     * @param lastChecksum The last validated entries returned by this method.
+     *
+     * @return A map of all validated entries or null if there was a checksum error.
+     */
+    private Map<Integer, Integer> checksumCache(final Cache<Integer, Integer> cache,
+                                                final int currentValue,
+                                                final Map<Integer, Integer> lastChecksum) {
+        TransactionConfiguration txConfig = new TransactionConfiguration();
+        return transact(new TransactionalTask<Map<Integer, Integer>>() {
+            @Override
+            public Map<Integer, Integer> run(Transaction tx) {
+                Map<Integer, Integer> map = cache.getAll(KEYS);
+                for (int value : map.values()) {
+                    if (value != currentValue) {
+                        grid(0).log().info(
+                                "Failed testing that all entries have a value of: " + currentValue + "\n\nCurrent: "
+                                        + map + "\n\nLast: " + lastChecksum);
+                        return null;
+                    }
+                }
+                return map;
+            }
+        }, txConfig);
+    }
+
+    /**
+     * Execute the {@link TransactionalTask} and return the result obtained.
+     *
+     * @param task The task to execute.
+     * @param config The transaction configuration.
+     * @param <T> The type of result.
+     *
+     * @return The result of executing the transaction.
+     * @throws RuntimeException If there are errors.
+     */
+    private <T> T transact(TransactionalTask<T> task, TransactionConfiguration config) {
+        if (task == null) {
+            throw new IllegalArgumentException("Missing TransactionalTask");
+        }
+        if (config == null) {
+            throw new IllegalArgumentException("Missing TransactionConfiguration");
+        }
+        Throwable e = null;
+        for (int i = 0, c = 20; i <= c; ++i) {
+            try (Transaction tx = startTransaction(config)) {
+                T t = task.run(tx);
+                if (!tx.isRollbackOnly()) {
+                    tx.commit();
+                }
+                return t;
+            }
+            catch (final Throwable ee) {
+                e = ee;
+            }
+        }
+        throw new RuntimeException("Failed to commit transaction", e);
+    }
+
+    /**
+     * Helper method to start / continue an existing {@link Transaction}.
+     *
+     * @param config The transaction configuration to use to determine the timeout.
+     *
+     * @return The existing Transaction or a newly started Transaction.
+     * @throws IllegalStateException If there are exceptions.
+     */
+    private Transaction startTransaction(TransactionConfiguration config) {
+        Ignite ignite = grid(0);
+        Transaction txThread = ignite.transactions().tx();
+
+        // We do not support nested transactions. If one has already been started, return the existing transaction
+        if (txThread == null) {
+            try {
+                Transaction tx = ignite.transactions().txStart();
+                tx.timeout(config.getDefaultTxTimeout());
+                return tx;
+            }
+            catch (RuntimeException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+        else {
+            return txThread;
+        }
+    }
+
+    /**
+     * Interface to represent the execution of a transaction that returns a result.
+     *
+     * @param <T> The results of executing the transaction.
+     */
+    interface TransactionalTask<T> {
+        /**
+         * Run the task.
+         *
+         * @param tx The current transaction.
+         * @return The result from running the task.
+         */
+        T run(Transaction tx);
+    }
+
+    /**
+     * {@link EntryProcessor} used to update an entry with a specified integer value.
+     */
+    private static class IntegerSetValue
+            implements EntryProcessor<Integer, Integer, Boolean>, Serializable {
+        /** The value. */
+        private final int newValue;
+
+        /**
+         * Create a new {@link IntegerSetValue} processor that will update a targeted entry with the specified value.
+         *
+         * @param newValue The new value.
+         */
+        public IntegerSetValue(int newValue) {
+            this.newValue = newValue;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Boolean process(MutableEntry<Integer, Integer> entry, Object... args) {
+            entry.setValue(this.newValue);
+            return Boolean.TRUE;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a853ee32/modules/core/src/test/java/org/apache/ignite/testframework/assertions/Assertion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/assertions/Assertion.java b/modules/core/src/test/java/org/apache/ignite/testframework/assertions/Assertion.java
index 4799d88..62e04dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/assertions/Assertion.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/assertions/Assertion.java
@@ -27,5 +27,5 @@ public interface Assertion {
      *
      * @throws AssertionError if the condition was not satisfied.
      */
-    public void test() throws AssertionError;
+    void test() throws AssertionError;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a853ee32/modules/core/src/test/java/org/apache/ignite/testframework/assertions/CacheNodeSafeAssertion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/assertions/CacheNodeSafeAssertion.java b/modules/core/src/test/java/org/apache/ignite/testframework/assertions/CacheNodeSafeAssertion.java
new file mode 100644
index 0000000..5d74c11
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/assertions/CacheNodeSafeAssertion.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testframework.assertions;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * {@link Assertion} that checks that the primary and backup partitions are distributed such that we won't lose any data
+ * if we lose a single node. This implies that the cache in question was configured with a backup count of at least one
+ * and that all partitions are backed up to a different node from the primary.
+ */
+public class CacheNodeSafeAssertion implements Assertion {
+    /** The {@link Ignite} instance. */
+    private final Ignite ignite;
+
+    /** The cache name. */
+    private final String cacheName;
+
+    /**
+     * Construct a new {@link CacheNodeSafeAssertion} for the given {@code cacheName}.
+     *
+     * @param ignite The Ignite instance.
+     * @param cacheName The cache name.
+     */
+    public CacheNodeSafeAssertion(Ignite ignite, String cacheName) {
+        this.ignite = ignite;
+        this.cacheName = cacheName;
+    }
+
+    /**
+     * @return Ignite instance.
+     */
+    protected Ignite ignite() {
+        return ignite;
+    }
+
+    /** {@inheritDoc} Fails unless all partitions are host-safe or all partitions are node-safe. */
+    @Override public void test() throws AssertionError {
+        Affinity<?> affinity = ignite.affinity(cacheName);
+
+        int partCnt = affinity.partitions();
+
+        boolean hostSafe = true;
+
+        boolean nodeSafe = true;
+
+        for (int x = 0; x < partCnt; ++x) {
+            // Results are returned with the primary node first and backups after. We want to ensure that there is at
+            // least one backup on a different host.
+            Collection<ClusterNode> results = affinity.mapPartitionToPrimaryAndBackups(x);
+
+            Iterator<ClusterNode> nodes = results.iterator();
+
+            boolean newHostSafe = false;
+
+            boolean newNodeSafe = false;
+
+            if (nodes.hasNext()) {
+                ClusterNode primary = nodes.next();
+
+                // For host safety, get all nodes on the same host as the primary node and ensure at least one of the
+                // backups is on a different host. For node safety, make sure at least one of the backups is not the
+                // primary.
+                Collection<ClusterNode> neighbors = hostSafe ? ignite.cluster().forHost(primary).nodes() : null;
+
+                while (nodes.hasNext()) {
+                    ClusterNode backup = nodes.next();
+
+                    if (hostSafe) {
+                        if (!neighbors.contains(backup))
+                            newHostSafe = true;
+                    }
+
+                    if (nodeSafe) {
+                        if (!backup.equals(primary))
+                            newNodeSafe = true;
+                    }
+                }
+            }
+
+            hostSafe = newHostSafe;
+
+            nodeSafe = newNodeSafe;
+
+            if (!hostSafe && !nodeSafe)
+                break;
+        }
+
+        if (hostSafe)
+            return;
+
+        if (nodeSafe)
+            return;
+
+        throw new AssertionError("Cache " + cacheName + " is endangered!");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a853ee32/modules/core/src/test/java/org/apache/ignite/testframework/assertions/EventualAssertion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/assertions/EventualAssertion.java b/modules/core/src/test/java/org/apache/ignite/testframework/assertions/EventualAssertion.java
new file mode 100644
index 0000000..6fa7994
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/assertions/EventualAssertion.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testframework.assertions;
+
+/**
+ * An {@link Assertion} that attempts to assert a condition within a maximum amount of time.
+ */
+public class EventualAssertion implements Assertion {
+    /** The assertion condition. */
+    private final Assertion assertion;
+
+    /**
+     * The amount of time in milliseconds to wait for the assertion to succeed. If zero try forever, if negative
+     * try once and return immediately.
+     */
+    private final long timeout;
+
+    /**
+     * Create a new EventualAssertion.
+     *
+     * @param timeout The maximum amount of time in milliseconds to wait for the specified assertion to succeed; if zero
+     * the implementation will try forever and if negative try once and return immediately.
+     * @param assertion The assertion to test.
+     */
+    public EventualAssertion(long timeout, Assertion assertion) {
+        if (assertion == null) {
+            throw new IllegalArgumentException("Assertion cannot be null");
+        }
+        this.timeout = timeout;
+        this.assertion = assertion;
+    }
+
+    /**
+     * Test that the assertion is satisfied within the configured timeout.
+     * <p>
+     *     Note: this implementation uses an exponential back-off between successive retries of the assertion.
+     *
+     * @throws AssertionError If the condition was not satisfied within the timeout, or the wait was interrupted.
+     */
+    @Override public void test() throws AssertionError {
+
+        // compute the ending time (a zero timeout means "no deadline")
+        long endTime = timeout == 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.timeout;
+
+        // current wait time
+        long waitTime = 2;
+
+        // last failure raised by the wrapped assertion (or the interruption that cut the wait short)
+        Throwable lastError;
+        do {
+            try {
+                this.assertion.test();
+                return;
+            }
+            catch (final Throwable e) {
+                lastError = e;
+            }
+
+            // see if we're done; '>=' keeps the remaining time positive so the back-off never becomes wait(0)
+            long currentTime = System.currentTimeMillis();
+            if (currentTime >= endTime) {
+                break;
+            }
+
+            // wait for the current wait time
+            try {
+                synchronized (this) {
+                    wait(waitTime);
+                }
+            }
+            catch (InterruptedException e) {
+                lastError = e;
+                Thread.currentThread().interrupt();
+                break;
+            }
+
+            // exponential back-off, capped by the time that was left when the deadline was last checked
+            waitTime = Math.min(endTime - currentTime, Math.max(waitTime << 2, 8));
+        }
+        while (true);
+
+        throw new AssertionError(this.assertion.getClass().getName() + " failed", lastError);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a853ee32/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 26cea39..95e889e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -41,7 +41,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCa
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearRemoveFailureTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPrimaryWriteOrderNearRemoveFailureTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearRemoveFailureTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAggregationTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionDistributionTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingTransactionTest;
 import org.apache.ignite.testframework.GridTestUtils;
 
 /**
@@ -66,7 +68,9 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
 
         suite.addTestSuite(GridCacheAtomicInvalidPartitionHandlingSelfTest.class);
         suite.addTestSuite(GridCacheAtomicClientInvalidPartitionHandlingSelfTest.class);
+        suite.addTestSuite(GridCacheRebalancingAggregationTest.class);
         suite.addTestSuite(GridCacheRebalancingPartitionDistributionTest.class);
+        suite.addTestSuite(GridCacheRebalancingTransactionTest.class);
 
         GridTestUtils.addTestIfNeeded(suite, GridCacheIncrementTransformTest.class, ignoredTests);
 


Mime
View raw message