lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject svn commit: r1725474 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/client/solrj/request/
Date Tue, 19 Jan 2016 11:32:15 GMT
Author: varun
Date: Tue Jan 19 11:32:14 2016
New Revision: 1725474

URL: http://svn.apache.org/viewvc?rev=1725474&view=rev
Log:
SOLR-8534: Add generic support for collection APIs to be async

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1725474&r1=1725473&r2=1725474&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Tue Jan 19 11:32:14 2016
@@ -318,6 +318,10 @@ New Features
 * SOLR-8312: Add domain size and numBuckets to facet telemetry info (facet debug info
   for the new Facet Module).  (Michael Sun, yonik)
 
+* SOLR-8534: Add generic support for collection APIs to be async. Thus more actions benefit from having async
+  support. The commands that additionally get async support are: delete/reload collection, create/delete alias,
+  create/delete shard, delete replica, add/delete replica property, add/remove role,
+  overseer status, balance shard unique, rebalance leaders, modify collection, migrate state format (Varun Thacker)
 
 Bug Fixes
 ----------------------

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1725474&r1=1725473&r2=1725474&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Tue Jan 19 11:32:14 2016
@@ -224,9 +224,7 @@ public class OverseerCollectionMessageHa
           deleteCollection(message, results);
           break;
         case RELOAD:
-          ModifiableSolrParams params = new ModifiableSolrParams();
-          params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
-          collectionCmd(zkStateReader.getClusterState(), message, params, results, Replica.State.ACTIVE);
+          reloadCollection(message, results);
           break;
         case CREATEALIAS:
           createAlias(zkStateReader.getAliases(), message);
@@ -303,6 +301,18 @@ public class OverseerCollectionMessageHa
     return new OverseerSolrResponse(results);
   }
 
+  private void reloadCollection(ZkNodeProps message, NamedList results) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
+
+    String asyncId = message.getStr(ASYNC);
+    Map<String, String> requestMap = null;
+    if (asyncId != null) {
+      requestMap = new HashMap<>();
+    }
+    collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId, requestMap);
+  }
+
   @SuppressWarnings("unchecked")
   private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
@@ -670,7 +680,6 @@ public class OverseerCollectionMessageHa
     
     DocCollection coll = clusterState.getCollection(collectionName);
     Slice slice = coll.getSlice(shard);
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     if (slice == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST,
           "Invalid shard name : " + shard + " in collection : " + collectionName);
@@ -691,36 +700,34 @@ public class OverseerCollectionMessageHa
           "Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName
               + " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
     }
-    
-    String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-    
-    // assume the core exists and try to unload it
-    Map m = makeMap("qt", adminPath, CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE,
-        core, CoreAdminParams.DELETE_INSTANCE_DIR, "true", CoreAdminParams.DELETE_DATA_DIR, "true");
-        
-    ShardRequest sreq = new ShardRequest();
-    sreq.purpose = 1;
-    sreq.shards = new String[] {baseUrl};
-    sreq.actualShards = sreq.shards;
-    sreq.params = new ModifiableSolrParams(new MapSolrParams(m));
-    try {
-      shardHandler.submit(sreq, baseUrl, sreq.params);
-    } catch (Exception e) {
-      log.warn("Exception trying to unload core " + sreq, e);
+    String asyncId = message.getStr(ASYNC);
+    Map<String, String> requestMap = null;
+    if (asyncId != null) {
+      requestMap = new HashMap<>(1, 1.0f);
     }
-    
-    collectShardResponses(replica.getState() != Replica.State.ACTIVE ? new NamedList() : results, false, null,
-        shardHandler);
-        
-    if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return;// check if the core unload removed the
-                                                                              // corenode zk enry
-    deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.add(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+    params.add(CoreAdminParams.CORE, core);
+    params.add(CoreAdminParams.DELETE_INSTANCE_DIR, "true");
+    params.add(CoreAdminParams.DELETE_DATA_DIR, "true");
+
+    sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+    processResponses(results, shardHandler, false, null, asyncId, requestMap);
+
+    //check if the core unload removed the corenode zk entry
+    if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return;
+
+    // try and ensure core info is removed from cluster state
+    deleteCoreNode(collectionName, replicaName, replica, core);
     if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return;
     
     throw new SolrException(ErrorCode.SERVER_ERROR,
         "Could not  remove replica : " + collectionName + "/" + shard + "/" + replicaName);
-        
   }
 
   private boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
@@ -760,21 +767,23 @@ public class OverseerCollectionMessageHa
 
   }
 
-  private void deleteCollection(ZkNodeProps message, NamedList results)
-      throws KeeperException, InterruptedException {
+  private void deleteCollection(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
     final String collection = message.getStr(NAME);
     try {
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
       params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
       params.set(CoreAdminParams.DELETE_DATA_DIR, true);
-      collectionCmd(zkStateReader.getClusterState(), message, params, results,
-          null);
 
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-          DELETE.toLower(), NAME, collection);
-      Overseer.getInQueue(zkStateReader.getZkClient()).offer(
-          Utils.toJSON(m));
+      String asyncId = message.getStr(ASYNC);
+      Map<String, String> requestMap = null;
+      if (asyncId != null) {
+        requestMap = new HashMap<>();
+      }
+      collectionCmd(message, params, results, null, asyncId, requestMap);
+
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
+      Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
 
       // wait for a while until we don't see the collection
       TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
@@ -877,7 +886,7 @@ public class OverseerCollectionMessageHa
 
     TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
     boolean success = false;
-    Aliases aliases = null;
+    Aliases aliases;
     while (! timeout.hasTimedOut()) {
       aliases = zkStateReader.getAliases();
       String collections = aliases.getCollectionAlias(name);
@@ -942,7 +951,7 @@ public class OverseerCollectionMessageHa
       throws KeeperException, InterruptedException {
     String collectionName = message.getStr(COLLECTION_PROP);
     String sliceName = message.getStr(SHARD_ID_PROP);
-    
+
     log.info("Create shard invoked: {}", message);
     if (collectionName == null || sliceName == null)
       throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
@@ -968,6 +977,13 @@ public class OverseerCollectionMessageHa
       throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME));
       
     String configName = message.getStr(COLL_CONF);
+
+    String async = message.getStr(ASYNC);
+    Map<String, String> requestMap = null;
+    if (async != null) {
+      requestMap = new HashMap<>(repFactor, 1.0f);
+    }
+
     for (int j = 1; j <= repFactor; j++) {
       String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
       String shardName = collectionName + "_" + sliceName + "_replica" + j;
@@ -977,27 +993,17 @@ public class OverseerCollectionMessageHa
       // Need to create new params for each request
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
-      
       params.set(CoreAdminParams.NAME, shardName);
       params.set(COLL_CONF, configName);
       params.set(CoreAdminParams.COLLECTION, collectionName);
       params.set(CoreAdminParams.SHARD, sliceName);
       params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
       addPropertyParams(message, params);
-      
-      ShardRequest sreq = new ShardRequest();
-      params.set("qt", adminPath);
-      sreq.purpose = 1;
-      String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
-      sreq.shards = new String[] {replica};
-      sreq.actualShards = sreq.shards;
-      sreq.params = params;
-      
-      shardHandler.submit(sreq, replica, sreq.params);
-      
+
+      sendShardRequest(nodeName, params, shardHandler, async, requestMap);
     }
     
-    processResponses(results, shardHandler);
+    processResponses(results, shardHandler, true, "Failed to create shard", async, requestMap);
     
     log.info("Finished create command on all shards for collection: " + collectionName);
     
@@ -1016,7 +1022,7 @@ public class OverseerCollectionMessageHa
     DocCollection collection = clusterState.getCollection(collectionName);
     DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
     
-    Slice parentSlice = null;
+    Slice parentSlice;
     
     if (slice == null) {
       if (router instanceof CompositeIdRouter) {
@@ -1157,13 +1163,8 @@ public class OverseerCollectionMessageHa
         }
       }
       
-      // do not abort splitshard if the unloading fails
-      // this can happen because the replicas created previously may be down
-      // the only side effect of this is that the sub shard may end up having more replicas than we want
-      collectShardResponses(results, false, null, shardHandler);
-      
       final String asyncId = message.getStr(ASYNC);
-      HashMap<String,String> requestMap = new HashMap<>();
+      Map<String,String> requestMap = new HashMap<>();
       
       for (int i = 0; i < subRanges.size(); i++) {
         String subSlice = subSlices.get(i);
@@ -1208,10 +1209,8 @@ public class OverseerCollectionMessageHa
         }
         addReplica(clusterState, new ZkNodeProps(propMap), results);
       }
-      
-      collectShardResponses(results, true, "SPLITSHARD failed to create subshard leaders", shardHandler);
-      
-      completeAsyncRequest(asyncId, requestMap, results);
+
+      processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
       
       for (String subShardName : subShardNames) {
         // wait for parent leader to acknowledge the sub-shard core
@@ -1228,11 +1227,9 @@ public class OverseerCollectionMessageHa
         ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
         sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
       }
-      
-      collectShardResponses(results, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
-          shardHandler);
-          
-      completeAsyncRequest(asyncId, requestMap, results);
+
+      processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
+          asyncId, requestMap);
       
       log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
           + " on: " + parentShardLeader);
@@ -1250,9 +1247,9 @@ public class OverseerCollectionMessageHa
       params.set(CoreAdminParams.RANGES, rangesStr);
       
       sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-      
-      collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command", shardHandler);
-      completeAsyncRequest(asyncId, requestMap, results);
+
+      processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
+          requestMap);
       
       log.info("Index on shard: " + nodeName + " split into two successfully");
       
@@ -1268,11 +1265,9 @@ public class OverseerCollectionMessageHa
         
         sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
       }
-      
-      collectShardResponses(results, true, "SPLITSHARD failed while asking sub shard leaders to apply buffered updates",
-          shardHandler);
-          
-      completeAsyncRequest(asyncId, requestMap, results);
+
+      processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" +
+          " to apply buffered updates", asyncId, requestMap);
       
       log.info("Successfully applied buffered updates on : " + subShardNames);
       
@@ -1380,11 +1375,8 @@ public class OverseerCollectionMessageHa
       for (Map<String, Object> replica : replicas) {
         addReplica(clusterState, new ZkNodeProps(replica), results);
       }
-      
-      collectShardResponses(results, true,
-          "SPLITSHARD failed to create subshard replicas", shardHandler);
-          
-      completeAsyncRequest(asyncId, requestMap, results);
+
+      processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
       
       log.info("Successfully created all replica shards for all sub-slices " + subSlices);
       
@@ -1481,26 +1473,6 @@ public class OverseerCollectionMessageHa
     );
   }
 
-  private void collectShardResponses(NamedList results, boolean abortOnError,
-                                     String msgOnError,
-                                     ShardHandler shardHandler) {
-    ShardResponse srsp;
-    do {
-      srsp = shardHandler.takeCompletedOrError();
-      if (srsp != null) {
-        processResponse(results, srsp);
-        Throwable exception = srsp.getException();
-        if (abortOnError && exception != null)  {
-          // drain pending requests
-          while (srsp != null)  {
-            srsp = shardHandler.takeCompletedOrError();
-          }
-          throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
-        }
-      }
-    } while (srsp != null);
-  }
-
   private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
     String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
     String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
@@ -1525,15 +1497,21 @@ public class OverseerCollectionMessageHa
           + ". Only non-active (or custom-hashed) slices can be deleted.");
     }
     ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+
+    String asyncId = message.getStr(ASYNC);
+    Map<String, String> requestMap = null;
+    if (asyncId != null) {
+      requestMap = new HashMap<>(slice.getReplicas().size(), 1.0f);
+    }
     
     try {
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
       params.set(CoreAdminParams.DELETE_INDEX, "true");
-      sliceCmd(clusterState, params, null, slice, shardHandler);
-      
-      processResponses(results, shardHandler);
-      
+      sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
+
+      processResponses(results, shardHandler, true, "Failed to delete shard", asyncId, requestMap);
+
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
           collection, ZkStateReader.SHARD_ID_PROP, sliceId);
       Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
@@ -1646,21 +1624,17 @@ public class OverseerCollectionMessageHa
 
     Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
     // For tracking async calls.
-    HashMap<String, String> requestMap = new HashMap<String, String>();
+    Map<String, String> requestMap = new HashMap<>();
 
     log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
         + targetLeader.getStr("core") + " to buffer updates");
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
     params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
-    String nodeName = targetLeader.getNodeName();
 
     sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
 
-    collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates",
-        shardHandler);
-
-    completeAsyncRequest(asyncId, requestMap, results);
+    processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);
 
     ZkNodeProps m = new ZkNodeProps(
         Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
@@ -1671,8 +1645,7 @@ public class OverseerCollectionMessageHa
         "targetCollection", targetCollection.getName(),
         "expireAt", RoutingRule.makeExpiryAt(timeout));
     log.info("Adding routing rule: " + m);
-    Overseer.getInQueue(zkStateReader.getZkClient()).offer(
-        Utils.toJSON(m));
+    Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
 
     // wait for a while until we see the new rule
     log.info("Waiting to see routing rule updated in clusterstate");
@@ -1707,7 +1680,7 @@ public class OverseerCollectionMessageHa
         NUM_SLICES, 1,
         COLL_CONF, configName,
         CREATE_NODE_SET, sourceLeader.getNodeName());
-    if(asyncId != null) {
+    if (asyncId != null) {
       String internalAsyncId = asyncId + Math.abs(System.nanoTime());
       props.put(ASYNC, internalAsyncId);
     }
@@ -1734,9 +1707,8 @@ public class OverseerCollectionMessageHa
     // we don't want this to happen asynchronously
     sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
 
-    collectShardResponses(results, true,
-        "MIGRATE failed to create temp collection leader or timed out waiting for it to come up",
-        shardHandler);
+    processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
+        " or timed out waiting for it to come up", asyncId, requestMap);
 
     log.info("Asking source leader to split index");
     params = new ModifiableSolrParams();
@@ -1749,8 +1721,7 @@ public class OverseerCollectionMessageHa
     String tempNodeName = sourceLeader.getNodeName();
 
     sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
-    collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command", shardHandler);
-    completeAsyncRequest(asyncId, requestMap, results);
+    processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);
 
     log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
         tempSourceCollectionName, targetLeader.getNodeName());
@@ -1773,11 +1744,8 @@ public class OverseerCollectionMessageHa
     }
     addReplica(clusterState, new ZkNodeProps(props), results);
 
-    collectShardResponses(results, true,
-        "MIGRATE failed to create replica of temporary collection in target leader node.",
-        shardHandler);
-
-    completeAsyncRequest(asyncId, requestMap, results);
+    processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
+        "temporary collection in target leader node.", asyncId, requestMap);
 
     coreNodeName = waitForCoreNodeName(tempSourceCollectionName,
         targetLeader.getNodeName(), tempCollectionReplica2);
@@ -1794,11 +1762,9 @@ public class OverseerCollectionMessageHa
 
     sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
 
-    collectShardResponses(results, true,
-        "MIGRATE failed to create temp collection replica or timed out waiting for them to come up",
-        shardHandler);
+    processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
+        " replica or timed out waiting for them to come up", asyncId, requestMap);
 
-    completeAsyncRequest(asyncId, requestMap, results);
     log.info("Successfully created replica of temp source collection on target leader node");
 
     log.info("Requesting merge of temp source collection replica to target leader");
@@ -1808,12 +1774,9 @@ public class OverseerCollectionMessageHa
     params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
 
     sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-    collectShardResponses(results, true,
-        "MIGRATE failed to merge " + tempCollectionReplica2 +
-            " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName(),
-        shardHandler);
-
-    completeAsyncRequest(asyncId, requestMap, results);
+    String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
+        + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
+    processResponses(results, shardHandler, true, msg, asyncId, requestMap);
 
     log.info("Asking target leader to apply buffered updates");
     params = new ModifiableSolrParams();
@@ -1821,11 +1784,8 @@ public class OverseerCollectionMessageHa
     params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
 
     sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-    collectShardResponses(results, true,
-        "MIGRATE failed to request node to apply buffered updates",
-        shardHandler);
-
-    completeAsyncRequest(asyncId, requestMap, results);
+    processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
+        asyncId, requestMap);
 
     try {
       log.info("Deleting temporary collection: " + tempSourceCollectionName);
@@ -1839,13 +1799,6 @@ public class OverseerCollectionMessageHa
     }
   }
 
-  private void completeAsyncRequest(String asyncId, HashMap<String, String> requestMap, NamedList results) {
-    if(asyncId != null) {
-      waitForAsyncCallsToComplete(requestMap, results);
-      requestMap.clear();
-    }
-  }
-
   private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
     if (a == null || b == null || !a.overlaps(b)) {
       return null;
@@ -1867,11 +1820,9 @@ public class OverseerCollectionMessageHa
 
   }
 
-  public static void sendShardRequest(String nodeName, ModifiableSolrParams params,
-                                       ShardHandler shardHandler, String asyncId,
-                                       Map<String, String> requestMap,
-                                       String adminPath, ZkStateReader zkStateReader
-  ) {
+  public static void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
+                                      String asyncId, Map<String, String> requestMap, String adminPath,
+                                      ZkStateReader zkStateReader) {
     if (asyncId != null) {
       String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
       params.set(ASYNC, coreAdminAsyncId);
@@ -2029,7 +1980,7 @@ public class OverseerCollectionMessageHa
       }
 
       // For tracking async calls.
-      HashMap<String, String> requestMap = new HashMap<String, String>();
+      Map<String, String> requestMap = new HashMap<>();
 
 
       log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
@@ -2098,11 +2049,9 @@ public class OverseerCollectionMessageHa
         }
       }
 
-      processResponses(results, shardHandler);
-
-      completeAsyncRequest(async, requestMap, results);
+      processResponses(results, shardHandler, false, null, async, requestMap);
 
-      log.info("Finished create command on all shards for collection: "
+      log.debug("Finished create command on all shards for collection: "
           + collectionName);
 
     } catch (SolrException ex) {
@@ -2263,22 +2212,37 @@ public class OverseerCollectionMessageHa
     addPropertyParams(message, params);
     
     // For tracking async calls.
-    HashMap<String,String> requestMap = new HashMap<>();
+    Map<String,String> requestMap = new HashMap<>();
     sendShardRequest(node, params, shardHandler, asyncId, requestMap);
-    
-    collectShardResponses(results, true, "ADDREPLICA failed to create replica", shardHandler);
-    
-    completeAsyncRequest(asyncId, requestMap, results);
+
+    processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
   }
 
-  private void processResponses(NamedList results, ShardHandler shardHandler) {
+
+  private void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
+                                String asyncId, Map<String, String> requestMap) {
+    //Processes all shard responses
     ShardResponse srsp;
     do {
       srsp = shardHandler.takeCompletedOrError();
       if (srsp != null) {
         processResponse(results, srsp);
+        Throwable exception = srsp.getException();
+        if (abortOnError && exception != null)  {
+          // drain pending requests
+          while (srsp != null)  {
+            srsp = shardHandler.takeCompletedOrError();
+          }
+          throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
+        }
       }
     } while (srsp != null);
+
+    //If request is async wait for the core admin to complete before returning
+    if (asyncId != null) {
+      waitForAsyncCallsToComplete(requestMap, results);
+      requestMap.clear();
+    }
   }
 
   private String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
@@ -2332,47 +2296,36 @@ public class OverseerCollectionMessageHa
 
   }
 
-  private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, Replica.State stateMatcher) {
+  private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+                             NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) {
     log.info("Executing Collection Cmd : " + params);
     String collectionName = message.getStr(NAME);
     ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-    
+
+    ClusterState clusterState = zkStateReader.getClusterState();
     DocCollection coll = clusterState.getCollection(collectionName);
     
-    for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
-      Slice slice = entry.getValue();
-      sliceCmd(clusterState, params, stateMatcher, slice, shardHandler);
+    for (Slice slice : coll.getSlices()) {
+      sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap);
     }
 
-    processResponses(results, shardHandler);
+    processResponses(results, shardHandler, false, null, asyncId, requestMap);
 
   }
 
   private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
-                        Slice slice, ShardHandler shardHandler) {
-    Map<String,Replica> shards = slice.getReplicasMap();
-    Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
-    for (Map.Entry<String,Replica> shardEntry : shardEntries) {
-      final ZkNodeProps node = shardEntry.getValue();
-      if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP)) 
-          && (stateMatcher == null || Replica.State.getState(node.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
+                        Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
+
+    for (Replica replica : slice.getReplicas()) {
+      if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))
+          && (stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
+
         // For thread safety, only simple clone the ModifiableSolrParams
         ModifiableSolrParams cloneParams = new ModifiableSolrParams();
         cloneParams.add(params);
-        cloneParams.set(CoreAdminParams.CORE, node.getStr(ZkStateReader.CORE_NAME_PROP));
+        cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
 
-        String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
-        ShardRequest sreq = new ShardRequest();
-        sreq.nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP);
-        // yes, they must use same admin handler path everywhere...
-        cloneParams.set("qt", adminPath);
-        sreq.purpose = 1;
-        sreq.shards = new String[] {replica};
-        sreq.actualShards = sreq.shards;
-        sreq.params = cloneParams;
-        log.info("Collection Admin sending CoreAdmin cmd to " + replica
-            + " params:" + sreq.params);
-        shardHandler.submit(sreq, replica, sreq.params);
+        sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
       }
     }
   }
@@ -2413,7 +2366,7 @@ public class OverseerCollectionMessageHa
 
   @SuppressWarnings("unchecked")
   private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
-    for(String k:requestMap.keySet()) {
+    for (String k:requestMap.keySet()) {
       log.debug("I am Waiting for :{}/{}", k, requestMap.get(k));
       results.add(requestMap.get(k), waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k)));
     }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1725474&r1=1725473&r2=1725474&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Tue Jan 19 11:32:14 2016
@@ -72,7 +72,6 @@ import org.apache.solr.core.CoreContaine
 import org.apache.solr.handler.BlobHandler;
 import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.zookeeper.CreateMode;
@@ -169,23 +168,26 @@ public class CollectionsHandler extends
     String a = params.get(CoreAdminParams.ACTION);
     if (a != null) {
       CollectionAction action = CollectionAction.get(a);
-      if (action == null)
+      if (action == null) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
+      }
       CollectionOperation operation = CollectionOperation.get(action);
       log.info("Invoked Collection Action :{} with params {} ", action.toLower(), req.getParamString());
-      Map<String, Object> result = operation.call(req, rsp, this);
-      if (result != null) {
-        result.put(QUEUE_OPERATION, operation.action.toLower());
-        ZkNodeProps props = new ZkNodeProps(result);
-        if (operation.sendToOCPQueue) handleResponse(operation.action.toLower(), props, rsp, operation.timeOut);
-        else Overseer.getInQueue(coreContainer.getZkController().getZkClient()).offer(Utils.toJSON(props));
 
+      Map<String, Object> props = operation.call(req, rsp, this);
+      String asyncId = req.getParams().get(ASYNC);
+      if (props != null) {
+        if (asyncId != null) {
+          props.put(ASYNC, asyncId);
+        }
+        props.put(QUEUE_OPERATION, operation.action.toLower());
+        ZkNodeProps zkProps = new ZkNodeProps(props);
+        if (operation.sendToOCPQueue) handleResponse(operation.action.toLower(), zkProps, rsp, operation.timeOut);
+        else Overseer.getInQueue(coreContainer.getZkController().getZkClient()).offer(Utils.toJSON(props));
       }
     } else {
       throw new SolrException(ErrorCode.BAD_REQUEST, "action is a required param");
-
     }
-
     rsp.setHttpCaching(false);
   }
 
@@ -315,7 +317,6 @@ public class CollectionsHandler extends
             MAX_SHARDS_PER_NODE,
             CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE,
             SHARDS_PROP,
-            ASYNC,
             STATE_FORMAT,
             AUTO_ADD_REPLICAS,
             RULE,
@@ -366,7 +367,6 @@ public class CollectionsHandler extends
       Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
           throws Exception {
         return req.getParams().required().getAll(null, NAME);
-
       }
     },
     SYNCSHARD_OP(SYNCSHARD) {
@@ -381,7 +381,6 @@ public class CollectionsHandler extends
         ZkNodeProps leaderProps = clusterState.getLeader(collection, shard);
         ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
 
-        ;
         try (HttpSolrClient client = new HttpSolrClient(nodeProps.getBaseUrl())) {
           client.setConnectionTimeout(15000);
           client.setSoTimeout(60000);
@@ -436,8 +435,7 @@ public class CollectionsHandler extends
             COLLECTION_PROP,
             SHARD_ID_PROP,
             "split.key",
-            CoreAdminParams.RANGES,
-            ASYNC);
+            CoreAdminParams.RANGES);
         return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
       }
     },
@@ -453,7 +451,6 @@ public class CollectionsHandler extends
       @Override
       Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
         forceLeaderElection(req, handler);
-
         return null;
       }
     },
@@ -468,7 +465,7 @@ public class CollectionsHandler extends
           throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections");
         req.getParams().getAll(map,
             REPLICATION_FACTOR,
-            CREATE_NODE_SET, ASYNC);
+            CREATE_NODE_SET);
         return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
       }
     },
@@ -479,14 +476,14 @@ public class CollectionsHandler extends
             COLLECTION_PROP,
             SHARD_ID_PROP,
             REPLICA_PROP);
-        return req.getParams().getAll(map, ASYNC, ONLY_IF_DOWN);
+        return req.getParams().getAll(map, ONLY_IF_DOWN);
       }
     },
     MIGRATE_OP(MIGRATE) {
       @Override
       Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
         Map<String, Object> map = req.getParams().required().getAll(null, COLLECTION_PROP, "split.key", "target.collection");
-        return req.getParams().getAll(map, "forward.timeout", ASYNC);
+        return req.getParams().getAll(map, "forward.timeout");
       }
     },
     ADDROLE_OP(ADDROLE) {
@@ -586,8 +583,7 @@ public class CollectionsHandler extends
             _ROUTE_,
             CoreAdminParams.NAME,
             INSTANCE_DIR,
-            DATA_DIR,
-            ASYNC);
+            DATA_DIR);
         return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
       }
     },
@@ -687,8 +683,7 @@ public class CollectionsHandler extends
           prop = COLL_PROP_PREFIX + prop;
         }
 
-        if (!shardUnique &&
-            !SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) {
+        if (!shardUnique && !SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) {
           throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
               + " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " +
               " Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique));

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java?rev=1725474&r1=1725473&r2=1725474&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java Tue Jan 19 11:32:14 2016
@@ -17,12 +17,22 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.TestUtil;
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.junit.Test;
 
 /**
@@ -83,4 +93,138 @@ public class CollectionsAPIAsyncDistribu
       assertEquals("Shard split did not complete. Last recorded state: " + state, "completed", state);
     }
   }
+
+  @Test
+  public void testAsyncRequests() throws Exception {
+    String collection = "testAsyncOperations";
+
+    Create createCollectionRequest = new Create()
+        .setCollectionName(collection)
+        .setNumShards(1)
+        .setRouterName("implicit")
+        .setShards("shard1")
+        .setConfigName("conf1")
+        .setAsyncId("42");
+    CollectionAdminResponse response = createCollectionRequest.process(cloudClient);
+    assertEquals("42", response.getResponse().get("requestid"));
+    String state = getRequestStateAfterCompletion("42", MAX_TIMEOUT_SECONDS, cloudClient);
+    assertEquals("CreateCollection task did not complete!", "completed", state);
+
+    //Add a few documents to shard1
+    int numDocs = TestUtil.nextInt(random(), 10, 100);
+    List<SolrInputDocument> docs = new ArrayList<>(numDocs);
+    for (int i=0; i<numDocs; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", i);
+      doc.addField("_route_", "shard1");
+      docs.add(doc);
+    }
+    cloudClient.add(collection, docs);
+    cloudClient.commit(collection);
+
+    SolrQuery query = new SolrQuery("*:*");
+    query.set("shards", "shard1");
+    assertEquals(numDocs, cloudClient.query(collection, query).getResults().getNumFound());
+
+    CollectionAdminRequest.Reload reloadCollection = new CollectionAdminRequest.Reload();
+    reloadCollection.setCollectionName(collection).setAsyncId("43");
+    response = reloadCollection.process(cloudClient);
+    assertEquals("43", response.getResponse().get("requestid"));
+    state = getRequestStateAfterCompletion("43", MAX_TIMEOUT_SECONDS, cloudClient);
+    assertEquals("ReloadCollection did not complete", "completed", state);
+
+    CollectionAdminRequest.CreateShard createShard = new CollectionAdminRequest.CreateShard()
+        .setCollectionName(collection)
+        .setShardName("shard2")
+        .setAsyncId("44");
+    response = createShard.process(cloudClient);
+    assertEquals("44", response.getResponse().get("requestid"));
+    state = getRequestStateAfterCompletion("44", MAX_TIMEOUT_SECONDS, cloudClient);
+    assertEquals("CreateShard did not complete", "completed", state);
+
+    //Add a doc to shard2 to make sure shard2 was created properly
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", numDocs + 1);
+    doc.addField("_route_", "shard2");
+    cloudClient.add(collection, doc);
+    cloudClient.commit(collection);
+    query = new SolrQuery("*:*");
+    query.set("shards", "shard2");
+    assertEquals(1, cloudClient.query(collection, query).getResults().getNumFound());
+
+    CollectionAdminRequest.DeleteShard deleteShard = new CollectionAdminRequest.DeleteShard()
+        .setCollectionName(collection)
+        .setShardName("shard2")
+        .setAsyncId("45");
+    response = deleteShard.process(cloudClient);
+    assertEquals("45", response.getResponse().get("requestid"));
+    state = getRequestStateAfterCompletion("45", MAX_TIMEOUT_SECONDS, cloudClient);
+    assertEquals("DeleteShard did not complete", "completed", state);
+
+    CollectionAdminRequest.AddReplica addReplica = new CollectionAdminRequest.AddReplica()
+        .setCollectionName(collection)
+        .setShardName("shard1")
+        .setAsyncId("46");
+    response = addReplica.process(cloudClient);
+    assertEquals("46", response.getResponse().get("requestid"));
+    state = getRequestStateAfterCompletion("46", MAX_TIMEOUT_SECONDS, cloudClient);
+    assertEquals("AddReplica did not complete", "completed", state);
+
+    //cloudClient watch might take a couple of seconds to reflect it
+    Slice shard1 = cloudClient.getZkStateReader().getClusterState().getSlice(collection, "shard1");
+    int count = 0;
+    while (shard1.getReplicas().size() != 2) {
+      if (count++ > 1000) {
+        fail("2nd Replica not reflecting in the cluster state");
+      }
+      Thread.sleep(100);
+    }
+
+    CollectionAdminRequest.CreateAlias createAlias = new CollectionAdminRequest.CreateAlias()
+        .setAliasName("myalias")
+        .setAliasedCollections(collection)
+        .setAsyncId("47");
+    response = createAlias.process(cloudClient);
+    assertEquals("47", response.getResponse().get("requestid"));
+    state = getRequestStateAfterCompletion("47", MAX_TIMEOUT_SECONDS, cloudClient);
+    assertEquals("CreateAlias did not complete", "completed", state);
+
+    query = new SolrQuery("*:*");
+    query.set("shards", "shard1");
+    assertEquals(numDocs, cloudClient.query("myalias", query).getResults().getNumFound());
+
+    CollectionAdminRequest.DeleteAlias deleteAlias = new CollectionAdminRequest.DeleteAlias()
+        .setAliasName("myalias")
+        .setAsyncId("48");
+    response = deleteAlias.process(cloudClient);
+    assertEquals("48", response.getResponse().get("requestid"));
+    state = getRequestStateAfterCompletion("48", MAX_TIMEOUT_SECONDS, cloudClient);
+    assertEquals("DeleteAlias did not complete", "completed", state);
+
+    try {
+      cloudClient.query("myalias", query);
+      fail("Alias should not exist");
+    } catch (SolrException e) {
+      //expected
+    }
+
+    String replica = shard1.getReplicas().iterator().next().getName();
+    CollectionAdminRequest.DeleteReplica deleteReplica = new CollectionAdminRequest.DeleteReplica()
+        .setCollectionName(collection)
+        .setShardName("shard1")
+        .setReplica(replica)
+        .setAsyncId("47");
+    response = deleteReplica.process(cloudClient);
+    assertEquals("47", response.getResponse().get("requestid"));
+    state = getRequestStateAfterCompletion("47", MAX_TIMEOUT_SECONDS, cloudClient);
+    assertEquals("DeleteReplica did not complete", "completed", state);
+
+    CollectionAdminRequest.Delete deleteCollection = new CollectionAdminRequest.Delete()
+        .setCollectionName(collection)
+        .setAsyncId("48");
+    response = deleteCollection.process(cloudClient);
+    assertEquals("48", response.getResponse().get("requestid"));
+    state = getRequestStateAfterCompletion("48", MAX_TIMEOUT_SECONDS, cloudClient);
+    assertEquals("DeleteCollection did not complete", "completed", state);
+  }
 }

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java?rev=1725474&r1=1725473&r2=1725474&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java Tue Jan 19 11:32:14 2016
@@ -45,6 +45,7 @@ import java.util.Properties;
 public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q>> extends SolrRequest<CollectionAdminResponse> {
 
   protected CollectionAction action = null;
+  protected String asyncId;
 
   private static String PROPERTY_PREFIX = "property.";
 
@@ -63,12 +64,24 @@ public abstract class CollectionAdminReq
 
   protected abstract Q getThis();
 
+  public Q setAsyncId(String asyncId) {
+    this.asyncId = asyncId;
+    return getThis();
+  }
+
+  public String getAsyncId() {
+    return asyncId;
+  }
+
   @Override
   public SolrParams getParams() {
     if (action == null) {
       throw new RuntimeException( "no action specified!" );
     }
     ModifiableSolrParams params = new ModifiableSolrParams();
+    if (asyncId != null) {
+      params.set(CommonAdminParams.ASYNC, asyncId);
+    }
     params.set(CoreAdminParams.ACTION, action.toString());
     return params;
   }
@@ -112,11 +125,9 @@ public abstract class CollectionAdminReq
     @Override
     public SolrParams getParams() {
       ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
-      params.set( CoreAdminParams.NAME, collection );
+      params.set(CoreAdminParams.NAME, collection);
       return params;
     }
-
-
   }
 
   protected abstract static class CollectionShardAdminRequest <T extends CollectionAdminRequest<T>> extends CollectionAdminRequest<T> {
@@ -141,16 +152,23 @@ public abstract class CollectionAdminReq
       return this.shardName;
     }
 
+    @Deprecated
     public ModifiableSolrParams getCommonParams() {
       ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
       params.set(CoreAdminParams.COLLECTION, collection);
       params.set(CoreAdminParams.SHARD, shardName);
+      if (asyncId != null) {
+        params.set(CommonAdminParams.ASYNC, asyncId);
+      }
       return params;
     }
 
     @Override
     public SolrParams getParams() {
-      return getCommonParams();
+      ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
+      params.set(CoreAdminParams.COLLECTION, collection);
+      params.set(CoreAdminParams.SHARD, shardName);
+      return params;
     }
   }
   
@@ -202,7 +220,6 @@ public abstract class CollectionAdminReq
     private Properties properties;
     protected Boolean autoAddReplicas;
     protected Integer stateFormat;
-    protected String asyncId;
     private String[] rule , snitch;
     public Create() {
       action = CollectionAction.CREATE;
@@ -218,10 +235,6 @@ public abstract class CollectionAdminReq
     public Create setAutoAddReplicas(boolean autoAddReplicas) { this.autoAddReplicas = autoAddReplicas; return this; }
     public Create setReplicationFactor(Integer repl) { this.replicationFactor = repl; return this; }
     public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; }
-    public Create setAsyncId(String asyncId) {
-      this.asyncId = asyncId;
-      return this;
-    }
     public Create setRule(String... s){ this.rule = s; return this; }
     public Create setSnitch(String... s){ this.snitch = s; return this; }
 
@@ -234,9 +247,6 @@ public abstract class CollectionAdminReq
     public Integer getReplicationFactor() { return replicationFactor; }
     public Boolean getAutoAddReplicas() { return autoAddReplicas; }
     public Integer getStateFormat() { return stateFormat; }
-    public String getAsyncId() {
-      return asyncId;
-    }
 
     public Properties getProperties() {
       return properties;
@@ -267,7 +277,6 @@ public abstract class CollectionAdminReq
       if (replicationFactor != null) {
         params.set( "replicationFactor", replicationFactor);
       }
-      params.set(CommonAdminParams.ASYNC, asyncId);
       if (autoAddReplicas != null) {
         params.set(ZkStateReader.AUTO_ADD_REPLICAS, autoAddReplicas);
       }
@@ -295,6 +304,12 @@ public abstract class CollectionAdminReq
     }
 
     @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
+      return params;
+    }
+
+    @Override
     protected Reload getThis() {
       return this;
     }
@@ -302,13 +317,18 @@ public abstract class CollectionAdminReq
 
   // DELETE request
   public static class Delete extends CollectionSpecificAdminRequest<Delete> {
-    protected String collection = null;
 
     public Delete() {
       action = CollectionAction.DELETE;
     }
 
     @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
+      return params;
+    }
+
+    @Override
     protected Delete getThis() {
       return this;
     }
@@ -317,7 +337,7 @@ public abstract class CollectionAdminReq
   // CREATESHARD request
   public static class CreateShard extends CollectionShardAdminRequest<CreateShard> {
     protected String nodeSet;
-    private Properties properties;
+    protected Properties properties;
 
     public CreateShard setNodeSet(String nodeSet) {
       this.nodeSet = nodeSet;
@@ -343,7 +363,7 @@ public abstract class CollectionAdminReq
 
     @Override
     public SolrParams getParams() {
-      ModifiableSolrParams params = getCommonParams();
+      ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
       if (nodeSet != null) {
         params.set("createNodeSet", nodeSet);
       }
@@ -363,7 +383,6 @@ public abstract class CollectionAdminReq
   public static class SplitShard extends CollectionShardAdminRequest<SplitShard> {
     protected String ranges;
     protected String splitKey;
-    protected String asyncId;
     
     private Properties properties;
 
@@ -391,19 +410,10 @@ public abstract class CollectionAdminReq
       this.properties = properties;
       return this;
     }
-
-    public SplitShard setAsyncId(String asyncId) {
-      this.asyncId = asyncId;
-      return this;
-    }
-
-    public String getAsyncId() {
-      return asyncId;
-    }
     
     @Override
     public SolrParams getParams() {
-      ModifiableSolrParams params = getCommonParams();
+      ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
       params.set( "ranges", ranges);
 
       if(splitKey != null)
@@ -412,8 +422,6 @@ public abstract class CollectionAdminReq
       if(properties != null) {
         addProperties(params, properties);
       }
-      
-      params.set(CommonAdminParams.ASYNC, asyncId);
       return params;
     }
 
@@ -437,7 +445,6 @@ public abstract class CollectionAdminReq
   
   // FORCELEADER request
   public static class ForceLeader extends CollectionShardAdminRequest<ForceLeader> {
-    protected String asyncId;
 
     public ForceLeader() {
       action = CollectionAction.FORCELEADER;
@@ -447,15 +454,6 @@ public abstract class CollectionAdminReq
     protected ForceLeader getThis() {
       return this;
     }
-
-    @Override
-    public SolrParams getParams() {
-      ModifiableSolrParams params = getCommonParams();
-      if (asyncId != null) {
-        params.set(CommonAdminParams.ASYNC, asyncId);
-      }
-      return params;
-    }
   }
 
   // REQUESTSTATUS request
@@ -566,12 +564,11 @@ public abstract class CollectionAdminReq
 
   // ADDREPLICA request
   public static class AddReplica extends CollectionShardAdminRequest<AddReplica> {
-    private String node;
-    private String routeKey;
-    private String instanceDir;
-    private String dataDir;
-    private Properties properties;
-    private String asyncId;
+    protected String node;
+    protected String routeKey;
+    protected String instanceDir;
+    protected String dataDir;
+    protected Properties properties;
 
     public AddReplica() {
       action = CollectionAction.ADDREPLICA;
@@ -632,9 +629,6 @@ public abstract class CollectionAdminReq
         }
         params.add(ShardParams._ROUTE_, routeKey);
       }
-      if (asyncId != null) {
-        params.set(CommonAdminParams.ASYNC, asyncId);
-      }
       if (node != null) {
         params.add("node", node);
       }
@@ -650,15 +644,6 @@ public abstract class CollectionAdminReq
       return params;
     }
 
-    public AddReplica setAsyncId(String asyncId) {
-      this.asyncId = asyncId;
-      return this;
-    }
-    
-    public String getAsyncId() {
-      return asyncId;
-    }
-
     @Override
     protected AddReplica getThis() {
       return this;
@@ -667,8 +652,8 @@ public abstract class CollectionAdminReq
 
   // DELETEREPLICA request
   public static class DeleteReplica extends CollectionShardAdminRequest<DeleteReplica> {
-    private String replica;
-    private Boolean onlyIfDown;
+    protected String replica;
+    protected Boolean onlyIfDown;
     
     public DeleteReplica() {
       action = CollectionAction.DELETEREPLICA;
@@ -697,8 +682,8 @@ public abstract class CollectionAdminReq
       ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
       params.set(ZkStateReader.REPLICA_PROP, this.replica);
       
-      if(onlyIfDown != null) {
-        params.set("onlyIfDown", this.onlyIfDown);
+      if (onlyIfDown != null) {
+        params.set("onlyIfDown", onlyIfDown);
       }
       return params;
     }
@@ -758,7 +743,6 @@ public abstract class CollectionAdminReq
     private String splitKey;
     private Integer forwardTimeout;
     private Properties properties;
-    private String asyncId;
     
     public Migrate() {
       action = CollectionAction.MIGRATE;
@@ -818,8 +802,6 @@ public abstract class CollectionAdminReq
       if (forwardTimeout != null) {
         params.set("forward.timeout", forwardTimeout);
       }
-      params.set(CommonAdminParams.ASYNC, asyncId);
-      
       if (properties != null) {
         addProperties(params, properties);
       }
@@ -827,15 +809,6 @@ public abstract class CollectionAdminReq
       return params;
     }
 
-    public Migrate setAsyncId(String asyncId) {
-      this.asyncId = asyncId;
-      return this;
-    }
-    
-    public String getAsyncId() {
-      return asyncId;
-    }
-
     @Override
     protected Migrate getThis() {
       return this;



Mime
View raw message