lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From is...@apache.org
Subject [lucene-solr] branch branch_7_7 updated: SOLR-12291: fixing premature completion of async tasks
Date Thu, 29 Aug 2019 13:43:55 GMT
This is an automated email from the ASF dual-hosted git repository.

ishan pushed a commit to branch branch_7_7
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_7_7 by this push:
     new 8b6ca69  SOLR-12291: fixing premature completion of async tasks
8b6ca69 is described below

commit 8b6ca690acee929ceadd3ea7a8a504499cbfa012
Author: Mikhail Khludnev <mkhl@apache.org>
AuthorDate: Sun Apr 28 11:19:15 2019 +0300

    SOLR-12291: fixing premature completion of async tasks
    
    * extract the async tracking methods from OverseerCollectionMessageHandler into a separate class
    * replace the HashMap with a NamedList to avoid entry loss
---
 solr/CHANGES.txt                                   |   4 +-
 .../solr/cloud/api/collections/AddReplicaCmd.java  |  39 ++-
 .../solr/cloud/api/collections/BackupCmd.java      |  19 +-
 .../cloud/api/collections/CreateCollectionCmd.java |  13 +-
 .../cloud/api/collections/CreateSnapshotCmd.java   |   7 +-
 .../cloud/api/collections/DeleteCollectionCmd.java |  23 +-
 .../cloud/api/collections/DeleteReplicaCmd.java    |  23 +-
 .../cloud/api/collections/DeleteSnapshotCmd.java   |  11 +-
 .../solr/cloud/api/collections/MigrateCmd.java     |  72 +++---
 .../OverseerCollectionMessageHandler.java          | 263 +++++++++++----------
 .../solr/cloud/api/collections/RestoreCmd.java     |  56 +++--
 .../solr/cloud/api/collections/SplitShardCmd.java  | 107 +++++----
 .../AsyncCallRequestStatusResponseTest.java        |  49 +++-
 .../TestRequestStatusCollectionAPI.java            |  65 +++--
 .../solr/cloud/AbstractFullDistribZkTestBase.java  |   2 +-
 15 files changed, 429 insertions(+), 324 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 6ac68ba..b6f963f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -33,8 +33,10 @@ Jetty 9.4.14.v20181114
 Bug fixes
 ----------------------
 
- * SOLR-13718: SPLITSHARD (async) with failures in underlying sub-operations can result in data loss (Ishan Chattopadhyaya)
+* SOLR-13718: SPLITSHARD (async) with failures in underlying sub-operations can result in data loss (Ishan Chattopadhyaya)
 
+* SOLR-12291: an async Collections API call that has not yet finished was prematurely reported as completed
+  when the collection's replicas are co-located on at least one node (Varun Thacker, Mikhail Khludnev)
 
 ==================  7.7.2 ==================
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index a0abaf0..b8332fb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -18,13 +18,27 @@
 package org.apache.solr.cloud.api.collections;
 
 
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
+import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -37,6 +51,7 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.cloud.ActiveReplicaWatcher;
 import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.common.SolrCloseableLatch;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -56,21 +71,6 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
-import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
-import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
-import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
-import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
-
 public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -155,17 +155,16 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
 
     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
     ZkStateReader zkStateReader = ocmh.zkStateReader;
-    // For tracking async calls.
-    Map<String,String> requestMap = new HashMap<>();
 
+    final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
     for (CreateReplica createReplica : createReplicas) {
       assert createReplica.coreName != null;
       ModifiableSolrParams params = getReplicaParams(clusterState, message, results, collectionName, coll, skipCreateReplicaInClusterState, asyncId, shardHandler, createReplica);
-      ocmh.sendShardRequest(createReplica.node, params, shardHandler, asyncId, requestMap);
+      shardRequestTracker.sendShardRequest(createReplica.node, params, shardHandler);
     }
 
     Runnable runnable = () -> {
-      ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
+      shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica");
       for (CreateReplica replica : createReplicas) {
         ocmh.waitForCoreNodeName(collectionName, replica.node, replica.coreName);
       }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index fd9faad..c07abf1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -16,17 +16,21 @@
  */
 package org.apache.solr.cloud.api.collections;
 
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.lucene.util.Version;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
@@ -51,11 +55,6 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
 public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -161,7 +160,6 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
     String asyncId = request.getStr(ASYNC);
     String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
-    Map<String, String> requestMap = new HashMap<>();
 
     String commitName = request.getStr(CoreAdminParams.COMMIT_NAME);
     Optional<CollectionSnapshotMetaData> snapshotMeta = Optional.empty();
@@ -186,6 +184,7 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
       shardsToConsider = snapshotMeta.get().getShards();
     }
 
+    final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
     for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
       Replica replica = null;
 
@@ -216,11 +215,11 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
         params.set(CoreAdminParams.COMMIT_NAME, snapshotMeta.get().getName());
       }
 
-      ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+      shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
       log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
     }
     log.debug("Sent backup requests to all shard leaders for backupName={}", backupName);
 
-    ocmh.processResponses(results, shardHandler, true, "Could not backup all shards", asyncId, requestMap);
+    shardRequestTracker.processResponses(results, shardHandler, true, "Could not backup all shards");
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 69a8cae..251cb39 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -42,6 +42,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -188,10 +189,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         return;
       }
 
-      // For tracking async calls.
-      Map<String, String> requestMap = new HashMap<>();
-
-
+      final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(async);
       log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
           collectionName, shardNames, message));
       Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
@@ -253,7 +251,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         if (async != null) {
           String coreAdminAsyncId = async + Math.abs(System.nanoTime());
           params.add(ASYNC, coreAdminAsyncId);
-          requestMap.put(nodeName, coreAdminAsyncId);
+          shardRequestTracker.track(nodeName, coreAdminAsyncId);
         }
         ocmh.addPropertyParams(message, params);
 
@@ -282,8 +280,9 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         }
       }
 
-      ocmh.processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet());
-      if(results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0) {
+      shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
+      boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
+      if (failure) {
         // Let's cleanup as we hit an exception
         // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
         // element, which may be interpreted by the user as a positive ack
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
index 8a091ef..fcb0591 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
@@ -81,11 +82,11 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
     SolrSnapshotManager.createCollectionLevelSnapshot(zkClient, collectionName, new CollectionSnapshotMetaData(commitName));
     log.info("Created a ZK path to store snapshot information for collection={} with commitName={}", collectionName, commitName);
 
-    Map<String, String> requestMap = new HashMap<>();
     NamedList shardRequestResults = new NamedList();
     Map<String, Slice> shardByCoreName = new HashMap<>();
     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
 
+    final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
     for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
       for (Replica replica : slice.getReplicas()) {
         if (replica.getState() != State.ACTIVE) {
@@ -101,7 +102,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
         params.set(CORE_NAME_PROP, coreName);
         params.set(CoreAdminParams.COMMIT_NAME, commitName);
 
-        ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+        shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
         log.debug("Sent createsnapshot request to core={} with commitName={}", coreName, commitName);
 
         shardByCoreName.put(coreName, slice);
@@ -113,7 +114,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
     // This is to take care of the situation where e.g. entire shard is unavailable.
     Set<String> failedShards = new HashSet<>();
 
-    ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap);
+    shardRequestTracker.processResponses(shardRequestResults, shardHandler, false, null);
     NamedList success = (NamedList) shardRequestResults.get("success");
     List<CoreSnapshotMetaData> replicas = new ArrayList<>();
     if (success != null) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index e5f6f2d..50516d8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -18,8 +18,13 @@
 
 package org.apache.solr.cloud.api.collections;
 
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -50,12 +55,6 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
-import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
 public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final OverseerCollectionMessageHandler ocmh;
@@ -106,17 +105,13 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       params.set(CoreAdminParams.DELETE_METRICS_HISTORY, deleteHistory);
 
       String asyncId = message.getStr(ASYNC);
-      Map<String, String> requestMap = null;
-      if (asyncId != null) {
-        requestMap = new HashMap<>();
-      }
 
       Set<String> okayExceptions = new HashSet<>(1);
       okayExceptions.add(NonExistentCoreException.class.getName());
 
-      List<Replica> failedReplicas = ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
-      for (Replica failedRepilca : failedReplicas) {
-        boolean isSharedFS = failedRepilca.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && failedRepilca.get("dataDir") != null;
+      List<Replica> failedReplicas = ocmh.collectionCmd(message, params, results, null, asyncId, okayExceptions);
+      for (Replica failedReplica : failedReplicas) {
+        boolean isSharedFS = failedReplica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && failedReplica.get("dataDir") != null;
         if (isSharedFS) {
           // if the replica use a shared FS and it did not receive the unload message, then counter node should not be removed
           // because when a new collection with same name is created, new replicas may reuse the old dataDir
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index ec158bb..be679e2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -16,6 +16,12 @@
  */
 package org.apache.solr.cloud.api.collections;
 
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -24,9 +30,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -44,12 +50,6 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
 
 public class DeleteReplicaCmd implements Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -221,10 +221,6 @@ public class DeleteReplicaCmd implements Cmd {
     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
     String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
     String asyncId = message.getStr(ASYNC);
-    AtomicReference<Map<String, String>> requestMap = new AtomicReference<>(null);
-    if (asyncId != null) {
-      requestMap.set(new HashMap<>(1, 1.0f));
-    }
 
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
@@ -236,14 +232,15 @@ public class DeleteReplicaCmd implements Cmd {
     params.set(CoreAdminParams.DELETE_METRICS_HISTORY, message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true));
 
     boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
+    final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
     if (isLive) {
-      ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get());
+      shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
     }
 
     Callable<Boolean> callable = () -> {
       try {
         if (isLive) {
-          ocmh.processResponses(results, shardHandler, false, null, asyncId, requestMap.get());
+          shardRequestTracker.processResponses(results, shardHandler, false, null);
 
           //check if the core unload removed the corenode zk entry
           if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
index 21d9cb0..0a28413 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
@@ -23,21 +23,20 @@ import static org.apache.solr.common.params.CommonParams.NAME;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.State;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.Replica.State;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -67,7 +66,6 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
     String collectionName =  message.getStr(COLLECTION_PROP);
     String commitName =  message.getStr(CoreAdminParams.COMMIT_NAME);
     String asyncId = message.getStr(ASYNC);
-    Map<String, String> requestMap = new HashMap<>();
     NamedList shardRequestResults = new NamedList();
     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
     SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
@@ -93,6 +91,7 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
       }
     }
 
+    final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
     log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores);
     for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
       for (Replica replica : slice.getReplicas()) {
@@ -113,12 +112,12 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
           params.set(CoreAdminParams.COMMIT_NAME, commitName);
 
           log.info("Sending deletesnapshot request to core={} with commitName={}", coreName, commitName);
-          ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+          shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
         }
       }
     }
 
-    ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap);
+    shardRequestTracker.processResponses(shardRequestResults, shardHandler, false, null);
     NamedList success = (NamedList) shardRequestResults.get("success");
     List<CoreSnapshotMetaData> replicas = new ArrayList<>();
     if (success != null) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
index f22544a..4a8f001 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -160,8 +161,6 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
     log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
 
     Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
-    // For tracking async calls.
-    Map<String, String> requestMap = new HashMap<>();
 
     log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
         + targetLeader.getStr("core") + " to buffer updates");
@@ -169,10 +168,12 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
     params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString());
     params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
 
-    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);
+    {
+      final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
+      shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
 
+      shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates");
+    }
     ZkNodeProps m = new ZkNodeProps(
         Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
         COLLECTION_PROP, sourceCollection.getName(),
@@ -243,12 +244,17 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
     cmd.setState(Replica.State.ACTIVE);
     cmd.setCheckLive(true);
     cmd.setOnlyIfLeader(true);
-    // we don't want this to happen asynchronously
-    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
-
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
-        " or timed out waiting for it to come up", asyncId, requestMap);
-
+    {
+      final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker();
+      // we don't want this to happen asynchronously
+      syncRequestTracker.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()),
+          shardHandler);
+
+      syncRequestTracker.processResponses(results, shardHandler, true,
+          "MIGRATE failed to create temp collection leader" +
+              " or timed out waiting for it to come up");
+    }
+    
     log.info("Asking source leader to split index");
     params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
@@ -259,9 +265,11 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
 
     String tempNodeName = sourceLeader.getNodeName();
 
-    ocmh.sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);
-
+    {
+      final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
+      shardRequestTracker.sendShardRequest(tempNodeName, params, shardHandler);
+      shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command");
+    }
     log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
         tempSourceCollectionName, targetLeader.getNodeName());
     String tempCollectionReplica2 = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(),
@@ -284,9 +292,11 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
     }
     ((AddReplicaCmd)ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null);
 
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
-        "temporary collection in target leader node.", asyncId, requestMap);
-
+    {
+      final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker();
+      syncRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
+        "temporary collection in target leader node.");
+    }
     coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
         targetLeader.getNodeName(), tempCollectionReplica2);
     // wait for the replicas to be seen as active on temp source leader
@@ -300,11 +310,13 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
     cmd.setOnlyIfLeader(true);
     params = new ModifiableSolrParams(cmd.getParams());
 
-    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
-        " replica or timed out waiting for them to come up", asyncId, requestMap);
+    {
+      final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
+      shardRequestTracker.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler);
 
+      shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
+        " replica or timed out waiting for them to come up");
+    }
     log.info("Successfully created replica of temp source collection on target leader node");
 
     log.info("Requesting merge of temp source collection replica to target leader");
@@ -313,20 +325,24 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
     params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
     params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
 
-    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+    {
+      final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
+    
+      shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
     String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
         + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
-    ocmh.processResponses(results, shardHandler, true, msg, asyncId, requestMap);
-
+    shardRequestTracker.processResponses(results, shardHandler, true, msg);
+    }
     log.info("Asking target leader to apply buffered updates");
     params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
     params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
 
-    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
-        asyncId, requestMap);
-
+    {
+      final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
+      shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
+      shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates");
+    }
     try {
       log.info("Deleting temporary collection: " + tempSourceCollectionName);
       props = makeMap(
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 8f3dc64..bdf5b9b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -337,11 +337,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
 
     String asyncId = message.getStr(ASYNC);
-    Map<String, String> requestMap = null;
-    if (asyncId != null) {
-      requestMap = new HashMap<>();
-    }
-    collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId, requestMap);
+    collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId);
   }
 
   @SuppressWarnings("unchecked")
@@ -610,34 +606,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     }
   }
 
-  void sendShardRequest(String nodeName, ModifiableSolrParams params,
-                        ShardHandler shardHandler, String asyncId,
-                        Map<String, String> requestMap) {
-    sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap, adminPath, zkStateReader);
-
-  }
-
-  public void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
-                                      String asyncId, Map<String, String> requestMap, String adminPath,
-                                      ZkStateReader zkStateReader) {
-    if (asyncId != null) {
-      String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
-      params.set(ASYNC, coreAdminAsyncId);
-      requestMap.put(nodeName, coreAdminAsyncId);
-    }
-
-    ShardRequest sreq = new ShardRequest();
-    params.set("qt", adminPath);
-    sreq.purpose = 1;
-    String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
-    sreq.shards = new String[]{replica};
-    sreq.actualShards = sreq.shards;
-    sreq.nodeName = nodeName;
-    sreq.params = params;
-
-    shardHandler.submit(sreq, replica, sreq.params);
-  }
-
   void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
     // Now add the property.key=value pairs
     for (String key : message.keySet()) {
@@ -749,38 +717,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete);
   }
 
-  void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
-                        String asyncId, Map<String, String> requestMap) {
-    processResponses(results, shardHandler, abortOnError, msgOnError, asyncId, requestMap, Collections.emptySet());
-  }
-
-  void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
-                                String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
-    //Processes all shard responses
-    ShardResponse srsp;
-    do {
-      srsp = shardHandler.takeCompletedOrError();
-      if (srsp != null) {
-        processResponse(results, srsp, okayExceptions);
-        Throwable exception = srsp.getException();
-        if (abortOnError && exception != null)  {
-          // drain pending requests
-          while (srsp != null)  {
-            srsp = shardHandler.takeCompletedOrError();
-          }
-          throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
-        }
-      }
-    } while (srsp != null);
-
-    //If request is async wait for the core admin to complete before returning
-    if (asyncId != null) {
-      waitForAsyncCallsToComplete(requestMap, results);
-      requestMap.clear();
-    }
-  }
-
-
   void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
     boolean isValid = cloudManager.getDistribStateManager().hasData(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName);
     if(!isValid) {
@@ -813,8 +749,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   }
   
   private List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
-                             NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) {
-    return collectionCmd( message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet());
+                             NamedList<Object> results, Replica.State stateMatcher, String asyncId) {
+    return collectionCmd( message, params, results, stateMatcher, asyncId, Collections.emptySet());
   }
 
   /**
@@ -822,47 +758,25 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
    * @return List of replicas which is not live for receiving the request
    */
   List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
-                     NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
+                     NamedList<Object> results, Replica.State stateMatcher, String asyncId, Set<String> okayExceptions) {
     log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
     String collectionName = message.getStr(NAME);
+    @SuppressWarnings("deprecation")
     ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
 
     ClusterState clusterState = zkStateReader.getClusterState();
     DocCollection coll = clusterState.getCollection(collectionName);
     List<Replica> notLivesReplicas = new ArrayList<>();
+    final ShardRequestTracker shardRequestTracker = new ShardRequestTracker(asyncId);
     for (Slice slice : coll.getSlices()) {
-      notLivesReplicas.addAll(sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap));
+      notLivesReplicas.addAll(shardRequestTracker.sliceCmd(clusterState, params, stateMatcher, slice, shardHandler));
     }
 
-    processResponses(results, shardHandler, false, null, asyncId, requestMap, okayExceptions);
+    shardRequestTracker.processResponses(results, shardHandler, false, null, okayExceptions);
     return notLivesReplicas;
   }
 
-  /**
-   * Send request to all replicas of a slice
-   * @return List of replicas which is not live for receiving the request
-   */
-  List<Replica> sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
-                Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
-    List<Replica> notLiveReplicas = new ArrayList<>();
-    for (Replica replica : slice.getReplicas()) {
-      if ((stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
-        if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))) {
-          // For thread safety, only simple clone the ModifiableSolrParams
-          ModifiableSolrParams cloneParams = new ModifiableSolrParams();
-          cloneParams.add(params);
-          cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
-
-          sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
-        } else {
-          notLiveReplicas.add(replica);
-        }
-      }
-    }
-    return notLiveReplicas;
-  }
-  
-  private void processResponse(NamedList results, ShardResponse srsp, Set<String> okayExceptions) {
+  private void processResponse(NamedList<Object> results, ShardResponse srsp, Set<String> okayExceptions) {
     Throwable e = srsp.getException();
     String nodeName = srsp.getNodeName();
     SolrResponse solrResponse = srsp.getSolrResponse();
@@ -871,8 +785,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     processResponse(results, e, nodeName, solrResponse, shard, okayExceptions);
   }
 
-  @SuppressWarnings("unchecked")
-  private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set<String> okayExceptions) {
+  @SuppressWarnings("deprecation")
+  private void processResponse(NamedList<Object> results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set<String> okayExceptions) {
     String rootThrowable = null;
     if (e instanceof RemoteSolrException) {
       rootThrowable = ((RemoteSolrException) e).getRootThrowable();
@@ -906,30 +820,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     success.add(key, value);
   }
 
-  /*
-   * backward compatibility reasons, add the response with the async ID as top level.
-   * This can be removed in Solr 9
-   */
-  @Deprecated
-  public final static boolean INCLUDE_TOP_LEVEL_RESPONSE = true;
-  @SuppressWarnings("unchecked")
-  private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
-    for (String k:requestMap.keySet()) {
-      log.debug("I am Waiting for :{}/{}", k, requestMap.get(k));
-      NamedList reqResult = waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k));
-      if (INCLUDE_TOP_LEVEL_RESPONSE) {
-        results.add(requestMap.get(k), reqResult);
-      }
-      if ("failed".equalsIgnoreCase(((String)reqResult.get("STATUS")))) {
-        log.error("Error from shard {}: {}", k,  reqResult);
-        addFailure(results, k, reqResult);
-      } else {
-        addSuccess(results, k, reqResult);
-      }
-    }
-  }
-
-  private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
+  private NamedList<Object> waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
     ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
@@ -951,10 +842,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       do {
         srsp = shardHandler.takeCompletedOrError();
         if (srsp != null) {
-          NamedList results = new NamedList();
+          NamedList<Object> results = new NamedList<>();
           processResponse(results, srsp, Collections.emptySet());
           if (srsp.getSolrResponse().getResponse() == null) {
-            NamedList response = new NamedList();
+            NamedList<Object> response = new NamedList<>();
             response.add("STATUS", "failed");
             return response;
           }
@@ -1053,4 +944,130 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   protected interface Cmd {
     void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception;
   }
+
+  /*
+   * For backward-compatibility reasons, add the response with the async ID at the top level.
+   * This can be removed in Solr 9.
+   */
+  @Deprecated
+  static boolean INCLUDE_TOP_LEVEL_RESPONSE = true;
+  
+  public ShardRequestTracker syncRequestTracker() {
+    return new ShardRequestTracker(null);
+  }
+  
+  public ShardRequestTracker asyncRequestTracker(String asyncId) {
+    return new ShardRequestTracker(asyncId);
+  }
+  
+  public class ShardRequestTracker{
+    private final String asyncId;
+    private final NamedList<String> shardAsyncIdByNode = new NamedList<String>();
+
+    private ShardRequestTracker(String asyncId) {
+      this.asyncId = asyncId;
+    }
+    
+    /**
+     * Send request to all replicas of a slice
+     * @return List of replicas that are not live and therefore were not sent the request
+     */
+    public List<Replica> sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
+                  Slice slice, ShardHandler shardHandler) {
+      List<Replica> notLiveReplicas = new ArrayList<>();
+      for (Replica replica : slice.getReplicas()) {
+        if ((stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
+          if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))) {
+            // For thread safety, only make a simple clone of the ModifiableSolrParams
+            ModifiableSolrParams cloneParams = new ModifiableSolrParams();
+            cloneParams.add(params);
+            cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
+
+            sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler);
+          } else {
+            notLiveReplicas.add(replica);
+          }
+        }
+      }
+      return notLiveReplicas;
+    }
+    
+    public void sendShardRequest(String nodeName, ModifiableSolrParams params,
+        ShardHandler shardHandler) {
+      sendShardRequest(nodeName, params, shardHandler, adminPath, zkStateReader);
+    }
+
+    public void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
+        String adminPath, ZkStateReader zkStateReader) {
+      if (asyncId != null) {
+        String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
+        params.set(ASYNC, coreAdminAsyncId);
+        track(nodeName, coreAdminAsyncId);
+      }
+
+      ShardRequest sreq = new ShardRequest();
+      params.set("qt", adminPath);
+      sreq.purpose = 1;
+      String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+      sreq.shards = new String[] {replica};
+      sreq.actualShards = sreq.shards;
+      sreq.nodeName = nodeName;
+      sreq.params = params;
+
+      shardHandler.submit(sreq, replica, sreq.params);
+    }
+    
+    void processResponses(NamedList<Object> results, ShardHandler shardHandler, boolean abortOnError, String msgOnError) {
+      processResponses(results, shardHandler, abortOnError, msgOnError, Collections.emptySet());
+    }
+
+    void processResponses(NamedList<Object> results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
+        Set<String> okayExceptions) {
+      // Processes all shard responses
+      ShardResponse srsp;
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          processResponse(results, srsp, okayExceptions);
+          Throwable exception = srsp.getException();
+          if (abortOnError && exception != null) {
+            // drain pending requests
+            while (srsp != null) {
+              srsp = shardHandler.takeCompletedOrError();
+            }
+            throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
+          }
+        }
+      } while (srsp != null);
+
+      // If request is async wait for the core admin to complete before returning
+      if (asyncId != null) {
+        waitForAsyncCallsToComplete(results);
+        shardAsyncIdByNode.clear();
+      }
+    }
+
+    private void waitForAsyncCallsToComplete(NamedList<Object> results) {
+      for (Map.Entry<String,String> nodeToAsync:shardAsyncIdByNode) {
+        final String node = nodeToAsync.getKey();
+        final String shardAsyncId = nodeToAsync.getValue();
+        log.debug("I am Waiting for :{}/{}", node, shardAsyncId);
+        NamedList<Object> reqResult = waitForCoreAdminAsyncCallToComplete(node, shardAsyncId);
+        if (INCLUDE_TOP_LEVEL_RESPONSE) {
+          results.add(shardAsyncId, reqResult);
+        }
+        if ("failed".equalsIgnoreCase(((String)reqResult.get("STATUS")))) {
+          log.error("Error from shard {}: {}", node,  reqResult);
+          addFailure(results, node, reqResult);
+        } else {
+          addSuccess(results, node, reqResult);
+        }
+      }
+    }
+
+    /** @deprecated consider making it private after the {@link CreateCollectionCmd} refactoring */
+    @Deprecated void track(String nodeName, String coreAdminAsyncId) {
+      shardAsyncIdByNode.add(nodeName, coreAdminAsyncId);
+    }
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index 3a70f11..4243253 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -52,6 +52,7 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -95,7 +96,6 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
     String asyncId = message.getStr(ASYNC);
     String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
-    Map<String, String> requestMap = new HashMap<>();
 
     CoreContainer cc = ocmh.overseer.getCoreContainer();
     BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
@@ -317,36 +317,42 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
       //refresh the location copy of collection state
       restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
 
-      //Copy data from backed up index to each replica
-      for (Slice slice : restoreCollection.getSlices()) {
-        ModifiableSolrParams params = new ModifiableSolrParams();
-        params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString());
-        params.set(NAME, "snapshot." + slice.getName());
-        params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString());
-        params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
-        ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
+      {
+        ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
+        // Copy data from backed up index to each replica
+        for (Slice slice : restoreCollection.getSlices()) {
+          ModifiableSolrParams params = new ModifiableSolrParams();
+          params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString());
+          params.set(NAME, "snapshot." + slice.getName());
+          params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString());
+          params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
+          shardRequestTracker.sliceCmd(clusterState, params, null, slice, shardHandler);
+        }
+        shardRequestTracker.processResponses(new NamedList(), shardHandler, true, "Could not restore core");
       }
-      ocmh.processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap);
-
 
-      for (Slice s: restoreCollection.getSlices()) {
-        for (Replica r : s.getReplicas()) {
-          String nodeName = r.getNodeName();
-          String coreNodeName = r.getCoreName();
-          Replica.State stateRep  = r.getState();
+      {
+        ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
 
-          log.debug("Calling REQUESTAPPLYUPDATES on: nodeName={}, coreNodeName={}, state={}"
-              , nodeName, coreNodeName, stateRep.name());
+        for (Slice s : restoreCollection.getSlices()) {
+          for (Replica r : s.getReplicas()) {
+            String nodeName = r.getNodeName();
+            String coreNodeName = r.getCoreName();
+            Replica.State stateRep = r.getState();
 
-          ModifiableSolrParams params = new ModifiableSolrParams();
-          params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
-          params.set(CoreAdminParams.NAME, coreNodeName);
+            log.debug("Calling REQUESTAPPLYUPDATES on: nodeName={}, coreNodeName={}, state={}", nodeName, coreNodeName,
+                stateRep.name());
 
-          ocmh.sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
-        }
+            ModifiableSolrParams params = new ModifiableSolrParams();
+            params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
+            params.set(CoreAdminParams.NAME, coreNodeName);
 
-        ocmh.processResponses(new NamedList(), shardHandler, true, "REQUESTAPPLYUPDATES calls did not succeed", asyncId, requestMap);
+            shardRequestTracker.sendShardRequest(nodeName, params, shardHandler);
+          }
 
+          shardRequestTracker.processResponses(new NamedList(), shardHandler, true,
+              "REQUESTAPPLYUPDATES calls did not succeed");
+        }
       }
 
       //Mark all shards in ACTIVE STATE
@@ -360,7 +366,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
         ocmh.overseer.offerStateUpdate((Utils.toJSON(new ZkNodeProps(propMap))));
       }
 
-        if (totalReplicasPerShard > 1) {
+      if (totalReplicasPerShard > 1) {
         log.info("Adding replicas to restored collection={}", restoreCollection.getName());
         for (Slice slice : restoreCollection.getSlices()) {
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 5db9494..2c117b5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -37,6 +37,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -90,12 +91,13 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     this.ocmh = ocmh;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    split(state, message, results);
+    split(state, message,(NamedList<Object>) results);
   }
 
-  public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+  public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList<Object> results) throws Exception {
     boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
     String methodStr = message.getStr(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
     SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
@@ -229,7 +231,6 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
 
       final String asyncId = message.getStr(ASYNC);
-      Map<String, String> requestMap = new HashMap<>();
       String nodeName = parentShardLeader.getNodeName();
 
       t = timings.sub("createSubSlicesAndLeadersInState");
@@ -281,32 +282,38 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null);
       }
 
+      @SuppressWarnings("deprecation")
       ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
-
-      String msgOnError = "SPLITSHARD failed to create subshard leaders";
-      ocmh.processResponses(results, shardHandler, true, msgOnError, asyncId, requestMap);
-      handleFailureOnAsyncRequest(results, msgOnError);
+      {
+        final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker();
+        String msgOnError = "SPLITSHARD failed to create subshard leaders";
+        syncRequestTracker.processResponses(results, shardHandler, true, msgOnError);
+        handleFailureOnAsyncRequest(results, msgOnError);
+      }
       t.stop();
       t = timings.sub("waitForSubSliceLeadersAlive");
-      for (String subShardName : subShardNames) {
-        // wait for parent leader to acknowledge the sub-shard core
-        log.debug("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
-        String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
-        CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
-        cmd.setCoreName(subShardName);
-        cmd.setNodeName(nodeName);
-        cmd.setCoreNodeName(coreNodeName);
-        cmd.setState(Replica.State.ACTIVE);
-        cmd.setCheckLive(true);
-        cmd.setOnlyIfLeader(true);
-
-        ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
-        ocmh.sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
-      }
+      {
+        final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
+        for (String subShardName : subShardNames) {
+          // wait for parent leader to acknowledge the sub-shard core
+          log.debug("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
+          String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
+          CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+          cmd.setCoreName(subShardName);
+          cmd.setNodeName(nodeName);
+          cmd.setCoreNodeName(coreNodeName);
+          cmd.setState(Replica.State.ACTIVE);
+          cmd.setCheckLive(true);
+          cmd.setOnlyIfLeader(true);
+
+          ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+          shardRequestTracker.sendShardRequest(nodeName, p, shardHandler);
+        }
 
-      msgOnError = "SPLITSHARD timed out waiting for subshard leaders to come up";
-      ocmh.processResponses(results, shardHandler, true, msgOnError, asyncId, requestMap);
-      handleFailureOnAsyncRequest(results, msgOnError);
+        String msgOnError = "SPLITSHARD timed out waiting for subshard leaders to come up";
+        shardRequestTracker.processResponses(results, shardHandler, true, msgOnError);
+        handleFailureOnAsyncRequest(results, msgOnError);
+      }
       t.stop();
 
       log.debug("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
@@ -326,34 +333,39 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       params.set(CoreAdminParams.RANGES, rangesStr);
 
       t = timings.sub("splitParentCore");
+      {
+        final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
+        shardRequestTracker.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler);
 
-      ocmh.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
-      msgOnError = "SPLITSHARD failed to invoke SPLIT core admin command";
-      ocmh.processResponses(results, shardHandler, true, msgOnError, asyncId,
-          requestMap);
-      handleFailureOnAsyncRequest(results, msgOnError);
+        String msgOnError = "SPLITSHARD failed to invoke SPLIT core admin command";
+        shardRequestTracker.processResponses(results, shardHandler, true, msgOnError);
+        handleFailureOnAsyncRequest(results, msgOnError);
+      }
       t.stop();
 
       log.debug("Index on shard: {} split into {} successfully", nodeName, subShardNames.size());
 
       t = timings.sub("applyBufferedUpdates");
       // apply buffered updates on sub-shards
-      for (int i = 0; i < subShardNames.size(); i++) {
-        String subShardName = subShardNames.get(i);
+      {
+        final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
 
-        log.debug("Applying buffered updates on : " + subShardName);
+        for (int i = 0; i < subShardNames.size(); i++) {
+          String subShardName = subShardNames.get(i);
 
-        params = new ModifiableSolrParams();
-        params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
-        params.set(CoreAdminParams.NAME, subShardName);
+          log.debug("Applying buffered updates on : " + subShardName);
 
-        ocmh.sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
-      }
+          params = new ModifiableSolrParams();
+          params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
+          params.set(CoreAdminParams.NAME, subShardName);
 
-      msgOnError = "SPLITSHARD failed while asking sub shard leaders to apply buffered updates";
-      ocmh.processResponses(results, shardHandler, true, msgOnError, asyncId, requestMap);
-      handleFailureOnAsyncRequest(results, msgOnError);
+          shardRequestTracker.sendShardRequest(nodeName, params, shardHandler);
+        }
+
+        String msgOnError = "SPLITSHARD failed while asking sub shard leaders to apply buffered updates";
+        shardRequestTracker.processResponses(results, shardHandler, true, msgOnError);
+        handleFailureOnAsyncRequest(results, msgOnError);
+      }
       t.stop();
 
       log.debug("Successfully applied buffered updates on : " + subShardNames);
@@ -508,9 +520,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
       assert TestInjection.injectSplitFailureAfterReplicaCreation();
 
-      msgOnError = "SPLITSHARD failed to create subshard replicas";
-      ocmh.processResponses(results, shardHandler, true, msgOnError, asyncId, requestMap);
-      handleFailureOnAsyncRequest(results, msgOnError);
+      {
+        final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker();
+        String msgOnError = "SPLITSHARD failed to create subshard replicas";
+        syncRequestTracker.processResponses(results, shardHandler, true, msgOnError);
+        handleFailureOnAsyncRequest(results, msgOnError);
+      }
       t.stop();
 
       log.info("Successfully created all replica shards for all sub-slices " + subSlices);
@@ -653,7 +668,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       props.put(SHARD_ID_PROP, subSlice);
       ZkNodeProps m = new ZkNodeProps(props);
       try {
-        ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList());
+        ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList<Object>());
       } catch (Exception e) {
         log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e);
       }
diff --git a/solr/core/src/test/org/apache/solr/cloud/AsyncCallRequestStatusResponseTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java
similarity index 51%
rename from solr/core/src/test/org/apache/solr/cloud/AsyncCallRequestStatusResponseTest.java
rename to solr/core/src/test/org/apache/solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java
index 31159b6..d00dd67 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AsyncCallRequestStatusResponseTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java
@@ -14,32 +14,51 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.solr.cloud;
+package org.apache.solr.cloud.api.collections;
 
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
-import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
+import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.util.NamedList;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class AsyncCallRequestStatusResponseTest extends SolrCloudTestCase {
 
+  private static boolean oldResponseEntries;
+
+  @SuppressWarnings("deprecation")
   @BeforeClass
   public static void setupCluster() throws Exception {
+    oldResponseEntries = OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE;
+    OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE = random().nextBoolean();
     configureCluster(2)
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
   }
+  
+  @SuppressWarnings("deprecation")
+  @AfterClass
+  public static void restoreFlag() throws Exception {
+    OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE = oldResponseEntries; 
+  }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void testAsyncCallStatusResponse() throws Exception {
-
+    int numShards = 4;
+    int numReplicas = 1;
+    Create createCollection = CollectionAdminRequest.createCollection("asynccall", "conf", numShards, numReplicas);
+    createCollection.setMaxShardsPerNode(100);
     String asyncId =
-        CollectionAdminRequest.createCollection("asynccall", "conf", 2, 1).processAsync(cluster.getSolrClient());
+        createCollection.processAsync(cluster.getSolrClient());
 
-    waitForState("Expected collection 'asynccall' to have 2 shards and 1 replica", "asynccall", clusterShape(2, 2));
+    waitForState("Expected collection 'asynccall' to have "+numShards+" shards and "+
+        numShards*numReplicas+" replica", "asynccall", clusterShape(numShards, numShards*numReplicas));
 
     RequestStatusState state = AbstractFullDistribZkTestBase.getRequestStateAfterCompletion(asyncId, 30, cluster.getSolrClient());
     assertEquals("Unexpected request status: " + state, "completed", state.getKey());
@@ -48,13 +67,25 @@ public class AsyncCallRequestStatusResponseTest extends SolrCloudTestCase {
     CollectionAdminResponse rsp = requestStatus.process(cluster.getSolrClient());
     NamedList<?> r = rsp.getResponse();
     if (OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE) {
-      assertEquals("Expected 5 elements in the response" + r, 5, r.size());
+      final int actualNumOfElems = 3+(numShards*numReplicas);
+      // responseHeader, success, status, plus one old-style top-level response per replica
+      assertEquals("Expected "+actualNumOfElems+" elements in the response" + r.jsonStr(),
+               actualNumOfElems, r.size());
     } else {
-      assertEquals("Expected 3 elements in the response" + r, 3, r.size());
+      // responseHeader, success, status
+      assertEquals("Expected 3 elements in the response" + r.jsonStr(), 3, r.size());
     }
     assertNotNull("Expected 'responseHeader' response" + r, r.get("responseHeader"));
-    assertNotNull("Expected 'success' response" + r, r.get("success"));
     assertNotNull("Expected 'status' response" + r, r.get("status"));
-    assertEquals("Expected 4 elements in the success element" + r.get("success"), 4, ((NamedList<?>)r.get("success")).size());
+    {
+      final NamedList<?> success = (NamedList<?>)r.get("success");
+      assertNotNull("Expected 'success' response" + r, success);
+    
+      final int actualSuccessElems = 2*(numShards*numReplicas);
+      // every replica responds once on submit and once on complete
+      assertEquals("Expected "+actualSuccessElems+
+        " elements in the success element" + success.jsonStr(), 
+          actualSuccessElems, success.size());
+    }
   }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java
index 3d32d6c..a0fa70c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java
@@ -17,8 +17,9 @@
 package org.apache.solr.cloud.api.collections;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
 
-import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
@@ -44,8 +45,10 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
 
     params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
     params.set("name", "collection2");
-    params.set("numShards", 2);
-    params.set("replicationFactor", 1);
+    int numShards = 2;
+    params.set("numShards", numShards);
+    int replicationFactor = 1;
+    params.set("replicationFactor", replicationFactor);
     params.set("maxShardsPerNode", 100);
     params.set("collection.configName", "conf1");
     params.set(CommonAdminParams.ASYNC, "1000");
@@ -66,14 +69,18 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
     params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
     params.set(OverseerCollectionMessageHandler.REQUESTID, "1000");
 
+    NamedList<Object> createResponse =null;
     try {
-      message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
+      createResponse = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
+      message = (String) createResponse.findRecursive("status","msg");
     } catch (SolrServerException | IOException e) {
       e.printStackTrace();
     }
 
-    assertEquals("found [1000] in completed tasks", message);
-
+    assertEquals("found [1000] in completed tasks", message); 
+    assertEquals("expecting "+numShards+" shard responses at "+createResponse,
+        numShards, numResponsesCompleted(createResponse));
+    
     // Check for a random (hopefully non-existent request id
     params = new ModifiableSolrParams();
     params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.REQUESTSTATUS.toString());
@@ -103,13 +110,18 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
     params = new ModifiableSolrParams();
     params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
     params.set(OverseerCollectionMessageHandler.REQUESTID, "1001");
+    NamedList<Object> splitResponse=null;
     try {
-      message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
+      splitResponse = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
+      message = (String) splitResponse.findRecursive("status","msg");
     } catch (SolrServerException | IOException e) {
       e.printStackTrace();
     }
 
     assertEquals("found [1001] in completed tasks", message);
+    // expected shard responses: 2 create + 2 preprecovery + 1 split + 2 req_apply_upd = 7
+    assertEquals("expecting "+(2+2+1+2)+" shard responses at "+splitResponse,
+        (2+2+1+2), numResponsesCompleted(splitResponse));
 
     params = new ModifiableSolrParams();
     params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
@@ -131,12 +143,12 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
     params.set(OverseerCollectionMessageHandler.REQUESTID, "1002");
 
     try {
-      message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
+      NamedList<Object> response = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
+      message = (String) response.findRecursive("status","msg");
     } catch (SolrServerException | IOException e) {
       e.printStackTrace();
     }
 
-
     assertEquals("found [1002] in failed tasks", message);
 
     params = new ModifiableSolrParams();
@@ -156,21 +168,38 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
     assertEquals("Task with the same requestid already exists.", r.get("error"));
   }
 
+  @SuppressWarnings("unchecked")
+  private int numResponsesCompleted(NamedList<Object> response) {
+    int sum=0;
+    for (String key: Arrays.asList("success","failure")) {
+      NamedList<Object> allStatuses = (NamedList<Object>)response.get(key);
+      if (allStatuses!=null) {
+        for (Map.Entry<String, Object> tuple: allStatuses) {
+          NamedList<Object> statusResponse = (NamedList<Object>) tuple.getValue();
+          if (statusResponse.indexOf("STATUS",0)>=0) {
+            sum+=1;
+          }
+        }
+      }
+    }
+    return sum;
+  }
+
   /**
    * Helper method to send a status request with specific retry limit and return
    * the message/null from the success response.
    */
-  private String sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter)
+  private NamedList<Object> sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter)
       throws SolrServerException, IOException{
-    String message = null;
+    NamedList<Object> r = null;
     while (maxCounter-- > 0) {
-      final NamedList r = sendRequest(params);
-      final NamedList status = (NamedList) r.get("status");
+      r = sendRequest(params);
+      @SuppressWarnings("unchecked")
+      final NamedList<Object> status = (NamedList<Object>) r.get("status");
       final RequestStatusState state = RequestStatusState.fromKey((String) status.get("state"));
-      message = (String) status.get("msg");
 
       if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED) {
-        return message;
+        return r;
       }
 
       try {
@@ -180,11 +209,11 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
 
     }
     // Return last state?
-    return message;
+    return r;
   }
 
-  protected NamedList sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException {
-    SolrRequest request = new QueryRequest(params);
+  protected NamedList<Object> sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException {
+    QueryRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
 
     String baseUrl = ((HttpSolrClient) shardToJetty.get(SHARD1).get(0).client.getSolrClient()).getBaseURL();
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index e8a0862..8fa8f94 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -2317,7 +2317,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     }
   }
 
-  static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client)
+  public static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client)
       throws IOException, SolrServerException {
     RequestStatusState state = null;
     final TimeOut timeout = new TimeOut(waitForSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);


Mime
View raw message