From commits-return-98554-archive-asf-public=cust-asf.ponee.io@lucene.apache.org Tue Jan 23 11:30:40 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 5E7B4180676 for ; Tue, 23 Jan 2018 11:30:40 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4EA22160C50; Tue, 23 Jan 2018 10:30:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 84A6E160C55 for ; Tue, 23 Jan 2018 11:30:37 +0100 (CET) Received: (qmail 87235 invoked by uid 500); 23 Jan 2018 10:30:34 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 86515 invoked by uid 99); 23 Jan 2018 10:30:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Jan 2018 10:30:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4BF96F4DCF; Tue, 23 Jan 2018 10:30:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: datcm@apache.org To: commits@lucene.apache.org Date: Tue, 23 Jan 2018 10:30:48 -0000 Message-Id: In-Reply-To: <26b4b7452743478e8aa7c1f5b64bb8bf@git.apache.org> References: <26b4b7452743478e8aa7c1f5b64bb8bf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/41] lucene-solr:jira/solr-11702: SOLR-11817: Move Collections API classes to it's own package http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java 
---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java new file mode 100644 index 0000000..eefe903 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.cloud.api.collections; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.common.util.Utils; +import org.apache.solr.handler.component.ShardHandler; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + + +public class DeleteReplicaCmd implements Cmd { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final OverseerCollectionMessageHandler ocmh; + + public DeleteReplicaCmd(OverseerCollectionMessageHandler ocmh) { + this.ocmh = ocmh; + } + + @Override + @SuppressWarnings("unchecked") + + public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws 
Exception { + deleteReplica(clusterState, message, results,null); + } + + + @SuppressWarnings("unchecked") + void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete) + throws KeeperException, InterruptedException { + log.debug("deleteReplica() : {}", Utils.toJSONString(message)); + boolean parallel = message.getBool("parallel", false); + + //If a count is specified the strategy needs be different + if (message.getStr(COUNT_PROP) != null) { + deleteReplicaBasedOnCount(clusterState, message, results, onComplete, parallel); + return; + } + + + ocmh.checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP); + String collectionName = message.getStr(COLLECTION_PROP); + String shard = message.getStr(SHARD_ID_PROP); + String replicaName = message.getStr(REPLICA_PROP); + + DocCollection coll = clusterState.getCollection(collectionName); + Slice slice = coll.getSlice(shard); + if (slice == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Invalid shard name : " + shard + " in collection : " + collectionName); + } + + deleteCore(slice, collectionName, replicaName, message, shard, results, onComplete, parallel); + + } + + + /** + * Delete replicas based on count for a given collection. If a shard is passed, uses that + * else deletes given num replicas across all shards for the given collection. + */ + void deleteReplicaBasedOnCount(ClusterState clusterState, + ZkNodeProps message, + NamedList results, + Runnable onComplete, + boolean parallel) + throws KeeperException, InterruptedException { + ocmh.checkRequired(message, COLLECTION_PROP, COUNT_PROP); + int count = Integer.parseInt(message.getStr(COUNT_PROP)); + String collectionName = message.getStr(COLLECTION_PROP); + String shard = message.getStr(SHARD_ID_PROP); + DocCollection coll = clusterState.getCollection(collectionName); + Slice slice = null; + //Validate if shard is passed. 
+ if (shard != null) { + slice = coll.getSlice(shard); + if (slice == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Invalid shard name : " + shard + " in collection : " + collectionName); + } + } + + Map> shardToReplicasMapping = new HashMap>(); + if (slice != null) { + Set replicasToBeDeleted = pickReplicasTobeDeleted(slice, shard, collectionName, count); + shardToReplicasMapping.put(slice,replicasToBeDeleted); + } else { + + //If there are many replicas left, remove the rest based on count. + Collection allSlices = coll.getSlices(); + for (Slice individualSlice : allSlices) { + Set replicasToBeDeleted = pickReplicasTobeDeleted(individualSlice, individualSlice.getName(), collectionName, count); + shardToReplicasMapping.put(individualSlice, replicasToBeDeleted); + } + } + + for (Slice shardSlice: shardToReplicasMapping.keySet()) { + String shardId = shardSlice.getName(); + Set replicas = shardToReplicasMapping.get(shardSlice); + //callDeleteReplica on all replicas + for (String replica: replicas) { + log.debug("Deleting replica {} for shard {} based on count {}", replica, shardId, count); + deleteCore(shardSlice, collectionName, replica, message, shard, results, onComplete, parallel); + } + results.add("shard_id", shardId); + results.add("replicas_deleted", replicas); + } + + } + + + /** + * Pick replicas to be deleted. Avoid picking the leader. + */ + private Set pickReplicasTobeDeleted(Slice slice, String shard, String collectionName, int count) { + validateReplicaAvailability(slice, shard, collectionName, count); + Collection allReplicas = slice.getReplicas(); + Set replicasToBeRemoved = new HashSet(); + Replica leader = slice.getLeader(); + for (Replica replica: allReplicas) { + if (count == 0) { + break; + } + //Try avoiding to pick up the leader to minimize activity on the cluster. 
+ if (leader.getCoreName().equals(replica.getCoreName())) { + continue; + } + replicasToBeRemoved.add(replica.getName()); + count --; + } + return replicasToBeRemoved; + } + + /** + * Validate if there is less replicas than requested to remove. Also error out if there is + * only one replica available + */ + private void validateReplicaAvailability(Slice slice, String shard, String collectionName, int count) { + //If there is a specific shard passed, validate if there any or just 1 replica left + if (slice != null) { + Collection allReplicasForShard = slice.getReplicas(); + if (allReplicasForShard == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No replicas found in shard/collection: " + + shard + "/" + collectionName); + } + + + if (allReplicasForShard.size() == 1) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There is only one replica available in shard/collection: " + + shard + "/" + collectionName + ". Cannot delete that."); + } + + if (allReplicasForShard.size() <= count) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There are lesser num replicas requested to be deleted than are available in shard/collection : " + + shard + "/" + collectionName + " Requested: " + count + " Available: " + allReplicasForShard.size() + "."); + } + } + } + + void deleteCore(Slice slice, String collectionName, String replicaName,ZkNodeProps message, String shard, NamedList results, Runnable onComplete, boolean parallel) throws KeeperException, InterruptedException { + + Replica replica = slice.getReplica(replicaName); + if (replica == null) { + ArrayList l = new ArrayList<>(); + for (Replica r : slice.getReplicas()) + l.add(r.getName()); + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : " + + shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ',')); + } + + // If users are being safe and only want to remove a shard if 
it is down, they can specify onlyIfDown=true + // on the command. + if (Boolean.parseBoolean(message.getStr(OverseerCollectionMessageHandler.ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName + + " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'"); + } + + ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); + String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); + String asyncId = message.getStr(ASYNC); + AtomicReference> requestMap = new AtomicReference<>(null); + if (asyncId != null) { + requestMap.set(new HashMap<>(1, 1.0f)); + } + + ModifiableSolrParams params = new ModifiableSolrParams(); + params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString()); + params.add(CoreAdminParams.CORE, core); + + params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true)); + params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true)); + params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true)); + + boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName()); + if (isLive) { + ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get()); + } + + Callable callable = () -> { + try { + if (isLive) { + ocmh.processResponses(results, shardHandler, false, null, asyncId, requestMap.get()); + + //check if the core unload removed the corenode zk entry + if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return Boolean.TRUE; + } + + // try and ensure core info is removed from cluster state + ocmh.deleteCoreNode(collectionName, replicaName, replica, core); + if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 
30000)) return Boolean.TRUE; + return Boolean.FALSE; + } catch (Exception e) { + results.add("failure", "Could not complete delete " + e.getMessage()); + throw e; + } finally { + if (onComplete != null) onComplete.run(); + } + }; + + if (!parallel) { + try { + if (!callable.call()) + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName); + } catch (InterruptedException | KeeperException e) { + throw e; + } catch (Exception ex) { + throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex); + } + + } else { + ocmh.tpe.submit(callable); + } + + } + +} http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java new file mode 100644 index 0000000..2ef2955 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java @@ -0,0 +1,178 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.api.collections; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.solr.client.solrj.cloud.DistributedQueue; +import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.overseer.OverseerAction; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.common.util.Utils; +import org.apache.solr.util.TimeOut; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + +public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final 
OverseerCollectionMessageHandler ocmh; + private final TimeSource timeSource; + + public DeleteShardCmd(OverseerCollectionMessageHandler ocmh) { + this.ocmh = ocmh; + this.timeSource = ocmh.cloudManager.getTimeSource(); + } + + @Override + public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { + String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP); + String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP); + + log.info("Delete shard invoked"); + Slice slice = clusterState.getCollection(collectionName).getSlice(sliceId); + if (slice == null) throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "No shard with name " + sliceId + " exists for collection " + collectionName); + + // For now, only allow for deletions of Inactive slices or custom hashes (range==null). + // TODO: Add check for range gaps on Slice deletion + final Slice.State state = slice.getState(); + if (!(slice.getRange() == null || state == Slice.State.INACTIVE || state == Slice.State.RECOVERY + || state == Slice.State.CONSTRUCTION) || state == Slice.State.RECOVERY_FAILED) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The slice: " + slice.getName() + " is currently " + state + + ". 
Only non-active (or custom-hashed) slices can be deleted."); + } + + if (state == Slice.State.RECOVERY) { + // mark the slice as 'construction' and only then try to delete the cores + // see SOLR-9455 + DistributedQueue inQueue = Overseer.getStateUpdateQueue(ocmh.zkStateReader.getZkClient()); + Map propMap = new HashMap<>(); + propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower()); + propMap.put(sliceId, Slice.State.CONSTRUCTION.toString()); + propMap.put(ZkStateReader.COLLECTION_PROP, collectionName); + ZkNodeProps m = new ZkNodeProps(propMap); + inQueue.offer(Utils.toJSON(m)); + } + + String asyncId = message.getStr(ASYNC); + + try { + List replicas = getReplicasForSlice(collectionName, slice); + CountDownLatch cleanupLatch = new CountDownLatch(replicas.size()); + for (ZkNodeProps r : replicas) { + final ZkNodeProps replica = r.plus(message.getProperties()).plus("parallel", "true").plus(ASYNC, asyncId); + log.info("Deleting replica for collection={} shard={} on node={}", replica.getStr(COLLECTION_PROP), replica.getStr(SHARD_ID_PROP), replica.getStr(CoreAdminParams.NODE)); + NamedList deleteResult = new NamedList(); + try { + ((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, replica, deleteResult, () -> { + cleanupLatch.countDown(); + if (deleteResult.get("failure") != null) { + synchronized (results) { + results.add("failure", String.format(Locale.ROOT, "Failed to delete replica for collection=%s shard=%s" + + " on node=%s", replica.getStr(COLLECTION_PROP), replica.getStr(SHARD_ID_PROP), replica.getStr(NODE_NAME_PROP))); + } + } + SimpleOrderedMap success = (SimpleOrderedMap) deleteResult.get("success"); + if (success != null) { + synchronized (results) { + results.add("success", success); + } + } + }); + } catch (KeeperException e) { + log.warn("Error deleting replica: " + r, e); + cleanupLatch.countDown(); + } catch (Exception e) { + log.warn("Error deleting replica: " + r, e); + 
cleanupLatch.countDown(); + throw e; + } + } + log.debug("Waiting for delete shard action to complete"); + cleanupLatch.await(5, TimeUnit.MINUTES); + + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP, + collectionName, ZkStateReader.SHARD_ID_PROP, sliceId); + ZkStateReader zkStateReader = ocmh.zkStateReader; + Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m)); + + // wait for a while until we don't see the shard + TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource); + boolean removed = false; + while (!timeout.hasTimedOut()) { + timeout.sleep(100); + DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName); + removed = collection.getSlice(sliceId) == null; + if (removed) { + timeout.sleep(100); // just a bit of time so it's more likely other readers see on return + break; + } + } + if (!removed) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Could not fully remove collection: " + collectionName + " shard: " + sliceId); + } + + log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId); + } catch (SolrException e) { + throw e; + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + "Error executing delete operation for collection: " + collectionName + " shard: " + sliceId, e); + } + } + + private List getReplicasForSlice(String collectionName, Slice slice) { + List sourceReplicas = new ArrayList<>(); + for (Replica replica : slice.getReplicas()) { + ZkNodeProps props = new ZkNodeProps( + COLLECTION_PROP, collectionName, + SHARD_ID_PROP, slice.getName(), + ZkStateReader.CORE_NAME_PROP, replica.getCoreName(), + ZkStateReader.REPLICA_PROP, replica.getName(), + CoreAdminParams.NODE, replica.getNodeName()); + sourceReplicas.add(props); + } + return sourceReplicas; + } +} 
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java new file mode 100644 index 0000000..cf0a234 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.cloud.api.collections; + +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonParams.NAME; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.Replica.State; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.snapshots.CollectionSnapshotMetaData; +import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData; +import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus; +import org.apache.solr.core.snapshots.SolrSnapshotManager; +import org.apache.solr.handler.component.ShardHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements the functionality of deleting a collection level snapshot. 
+ */ +public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final OverseerCollectionMessageHandler ocmh; + + public DeleteSnapshotCmd (OverseerCollectionMessageHandler ocmh) { + this.ocmh = ocmh; + } + + @Override + public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception { + String collectionName = message.getStr(COLLECTION_PROP); + String commitName = message.getStr(CoreAdminParams.COMMIT_NAME); + String asyncId = message.getStr(ASYNC); + Map requestMap = new HashMap<>(); + NamedList shardRequestResults = new NamedList(); + ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); + SolrZkClient zkClient = ocmh.zkStateReader.getZkClient(); + + Optional meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName); + if (!meta.isPresent()) { // Snapshot not found. Nothing to do. + return; + } + + log.info("Deleting a snapshot for collection={} with commitName={}", collectionName, commitName); + + Set existingCores = new HashSet<>(); + for (Slice s : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) { + for (Replica r : s.getReplicas()) { + existingCores.add(r.getCoreName()); + } + } + + Set coresWithSnapshot = new HashSet<>(); + for (CoreSnapshotMetaData m : meta.get().getReplicaSnapshots()) { + if (existingCores.contains(m.getCoreName())) { + coresWithSnapshot.add(m.getCoreName()); + } + } + + log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores); + for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) { + for (Replica replica : slice.getReplicas()) { + if (replica.getState() == State.DOWN) { + continue; // Since replica is down - no point sending a request. 
+ } + + // Note - when a snapshot is found in_progress state - it is the result of overseer + // failure while handling the snapshot creation. Since we don't know the exact set of + // replicas to contact at this point, we try on all replicas. + if (meta.get().getStatus() == SnapshotStatus.InProgress || coresWithSnapshot.contains(replica.getCoreName())) { + String coreName = replica.getStr(CORE_NAME_PROP); + + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminAction.DELETESNAPSHOT.toString()); + params.set(NAME, slice.getName()); + params.set(CORE_NAME_PROP, coreName); + params.set(CoreAdminParams.COMMIT_NAME, commitName); + + log.info("Sending deletesnapshot request to core={} with commitName={}", coreName, commitName); + ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap); + } + } + } + + ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap); + NamedList success = (NamedList) shardRequestResults.get("success"); + List replicas = new ArrayList<>(); + if (success != null) { + for ( int i = 0 ; i < success.size() ; i++) { + NamedList resp = (NamedList)success.getVal(i); + // Unfortunately async processing logic doesn't provide the "core" name automatically. + String coreName = (String)resp.get("core"); + coresWithSnapshot.remove(coreName); + } + } + + if (!coresWithSnapshot.isEmpty()) { // One or more failures. + log.warn("Failed to delete a snapshot for collection {} with commitName = {}. Snapshot could not be deleted for following cores {}", + collectionName, commitName, coresWithSnapshot); + + List replicasWithSnapshot = new ArrayList<>(); + for (CoreSnapshotMetaData m : meta.get().getReplicaSnapshots()) { + if (coresWithSnapshot.contains(m.getCoreName())) { + replicasWithSnapshot.add(m); + } + } + + // Update the ZK meta-data to include only cores with the snapshot. 
This will enable users to figure out + // which cores still contain the named snapshot. + CollectionSnapshotMetaData newResult = new CollectionSnapshotMetaData(meta.get().getName(), SnapshotStatus.Failed, + meta.get().getCreationDate(), replicasWithSnapshot); + SolrSnapshotManager.updateCollectionLevelSnapshot(zkClient, collectionName, newResult); + log.info("Saved snapshot information for collection={} with commitName={} in Zookeeper as follows", collectionName, commitName, + Utils.toJSON(newResult)); + throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to delete snapshot on cores " + coresWithSnapshot); + + } else { + // Delete the ZK path so that we eliminate the references of this snapshot from collection level meta-data. + SolrSnapshotManager.deleteCollectionLevelSnapshot(zkClient, collectionName, commitName); + log.info("Deleted Zookeeper snapshot metdata for collection={} with commitName={}", collectionName, commitName); + log.info("Successfully deleted snapshot for collection={} with commitName={}", collectionName, commitName); + } + } +} http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java new file mode 100644 index 0000000..a80fdc0 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.api.collections; + +import java.util.Set; + +import org.apache.solr.common.SolrCloseableLatch; +import org.apache.solr.common.cloud.CollectionStateWatcher; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; + +/** + * We use this watcher to wait for any eligible replica in a shard to become active so that it can become a leader. 
+ */ +public class LeaderRecoveryWatcher implements CollectionStateWatcher { + String collectionId; + String shardId; + String replicaId; + String targetCore; + SolrCloseableLatch latch; + + /** + * Watch for recovery of a replica + * + * @param collectionId collection name + * @param shardId shard id + * @param replicaId source replica name (coreNodeName) + * @param targetCore specific target core name - if null then any active replica will do + * @param latch countdown when recovered + */ + LeaderRecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, SolrCloseableLatch latch) { + this.collectionId = collectionId; + this.shardId = shardId; + this.replicaId = replicaId; + this.targetCore = targetCore; + this.latch = latch; + } + + @Override + public boolean onStateChanged(Set liveNodes, DocCollection collectionState) { + if (collectionState == null) { // collection has been deleted - don't wait + latch.countDown(); + return true; + } + Slice slice = collectionState.getSlice(shardId); + if (slice == null) { // shard has been removed - don't wait + latch.countDown(); + return true; + } + for (Replica replica : slice.getReplicas()) { + // check if another replica exists - doesn't have to be the one we're moving + // as long as it's active and can become a leader, in which case we don't have to wait + // for recovery of specifically the one that we've just added + if (!replica.getName().equals(replicaId)) { + if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election + continue; + } + // check its state + String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP); + if (targetCore != null && !targetCore.equals(coreName)) { + continue; + } + if (replica.isActive(liveNodes)) { // recovered - stop waiting + latch.countDown(); + return true; + } + } + } + // set the watch again to wait for the new replica to recover + return false; + } +} 
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java new file mode 100644 index 0000000..4edc363 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.cloud.api.collections; + +import java.lang.invoke.MethodHandles; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.solr.client.solrj.request.CoreAdminRequest; +import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.overseer.OverseerAction; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CompositeIdRouter; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.RoutingRule; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.common.util.Utils; +import org.apache.solr.handler.component.ShardHandler; +import org.apache.solr.handler.component.ShardHandlerFactory; +import org.apache.solr.update.SolrIndexSplitter; +import org.apache.solr.util.TimeOut; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static 
org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.util.Utils.makeMap;

/**
 * Overseer command implementing the MIGRATE collection action: moves the documents of a
 * {@code split.key} from a source collection into a target collection.
 *
 * <p>For every pair of source/target shards whose hash ranges intersect the split key's hash range,
 * {@link #migrateKey} asks the target leader to buffer updates, adds a routing rule to the source
 * shard, splits the matching documents into a temporary single-shard collection, adds a replica of
 * that collection on the target leader's node, merges it into the target leader core, asks the
 * target leader to apply the buffered updates and finally deletes the temporary collection.
 */
public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  private final OverseerCollectionMessageHandler ocmh;
  private final TimeSource timeSource;

  public MigrateCmd(OverseerCollectionMessageHandler ocmh) {
    this.ocmh = ocmh;
    this.timeSource = ocmh.cloudManager.getTimeSource();
  }

  /**
   * Validates the request and migrates the split key for every intersecting source/target shard pair.
   *
   * @param clusterState current cluster state
   * @param message request properties: {@code collection}, {@code split.key},
   *     {@code target.collection}, optional {@code forward.timeout} (default 10 * 60, multiplied by
   *     1000 before use) and an optional async id
   * @param results accumulates the responses of all sub-requests
   * @throws SolrException BAD_REQUEST if a collection is unknown, does not use the compositeId
   *     router, or has no active slice for the split key
   */
  @Override
  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
    String sourceCollectionName = message.getStr("collection");
    String splitKey = message.getStr("split.key");
    String targetCollectionName = message.getStr("target.collection");
    int timeout = message.getInt("forward.timeout", 10 * 60) * 1000;

    DocCollection sourceCollection = clusterState.getCollection(sourceCollectionName);
    if (sourceCollection == null) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown source collection: " + sourceCollectionName);
    }
    DocCollection targetCollection = clusterState.getCollection(targetCollectionName);
    if (targetCollection == null) {
      // report the collection that is actually missing
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown target collection: " + targetCollectionName);
    }
    if (!(sourceCollection.getRouter() instanceof CompositeIdRouter)) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection must use a compositeId router");
    }
    if (!(targetCollection.getRouter() instanceof CompositeIdRouter)) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target collection must use a compositeId router");
    }
    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
    CompositeIdRouter targetRouter = (CompositeIdRouter) targetCollection.getRouter();
    Collection<Slice> sourceSlices = sourceRouter.getSearchSlicesSingle(splitKey, null, sourceCollection);
    if (sourceSlices.isEmpty()) {
      // use the collection name (not the whole DocCollection dump) and keep words separated
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "No active slices available in source collection: " + sourceCollectionName + " for given split.key: " + splitKey);
    }
    Collection<Slice> targetSlices = targetRouter.getSearchSlicesSingle(splitKey, null, targetCollection);
    if (targetSlices.isEmpty()) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
          "No active slices available in target collection: " + targetCollectionName + " for given split.key: " + splitKey);
    }

    String asyncId = null;
    if (message.containsKey(ASYNC) && message.get(ASYNC) != null) {
      asyncId = message.getStr(ASYNC);
    }

    for (Slice sourceSlice : sourceSlices) {
      for (Slice targetSlice : targetSlices) {
        log.info("Migrating source shard: {} to target shard: {} for split.key = {}", sourceSlice, targetSlice, splitKey);
        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey,
            timeout, results, asyncId, message);
      }
    }
  }

  /**
   * Migrates the part of {@code splitKey}'s hash range that lies in both {@code sourceSlice} and
   * {@code targetSlice}. Returns immediately when the three ranges have no common hashes.
   *
   * @param timeout value handed to {@link RoutingRule#makeExpiryAt} for the routing rule's expiry
   * @param asyncId optional async id; when set, a derived id is used for the temp collection creation
   * @param message original request; its {@code COLL_PROP_PREFIX} properties are copied to the
   *     replica created on the target leader node
   * @throws SolrException SERVER_ERROR if the routing rule does not show up in the cluster state
   *     within 60 seconds
   */
  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
                          DocCollection targetCollection, Slice targetSlice,
                          String splitKey, int timeout,
                          NamedList results, String asyncId, ZkNodeProps message) throws Exception {
    String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
    ZkStateReader zkStateReader = ocmh.zkStateReader;
    // a temp collection with this name may be left over (presumably from an earlier attempt): remove it
    if (clusterState.hasCollection(tempSourceCollectionName)) {
      log.info("Deleting temporary collection: {}", tempSourceCollectionName);
      Map<String, Object> props = makeMap(
          Overseer.QUEUE_OPERATION, DELETE.toLower(),
          NAME, tempSourceCollectionName);

      try {
        ocmh.commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
        clusterState = zkStateReader.getClusterState();
      } catch (Exception e) {
        log.warn("Unable to clean up existing temporary collection: {}", tempSourceCollectionName, e);
      }
    }

    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
    DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);

    ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();

    log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
    // intersect source range, keyHashRange and target range
    // this is the range that has to be split from source and transferred to target
    DocRouter.Range splitRange = ocmh.intersect(targetSlice.getRange(), ocmh.intersect(sourceSlice.getRange(), keyHashRange));
    if (splitRange == null) {
      log.info("No common hashes between source shard: {} and target shard: {}", sourceSlice.getName(), targetSlice.getName());
      return;
    }
    log.info("Common hash range between source shard: {} and target shard: {} = {}",
        sourceSlice.getName(), targetSlice.getName(), splitRange);

    Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
    // For tracking async calls.
    Map<String, String> requestMap = new HashMap<>();

    // Step 1: target leader buffers incoming updates while the migration is in progress
    log.info("Asking target leader node: {} core: {} to buffer updates",
        targetLeader.getNodeName(), targetLeader.getStr("core"));
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString());
    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));

    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);

    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);

    // Step 2: add a routing rule on the source shard for the split key's route
    ZkNodeProps m = new ZkNodeProps(
        Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
        COLLECTION_PROP, sourceCollection.getName(),
        SHARD_ID_PROP, sourceSlice.getName(),
        "routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
        "range", splitRange.toString(),
        "targetCollection", targetCollection.getName(),
        "expireAt", RoutingRule.makeExpiryAt(timeout));
    log.info("Adding routing rule: {}", m);
    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));

    // wait for a while until we see the new rule
    log.info("Waiting to see routing rule updated in clusterstate");
    TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS, timeSource);
    boolean added = false;
    while (!waitUntil.hasTimedOut()) {
      waitUntil.sleep(100);
      sourceCollection = zkStateReader.getClusterState().getCollection(sourceCollection.getName());
      sourceSlice = sourceCollection.getSlice(sourceSlice.getName());
      Map<String, RoutingRule> rules = sourceSlice.getRoutingRules();
      if (rules != null) {
        RoutingRule rule = rules.get(SolrIndexSplitter.getRouteKey(splitKey) + "!");
        if (rule != null && rule.getRouteRanges().contains(splitRange)) {
          added = true;
          break;
        }
      }
    }
    if (!added) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not add routing rule: " + m);
    }

    log.info("Routing rule added successfully");

    // Step 3: create temp core on source shard
    Replica sourceLeader = zkStateReader.getLeaderRetry(sourceCollection.getName(), sourceSlice.getName(), 10000);

    // create a temporary collection with just one node on the shard leader
    String configName = zkStateReader.readConfigName(sourceCollection.getName());
    Map<String, Object> props = makeMap(
        Overseer.QUEUE_OPERATION, CREATE.toLower(),
        NAME, tempSourceCollectionName,
        NRT_REPLICAS, 1,
        OverseerCollectionMessageHandler.NUM_SLICES, 1,
        OverseerCollectionMessageHandler.COLL_CONF, configName,
        OverseerCollectionMessageHandler.CREATE_NODE_SET, sourceLeader.getNodeName());
    if (asyncId != null) {
      String internalAsyncId = asyncId + Math.abs(System.nanoTime());
      props.put(ASYNC, internalAsyncId);
    }

    log.info("Creating temporary collection: {}", props);
    ocmh.commandMap.get(CREATE).call(clusterState, new ZkNodeProps(props), results);
    // refresh cluster state
    clusterState = zkStateReader.getClusterState();
    Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
    Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);

    String tempCollectionReplica1 = tempSourceLeader.getCoreName();
    String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
        sourceLeader.getNodeName(), tempCollectionReplica1);
    // wait for the replicas to be seen as active on temp source leader
    log.info("Asking source leader to wait for: {} to be alive on: {}", tempCollectionReplica1, sourceLeader.getNodeName());
    CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
    cmd.setCoreName(tempCollectionReplica1);
    cmd.setNodeName(sourceLeader.getNodeName());
    cmd.setCoreNodeName(coreNodeName);
    cmd.setState(Replica.State.ACTIVE);
    cmd.setCheckLive(true);
    cmd.setOnlyIfLeader(true);
    // we don't want this to happen asynchronously
    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);

    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
        " or timed out waiting for it to come up", asyncId, requestMap);

    // Step 4: split the matching range of the source leader core into the temp core
    log.info("Asking source leader to split index");
    params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
    params.set(CoreAdminParams.CORE, sourceLeader.getStr("core"));
    params.add(CoreAdminParams.TARGET_CORE, tempSourceLeader.getStr("core"));
    params.set(CoreAdminParams.RANGES, splitRange.toString());
    params.set("split.key", splitKey);

    String tempNodeName = sourceLeader.getNodeName();

    ocmh.sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);

    // Step 5: add a replica of the temp collection on the target leader's node
    log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
        tempSourceCollectionName, targetLeader.getNodeName());
    String tempCollectionReplica2 = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(),
        zkStateReader.getClusterState().getCollection(tempSourceCollectionName), tempSourceSlice.getName(), Replica.Type.NRT);
    props = new HashMap<>();
    props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
    props.put(COLLECTION_PROP, tempSourceCollectionName);
    props.put(SHARD_ID_PROP, tempSourceSlice.getName());
    props.put("node", targetLeader.getNodeName());
    props.put(CoreAdminParams.NAME, tempCollectionReplica2);
    // copy over property params:
    for (String key : message.keySet()) {
      if (key.startsWith(OverseerCollectionMessageHandler.COLL_PROP_PREFIX)) {
        props.put(key, message.getStr(key));
      }
    }
    // add async param
    if (asyncId != null) {
      props.put(ASYNC, asyncId);
    }
    ((AddReplicaCmd) ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null);

    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
        "temporary collection in target leader node.", asyncId, requestMap);

    coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
        targetLeader.getNodeName(), tempCollectionReplica2);
    // wait for the replicas to be seen as active on temp source leader
    log.info("Asking temp source leader to wait for: {} to be alive on: {}", tempCollectionReplica2, targetLeader.getNodeName());
    cmd = new CoreAdminRequest.WaitForState();
    cmd.setCoreName(tempSourceLeader.getStr("core"));
    cmd.setNodeName(targetLeader.getNodeName());
    cmd.setCoreNodeName(coreNodeName);
    cmd.setState(Replica.State.ACTIVE);
    cmd.setCheckLive(true);
    cmd.setOnlyIfLeader(true);
    params = new ModifiableSolrParams(cmd.getParams());

    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);

    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
        " replica or timed out waiting for them to come up", asyncId, requestMap);

    log.info("Successfully created replica of temp source collection on target leader node");

    // Step 6: merge the temp replica's index into the target leader core
    log.info("Requesting merge of temp source collection replica to target leader");
    params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.MERGEINDEXES.toString());
    params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
    params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);

    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
    String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
        + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
    ocmh.processResponses(results, shardHandler, true, msg, asyncId, requestMap);

    // Step 7: replay the updates buffered in step 1
    log.info("Asking target leader to apply buffered updates");
    params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));

    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
        asyncId, requestMap);

    // Step 8: best-effort removal of the temp collection; a failure is only logged
    try {
      log.info("Deleting temporary collection: {}", tempSourceCollectionName);
      props = makeMap(
          Overseer.QUEUE_OPERATION, DELETE.toLower(),
          NAME, tempSourceCollectionName);
      ocmh.commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
    } catch (Exception e) {
      log.error("Unable to delete temporary collection: {}. Please remove it manually", tempSourceCollectionName, e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java new file mode 100644 index 0000000..f9392b5 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud.api.collections; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +import org.apache.solr.cloud.ActiveReplicaWatcher; +import org.apache.solr.common.SolrCloseableLatch; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.common.util.Utils; +import org.apache.solr.update.UpdateLog; +import org.apache.solr.util.TimeOut; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE; +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE; 
+import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT; +import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; + +public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final OverseerCollectionMessageHandler ocmh; + private final TimeSource timeSource; + + public MoveReplicaCmd(OverseerCollectionMessageHandler ocmh) { + this.ocmh = ocmh; + this.timeSource = ocmh.cloudManager.getTimeSource(); + } + + @Override + public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception { + moveReplica(ocmh.zkStateReader.getClusterState(), message, results); + } + + private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { + log.debug("moveReplica() : {}", Utils.toJSONString(message)); + ocmh.checkRequired(message, COLLECTION_PROP, CollectionParams.TARGET_NODE); + String collection = message.getStr(COLLECTION_PROP); + String targetNode = message.getStr(CollectionParams.TARGET_NODE); + boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false); + boolean inPlaceMove = message.getBool(IN_PLACE_MOVE, true); + int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes + + String async = message.getStr(ASYNC); + + DocCollection coll = clusterState.getCollection(collection); + if (coll == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist"); + } + if (!clusterState.getLiveNodes().contains(targetNode)) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target node: " + targetNode + " not in live nodes: " + clusterState.getLiveNodes()); + } + Replica replica = null; + if (message.containsKey(REPLICA_PROP)) { + String replicaName = message.getStr(REPLICA_PROP); + replica = coll.getReplica(replicaName); + if (replica == null) { + 
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Collection: " + collection + " replica: " + replicaName + " does not exist"); + } + } else { + String sourceNode = message.getStr(CollectionParams.SOURCE_NODE, message.getStr(CollectionParams.FROM_NODE)); + if (sourceNode == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + CollectionParams.SOURCE_NODE + + " or '" + CollectionParams.FROM_NODE + "' is a required param"); + } + String shardId = message.getStr(SHARD_ID_PROP); + if (shardId == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + SHARD_ID_PROP + "' is a required param"); + } + Slice slice = clusterState.getCollection(collection).getSlice(shardId); + List sliceReplicas = new ArrayList<>(slice.getReplicas()); + Collections.shuffle(sliceReplicas, OverseerCollectionMessageHandler.RANDOM); + // this picks up a single random replica from the sourceNode + for (Replica r : slice.getReplicas()) { + if (r.getNodeName().equals(sourceNode)) { + replica = r; + } + } + if (replica == null) { + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, + "Collection: " + collection + " node: " + sourceNode + " does not have any replica belonging to shard: " + shardId); + } + } + + log.info("Replica will be moved to node {}: {}", targetNode, replica); + Slice slice = null; + for (Slice s : coll.getSlices()) { + if (s.getReplicas().contains(replica)) { + slice = s; + } + } + assert slice != null; + Object dataDir = replica.get("dataDir"); + boolean isSharedFS = replica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && dataDir != null; + + if (isSharedFS && inPlaceMove) { + log.debug("-- moveHdfsReplica"); + moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout, waitForFinalState); + } else { + log.debug("-- moveNormalReplica (inPlaceMove=" + inPlaceMove + ", isSharedFS=" + isSharedFS); + moveNormalReplica(clusterState, results, targetNode, async, 
coll, replica, slice, timeout, waitForFinalState); + } + } + + private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async, + DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception { + String skipCreateReplicaInClusterState = "true"; + if (clusterState.getLiveNodes().contains(replica.getNodeName())) { + skipCreateReplicaInClusterState = "false"; + ZkNodeProps removeReplicasProps = new ZkNodeProps( + COLLECTION_PROP, coll.getName(), + SHARD_ID_PROP, slice.getName(), + REPLICA_PROP, replica.getName() + ); + removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false); + removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false); + if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async); + NamedList deleteResult = new NamedList(); + ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null); + if (deleteResult.get("failure") != null) { + String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s", + coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure")); + log.warn(errorString); + results.add("failure", errorString); + return; + } + + TimeOut timeOut = new TimeOut(20L, TimeUnit.SECONDS, timeSource); + while (!timeOut.hasTimedOut()) { + coll = ocmh.zkStateReader.getClusterState().getCollection(coll.getName()); + if (coll.getReplica(replica.getName()) != null) { + timeOut.sleep(100); + } else { + break; + } + } + if (timeOut.hasTimedOut()) { + results.add("failure", "Still see deleted replica in clusterstate!"); + return; + } + + } + + String ulogDir = replica.getStr(CoreAdminParams.ULOG_DIR); + ZkNodeProps addReplicasProps = new ZkNodeProps( + COLLECTION_PROP, coll.getName(), + SHARD_ID_PROP, slice.getName(), + CoreAdminParams.NODE, targetNode, + CoreAdminParams.CORE_NODE_NAME, replica.getName(), + 
CoreAdminParams.NAME, replica.getCoreName(), + WAIT_FOR_FINAL_STATE, String.valueOf(waitForFinalState), + SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, skipCreateReplicaInClusterState, + CoreAdminParams.ULOG_DIR, ulogDir.substring(0, ulogDir.lastIndexOf(UpdateLog.TLOG_NAME)), + CoreAdminParams.DATA_DIR, dataDir); + if(async!=null) addReplicasProps.getProperties().put(ASYNC, async); + NamedList addResult = new NamedList(); + try { + ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, addResult, null); + } catch (Exception e) { + // fatal error - try rolling back + String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" + + " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure")); + results.add("failure", errorString); + log.warn("Error adding replica " + addReplicasProps + " - trying to roll back...", e); + addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName()); + NamedList rollback = new NamedList(); + ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null); + if (rollback.get("failure") != null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica + + ", collection may be inconsistent: " + rollback.get("failure")); + } + return; + } + if (addResult.get("failure") != null) { + String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" + + " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure")); + log.warn(errorString); + results.add("failure", errorString); + log.debug("--- trying to roll back..."); + // try to roll back + addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName()); + NamedList rollback = new NamedList(); + try { + ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null); + } catch 
(Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica + + ", collection may be inconsistent!", e); + } + if (rollback.get("failure") != null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica + + ", collection may be inconsistent! Failure: " + rollback.get("failure")); + } + return; + } else { + String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " + + "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), replica.getCoreName(), targetNode); + results.add("success", successString); + } + } + + private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async, + DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception { + String newCoreName = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType()); + ZkNodeProps addReplicasProps = new ZkNodeProps( + COLLECTION_PROP, coll.getName(), + SHARD_ID_PROP, slice.getName(), + CoreAdminParams.NODE, targetNode, + CoreAdminParams.NAME, newCoreName); + if (async != null) addReplicasProps.getProperties().put(ASYNC, async); + NamedList addResult = new NamedList(); + SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh); + ActiveReplicaWatcher watcher = null; + ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null); + log.debug("props " + props); + if (replica.equals(slice.getLeader()) || waitForFinalState) { + watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch); + log.debug("-- registered watcher " + watcher); + ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher); + } + if (addResult.get("failure") != null) { + String 
errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" + + " on node=%s, failure=", coll.getName(), slice.getName(), targetNode, addResult.get("failure")); + log.warn(errorString); + results.add("failure", errorString); + if (watcher != null) { // unregister + ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher); + } + return; + } + // wait for the other replica to be active if the source replica was a leader + if (watcher != null) { + try { + log.debug("Waiting for leader's replica to recover."); + if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) { + String errorString = String.format(Locale.ROOT, "Timed out waiting for leader's replica to recover, collection=%s shard=%s" + + " on node=%s", coll.getName(), slice.getName(), targetNode); + log.warn(errorString); + results.add("failure", errorString); + return; + } else { + log.debug("Replica " + watcher.getActiveReplicas() + " is active - deleting the source..."); + } + } finally { + ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher); + } + } + + ZkNodeProps removeReplicasProps = new ZkNodeProps( + COLLECTION_PROP, coll.getName(), + SHARD_ID_PROP, slice.getName(), + REPLICA_PROP, replica.getName()); + if (async != null) removeReplicasProps.getProperties().put(ASYNC, async); + NamedList deleteResult = new NamedList(); + ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null); + if (deleteResult.get("failure") != null) { + String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s", + coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure")); + log.warn(errorString); + results.add("failure", errorString); + } else { + String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " + + "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, 
targetNode); + results.add("success", successString); + } + } +}