Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1357D10A93 for ; Tue, 3 Mar 2015 20:28:44 +0000 (UTC) Received: (qmail 10393 invoked by uid 500); 3 Mar 2015 20:28:31 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 10308 invoked by uid 500); 3 Mar 2015 20:28:31 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 10245 invoked by uid 99); 3 Mar 2015 20:28:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 20:28:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 03 Mar 2015 20:28:29 +0000 Received: (qmail 7902 invoked by uid 99); 3 Mar 2015 20:28:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 20:28:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 22205E1080; Tue, 3 Mar 2015 20:28:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vkulichenko@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 03 Mar 2015 20:28:32 -0000 Message-Id: <038cd0f27ad54f7e85a78275b64434dd@git.apache.org> In-Reply-To: <3c81b0e1f7b84d6a886b33d710bf100e@git.apache.org> References: <3c81b0e1f7b84d6a886b33d710bf100e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [28/50] incubator-ignite git commit: #ignite-237: move CacheConsistentHashAffinityFunction. X-Virus-Checked: Checked by ClamAV on apache.org #ignite-237: move CacheConsistentHashAffinityFunction. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e3782644 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e3782644 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e3782644 Branch: refs/heads/ignite-141 Commit: e3782644789c565162870fb2a41bd753e7631922 Parents: 414410b Author: ivasilinets Authored: Fri Feb 27 19:36:34 2015 +0300 Committer: ivasilinets Committed: Fri Feb 27 19:36:34 2015 +0300 ---------------------------------------------------------------------- .../CacheConsistentHashAffinityFunction.java | 703 ------------------- 1 file changed, 703 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3782644/modules/core/src/main/java/org/gridgain/benchmarks/risk/affinity/CacheConsistentHashAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/benchmarks/risk/affinity/CacheConsistentHashAffinityFunction.java b/modules/core/src/main/java/org/gridgain/benchmarks/risk/affinity/CacheConsistentHashAffinityFunction.java deleted file mode 100644 index 35be9b8..0000000 --- a/modules/core/src/main/java/org/gridgain/benchmarks/risk/affinity/CacheConsistentHashAffinityFunction.java +++ /dev/null @@ -1,703 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.benchmarks.risk.affinity; - -import org.apache.ignite.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Affinity function for partitioned cache. This function supports the following - * configuration: - *
    - *
  • - * {@code backups} - Use this flag to control how many back up nodes will be - * assigned to every key. The default value is {@code 0}. - *
  • - *
  • - * {@code replicas} - Generally the more replicas a node gets, the more key assignments - * it will receive. You can configure different number of replicas for a node by - * setting user attribute with name {@link #getReplicaCountAttributeName()} to some - * number. Default value is {@code 512} defined by {@link #DFLT_REPLICA_COUNT} constant. - *
  • - *
  • - * {@code backupFilter} - Optional filter for back up nodes. If provided, then only - * nodes that pass this filter will be selected as backup nodes. If not provided, then - * primary and backup nodes will be selected out of all nodes available for this cache. - *
  • - *
- *

- * Cache affinity can be configured for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method. - */ -@Deprecated -public class CacheConsistentHashAffinityFunction implements CacheAffinityFunction { - /** */ - private static final long serialVersionUID = 0L; - - /** Flag to enable/disable consistency check (for internal use only). */ - private static final boolean AFFINITY_CONSISTENCY_CHECK = Boolean.getBoolean("IGNITE_AFFINITY_CONSISTENCY_CHECK"); - - /** Default number of partitions. */ - public static final int DFLT_PARTITION_COUNT = 10000; - - /** Default replica count for partitioned caches. */ - public static final int DFLT_REPLICA_COUNT = 128; - - /** - * Name of node attribute to specify number of replicas for a node. - * Default value is {@code gg:affinity:node:replicas}. - */ - public static final String DFLT_REPLICA_COUNT_ATTR_NAME = "gg:affinity:node:replicas"; - - /** Node hash. */ - private transient GridConsistentHash nodeHash; - - /** Total number of partitions. */ - private int parts = DFLT_PARTITION_COUNT; - - /** */ - private int replicas = DFLT_REPLICA_COUNT; - - /** */ - private String attrName = DFLT_REPLICA_COUNT_ATTR_NAME; - - /** */ - private boolean exclNeighbors; - - /** - * Optional backup filter. First node passed to this filter is primary node, - * and second node is a node being tested. - */ - private IgniteBiPredicate backupFilter; - - /** */ - private CacheAffinityNodeHashResolver hashIdRslvr = new CacheAffinityNodeAddressHashResolver(); - - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Injected cache name. */ - @CacheNameResource - private String cacheName; - - /** Injected logger. */ - @LoggerResource - private IgniteLogger log; - - /** Initialization flag. */ - @SuppressWarnings("TransientFieldNotInitialized") - private transient AtomicBoolean init = new AtomicBoolean(); - - /** Latch for initializing. */ - @SuppressWarnings({"TransientFieldNotInitialized"}) - private transient CountDownLatch initLatch = new CountDownLatch(1); - - /** Nodes IDs. */ - @GridToStringInclude - @SuppressWarnings({"TransientFieldNotInitialized"}) - private transient ConcurrentMap addedNodes = new ConcurrentHashMap<>(); - - /** Optional backup filter. */ - @GridToStringExclude - private final IgniteBiPredicate backupIdFilter = new IgniteBiPredicate() { - @Override public boolean apply(NodeInfo primaryNodeInfo, NodeInfo nodeInfo) { - return backupFilter == null || backupFilter.apply(primaryNodeInfo.node(), nodeInfo.node()); - } - }; - - /** Map of neighbors. */ - @SuppressWarnings("TransientFieldNotInitialized") - private transient ConcurrentMap> neighbors = - new ConcurrentHashMap8<>(); - - /** - * Empty constructor with all defaults. - */ - public CacheConsistentHashAffinityFunction() { - // No-op. - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other - * and specified number of backups. - *

- * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - */ - public CacheConsistentHashAffinityFunction(boolean exclNeighbors) { - this.exclNeighbors = exclNeighbors; - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, - * and specified number of backups and partitions. - *

- * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - * @param parts Total number of partitions. - */ - public CacheConsistentHashAffinityFunction(boolean exclNeighbors, int parts) { - A.ensure(parts != 0, "parts != 0"); - - this.exclNeighbors = exclNeighbors; - this.parts = parts; - } - - /** - * Initializes optional counts for replicas and backups. - *

- * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @param parts Total number of partitions. - * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected - * from all nodes that pass this filter. First argument for this filter is primary node, and second - * argument is node being tested. - *

- * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - */ - public CacheConsistentHashAffinityFunction(int parts, - @Nullable IgniteBiPredicate backupFilter) { - A.ensure(parts != 0, "parts != 0"); - - this.parts = parts; - this.backupFilter = backupFilter; - } - - /** - * Gets default count of virtual replicas in consistent hash ring. - *

- * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName()} - * name will be checked first. If it is absent, then this value will be used. - * - * @return Count of virtual replicas in consistent hash ring. - */ - public int getDefaultReplicas() { - return replicas; - } - - /** - * Sets default count of virtual replicas in consistent hash ring. - *

- * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName} name - * will be checked first. If it is absent, then this value will be used. - * - * @param replicas Count of virtual replicas in consistent hash ring.s - */ - public void setDefaultReplicas(int replicas) { - this.replicas = replicas; - } - - /** - * Gets total number of key partitions. To ensure that all partitions are - * equally distributed across all nodes, please make sure that this - * number is significantly larger than a number of nodes. Also, partition - * size should be relatively small. Try to avoid having partitions with more - * than quarter million keys. - *

- * Note that for fully replicated caches this method should always - * return {@code 1}. - * - * @return Total partition count. - */ - public int getPartitions() { - return parts; - } - - /** - * Sets total number of partitions. - * - * @param parts Total number of partitions. - */ - public void setPartitions(int parts) { - this.parts = parts; - } - - /** - * Gets hash ID resolver for nodes. This resolver is used to provide - * alternate hash ID, other than node ID. - *

- * Node IDs constantly change when nodes get restarted, which causes them to - * be placed on different locations in the hash ring, and hence causing - * repartitioning. Providing an alternate hash ID, which survives node restarts, - * puts node on the same location on the hash ring, hence minimizing required - * repartitioning. - * - * @return Hash ID resolver. - */ - public CacheAffinityNodeHashResolver getHashIdResolver() { - return hashIdRslvr; - } - - /** - * Sets hash ID resolver for nodes. This resolver is used to provide - * alternate hash ID, other than node ID. - *

- * Node IDs constantly change when nodes get restarted, which causes them to - * be placed on different locations in the hash ring, and hence causing - * repartitioning. Providing an alternate hash ID, which survives node restarts, - * puts node on the same location on the hash ring, hence minimizing required - * repartitioning. - * - * @param hashIdRslvr Hash ID resolver. - */ - public void setHashIdResolver(CacheAffinityNodeHashResolver hashIdRslvr) { - this.hashIdRslvr = hashIdRslvr; - } - - /** - * Gets optional backup filter. If not {@code null}, backups will be selected - * from all nodes that pass this filter. First node passed to this filter is primary node, - * and second node is a node being tested. - *

- * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @return Optional backup filter. - */ - @Nullable public IgniteBiPredicate getBackupFilter() { - return backupFilter; - } - - /** - * Sets optional backup filter. If provided, then backups will be selected from all - * nodes that pass this filter. First node being passed to this filter is primary node, - * and second node is a node being tested. - *

- * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @param backupFilter Optional backup filter. - */ - public void setBackupFilter(@Nullable IgniteBiPredicate backupFilter) { - this.backupFilter = backupFilter; - } - - /** - * Gets optional attribute name for replica count. If not provided, the - * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}. - * - * @return User attribute name for replica count for a node. - */ - public String getReplicaCountAttributeName() { - return attrName; - } - - /** - * Sets optional attribute name for replica count. If not provided, the - * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}. - * - * @param attrName User attribute name for replica count for a node. - */ - public void setReplicaCountAttributeName(String attrName) { - this.attrName = attrName; - } - - /** - * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - *

- * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @return {@code True} if nodes residing on the same host may not act as backups of each other. - */ - public boolean isExcludeNeighbors() { - return exclNeighbors; - } - - /** - * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - *

- * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. - */ - public void setExcludeNeighbors(boolean exclNeighbors) { - this.exclNeighbors = exclNeighbors; - } - - /** - * Gets neighbors for a node. - * - * @param node Node. - * @return Neighbors. - */ - private Collection neighbors(final ClusterNode node) { - Collection ns = neighbors.get(node.id()); - - if (ns == null) { - Collection nodes = ignite.cluster().forHost(node).nodes(); - - ns = F.addIfAbsent(neighbors, node.id(), new ArrayList<>(F.nodeIds(nodes))); - } - - return ns; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public List> assignPartitions(CacheAffinityFunctionContext ctx) { - List> res = new ArrayList<>(parts); - - Collection topSnapshot = ctx.currentTopologySnapshot(); - - for (int part = 0; part < parts; part++) { - res.add(F.isEmpty(topSnapshot) ? - Collections.emptyList() : - // Wrap affinity nodes with unmodifiable list since unmodifiable generic collection - // doesn't provide equals and hashCode implementations. - U.sealList(nodes(part, topSnapshot, ctx.backups()))); - } - - return res; - } - - /** - * Assigns nodes to one partition. - * - * @param part Partition to assign nodes for. - * @param nodes Cache topology nodes. - * @return Assigned nodes, first node is primary, others are backups. - */ - public Collection nodes(int part, Collection nodes, int backups) { - if (nodes == null) - return Collections.emptyList(); - - int nodesSize = nodes.size(); - - if (nodesSize == 0) - return Collections.emptyList(); - - if (nodesSize == 1) // Minor optimization. - return nodes; - - initialize(); - - final Map lookup = new GridLeanMap<>(nodesSize); - - // Store nodes in map for fast lookup. - for (ClusterNode n : nodes) - // Add nodes into hash circle, if absent. - lookup.put(resolveNodeInfo(n), n); - - Collection selected; - - if (backupFilter != null) { - final IgnitePredicate p = new P1() { - @Override public boolean apply(NodeInfo id) { - return lookup.containsKey(id); - } - }; - - final NodeInfo primaryId = nodeHash.node(part, p); - - IgnitePredicate backupPrimaryIdFilter = new IgnitePredicate() { - @Override public boolean apply(NodeInfo node) { - return backupIdFilter.apply(primaryId, node); - } - }; - - Collection backupIds = nodeHash.nodes(part, backups, p, backupPrimaryIdFilter); - - if (F.isEmpty(backupIds) && primaryId != null) { - ClusterNode n = lookup.get(primaryId); - - assert n != null; - - return Collections.singletonList(n); - } - - selected = primaryId != null ? F.concat(false, primaryId, backupIds) : backupIds; - } - else { - if (!exclNeighbors) { - selected = nodeHash.nodes(part, backups == Integer.MAX_VALUE ? backups : backups + 1, new P1() { - @Override public boolean apply(NodeInfo id) { - return lookup.containsKey(id); - } - }); - - if (selected.size() == 1) { - NodeInfo id = F.first(selected); - - assert id != null : "Node ID cannot be null in affinity node ID collection: " + selected; - - ClusterNode n = lookup.get(id); - - assert n != null; - - return Collections.singletonList(n); - } - } - else { - int primaryAndBackups = backups + 1; - - selected = new ArrayList<>(primaryAndBackups); - - final Collection selected0 = selected; - - List ids = nodeHash.nodes(part, primaryAndBackups, new P1() { - @Override public boolean apply(NodeInfo id) { - ClusterNode n = lookup.get(id); - - if (n == null) - return false; - - Collection neighbors = neighbors(n); - - for (NodeInfo id0 : selected0) { - ClusterNode n0 = lookup.get(id0); - - if (n0 == null) - return false; - - Collection neighbors0 = neighbors(n0); - - if (F.containsAny(neighbors0, neighbors)) - return false; - } - - selected0.add(id); - - return true; - } - }); - - if (AFFINITY_CONSISTENCY_CHECK) - assert F.eqOrdered(ids, selected); - } - } - - Collection ret = new ArrayList<>(selected.size()); - - for (NodeInfo id : selected) { - ClusterNode n = lookup.get(id); - - assert n != null; - - ret.add(n); - } - - return ret; - } - - /** {@inheritDoc} */ - @Override public int partition(Object key) { - initialize(); - - return U.safeAbs(key.hashCode() % parts); - } - - /** {@inheritDoc} */ - @Override public int partitions() { - initialize(); - - return parts; - } - - /** {@inheritDoc} */ - @Override public void reset() { - addedNodes = new ConcurrentHashMap<>(); - neighbors = new ConcurrentHashMap8<>(); - - initLatch = new CountDownLatch(1); - - init = new AtomicBoolean(); - } - - /** {@inheritDoc} */ - @Override public void removeNode(UUID nodeId) { - NodeInfo info = addedNodes.remove(nodeId); - - if (info == null) - return; - - nodeHash.removeNode(info); - - neighbors.clear(); - } - - /** - * Resolve node info for specified node. - * Add node to hash circle if this is the first node invocation. - * - * @param n Node to get info for. - * @return Node info. - */ - private NodeInfo resolveNodeInfo(ClusterNode n) { - UUID nodeId = n.id(); - NodeInfo nodeInfo = addedNodes.get(nodeId); - - if (nodeInfo != null) - return nodeInfo; - - assert hashIdRslvr != null; - - nodeInfo = new NodeInfo(nodeId, hashIdRslvr.resolve(n), n); - - neighbors.clear(); - - nodeHash.addNode(nodeInfo, replicas(n)); - - addedNodes.put(nodeId, nodeInfo); - - return nodeInfo; - } - - /** {@inheritDoc} */ - private void initialize() { - if (!init.get() && init.compareAndSet(false, true)) { - if (log.isInfoEnabled()) - log.info("Consistent hash configuration [cacheName=" + cacheName + ", partitions=" + parts + - ", excludeNeighbors=" + exclNeighbors + ", replicas=" + replicas + - ", backupFilter=" + backupFilter + ", hashIdRslvr=" + hashIdRslvr + ']'); - - nodeHash = new GridConsistentHash<>(); - - initLatch.countDown(); - } - else { - if (initLatch.getCount() > 0) { - try { - U.await(initLatch); - } - catch (IgniteInterruptedCheckedException ignored) { - // Recover interrupted state flag. - Thread.currentThread().interrupt(); - } - } - } - } - - /** - * @param n Node. - * @return Replicas. - */ - private int replicas(ClusterNode n) { - Integer nodeReplicas = n.attribute(attrName); - - if (nodeReplicas == null) - nodeReplicas = replicas; - - return nodeReplicas; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheConsistentHashAffinityFunction.class, this); - } - - /** - * Node hash ID. - */ - private static final class NodeInfo implements Comparable { - /** Node ID. */ - private UUID nodeId; - - /** Hash ID. */ - private Object hashId; - - /** Grid node. */ - private ClusterNode node; - - /** - * @param nodeId Node ID. - * @param hashId Hash ID. - * @param node Rich node. - */ - private NodeInfo(UUID nodeId, Object hashId, ClusterNode node) { - assert nodeId != null; - assert hashId != null; - - this.hashId = hashId; - this.nodeId = nodeId; - this.node = node; - } - - /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** - * @return Hash ID. - */ - public Object hashId() { - return hashId; - } - - /** - * @return Node. - */ - public ClusterNode node() { - return node; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return hashId.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - if (!(obj instanceof NodeInfo)) - return false; - - NodeInfo that = (NodeInfo)obj; - - // If objects are equal, hash codes should be the same. - // Cannot use that.hashId.equals(hashId) due to Comparable interface restrictions. - return that.nodeId.equals(nodeId) && that.hashCode() == hashCode(); - } - - /** {@inheritDoc} */ - @Override public int compareTo(NodeInfo o) { - int diff = nodeId.compareTo(o.nodeId); - - if (diff == 0) { - int h1 = hashCode(); - int h2 = o.hashCode(); - - diff = h1 == h2 ? 0 : (h1 < h2 ? -1 : 1); - } - - return diff; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(NodeInfo.class, this); - } - } -}