Return-Path: X-Original-To: apmail-lucene-commits-archive@www.apache.org Delivered-To: apmail-lucene-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0B15B18B83 for ; Tue, 19 Jan 2016 11:32:27 +0000 (UTC) Received: (qmail 42984 invoked by uid 500); 19 Jan 2016 11:32:26 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 42975 invoked by uid 99); 19 Jan 2016 11:32:26 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Jan 2016 11:32:26 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 4E3FCC0ECE for ; Tue, 19 Jan 2016 11:32:26 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.246 X-Spam-Level: * X-Spam-Status: No, score=1.246 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.554] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 9ecjbDNAPIoc for ; Tue, 19 Jan 2016 11:32:17 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with ESMTP id 561D1203BE for ; Tue, 19 Jan 2016 11:32:16 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 58CA8E0185 for ; Tue, 19 Jan 2016 11:32:15 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 55A843A0249 for ; Tue, 19 Jan 2016 11:32:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1725474 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/client/solrj/request/ Date: Tue, 19 Jan 2016 11:32:15 -0000 To: commits@lucene.apache.org From: varun@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20160119113215.55A843A0249@svn01-us-west.apache.org> Author: varun Date: Tue Jan 19 11:32:14 2016 New Revision: 1725474 URL: http://svn.apache.org/viewvc?rev=1725474&view=rev Log: SOLR-8534: Add generic support for collection APIs to be async Modified: lucene/dev/trunk/solr/CHANGES.txt lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java Modified: lucene/dev/trunk/solr/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1725474&r1=1725473&r2=1725474&view=diff ============================================================================== --- lucene/dev/trunk/solr/CHANGES.txt (original) +++ lucene/dev/trunk/solr/CHANGES.txt Tue Jan 19 11:32:14 2016 @@ -318,6 +318,10 @@ New Features * SOLR-8312: Add domain size and numBuckets to facet telemetry info (facet debug info for the new Facet Module). (Michael Sun, yonik) +* SOLR-8534: Add generic support for collection APIs to be async. Thus more actions benefit from having async + support. The commands that additionally get async support are: delete/reload collection, create/delete alias, + create/delete shard, delete replica, add/delete replica property, add/remove role, + overseer status, balance shard unique, rebalance leaders, modify collection, migrate state format (Varun Thacker) Bug Fixes ---------------------- Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1725474&r1=1725473&r2=1725474&view=diff ============================================================================== --- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original) +++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Tue Jan 19 11:32:14 2016 @@ -224,9 +224,7 @@ public class OverseerCollectionMessageHa deleteCollection(message, results); break; case RELOAD: - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString()); - collectionCmd(zkStateReader.getClusterState(), message, params, results, Replica.State.ACTIVE); + reloadCollection(message, results); break; case CREATEALIAS: createAlias(zkStateReader.getAliases(), message); @@ -303,6 +301,18 @@ public class OverseerCollectionMessageHa return new OverseerSolrResponse(results); } + private void reloadCollection(ZkNodeProps message, NamedList results) { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString()); + + String asyncId = message.getStr(ASYNC); + Map requestMap = null; + if (asyncId != null) { + requestMap = new HashMap<>(); + } + collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId, requestMap); + } + @SuppressWarnings("unchecked") private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException { checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP, @@ -670,7 +680,6 @@ public class OverseerCollectionMessageHa DocCollection coll = clusterState.getCollection(collectionName); Slice slice = coll.getSlice(shard); - ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); if (slice == null) { throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid shard name : " + shard + " in collection : " + collectionName); @@ -691,36 +700,34 @@ public class OverseerCollectionMessageHa "Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName + " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'"); } - - String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP); + + ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); - - // assume the core exists and try to unload it - Map m = makeMap("qt", adminPath, CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE, - core, CoreAdminParams.DELETE_INSTANCE_DIR, "true", CoreAdminParams.DELETE_DATA_DIR, "true"); - - ShardRequest sreq = new ShardRequest(); - sreq.purpose = 1; - sreq.shards = new String[] {baseUrl}; - sreq.actualShards = sreq.shards; - sreq.params = new ModifiableSolrParams(new MapSolrParams(m)); - try { - shardHandler.submit(sreq, baseUrl, sreq.params); - } catch (Exception e) { - log.warn("Exception trying to unload core " + sreq, e); + String asyncId = message.getStr(ASYNC); + Map requestMap = null; + if (asyncId != null) { + requestMap = new HashMap<>(1, 1.0f); } - - collectShardResponses(replica.getState() != Replica.State.ACTIVE ? new NamedList() : results, false, null, - shardHandler); - - if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return;// check if the core unload removed the - // corenode zk enry - deleteCoreNode(collectionName, replicaName, replica, core); // try and ensure core info is removed from clusterstate + + ModifiableSolrParams params = new ModifiableSolrParams(); + params.add(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString()); + params.add(CoreAdminParams.CORE, core); + params.add(CoreAdminParams.DELETE_INSTANCE_DIR, "true"); + params.add(CoreAdminParams.DELETE_DATA_DIR, "true"); + + sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap); + + processResponses(results, shardHandler, false, null, asyncId, requestMap); + + //check if the core unload removed the corenode zk entry + if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return; + + // try and ensure core info is removed from cluster state + deleteCoreNode(collectionName, replicaName, replica, core); if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return; throw new SolrException(ErrorCode.SERVER_ERROR, "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName); - } private boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException { @@ -760,21 +767,23 @@ public class OverseerCollectionMessageHa } - private void deleteCollection(ZkNodeProps message, NamedList results) - throws KeeperException, InterruptedException { + private void deleteCollection(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException { final String collection = message.getStr(NAME); try { ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString()); params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true); params.set(CoreAdminParams.DELETE_DATA_DIR, true); - collectionCmd(zkStateReader.getClusterState(), message, params, results, - null); - ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, - DELETE.toLower(), NAME, collection); - Overseer.getInQueue(zkStateReader.getZkClient()).offer( - Utils.toJSON(m)); + String asyncId = message.getStr(ASYNC); + Map requestMap = null; + if (asyncId != null) { + requestMap = new HashMap<>(); + } + collectionCmd(message, params, results, null, asyncId, requestMap); + + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection); + Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m)); // wait for a while until we don't see the collection TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS); @@ -877,7 +886,7 @@ public class OverseerCollectionMessageHa TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS); boolean success = false; - Aliases aliases = null; + Aliases aliases; while (! timeout.hasTimedOut()) { aliases = zkStateReader.getAliases(); String collections = aliases.getCollectionAlias(name); @@ -942,7 +951,7 @@ public class OverseerCollectionMessageHa throws KeeperException, InterruptedException { String collectionName = message.getStr(COLLECTION_PROP); String sliceName = message.getStr(SHARD_ID_PROP); - + log.info("Create shard invoked: {}", message); if (collectionName == null || sliceName == null) throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters"); @@ -968,6 +977,13 @@ public class OverseerCollectionMessageHa throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME)); String configName = message.getStr(COLL_CONF); + + String async = message.getStr(ASYNC); + Map requestMap = null; + if (async != null) { + requestMap = new HashMap<>(repFactor, 1.0f); + } + for (int j = 1; j <= repFactor; j++) { String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName; String shardName = collectionName + "_" + sliceName + "_replica" + j; @@ -977,27 +993,17 @@ public class OverseerCollectionMessageHa // Need to create new params for each request ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString()); - params.set(CoreAdminParams.NAME, shardName); params.set(COLL_CONF, configName); params.set(CoreAdminParams.COLLECTION, collectionName); params.set(CoreAdminParams.SHARD, sliceName); params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); addPropertyParams(message, params); - - ShardRequest sreq = new ShardRequest(); - params.set("qt", adminPath); - sreq.purpose = 1; - String replica = zkStateReader.getBaseUrlForNodeName(nodeName); - sreq.shards = new String[] {replica}; - sreq.actualShards = sreq.shards; - sreq.params = params; - - shardHandler.submit(sreq, replica, sreq.params); - + + sendShardRequest(nodeName, params, shardHandler, async, requestMap); } - processResponses(results, shardHandler); + processResponses(results, shardHandler, true, "Failed to create shard", async, requestMap); log.info("Finished create command on all shards for collection: " + collectionName); @@ -1016,7 +1022,7 @@ public class OverseerCollectionMessageHa DocCollection collection = clusterState.getCollection(collectionName); DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT; - Slice parentSlice = null; + Slice parentSlice; if (slice == null) { if (router instanceof CompositeIdRouter) { @@ -1157,13 +1163,8 @@ public class OverseerCollectionMessageHa } } - // do not abort splitshard if the unloading fails - // this can happen because the replicas created previously may be down - // the only side effect of this is that the sub shard may end up having more replicas than we want - collectShardResponses(results, false, null, shardHandler); - final String asyncId = message.getStr(ASYNC); - HashMap requestMap = new HashMap<>(); + Map requestMap = new HashMap<>(); for (int i = 0; i < subRanges.size(); i++) { String subSlice = subSlices.get(i); @@ -1208,10 +1209,8 @@ public class OverseerCollectionMessageHa } addReplica(clusterState, new ZkNodeProps(propMap), results); } - - collectShardResponses(results, true, "SPLITSHARD failed to create subshard leaders", shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); + + processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap); for (String subShardName : subShardNames) { // wait for parent leader to acknowledge the sub-shard core @@ -1228,11 +1227,9 @@ public class OverseerCollectionMessageHa ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams()); sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap); } - - collectShardResponses(results, true, "SPLITSHARD timed out waiting for subshard leaders to come up", - shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); + + processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up", + asyncId, requestMap); log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice + " on: " + parentShardLeader); @@ -1250,9 +1247,9 @@ public class OverseerCollectionMessageHa params.set(CoreAdminParams.RANGES, rangesStr); sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap); - - collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command", shardHandler); - completeAsyncRequest(asyncId, requestMap, results); + + processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId, + requestMap); log.info("Index on shard: " + nodeName + " split into two successfully"); @@ -1268,11 +1265,9 @@ public class OverseerCollectionMessageHa sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap); } - - collectShardResponses(results, true, "SPLITSHARD failed while asking sub shard leaders to apply buffered updates", - shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); + + processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" + + " to apply buffered updates", asyncId, requestMap); log.info("Successfully applied buffered updates on : " + subShardNames); @@ -1380,11 +1375,8 @@ public class OverseerCollectionMessageHa for (Map replica : replicas) { addReplica(clusterState, new ZkNodeProps(replica), results); } - - collectShardResponses(results, true, - "SPLITSHARD failed to create subshard replicas", shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); + + processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap); log.info("Successfully created all replica shards for all sub-slices " + subSlices); @@ -1481,26 +1473,6 @@ public class OverseerCollectionMessageHa ); } - private void collectShardResponses(NamedList results, boolean abortOnError, - String msgOnError, - ShardHandler shardHandler) { - ShardResponse srsp; - do { - srsp = shardHandler.takeCompletedOrError(); - if (srsp != null) { - processResponse(results, srsp); - Throwable exception = srsp.getException(); - if (abortOnError && exception != null) { - // drain pending requests - while (srsp != null) { - srsp = shardHandler.takeCompletedOrError(); - } - throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception); - } - } - } while (srsp != null); - } - private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) { String collection = message.getStr(ZkStateReader.COLLECTION_PROP); String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP); @@ -1525,15 +1497,21 @@ public class OverseerCollectionMessageHa + ". Only non-active (or custom-hashed) slices can be deleted."); } ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); + + String asyncId = message.getStr(ASYNC); + Map requestMap = null; + if (asyncId != null) { + requestMap = new HashMap<>(slice.getReplicas().size(), 1.0f); + } try { ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString()); params.set(CoreAdminParams.DELETE_INDEX, "true"); - sliceCmd(clusterState, params, null, slice, shardHandler); - - processResponses(results, shardHandler); - + sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap); + + processResponses(results, shardHandler, true, "Failed to delete shard", asyncId, requestMap); + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.SHARD_ID_PROP, sliceId); Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m)); @@ -1646,21 +1624,17 @@ public class OverseerCollectionMessageHa Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000); // For tracking async calls. - HashMap requestMap = new HashMap(); + Map requestMap = new HashMap<>(); log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: " + targetLeader.getStr("core") + " to buffer updates"); ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString()); params.set(CoreAdminParams.NAME, targetLeader.getStr("core")); - String nodeName = targetLeader.getNodeName(); sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap); - collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates", - shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); + processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap); ZkNodeProps m = new ZkNodeProps( Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(), @@ -1671,8 +1645,7 @@ public class OverseerCollectionMessageHa "targetCollection", targetCollection.getName(), "expireAt", RoutingRule.makeExpiryAt(timeout)); log.info("Adding routing rule: " + m); - Overseer.getInQueue(zkStateReader.getZkClient()).offer( - Utils.toJSON(m)); + Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m)); // wait for a while until we see the new rule log.info("Waiting to see routing rule updated in clusterstate"); @@ -1707,7 +1680,7 @@ public class OverseerCollectionMessageHa NUM_SLICES, 1, COLL_CONF, configName, CREATE_NODE_SET, sourceLeader.getNodeName()); - if(asyncId != null) { + if (asyncId != null) { String internalAsyncId = asyncId + Math.abs(System.nanoTime()); props.put(ASYNC, internalAsyncId); } @@ -1734,9 +1707,8 @@ public class OverseerCollectionMessageHa // we don't want this to happen asynchronously sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null); - collectShardResponses(results, true, - "MIGRATE failed to create temp collection leader or timed out waiting for it to come up", - shardHandler); + processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" + + " or timed out waiting for it to come up", asyncId, requestMap); log.info("Asking source leader to split index"); params = new ModifiableSolrParams(); @@ -1749,8 +1721,7 @@ public class OverseerCollectionMessageHa String tempNodeName = sourceLeader.getNodeName(); sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap); - collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command", shardHandler); - completeAsyncRequest(asyncId, requestMap, results); + processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap); log.info("Creating a replica of temporary collection: {} on the target leader node: {}", tempSourceCollectionName, targetLeader.getNodeName()); @@ -1773,11 +1744,8 @@ public class OverseerCollectionMessageHa } addReplica(clusterState, new ZkNodeProps(props), results); - collectShardResponses(results, true, - "MIGRATE failed to create replica of temporary collection in target leader node.", - shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); + processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " + + "temporary collection in target leader node.", asyncId, requestMap); coreNodeName = waitForCoreNodeName(tempSourceCollectionName, targetLeader.getNodeName(), tempCollectionReplica2); @@ -1794,11 +1762,9 @@ public class OverseerCollectionMessageHa sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap); - collectShardResponses(results, true, - "MIGRATE failed to create temp collection replica or timed out waiting for them to come up", - shardHandler); + processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" + + " replica or timed out waiting for them to come up", asyncId, requestMap); - completeAsyncRequest(asyncId, requestMap, results); log.info("Successfully created replica of temp source collection on target leader node"); log.info("Requesting merge of temp source collection replica to target leader"); @@ -1808,12 +1774,9 @@ public class OverseerCollectionMessageHa params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2); sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap); - collectShardResponses(results, true, - "MIGRATE failed to merge " + tempCollectionReplica2 + - " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName(), - shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); + String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to " + + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName(); + processResponses(results, shardHandler, true, msg, asyncId, requestMap); log.info("Asking target leader to apply buffered updates"); params = new ModifiableSolrParams(); @@ -1821,11 +1784,8 @@ public class OverseerCollectionMessageHa params.set(CoreAdminParams.NAME, targetLeader.getStr("core")); sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap); - collectShardResponses(results, true, - "MIGRATE failed to request node to apply buffered updates", - shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); + processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates", + asyncId, requestMap); try { log.info("Deleting temporary collection: " + tempSourceCollectionName); @@ -1839,13 +1799,6 @@ public class OverseerCollectionMessageHa } } - private void completeAsyncRequest(String asyncId, HashMap requestMap, NamedList results) { - if(asyncId != null) { - waitForAsyncCallsToComplete(requestMap, results); - requestMap.clear(); - } - } - private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) { if (a == null || b == null || !a.overlaps(b)) { return null; @@ -1867,11 +1820,9 @@ public class OverseerCollectionMessageHa } - public static void sendShardRequest(String nodeName, ModifiableSolrParams params, - ShardHandler shardHandler, String asyncId, - Map requestMap, - String adminPath, ZkStateReader zkStateReader - ) { + public static void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler, + String asyncId, Map requestMap, String adminPath, + ZkStateReader zkStateReader) { if (asyncId != null) { String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime()); params.set(ASYNC, coreAdminAsyncId); @@ -2029,7 +1980,7 @@ public class OverseerCollectionMessageHa } // For tracking async calls. - HashMap requestMap = new HashMap(); + Map requestMap = new HashMap<>(); log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}", @@ -2098,11 +2049,9 @@ public class OverseerCollectionMessageHa } } - processResponses(results, shardHandler); - - completeAsyncRequest(async, requestMap, results); + processResponses(results, shardHandler, false, null, async, requestMap); - log.info("Finished create command on all shards for collection: " + log.debug("Finished create command on all shards for collection: " + collectionName); } catch (SolrException ex) { @@ -2263,22 +2212,37 @@ public class OverseerCollectionMessageHa addPropertyParams(message, params); // For tracking async calls. - HashMap requestMap = new HashMap<>(); + Map requestMap = new HashMap<>(); sendShardRequest(node, params, shardHandler, asyncId, requestMap); - - collectShardResponses(results, true, "ADDREPLICA failed to create replica", shardHandler); - - completeAsyncRequest(asyncId, requestMap, results); + + processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap); } - private void processResponses(NamedList results, ShardHandler shardHandler) { + + private void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError, + String asyncId, Map requestMap) { + //Processes all shard responses ShardResponse srsp; do { srsp = shardHandler.takeCompletedOrError(); if (srsp != null) { processResponse(results, srsp); + Throwable exception = srsp.getException(); + if (abortOnError && exception != null) { + // drain pending requests + while (srsp != null) { + srsp = shardHandler.takeCompletedOrError(); + } + throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception); + } } } while (srsp != null); + + //If request is async wait for the core admin to complete before returning + if (asyncId != null) { + waitForAsyncCallsToComplete(requestMap, results); + requestMap.clear(); + } } private String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException { @@ -2332,47 +2296,36 @@ public class OverseerCollectionMessageHa } - private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, Replica.State stateMatcher) { + private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params, + NamedList results, Replica.State stateMatcher, String asyncId, Map requestMap) { log.info("Executing Collection Cmd : " + params); String collectionName = message.getStr(NAME); ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); - + + ClusterState clusterState = zkStateReader.getClusterState(); DocCollection coll = clusterState.getCollection(collectionName); - for (Map.Entry entry : coll.getSlicesMap().entrySet()) { - Slice slice = entry.getValue(); - sliceCmd(clusterState, params, stateMatcher, slice, shardHandler); + for (Slice slice : coll.getSlices()) { + sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap); } - processResponses(results, shardHandler); + processResponses(results, shardHandler, false, null, asyncId, requestMap); } private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher, - Slice slice, ShardHandler shardHandler) { - Map shards = slice.getReplicasMap(); - Set> shardEntries = shards.entrySet(); - for (Map.Entry shardEntry : shardEntries) { - final ZkNodeProps node = shardEntry.getValue(); - if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP)) - && (stateMatcher == null || Replica.State.getState(node.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) { + Slice slice, ShardHandler shardHandler, String asyncId, Map requestMap) { + + for (Replica replica : slice.getReplicas()) { + if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP)) + && (stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) { + // For thread safety, only simple clone the ModifiableSolrParams ModifiableSolrParams cloneParams = new ModifiableSolrParams(); cloneParams.add(params); - cloneParams.set(CoreAdminParams.CORE, node.getStr(ZkStateReader.CORE_NAME_PROP)); + cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP)); - String replica = node.getStr(ZkStateReader.BASE_URL_PROP); - ShardRequest sreq = new ShardRequest(); - sreq.nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP); - // yes, they must use same admin handler path everywhere... - cloneParams.set("qt", adminPath); - sreq.purpose = 1; - sreq.shards = new String[] {replica}; - sreq.actualShards = sreq.shards; - sreq.params = cloneParams; - log.info("Collection Admin sending CoreAdmin cmd to " + replica - + " params:" + sreq.params); - shardHandler.submit(sreq, replica, sreq.params); + sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap); } } } @@ -2413,7 +2366,7 @@ public class OverseerCollectionMessageHa @SuppressWarnings("unchecked") private void waitForAsyncCallsToComplete(Map requestMap, NamedList results) { - for(String k:requestMap.keySet()) { + for (String k:requestMap.keySet()) { log.debug("I am Waiting for :{}/{}", k, requestMap.get(k)); results.add(requestMap.get(k), waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k))); } Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1725474&r1=1725473&r2=1725474&view=diff ============================================================================== --- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original) +++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Tue Jan 19 11:32:14 2016 @@ -72,7 +72,6 @@ import org.apache.solr.core.CoreContaine import org.apache.solr.handler.BlobHandler; import org.apache.solr.handler.RequestHandlerBase; import org.apache.solr.handler.component.ShardHandler; -import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; import org.apache.zookeeper.CreateMode; @@ -169,23 +168,26 @@ public class CollectionsHandler extends String a = params.get(CoreAdminParams.ACTION); if (a != null) { CollectionAction action = CollectionAction.get(a); - if (action == null) + if (action == null) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a); + } CollectionOperation operation = CollectionOperation.get(action); log.info("Invoked Collection Action :{} with params {} ", action.toLower(), req.getParamString()); - Map result = operation.call(req, rsp, this); - if (result != null) { - result.put(QUEUE_OPERATION, operation.action.toLower()); - ZkNodeProps props = new ZkNodeProps(result); - if (operation.sendToOCPQueue) handleResponse(operation.action.toLower(), props, rsp, operation.timeOut); - else Overseer.getInQueue(coreContainer.getZkController().getZkClient()).offer(Utils.toJSON(props)); + Map props = operation.call(req, rsp, this); + String asyncId = req.getParams().get(ASYNC); + if (props != null) { + if (asyncId != null) { + props.put(ASYNC, asyncId); + } + props.put(QUEUE_OPERATION, operation.action.toLower()); + ZkNodeProps zkProps = new ZkNodeProps(props); + if (operation.sendToOCPQueue) handleResponse(operation.action.toLower(), zkProps, rsp, operation.timeOut); + else Overseer.getInQueue(coreContainer.getZkController().getZkClient()).offer(Utils.toJSON(props)); } } else { throw new SolrException(ErrorCode.BAD_REQUEST, "action is a required param"); - } - rsp.setHttpCaching(false); } @@ -315,7 +317,6 @@ public class CollectionsHandler extends MAX_SHARDS_PER_NODE, CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE, SHARDS_PROP, - ASYNC, STATE_FORMAT, AUTO_ADD_REPLICAS, RULE, @@ -366,7 +367,6 @@ public class CollectionsHandler extends Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception { return req.getParams().required().getAll(null, NAME); - } }, SYNCSHARD_OP(SYNCSHARD) { @@ -381,7 +381,6 @@ public class CollectionsHandler extends ZkNodeProps leaderProps = clusterState.getLeader(collection, shard); ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps); - ; try (HttpSolrClient client = new HttpSolrClient(nodeProps.getBaseUrl())) { client.setConnectionTimeout(15000); client.setSoTimeout(60000); @@ -436,8 +435,7 @@ public class CollectionsHandler extends COLLECTION_PROP, SHARD_ID_PROP, "split.key", - CoreAdminParams.RANGES, - ASYNC); + CoreAdminParams.RANGES); return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX); } }, @@ -453,7 +451,6 @@ public class CollectionsHandler extends @Override Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception { forceLeaderElection(req, handler); - return null; } }, @@ -468,7 +465,7 @@ public class CollectionsHandler extends throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections"); req.getParams().getAll(map, REPLICATION_FACTOR, - CREATE_NODE_SET, ASYNC); + CREATE_NODE_SET); return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX); } }, @@ -479,14 +476,14 @@ public class CollectionsHandler extends COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP); - return req.getParams().getAll(map, ASYNC, ONLY_IF_DOWN); + return req.getParams().getAll(map, ONLY_IF_DOWN); } }, MIGRATE_OP(MIGRATE) { @Override Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { Map map = req.getParams().required().getAll(null, COLLECTION_PROP, "split.key", "target.collection"); - return req.getParams().getAll(map, "forward.timeout", ASYNC); + return req.getParams().getAll(map, "forward.timeout"); } }, ADDROLE_OP(ADDROLE) { @@ -586,8 +583,7 @@ public class CollectionsHandler extends _ROUTE_, CoreAdminParams.NAME, INSTANCE_DIR, - DATA_DIR, - ASYNC); + DATA_DIR); return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX); } }, @@ -687,8 +683,7 @@ public class CollectionsHandler extends prop = COLL_PROP_PREFIX + prop; } - if (!shardUnique && - !SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) { + if (!shardUnique && !SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) { throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that" + " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " + " Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique)); Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java?rev=1725474&r1=1725473&r2=1725474&view=diff ============================================================================== --- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java (original) +++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java Tue Jan 19 11:32:14 2016 @@ -17,12 +17,22 @@ package org.apache.solr.cloud; * limitations under the License. */ +import java.util.ArrayList; +import java.util.List; + import org.apache.lucene.util.LuceneTestCase.Slow; +import org.apache.lucene.util.TestUtil; import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create; import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.params.ModifiableSolrParams; import org.junit.Test; /** @@ -83,4 +93,138 @@ public class CollectionsAPIAsyncDistribu assertEquals("Shard split did not complete. Last recorded state: " + state, "completed", state); } } + + @Test + public void testAsyncRequests() throws Exception { + String collection = "testAsyncOperations"; + + Create createCollectionRequest = new Create() + .setCollectionName(collection) + .setNumShards(1) + .setRouterName("implicit") + .setShards("shard1") + .setConfigName("conf1") + .setAsyncId("42"); + CollectionAdminResponse response = createCollectionRequest.process(cloudClient); + assertEquals("42", response.getResponse().get("requestid")); + String state = getRequestStateAfterCompletion("42", MAX_TIMEOUT_SECONDS, cloudClient); + assertEquals("CreateCollection task did not complete!", "completed", state); + + //Add a few documents to shard1 + int numDocs = TestUtil.nextInt(random(), 10, 100); + List docs = new ArrayList<>(numDocs); + for (int i=0; i 1000) { + fail("2nd Replica not reflecting in the cluster state"); + } + Thread.sleep(100); + } + + CollectionAdminRequest.CreateAlias createAlias = new CollectionAdminRequest.CreateAlias() + .setAliasName("myalias") + .setAliasedCollections(collection) + .setAsyncId("47"); + response = createAlias.process(cloudClient); + assertEquals("47", response.getResponse().get("requestid")); + state = getRequestStateAfterCompletion("47", MAX_TIMEOUT_SECONDS, cloudClient); + assertEquals("CreateAlias did not complete", "completed", state); + + query = new SolrQuery("*:*"); + query.set("shards", "shard1"); + assertEquals(numDocs, cloudClient.query("myalias", query).getResults().getNumFound()); + + CollectionAdminRequest.DeleteAlias deleteAlias = new CollectionAdminRequest.DeleteAlias() + .setAliasName("myalias") + .setAsyncId("48"); + response = deleteAlias.process(cloudClient); + assertEquals("48", response.getResponse().get("requestid")); + state = getRequestStateAfterCompletion("48", MAX_TIMEOUT_SECONDS, cloudClient); + assertEquals("DeleteAlias did not complete", "completed", state); + + try { + cloudClient.query("myalias", query); + fail("Alias should not exist"); + } catch (SolrException e) { + //expected + } + + String replica = shard1.getReplicas().iterator().next().getName(); + CollectionAdminRequest.DeleteReplica deleteReplica = new CollectionAdminRequest.DeleteReplica() + .setCollectionName(collection) + .setShardName("shard1") + .setReplica(replica) + .setAsyncId("47"); + response = deleteReplica.process(cloudClient); + assertEquals("47", response.getResponse().get("requestid")); + state = getRequestStateAfterCompletion("47", MAX_TIMEOUT_SECONDS, cloudClient); + assertEquals("DeleteReplica did not complete", "completed", state); + + CollectionAdminRequest.Delete deleteCollection = new CollectionAdminRequest.Delete() + .setCollectionName(collection) + .setAsyncId("48"); + response = deleteCollection.process(cloudClient); + assertEquals("48", response.getResponse().get("requestid")); + state = getRequestStateAfterCompletion("48", MAX_TIMEOUT_SECONDS, cloudClient); + assertEquals("DeleteCollection did not complete", "completed", state); + } } Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java?rev=1725474&r1=1725473&r2=1725474&view=diff ============================================================================== --- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java (original) +++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java Tue Jan 19 11:32:14 2016 @@ -45,6 +45,7 @@ import java.util.Properties; public abstract class CollectionAdminRequest > extends SolrRequest { protected CollectionAction action = null; + protected String asyncId; private static String PROPERTY_PREFIX = "property."; @@ -63,12 +64,24 @@ public abstract class CollectionAdminReq protected abstract Q getThis(); + public Q setAsyncId(String asyncId) { + this.asyncId = asyncId; + return getThis(); + } + + public String getAsyncId() { + return asyncId; + } + @Override public SolrParams getParams() { if (action == null) { throw new RuntimeException( "no action specified!" ); } ModifiableSolrParams params = new ModifiableSolrParams(); + if (asyncId != null) { + params.set(CommonAdminParams.ASYNC, asyncId); + } params.set(CoreAdminParams.ACTION, action.toString()); return params; } @@ -112,11 +125,9 @@ public abstract class CollectionAdminReq @Override public SolrParams getParams() { ModifiableSolrParams params = new ModifiableSolrParams(super.getParams()); - params.set( CoreAdminParams.NAME, collection ); + params.set(CoreAdminParams.NAME, collection); return params; } - - } protected abstract static class CollectionShardAdminRequest > extends CollectionAdminRequest { @@ -141,16 +152,23 @@ public abstract class CollectionAdminReq return this.shardName; } + @Deprecated public ModifiableSolrParams getCommonParams() { ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); params.set(CoreAdminParams.COLLECTION, collection); params.set(CoreAdminParams.SHARD, shardName); + if (asyncId != null) { + params.set(CommonAdminParams.ASYNC, asyncId); + } return params; } @Override public SolrParams getParams() { - return getCommonParams(); + ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); + params.set(CoreAdminParams.COLLECTION, collection); + params.set(CoreAdminParams.SHARD, shardName); + return params; } } @@ -202,7 +220,6 @@ public abstract class CollectionAdminReq private Properties properties; protected Boolean autoAddReplicas; protected Integer stateFormat; - protected String asyncId; private String[] rule , snitch; public Create() { action = CollectionAction.CREATE; @@ -218,10 +235,6 @@ public abstract class CollectionAdminReq public Create setAutoAddReplicas(boolean autoAddReplicas) { this.autoAddReplicas = autoAddReplicas; return this; } public Create setReplicationFactor(Integer repl) { this.replicationFactor = repl; return this; } public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; } - public Create setAsyncId(String asyncId) { - this.asyncId = asyncId; - return this; - } public Create setRule(String... s){ this.rule = s; return this; } public Create setSnitch(String... s){ this.snitch = s; return this; } @@ -234,9 +247,6 @@ public abstract class CollectionAdminReq public Integer getReplicationFactor() { return replicationFactor; } public Boolean getAutoAddReplicas() { return autoAddReplicas; } public Integer getStateFormat() { return stateFormat; } - public String getAsyncId() { - return asyncId; - } public Properties getProperties() { return properties; @@ -267,7 +277,6 @@ public abstract class CollectionAdminReq if (replicationFactor != null) { params.set( "replicationFactor", replicationFactor); } - params.set(CommonAdminParams.ASYNC, asyncId); if (autoAddReplicas != null) { params.set(ZkStateReader.AUTO_ADD_REPLICAS, autoAddReplicas); } @@ -295,6 +304,12 @@ public abstract class CollectionAdminReq } @Override + public SolrParams getParams() { + ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); + return params; + } + + @Override protected Reload getThis() { return this; } @@ -302,13 +317,18 @@ public abstract class CollectionAdminReq // DELETE request public static class Delete extends CollectionSpecificAdminRequest { - protected String collection = null; public Delete() { action = CollectionAction.DELETE; } @Override + public SolrParams getParams() { + ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); + return params; + } + + @Override protected Delete getThis() { return this; } @@ -317,7 +337,7 @@ public abstract class CollectionAdminReq // CREATESHARD request public static class CreateShard extends CollectionShardAdminRequest { protected String nodeSet; - private Properties properties; + protected Properties properties; public CreateShard setNodeSet(String nodeSet) { this.nodeSet = nodeSet; @@ -343,7 +363,7 @@ public abstract class CollectionAdminReq @Override public SolrParams getParams() { - ModifiableSolrParams params = getCommonParams(); + ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); if (nodeSet != null) { params.set("createNodeSet", nodeSet); } @@ -363,7 +383,6 @@ public abstract class CollectionAdminReq public static class SplitShard extends CollectionShardAdminRequest { protected String ranges; protected String splitKey; - protected String asyncId; private Properties properties; @@ -391,19 +410,10 @@ public abstract class CollectionAdminReq this.properties = properties; return this; } - - public SplitShard setAsyncId(String asyncId) { - this.asyncId = asyncId; - return this; - } - - public String getAsyncId() { - return asyncId; - } @Override public SolrParams getParams() { - ModifiableSolrParams params = getCommonParams(); + ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); params.set( "ranges", ranges); if(splitKey != null) @@ -412,8 +422,6 @@ public abstract class CollectionAdminReq if(properties != null) { addProperties(params, properties); } - - params.set(CommonAdminParams.ASYNC, asyncId); return params; } @@ -437,7 +445,6 @@ public abstract class CollectionAdminReq // FORCELEADER request public static class ForceLeader extends CollectionShardAdminRequest { - protected String asyncId; public ForceLeader() { action = CollectionAction.FORCELEADER; @@ -447,15 +454,6 @@ public abstract class CollectionAdminReq protected ForceLeader getThis() { return this; } - - @Override - public SolrParams getParams() { - ModifiableSolrParams params = getCommonParams(); - if (asyncId != null) { - params.set(CommonAdminParams.ASYNC, asyncId); - } - return params; - } } // REQUESTSTATUS request @@ -566,12 +564,11 @@ public abstract class CollectionAdminReq // ADDREPLICA request public static class AddReplica extends CollectionShardAdminRequest { - private String node; - private String routeKey; - private String instanceDir; - private String dataDir; - private Properties properties; - private String asyncId; + protected String node; + protected String routeKey; + protected String instanceDir; + protected String dataDir; + protected Properties properties; public AddReplica() { action = CollectionAction.ADDREPLICA; @@ -632,9 +629,6 @@ public abstract class CollectionAdminReq } params.add(ShardParams._ROUTE_, routeKey); } - if (asyncId != null) { - params.set(CommonAdminParams.ASYNC, asyncId); - } if (node != null) { params.add("node", node); } @@ -650,15 +644,6 @@ public abstract class CollectionAdminReq return params; } - public AddReplica setAsyncId(String asyncId) { - this.asyncId = asyncId; - return this; - } - - public String getAsyncId() { - return asyncId; - } - @Override protected AddReplica getThis() { return this; @@ -667,8 +652,8 @@ public abstract class CollectionAdminReq // DELETEREPLICA request public static class DeleteReplica extends CollectionShardAdminRequest { - private String replica; - private Boolean onlyIfDown; + protected String replica; + protected Boolean onlyIfDown; public DeleteReplica() { action = CollectionAction.DELETEREPLICA; @@ -697,8 +682,8 @@ public abstract class CollectionAdminReq ModifiableSolrParams params = new ModifiableSolrParams(super.getParams()); params.set(ZkStateReader.REPLICA_PROP, this.replica); - if(onlyIfDown != null) { - params.set("onlyIfDown", this.onlyIfDown); + if (onlyIfDown != null) { + params.set("onlyIfDown", onlyIfDown); } return params; } @@ -758,7 +743,6 @@ public abstract class CollectionAdminReq private String splitKey; private Integer forwardTimeout; private Properties properties; - private String asyncId; public Migrate() { action = CollectionAction.MIGRATE; @@ -818,8 +802,6 @@ public abstract class CollectionAdminReq if (forwardTimeout != null) { params.set("forward.timeout", forwardTimeout); } - params.set(CommonAdminParams.ASYNC, asyncId); - if (properties != null) { addProperties(params, properties); } @@ -827,15 +809,6 @@ public abstract class CollectionAdminReq return params; } - public Migrate setAsyncId(String asyncId) { - this.asyncId = asyncId; - return this; - } - - public String getAsyncId() { - return asyncId; - } - @Override protected Migrate getThis() { return this;