lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From no...@apache.org
Subject lucene-solr:master: SOLR-12495: An #EQUALS function for replica in autoscaling policy to equally distribute replicas
Date Wed, 04 Jul 2018 14:40:03 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/master 7b2a2d989 -> f86c47752


SOLR-12495: An #EQUALS function for replica in autoscaling policy to equally distribute replicas


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/f86c4775
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/f86c4775
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/f86c4775

Branch: refs/heads/master
Commit: f86c477521637a5cd48808c9f6b3d86b6db92b42
Parents: 7b2a2d9
Author: Noble Paul <noble@apache.org>
Authored: Thu Jul 5 00:39:52 2018 +1000
Committer: Noble Paul <noble@apache.org>
Committed: Thu Jul 5 00:39:52 2018 +1000

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../client/solrj/cloud/autoscaling/Clause.java  |  53 +++++--
 .../solrj/cloud/autoscaling/Suggestion.java     |  92 +++++++----
 .../solrj/impl/SolrClientNodeStateProvider.java |  45 ++++--
 .../solrj/cloud/autoscaling/TestPolicy.java     | 153 +++++++++++++++++++
 5 files changed, 296 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f86c4775/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a6a7890..0b7bf85 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -101,6 +101,8 @@ New Features
 * SOLR-12530: Ability to disable configset upload via -Dconfigset.upload.enabled=false startup parameter
   (Ishan Chattopadhyaya)
 
+* SOLR-12495: An #EQUALS function for replica in autoscaling policy to equally distribute replicas (noble)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f86c4775/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
index e51158c..60ebf9b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
@@ -37,7 +37,6 @@ import org.apache.solr.common.util.Utils;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.TestStatus.PASS;
-import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.EQUAL;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.GREATER_THAN;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.LESS_THAN;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.NOT_EQUAL;
@@ -103,9 +102,20 @@ public class Clause implements MapWriter, Comparable<Clause> {
         throw new RuntimeException("Invalid metrics: param in " + Utils.toJSONString(m) + " must have at 2 or 3 segments after 'metrics:' separated by ':'");
       }
     }
+    doPostValidate(collection, shard, replica, tag, globalTag);
     hasComputedValue = hasComputedValue();
   }
 
+  private void doPostValidate(Condition... conditions) {
+    for (Condition condition : conditions) {
+      if (condition == null) continue;
+      String err = condition.varType.postValidate(condition);
+      if (err != null) {
+        throw new IllegalArgumentException(StrUtils.formatString("Error in clause : {0}, caused by : {1}", Utils.toJSONString(original), err));
+      }
+    }
+  }
+
   public static Clause create(Map<String, Object> m) {
     Clause clause = new Clause(m);
     return clause.hasComputedValue() ?
@@ -211,7 +221,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
 
   //replica value is zero
   boolean isReplicaZero() {
-    return replica != null && replica.getOperand() == EQUAL &&
+    return replica != null && replica.getOperand() == Operand.EQUAL &&
         Preference.compareWithTolerance(0d, (Double) replica.val, 1) == 0;
   }
 
@@ -238,14 +248,14 @@ public class Clause implements MapWriter, Comparable<Clause> {
         else if (strVal.startsWith(NOT_EQUAL.operand)) operand = NOT_EQUAL;
         else if (strVal.startsWith(GREATER_THAN.operand)) operand = GREATER_THAN;
         else if (strVal.startsWith(LESS_THAN.operand)) operand = LESS_THAN;
-        else operand = EQUAL;
-        strVal = strVal.substring(EQUAL == operand || WILDCARD == operand ? 0 : 1);
+        else operand = Operand.EQUAL;
+        strVal = strVal.substring(Operand.EQUAL == operand || WILDCARD == operand ? 0 : 1);
         for (ComputationType t : ComputationType.values()) {
           String changedVal = t.match(strVal);
           if (changedVal != null) {
             computationType = t;
             strVal = changedVal;
-            if (varType == null || !varType.supportComputed(computationType)) {
+            if (varType == null || !varType.supportComputed(computationType, this)) {
               throw new IllegalArgumentException(StrUtils.formatString("''{0}'' is not allowed for variable :  ''{1}'' , in condition : ''{2}'' ",
                   t, conditionName, Utils.toJSONString(m)));
             }
@@ -255,7 +265,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
         expectedVal = validate(s, new Condition(s, strVal, operand, computationType, null),
true);
 
       } else if (val instanceof Number) {
-        operand = EQUAL;
+        operand = Operand.EQUAL;
         operand = varType.getOperand(operand, val, null);
         expectedVal = validate(s, new Condition(s, val, operand, null, null), true);
       }
@@ -272,8 +282,8 @@ public class Clause implements MapWriter, Comparable<Clause> {
     ComputedValueEvaluator computedValueEvaluator = new ComputedValueEvaluator(session);
     Suggestion.ViolationCtx ctx = new Suggestion.ViolationCtx(this, session.matrix, computedValueEvaluator);
     if (isPerCollectiontag()) {
-      Map<String, Map<String, Map<String, ReplicaCount>>> replicaCount
= computeReplicaCounts(session.matrix, computedValueEvaluator);
-      for (Map.Entry<String, Map<String, Map<String, ReplicaCount>>> e
: replicaCount.entrySet()) {
+      Map<String, Map<String, Map<String, ReplicaCount>>> replicaCounts
= computeReplicaCounts(session.matrix, computedValueEvaluator);
+      for (Map.Entry<String, Map<String, Map<String, ReplicaCount>>> e
: replicaCounts.entrySet()) {
         computedValueEvaluator.collName = e.getKey();
         if (!collection.isPass(computedValueEvaluator.collName)) continue;
         for (Map.Entry<String, Map<String, ReplicaCount>> shardVsCount : e.getValue().entrySet())
{
@@ -281,15 +291,16 @@ public class Clause implements MapWriter, Comparable<Clause> {
           if (!shard.isPass(computedValueEvaluator.shardName)) continue;
           for (Map.Entry<String, ReplicaCount> counts : shardVsCount.getValue().entrySet())
{
             SealedClause sealedClause = getSealedClause(computedValueEvaluator);
-            if (!sealedClause.replica.isPass(counts.getValue())) {
+            ReplicaCount replicas = counts.getValue();
+            if (!sealedClause.replica.isPass(replicas)) {
               Violation violation = new Violation(sealedClause,
                   computedValueEvaluator.collName,
                   computedValueEvaluator.shardName,
                   tag.name.equals("node") ? counts.getKey() : null,
                   counts.getValue(),
-                  sealedClause.getReplica().delta(counts.getValue()),
+                  sealedClause.getReplica().delta(replicas),
                   counts.getKey());
-              Suggestion.getTagType(tag.name).addViolatingReplicas(ctx.reset(counts.getKey(),
counts.getValue(), violation));
+              tag.varType.addViolatingReplicas(ctx.reset(counts.getKey(), replicas, violation));
             }
           }
         }
@@ -336,6 +347,21 @@ public class Clause implements MapWriter, Comparable<Clause> {
   }
 
   enum ComputationType {
+    EQUAL() {
+      @Override
+      public String wrap(String value) {
+        return "#EQUAL";
+      }
+
+      @Override
+      public String match(String val) {
+        if ("#EQUAL".equals(val)) return "1";
+        return null;
+      }
+
+    },
+
+
     PERCENT {
       @Override
       public String wrap(String value) {
@@ -584,6 +610,11 @@ public class Clause implements MapWriter, Comparable<Clause> {
     }
 
     @Override
+    public String toString() {
+      return jsonStr();
+    }
+
+    @Override
     public void writeMap(EntryWriter ew) throws IOException {
       ew.put("min", min).put("max", max).putIfNotNull("actual", actual);
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f86c4775/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
index 590f720..f5e25cc 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
@@ -138,6 +138,7 @@ public class Suggestion {
 
       @Override
       public Operand getOperand(Operand expected, Object strVal, Clause.ComputationType computationType)
{
+//        if (computationType == Clause.ComputationType.EQUAL) return expected;
         if (strVal instanceof String) {
           String s = ((String) strVal).trim();
           int hyphenIdx = s.indexOf('-');
@@ -161,29 +162,37 @@ public class Suggestion {
       }
 
       @Override
-      public boolean supportComputed(Clause.ComputationType computedType) {
-        return computedType == Clause.ComputationType.PERCENT;
+      public boolean supportComputed(Clause.ComputationType computedType, Clause clause)
{
+        if (computedType == Clause.ComputationType.PERCENT || computedType == Clause.ComputationType.EQUAL)
return true;
+        return false;
       }
 
       @Override
-      public Object computeValue(Policy.Session session, Clause.Condition cv, String collection,
String shard) {
-        if (cv.computationType == Clause.ComputationType.PERCENT) {
-          AtomicInteger totalReplicasOfInterest = new AtomicInteger(0);
-          Clause clause = cv.getClause();
-          for (Row row : session.matrix) {
-            row.forEachReplica(replicaInfo -> {
-              if (replicaInfo.getCollection().equals(collection)) {
-                if (clause.getShard().op == Operand.WILDCARD || replicaInfo.getShard().equals(shard))
{
-                  if(cv.getClause().type == null || replicaInfo.getType() == cv.getClause().type)
-                  totalReplicasOfInterest.incrementAndGet();
-                }
-              }
-            });
+      public String postValidate(Clause.Condition condition) {
+        if (condition.computationType == Clause.ComputationType.EQUAL) {
+          if (condition.getClause().tag != null &&
+              condition.getClause().tag.varType == NODE &&
+              condition.getClause().tag.op == Operand.WILDCARD) {
+            return null;
+          } else {
+            return "'replica': '#EQUAL` must be used with 'node':'#ANY'";
           }
+        }
+        return null;
+      }
 
-          return totalReplicasOfInterest.doubleValue() * Clause.parseDouble(cv.name, cv.val).doubleValue()
/ 100;
+      @Override
+      public Object computeValue(Policy.Session session, Clause.Condition cv, String collection,
String shard) {
+        if (cv.computationType == Clause.ComputationType.EQUAL) {
+          int relevantReplicasCount = getRelevantReplicasCount(session, cv, collection, shard);
+          if (relevantReplicasCount == 0) return 0;
+          return (double) session.matrix.size() / (double) relevantReplicasCount;
+        } else if (cv.computationType == Clause.ComputationType.PERCENT) {
+          int relevantReplicasCount = getRelevantReplicasCount(session, cv, collection, shard);
+          if (relevantReplicasCount == 0) return 0;
+          return (double) relevantReplicasCount * Clause.parseDouble(cv.name, cv.val).doubleValue()
/ 100;
         } else {
-          throw new RuntimeException("Unsupported type " + cv.computationType);
+          throw new IllegalArgumentException("Unsupported type " + cv.computationType);
 
         }
       }
@@ -346,6 +355,13 @@ public class Suggestion {
         }
 
       }
+
+      @Override
+      public void addViolatingReplicas(ViolationCtx ctx) {
+        for (Row r : ctx.allRows) {
+          if(r.node.equals(ctx.tagKey)) collectViolatingReplicas(ctx,r);
+        }
+      }
     },
     LAZY("LAZY", null, null, null, null) {
       @Override
@@ -398,14 +414,7 @@ public class Suggestion {
 
     public void addViolatingReplicas(ViolationCtx ctx) {
       for (Row row : ctx.allRows) {
-        row.forEachReplica(replica -> {
-          if (ctx.clause.replica.isPass(0) && !ctx.clause.tag.isPass(row)) return;
-          if (!ctx.clause.replica.isPass(0) && ctx.clause.tag.isPass(row)) return;
-          if (!ctx.currentViolation.matchShard(replica.getShard())) return;
-          if (!ctx.clause.collection.isPass(ctx.currentViolation.coll) || !ctx.clause.shard.isPass(ctx.currentViolation.shard))
-            return;
-          ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica).withDelta(ctx.clause.tag.delta(row.getVal(ctx.clause.tag.name))));
-        });
+        collectViolatingReplicas(ctx, row);
       }
     }
 
@@ -418,6 +427,10 @@ public class Suggestion {
       return val;
     }
 
+    public String postValidate(Clause.Condition condition) {
+      return null;
+    }
+
     public Object validate(String name, Object val, boolean isRuleVal) {
       if (val instanceof Clause.Condition) {
         Clause.Condition condition = (Clause.Condition) val;
@@ -472,7 +485,7 @@ public class Suggestion {
       return Math.abs(v1.replicaCountDelta) < Math.abs(v2.replicaCountDelta) ? -1 : 1;
     }
 
-    public boolean supportComputed(Clause.ComputationType computedType) {
+    public boolean supportComputed(Clause.ComputationType computedType, Clause clause) {
       return false;
     }
 
@@ -481,6 +494,33 @@ public class Suggestion {
     }
   }
 
+  private static void collectViolatingReplicas(ViolationCtx ctx, Row row) {
+    row.forEachReplica(replica -> {
+      if (ctx.clause.replica.isPass(0) && !ctx.clause.tag.isPass(row)) return;
+      if (!ctx.clause.replica.isPass(0) && ctx.clause.tag.isPass(row)) return;
+      if (!ctx.currentViolation.matchShard(replica.getShard())) return;
+      if (!ctx.clause.collection.isPass(ctx.currentViolation.coll) || !ctx.clause.shard.isPass(ctx.currentViolation.shard))
+        return;
+      ctx.currentViolation.addReplica(new ReplicaInfoAndErr(replica).withDelta(ctx.clause.tag.delta(row.getVal(ctx.clause.tag.name))));
+    });
+  }
+
+  private static int getRelevantReplicasCount(Policy.Session session, Clause.Condition cv,
String collection, String shard) {
+    AtomicInteger totalReplicasOfInterest = new AtomicInteger(0);
+    Clause clause = cv.getClause();
+    for (Row row : session.matrix) {
+      row.forEachReplica(replicaInfo -> {
+        if (replicaInfo.getCollection().equals(collection)) {
+          if (clause.getShard() ==null || clause.getShard().op == Operand.WILDCARD || replicaInfo.getShard().equals(shard))
{
+            if (cv.getClause().type == null || replicaInfo.getType() == cv.getClause().type)
+              totalReplicasOfInterest.incrementAndGet();
+          }
+        }
+      });
+    }
+    return totalReplicasOfInterest.get();
+  }
+
   static class ViolationCtx {
     final Function<Clause.Condition, Object> evaluator;
     String tagKey;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f86c4775/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
index b65d42a..903a75c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
@@ -43,7 +43,6 @@ import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
 import org.apache.solr.common.cloud.rule.SnitchContext;
 import org.apache.solr.common.params.CommonParams;
@@ -71,19 +70,31 @@ public class SolrClientNodeStateProvider implements NodeStateProvider,
MapWriter
 
 
   private final CloudSolrClient solrClient;
-  private final ZkStateReader zkStateReader;
   private final Map<String, Map<String, Map<String, List<ReplicaInfo>>>>
nodeVsCollectionVsShardVsReplicaInfo = new HashMap<>();
   private Map<String, Object> snitchSession = new HashMap<>();
   private Map<String, Map> nodeVsTags = new HashMap<>();
 
   public SolrClientNodeStateProvider(CloudSolrClient solrClient) {
     this.solrClient = solrClient;
-    this.zkStateReader = solrClient.getZkStateReader();
-    ClusterState clusterState = zkStateReader.getClusterState();
+    try {
+      readReplicaDetails();
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+    if(log.isDebugEnabled()) INST = this;
+  }
+
+  protected ClusterStateProvider getClusterStateProvider() {
+    return solrClient.getClusterStateProvider();
+  }
+
+  private void readReplicaDetails() throws IOException {
+    ClusterStateProvider clusterStateProvider = getClusterStateProvider();
+    ClusterState clusterState = clusterStateProvider.getClusterState();
     if (clusterState == null) { // zkStateReader still initializing
       return;
     }
-    Map<String, ClusterState.CollectionRef> all = clusterState.getCollectionStates();
+    Map<String, ClusterState.CollectionRef> all = clusterStateProvider.getClusterState().getCollectionStates();
     all.forEach((collName, ref) -> {
       DocCollection coll = ref.get();
       if (coll == null) return;
@@ -94,7 +105,6 @@ public class SolrClientNodeStateProvider implements NodeStateProvider,
MapWriter
         replicas.add(new ReplicaInfo(collName, shard, replica, new HashMap<>(replica.getProperties())));
       });
     });
-    if(log.isDebugEnabled()) INST = this;
   }
 
   @Override
@@ -107,10 +117,15 @@ public class SolrClientNodeStateProvider implements NodeStateProvider,
MapWriter
 
   @Override
   public Map<String, Object> getNodeValues(String node, Collection<String> tags)
{
+    Map<String, Object> tagVals = fetchTagValues(node, tags);
+    nodeVsTags.put(node, tagVals);
+    return tagVals;
+  }
+
+  protected Map<String, Object> fetchTagValues(String node, Collection<String>
tags) {
     AutoScalingSnitch snitch = new AutoScalingSnitch();
     ClientSnitchCtx ctx = new ClientSnitchCtx(null, node, snitchSession, solrClient);
     snitch.getTags(node, new HashSet<>(tags), ctx);
-    nodeVsTags.put(node, ctx.getTags());
     return ctx.getTags();
   }
 
@@ -140,11 +155,10 @@ public class SolrClientNodeStateProvider implements NodeStateProvider,
MapWriter
       });
 
       if (!keyVsReplica.isEmpty()) {
-        ClientSnitchCtx ctx = new ClientSnitchCtx(null, null, emptyMap(), solrClient);
-        fetchMetrics(node, ctx,
+        Map<String, Object> tags = fetchReplicaMetrics(node,
             keyVsReplica.entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getKey)));
-        ctx.getTags().forEach((k, o) -> {
+        tags.forEach((k, o) -> {
           Pair<String, ReplicaInfo> p = keyVsReplica.get(k);
           Suggestion.ConditionType validator = Suggestion.getTagType(p.first());
           if (validator != null) o = validator.convertVal(o);
@@ -156,7 +170,14 @@ public class SolrClientNodeStateProvider implements NodeStateProvider,
MapWriter
     return result;
   }
 
-  static void fetchMetrics(String solrNode, ClientSnitchCtx ctx, Map<String, Object>
metricsKeyVsTag) {
+  protected  Map<String,Object> fetchReplicaMetrics(String solrNode, Map<String,
Object> metricsKeyVsTag) {
+    ClientSnitchCtx ctx = new ClientSnitchCtx(null, null, emptyMap(), solrClient);
+    fetchReplicaMetrics(solrNode, ctx,metricsKeyVsTag);
+    return ctx.getTags();
+
+  }
+
+  static void fetchReplicaMetrics(String solrNode, ClientSnitchCtx ctx, Map<String, Object>
metricsKeyVsTag) {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.add("key", metricsKeyVsTag.keySet().toArray(new String[0]));
     try {
@@ -209,7 +230,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider,
MapWriter
         });
       }
       if (!metricsKeyVsTag.isEmpty()) {
-        fetchMetrics(solrNode, snitchContext, metricsKeyVsTag);
+        fetchReplicaMetrics(solrNode, snitchContext, metricsKeyVsTag);
       }
 
       Set<String> groups = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f86c4775/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
index bee7e54..112d136 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrResponse;
@@ -45,12 +46,15 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Clause.RangeVal;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.rule.ImplicitSnitch;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.SolrParams;
@@ -67,6 +71,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.REPLICA;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
@@ -262,6 +267,154 @@ public class TestPolicy extends SolrTestCaseJ4 {
     expectThrows(IllegalArgumentException.class,
         () -> Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '20%-33%', node:'#ANY'}")));
 
+    clause = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '#EQUAL', shard:'#EACH', node:'#ANY'}"));
+    assertEquals(Operand.RANGE_EQUAL, clause.replica.op);
+    clause = Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '#EQUAL', node:'#ANY'}"));
+    assertEquals(Operand.RANGE_EQUAL, clause.replica.op);
+    expectThrows(IllegalArgumentException.class,
+        () -> Clause.create((Map<String, Object>) Utils.fromJSONString("{replica: '#EQUAL', node:'node_1'}")));
+  }
+
+
+  public void testEqualFunction() {
+
+    String clusterStateStr = "{" +
+        "  'coll1': {" +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards': {" +
+        "      'shard1': {" +
+        "        'range': '80000000-ffffffff'," +
+        "        'replicas': {" +
+        "          'r1': {" +
+        "            'core': 'r1'," +
+        "            'base_url': 'http://10.0.0.4:8983/solr'," +
+        "            'node_name': 'node1'," +
+        "            'state': 'active'," +
+        "            'leader': 'true'" +
+        "          }," +
+        "          'r2': {" +
+        "            'core': 'r2'," +
+        "            'base_url': 'http://10.0.0.4:7574/solr'," +
+        "            'node_name': 'node2'," +
+        "            'state': 'active'" +
+        "          }" +
+        "        }" +
+        "      }," +
+        "      'shard2': {" +
+        "        'range': '0-7fffffff'," +
+        "        'replicas': {" +
+        "          'r3': {" +
+        "            'core': 'r3'," +
+        "            'base_url': 'http://10.0.0.4:8983/solr'," +
+        "            'node_name': 'node1'," +
+        "            'state': 'active'," +
+        "            'leader': 'true'" +
+        "          }," +
+        "          'r4': {" +
+        "            'core': 'r4'," +
+        "            'base_url': 'http://10.0.0.4:8987/solr'," +
+        "            'node_name': 'node4'," +
+        "            'state': 'active'" +
+        "          }," +
+        "          'r6': {" +
+        "            'core': 'r6'," +
+        "            'base_url': 'http://10.0.0.4:8989/solr'," +
+        "            'node_name': 'node3'," +
+        "            'state': 'active'" +
+        "          }," +
+        "          'r5': {" +
+        "            'core': 'r5'," +
+        "            'base_url': 'http://10.0.0.4:8983/solr'," +
+        "            'node_name': 'node1'," +
+        "            'state': 'active'" +
+        "          }" +
+        "        }" +
+        "      }" +
+        "    }" +
+        "  }" +
+        "}";
+
+
+    ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
+        ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
+    DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null)
{
+      @Override
+      public ClusterState getClusterState() throws IOException {
+        return clusterState;
+      }
+
+      @Override
+      public Set<String> getLiveNodes() {
+        return clusterState.getLiveNodes();
+      }
+    };
+
+    SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null)
{
+      @Override
+      protected Map<String, Object> fetchTagValues(String node, Collection<String>
tags) {
+        Map<String, Object> result = new HashMap<>();
+        AtomicInteger cores = new AtomicInteger();
+        forEachReplica(node, replicaInfo -> cores.incrementAndGet());
+        if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
+        if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
+        return result;
+      }
+
+      @Override
+      protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String,
Object> metricsKeyVsTag) {
+        //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
+        Map<String, Object> result = new HashMap<>();
+        metricsKeyVsTag.forEach((k, v) -> {
+          if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
+        });
+
+        return result;
+      }
+
+      @Override
+      protected ClusterStateProvider getClusterStateProvider() {
+        return clusterStateProvider;
+      }
+    };
+
+    Map policies = (Map) Utils.fromJSONString("{" +
+        "  'cluster-preferences': [" +
+        "    { 'minimize': 'cores', 'precision': 50}" +
+        "  ]," +
+        "  'cluster-policy': [" +
+        "    { 'replica': '#EQUAL', 'node': '#ANY'}," +
+        "  ]" +
+        "}");
+    AutoScalingConfig config = new AutoScalingConfig(policies);
+    Policy policy = config.getPolicy();
+    Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
+      @Override
+      public ClusterStateProvider getClusterStateProvider() {
+        return clusterStateProvider;
+      }
+
+      @Override
+      public NodeStateProvider getNodeStateProvider() {
+        return solrClientNodeStateProvider;
+      }
+    });
+    List<Violation> violations = session.getViolations();
+    assertEquals(1, violations.size());
+    Violation violation = violations.get(0);
+    assertEquals("node1", violation.node);
+    RangeVal val = (RangeVal) violation.getClause().replica.val;
+    assertEquals(0.0, val.min);
+    assertEquals(1.0, val.max);
+    assertEquals(0, Preference.compareWithTolerance(val.actual.doubleValue(), 0.833, 1));
+    assertEquals(3, violation.getViolatingReplicas().size());
+    Set<String> expected = ImmutableSet.of("r1", "r3", "r5");
+    for (Violation.ReplicaInfoAndErr replicaInfoAndErr : violation.getViolatingReplicas())
{
+      assertTrue(expected.contains(replicaInfoAndErr.replicaInfo.getCore()));
+    }
+    System.out.println();
+
   }
 
   private static void expectError(String name, Object val, String msg) {


Mime
View raw message