ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dpav...@apache.org
Subject [1/3] ignite git commit: IGNITE-5357 Replicated cache reads load balancing. - Fixes #3578.
Date Thu, 22 Mar 2018 14:40:57 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 5ccb5482e -> d6c982df2


IGNITE-5357 Replicated cache reads load balancing. - Fixes #3578.

Signed-off-by: dpavlov <dpavlov@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/99b8c6fa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/99b8c6fa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/99b8c6fa

Branch: refs/heads/master
Commit: 99b8c6faadf1a4023a90d2003b1ce9677c71f97c
Parents: 5765d60
Author: Vyacheslav Daradur <daradurvs@gmail.com>
Authored: Thu Mar 22 17:06:12 2018 +0300
Committer: dpavlov <dpavlov@gridgain.com>
Committed: Thu Mar 22 17:06:12 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  11 +
 .../processors/cache/GridCacheContext.java      |  64 ++++
 .../dht/CacheDistributedGetFutureAdapter.java   |  19 -
 .../dht/GridPartitionedGetFuture.java           |   2 +-
 .../dht/GridPartitionedSingleGetFuture.java     |  21 +-
 .../distributed/near/GridNearGetFuture.java     |   2 +-
 ...titionedAtomicCacheGetsDistributionTest.java |  49 +++
 ...onalOptimisticCacheGetsDistributionTest.java |  46 +++
 ...nalPessimisticCacheGetsDistributionTest.java |  33 ++
 ...plicatedAtomicCacheGetsDistributionTest.java | 364 +++++++++++++++++++
 ...onalOptimisticCacheGetsDistributionTest.java |  46 +++
 ...nalPessimisticCacheGetsDistributionTest.java |  33 ++
 .../testsuites/IgniteCacheTestSuite5.java       |  14 +
 13 files changed, 663 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index f171597..1f67f81 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -25,6 +25,7 @@ import java.util.Properties;
 import javax.net.ssl.HostnameVerifier;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.internal.client.GridClient;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
@@ -831,6 +832,16 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_LOADED_PAGES_BACKWARD_SHIFT_MAP = "IGNITE_LOADED_PAGES_BACKWARD_SHIFT_MAP";
 
     /**
+     * Whenever read load balancing is enabled, that means 'get' requests will be distributed between primary and backup
+     * nodes if it is possible and {@link CacheConfiguration#readFromBackup} is {@code true}.
+     *
+     * Default is {@code true}.
+     *
+     * @see CacheConfiguration#readFromBackup
+     */
+    public static final String IGNITE_READ_LOAD_BALANCING = "IGNITE_READ_LOAD_BALANCING";
+  
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 103a976..9eb4340 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.expiry.EternalExpiryPolicy;
@@ -39,6 +40,7 @@ import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.binary.BinaryField;
 import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.cache.CacheInterceptor;
@@ -106,12 +108,14 @@ import org.apache.ignite.plugin.security.SecurityException;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_READ_LOAD_BALANCING;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
@@ -258,6 +262,15 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** Statistics enabled flag. */
     private volatile boolean statisticsEnabled;
 
+    /** Whether to enable read load balancing. */
+    private final boolean readLoadBalancingEnabled = IgniteSystemProperties.getBoolean(IGNITE_READ_LOAD_BALANCING, true);
+
+    /** Flag indicating whether data can be read from backup. */
+    private boolean readFromBackup = CacheConfiguration.DFLT_READ_FROM_BACKUP;
+
+    /** Local node's MAC address. */
+    private String locMacs;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -377,6 +390,12 @@ public class GridCacheContext<K, V> implements Externalizable {
             expiryPlc = null;
 
         itHolder = new CacheWeakQueryIteratorsHolder(log);
+
+        readFromBackup = cacheCfg.isReadFromBackup();
+
+        locMacs = localNode().attribute(ATTR_MACS);
+
+        assert locMacs != null;
     }
 
     /**
@@ -2158,6 +2177,51 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * Determines an affinity node to send get request to.
+     *
+     * @param affNodes All affinity nodes.
+     * @param canRemap Flag indicating that 'get' should be done on a locked topology version.
+     * @return Affinity node to get key from or {@code null} if there is no suitable alive node.
+     */
+    @Nullable public ClusterNode selectAffinityNodeBalanced(List<ClusterNode> affNodes, boolean canRemap) {
+        if (!readLoadBalancingEnabled) {
+            if (!canRemap) {
+                for (ClusterNode node : affNodes) {
+                    if (ctx.discovery().alive(node))
+                        return node;
+                }
+
+                return null;
+            }
+            else
+                return affNodes.get(0);
+        }
+
+        if (!readFromBackup)
+            return affNodes.get(0);
+
+        assert locMacs != null;
+
+        int r = ThreadLocalRandom.current().nextInt(affNodes.size());
+
+        ClusterNode n0 = null;
+
+        for (ClusterNode node : affNodes) {
+            if (canRemap || discovery().alive(node)) {
+                if (locMacs.equals(node.attribute(ATTR_MACS)))
+                    return node;
+
+                if (r >= 0 || n0 == null)
+                    n0 = node;
+            }
+
+            r--;
+        }
+
+        return n0;
+    }
+
+    /**
      * Prepare affinity field for builder (if possible).
      *
      * @param buider Builder.

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 2257c9f..d9c4b3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -152,25 +152,6 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCo
     }
 
     /**
-     * Affinity node to send get request to.
-     *
-     * @param affNodes All affinity nodes.
-     * @return Affinity node to get key from.
-     */
-    protected final ClusterNode affinityNode(List<ClusterNode> affNodes) {
-        if (!canRemap) {
-            for (ClusterNode node : affNodes) {
-                if (cctx.discovery().alive(node))
-                    return node;
-            }
-
-            return null;
-        }
-        else
-            return affNodes.get(0);
-    }
-
-    /**
      * @param part Partition.
      * @return {@code True} if partition is in owned state.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 179a8ae..204a0ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -407,7 +407,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             }
         }
 
-        ClusterNode node = affinityNode(affNodes);
+        ClusterNode node = cctx.selectAffinityNodeBalanced(affNodes, canRemap);
 
         if (node == null) {
             onDone(serverNotFoundError(topVer));

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 4e34bcb..e0aea9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -347,7 +347,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
             }
         }
 
-        ClusterNode affNode = affinityNode(affNodes);
+        ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, canRemap);
 
         if (affNode == null) {
             onDone(serverNotFoundError(topVer));
@@ -711,25 +711,6 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
             "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
     }
 
-    /**
-     * Affinity node to send get request to.
-     *
-     * @param affNodes All affinity nodes.
-     * @return Affinity node to get key from.
-     */
-    @Nullable private ClusterNode affinityNode(List<ClusterNode> affNodes) {
-        if (!canRemap) {
-            for (ClusterNode node : affNodes) {
-                if (cctx.discovery().alive(node))
-                    return node;
-            }
-
-            return null;
-        }
-        else
-            return affNodes.get(0);
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 29dd12f..b35f524 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -490,7 +490,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                         }
                     }
 
-                    ClusterNode affNode = affinityNode(affNodes);
+                    ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, canRemap);
 
                     if (affNode == null) {
                         onDone(serverNotFoundError(topVer));

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedAtomicCacheGetsDistributionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedAtomicCacheGetsDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedAtomicCacheGetsDistributionTest.java
new file mode 100644
index 0000000..2241a95
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedAtomicCacheGetsDistributionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Tests of partitioned atomic cache's 'get' requests distribution.
+ */
+public class PartitionedAtomicCacheGetsDistributionTest extends ReplicatedAtomicCacheGetsDistributionTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected <K, V> CacheConfiguration<K, V> cacheConfiguration() {
+        CacheConfiguration<K, V> cacheCfg = super.cacheConfiguration();
+
+        cacheCfg.setBackups(backupsCount());
+
+        return cacheCfg;
+    }
+
+    /**
+     * @return Backups count.
+     */
+    protected int backupsCount() {
+        return gridCount() - 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedTransactionalOptimisticCacheGetsDistributionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedTransactionalOptimisticCacheGetsDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedTransactionalOptimisticCacheGetsDistributionTest.java
new file mode 100644
index 0000000..4c88229
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedTransactionalOptimisticCacheGetsDistributionTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/**
+ * Tests of optimistic transactional partitioned cache's 'get' requests distribution.
+ */
+public class PartitionedTransactionalOptimisticCacheGetsDistributionTest extends PartitionedAtomicCacheGetsDistributionTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected TransactionIsolation transactionIsolation() {
+        return READ_COMMITTED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected TransactionConcurrency transactionConcurrency() {
+        return OPTIMISTIC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedTransactionalPessimisticCacheGetsDistributionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedTransactionalPessimisticCacheGetsDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedTransactionalPessimisticCacheGetsDistributionTest.java
new file mode 100644
index 0000000..78ea7a6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionedTransactionalPessimisticCacheGetsDistributionTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.transactions.TransactionConcurrency;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+
+/**
+ * Tests of pessimistic transactional partitioned cache's 'get' requests distribution.
+ */
+public class PartitionedTransactionalPessimisticCacheGetsDistributionTest
+    extends PartitionedTransactionalOptimisticCacheGetsDistributionTest {
+    /** {@inheritDoc} */
+    @Override protected TransactionConcurrency transactionConcurrency() {
+        return PESSIMISTIC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedAtomicCacheGetsDistributionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedAtomicCacheGetsDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedAtomicCacheGetsDistributionTest.java
new file mode 100644
index 0000000..1d0c6de
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedAtomicCacheGetsDistributionTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests of replicated cache's 'get' requests distribution.
+ */
+public class ReplicatedAtomicCacheGetsDistributionTest extends GridCacheAbstractSelfTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "getsDistributionTest";
+
+    /** Client nodes instance's name. */
+    private static final String CLIENT_NAME = "client";
+
+    /** Value prefix. */
+    private static final String VAL_PREFIX = "val";
+
+    /** */
+    private static final int PRIMARY_KEYS_NUMBER = 1_000;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        IgniteConfiguration clientCfg = getConfiguration(CLIENT_NAME);
+
+        clientCfg.setClientMode(true);
+
+        startGrid(clientCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        IgniteCache cache = ignite(0).cache(CACHE_NAME);
+
+        if (cache != null)
+            cache.destroy();
+
+        // Setting different MAC addresses for all nodes
+        Map<UUID, String> macs = getClusterMacs();
+
+        int idx = 0;
+
+        for (Map.Entry<UUID, String> entry : macs.entrySet())
+            entry.setValue("x2-xx-xx-xx-xx-x" + idx++);
+
+        replaceMacAddresses(G.allGrids(), macs);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setTransactionConfiguration(transactionConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 4;
+    }
+
+    /**
+     * Test 'get' operations requests generator distribution.
+     *
+     * @throws Exception In case of an error.
+     * @see #runTestBalancingDistribution(boolean)
+     */
+    public void testGetRequestsGeneratorDistribution() throws Exception {
+        runTestBalancingDistribution(false);
+    }
+
+    /**
+     * Test 'getAll' operations requests generator distribution.
+     *
+     * @throws Exception In case of an error.
+     * @see #runTestBalancingDistribution(boolean)
+     */
+    public void testGetAllRequestsGeneratorDistribution() throws Exception {
+        runTestBalancingDistribution(true);
+    }
+
+    /**
+     * @param batchMode Whenever 'get' or 'getAll' operations are used in the test.
+     * @throws Exception In case of an error.
+     */
+    protected void runTestBalancingDistribution(boolean batchMode) throws Exception {
+        IgniteCache<Integer, String> cache = grid(0).createCache(cacheConfiguration());
+
+        List<Integer> keys = primaryKeys(cache, PRIMARY_KEYS_NUMBER);
+
+        for (Integer key : keys)
+            cache.put(key, VAL_PREFIX + key);
+
+        IgniteCache<Integer, String> clientCache = grid(CLIENT_NAME).getOrCreateCache(CACHE_NAME);
+
+        assertTrue(GridTestUtils.waitForCondition(
+            new GridAbsPredicate() {
+                int batchSize = 10;
+                int idx = 0;
+
+                @Override public boolean apply() {
+                    if (idx >= PRIMARY_KEYS_NUMBER)
+                        idx = 0;
+
+                    try (Transaction tx = grid(CLIENT_NAME).transactions().txStart()) {
+                        if (batchMode) {
+                            Set<Integer> keys0 = new TreeSet<>();
+
+                            for (int i = idx; i < idx + batchSize && i < PRIMARY_KEYS_NUMBER; i++)
+                                keys0.add(keys.get(i));
+
+                            idx += batchSize;
+
+                            Map<Integer, String> results = clientCache.getAll(keys0);
+
+                            for (Map.Entry<Integer, String> entry : results.entrySet())
+                                assertEquals(VAL_PREFIX + entry.getKey(), entry.getValue());
+                        }
+                        else {
+                            for (int i = idx; i < idx + gridCount() && i < PRIMARY_KEYS_NUMBER; i++) {
+                                Integer key = keys.get(i);
+
+                                assertEquals(VAL_PREFIX + key, clientCache.get(key));
+                            }
+
+                            idx += gridCount();
+                        }
+
+                        tx.commit();
+                    }
+
+                    for (int i = 0; i < gridCount(); i++) {
+                        IgniteEx ignite = grid(i);
+
+                        long getsCnt = ignite.cache(CACHE_NAME).localMetrics().getCacheGets();
+
+                        if (getsCnt == 0)
+                            return false;
+                    }
+
+                    return true;
+                }
+            },
+            getTestTimeout())
+        );
+    }
+
+    /**
+     * Tests that the 'get' operation requests are routed to node with same MAC address as at requester.
+     *
+     * @throws Exception In case of an error.
+     * @see #runTestSameHostDistribution(UUID, boolean)
+     */
+    public void testGetRequestsDistribution() throws Exception {
+        UUID destId = grid(0).localNode().id();
+
+        runTestSameHostDistribution(destId, false);
+    }
+
+    /**
+     * Tests that the 'getAll' operation requests are routed to node with same MAC address as at requester.
+     *
+     * @throws Exception In case of an error.
+     * @see #runTestSameHostDistribution(UUID, boolean)
+     */
+    public void testGetAllRequestsDistribution() throws Exception {
+        UUID destId = grid(gridCount() - 1).localNode().id();
+
+        runTestSameHostDistribution(destId, true);
+    }
+
+    /**
+     * Tests that the 'get' and 'getAll' requests are routed to node with same MAC address as at requester.
+     *
+     * @param destId Destination Ignite instance id for requests distribution.
+     * @param batchMode Test mode.
+     * @throws Exception In case of an error.
+     */
+    protected void runTestSameHostDistribution(final UUID destId, final boolean batchMode) throws Exception {
+        Map<UUID, String> macs = getClusterMacs();
+
+        String clientMac = macs.get(grid(CLIENT_NAME).localNode().id());
+
+        macs.put(destId, clientMac);
+
+        replaceMacAddresses(G.allGrids(), macs);
+
+        IgniteCache<Integer, String> cache = grid(0).createCache(cacheConfiguration());
+
+        List<Integer> keys = primaryKeys(cache, PRIMARY_KEYS_NUMBER);
+
+        for (Integer key : keys)
+            cache.put(key, VAL_PREFIX + key);
+
+        IgniteCache<Integer, String> clientCache = grid(CLIENT_NAME).getOrCreateCache(CACHE_NAME);
+
+        try (Transaction tx = grid(CLIENT_NAME).transactions().txStart()) {
+            if (batchMode) {
+                Map<Integer, String> results = clientCache.getAll(new TreeSet<>(keys));
+
+                for (Map.Entry<Integer, String> entry : results.entrySet())
+                    assertEquals(VAL_PREFIX + entry.getKey(), entry.getValue());
+            }
+            else {
+                for (Integer key : keys)
+                    assertEquals(VAL_PREFIX + key, clientCache.get(key));
+            }
+
+            tx.commit();
+        }
+
+        for (int i = 0; i < gridCount(); i++) {
+            IgniteEx ignite = grid(i);
+
+            long getsCnt = ignite.cache(CACHE_NAME).localMetrics().getCacheGets();
+
+            if (destId.equals(ignite.localNode().id()))
+                assertEquals(PRIMARY_KEYS_NUMBER, getsCnt);
+            else
+                assertEquals(0L, getsCnt);
+        }
+    }
+
+    /**
+     * @return Transaction configuration.
+     */
+    protected TransactionConfiguration transactionConfiguration() {
+        TransactionConfiguration txCfg = new TransactionConfiguration();
+
+        txCfg.setDefaultTxIsolation(transactionIsolation());
+        txCfg.setDefaultTxConcurrency(transactionConcurrency());
+
+        return txCfg;
+    }
+
+    /**
+     * @return Cache transaction isolation.
+     */
+    protected TransactionIsolation transactionIsolation() {
+        return REPEATABLE_READ;
+    }
+
+    /**
+     * @return Cache transaction concurrency.
+     */
+    protected TransactionConcurrency transactionConcurrency() {
+        return PESSIMISTIC;
+    }
+
+    /**
+     * @return Caching mode.
+     */
+    @Override protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    protected <K, V> CacheConfiguration<K, V> cacheConfiguration() {
+        CacheConfiguration<K, V> cfg = new CacheConfiguration<K, V>(CACHE_NAME);
+
+        cfg.setCacheMode(cacheMode());
+        cfg.setAtomicityMode(atomicityMode());
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setReadFromBackup(true);
+        cfg.setStatisticsEnabled(true);
+
+        return cfg;
+    }
+
+    /**
+     * @param instances Started Ignite instances.
+     * @param macs Mapping MAC addresses to UUID.
+     */
+    private void replaceMacAddresses(List<Ignite> instances, Map<UUID, String> macs) {
+        for (Ignite ignite : instances) {
+            for (ClusterNode node : ignite.cluster().nodes()) {
+                String mac = macs.get(node.id());
+
+                assertNotNull(mac);
+
+                Map<String, Object> attrs = new HashMap<>(node.attributes());
+
+                attrs.put(ATTR_MACS, mac);
+
+                ((TcpDiscoveryNode)node).setAttributes(attrs);
+            }
+        }
+    }
+
+    /**
+     * @return Cluster nodes MAC addresses.
+     */
+    private Map<UUID, String> getClusterMacs() {
+        Map<UUID, String> macs = new HashMap<>();
+
+        for (Ignite ignite : G.allGrids()) {
+            ClusterNode node = ignite.cluster().localNode();
+
+            String mac = node.attribute(ATTR_MACS);
+
+            assert mac != null;
+
+            macs.put(node.id(), mac);
+        }
+
+        return macs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedTransactionalOptimisticCacheGetsDistributionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedTransactionalOptimisticCacheGetsDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedTransactionalOptimisticCacheGetsDistributionTest.java
new file mode 100644
index 0000000..3bc6809
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedTransactionalOptimisticCacheGetsDistributionTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/**
+ * Runs the replicated cache 'get' requests distribution tests against an optimistic transactional cache.
+ */
+public class ReplicatedTransactionalOptimisticCacheGetsDistributionTest extends ReplicatedAtomicCacheGetsDistributionTest {
+    /** {@inheritDoc} */
+    @Override protected TransactionConcurrency transactionConcurrency() {
+        return OPTIMISTIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected TransactionIsolation transactionIsolation() {
+        return READ_COMMITTED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedTransactionalPessimisticCacheGetsDistributionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedTransactionalPessimisticCacheGetsDistributionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedTransactionalPessimisticCacheGetsDistributionTest.java
new file mode 100644
index 0000000..7bace3c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ReplicatedTransactionalPessimisticCacheGetsDistributionTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.transactions.TransactionConcurrency;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+
+/**
+ * Same 'get' requests distribution tests as the optimistic replicated parent, but run with PESSIMISTIC transactions.
+ */
+public class ReplicatedTransactionalPessimisticCacheGetsDistributionTest
+    extends ReplicatedTransactionalOptimisticCacheGetsDistributionTest {
+    /** {@inheritDoc} */
+    @Override protected TransactionConcurrency transactionConcurrency() {
+        return PESSIMISTIC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/99b8c6fa/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index ee57fa7..6127fa4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -35,7 +35,13 @@ import org.apache.ignite.internal.processors.cache.EntryVersionConsistencyReadTh
 import org.apache.ignite.internal.processors.cache.IgniteCachePutStackOverflowSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheReadThroughEvictionsVariationsSuite;
 import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTest;
+import org.apache.ignite.internal.processors.cache.PartitionedAtomicCacheGetsDistributionTest;
+import org.apache.ignite.internal.processors.cache.PartitionedTransactionalOptimisticCacheGetsDistributionTest;
+import org.apache.ignite.internal.processors.cache.PartitionedTransactionalPessimisticCacheGetsDistributionTest;
 import org.apache.ignite.internal.processors.cache.PartitionsExchangeOnDiscoveryHistoryOverflowTest;
+import org.apache.ignite.internal.processors.cache.ReplicatedAtomicCacheGetsDistributionTest;
+import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalOptimisticCacheGetsDistributionTest;
+import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessimisticCacheGetsDistributionTest;
 import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartitionsTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest;
@@ -101,6 +107,14 @@ public class IgniteCacheTestSuite5 extends TestSuite {
 
         suite.addTestSuite(Cache64kPartitionsTest.class);
 
+        suite.addTestSuite(ReplicatedAtomicCacheGetsDistributionTest.class);
+        suite.addTestSuite(ReplicatedTransactionalOptimisticCacheGetsDistributionTest.class);
+        suite.addTestSuite(ReplicatedTransactionalPessimisticCacheGetsDistributionTest.class);
+
+        suite.addTestSuite(PartitionedAtomicCacheGetsDistributionTest.class);
+        suite.addTestSuite(PartitionedTransactionalOptimisticCacheGetsDistributionTest.class);
+        suite.addTestSuite(PartitionedTransactionalPessimisticCacheGetsDistributionTest.class);
+
         return suite;
     }
 }


Mime
View raw message