lucene-commits mailing list archives

From sha...@apache.org
Subject [3/3] lucene-solr:master: SOLR-11990: Make it possible to co-locate replicas of multiple collections together in a node.
Date Sun, 29 Jul 2018 01:57:28 GMT
SOLR-11990: Make it possible to co-locate replicas of multiple collections together in a node.

A collection may be co-located with another collection at collection creation time by specifying a
'withCollection' parameter. It can also be co-located afterwards by using the modify collection API.
The co-location guarantee is enforced regardless of future cluster operations, whether they are invoked
manually via the Collection API or automatically by the Autoscaling framework.
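As a rough sketch of how the new parameter is used, it rides on the ordinary Collections API CREATE call. The collection names (`orders`, `products`) and host below are made up for illustration; actually issuing the request of course requires a running SolrCloud cluster, so this only builds and prints the URL.

```python
from urllib.parse import urlencode

def create_collection_url(base_url, name, num_shards, with_collection):
    """Build a Collections API CREATE request URL that co-locates `name`
    with `with_collection`. Names here are hypothetical examples."""
    params = {
        "action": "CREATE",
        "name": name,
        "numShards": num_shards,
        # the co-location parameter introduced by SOLR-11990
        "withCollection": with_collection,
    }
    return f"{base_url}/admin/collections?{urlencode(params)}"

url = create_collection_url("http://localhost:8983/solr", "orders", 2, "products")
print(url)
```

Note that, per this commit, the 'withCollection' collection must have exactly one shard; the request is rejected otherwise.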

Squashed commit of the following:

commit 3827703b38c598f1247c90ab57d3d640ab3a9e21
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 11:54:10 2018 +0530

    SOLR-11990: Added change log entry

commit 7977222e07ba47274062cb8d8a69e7956d644000
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 11:52:17 2018 +0530

    SOLR-11990: Added change log entry

commit 1857075fdb9d535b6149ad4369fed8b64b0c01f6
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 11:49:51 2018 +0530

    SOLR-11990: Added note about co-location guarantees being one way only

commit 8557cbc8a511f21d1fcad99e11ea9d2104d0bef4
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 10:43:37 2018 +0530

    SOLR-11990: Remove unused import

commit 864b013fd744edca9b6b84a8a7573fab3c5310d5
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 10:21:59 2018 +0530

    SOLR-11990: Fixing compilation issues after merging master

commit dd840a2f7e765ee96c899d4d9ea89b6b67c5ae62
Merge: bb4ffb3 828d281
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Sat Jul 28 10:03:50 2018 +0530

    Merge branch 'master' into jira/solr-11990

    # Conflicts:
    #	solr/solr-ref-guide/src/collections-api.adoc
    #	solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
    #	solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java

commit bb4ffb32c4960a2809ac8927e214e1e012204a73
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Fri Jul 27 14:09:44 2018 +0530

    SOLR-11990: Ensure that suggestions are validated by the policy engine; otherwise, move on to the next candidate replica or the next candidate node

commit a97d45b22f9c232e939f979502c761001be9ae24
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Fri Jul 27 13:22:10 2018 +0530

    SOLR-11990: Autoscaling suggestions for withCollection violations should prefer moving replicas before adding replicas

commit 7b5a84338dfe7335599a5e96aff2d26cb4eeaac6
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Fri Jul 27 12:22:45 2018 +0530

    SOLR-11990: Fix statement about the behavior of the modify collection API when modifying the withCollection parameter

commit 63aec4fe0de7025c16b6ebc47dad1004531ecee1
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Thu Jul 26 07:29:07 2018 +0530

    SOLR-11990: Added new page to the reference guide describing how to colocate collections, including guarantees and limitations

commit 6bfcd0786bb30353de9c26a01ec97ce3191b58f8
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 25 21:42:25 2018 +0530

    SOLR-11990: Added another test which creates two collections which are colocated with two different collections and ensures that create collection and add replica operations work correctly

commit 4cead778f0044b6fb4012b085abf7b60350f495b
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 25 21:07:47 2018 +0530

    SOLR-11990: Stop or start jettys in test setup to ensure that we always have exactly 2 replicas running before a test starts

commit 70dbfd042c2164fcd76d406eeab1518e4d3147fb
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 25 19:19:07 2018 +0530

    SOLR-11990: Added description of the new withCollection parameter in the reference guide

commit 9d8260852b9d667d4d8e026432fd7727b7789393
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 25 19:16:46 2018 +0530

    SOLR-11990: Reset count down latch during test setup

commit ae508165571b1afde54337859b8d5fdbb1d67312
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 25 15:43:54 2018 +0530

    SOLR-11990: Add support for withCollection in simulated create collection API

commit 84f026b8c4cc25edb548430b8f5ad09d2486b3b5
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 17:21:33 2018 +0530

    SOLR-11990: Ported the refactoring made in CreateCollectionCmd to the simulated version so that simulation tests are able to create collections correctly

commit defe111c9d31c8e4f0f00b4f2f3c875f5b2fa602
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 16:17:52 2018 +0530

    SOLR-11990: Add missing javadoc for return statement

commit 8e47d5bc4545548c5441909c3fcc1a7901b38185
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 16:11:45 2018 +0530

    SOLR-11990: Replace usage of forbidden Charsets with StandardCharsets class

commit 2d1b9eb25ea96a3a42c000ae654400ed44c17554
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 16:07:36 2018 +0530

    SOLR-11990: Extract ConditionType to an interface VarType along with a WithCollectionVarType implementation

commit 1de2a4f52a59afca28de75bfa5156a3d6567a4f5
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 12:53:26 2018 +0530

    SOLR-11990: Pass the strictness parameter to the ConditionType so that WITH_COLLECTION can choose not to project add replica in strict mode.

    This ensures that the add replica and move replica suggesters always choose nodes that already have withCollection replicas first, unless there are violations in doing so. Only if the first pass fails to find a suitable node do we go to the other nodes in the cluster. This also removes the need for the majority of the changes in AddReplicaSuggester, so they have been reverted.
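    The two-pass selection described in this commit message can be sketched as follows. This is an illustrative sketch only, not Solr's actual suggester API: node names are invented, and the `violates` predicate stands in for the policy engine's validation.

```python
def pick_node(candidate_nodes, nodes_with_with_collection, violates):
    """Prefer nodes already hosting a replica of the withCollection
    collection; fall back to the remaining nodes only if no preferred
    node passes the policy check. Returns None if every node violates."""
    preferred = [n for n in candidate_nodes if n in nodes_with_with_collection]
    others = [n for n in candidate_nodes if n not in nodes_with_with_collection]
    for node in preferred + others:
        if not violates(node):
            return node
    return None

# e.g. n2 already hosts the withCollection replica, so it wins when allowed
print(pick_node(["n1", "n2", "n3"], {"n2"}, lambda n: False))
```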

commit 0d616ed9e9bad791548c87086cba7760d724350d
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Tue Jul 24 11:36:34 2018 +0530

    SOLR-11990: Minor changes to formatting and code comments

commit 1228538f934f35f15797d89c2c66f2deb9cddd8c
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Mon Jul 23 14:26:19 2018 +0530

    SOLR-11990: Added a test which simulates a lost node and asserts that move replica suggester moves the replica on the lost node to a node already having the withCollection present

commit 582f1fd98de93ab73c74a1f623749dd031beb381
Author: Noble Paul <noble@apache.org>
Date:   Mon Jul 23 18:35:22 2018 +1000

    SOLR-11990: Fix NPE by removing unnecessary System.out.println

commit 501bc6c1d066321b344bbb8b1de3c2ead52f8c49
Author: Noble Paul <noble@apache.org>
Date:   Mon Jul 23 18:31:07 2018 +1000

    SOLR-11990: NPE during class init

commit acbf4a69321e16cff11cc7cf0a1f076fd9ac0037
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Mon Jul 23 13:55:30 2018 +0530

    SOLR-11990: Added asserts on the nodes that should be selected by the add replica suggester

commit 4824933fd6eb7d1773acbff1a1a0c5e670226e0b
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Fri Jul 20 14:30:52 2018 +0530

    SOLR-11990: Added WITH_COLLECTION to global tags. Fixed implementation of addViolatingReplicas and getSuggestions in the clause impl. Added more asserts in testWithCollectionSuggestions.

commit dbadb33211c190026e08d8e3ea587b6f8df8720b
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Fri Jul 20 13:44:36 2018 +0530

    SOLR-11990: Added support for comparing violations, generating suggestions and adding violating replicas

commit ada1f17d5c93a4186260473e4822d2bee1da0e16
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 18 19:14:56 2018 +0530

    SOLR-11990: Fix mock node state provider in TestPolicy to use the right cluster state. Added nocommits to ensure that we return the right suggestions for this feature.

commit ef2d61812e0d96eb2275b3411906d9de57ab835e
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 18 18:39:51 2018 +0530

    SOLR-11990: Add missing node in nodeValues configuration

commit 34841fc01fea4a9f1e6a9f64050e576f2247a72b
Author: Shalin Shekhar Mangar <shalin@apache.org>
Date:   Wed Jul 18 16:32:57 2018 +0530

    SOLR-11990: Make it possible to co-locate replicas of multiple collections together in a node


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/179c8f9b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/179c8f9b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/179c8f9b

Branch: refs/heads/master
Commit: 179c8f9b48af9bf3c327226d0e1fdbe460c4a325
Parents: 828d281
Author: Shalin Shekhar Mangar <shalin@apache.org>
Authored: Sun Jul 29 07:26:13 2018 +0530
Committer: Shalin Shekhar Mangar <shalin@apache.org>
Committed: Sun Jul 29 07:26:13 2018 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   6 +
 .../cloud/api/collections/AddReplicaCmd.java    |  53 +-
 .../api/collections/CreateCollectionCmd.java    | 145 +++--
 .../api/collections/DeleteCollectionCmd.java    |  21 +
 .../cloud/api/collections/MoveReplicaCmd.java   |  40 +-
 .../OverseerCollectionMessageHandler.java       |   6 +-
 .../cloud/autoscaling/ComputePlanAction.java    |  11 +-
 .../solr/handler/admin/CollectionsHandler.java  |  10 +-
 .../apache/solr/cloud/TestWithCollection.java   | 611 +++++++++++++++++++
 .../sim/SimClusterStateProvider.java            |  71 ++-
 solr/solr-ref-guide/src/collections-api.adoc    |   6 +
 .../src/colocating-collections.adoc             |  76 +++
 solr/solr-ref-guide/src/solrcloud.adoc          |   3 +-
 .../cloud/autoscaling/AddReplicaSuggester.java  |   4 +-
 .../client/solrj/cloud/autoscaling/Clause.java  |  26 +-
 .../cloud/autoscaling/MoveReplicaSuggester.java |  30 +-
 .../client/solrj/cloud/autoscaling/Policy.java  |  38 +-
 .../solrj/cloud/autoscaling/PolicyHelper.java   |  28 +-
 .../client/solrj/cloud/autoscaling/Row.java     |  69 ++-
 .../solrj/cloud/autoscaling/Suggester.java      | 130 +++-
 .../solrj/cloud/autoscaling/Suggestion.java     |  78 ++-
 .../client/solrj/cloud/autoscaling/VarType.java |  43 ++
 .../solrj/cloud/autoscaling/Violation.java      |   2 +-
 .../autoscaling/WithCollectionVarType.java      | 160 +++++
 .../solrj/impl/SolrClientNodeStateProvider.java |  10 +-
 .../solrj/request/CollectionAdminRequest.java   |  21 +-
 .../apache/solr/common/cloud/DocCollection.java |  21 +-
 .../common/params/CollectionAdminParams.java    |  11 +
 .../solrj/cloud/autoscaling/TestPolicy.java     | 568 ++++++++++++++++-
 29 files changed, 2134 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c837eb7..c68c397 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -123,6 +123,12 @@ New Features
 
 * SOLR-12536: autoscaling policy support to equally distribute replicas on the basis of arbitrary properties (noble)
 
+* SOLR-11990: Make it possible to co-locate replicas of multiple collections together in a node. A collection may be
+  co-located with another collection during collection creation time by specifying a 'withCollection' parameter. It can
+  also be co-located afterwards by using the modify collection API. The co-location guarantee is enforced regardless of
+  future cluster operations whether they are invoked manually via the Collection API or by the Autoscaling framework.
+  (noble, shalin)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index c07a85ad..0feeec9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -23,15 +23,16 @@ import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.cloud.ActiveReplicaWatcher;
 import org.apache.solr.cloud.CloudUtil;
 import org.apache.solr.cloud.Overseer;
@@ -43,6 +44,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonAdminParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
@@ -52,11 +54,12 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
 import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
@@ -79,6 +82,10 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
   ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
       throws IOException, InterruptedException {
     log.debug("addReplica() : {}", Utils.toJSONString(message));
+
+    String collectionName = message.getStr(COLLECTION_PROP);
+    DocCollection coll = clusterState.getCollection(collectionName);
+
     boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
     boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
     final String asyncId = message.getStr(ASYNC);
@@ -86,9 +93,6 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
     message = assignReplicaDetails(ocmh.cloudManager, clusterState, message, sessionWrapper);
 
-    String collection = message.getStr(COLLECTION_PROP);
-    DocCollection coll = clusterState.getCollection(collection);
-
     String node = message.getStr(CoreAdminParams.NODE);
     String shard = message.getStr(SHARD_ID_PROP);
     String coreName = message.getStr(CoreAdminParams.NAME);
@@ -97,6 +101,27 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
     boolean parallel = message.getBool("parallel", false);
 
+    if (coll.getStr(WITH_COLLECTION) != null) {
+      String withCollectionName = coll.getStr(WITH_COLLECTION);
+      DocCollection withCollection = clusterState.getCollection(withCollectionName);
+      if (withCollection.getActiveSlices().size() > 1)  {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + withCollection.getActiveSlices().size());
+      }
+      String withCollectionShard = withCollection.getActiveSlices().iterator().next().getName();
+
+      List<Replica> replicas = withCollection.getReplicas(node);
+      if (replicas == null || replicas.isEmpty()) {
+        // create a replica of withCollection on the identified node before proceeding further
+        ZkNodeProps props = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
+            ZkStateReader.COLLECTION_PROP, withCollectionName,
+            ZkStateReader.SHARD_ID_PROP, withCollectionShard,
+            "node", node,
+            CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString()); // set to true because we want `withCollection` to be ready after this collection is created
+        addReplica(clusterState, props, results, null);
+      }
+    }
+
     ModifiableSolrParams params = new ModifiableSolrParams();
 
     ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -104,7 +129,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
       if (!skipCreateReplicaInClusterState) {
         ZkNodeProps props = new ZkNodeProps(
             Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
-            ZkStateReader.COLLECTION_PROP, collection,
+            ZkStateReader.COLLECTION_PROP, collectionName,
             ZkStateReader.SHARD_ID_PROP, shard,
             ZkStateReader.CORE_NAME_PROP, coreName,
             ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
@@ -121,10 +146,10 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
         }
       }
       params.set(CoreAdminParams.CORE_NODE_NAME,
-          ocmh.waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
+          ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(coreName)).get(coreName).getName());
     }
 
-    String configName = zkStateReader.readConfigName(collection);
+    String configName = zkStateReader.readConfigName(collectionName);
     String routeKey = message.getStr(ShardParams._ROUTE_);
     String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
     String ulogDir = message.getStr(CoreAdminParams.ULOG_DIR);
@@ -133,7 +158,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
     params.set(CoreAdminParams.NAME, coreName);
     params.set(COLL_CONF, configName);
-    params.set(CoreAdminParams.COLLECTION, collection);
+    params.set(CoreAdminParams.COLLECTION, collectionName);
     params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name());
     if (shard != null) {
       params.set(CoreAdminParams.SHARD, shard);
@@ -172,7 +197,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
 
     Runnable runnable = () -> {
       ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
-      ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
+      ocmh.waitForCoreNodeName(collectionName, fnode, fcoreName);
       if (sessionWrapper.get() != null) {
         sessionWrapper.get().release();
       }
@@ -182,15 +207,15 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     if (!parallel || waitForFinalState) {
       if (waitForFinalState) {
         SolrCloseableLatch latch = new SolrCloseableLatch(1, ocmh);
-        ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collection, null, Collections.singletonList(coreName), latch);
+        ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null, Collections.singletonList(coreName), latch);
         try {
-          zkStateReader.registerCollectionStateWatcher(collection, watcher);
+          zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
           runnable.run();
           if (!latch.await(timeout, TimeUnit.SECONDS)) {
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
           }
         } finally {
-          zkStateReader.removeCollectionStateWatcher(collection, watcher);
+          zkStateReader.removeCollectionStateWatcher(collectionName, watcher);
         }
       } else {
         runnable.run();
@@ -201,7 +226,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
 
 
     return new ZkNodeProps(
-        ZkStateReader.COLLECTION_PROP, collection,
+        ZkStateReader.COLLECTION_PROP, collectionName,
         ZkStateReader.SHARD_ID_PROP, shard,
         ZkStateReader.CORE_NAME_PROP, coreName,
         ZkStateReader.NODE_NAME_PROP, node

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index a776820..45ced2b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -32,14 +32,14 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
@@ -47,6 +47,7 @@ import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.Replica;
@@ -73,13 +74,14 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
+import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
 import static org.apache.solr.common.params.CommonParams.NAME;
@@ -106,26 +108,48 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
     }
 
+    String withCollection = message.getStr(CollectionAdminParams.WITH_COLLECTION);
+    String withCollectionShard = null;
+    if (withCollection != null) {
+      if (!clusterState.hasCollection(withCollection)) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "The 'withCollection' does not exist: " + withCollection);
+      } else  {
+        DocCollection collection = clusterState.getCollection(withCollection);
+        if (collection.getActiveSlices().size() > 1)  {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + collection.getActiveSlices().size());
+        }
+        withCollectionShard = collection.getActiveSlices().iterator().next().getName();
+      }
+    }
+
     String configName = getConfigName(collectionName, message);
     if (configName == null) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No config set found to associate with the collection.");
     }
 
     ocmh.validateConfigOrThrowSolrException(configName);
+
+    List<String> nodeList = new ArrayList<>();
+    String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
+    String policy = message.getStr(Policy.POLICY);
+    AutoScalingConfig autoScalingConfig = ocmh.cloudManager.getDistribStateManager().getAutoScalingConfig();
+    boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null;
+
+    // fail fast if parameters are wrong or incomplete
+    List<String> shardNames = populateShardNames(message, router);
+    checkMaxShardsPerNode(message, usePolicyFramework);
+    checkReplicaTypes(message);
+
     AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
 
     try {
 
       final String async = message.getStr(ASYNC);
 
-      List<String> nodeList = new ArrayList<>();
-      List<String> shardNames = new ArrayList<>();
-      List<ReplicaPosition> replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, message,
-          nodeList, shardNames, sessionWrapper);
       ZkStateReader zkStateReader = ocmh.zkStateReader;
       boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
 
-      ocmh.createConfNode(stateManager, configName, collectionName, isLegacyCloud);
+      OverseerCollectionMessageHandler.createConfNode(stateManager, configName, collectionName, isLegacyCloud);
 
       Map<String,String> collectionParams = new HashMap<>();
       Map<String,Object> collectionProps = message.getProperties();
@@ -134,12 +158,12 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
           collectionParams.put(propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), (String) collectionProps.get(propName));
         }
       }
-      
+
       createCollectionZkNode(stateManager, collectionName, collectionParams);
-      
+
       Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
 
-      // wait for a while until we don't see the collection
+      // wait for a while until we see the collection
       TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
       boolean created = false;
       while (! waitUntil.hasTimedOut()) {
@@ -147,8 +171,12 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
         if(created) break;
       }
-      if (!created)
+      if (!created) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+      }
+
+      List<ReplicaPosition> replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, message,
+          nodeList, shardNames, sessionWrapper);
 
       if (nodeList.isEmpty()) {
         log.debug("Finished create command for collection: {}", collectionName);
@@ -165,6 +193,23 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
       for (ReplicaPosition replicaPosition : replicaPositions) {
         String nodeName = replicaPosition.node;
+
+        if (withCollection != null) {
+          // check that we have a replica of `withCollection` on this node and if not, create one
+          DocCollection collection = clusterState.getCollection(withCollection);
+          List<Replica> replicas = collection.getReplicas(nodeName);
+          if (replicas == null || replicas.isEmpty()) {
+            ZkNodeProps props = new ZkNodeProps(
+                Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
+                ZkStateReader.COLLECTION_PROP, withCollection,
+                ZkStateReader.SHARD_ID_PROP, withCollectionShard,
+                "node", nodeName,
+                CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.TRUE.toString()); // set to true because we want `withCollection` to be ready after this collection is created
+            new AddReplicaCmd(ocmh).call(clusterState, props, results);
+            clusterState = zkStateReader.getClusterState(); // refresh
+          }
+        }
+
         String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(),
             ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName),
             replicaPosition.shard, replicaPosition.type, true);
@@ -251,6 +296,16 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
               + " curl http://{host:port}/solr/" + collectionName + "/config -d '{\"set-user-property\": {\"update.autoCreateFields\":\"false\"}}'");
         }
       }
+
+      // modify the `withCollection` and store this new collection's name with it
+      if (withCollection != null) {
+        ZkNodeProps props = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(),
+            ZkStateReader.COLLECTION_PROP, withCollection,
+            CollectionAdminParams.COLOCATED_WITH, collectionName);
+        Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+      }
+
     } catch (SolrException ex) {
       throw ex;
     } catch (Exception ex) {
@@ -274,29 +329,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     String policy = message.getStr(Policy.POLICY);
     boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null;
 
-    Integer numSlices = message.getInt(OverseerCollectionMessageHandler.NUM_SLICES, null);
-    String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
-    if(ImplicitDocRouter.NAME.equals(router)){
-      ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
-      numSlices = shardNames.size();
-    } else {
-      if (numSlices == null ) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " is a required param (when using CompositeId router).");
-      }
-      if (numSlices <= 0) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " must be > 0");
-      }
-      ClusterStateMutator.getShardNames(numSlices, shardNames);
-    }
-
-    int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
-    if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
-    }
-    if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
-    if (numNrtReplicas + numTlogReplicas <= 0) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
-    }
+    Integer numSlices = shardNames.size();
+    int maxShardsPerNode = checkMaxShardsPerNode(message, usePolicyFramework);
 
     // we need to look at every node and see how many cores it serves
     // add our new cores to existing nodes serving the least number of cores
@@ -343,6 +377,43 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     return replicaPositions;
   }
 
+  public static int checkMaxShardsPerNode(ZkNodeProps message, boolean usePolicyFramework) {
+    int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
+    if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
+    }
+    if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
+
+    return maxShardsPerNode;
+  }
+
+  public static void checkReplicaTypes(ZkNodeProps message) {
+    int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
+    int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas > 0 ? 0 : 1));
+
+    if (numNrtReplicas + numTlogReplicas <= 0) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+    }
+  }
+
+  public static List<String> populateShardNames(ZkNodeProps message, String router) {
+    List<String> shardNames = new ArrayList<>();
+    Integer numSlices = message.getInt(OverseerCollectionMessageHandler.NUM_SLICES, null);
+    if (ImplicitDocRouter.NAME.equals(router)) {
+      ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
+      numSlices = shardNames.size();
+    } else {
+      if (numSlices == null) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " is a required param (when using CompositeId router).");
+      }
+      if (numSlices <= 0) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " must be > 0");
+      }
+      ClusterStateMutator.getShardNames(numSlices, shardNames);
+    }
+    return shardNames;
+  }
+
   String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
     String configName = message.getStr(COLL_CONF);
 
@@ -370,7 +441,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     }
     return "".equals(configName)? null: configName;
   }
-  
+
   /**
    * Copies the _default configset to the specified configset name (overwrites if pre-existing)
    */
@@ -476,7 +547,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     }
 
   }
-  
+
   private static void getConfName(DistribStateManager stateManager, String collection, String collectionPath, Map<String,Object> collectionProps) throws IOException,
       KeeperException, InterruptedException {
     // check for configName

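[Editor's note: to make the extracted helpers above easier to follow outside Solr, here is a minimal standalone sketch with plain JDK types. The method names mirror the patch, but a hypothetical `Map<String, Object>` stands in for Solr's `ZkNodeProps` message, and `"shardN"` naming approximates `ClusterStateMutator.getShardNames`.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Standalone sketch of the validation helpers factored out of CreateCollectionCmd.
public class CreateValidationSketch {

  // Mirrors checkMaxShardsPerNode: -1 (or an active autoscaling policy) means
  // "unlimited", but an explicit positive value is rejected when policies are in use.
  static int checkMaxShardsPerNode(Map<String, Object> message, boolean usePolicyFramework) {
    Integer raw = (Integer) message.get("maxShardsPerNode");
    int maxShardsPerNode = raw == null ? 1 : raw;
    if (usePolicyFramework && raw != null && maxShardsPerNode > 0) {
      throw new IllegalArgumentException(
          "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
    }
    if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
    return maxShardsPerNode;
  }

  // Mirrors populateShardNames: the implicit router takes explicit shard names,
  // while the compositeId router derives shard1..shardN from the shard count.
  static List<String> populateShardNames(Integer numSlices, String router, String shards) {
    List<String> shardNames = new ArrayList<>();
    if ("implicit".equals(router)) {
      for (String s : shards.split(",")) shardNames.add(s.trim());
    } else {
      if (numSlices == null) throw new IllegalArgumentException("numShards is a required param");
      if (numSlices <= 0) throw new IllegalArgumentException("numShards must be > 0");
      for (int i = 1; i <= numSlices; i++) shardNames.add("shard" + i);
    }
    return shardNames;
  }

  public static void main(String[] args) {
    assert checkMaxShardsPerNode(Map.of(), true) == Integer.MAX_VALUE;
    assert checkMaxShardsPerNode(Map.of("maxShardsPerNode", -1), false) == Integer.MAX_VALUE;
    assert checkMaxShardsPerNode(Map.of("maxShardsPerNode", 4), false) == 4;
    assert populateShardNames(3, "compositeId", null).equals(List.of("shard1", "shard2", "shard3"));
    assert populateShardNames(null, "implicit", "a,b").equals(List.of("a", "b"));
    System.out.println("ok");
  }
}
```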
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index c676cf3..9a569d1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -31,6 +31,7 @@ import org.apache.solr.common.NonExistentCoreException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -49,6 +50,8 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonParams.NAME;
@@ -69,6 +72,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     ZkStateReader zkStateReader = ocmh.zkStateReader;
 
     checkNotReferencedByAlias(zkStateReader, collection);
+    checkNotColocatedWith(zkStateReader, collection);
 
     final boolean deleteHistory = message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true);
 
@@ -181,4 +185,21 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         .map(Map.Entry::getKey) // alias name
         .findFirst().orElse(null);
   }
+
+  private void checkNotColocatedWith(ZkStateReader zkStateReader, String collection) throws Exception {
+    DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
+    if (docCollection != null)  {
+      String colocatedWith = docCollection.getStr(COLOCATED_WITH);
+      if (colocatedWith != null) {
+        DocCollection colocatedCollection = zkStateReader.getClusterState().getCollectionOrNull(colocatedWith);
+        if (colocatedCollection != null && collection.equals(colocatedCollection.getStr(WITH_COLLECTION))) {
+          // todo how do we clean up if reverse-link is not present?
+          // can't delete this collection because it is still co-located with another collection
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+              "Collection: " + collection + " is co-located with collection: " + colocatedWith
+                  + " remove the link using modify collection API or delete the co-located collection: " + colocatedWith);
+        }
+      }
+    }
+  }
 }

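[Editor's note: the delete guard above can be sketched in isolation with plain maps. This is a simplified model, not Solr's implementation: a hypothetical `Map<String, Map<String, String>>` stands in for `ClusterState`/`DocCollection`, and the key names mirror `COLOCATED_WITH` and `WITH_COLLECTION` from the patch. The point it illustrates is that deletion is only blocked while the reverse `withCollection` link is still present.]

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of DeleteCollectionCmd.checkNotColocatedWith: a collection may be deleted
// only once no other collection's "withCollection" link still points back at it.
public class DeleteGuardSketch {

  // state: collection name -> its properties ("withCollection", "COLOCATED_WITH")
  static void checkNotColocatedWith(Map<String, Map<String, String>> state, String collection) {
    Map<String, String> props = state.get(collection);
    if (props == null) return;
    String colocatedWith = props.get("COLOCATED_WITH");
    if (colocatedWith == null) return;
    Map<String, String> other = state.get(colocatedWith);
    // block deletion only when the reverse link is still present
    if (other != null && collection.equals(other.get("withCollection"))) {
      throw new IllegalStateException("Collection: " + collection
          + " is co-located with collection: " + colocatedWith);
    }
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> state = new HashMap<>();
    state.put("abc", new HashMap<>(Map.of("COLOCATED_WITH", "xyz")));
    state.put("xyz", new HashMap<>(Map.of("withCollection", "abc")));

    boolean blocked = false;
    try { checkNotColocatedWith(state, "abc"); } catch (IllegalStateException e) { blocked = true; }
    assert blocked; // abc cannot be deleted while xyz still points at it

    // removing the forward link (as the modify-collection API would) unblocks deletion
    state.get("xyz").remove("withCollection");
    checkNotColocatedWith(state, "abc"); // no exception
    System.out.println("ok");
  }
}
```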
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
index f9392b5..4a9bd59 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -33,6 +33,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.NamedList;
@@ -104,19 +105,34 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
       if (shardId == null) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + SHARD_ID_PROP + "' is a required param");
       }
-      Slice slice = clusterState.getCollection(collection).getSlice(shardId);
-      List<Replica> sliceReplicas = new ArrayList<>(slice.getReplicas());
-      Collections.shuffle(sliceReplicas, OverseerCollectionMessageHandler.RANDOM);
-      // this picks up a single random replica from the sourceNode
-      for (Replica r : slice.getReplicas()) {
-        if (r.getNodeName().equals(sourceNode)) {
-          replica = r;
-        }
-      }
-      if (replica == null) {
+      Slice slice = coll.getSlice(shardId);
+      List<Replica> sliceReplicas = new ArrayList<>(slice.getReplicas(r -> sourceNode.equals(r.getNodeName())));
+      if (sliceReplicas.isEmpty()) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
             "Collection: " + collection + " node: " + sourceNode + " does not have any replica belonging to shard: " + shardId);
       }
+      Collections.shuffle(sliceReplicas, OverseerCollectionMessageHandler.RANDOM);
+      replica = sliceReplicas.iterator().next();
+    }
+
+    if (coll.getStr(CollectionAdminParams.COLOCATED_WITH) != null) {
+      // we must ensure that moving this replica does not cause the co-location to break
+      String sourceNode = replica.getNodeName();
+      String colocatedCollectionName = coll.getStr(CollectionAdminParams.COLOCATED_WITH);
+      DocCollection colocatedCollection = clusterState.getCollectionOrNull(colocatedCollectionName);
+      if (colocatedCollection != null) {
+        if (colocatedCollection.getReplica((s, r) -> sourceNode.equals(r.getNodeName())) != null) {
+          // check if we have at least two replicas of the collection on the source node
+          // only then it is okay to move one out to another node
+          List<Replica> replicasOnSourceNode = coll.getReplicas(replica.getNodeName());
+          if (replicasOnSourceNode == null || replicasOnSourceNode.size() < 2) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+                "Collection: " + collection + " is co-located with collection: " + colocatedCollectionName
+                    + " and has a single replica: " + replica.getName() + " on node: " + replica.getNodeName()
+                    + " so it is not possible to move it to another node");
+          }
+        }
+      }
     }
 
     log.info("Replica will be moved to node {}: {}", targetNode, replica);
@@ -151,7 +167,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
       );
       removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false);
       removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false);
-      if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
+      if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
       NamedList deleteResult = new NamedList();
       ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
       if (deleteResult.get("failure") != null) {
@@ -256,7 +272,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     }
     if (addResult.get("failure") != null) {
       String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
-          " on node=%s, failure=", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
+          " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
       log.warn(errorString);
       results.add("failure", errorString);
       if (watcher != null) { // unregister

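[Editor's note: the co-location check in MoveReplicaCmd above can likewise be sketched with plain collections. This simplified model replaces `ClusterState` with a hypothetical per-collection, per-node replica count; the rule it illustrates is that a replica of a collection others are co-located with may leave a node only if another replica of that collection remains behind on it.]

```java
import java.util.Map;

// Sketch of the MoveReplicaCmd guard: moving the last replica off a node would
// strand the co-located collection's replicas there, so it is rejected.
public class MoveGuardSketch {

  // replicasByNode: collection name -> (node name -> replica count)
  static void checkMoveAllowed(Map<String, Map<String, Integer>> replicasByNode,
                               String collection, String colocatedCollection, String sourceNode) {
    // does the dependent (co-located) collection have a replica on the source node?
    int dependentOnNode = replicasByNode
        .getOrDefault(colocatedCollection, Map.of()).getOrDefault(sourceNode, 0);
    if (dependentOnNode == 0) return; // nothing to protect on this node
    // only then require at least two replicas of this collection on the source node
    int ownOnNode = replicasByNode
        .getOrDefault(collection, Map.of()).getOrDefault(sourceNode, 0);
    if (ownOnNode < 2) {
      throw new IllegalStateException("Collection: " + collection
          + " is co-located with collection: " + colocatedCollection
          + " and has a single replica on node: " + sourceNode
          + " so it is not possible to move it to another node");
    }
  }

  public static void main(String[] args) {
    Map<String, Map<String, Integer>> single = Map.of(
        "abc", Map.of("node1", 1),  // collection that xyz is co-located with
        "xyz", Map.of("node1", 1)); // dependent collection
    boolean blocked = false;
    try { checkMoveAllowed(single, "abc", "xyz", "node1"); }
    catch (IllegalStateException e) { blocked = true; }
    assert blocked; // the only abc replica on node1 cannot be moved away

    Map<String, Map<String, Integer>> two = Map.of(
        "abc", Map.of("node1", 2),
        "xyz", Map.of("node1", 1));
    checkMoveAllowed(two, "abc", "xyz", "node1"); // fine: one replica stays behind
    System.out.println("ok");
  }
}
```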
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index d7d555e..e15c389 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -105,6 +105,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonParams.NAME;
@@ -149,7 +151,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       ZkStateReader.AUTO_ADD_REPLICAS, "false",
       DocCollection.RULE, null,
       POLICY, null,
-      SNITCH, null));
+      SNITCH, null,
+      WITH_COLLECTION, null,
+      COLOCATED_WITH, null));
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index 22e3ef5..4cb15ea 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -28,11 +28,11 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.UnsupportedSuggester;
 import org.apache.solr.common.SolrException;
@@ -89,8 +89,8 @@ public class ComputePlanAction extends TriggerActionBase {
         log.trace("-- state: {}", clusterState);
       }
       try {
-        Suggester intialSuggester = getSuggester(session, event, context, cloudManager);
-        Suggester suggester = intialSuggester;
+        Suggester initialSuggester = getSuggester(session, event, context, cloudManager);
+        Suggester suggester = initialSuggester;
         int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState);
         int requestedOperations = getRequestedNumOps(event);
         if (requestedOperations > maxOperations) {
@@ -119,13 +119,13 @@ public class ComputePlanAction extends TriggerActionBase {
           // unless a specific number of ops was requested
           // uncomment the following to log too many operations
           /*if (opCount > 10) {
-            PolicyHelper.logState(cloudManager, intialSuggester);
+            PolicyHelper.logState(cloudManager, initialSuggester);
           }*/
 
           if (operation == null) {
             if (requestedOperations < 0) {
               //uncomment the following to log zero operations
-//              PolicyHelper.logState(cloudManager, intialSuggester);
+//              PolicyHelper.logState(cloudManager, initialSuggester);
               break;
             } else {
               log.info("Computed plan empty, remained " + (opCount - opLimit) + " requested ops to try.");
@@ -225,6 +225,7 @@ public class ComputePlanAction extends TriggerActionBase {
         for (Map.Entry<Suggester.Hint, Object> e : op.getHints().entrySet()) {
           suggester = suggester.hint(e.getKey(), e.getValue());
         }
+        suggester = suggester.forceOperation(true);
         start++;
         event.getProperties().put(START, start);
         break;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 1b5edd7..8d7cdbf 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -104,7 +104,6 @@ import static org.apache.solr.client.solrj.response.RequestStatusState.NOT_FOUND
 import static org.apache.solr.client.solrj.response.RequestStatusState.RUNNING;
 import static org.apache.solr.client.solrj.response.RequestStatusState.SUBMITTED;
 import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
-import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
 import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
 import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
 import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY;
@@ -137,9 +136,11 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
 import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
+import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
 import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
 import static org.apache.solr.common.params.CollectionAdminParams.PROPERTY_NAME;
 import static org.apache.solr.common.params.CollectionAdminParams.PROPERTY_VALUE;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
@@ -480,11 +481,10 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           TLOG_REPLICAS,
           NRT_REPLICAS,
           POLICY,
-          WAIT_FOR_FINAL_STATE);
+          WAIT_FOR_FINAL_STATE,
+          WITH_COLLECTION);
 
-      if (props.get(STATE_FORMAT) == null) {
-        props.put(STATE_FORMAT, "2");
-      }
+      props.putIfAbsent(STATE_FORMAT, "2");
 
       if (props.get(REPLICATION_FACTOR) != null && props.get(NRT_REPLICAS) != null) {
         //TODO: Remove this in 8.0 . Keep this for SolrJ client back-compat. See SOLR-11676 for more details

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/core/src/test/org/apache/solr/cloud/TestWithCollection.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWithCollection.java b/solr/core/src/test/org/apache/solr/cloud/TestWithCollection.java
new file mode 100644
index 0000000..a4816a0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestWithCollection.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.ComputePlanAction;
+import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
+import org.apache.solr.cloud.autoscaling.TriggerActionBase;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TimeOut;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
+
+/**
+ * Tests for co-locating a collection with another collection such that Collection API
+ * operations never break the co-location guarantee.
+ *
+ * See SOLR-11990 for more details.
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=TRACE;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+public class TestWithCollection extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static SolrCloudManager cloudManager;
+
+  private static final int NUM_JETTIES = 2;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(NUM_JETTIES)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    if (zkClient().exists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true))  {
+      zkClient().setData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, "{}".getBytes(StandardCharsets.UTF_8), true);
+    }
+    ClusterState clusterState = cluster.getSolrClient().getZkStateReader().getClusterState();
+    for (Map.Entry<String, ClusterState.CollectionRef> entry : clusterState.getCollectionStates().entrySet()) {
+      if (entry.getKey().contains("_xyz"))  {
+        try {
+          CollectionAdminRequest.deleteCollection(entry.getKey()).process(cluster.getSolrClient());
+        } catch (Exception e) {
+          log.error("Exception while deleting collection: " + entry.getKey());
+        }
+      }
+    }
+    cluster.deleteAllCollections();
+    cluster.getSolrClient().setDefaultCollection(null);
+
+    cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+    LATCH = new CountDownLatch(1);
+
+    int jettys = cluster.getJettySolrRunners().size();
+    if (jettys < NUM_JETTIES) {
+      for (int i = jettys; i < NUM_JETTIES; i++) {
+        cluster.startJettySolrRunner();
+      }
+    } else  {
+      for (int i = jettys; i > NUM_JETTIES; i--) {
+        cluster.stopJettySolrRunner(i - 1);
+      }
+    }
+  }
+
+  private void deleteChildrenRecursively(String path) throws Exception {
+    cloudManager.getDistribStateManager().removeRecursively(path, true, false);
+  }
+
+  @Test
+  public void testCreateCollectionNoWithCollection() throws IOException, SolrServerException {
+    String prefix = "testCreateCollectionNoWithCollection";
+    String xyz = prefix + "_xyz";
+    String abc = prefix + "_abc";
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    try {
+      CollectionAdminRequest.createCollection(xyz, 1, 1)
+          .setWithCollection(abc).process(solrClient);
+      fail("Expected collection creation to fail because the 'withCollection' does not exist");
+    } catch (HttpSolrClient.RemoteSolrException e) {
+      assertTrue(e.getMessage().contains("The 'withCollection' does not exist"));
+    }
+
+    CollectionAdminRequest.createCollection(abc, 2, 1)
+        .process(solrClient);
+    try {
+      CollectionAdminRequest.createCollection(xyz, 1, 1)
+          .setWithCollection(abc).process(solrClient);
+      fail("Expected collection creation to fail because the 'withCollection' has more than one shard");
+    } catch (HttpSolrClient.RemoteSolrException e) {
+      assertTrue(e.getMessage().contains("The `withCollection` must have only one shard, found: 2"));
+    }
+  }
+
+  @Test
+  public void testCreateCollection() throws Exception {
+    String prefix = "testCreateCollection";
+    String xyz = prefix + "_xyz";
+    String abc = prefix + "_abc";
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    String setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        "      {'cores':'<10', 'node':'#ANY'}," +
+        "    ]" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    solrClient.request(req);
+
+    String chosenNode = cluster.getRandomJetty(random()).getNodeName();
+    CollectionAdminRequest.createCollection(abc, 1, 1)
+        .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
+        .process(solrClient);
+    CollectionAdminRequest.createCollection(xyz, 1, 1)
+        .setWithCollection(abc)
+        .process(solrClient);
+
+    DocCollection c1 = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(xyz);
+    assertNotNull(c1);
+    assertEquals(abc, c1.getStr(WITH_COLLECTION));
+    Replica replica = c1.getReplicas().get(0);
+    String nodeName = replica.getNodeName();
+
+    assertEquals(chosenNode, nodeName);
+  }
+
+  @Test
+  public void testDeleteWithCollection() throws IOException, SolrServerException, InterruptedException {
+    String prefix = "testDeleteWithCollection";
+    String xyz = prefix + "_xyz";
+    String abc = prefix + "_abc";
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    CollectionAdminRequest.createCollection(abc, 1, 1)
+        .process(solrClient);
+    CollectionAdminRequest.createCollection(xyz, 1, 1)
+        .setWithCollection(abc)
+        .process(solrClient);
+    try {
+      CollectionAdminRequest.deleteCollection(abc).process(solrClient);
+      fail("Expected delete to fail because the collection is co-located with another collection");
+    } catch (HttpSolrClient.RemoteSolrException e) {
+      assertTrue(e.getMessage().contains("is co-located with collection"));
+    }
+
+    // delete the co-located collection first
+    CollectionAdminRequest.deleteCollection(xyz).process(solrClient);
+    // deleting the with collection should succeed now
+    CollectionAdminRequest.deleteCollection(abc).process(solrClient);
+
+    xyz = xyz + "_2";
+    abc = abc + "_2";
+    CollectionAdminRequest.createCollection(abc, 1, 1)
+        .process(solrClient);
+    CollectionAdminRequest.createCollection(xyz, 1, 1)
+        .setWithCollection(abc)
+        .process(solrClient);
+    // sanity check
+    try {
+      CollectionAdminRequest.deleteCollection(abc).process(solrClient);
+      fail("Expected delete to fail because the collection is co-located with another collection");
+    } catch (HttpSolrClient.RemoteSolrException e) {
+      assertTrue(e.getMessage().contains("is co-located with collection"));
+    }
+
+    CollectionAdminRequest.modifyCollection(xyz, null)
+        .unsetAttribute("withCollection")
+        .process(solrClient);
+    TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    while (!timeOut.hasTimedOut()) {
+      DocCollection c1 = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(xyz);
+      if (c1.getStr("withCollection") == null) break;
+      Thread.sleep(200);
+    }
+    DocCollection c1 = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(xyz);
+    assertNull(c1.getStr("withCollection"));
+    CollectionAdminRequest.deleteCollection(abc).process(solrClient);
+  }
+
+  @Test
+  public void testAddReplicaSimple() throws Exception {
+    String prefix = "testAddReplica";
+    String xyz = prefix + "_xyz";
+    String abc = prefix + "_abc";
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String chosenNode = cluster.getRandomJetty(random()).getNodeName();
+    log.info("Chosen node {} for collection {}", chosenNode, abc);
+    CollectionAdminRequest.createCollection(abc, 1, 1)
+        .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
+        .process(solrClient);
+    CollectionAdminRequest.createCollection(xyz, 1, 1)
+        .setWithCollection(abc)
+        .process(solrClient);
+
+    String otherNode = null;
+    for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
+      if (!chosenNode.equals(jettySolrRunner.getNodeName())) {
+        otherNode = jettySolrRunner.getNodeName();
+      }
+    }
+    CollectionAdminRequest.addReplicaToShard(xyz, "shard1")
+        .setNode(otherNode)
+        .process(solrClient);
+    DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
+
+    assertTrue(collection.getReplicas().stream().noneMatch(replica -> withCollection.getReplicas(replica.getNodeName()).isEmpty()));
+  }
+
+  @Test
+  public void testAddReplicaWithPolicy() throws Exception {
+    String prefix = "testAddReplicaWithPolicy";
+    String xyz = prefix + "_xyz";
+    String abc = prefix + "_abc";
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        "      {'cores':'<10', 'node':'#ANY'}," +
+        "      {'replica':'<2', 'node':'#ANY'}," +
+        "    ]" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    solrClient.request(req);
+
+    String chosenNode = cluster.getRandomJetty(random()).getNodeName();
+    log.info("Chosen node {} for collection {}", chosenNode, abc);
+    CollectionAdminRequest.createCollection(abc, 1, 1)
+        .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
+        .process(solrClient);
+    CollectionAdminRequest.createCollection(xyz, 1, 1)
+        .setWithCollection(abc)
+        .process(solrClient);
+
+    DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    assertEquals(chosenNode, collection.getReplicas().iterator().next().getNodeName());
+
+//    zkClient().printLayoutToStdOut();
+
+    CollectionAdminRequest.addReplicaToShard(xyz, "shard1")
+        .process(solrClient);
+    collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
+
+    assertTrue(collection.getReplicas().stream().noneMatch(
+        replica -> withCollection.getReplicas(replica.getNodeName()) == null
+            || withCollection.getReplicas(replica.getNodeName()).isEmpty()));
+  }
+
+  @Test
+  public void testMoveReplicaMainCollection() throws Exception {
+    String prefix = "testMoveReplicaMainCollection";
+    String xyz = prefix + "_xyz";
+    String abc = prefix + "_abc";
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    String setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        "      {'cores':'<10', 'node':'#ANY'}," +
+        "      {'replica':'<2', 'node':'#ANY'}," +
+        "    ]" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    solrClient.request(req);
+
+    String chosenNode = cluster.getRandomJetty(random()).getNodeName();
+    log.info("Chosen node {} for collection {}", chosenNode, abc);
+    CollectionAdminRequest.createCollection(abc, 1, 1)
+        .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
+        .process(solrClient);
+    CollectionAdminRequest.createCollection(xyz, 1, 1)
+        .setWithCollection(abc)
+        .process(solrClient);
+
+    String otherNode = null;
+    for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
+      if (!chosenNode.equals(jettySolrRunner.getNodeName())) {
+        otherNode = jettySolrRunner.getNodeName();
+      }
+    }
+
+    DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
+    assertNull(collection.getReplicas(otherNode)); // sanity check
+    assertNull(withCollection.getReplicas(otherNode)); // sanity check
+
+    new CollectionAdminRequest.MoveReplica(xyz, collection.getReplicas().iterator().next().getName(), otherNode)
+        .process(solrClient);
+//    zkClient().printLayoutToStdOut();
+    collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); // refresh
+    DocCollection withCollectionRefreshed = solrClient.getZkStateReader().getClusterState().getCollection(abc); // refresh
+    assertTrue(collection.getReplicas().stream().noneMatch(
+        replica -> withCollectionRefreshed.getReplicas(replica.getNodeName()) == null
+            || withCollectionRefreshed.getReplicas(replica.getNodeName()).isEmpty()));
+  }
+
+  @Test
+  public void testMoveReplicaWithCollection() throws Exception {
+    String prefix = "testMoveReplicaWithCollection";
+    String xyz = prefix + "_xyz";
+    String abc = prefix + "_abc";
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    String setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        "      {'cores':'<10', 'node':'#ANY'}," +
+        "      {'replica':'<2', 'node':'#ANY'}," +
+        "    ]" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    solrClient.request(req);
+
+    String chosenNode = cluster.getRandomJetty(random()).getNodeName();
+    log.info("Chosen node {} for collection {}", chosenNode, abc);
+    CollectionAdminRequest.createCollection(abc, 1, 1)
+        .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
+        .process(solrClient);
+    CollectionAdminRequest.createCollection(xyz, 1, 1)
+        .setWithCollection(abc)
+        .process(solrClient);
+
+    DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    assertEquals(chosenNode, collection.getReplicas().iterator().next().getNodeName());
+
+    String otherNode = null;
+    for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
+      if (!chosenNode.equals(jettySolrRunner.getNodeName())) {
+        otherNode = jettySolrRunner.getNodeName();
+      }
+    }
+
+    collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
+    assertNull(collection.getReplicas(otherNode)); // sanity check
+    assertNull(withCollection.getReplicas(otherNode)); // sanity check
+
+    try {
+      new CollectionAdminRequest.MoveReplica(abc, collection.getReplicas().iterator().next().getName(), otherNode)
+          .process(solrClient);
+      fail("Expected moving a replica of 'withCollection': " + abc + " to fail");
+    } catch (HttpSolrClient.RemoteSolrException e) {
+      assertTrue(e.getMessage().contains("Collection: testMoveReplicaWithCollection_abc is co-located with collection: testMoveReplicaWithCollection_xyz"));
+    }
+//    zkClient().printLayoutToStdOut();
+    collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz); // refresh
+    DocCollection withCollectionRefreshed = solrClient.getZkStateReader().getClusterState().getCollection(abc); // refresh
+
+    // sanity check that the failed move operation didn't actually change our co-location guarantees
+    assertTrue(collection.getReplicas().stream().noneMatch(
+        replica -> withCollectionRefreshed.getReplicas(replica.getNodeName()) == null
+            || withCollectionRefreshed.getReplicas(replica.getNodeName()).isEmpty()));
+  }
+
+  /**
+   * Tests that when a new node is added to the cluster and autoscaling framework
+   * moves replicas to the new node, we maintain all co-locating guarantees
+   */
+  public void testNodeAdded() throws Exception  {
+    String prefix = "testNodeAdded";
+    String xyz = prefix + "_xyz";
+    String abc = prefix + "_abc";
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    String setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        "      {'cores':'<10', 'node':'#ANY'}," +
+        "      {'replica':'<2', 'node':'#ANY'}," +
+        "    ]" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    solrClient.request(req);
+
+    String chosenNode = cluster.getRandomJetty(random()).getNodeName();
+    log.info("Chosen node {} for collection {}", chosenNode, abc);
+    CollectionAdminRequest.createCollection(abc, 1, 1)
+        .setCreateNodeSet(chosenNode) // randomize to avoid choosing the first node always
+        .process(solrClient);
+    CollectionAdminRequest.createCollection(xyz, 1, 1)
+        .setWithCollection(abc)
+        .process(solrClient);
+
+    DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    assertEquals(chosenNode, collection.getReplicas().iterator().next().getNodeName());
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger1'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '0s'," +
+        "'actions' : [" +
+        "{'name' : 'compute', 'class' : '" + ComputePlanAction.class.getName() + "'}" +
+        "{'name' : 'execute', 'class' : '" + ExecutePlanAction.class.getName() + "'}" +
+        "{'name' : 'compute', 'class' : '" + CapturingAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    solrClient.request(req);
+
+    Optional<JettySolrRunner> other = cluster.getJettySolrRunners()
+        .stream().filter(j -> !chosenNode.equals(j.getNodeName())).findAny();
+    String otherNode = other.orElseThrow(AssertionError::new).getNodeName();
+
+    // add an extra replica of abc collection on a different node
+    CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(abc, "shard1")
+        .setNode(otherNode);
+    addReplica.setWaitForFinalState(true);
+    addReplica.process(solrClient);
+
+    // refresh
+    collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
+
+    // sanity check
+    assertColocated(collection, otherNode, withCollection);
+
+    assertEquals(1, collection.getReplicas().size());
+    Replica xyzReplica = collection.getReplicas().get(0);
+
+    // start a new node
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    assertTrue("Action was not fired till 30 seconds", LATCH.await(30, TimeUnit.SECONDS));
+    // refresh
+    collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
+
+    // sanity check
+    assertColocated(collection, otherNode, withCollection);
+
+    // assert that the replica of xyz collection was not moved
+    assertNotNull(collection.getReplica(xyzReplica.getName()));
+    assertEquals(chosenNode, collection.getReplicas().get(0).getNodeName());
+
+    // add an extra replica of xyz collection -- this should be placed on the 'otherNode'
+    addReplica = CollectionAdminRequest.addReplicaToShard(xyz, "shard1");
+    addReplica.setWaitForFinalState(true);
+    addReplica.process(solrClient);
+
+    // refresh
+    collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
+
+    List<Replica> replicas = collection.getReplicas(otherNode);
+    assertNotNull(replicas);
+    assertEquals(1, replicas.size());
+    replicas = withCollection.getReplicas(otherNode);
+    assertNotNull(replicas);
+    assertEquals(1, replicas.size());
+
+    // add an extra replica of xyz collection -- this should be placed on the 'newNode'
+    addReplica = CollectionAdminRequest.addReplicaToShard(xyz, "shard1");
+    addReplica.setWaitForFinalState(true);
+    addReplica.process(solrClient);
+
+    // refresh
+    collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
+
+    assertNotNull(collection.getReplicas(newNode.getNodeName()));
+    replicas = collection.getReplicas(newNode.getNodeName());
+    assertNotNull(replicas);
+    assertEquals(1, replicas.size());
+    replicas = withCollection.getReplicas(newNode.getNodeName());
+    assertNotNull(replicas);
+    assertEquals(1, replicas.size());
+  }
+
+  public void testMultipleWithCollections() throws Exception {
+    String prefix = "testMultipleWithCollections";
+    String xyz = prefix + "_xyz";
+    String xyz2 = prefix + "_xyz2";
+    String abc = prefix + "_abc";
+    String abc2 = prefix + "_abc2";
+
+    // start 2 more nodes so we have 4 in total
+    cluster.startJettySolrRunner();
+    cluster.startJettySolrRunner();
+    cluster.waitForAllNodes(30);
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    String setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        "      {'cores':'<10', 'node':'#ANY'}," +
+        "      {'replica':'<2', 'node':'#ANY'}," +
+        "    ]" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    solrClient.request(req);
+
+    String chosenNode = cluster.getJettySolrRunner(0).getNodeName();
+    log.info("Chosen node {} for collection {}", chosenNode, abc);
+
+    CollectionAdminRequest.createCollection(abc, 1, 1)
+        .setCreateNodeSet(chosenNode)
+        .process(solrClient);
+    CollectionAdminRequest.createCollection(xyz, 1, 1)
+        .setWithCollection(abc)
+        .process(solrClient);
+
+    String chosenNode2 = cluster.getJettySolrRunner(1).getNodeName();
+    log.info("Chosen node {} for collection {}", chosenNode2, abc2);
+    CollectionAdminRequest.createCollection(abc2, 1, 1)
+        .setCreateNodeSet(chosenNode2)
+        .process(solrClient);
+    CollectionAdminRequest.createCollection(xyz2, 1, 1)
+        .setWithCollection(abc2)
+        .process(solrClient);
+
+    // refresh
+    DocCollection collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    DocCollection collection2 = solrClient.getZkStateReader().getClusterState().getCollection(xyz2);
+    DocCollection withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
+    DocCollection withCollection2 = solrClient.getZkStateReader().getClusterState().getCollection(abc2);
+
+    // sanity check
+    assertColocated(collection, chosenNode2, withCollection); // no replica should be on chosenNode2
+    assertColocated(collection2, chosenNode, withCollection2); // no replica should be on chosenNode
+
+    String chosenNode3 = cluster.getJettySolrRunner(2).getNodeName();
+    CollectionAdminRequest.addReplicaToShard(xyz, "shard1")
+        .setNode(chosenNode3)
+        .process(solrClient);
+    String chosenNode4 = cluster.getJettySolrRunner(2).getNodeName();
+    CollectionAdminRequest.addReplicaToShard(xyz2, "shard1")
+        .setNode(chosenNode4)
+        .process(solrClient);
+
+    collection = solrClient.getZkStateReader().getClusterState().getCollection(xyz);
+    collection2 = solrClient.getZkStateReader().getClusterState().getCollection(xyz2);
+    withCollection = solrClient.getZkStateReader().getClusterState().getCollection(abc);
+    withCollection2 = solrClient.getZkStateReader().getClusterState().getCollection(abc2);
+
+    // sanity check
+    assertColocated(collection, null, withCollection);
+    assertColocated(collection2, null, withCollection2);
+  }
+
+  /**
+   * Asserts that all replicas of collection are colocated with at least one
+   * replica of the withCollection and none of them should be on the given 'noneOnNode'.
+   */
+  private void assertColocated(DocCollection collection, String noneOnNode, DocCollection withCollection) {
+    // sanity check
+    assertTrue(collection.getReplicas().stream().noneMatch(
+        replica -> withCollection.getReplicas(replica.getNodeName()) == null
+            || withCollection.getReplicas(replica.getNodeName()).isEmpty()));
+
+    if (noneOnNode != null) {
+      assertTrue(collection.getReplicas().stream().noneMatch(
+          replica -> noneOnNode.equals(replica.getNodeName())));
+    }
+  }
+
+  private static CountDownLatch LATCH = new CountDownLatch(1);
+  public static class CapturingAction extends TriggerActionBase {
+    @Override
+    public void process(TriggerEvent event, ActionContext context) throws Exception {
+      LATCH.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index c4fbf2f..6024790 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
@@ -92,6 +93,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
 import static org.apache.solr.common.params.CommonParams.NAME;
 
 /**
@@ -680,11 +682,37 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     }
     boolean waitForFinalState = props.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
     List<String> nodeList = new ArrayList<>();
-    List<String> shardNames = new ArrayList<>();
     final String collectionName = props.getStr(NAME);
+
+    String router = props.getStr("router.name", DocRouter.DEFAULT_NAME);
+    String policy = props.getStr(Policy.POLICY);
+    AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+    boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null;
+
+    // fail fast if parameters are wrong or incomplete
+    List<String> shardNames = CreateCollectionCmd.populateShardNames(props, router);
+    CreateCollectionCmd.checkMaxShardsPerNode(props, usePolicyFramework);
+    CreateCollectionCmd.checkReplicaTypes(props);
+
     // always force getting fresh state
     collectionsStatesRef.set(null);
-    ClusterState clusterState = getClusterState();
+    final ClusterState clusterState = getClusterState();
+
+    String withCollection = props.getStr(CollectionAdminParams.WITH_COLLECTION);
+    String wcShard = null;
+    if (withCollection != null) {
+      if (!clusterState.hasCollection(withCollection)) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The 'withCollection' does not exist: " + withCollection);
+      } else  {
+        DocCollection collection = clusterState.getCollection(withCollection);
+        if (collection.getActiveSlices().size() > 1)  {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + collection.getActiveSlices().size());
+        }
+        wcShard = collection.getActiveSlices().iterator().next().getName();
+      }
+    }
+    final String withCollectionShard = wcShard;
+
     ZkWriteCommand cmd = new ClusterStateMutator(cloudManager).createCollection(clusterState, props);
     if (cmd.noop) {
       LOG.warn("Collection {} already exists. exit", collectionName);
@@ -710,6 +738,35 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     final CountDownLatch finalStateLatch = new CountDownLatch(replicaPositions.size());
     AtomicInteger replicaNum = new AtomicInteger(1);
     replicaPositions.forEach(pos -> {
+
+      if (withCollection != null) {
+        // check that we have a replica of `withCollection` on this node and if not, create one
+        DocCollection collection = clusterState.getCollection(withCollection);
+        List<Replica> replicas = collection.getReplicas(pos.node);
+        if (replicas == null || replicas.isEmpty()) {
+          Map<String, Object> replicaProps = new HashMap<>();
+          replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
+          replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
+          String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", withCollection, withCollectionShard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT),
+              collection.getReplicas().size() + 1);
+          try {
+            replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
+            replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
+            replicaProps.put("SEARCHER.searcher.numDocs", 0);
+            replicaProps.put("SEARCHER.searcher.maxDoc", 0);
+            ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, withCollection, 0),
+                coreName, withCollection, withCollectionShard, pos.type, pos.node, replicaProps);
+            cloudManager.submit(() -> {
+              simAddReplica(pos.node, ri, false);
+              // do not count down the latch here
+              return true;
+            });
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+
       Map<String, Object> replicaProps = new HashMap<>();
       replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
       replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
@@ -744,6 +801,16 @@ public class SimClusterStateProvider implements ClusterStateProvider {
         }
       });
     });
+
+    // modify the `withCollection` and store this new collection's name with it
+    if (withCollection != null) {
+      ZkNodeProps message = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(),
+          ZkStateReader.COLLECTION_PROP, withCollection,
+          CollectionAdminParams.COLOCATED_WITH, collectionName);
+      cmd = new CollectionMutator(cloudManager).modifyCollection(clusterState,message);
+    }
+
     // force recreation of collection states
     collectionsStatesRef.set(null);
     simRunLeaderElection(Collections.singleton(collectionName), true);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/179c8f9b/solr/solr-ref-guide/src/collections-api.adoc
----------------------------------------------------------------------
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index c266f96..527f4c7 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -112,6 +112,11 @@ Details of the snitch provider. See the section <<rule-based-replica-placement.a
 `waitForFinalState`::
 If `true`, the request will complete only when all affected replicas become active. The default is `false`, which means that the API will return the status of the single action, which may be before the new replica is online and active.
 
+`withCollection`::
+The name of the collection with which all replicas of this collection must be co-located. The collection must already exist and must have a single shard named `shard1`.
+See <<colocating-collections.adoc#colocating-collections, Colocating collections>> for more details.
+
+
 === CREATE Response
 
 The response will include the status of the request and the new core names. If the status is anything other than "success", an error message will explain why the request failed.
@@ -181,6 +186,7 @@ The attributes that can be modified are:
 * rule
 * snitch
 * policy
+* withCollection
 
 See the <<create,CREATE action>> section above for details on these attributes.
 


Mime
View raw message