lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject [1/2] lucene-solr:jira/solr-11990: SOLR-11990: Make it possible to co-locate replicas of multiple collections together in a node
Date Wed, 18 Jul 2018 11:03:11 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-11990 [created] 34841fc01


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34841fc0/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
index c201aa3..67721ba 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
@@ -18,6 +18,7 @@
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,6 +28,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -34,13 +36,18 @@ import java.util.function.Predicate;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
 
 /* A suggester is capable of suggesting a collection operation
  * given a particular session. Before it suggests a new operation,
@@ -50,6 +57,8 @@ import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.Conditio
  *
  */
 public abstract class Suggester implements MapWriter {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class);
   Policy.Session session;
   SolrRequest operation;
@@ -94,34 +103,40 @@ public abstract class Suggester implements MapWriter {
 
   abstract SolrRequest init();
 
-
+  @SuppressWarnings("unchecked")
   public SolrRequest getSuggestion() {
     if (!isInitialized) {
       Set<String> collections = (Set<String>) hints.getOrDefault(Hint.COLL, Collections.emptySet());
       Set<Pair<String, String>> s = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD,
Collections.emptySet());
       if (!collections.isEmpty() || !s.isEmpty()) {
-        HashSet<Pair<String, String>> shards = new HashSet<>(s);
-        collections.stream().forEach(c -> shards.add(new Pair<>(c, null)));
-        ClusterStateProvider stateProvider = session.cloudManager.getClusterStateProvider();
-        for (Pair<String, String> shard : shards) {
-          // if this is not a known collection from the existing clusterstate,
-          // then add it
-          if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first())))
{
-            session.addClausesForCollection(stateProvider, shard.first());
+        HashSet<Pair<String, String>> collectionShardPairs = new HashSet<>(s);
+        collections.forEach(c -> collectionShardPairs.add(new Pair<>(c, null)));
+        collections.forEach(c -> {
+          try {
+            getWithCollection(c).ifPresent(withCollection -> collectionShardPairs.add(new
Pair<>(withCollection, null)));
+          } catch (IOException e) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                "Exception while fetching 'withCollection' attribute for collection: " +
c, e);
           }
-          for (Row row : session.matrix) {
-            Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(),
it -> new HashMap<>());
-            if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it ->
new ArrayList<>());
+        });
+        s.forEach(kv -> {
+          try {
+            getWithCollection(kv.first()).ifPresent(withCollection -> collectionShardPairs.add(new
Pair<>(withCollection, null)));
+          } catch (IOException e) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                "Exception while fetching 'withCollection' attribute for collection: " +
kv.first(), e);
           }
-        }
+        });
+        setupCollection(collectionShardPairs);
         Collections.sort(session.expandedClauses);
       }
       Set<String> srcNodes = (Set<String>) hints.get(Hint.SRC_NODE);
       if (srcNodes != null && !srcNodes.isEmpty()) {
         // the source node is dead so live nodes may not have it
         for (String srcNode : srcNodes) {
-          if (session.matrix.stream().noneMatch(row -> row.node.equals(srcNode)))
+          if (session.matrix.stream().noneMatch(row -> row.node.equals(srcNode))) {
             session.matrix.add(new Row(srcNode, session.getPolicy().params, session.getPolicy().perReplicaAttributes,
session));
+          }
         }
       }
       session.applyRules();
@@ -135,6 +150,30 @@ public abstract class Suggester implements MapWriter {
     return operation;
   }
 
+  protected Optional<String> getWithCollection(String collectionName) throws IOException
{
+    DocCollection collection = session.cloudManager.getClusterStateProvider().getCollection(collectionName);
+    if (collection != null) {
+      return Optional.ofNullable(collection.getStr(WITH_COLLECTION));
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  private void setupCollection(HashSet<Pair<String, String>> collectionShardPairs)
{
+    ClusterStateProvider stateProvider = session.cloudManager.getClusterStateProvider();
+    for (Pair<String, String> shard : collectionShardPairs) {
+      // if this is not a known collection from the existing clusterstate,
+      // then add it
+      if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(shard.first())))
{
+        session.addClausesForCollection(stateProvider, shard.first());
+      }
+      for (Row row : session.matrix) {
+        Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.computeIfAbsent(shard.first(),
it -> new HashMap<>());
+        if (shard.second() != null) shardInfo.computeIfAbsent(shard.second(), it -> new
ArrayList<>());
+      }
+    }
+  }
+
   public Policy.Session getSession() {
     return session;
   }
@@ -355,4 +394,67 @@ public abstract class Suggester implements MapWriter {
     ew.put("action", String.valueOf(getAction()));
     ew.put("hints", (MapWriter) ew1 -> hints.forEach((hint, o) -> ew1.putNoEx(hint.toString(),
o)));
   }
+
+  protected Collection setupWithCollectionTargetNodes(Set<String> collections, Set<Pair<String,
String>> s, String withCollection) {
+    Collection originalTargetNodesCopy = null;
+    if (withCollection != null) {
+      if (log.isDebugEnabled()) {
+        HashSet<String> set = new HashSet<>(collections);
+        s.forEach(kv -> set.add(kv.first()));
+        log.debug("Identified withCollection = {} for collection: {}", withCollection, set);
+      }
+
+      originalTargetNodesCopy = Utils.getDeepCopy((Collection) hints.get(Hint.TARGET_NODE),
10, true);
+
+      Set<String> withCollectionNodes = new HashSet<>();
+
+      for (Row row : getMatrix()) {
+        row.forEachReplica(r -> {
+          if (withCollection.equals(r.getCollection()) &&
+              "shard1".equals(r.getShard())) {
+            withCollectionNodes.add(r.getNode());
+          }
+        });
+      }
+
+      if (originalTargetNodesCopy != null && !originalTargetNodesCopy.isEmpty())
{
+        // find intersection of the set of target nodes with the set of 'withCollection' nodes
+        Set<String> set = (Set<String>) hints.computeIfAbsent(Hint.TARGET_NODE,
h -> new HashSet<>());
+        set.retainAll(withCollectionNodes);
+        if (set.isEmpty()) {
+          // no nodes common between the sets, we have no choice but to restore the original target node hint
+          hints.put(Hint.TARGET_NODE, originalTargetNodesCopy);
+        }
+      } else if (originalTargetNodesCopy == null) {
+        hints.put(Hint.TARGET_NODE, withCollectionNodes);
+      }
+    }
+    return originalTargetNodesCopy;
+  }
+
+  protected String findWithCollection(Set<String> collections, Set<Pair<String,
String>> s) {
+    List<String> withCollections = new ArrayList<>(1);
+    collections.forEach(c -> {
+      try {
+        getWithCollection(c).ifPresent(withCollections::add);
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Exception while fetching 'withCollection' attribute for collection: " + c, e);
+      }
+    });
+    s.forEach(kv -> {
+      try {
+        getWithCollection(kv.first()).ifPresent(withCollections::add);
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Exception while fetching 'withCollection' attribute for collection: " + kv.first(),
e);
+      }
+    });
+
+    if (withCollections.size() > 1) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "The number of 'withCollection' attributes should be exactly 1 for any policy but
found: " + withCollections);
+    }
+    return withCollections.isEmpty() ? null : withCollections.get(0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34841fc0/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
index c61f371..66f49ef 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -37,12 +38,14 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.V2RequestSupport;
 import org.apache.solr.client.solrj.cloud.autoscaling.Clause.ComputedType;
 import org.apache.solr.client.solrj.cloud.autoscaling.Violation.ReplicaInfoAndErr;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.StrUtils;
 
 import static java.util.Collections.emptySet;
 import static java.util.Collections.unmodifiableSet;
+import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.parseString;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.ANY;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
 
@@ -337,7 +340,7 @@ public class Suggestion {
 
       //When a replica is added, freedisk should be incremented
       @Override
-      public void projectAddReplica(Cell cell, ReplicaInfo ri) {
+      public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo>
ops) {
         //go through other replicas of this shard and copy the index size value into this
         for (Row row : cell.getRow().session.matrix) {
           row.forEachReplica(replicaInfo -> {
@@ -357,7 +360,7 @@ public class Suggestion {
       }
 
       @Override
-      public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
+      public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo>
opCollector) {
         Double idxSize = (Double) validate(CORE_IDX.tagName, ri.getVariable(CORE_IDX.tagName),
false);
         if (idxSize == null) return;
         Double currFreeDisk = cell.val == null ? 0.0d : (Double) cell.val;
@@ -426,12 +429,12 @@ public class Suggestion {
       }
 
       @Override
-      public void projectAddReplica(Cell cell, ReplicaInfo ri) {
+      public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo>
ops) {
         cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() + 1;
       }
 
       @Override
-      public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
+      public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo>
opCollector) {
         cell.val = cell.val == null ? 0 : ((Number) cell.val).longValue() - 1;
       }
     },
@@ -484,7 +487,12 @@ public class Suggestion {
     LAZY() {
       @Override
       public Object validate(String name, Object val, boolean isRuleVal) {
-        return Clause.parseString(val);
+        return parseString(val);
+      }
+
+      @Override
+      boolean match(Object inputVal, Operand op, Object val, String name, Row row) {
+        return op.match(parseString(val), parseString(inputVal)) == Clause.TestStatus.PASS;
       }
 
       @Override
@@ -501,6 +509,56 @@ public class Suggestion {
       public void getSuggestions(SuggestionCtx ctx) {
         perNodeSuggestions(ctx);
       }
+
+
+    },
+
+    @Meta(name = "withCollection", type = String.class)
+    WITH_COLLECTION() {
+      @Override
+      boolean match(Object inputVal, Operand op, Object val, String name, Row row) {
+        Map<String, String> withCollectionMap = (Map<String, String>) inputVal;
+        if (withCollectionMap == null || withCollectionMap.isEmpty()) return true;
+
+        Set<String> uniqueColls = new HashSet<>();
+        row.forEachReplica(replicaInfo -> uniqueColls.add(replicaInfo.getCollection()));
+
+        for (Map.Entry<String, String> e : withCollectionMap.entrySet()) {
+          if (uniqueColls.contains(e.getKey()) && !uniqueColls.contains(e.getValue()))
return false;
+        }
+
+        return true;
+      }
+
+      @Override
+      public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo>
opCollector) {
+        Map<String, String> withCollectionMap = (Map<String, String>) cell.val;
+        if (withCollectionMap == null || withCollectionMap.isEmpty()) return;
+
+        Set<String> uniqueColls = new HashSet<>();
+        Row row = cell.row;
+        row.forEachReplica(replicaInfo -> uniqueColls.add(replicaInfo.getCollection()));
+
+        for (Map.Entry<String, String> e : withCollectionMap.entrySet()) {
+          if (uniqueColls.contains(e.getKey()) && !uniqueColls.contains(e.getValue()))
 {
+            String withCollection = e.getValue();
+
+            opCollector.accept(new Row.OperationInfo(withCollection, "shard1", row.node,
cell.name, true, Replica.Type.NRT));
+          }
+        }
+      }
+
+      @Override
+      public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo>
opCollector) {
+        // todo nocommit
+        super.projectRemoveReplica(cell, ri, opCollector);
+      }
+
+      @Override
+      public int compareViolation(Violation v1, Violation v2) {
+        // todo nocommit
+        return super.compareViolation(v1, v2);
+      }
     };
 
     public final String tagName;
@@ -627,10 +685,10 @@ public class Suggestion {
     /**
      * Simulate a replica addition to a node in the cluster
      */
-    public void projectAddReplica(Cell cell, ReplicaInfo ri) {
+    public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo>
opCollector) {
     }
 
-    public void projectRemoveReplica(Cell cell, ReplicaInfo ri) {
+    public void projectRemoveReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo>
opCollector) {
     }
 
     public int compareViolation(Violation v1, Violation v2) {
@@ -642,6 +700,11 @@ public class Suggestion {
     public Object computeValue(Policy.Session session, Clause.Condition condition, String
collection, String shard, String node) {
       return condition.val;
     }
+
+    boolean match(Object inputVal, Operand op, Object val, String name, Row row) {
+      return op.match(val, validate(name, inputVal, false)) == Clause.TestStatus.PASS;
+
+    }
   }
 
   private static void collectViolatingReplicas(ViolationCtx ctx, Row row) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34841fc0/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
index 92183b8..faf43b8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
@@ -45,6 +45,7 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
 import org.apache.solr.common.cloud.rule.SnitchContext;
+import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -60,6 +61,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Clause.METRICS_PREFIX;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.FREEDISK;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.TOTALDISK;
+import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.ConditionType.WITH_COLLECTION;
 
 /**
  *
@@ -75,6 +77,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
   private final Map<String, Map<String, Map<String, List<ReplicaInfo>>>>
nodeVsCollectionVsShardVsReplicaInfo = new HashMap<>();
   private Map<String, Object> snitchSession = new HashMap<>();
   private Map<String, Map> nodeVsTags = new HashMap<>();
+  private Map<String, String> withCollectionsMap = new HashMap<>();
 
   public SolrClientNodeStateProvider(CloudSolrClient solrClient) {
     this.solrClient = solrClient;
@@ -100,6 +103,9 @@ public class SolrClientNodeStateProvider implements NodeStateProvider,
MapWriter
     all.forEach((collName, ref) -> {
       DocCollection coll = ref.get();
       if (coll == null) return;
+      if (coll.getProperties().get(CollectionAdminParams.WITH_COLLECTION) != null) {
+        withCollectionsMap.put(coll.getName(), (String) coll.getProperties().get(CollectionAdminParams.WITH_COLLECTION));
+      }
       coll.forEachReplica((shard, replica) -> {
         Map<String, Map<String, List<ReplicaInfo>>> nodeData = nodeVsCollectionVsShardVsReplicaInfo.computeIfAbsent(replica.getNodeName(),
k -> new HashMap<>());
         Map<String, List<ReplicaInfo>> collData = nodeData.computeIfAbsent(collName,
k -> new HashMap<>());
@@ -114,13 +120,15 @@ public class SolrClientNodeStateProvider implements NodeStateProvider,
MapWriter
 //    ew.put("liveNodes", liveNodes);
     ew.put("replicaInfo", Utils.getDeepCopy(nodeVsCollectionVsShardVsReplicaInfo, 5));
     ew.put("nodeValues", nodeVsTags);
-
   }
 
   @Override
   public Map<String, Object> getNodeValues(String node, Collection<String> tags)
{
     Map<String, Object> tagVals = fetchTagValues(node, tags);
     nodeVsTags.put(node, tagVals);
+    if (tags.contains(WITH_COLLECTION.tagName)) {
+      tagVals.put(WITH_COLLECTION.tagName, withCollectionsMap);
+    }
     return tagVals;
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34841fc0/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 1f5fdd2..92b75a1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -59,9 +59,11 @@ import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
 import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
 import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_PARAM;
 import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
 
 /**
  * This class is experimental and subject to change.
@@ -80,7 +82,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       MAX_SHARDS_PER_NODE,
       AUTO_ADD_REPLICAS,
       POLICY,
-      COLL_CONF);
+      COLL_CONF,
+      WITH_COLLECTION,
+      COLOCATED_WITH);
 
   protected final CollectionAction action;
 
@@ -417,10 +421,11 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     protected Integer pullReplicas;
     protected Integer tlogReplicas;
 
-    private Properties properties;
+    protected Properties properties;
     protected Boolean autoAddReplicas;
     protected Integer stateFormat;
-    private String[] rule , snitch;
+    protected String[] rule , snitch;
+    protected String withCollection;
 
     /** Constructor intended for typical use cases */
     protected Create(String collection, String config, Integer numShards, Integer numNrtReplicas,
Integer numTlogReplicas, Integer numPullReplicas) { // TODO: maybe add other constructors
@@ -557,6 +562,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (rule != null) params.set(DocCollection.RULE, rule);
       if (snitch != null) params.set(DocCollection.SNITCH, snitch);
       params.setNonNull(POLICY, policy);
+      params.setNonNull(WITH_COLLECTION, withCollection);
       return params;
     }
 
@@ -564,6 +570,15 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       this.policy = policy;
       return this;
     }
+
+    public String getWithCollection() {
+      return withCollection;
+    }
+
+    public Create setWithCollection(String withCollection) {
+      this.withCollection = withCollection;
+      return this;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34841fc0/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 4c12d9c..e9b51cf 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
 
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.common.SolrException;
@@ -174,6 +175,9 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice>
{
     return slices.get(sliceName);
   }
 
+  /**
+   * @param consumer invoked once per replica with the shard name and the replica
+   */
   public void forEachReplica(BiConsumer<String, Replica> consumer) {
     slices.forEach((shard, slice) -> slice.getReplicasMap().forEach((s, replica) ->
consumer.accept(shard, replica)));
   }
@@ -321,7 +325,22 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice>
{
     }
     return replicas;
   }
-  
+
+  /**
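+   * @param predicate tested with the shard name and the replica, replica by replica, until one matches
+   * @return the first replica accepted by the predicate, or null if no replica is accepted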
+   */
+  public Replica getReplica(BiPredicate<String, Replica> predicate) {
+    final Replica[] result = new Replica[1];
+    forEachReplica((s, replica) -> {
+      if (result[0] != null) return;
+      if (predicate.test(s, replica)) {
+        result[0] = replica;
+      }
+    });
+    return result[0];
+  }
+
   public List<Replica> getReplicas(EnumSet<Replica.Type> s) {
     List<Replica> replicas = new ArrayList<>();
     for (Slice slice : this) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34841fc0/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
index 6153eb1..a0ef11f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
@@ -79,4 +79,15 @@ public interface CollectionAdminParams {
    * The name of the config set to be used for a collection
    */
   String COLL_CONF = "collection.configName";
+
+  /**
+   * The name of the collection with which a collection is to be co-located
+   */
+  String WITH_COLLECTION = "withCollection";
+
+  /**
+   * The reverse link of the WITH_COLLECTION flag. It is stored in the cluster state of the `withCollection` collection
+   * and points to the collection on which `withCollection` was specified.
+   */
+  String COLOCATED_WITH = "COLOCATED_WITH";
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/34841fc0/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
index 0d250fd..9083b85 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
@@ -165,6 +165,130 @@ public class TestPolicy extends SolrTestCaseJ4 {
     return result;
   }
 
+
+  public void testWithColl() {
+    String clusterStateStr = "{" +
+        "  'comments_coll':{" +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards':{}," +
+        "    'withCollection' :'articles_coll'" +
+        "  }," +
+        "  'articles_coll': {" +
+        "    'router': {" +
+        "      'name': 'compositeId'" +
+        "    }," +
+        "    'shards': {" +
+        "      'shard1': {" +
+        "        'range': '80000000-ffffffff'," +
+        "        'replicas': {" +
+        "          'r1': {" +
+        "            'core': 'r1'," +
+        "            'base_url': 'http://10.0.0.4:8983/solr'," +
+        "            'node_name': 'node1'," +
+        "            'state': 'active'," +
+        "            'leader': 'true'" +
+        "          }," +
+        "          'r2': {" +
+        "            'core': 'r2'," +
+        "            'base_url': 'http://10.0.0.4:7574/solr'," +
+        "            'node_name': 'node2'," +
+        "            'state': 'active'" +
+        "          }" +
+        "        }" +
+        "      }" +
+        "    }" +
+        "  }" +
+        "}";
+    ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
+        ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
+    DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null)
{
+      @Override
+      public ClusterState getClusterState() throws IOException {
+        return clusterState;
+      }
+
+      @Override
+      public Set<String> getLiveNodes() {
+        return clusterState.getLiveNodes();
+      }
+    };
+
+    SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null)
{
+      @Override
+      protected Map<String, Object> fetchTagValues(String node, Collection<String>
tags) {
+        Map<String, Object> result = new HashMap<>();
+        AtomicInteger cores = new AtomicInteger();
+        forEachReplica(node, replicaInfo -> cores.incrementAndGet());
+        if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
+        if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
+        return result;
+      }
+
+      @Override
+      protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String,
Object> metricsKeyVsTag) {
+        //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
+        Map<String, Object> result = new HashMap<>();
+        metricsKeyVsTag.forEach((k, v) -> {
+          if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
+        });
+
+        return result;
+      }
+
+      @Override
+      protected ClusterStateProvider getClusterStateProvider() {
+        return clusterStateProvider;
+      }
+    };
+    Map m = solrClientNodeStateProvider.getNodeValues("node1", ImmutableSet.of("cores", "withCollection"));
+    assertNotNull(m.get("withCollection"));
+
+    Map policies = (Map) Utils.fromJSONString("{" +
+        "  'cluster-preferences': [" +
+        "    { 'maximize': 'freedisk', 'precision': 50}," +
+        "    { 'minimize': 'cores'}" +
+        "  ]," +
+        "  'cluster-policy': [" +
+        "    { 'replica': 0, 'nodeRole': 'overseer'}" +
+        "    { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
+        "  ]" +
+        "}");
+    AutoScalingConfig config = new AutoScalingConfig(policies);
+    Policy policy = config.getPolicy();
+    Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
+      @Override
+      public ClusterStateProvider getClusterStateProvider() {
+        return clusterStateProvider;
+      }
+
+      @Override
+      public NodeStateProvider getNodeStateProvider() {
+        return solrClientNodeStateProvider;
+      }
+    });
+    Suggester suggester = session.getSuggester(CollectionAction.ADDREPLICA);
+    suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
+    SolrRequest op = suggester.getSuggestion();
+    assertNotNull(op);
+    Set<String> nodes = new HashSet<>(2);
+    nodes.add(op.getParams().get("node"));
+    session = suggester.getSession();
+    suggester = session.getSuggester(ADDREPLICA);
+    suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
+    op = suggester.getSuggestion();
+    assertNotNull(op);
+    nodes.add(op.getParams().get("node"));
+    assertEquals(2, nodes.size());
+
+    session = suggester.getSession();
+    suggester = session.getSuggester(MOVEREPLICA);
+    suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
+    op = suggester.getSuggestion();
+    assertNull(op);
+  }
+
   public void testValidate() {
     expectError("replica", -1, "must be greater than");
     expectError("replica", "hello", "not a valid number");
@@ -1203,7 +1327,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
     assertTrue(session.getPolicy() == config.getPolicy());
     assertEquals(sessionWrapper.status, PolicyHelper.Status.EXECUTING);
     sessionWrapper.release();
-    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
     PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
     assertEquals(sessionRef.getSessionWrapper().getCreateTime(), s1.getCreateTime());
     PolicyHelper.SessionWrapper[] s2 = new PolicyHelper.SessionWrapper[1];
@@ -1231,9 +1355,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
     assertEquals(2, s1.getRefCount());
 
     s2[0].release();
-    assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+    assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
     s1.release();
-    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
 
 
   }
@@ -1454,7 +1578,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
     assertEquals("node2", op.getNode());
   }
 
-  private SolrCloudManager getSolrCloudManager(final Map<String, Map> nodeValues, String
clusterState) {
+  private SolrCloudManager getSolrCloudManager(final Map<String, Map> nodeValues, String
clusterS) {
     return new SolrCloudManager() {
       ObjectCache objectCache = new ObjectCache();
 


Mime
View raw message