lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject [1/2] lucene-solr:branch_7x: SOLR-11990: Make it possible to co-locate replicas of multiple collections together in a node.
Date Sun, 29 Jul 2018 03:11:04 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x bde2f2a54 -> 3c3137728


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
index 87fecda..8fd815e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
@@ -38,7 +38,6 @@ import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
 
 import static java.util.Collections.singletonMap;
-import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.TestStatus.PASS;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.EQUAL;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.GREATER_THAN;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Operand.LESS_THAN;
@@ -75,6 +74,17 @@ public class Clause implements MapWriter, Comparable<Clause> {
     this.strict = clause.strict;
   }
 
+  // internal use only
+  Clause(Map<String, Object> original, Condition tag, Condition globalTag, boolean isStrict)  {
+    this.original = original;
+    this.tag = tag;
+    this.globalTag = globalTag;
+    this.globalTag.clause = this;
+    this.type = null;
+    this.hasComputedValue = false;
+    this.strict = isStrict;
+  }
+
   private Clause(Map<String, Object> m) {
     this.original = Utils.getDeepCopy(m, 10);
     String type = (String) m.get("type");
@@ -374,7 +384,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
       for (Row r : session.matrix) {
         SealedClause sealedClause = getSealedClause(computedValueEvaluator);
         if (!sealedClause.getGlobalTag().isPass(r)) {
-          ConditionType.CORES.addViolatingReplicas(ctx.reset(null, null,
+          sealedClause.getGlobalTag().varType.addViolatingReplicas(ctx.reset(null, null,
               new Violation(sealedClause, null, null, r.node, r.getVal(sealedClause.globalTag.name), sealedClause.globalTag.delta(r.getVal(globalTag.name)), null)));
         }
       }
@@ -553,21 +563,21 @@ public class Clause implements MapWriter, Comparable<Clause> {
     }
 
     boolean isPass(Object inputVal) {
+      return isPass(inputVal, null);
+    }
+
+    boolean isPass(Object inputVal, Row row) {
       if (computedType != null) {
         throw new IllegalStateException("This is supposed to be called only from a Condition with no computed value or a SealedCondition");
 
       }
       if (inputVal instanceof ReplicaCount) inputVal = ((ReplicaCount) inputVal).getVal(getClause().type);
-      if (varType == ConditionType.LAZY) { // we don't know the type
-        return op.match(parseString(val), parseString(inputVal)) == PASS;
-      } else {
-        return op.match(val, validate(name, inputVal, false)) == PASS;
-      }
+      return varType.match(inputVal, op, val, name, row);
     }
 
 
     boolean isPass(Row row) {
-      return isPass(row.getVal(name));
+      return isPass(row.getVal(name), row);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java
index 2843f8a..fffaee2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/MoveReplicaSuggester.java
@@ -17,6 +17,7 @@
 
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Comparator;
 import java.util.List;
 
@@ -24,10 +25,13 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
 
 public class MoveReplicaSuggester extends Suggester {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   @Override
   SolrRequest init() {
@@ -56,20 +60,28 @@ public class MoveReplicaSuggester extends Suggester {
         targetRow = session.matrix.get(j);
         if (targetRow.node.equals(fromRow.node)) continue;
         if (!isNodeSuitableForReplicaAddition(targetRow)) continue;
-        targetRow = targetRow.addReplica(ri.getCollection(), ri.getShard(), ri.getType());//add replica to target first
-        Pair<Row, ReplicaInfo> pair = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node
-        if (pair == null) continue;//should not happen
-        Row srcRowModified = pair.first();//this is the final state of the source row and session
+        targetRow = targetRow.addReplica(ri.getCollection(), ri.getShard(), ri.getType(), strict); // add replica to target first
+        Row srcRowModified = targetRow.session.getNode(fromRow.node).removeReplica(ri.getCollection(), ri.getShard(), ri.getType());//then remove replica from source node
         List<Violation> errs = testChangedMatrix(strict, srcRowModified.session);
-        srcRowModified.session.applyRules();// now resort the nodes with the new values
+        srcRowModified.session.applyRules(); // now resort the nodes with the new values
         Policy.Session tmpSession = srcRowModified.session;
+
         if (!containsNewErrors(errs) &&
             isLessSerious(errs, leastSeriousViolation) &&
             (force || (tmpSession.indexOf(srcRowModified.node) < tmpSession.indexOf(targetRow.node)))) {
-          leastSeriousViolation = errs;
-          bestSrcRow = srcRowModified;
-          sourceReplicaInfo = ri;
-          bestTargetRow = targetRow;
+
+          int result = -1;
+          if (!force && srcRowModified.isLive && targetRow.isLive)  {
+            result = tmpSession.getPolicy().clusterPreferences.get(0).compare(srcRowModified, tmpSession.getNode(targetRow.node), true);
+            if (result == 0) result = tmpSession.getPolicy().clusterPreferences.get(0).compare(srcRowModified, tmpSession.getNode(targetRow.node), false);
+          }
+
+          if (result <= 0) {
+            leastSeriousViolation = errs;
+            bestSrcRow = srcRowModified;
+            sourceReplicaInfo = ri;
+            bestTargetRow = targetRow;
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index a5f57bb..879bb74 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -44,6 +44,7 @@ import org.apache.solr.common.IteratorWriter;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
+import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.StrUtils;
@@ -55,6 +56,8 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.toList;
+import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.NODE;
+import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.WITH_COLLECTION;
 
 /*The class that reads, parses and applies policies specified in
  * autoscaling.json
@@ -74,7 +77,7 @@ public class Policy implements MapWriter {
   public static final String POLICIES = "policies";
   public static final String CLUSTER_POLICY = "cluster-policy";
   public static final String CLUSTER_PREFERENCES = "cluster-preferences";
-  public static final Set<String> GLOBAL_ONLY_TAGS = Collections.singleton("cores");
+  public static final Set<String> GLOBAL_ONLY_TAGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList("cores", CollectionAdminParams.WITH_COLLECTION)));
   public static final List<Preference> DEFAULT_PREFERENCES = Collections.unmodifiableList(
       Arrays.asList(
           new Preference((Map<String, Object>) Utils.fromJSONString("{minimize : cores, precision:1}")),
@@ -131,9 +134,12 @@ public class Policy implements MapWriter {
 
     this.policies = Collections.unmodifiableMap(
         policiesFromMap((Map<String, List<Map<String, Object>>>) jsonMap.getOrDefault(POLICIES, emptyMap()), newParams));
-    this.params = Collections.unmodifiableList(newParams.stream()
+    List<Pair<String, Suggestion.ConditionType>> params = newParams.stream()
         .map(s -> new Pair<>(s, Suggestion.getTagType(s)))
-        .collect(toList()));
+        .collect(toList());
+    // always include this param; there is no extra cost
+    params.add(new Pair<>(WITH_COLLECTION.tagName, WITH_COLLECTION));
+    this.params = Collections.unmodifiableList(params);
     perReplicaAttributes = readPerReplicaAttrs();
   }
 
@@ -501,6 +507,21 @@ public class Policy implements MapWriter {
           .filter(clause -> !clause.isPerCollectiontag())
           .collect(Collectors.toList());
 
+      if (nodes.size() > 0) {
+        // if any collection has 'withCollection', the NodeStateProvider returns a map value irrespective of which node is queried
+        Map<String, Object> vals = nodeStateProvider.getNodeValues(nodes.get(0), Collections.singleton("withCollection"));
+        if (!vals.isEmpty() && vals.get("withCollection") != null) {
+          Map<String, String> withCollMap = (Map<String, String>) vals.get("withCollection");
+          if (!withCollMap.isEmpty()) {
+            Clause withCollClause = new Clause((Map<String,Object>)Utils.fromJSONString("{withCollection:'*' , node: '#ANY'}") ,
+                new Clause.Condition(NODE.tagName, "#ANY", Operand.EQUAL, null, null),
+                new Clause.Condition(WITH_COLLECTION.tagName,"*" , Operand.EQUAL, null, null), true
+            );
+            expandedClauses.add(withCollClause);
+          }
+        }
+      }
+
       ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider();
       for (String c : collections) {
         addClausesForCollection(stateProvider, c);
@@ -594,4 +615,15 @@ public class Policy implements MapWriter {
       throw new RuntimeException("NO such node found " + node);
     }
   }
+  static final Map<String, Suggestion.ConditionType> validatetypes = new HashMap<>();
+  static {
+    for (Suggestion.ConditionType t : Suggestion.ConditionType.values())
+      validatetypes.put(t.tagName, t);
+  }
+  public static ConditionType getTagType(String name) {
+    ConditionType info = validatetypes.get(name);
+    if (info == null && name.startsWith(ImplicitSnitch.SYSPROP)) info = ConditionType.STRING;
+    if (info == null && name.startsWith(Clause.METRICS_PREFIX)) info = ConditionType.LAZY;
+    return info;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index f00675ba..cb9f3a0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -93,7 +93,7 @@ public class PolicyHelper {
         if (autoScalingConfig != null) {
           return new DelegatingDistribStateManager(null) {
             @Override
-            public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
+            public AutoScalingConfig getAutoScalingConfig() {
               return autoScalingConfig;
             }
           };
@@ -135,7 +135,7 @@ public class PolicyHelper {
           }
         }
       } catch (IOException e) {
-        /*ignore*/
+        log.warn("Exception while reading disk free metric values for nodes to be used for collection: " + collName, e);
       }
 
 
@@ -178,7 +178,7 @@ public class PolicyHelper {
   }
 
 
-  public static final int SESSION_EXPIRY = 180;//3 seconds
+  public static final int SESSION_EXPIRY = 180; // 3 minutes
 
   public static MapWriter getDiagnostics(Policy policy, SolrCloudManager cloudManager) {
     Policy.Session session = policy.createSession(cloudManager);
@@ -230,7 +230,7 @@ public class PolicyHelper {
   /**Use this to dump the state of a system and to generate a testcase
    */
   public static void logState(SolrCloudManager cloudManager, Suggester suggester) {
-    if(log.isTraceEnabled()) {
+    if (log.isTraceEnabled()) {
       log.trace("LOGSTATE: {}",
           Utils.toJSONString((MapWriter) ew -> {
             ew.put("liveNodes", cloudManager.getClusterStateProvider().getLiveNodes());
@@ -249,9 +249,9 @@ public class PolicyHelper {
 
   public enum Status {
     NULL,
-    //it is just created and not yet used or all operations on it has been competed fully
+    //it is just created and not yet used or all operations on it has been completed fully
     UNUSED,
-    COMPUTING, EXECUTING;
+    COMPUTING, EXECUTING
   }
 
   /**
@@ -265,7 +265,7 @@ public class PolicyHelper {
    */
   static class SessionRef {
     private final Object lockObj = new Object();
-    private SessionWrapper sessionWrapper = SessionWrapper.DEF_INST;
+    private SessionWrapper sessionWrapper = SessionWrapper.DEFAULT_INSTANCE;
 
 
     public SessionRef() {
@@ -286,7 +286,7 @@ public class PolicyHelper {
       synchronized (lockObj) {
         if (sessionWrapper.createTime == this.sessionWrapper.createTime && this.sessionWrapper.refCount.get() <= 0) {
           log.debug("session set to NULL");
-          this.sessionWrapper = SessionWrapper.DEF_INST;
+          this.sessionWrapper = SessionWrapper.DEFAULT_INSTANCE;
         } // else somebody created a new session b/c of expiry . So no need to do anything about it
       }
     }
@@ -311,7 +311,7 @@ public class PolicyHelper {
           //one thread who is waiting for this need to be notified.
           lockObj.notify();
         } else {
-          log.info("create time NOT SAME {} ", SessionWrapper.DEF_INST.createTime);
+          log.info("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
           //else just ignore it
         }
       }
@@ -343,7 +343,7 @@ public class PolicyHelper {
             }
             log.debug("out of waiting curr-time:{} time-elapsed {}", time(timeSource, MILLISECONDS), timeElapsed(timeSource, waitStart, MILLISECONDS));
             // now this thread has woken up because it got timed out after 10 seconds or it is notified after
-            //the session was returned from another COMPUTING operation
+            // the session was returned from another COMPUTING operation
             if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
               log.debug("Wait over. reusing the existing session ");
               this.sessionWrapper.status = Status.COMPUTING;
@@ -400,12 +400,12 @@ public class PolicyHelper {
 
 
   public static class SessionWrapper {
-    public static final SessionWrapper DEF_INST = new SessionWrapper(null, null);
+    public static final SessionWrapper DEFAULT_INSTANCE = new SessionWrapper(null, null);
 
     static {
-      DEF_INST.status = Status.NULL;
-      DEF_INST.createTime = -1l;
-      DEF_INST.lastUpdateTime = -1l;
+      DEFAULT_INSTANCE.status = Status.NULL;
+      DEFAULT_INSTANCE.createTime = -1L;
+      DEFAULT_INSTANCE.lastUpdateTime = -1L;
     }
 
     private long createTime;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
index c00a90eb..8a14abd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
@@ -18,10 +18,12 @@
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -33,6 +35,8 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.params.CoreAdminParams.NODE;
 
@@ -40,6 +44,8 @@ import static org.apache.solr.common.params.CoreAdminParams.NODE;
  * Each instance represents a node in the cluster
  */
 public class Row implements MapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   public final String node;
   final Cell[] cells;
   //this holds the details of each replica in the node
@@ -109,6 +115,14 @@ public class Row implements MapWriter {
     return jsonStr();
   }
 
+  public Row addReplica(String coll, String shard, Replica.Type type) {
+    return addReplica(coll, shard, type, 0, true);
+  }
+
+  public Row addReplica(String coll, String shard, Replica.Type type, boolean strictMode) {
+    return addReplica(coll, shard, type, 0, strictMode);
+  }
+
   /**
    * this simulates adding a replica of a certain coll+shard to node. as a result of adding a replica ,
    * values of certain attributes will be modified, in this node as well as other nodes. Please note that
@@ -117,9 +131,18 @@ public class Row implements MapWriter {
    * @param coll  collection name
    * @param shard shard name
    * @param type  replica type
+   * @param recursionCount the number of times we have recursed to add more replicas
+   * @param strictMode whether suggester is operating in strict mode or not
    */
-  public Row addReplica(String coll, String shard, Replica.Type type) {
-    Row row = session.copy().getNode(this.node);
+  Row addReplica(String coll, String shard, Replica.Type type, int recursionCount, boolean strictMode) {
+    if (recursionCount > 3) {
+      log.error("more than 3 levels of recursion ", new RuntimeException());
+      return this;
+    }
+    List<OperationInfo> furtherOps = new LinkedList<>();
+    Consumer<OperationInfo> opCollector = it -> furtherOps.add(it);
+    Row row = null;
+    row = session.copy().getNode(this.node);
     if (row == null) throw new RuntimeException("couldn't get a row");
     Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.computeIfAbsent(coll, k -> new HashMap<>());
     List<ReplicaInfo> replicas = c.computeIfAbsent(shard, k -> new ArrayList<>());
@@ -128,12 +151,37 @@ public class Row implements MapWriter {
         Utils.makeMap(ZkStateReader.REPLICA_TYPE, type != null ? type.toString() : Replica.Type.NRT.toString()));
     replicas.add(ri);
     for (Cell cell : row.cells) {
-      cell.type.projectAddReplica(cell, ri);
+      cell.type.projectAddReplica(cell, ri, opCollector, strictMode);
+    }
+    for (OperationInfo op : furtherOps) {
+      if (op.isAdd) {
+        row = row.session.getNode(op.node).addReplica(op.coll, op.shard, op.type, recursionCount + 1, strictMode);
+      } else {
+        row.session.getNode(op.node).removeReplica(op.coll, op.shard, op.type, recursionCount+1);
+      }
     }
+
     return row;
   }
 
 
+  static class OperationInfo {
+    final String coll, shard, node, cellName;
+    final boolean isAdd; // true = addReplica, false = removeReplica
+    final Replica.Type type;
+
+
+    OperationInfo(String coll, String shard, String node, String cellName, boolean isAdd, Replica.Type type) {
+      this.coll = coll;
+      this.shard = shard;
+      this.node = node;
+      this.cellName = cellName;
+      this.isAdd = isAdd;
+      this.type = type;
+    }
+  }
+
+
   public ReplicaInfo getReplica(String coll, String shard, Replica.Type type) {
     Map<String, List<ReplicaInfo>> c = collectionVsShardVsReplicas.get(coll);
     if (c == null) return null;
@@ -150,9 +198,18 @@ public class Row implements MapWriter {
     if (idx == -1) return null;
     return r.get(idx);
   }
+  public Row removeReplica(String coll, String shard, Replica.Type type) {
+    return removeReplica(coll,shard, type, 0);
 
+  }
   // this simulates removing a replica from a node
-  public Pair<Row, ReplicaInfo> removeReplica(String coll, String shard, Replica.Type type) {
+  public Row removeReplica(String coll, String shard, Replica.Type type, int recursionCount) {
+    if (recursionCount > 3) {
+      log.error("more than 3 levels of recursion ", new RuntimeException());
+      return this;
+    }
+    List<OperationInfo> furtherOps = new LinkedList<>();
+    Consumer<OperationInfo> opCollector = it -> furtherOps.add(it);
     Row row = session.copy().getNode(this.node);
     Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.get(coll);
     if (c == null) return null;
@@ -169,9 +226,9 @@ public class Row implements MapWriter {
     if (idx == -1) return null;
     ReplicaInfo removed = r.remove(idx);
     for (Cell cell : row.cells) {
-      cell.type.projectRemoveReplica(cell, removed);
+      cell.type.projectRemoveReplica(cell, removed, opCollector);
     }
-    return new Pair(row, removed);
+    return row;
 
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
index c201aa3..67721ba 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
@@ -18,6 +18,7 @@
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,6 +28,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -34,13 +36,18 @@ import java.util.function.Predicate;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
 
 /* A suggester is capable of suggesting a collection operation
  * given a particular session. Before it suggests a new operation,
@@ -50,6 +57,8 @@ import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.Conditio
  *
  */
 public abstract class Suggester implements MapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class);
   Policy.Session session;
   SolrRequest operation;
@@ -94,34 +103,40 @@ public abstract class Suggester implements MapWriter {
 
   abstract SolrRequest init();
 
-
+  @SuppressWarnings("unchecked")
   public SolrRequest getSuggestion() {
     if (!isInitialized) {
       Set<String> collections = (Set<String>) hints.getOrDefault(Hint.COLL, Collections.emptySet());
       Set<Pair<String, String>> s = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
       if (!collections.isEmpty() || !s.isEmpty()) {
-        HashSet<Pair<String, String>> shards = new HashSet<>(s);
-        collections.stream().forEach(c -> shards.add(new Pair<>(c, null)));
-        ClusterStateProvider stateProvider = session.cloudManager.getClusterStateProvider();
-        for (Pair<String, String> shard : shards) {
-          // if this is not a known collection from the existing clusterstate,
-          // then add it
-          if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first()))) {
-            session.addClausesForCollection(stateProvider, shard.first());
+        HashSet<Pair<String, String>> collectionShardPairs = new HashSet<>(s);
+        collections.forEach(c -> collectionShardPairs.add(new Pair<>(c, null)));
+        collections.forEach(c -> {
+          try {
+            getWithCollection(c).ifPresent(withCollection -> collectionShardPairs.add(new Pair<>(withCollection, null)));
+          } catch (IOException e) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                "Exception while fetching 'withCollection' attribute for collection: " + c, e);
           }
-          for (Row row : session.matrix) {
-            Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(), it -> new HashMap<>());
-            if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new ArrayList<>());
+        });
+        s.forEach(kv -> {
+          try {
+            getWithCollection(kv.first()).ifPresent(withCollection -> collectionShardPairs.add(new Pair<>(withCollection, null)));
+          } catch (IOException e) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                "Exception while fetching 'withCollection' attribute for collection: " + kv.first(), e);
           }
-        }
+        });
+        setupCollection(collectionShardPairs);
         Collections.sort(session.expandedClauses);
       }
       Set<String> srcNodes = (Set<String>) hints.get(Hint.SRC_NODE);
       if (srcNodes != null && !srcNodes.isEmpty()) {
         // the source node is dead so live nodes may not have it
         for (String srcNode : srcNodes) {
-          if (session.matrix.stream().noneMatch(row -> row.node.equals(srcNode)))
+          if (session.matrix.stream().noneMatch(row -> row.node.equals(srcNode))) {
             session.matrix.add(new Row(srcNode, session.getPolicy().params, session.getPolicy().perReplicaAttributes, session));
+          }
         }
       }
       session.applyRules();
@@ -135,6 +150,30 @@ public abstract class Suggester implements MapWriter {
     return operation;
   }
 
+  protected Optional<String> getWithCollection(String collectionName) throws IOException {
+    DocCollection collection = session.cloudManager.getClusterStateProvider().getCollection(collectionName);
+    if (collection != null) {
+      return Optional.ofNullable(collection.getStr(WITH_COLLECTION));
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  private void setupCollection(HashSet<Pair<String, String>> collectionShardPairs) {
+    ClusterStateProvider stateProvider = session.cloudManager.getClusterStateProvider();
+    for (Pair<String, String> shard : collectionShardPairs) {
+      // if this is not a known collection from the existing clusterstate,
+      // then add it
+      if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first()))) {
+        session.addClausesForCollection(stateProvider, shard.first());
+      }
+      for (Row row : session.matrix) {
+        Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(), it -> new HashMap<>());
+        if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new ArrayList<>());
+      }
+    }
+  }
+
   public Policy.Session getSession() {
     return session;
   }
@@ -355,4 +394,67 @@ public abstract class Suggester implements MapWriter {
     ew.put("action", String.valueOf(getAction()));
     ew.put("hints", (MapWriter) ew1 -> hints.forEach((hint, o) -> ew1.putNoEx(hint.toString(), o)));
   }
+
+  protected Collection setupWithCollectionTargetNodes(Set<String> collections, Set<Pair<String, String>> s, String withCollection) {
+    Collection originalTargetNodesCopy = null;
+    if (withCollection != null) {
+      if (log.isDebugEnabled()) {
+        HashSet<String> set = new HashSet<>(collections);
+        s.forEach(kv -> set.add(kv.first()));
+        log.debug("Identified withCollection = {} for collection: {}", withCollection, set);
+      }
+
+      originalTargetNodesCopy = Utils.getDeepCopy((Collection) hints.get(Hint.TARGET_NODE), 10, true);
+
+      Set<String> withCollectionNodes = new HashSet<>();
+
+      for (Row row : getMatrix()) {
+        row.forEachReplica(r -> {
+          if (withCollection.equals(r.getCollection()) &&
+              "shard1".equals(r.getShard())) {
+            withCollectionNodes.add(r.getNode());
+          }
+        });
+      }
+
+      if (originalTargetNodesCopy != null && !originalTargetNodesCopy.isEmpty()) {
+        // find intersection of the set of target nodes with the set of 'withCollection' nodes
+        Set<String> set = (Set<String>) hints.computeIfAbsent(Hint.TARGET_NODE, h -> new HashSet<>());
+        set.retainAll(withCollectionNodes);
+        if (set.isEmpty()) {
+          // no nodes common between the sets, we have no choice but to restore the original target node hint
+          hints.put(Hint.TARGET_NODE, originalTargetNodesCopy);
+        }
+      } else if (originalTargetNodesCopy == null) {
+        hints.put(Hint.TARGET_NODE, withCollectionNodes);
+      }
+    }
+    return originalTargetNodesCopy;
+  }
+
+  protected String findWithCollection(Set<String> collections, Set<Pair<String, String>> s) {
+    List<String> withCollections = new ArrayList<>(1);
+    collections.forEach(c -> {
+      try {
+        getWithCollection(c).ifPresent(withCollections::add);
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Exception while fetching 'withCollection' attribute for collection: " + c, e);
+      }
+    });
+    s.forEach(kv -> {
+      try {
+        getWithCollection(kv.first()).ifPresent(withCollections::add);
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Exception while fetching 'withCollection' attribute for collection: " + kv.first(), e);
+      }
+    });
+
+    if (withCollections.size() > 1) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "The number of 'withCollection' attributes should be exactly 1 for any policy but found: " + withCollections);
+    }
+    return withCollections.isEmpty() ? null : withCollections.get(0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
index a481a40..af20fac 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
@@ -25,12 +25,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -44,13 +43,14 @@ import org.apache.solr.common.util.StrUtils;
 
 import static java.util.Collections.emptySet;
 import static java.util.Collections.unmodifiableSet;
+import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.parseString;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.ANY;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
 
 public class Suggestion {
   public static final String coreidxsize = "INDEX.sizeInGB";
 
-  static final Map<String, ConditionType> validatetypes = new HashMap<>();
+
   private static final String NULL = "";
 
   @Target(ElementType.FIELD)
@@ -82,14 +82,13 @@ public class Suggestion {
 
     String metricsKey() default NULL;
 
+    Class implementation() default void.class;
+
     ComputedType[] computedValues() default ComputedType.NULL;
   }
 
   public static ConditionType getTagType(String name) {
-    ConditionType info = validatetypes.get(name);
-    if (info == null && name.startsWith(ImplicitSnitch.SYSPROP)) info = ConditionType.STRING;
-    if (info == null && name.startsWith(Clause.METRICS_PREFIX)) info = ConditionType.LAZY;
-    return info;
+    return Policy.getTagType(name);
   }
 
   private static Object getOperandAdjustedValue(Object val, Object original) {
@@ -160,7 +159,9 @@ public class Suggestion {
   /**
    * Type details of each variable in policies
    */
-  public enum ConditionType {
+  public enum ConditionType implements VarType {
+    @Meta(name = "withCollection", type = String.class, isNodeSpecificVal = true, implementation = WithCollectionVarType.class)
+    WITH_COLLECTION(),
 
     @Meta(name = "collection",
         type = String.class)
@@ -375,7 +376,7 @@ public class Suggestion {
 
       //When a replica is added, freedisk should be incremented
       @Override
-      public void projectAddReplica(Cell cell, ReplicaInfo ri) {
+      public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> ops, boolean strictMode) {
         //go through other replicas of this shard and copy the index size value into this
         for (Row row : cell.getRow().session.matrix) {
           row.forEachReplica(replicaInfo -> {
@@ -395,7 +396,7 @@ public class Suggestion {
       }
 
       @Override
-      public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
+      public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector) {
         Double idxSize = (Double) validate(CORE_IDX.tagName, ri.getVariable(CORE_IDX.tagName), false);
         if (idxSize == null) return;
         Double currFreeDisk = cell.val == null ? 0.0d : (Double) cell.val;
@@ -464,12 +465,12 @@ public class Suggestion {
       }
 
       @Override
-      public void projectAddReplica(Cell cell, ReplicaInfo ri) {
+      public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> ops, boolean strictMode) {
         cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() + 1;
       }
 
       @Override
-      public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
+      public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector) {
         cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() - 1;
       }
     },
@@ -525,7 +526,12 @@ public class Suggestion {
     LAZY() {
       @Override
       public Object validate(String name, Object val, boolean isRuleVal) {
-        return Clause.parseString(val);
+        return parseString(val);
+      }
+
+      @Override
+      public boolean match(Object inputVal, Operand op, Object val, String name, Row row) {
+        return op.match(parseString(val), parseString(inputVal)) == Clause.TestStatus.PASS;
       }
 
       @Override
@@ -543,6 +549,8 @@ public class Suggestion {
       public void getSuggestions(SuggestionCtx ctx) {
         perNodeSuggestions(ctx);
       }
+
+
     };
 
     public final String tagName;
@@ -558,6 +566,7 @@ public class Suggestion {
     public final Set<String> associatedPerNodeValues;
     public final String metricsAttribute;
     public final Set<ComputedType> supportedComputedTypes;
+    private final VarType impl;
 
 
     ConditionType() {
@@ -569,6 +578,15 @@ public class Suggestion {
       } catch (NoSuchFieldException e) {
         //cannot happen
       }
+      if (meta.implementation() != void.class) {
+        try {
+          impl = (VarType) meta.implementation().newInstance();
+        } catch (Exception e) {
+          throw new RuntimeException("Unable to instantiate: " + meta.implementation().getName());
+        }
+      } else {
+        impl = null;
+      }
       this.tagName = meta.name();
       this.type = meta.type();
 
@@ -583,6 +601,7 @@ public class Suggestion {
           emptySet() :
           unmodifiableSet(new HashSet(Arrays.asList(meta.computedValues())));
       this.wildCards = readSet(meta.wildCards());
+
     }
 
     public String getTagName() {
@@ -603,11 +622,21 @@ public class Suggestion {
       return unmodifiableSet(new HashSet<>(Arrays.asList(vals)));
     }
 
+    @Override
     public void getSuggestions(SuggestionCtx ctx) {
+      if (impl != null) {
+        impl.getSuggestions(ctx);
+        return;
+      }
       perNodeSuggestions(ctx);
     }
 
+    @Override
     public void addViolatingReplicas(ViolationCtx ctx) {
+      if (impl != null) {
+        impl.addViolatingReplicas(ctx);
+        return;
+      }
       for (Row row : ctx.allRows) {
         if (ctx.clause.tag.varType.meta.isNodeSpecificVal() && !row.node.equals(ctx.tagKey)) continue;
         collectViolatingReplicas(ctx, row);
@@ -669,21 +698,35 @@ public class Suggestion {
     /**
      * Simulate a replica addition to a node in the cluster
      */
-    public void projectAddReplica(Cell cell, ReplicaInfo ri) {
+    public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector, boolean strictMode) {
+      if (impl != null) impl.projectAddReplica(cell, ri, opCollector, strictMode);
     }
 
-    public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
+    public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector) {
+      if (impl != null) {
+        impl.projectRemoveReplica(cell, ri, opCollector);
+      }
     }
 
+    @Override
     public int compareViolation(Violation v1, Violation v2) {
+      if (impl != null) return impl.compareViolation(v1, v2);
       if (v2.replicaCountDelta == null || v1.replicaCountDelta == null) return 0;
       if (Math.abs(v1.replicaCountDelta) == Math.abs(v2.replicaCountDelta)) return 0;
       return Math.abs(v1.replicaCountDelta) < Math.abs(v2.replicaCountDelta) ? -1 : 1;
     }
 
+    @Override
     public Object computeValue(Policy.Session session, Clause.Condition condition, String collection, String shard, String node) {
+      if (impl != null) return impl.computeValue(session, condition, collection, shard, node);
       return condition.val;
     }
+
+    @Override
+    public boolean match(Object inputVal, Operand op, Object val, String name, Row row) {
+      if (impl != null) return impl.match(inputVal, op, val, name, row);
+      return op.match(val, validate(name, inputVal, false)) == Clause.TestStatus.PASS;
+    }
   }
 
   private static void collectViolatingReplicas(ViolationCtx ctx, Row row) {
@@ -757,9 +800,4 @@ public class Suggestion {
     }
   }
 
-  static {
-    for (Suggestion.ConditionType t : Suggestion.ConditionType.values()) Suggestion.validatetypes.put(t.tagName, t);
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VarType.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VarType.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VarType.java
new file mode 100644
index 0000000..00224a9
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VarType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+import java.util.function.Consumer;
+
+/**
+ * A Variable Type used in Autoscaling policy rules
+ */
+public interface VarType {
+  boolean match(Object inputVal, Operand op, Object val, String name, Row row);
+
+  void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector, boolean strictMode);
+
+  void addViolatingReplicas(Suggestion.ViolationCtx ctx);
+
+  default void getSuggestions(Suggestion.SuggestionCtx ctx) {
+  }
+
+  default Object computeValue(Policy.Session session, Clause.Condition condition, String collection, String shard, String node) {
+    return condition.val;
+  }
+
+  int compareViolation(Violation v1, Violation v2);
+
+  default void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java
index d4ab0e6..7b0f0f3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Violation.java
@@ -30,7 +30,7 @@ import org.apache.solr.common.util.Utils;
 public class Violation implements MapWriter {
   final String shard, coll, node;
   final Object actualVal;
-  final Double replicaCountDelta;//how many extra replicas
+  Double replicaCountDelta;//how many extra replicas
   final Object tagKey;
   private final int hash;
   private final Clause clause;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/WithCollectionVarType.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/WithCollectionVarType.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/WithCollectionVarType.java
new file mode 100644
index 0000000..989a087
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/WithCollectionVarType.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.util.Pair;
+
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
+
+/**
+ * Implements the 'withCollection' variable type
+ */
+public class WithCollectionVarType implements VarType {
+  @Override
+  public boolean match(Object inputVal, Operand op, Object val, String name, Row row) {
+    Map<String, String> withCollectionMap = (Map<String, String>) inputVal;
+    if (withCollectionMap == null || withCollectionMap.isEmpty()) return true;
+
+    Set<String> uniqueColls = new HashSet<>();
+    row.forEachReplica(replicaInfo -> uniqueColls.add(replicaInfo.getCollection()));
+
+    for (Map.Entry<String, String> e : withCollectionMap.entrySet()) {
+      if (uniqueColls.contains(e.getKey()) && !uniqueColls.contains(e.getValue())) return false;
+    }
+
+    return true;
+  }
+
+  public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> opCollector, boolean strictMode) {
+    if (strictMode) {
+      // we do not want to add a replica of the 'withCollection' in strict mode
+      return;
+    }
+
+    Map<String, String> withCollectionMap = (Map<String, String>) cell.val;
+    if (withCollectionMap == null || withCollectionMap.isEmpty()) return;
+
+    Set<String> uniqueColls = new HashSet<>();
+    Row row = cell.row;
+    row.forEachReplica(replicaInfo -> uniqueColls.add(replicaInfo.getCollection()));
+
+    for (Map.Entry<String, String> e : withCollectionMap.entrySet()) {
+      if (uniqueColls.contains(e.getKey()) && !uniqueColls.contains(e.getValue())) {
+        String withCollection = e.getValue();
+
+        opCollector.accept(new Row.OperationInfo(withCollection, "shard1", row.node, cell.name, true, Replica.Type.NRT));
+      }
+    }
+  }
+
+  @Override
+  public int compareViolation(Violation v1, Violation v2) {
+    return Integer.compare(v1.getViolatingReplicas().size(), v2.getViolatingReplicas().size());
+  }
+
+  public void addViolatingReplicas(Suggestion.ViolationCtx ctx) {
+    String node = ctx.currentViolation.node;
+    for (Row row : ctx.allRows) {
+      if (node.equals(row.node)) {
+        Map<String, String> withCollectionMap = (Map<String, String>) row.getVal("withCollection");
+        if (withCollectionMap != null) {
+          row.forEachReplica(r -> {
+            String withCollection = withCollectionMap.get(r.getCollection());
+            if (withCollection != null) {
+              // test whether this row has at least 1 replica of withCollection, else there is a violation
+              Set<String> uniqueCollections = new HashSet<>();
+              row.forEachReplica(replicaInfo -> uniqueCollections.add(replicaInfo.getCollection()));
+              if (!uniqueCollections.contains(withCollection)) {
+                ctx.currentViolation.addReplica(new Violation.ReplicaInfoAndErr(r).withDelta(1.0d));
+              }
+            }
+          });
+          ctx.currentViolation.replicaCountDelta = (double) ctx.currentViolation.getViolatingReplicas().size();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void getSuggestions(Suggestion.SuggestionCtx ctx) {
+    if (ctx.violation.getViolatingReplicas().isEmpty()) return;
+
+    Map<String, Object> nodeValues = ctx.session.nodeStateProvider.getNodeValues(ctx.violation.node, Collections.singleton("withCollection"));
+    Map<String, String> withCollectionsMap = (Map<String, String>) nodeValues.get("withCollection");
+    if (withCollectionsMap == null) return;
+
+    Set<String> uniqueCollections = new HashSet<>();
+    for (Violation.ReplicaInfoAndErr replicaInfoAndErr : ctx.violation.getViolatingReplicas()) {
+      uniqueCollections.add(replicaInfoAndErr.replicaInfo.getCollection());
+    }
+
+    collectionLoop:
+    for (String collection : uniqueCollections) {
+      String withCollection = withCollectionsMap.get(collection);
+      if (withCollection == null) continue;
+
+      // can we find a node from which we can move a replica of the `withCollection`
+      // without creating another violation?
+      for (Row row : ctx.session.matrix) {
+        if (ctx.violation.node.equals(row.node))  continue; // filter the violating node
+
+        Set<String> hostedCollections = new HashSet<>();
+        row.forEachReplica(replicaInfo -> hostedCollections.add(replicaInfo.getCollection()));
+
+        if (hostedCollections.contains(withCollection) && !hostedCollections.contains(collection))  {
+          // find the candidate replicas that we can move
+          List<ReplicaInfo> movableReplicas = new ArrayList<>();
+          row.forEachReplica(replicaInfo -> {
+            if (replicaInfo.getCollection().equals(withCollection)) {
+              movableReplicas.add(replicaInfo);
+            }
+          });
+
+          for (ReplicaInfo toMove : movableReplicas) {
+            // candidate source node for a move replica operation
+            Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
+                .forceOperation(true)
+                .hint(Suggester.Hint.COLL_SHARD, new Pair<>(withCollection, "shard1"))
+                .hint(Suggester.Hint.SRC_NODE, row.node)
+                .hint(Suggester.Hint.REPLICA, toMove.getName())
+                .hint(Suggester.Hint.TARGET_NODE, ctx.violation.node);
+            if (ctx.addSuggestion(suggester) != null)
+              continue collectionLoop; // one suggestion is enough for this collection
+          }
+        }
+      }
+
+      // we could not find a valid move, so we suggest adding a replica
+      Suggester suggester = ctx.session.getSuggester(ADDREPLICA)
+          .forceOperation(true)
+          .hint(Suggester.Hint.COLL_SHARD, new Pair<>(withCollection, "shard1"))
+          .hint(Suggester.Hint.TARGET_NODE, ctx.violation.node);
+      ctx.addSuggestion(suggester);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
index a770102..2015b52 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
@@ -45,6 +45,7 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
 import org.apache.solr.common.cloud.rule.SnitchContext;
+import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -60,6 +61,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.METRICS_PREFIX;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.TOTALDISK;
+import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.WITH_COLLECTION;
 
 /**
  *
@@ -75,6 +77,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
   protected final Map<String, Map<String, Map<String, List<ReplicaInfo>>>> nodeVsCollectionVsShardVsReplicaInfo = new HashMap<>();
   private Map<String, Object> snitchSession = new HashMap<>();
   private Map<String, Map> nodeVsTags = new HashMap<>();
+  private Map<String, String> withCollectionsMap = new HashMap<>();
 
   public SolrClientNodeStateProvider(CloudSolrClient solrClient) {
     this.solrClient = solrClient;
@@ -100,6 +103,9 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
     all.forEach((collName, ref) -> {
       DocCollection coll = ref.get();
       if (coll == null) return;
+      if (coll.getProperties().get(CollectionAdminParams.WITH_COLLECTION) != null) {
+        withCollectionsMap.put(coll.getName(), (String) coll.getProperties().get(CollectionAdminParams.WITH_COLLECTION));
+      }
       coll.forEachReplica((shard, replica) -> {
         Map<String, Map<String, List<ReplicaInfo>>> nodeData = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(replica.getNodeName(), k -> new HashMap<>());
         Map<String, List<ReplicaInfo>> collData = nodeData.computeIfAbsent(collName, k -> new HashMap<>());
@@ -114,13 +120,15 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
 //    ew.put("liveNodes", liveNodes);
     ew.put("replicaInfo", Utils.getDeepCopy(nodeVsCollectionVsShardVsReplicaInfo, 5));
     ew.put("nodeValues", nodeVsTags);
-
   }
 
   @Override
   public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
     Map<String, Object> tagVals = fetchTagValues(node, tags);
     nodeVsTags.put(node, tagVals);
+    if (tags.contains(WITH_COLLECTION.tagName)) {
+      tagVals.put(WITH_COLLECTION.tagName, withCollectionsMap);
+    }
     return tagVals;
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 301cbc8..8d36296 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -59,9 +59,11 @@ import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
 import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
 import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_PARAM;
 import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
 
 /**
  * This class is experimental and subject to change.
@@ -80,7 +82,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       MAX_SHARDS_PER_NODE,
       AUTO_ADD_REPLICAS,
       POLICY,
-      COLL_CONF);
+      COLL_CONF,
+      WITH_COLLECTION,
+      COLOCATED_WITH);
 
   protected final CollectionAction action;
 
@@ -417,10 +421,11 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     protected Integer pullReplicas;
     protected Integer tlogReplicas;
 
-    private Properties properties;
+    protected Properties properties;
     protected Boolean autoAddReplicas;
     protected Integer stateFormat;
-    private String[] rule , snitch;
+    protected String[] rule , snitch;
+    protected String withCollection;
 
     /** Constructor intended for typical use cases */
     protected Create(String collection, String config, Integer numShards, Integer numNrtReplicas, Integer numTlogReplicas, Integer numPullReplicas) { // TODO: maybe add other constructors
@@ -557,6 +562,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (rule != null) params.set(DocCollection.RULE, rule);
       if (snitch != null) params.set(DocCollection.SNITCH, snitch);
       params.setNonNull(POLICY, policy);
+      params.setNonNull(WITH_COLLECTION, withCollection);
       return params;
     }
 
@@ -564,6 +570,15 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       this.policy = policy;
       return this;
     }
+
+    public String getWithCollection() {
+      return withCollection;
+    }
+
+    public Create setWithCollection(String withCollection) {
+      this.withCollection = withCollection;
+      return this;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 4c12d9c..411fe56 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
 
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.common.SolrException;
@@ -174,6 +175,9 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     return slices.get(sliceName);
   }
 
+  /**
+   * @param consumer invoked once for each (shardName, replica) pair in this collection
+   */
   public void forEachReplica(BiConsumer<String, Replica> consumer) {
     slices.forEach((shard, slice) -> slice.getReplicasMap().forEach((s, replica) -> consumer.accept(shard, replica)));
   }
@@ -321,7 +325,22 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     }
     return replicas;
   }
-  
+
+  /**
+   * @param predicate tested against each (shardName, replica) pair
+   * @return the first replica that satisfies the predicate, or null if none matches
+   */
+  public Replica getReplica(BiPredicate<String, Replica> predicate) {
+    final Replica[] result = new Replica[1];
+    forEachReplica((s, replica) -> {
+      if (result[0] != null) return;
+      if (predicate.test(s, replica)) {
+        result[0] = replica;
+      }
+    });
+    return result[0];
+  }
+
   public List<Replica> getReplicas(EnumSet<Replica.Type> s) {
     List<Replica> replicas = new ArrayList<>();
     for (Slice slice : this) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
index 6153eb1..a0ef11f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
@@ -79,4 +79,15 @@ public interface CollectionAdminParams {
    * The name of the config set to be used for a collection
    */
   String COLL_CONF = "collection.configName";
+
+  /**
+   * The name of the collection with which a collection is to be co-located
+   */
+  String WITH_COLLECTION = "withCollection";
+
+  /**
+   * The reverse link of the WITH_COLLECTION flag. It is stored in the cluster state of the `withCollection`
+   * collection and points back to the collection on which `withCollection` was specified.
+   */
+  String COLOCATED_WITH = "COLOCATED_WITH";
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3c313772/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
index feae38b..16addd4 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
@@ -166,6 +166,564 @@ public class TestPolicy extends SolrTestCaseJ4 {
     return result;
   }
 
+
+  public void testWithCollection() {
+    String clusterStateStr = "{" +
+        "  'comments_coll':{" +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards':{}," +
+        "    'withCollection' :'articles_coll'" +
+        "  }," +
+        "  'articles_coll': {" +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards': {" +
+        "      'shard1': {" +
+        "        'range': '80000000-ffffffff'," +
+        "        'replicas': {" +
+        "          'r1': {" +
+        "            'core': 'r1'," +
+        "            'base_url': 'http://10.0.0.4:8983/solr'," +
+        "            'node_name': 'node1'," +
+        "            'state': 'active'," +
+        "            'leader': 'true'" +
+        "          }," +
+        "          'r2': {" +
+        "            'core': 'r2'," +
+        "            'base_url': 'http://10.0.0.4:7574/solr'," +
+        "            'node_name': 'node2'," +
+        "            'state': 'active'" +
+        "          }" +
+        "        }" +
+        "      }" +
+        "    }" +
+        "  }" +
+        "}";
+    ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
+        ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
+    DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
+      @Override
+      public ClusterState getClusterState() throws IOException {
+        return clusterState;
+      }
+
+      @Override
+      public Set<String> getLiveNodes() {
+        return clusterState.getLiveNodes();
+      }
+    };
+
+    SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
+      @Override
+      protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
+        Map<String, Object> result = new HashMap<>();
+        AtomicInteger cores = new AtomicInteger();
+        forEachReplica(node, replicaInfo -> cores.incrementAndGet());
+        if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
+        if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
+        return result;
+      }
+
+      @Override
+      protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
+        //e.g.: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
+        Map<String, Object> result = new HashMap<>();
+        metricsKeyVsTagReplica.forEach((k, v) -> {
+          if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
+        });
+        return result;
+      }
+
+      @Override
+      protected ClusterStateProvider getClusterStateProvider() {
+        return clusterStateProvider;
+      }
+    };
+    Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection"));
+    assertNotNull(m.get("withCollection"));
+
+    Map policies = (Map) Utils.fromJSONString("{" +
+        "  'cluster-preferences': [" +
+        "    { 'minimize': 'cores'}," +
+        "    { 'maximize': 'freedisk', 'precision': 50}" +
+        "  ]," +
+        "  'cluster-policy': [" +
+        "    { 'replica': 0, 'nodeRole': 'overseer'}," +
+        "    { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
+        "  ]" +
+        "}");
+    AutoScalingConfig config = new AutoScalingConfig(policies);
+    Policy policy = config.getPolicy();
+    Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
+      @Override
+      public ClusterStateProvider getClusterStateProvider() {
+        return clusterStateProvider;
+      }
+
+      @Override
+      public NodeStateProvider getNodeStateProvider() {
+        return solrClientNodeStateProvider;
+      }
+    });
+    Suggester suggester = session.getSuggester(CollectionAction.ADDREPLICA);
+    suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
+    SolrRequest op = suggester.getSuggestion();
+    assertNotNull(op);
+    Set<String> nodes = new HashSet<>(2);
+    nodes.add(op.getParams().get("node"));
+    session = suggester.getSession();
+    suggester = session.getSuggester(ADDREPLICA);
+    suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
+    op = suggester.getSuggestion();
+    assertNotNull(op);
+    nodes.add(op.getParams().get("node"));
+    assertEquals(2, nodes.size());
+    assertTrue("node1 should have been selected by add replica", nodes.contains("node1"));
+    assertTrue("node2 should have been selected by add replica", nodes.contains("node2"));
+
+    session = suggester.getSession();
+    suggester = session.getSuggester(MOVEREPLICA);
+    suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
+    op = suggester.getSuggestion();
+    assertNull(op);
+  }
+
+  public void testWithCollectionSuggestions() {
+    String clusterStateStr = "{" +
+        "  'articles_coll':{" +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards':{'shard1':{}}" +
+        "  }," +
+        "  'comments_coll': {" +
+        "    'withCollection' :'articles_coll'," +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards': {" +
+        "      'shard1': {" +
+        "        'range': '80000000-ffffffff'," +
+        "        'replicas': {" +
+        "          'r1': {" +
+        "            'core': 'r1'," +
+        "            'base_url': 'http://10.0.0.4:8983/solr'," +
+        "            'node_name': 'node1'," +
+        "            'state': 'active'," +
+        "            'leader': 'true'" +
+        "          }," +
+        "          'r2': {" +
+        "            'core': 'r2'," +
+        "            'base_url': 'http://10.0.0.4:7574/solr'," +
+        "            'node_name': 'node2'," +
+        "            'state': 'active'" +
+        "          }" +
+        "        }" +
+        "      }" +
+        "    }" +
+        "  }" +
+        "}";
+    ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
+        ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
+    DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
+      @Override
+      public ClusterState getClusterState() throws IOException {
+        return clusterState;
+      }
+
+      @Override
+      public Set<String> getLiveNodes() {
+        return clusterState.getLiveNodes();
+      }
+    };
+
+    SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
+      @Override
+      protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
+        Map<String, Object> result = new HashMap<>();
+        AtomicInteger cores = new AtomicInteger();
+        forEachReplica(node, replicaInfo -> cores.incrementAndGet());
+        if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
+        if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
+        return result;
+      }
+
+      @Override
+      protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
+        //e.g.: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
+        Map<String, Object> result = new HashMap<>();
+        metricsKeyVsTagReplica.forEach((k, v) -> {
+          if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
+        });
+        return result;
+      }
+
+      @Override
+      protected ClusterStateProvider getClusterStateProvider() {
+        return clusterStateProvider;
+      }
+    };
+    Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection"));
+    assertNotNull(m.get("withCollection"));
+
+    Map policies = (Map) Utils.fromJSONString("{" +
+        "  'cluster-preferences': [" +
+        "    { 'maximize': 'freedisk', 'precision': 50}," +
+        "    { 'minimize': 'cores'}" +
+        "  ]," +
+        "  'cluster-policy': [" +
+        "    { 'replica': 0, 'nodeRole': 'overseer'}," +
+        "    { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
+        "  ]" +
+        "}");
+
+    List<Suggester.SuggestionInfo> l = PolicyHelper.getSuggestions(new AutoScalingConfig(policies),
+        new DelegatingCloudManager(null) {
+          @Override
+          public ClusterStateProvider getClusterStateProvider() {
+            return clusterStateProvider;
+          }
+
+          @Override
+          public NodeStateProvider getNodeStateProvider() {
+            return solrClientNodeStateProvider;
+          }
+        });
+    assertNotNull(l);
+    assertEquals(2, l.size());
+
+    // collect the set of nodes to which replicas are being added
+    Set<String> nodes = new HashSet<>(2);
+
+    m = l.get(0).toMap(new LinkedHashMap<>());
+    assertEquals(1.0d, Utils.getObjectByPath(m, true, "violation/violation/delta"));
+    assertEquals("POST", Utils.getObjectByPath(m, true, "operation/method"));
+    assertEquals("/c/articles_coll/shards", Utils.getObjectByPath(m, true, "operation/path"));
+    assertNotNull(Utils.getObjectByPath(m, false, "operation/command/add-replica"));
+    nodes.add((String) Utils.getObjectByPath(m, true, "operation/command/add-replica/node"));
+
+    m = l.get(1).toMap(new LinkedHashMap<>());
+    assertEquals(1.0d, Utils.getObjectByPath(m, true, "violation/violation/delta"));
+    assertEquals("POST", Utils.getObjectByPath(m, true, "operation/method"));
+    assertEquals("/c/articles_coll/shards", Utils.getObjectByPath(m, true, "operation/path"));
+    assertNotNull(Utils.getObjectByPath(m, false, "operation/command/add-replica"));
+    nodes.add((String) Utils.getObjectByPath(m, true, "operation/command/add-replica/node"));
+
+    assertEquals(2, nodes.size());
+    assertTrue(nodes.contains("node1"));
+    assertTrue(nodes.contains("node2"));
+  }
+
+  public void testWithCollectionMoveVsAddSuggestions() {
+    String clusterStateStr = "{" +
+        "  'articles_coll':{" +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards': {" +
+        "      'shard1': {" +
+        "        'range': '80000000-ffffffff'," +
+        "        'replicas': {" +
+        "          'r1': {" +
+        "            'core': 'r1'," +
+        "            'base_url': 'http://10.0.0.4:8983/solr'," +
+        "            'node_name': 'node1'," +
+        "            'state': 'active'," +
+        "            'leader': 'true'" +
+        "          }," +
+        "          'r2': {" +
+        "            'core': 'r2'," +
+        "            'base_url': 'http://10.0.0.4:7574/solr'," +
+        "            'node_name': 'node2'," +
+        "            'state': 'active'" +
+        "          }," +
+        "          'r3': {" +
+        "            'core': 'r3'," +
+        "            'base_url': 'http://10.0.0.4:7579/solr'," +
+        "            'node_name': 'node6'," +
+        "            'state': 'active'" +
+        "          }" +
+        "        }" +
+        "      }" +
+        "    }" +
+        "  }," +
+        "  'comments_coll': {" +
+        "    'withCollection' :'articles_coll'," +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards': {" +
+        "      'shard1': {" +
+        "        'range': '80000000-ffffffff'," +
+        "        'replicas': {" +
+        "          'r1': {" +
+        "            'core': 'r1'," +
+        "            'base_url': 'http://10.0.0.4:7576/solr'," +
+        "            'node_name': 'node3'," +
+        "            'state': 'active'," +
+        "            'leader': 'true'" +
+        "          }," +
+        "          'r2': {" +
+        "            'core': 'r2'," +
+        "            'base_url': 'http://10.0.0.4:7577/solr'," +
+        "            'node_name': 'node4'," +
+        "            'state': 'active'" +
+        "          }," +
+        "          'r3': {" +
+        "            'core': 'r3'," +
+        "            'base_url': 'http://10.0.0.4:7578/solr'," +
+        "            'node_name': 'node5'," +
+        "            'state': 'active'" +
+        "          }," +
+        "          'r4': {" +
+        "            'core': 'r4'," +
+        "            'base_url': 'http://10.0.0.4:7579/solr'," +
+        "            'node_name': 'node6'," +
+        "            'state': 'active'" +
+        "          }" +
+        "        }" +
+        "      }" +
+        "    }" +
+        "  }" +
+        "}";
+    ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
+        ImmutableSet.of("node1", "node2", "node3", "node4", "node5", "node6"));
+    DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
+      @Override
+      public ClusterState getClusterState() throws IOException {
+        return clusterState;
+      }
+
+      @Override
+      public Set<String> getLiveNodes() {
+        return clusterState.getLiveNodes();
+      }
+    };
+
+    SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
+      @Override
+      protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
+        Map<String, Object> result = new HashMap<>();
+        AtomicInteger cores = new AtomicInteger();
+        forEachReplica(node, replicaInfo -> cores.incrementAndGet());
+        if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
+        if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
+        return result;
+      }
+
+      @Override
+      protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
+        //e.g.: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
+        Map<String, Object> result = new HashMap<>();
+        metricsKeyVsTagReplica.forEach((k, v) -> {
+          if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
+        });
+        return result;
+      }
+
+      @Override
+      protected ClusterStateProvider getClusterStateProvider() {
+        return clusterStateProvider;
+      }
+    };
+    Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection"));
+    assertNotNull(m.get("withCollection"));
+
+    Map policies = (Map) Utils.fromJSONString("{" +
+        "  'cluster-preferences': [" +
+        "    { 'maximize': 'freedisk', 'precision': 50}," +
+        "    { 'minimize': 'cores'}" +
+        "  ]," +
+        "  'cluster-policy': [" +
+        "    { 'replica': 0, 'nodeRole': 'overseer'}," +
+        "    { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
+        "  ]" +
+        "}");
+
+    List<Suggester.SuggestionInfo> l = PolicyHelper.getSuggestions(new AutoScalingConfig(policies),
+        new DelegatingCloudManager(null) {
+          @Override
+          public ClusterStateProvider getClusterStateProvider() {
+            return clusterStateProvider;
+          }
+
+          @Override
+          public NodeStateProvider getNodeStateProvider() {
+            return solrClientNodeStateProvider;
+          }
+        });
+    assertNotNull(l);
+    assertEquals(3, l.size());
+
+    // NOTE(review): 'nodes' below is never read in this test; add/move targets are tracked in addNodes/targetNodes
+    Set<String> nodes = new HashSet<>(2);
+
+    int numMoves = 0, numAdds = 0;
+    Set<String> addNodes = new HashSet<>();
+    Set<String> targetNodes = new HashSet<>();
+    Set<String> movedReplicas = new HashSet<>();
+    for (Suggester.SuggestionInfo suggestionInfo : l) {
+      Map s = suggestionInfo.toMap(new LinkedHashMap<>());
+      assertEquals("POST", Utils.getObjectByPath(s, true, "operation/method"));
+      if (Utils.getObjectByPath(s, false, "operation/command/add-replica") != null)  {
+        numAdds++;
+        assertEquals(1.0d, Utils.getObjectByPath(s, true, "violation/violation/delta"));
+        assertEquals("/c/articles_coll/shards", Utils.getObjectByPath(s, true, "operation/path"));
+        addNodes.add((String) Utils.getObjectByPath(s, true, "operation/command/add-replica/node"));
+      } else if (Utils.getObjectByPath(s, false, "operation/command/move-replica") != null) {
+        numMoves++;
+        assertEquals("/c/articles_coll", Utils.getObjectByPath(s, true, "operation/path"));
+        targetNodes.add((String) Utils.getObjectByPath(s, true, "operation/command/move-replica/targetNode"));
+        movedReplicas.add((String) Utils.getObjectByPath(s, true, "operation/command/move-replica/replica"));
+      } else  {
+        fail("Unexpected operation type suggested for suggestion: " + suggestionInfo);
+      }
+    }
+
+    assertEquals(2, targetNodes.size());
+    assertEquals(1, addNodes.size());
+    assertEquals(2, movedReplicas.size());
+    Set<String> allTargetNodes = new HashSet<>(targetNodes);
+    allTargetNodes.addAll(addNodes);
+    assertEquals(3, allTargetNodes.size());
+    assertTrue(allTargetNodes.contains("node3"));
+    assertTrue(allTargetNodes.contains("node4"));
+    assertTrue(allTargetNodes.contains("node5"));
+  }
+
+  public void testWithCollectionMoveReplica() {
+    String clusterStateStr = "{" +
+        "  'comments_coll':{" +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards':{" +
+        "       'shard1' : {" +
+        "        'range': '80000000-ffffffff'," +
+        "        'replicas': {" +
+        "          'r1': {" +
+        "            'core': 'r1'," +
+        "            'base_url': 'http://10.0.0.4:8983/solr'," +
+        "            'node_name': 'node1'," +
+        "            'state': 'active'," +
+        "            'leader': 'true'" +
+        "          }" +
+        "         }" +
+        "       }" +
+        "     }," +
+        "    'withCollection' :'articles_coll'" +
+        "  }," +
+        "  'articles_coll': {" +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards': {" +
+        "      'shard1': {" +
+        "        'range': '80000000-ffffffff'," +
+        "        'replicas': {" +
+        "          'r1': {" +
+        "            'core': 'r1'," +
+        "            'base_url': 'http://10.0.0.4:8983/solr'," +
+        "            'node_name': 'node1'," +
+        "            'state': 'active'," +
+        "            'leader': 'true'" +
+        "          }," +
+        "          'r2': {" +
+        "            'core': 'r2'," +
+        "            'base_url': 'http://10.0.0.4:7574/solr'," +
+        "            'node_name': 'node2'," +
+        "            'state': 'active'" +
+        "          }" +
+        "        }" +
+        "      }" +
+        "    }" +
+        "  }" +
+        "}";
+    ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
+        ImmutableSet.of("node2", "node3", "node4", "node5"));
+    DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
+      @Override
+      public ClusterState getClusterState() throws IOException {
+        return clusterState;
+      }
+
+      @Override
+      public Set<String> getLiveNodes() {
+        return clusterState.getLiveNodes();
+      }
+    };
+
+    SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
+      @Override
+      protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
+        Map<String, Object> result = new HashMap<>();
+        AtomicInteger cores = new AtomicInteger();
+        forEachReplica(node, replicaInfo -> cores.incrementAndGet());
+        if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
+        if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
+        return result;
+      }
+
+      @Override
+      protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
+        //e.g.: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
+        Map<String, Object> result = new HashMap<>();
+        metricsKeyVsTagReplica.forEach((k, v) -> {
+          if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
+        });
+        return result;
+      }
+
+      @Override
+      protected ClusterStateProvider getClusterStateProvider() {
+        return clusterStateProvider;
+      }
+    };
+    Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection"));
+    assertNotNull(m.get("withCollection"));
+
+    Map policies = (Map) Utils.fromJSONString("{" +
+        "  'cluster-preferences': [" +
+        "    { 'minimize': 'cores'}," +
+        "    { 'maximize': 'freedisk', 'precision': 50}" +
+        "  ]," +
+        "  'cluster-policy': [" +
+        "    { 'replica': 0, 'nodeRole': 'overseer'}," +
+        "    { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
+        "  ]" +
+        "}");
+    AutoScalingConfig config = new AutoScalingConfig(policies);
+    Policy policy = config.getPolicy();
+    Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
+      @Override
+      public ClusterStateProvider getClusterStateProvider() {
+        return clusterStateProvider;
+      }
+
+      @Override
+      public NodeStateProvider getNodeStateProvider() {
+        return solrClientNodeStateProvider;
+      }
+    });
+    Suggester suggester = session.getSuggester(CollectionAction.MOVEREPLICA);
+    suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
+    suggester.hint(Hint.SRC_NODE, "node1");
+    SolrRequest op = suggester.getSuggestion();
+    assertNotNull(op);
+    assertEquals("node2 should have been selected by move replica","node2",
+        op.getParams().get("targetNode"));
+
+    session = suggester.getSession();
+    suggester = session.getSuggester(MOVEREPLICA);
+    suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
+    suggester.hint(Hint.SRC_NODE, "node1");
+    op = suggester.getSuggestion();
+    assertNull(op);
+  }
+
   public void testValidate() {
     expectError("replica", -1, "must be greater than");
     expectError("replica", "hello", "not a valid number");
@@ -1228,7 +1786,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
     assertTrue(session.getPolicy() == config.getPolicy());
     assertEquals(sessionWrapper.status, PolicyHelper.Status.EXECUTING);
     sessionWrapper.release();
-    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
     PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
     assertEquals(sessionRef.getSessionWrapper().getCreateTime(), s1.getCreateTime());
     PolicyHelper.SessionWrapper[] s2 = new PolicyHelper.SessionWrapper[1];
@@ -1256,9 +1814,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
     assertEquals(2, s1.getRefCount());
 
     s2[0].release();
-    assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+    assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
     s1.release();
-    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
 
 
   }
@@ -1479,7 +2037,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
     assertEquals("node2", op.getNode());
   }
 
-  private SolrCloudManager getSolrCloudManager(final Map<String, Map> nodeValues, String clusterState) {
+  private SolrCloudManager getSolrCloudManager(final Map<String, Map> nodeValues, String clusterS) {
     return new SolrCloudManager() {
       ObjectCache objectCache = new ObjectCache();
 
@@ -1521,7 +2079,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
 
           @Override
           public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
-            return getReplicaDetails(node, clusterState);
+            return getReplicaDetails(node, clusterS);
           }
         };
       }


Mime
View raw message