lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [1/2] lucene-solr:branch_6x: SOLR-10239: MOVEREPLICA API
Date Thu, 06 Apr 2017 08:58:57 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x e606d901e -> 78b84e65b


SOLR-10239: MOVEREPLICA API


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c3bb6e20
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c3bb6e20
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c3bb6e20

Branch: refs/heads/branch_6x
Commit: c3bb6e20591d69e0f5039a56780a32d69c2543ec
Parents: e606d90
Author: Cao Manh Dat <datcm@apache.org>
Authored: Thu Apr 6 15:48:38 2017 +0700
Committer: Cao Manh Dat <datcm@apache.org>
Committed: Thu Apr 6 15:58:25 2017 +0700

----------------------------------------------------------------------
 .../org/apache/solr/cloud/MoveReplicaCmd.java   | 193 +++++++++++++++++++
 .../cloud/OverseerCollectionMessageHandler.java |   1 +
 .../solr/handler/admin/CollectionsHandler.java  |  12 +-
 .../CollectionsAPIAsyncDistributedZkTest.java   |  16 +-
 .../org/apache/solr/cloud/MoveReplicaTest.java  | 125 ++++++++++++
 .../HdfsCollectionsAPIDistributedZkTest.java    | 114 +++++++++++
 .../solrj/request/CollectionAdminRequest.java   |  38 ++++
 .../solr/common/params/CollectionParams.java    |   1 +
 8 files changed, 497 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
new file mode 100644
index 0000000..09d3b79
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.*;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+/**
+ * Overseer command backing the MOVEREPLICA collection action: relocates a single replica of a
+ * collection onto {@code targetNode}.
+ *
+ * <p>The replica to move is identified either directly by the {@code replica} property, or by
+ * the {@code shard} + {@code fromNode} pair, in which case one replica of that shard hosted on
+ * {@code fromNode} is picked at random.
+ *
+ * <p>Replicas whose {@code dataDir} starts with {@code hdfs:/} are moved by deleting the old core
+ * (keeping data dir and index) and creating a new core on the target node that points at the same
+ * data dir. All other replicas are moved by first adding a new replica on the target node and
+ * then deleting the old one.
+ */
+public class MoveReplicaCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public MoveReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Entry point invoked by the overseer. The {@code state} argument is not used; the cluster
+   * state is re-read from the handler's ZkStateReader before the move is performed.
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    moveReplica(ocmh.zkStateReader.getClusterState(), message, results);
+  }
+
+  /**
+   * Validates the request, resolves the replica to move and dispatches to the HDFS or the
+   * regular move strategy.
+   *
+   * @throws SolrException with BAD_REQUEST when the collection, the named replica, the shard, or
+   *         a replica of the shard on {@code fromNode} cannot be found
+   */
+  private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    log.info("moveReplica() : {}", Utils.toJSONString(message));
+    ocmh.checkRequired(message, COLLECTION_PROP, "targetNode");
+    String collection = message.getStr(COLLECTION_PROP);
+    String targetNode = message.getStr("targetNode");
+
+    String async = message.getStr(ASYNC);
+
+    DocCollection coll = clusterState.getCollection(collection);
+    if (coll == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+    }
+    Replica replica = null;
+    if (message.containsKey(REPLICA_PROP)) {
+      String replicaName = message.getStr(REPLICA_PROP);
+      replica = coll.getReplica(replicaName);
+      if (replica == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Collection: " + collection + " replica: " + replicaName + " does not exist");
+      }
+    } else {
+      ocmh.checkRequired(message, SHARD_ID_PROP, "fromNode");
+      String fromNode = message.getStr("fromNode");
+      String shardId = message.getStr(SHARD_ID_PROP);
+      Slice fromSlice = coll.getSlice(shardId);
+      if (fromSlice == null) {
+        // Without this check an unknown shard name would surface as a NullPointerException
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Collection: " + collection + " shard: " + shardId + " does not exist");
+      }
+      // Shuffle a copy and take the first match so that the choice among several replicas of
+      // the shard living on fromNode is actually random.
+      List<Replica> sliceReplicas = new ArrayList<>(fromSlice.getReplicas());
+      Collections.shuffle(sliceReplicas, RANDOM);
+      for (Replica r : sliceReplicas) {
+        if (r.getNodeName().equals(fromNode)) {
+          replica = r;
+          break;
+        }
+      }
+      if (replica == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Collection: " + collection + " node: " + fromNode + " do not have any replica belong to shard: " + shardId);
+      }
+    }
+
+    log.info("Replica will be moved {}", replica);
+    // Locate the slice that owns the chosen replica
+    Slice slice = null;
+    for (Slice s : coll.getSlices()) {
+      if (s.getReplicas().contains(replica)) {
+        slice = s;
+        break;
+      }
+    }
+    assert slice != null;
+    Object dataDir = replica.get("dataDir");
+    if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) {
+      moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice);
+    } else {
+      moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice);
+    }
+  }
+
+  /**
+   * Moves a replica whose index lives on HDFS: the old replica is deleted without removing its
+   * data dir or index, then a new replica is created on {@code targetNode} with the same
+   * {@code dataDir}. A "failure" or "success" entry is added to {@code results}.
+   */
+  private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
+                                 DocCollection coll, Replica replica, Slice slice) throws Exception {
+    String newCoreName = Assign.buildCoreName(coll, slice.getName());
+
+    ZkNodeProps removeReplicasProps = new ZkNodeProps(
+        COLLECTION_PROP, coll.getName(),
+        SHARD_ID_PROP, slice.getName(),
+        REPLICA_PROP, replica.getName()
+        );
+    // Keep the on-disk data so the new core can be created on top of it
+    removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false);
+    removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false);
+    if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
+    NamedList deleteResult = new NamedList();
+    ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, ()->{});
+    if (deleteResult.get("failure") != null) {
+      String errorString = cleanupFailureMessage(coll, slice, replica);
+      log.warn(errorString);
+      results.add("failure", errorString + ", because of : " + deleteResult.get("failure"));
+      return;
+    }
+
+    ZkNodeProps addReplicasProps = new ZkNodeProps(
+        COLLECTION_PROP, coll.getName(),
+        SHARD_ID_PROP, slice.getName(),
+        CoreAdminParams.NODE, targetNode,
+        CoreAdminParams.NAME, newCoreName,
+        CoreAdminParams.DATA_DIR, dataDir);
+    if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
+    NamedList addResult = new NamedList();
+    ocmh.addReplica(clusterState, addReplicasProps, addResult, ()->{});
+    if (addResult.get("failure") != null) {
+      String errorString = createFailureMessage(coll, slice, targetNode);
+      log.warn(errorString);
+      // Report the underlying cause, as the delete-failure path does
+      results.add("failure", errorString + ", because of : " + addResult.get("failure"));
+      return;
+    }
+    results.add("success", successMessage(replica, newCoreName, targetNode));
+  }
+
+  /**
+   * Moves a regular (non-HDFS) replica: a new replica is added on {@code targetNode} first and
+   * only after that succeeded the old replica is deleted. A "failure" or "success" entry is
+   * added to {@code results}.
+   */
+  private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
+                                 DocCollection coll, Replica replica, Slice slice) throws Exception {
+    String newCoreName = Assign.buildCoreName(coll, slice.getName());
+    ZkNodeProps addReplicasProps = new ZkNodeProps(
+        COLLECTION_PROP, coll.getName(),
+        SHARD_ID_PROP, slice.getName(),
+        CoreAdminParams.NODE, targetNode,
+        CoreAdminParams.NAME, newCoreName);
+    if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
+    NamedList addResult = new NamedList();
+    ocmh.addReplica(clusterState, addReplicasProps, addResult, ()->{});
+    if (addResult.get("failure") != null) {
+      String errorString = createFailureMessage(coll, slice, targetNode);
+      log.warn(errorString);
+      // Report the underlying cause, as the delete-failure path does
+      results.add("failure", errorString + ", because of : " + addResult.get("failure"));
+      return;
+    }
+
+    ZkNodeProps removeReplicasProps = new ZkNodeProps(
+        COLLECTION_PROP, coll.getName(),
+        SHARD_ID_PROP, slice.getName(),
+        REPLICA_PROP, replica.getName());
+    if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
+    NamedList deleteResult = new NamedList();
+    ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, ()->{});
+    if (deleteResult.get("failure") != null) {
+      String errorString = cleanupFailureMessage(coll, slice, replica);
+      log.warn(errorString);
+      results.add("failure", errorString + ", because of : " + deleteResult.get("failure"));
+    } else {
+      results.add("success", successMessage(replica, newCoreName, targetNode));
+    }
+  }
+
+  /** Builds the message reported when the old replica could not be removed. */
+  private static String cleanupFailureMessage(DocCollection coll, Slice slice, Replica replica) {
+    return String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s",
+        coll.getName(), slice.getName(), replica.getName());
+  }
+
+  /** Builds the message reported when the new replica could not be created on the target node. */
+  private static String createFailureMessage(DocCollection coll, Slice slice, String targetNode) {
+    return String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+        " on node=%s", coll.getName(), slice.getName(), targetNode);
+  }
+
+  /** Builds the message reported when the move finished without failures. */
+  private static String successMessage(Replica replica, String newCoreName, String targetNode) {
+    return String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
+        "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 00eb12d..a13323d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -207,6 +207,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
         .put(DELETESHARD, new DeleteShardCmd(this))
         .put(DELETEREPLICA, new DeleteReplicaCmd(this))
         .put(ADDREPLICA, new AddReplicaCmd(this))
+        .put(MOVEREPLICA, new MoveReplicaCmd(this))
         .build()
     ;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index d7759ca..c50e2b4 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -857,6 +857,16 @@ public class CollectionsHandler extends RequestHandlerBase implements
Permission
       return null;
     }),
     REPLACENODE_OP(REPLACENODE, (req, rsp, h) -> req.getParams().required().getAll(req.getParams().getAll(null,
"parallel"), "source", "target")),
+    MOVEREPLICA_OP(MOVEREPLICA, (req, rsp, h) -> {
+      Map<String, Object> map = req.getParams().required().getAll(null,
+          COLLECTION_PROP);
+
+      return req.getParams().getAll(map,
+          "fromNode",
+          "targetNode",
+          "replica",
+          "shard");
+    }),
     DELETENODE_OP(DELETENODE, (req, rsp, h) -> req.getParams().required().getAll(null,
"node"));
     public final CollectionOp fun;
     CollectionAction action;
@@ -879,7 +889,7 @@ public class CollectionsHandler extends RequestHandlerBase implements
Permission
       for (CollectionOperation op : values()) {
         if (op.action == action) return op;
       }
-      throw new SolrException(ErrorCode.SERVER_ERROR, "No such action" + action);
+      throw new SolrException(ErrorCode.SERVER_ERROR, "No such action " + action);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
index dcb115a..30c3c9e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
@@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -178,11 +179,22 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase
{
       //expected
     }
 
-    String replica = shard1.getReplicas().iterator().next().getName();
+    Replica replica = shard1.getReplicas().iterator().next();
+    for (String liveNode : client.getZkStateReader().getClusterState().getLiveNodes()) {
+      if (!replica.getNodeName().equals(liveNode)) {
+        state = new CollectionAdminRequest.MoveReplica(collection, replica.getName(), liveNode)
+            .processAndWait(client, MAX_TIMEOUT_SECONDS);
+        assertSame("MoveReplica did not complete", RequestStatusState.COMPLETED, state);
+        break;
+      }
+    }
+
+    shard1 = client.getZkStateReader().getClusterState().getSlice(collection, "shard1");
+    String replicaName = shard1.getReplicas().iterator().next().getName();
     state = new CollectionAdminRequest.DeleteReplica()
         .setCollectionName(collection)
         .setShardName("shard1")
-        .setReplica(replica)
+        .setReplica(replicaName)
         .processAndWait(client, MAX_TIMEOUT_SECONDS);
     assertSame("DeleteReplica did not complete", RequestStatusState.COMPLETED, state);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
new file mode 100644
index 0000000..4368fea
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises the MOVEREPLICA collection action on a 4-node cluster: a randomly chosen replica is
+ * moved to another node by replica name (asynchronously), then a replica of the same shard is
+ * moved back using the shard + fromNode form of the request.
+ */
+public class MoveReplicaTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(4)
+        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+        .configure();
+  }
+
+  protected String getSolrXml() {
+    return "solr.xml";
+  }
+
+  @Test
+  public void test() throws Exception {
+    cluster.waitForAllNodes(5000);
+    String coll = "movereplicatest_coll";
+    log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
+
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, 2);
+    create.setMaxShardsPerNode(2);
+    cloudClient.request(create);
+
+    Replica replica = getRandomReplica(coll, cloudClient);
+    Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
+    // Iterate the shuffled copy (not the live-node set itself) so the target node is random
+    List<String> shuffledNodes = new ArrayList<>(liveNodes);
+    Collections.shuffle(shuffledNodes, random());
+    String targetNode = null;
+    for (String node : shuffledNodes) {
+      if (!replica.getNodeName().equals(node)) {
+        targetNode = node;
+        break;
+      }
+    }
+    assertNotNull(targetNode);
+    String shardId = null;
+    for (Slice slice : cloudClient.getZkStateReader().getClusterState().getCollection(coll).getSlices()) {
+      if (slice.getReplicas().contains(replica)) {
+        shardId = slice.getName();
+        break;
+      }
+    }
+    assertNotNull("no shard found for replica " + replica.getName(), shardId);
+
+    // First move: by replica name, submitted asynchronously
+    CollectionAdminRequest.MoveReplica moveReplica = new CollectionAdminRequest.MoveReplica(coll, replica.getName(), targetNode);
+    moveReplica.processAsync("000", cloudClient);
+    CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
+    // wait for async request success
+    boolean success = false;
+    for (int i = 0; i < 200; i++) {
+      CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
+      if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
+        success = true;
+        break;
+      }
+      assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
+      Thread.sleep(50);
+    }
+    assertTrue(success);
+    checkNumOfCores(cloudClient, replica.getNodeName(), 0);
+    checkNumOfCores(cloudClient, targetNode, 2);
+
+    // Second move: by shard + fromNode, back from targetNode to the replica's original node
+    moveReplica = new CollectionAdminRequest.MoveReplica(coll, shardId, targetNode, replica.getNodeName());
+    moveReplica.process(cloudClient);
+    checkNumOfCores(cloudClient, replica.getNodeName(), 1);
+    checkNumOfCores(cloudClient, targetNode, 1);
+  }
+
+  /** Returns one replica of the collection, chosen using the test's random source. */
+  private Replica getRandomReplica(String coll, CloudSolrClient cloudClient) {
+    // Copy before shuffling so the list handed out by the cluster state is never reordered
+    List<Replica> replicas = new ArrayList<>(
+        cloudClient.getZkStateReader().getClusterState().getCollection(coll).getReplicas());
+    Collections.shuffle(replicas, random());
+    return replicas.get(0);
+  }
+
+  /** Asserts that the given node currently reports exactly {@code expectedCores} cores. */
+  private void checkNumOfCores(CloudSolrClient cloudClient, String nodeName, int expectedCores) throws IOException, SolrServerException {
+    assertEquals(nodeName + " does not have expected number of cores",expectedCores, getNumOfCores(cloudClient, nodeName));
+  }
+
+  /** Queries the core admin status of the given node and returns how many cores it lists. */
+  private int getNumOfCores(CloudSolrClient cloudClient, String nodeName) throws IOException, SolrServerException {
+    try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(nodeName))) {
+      CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
+      return status.getCoreStatus().size();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsCollectionsAPIDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsCollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsCollectionsAPIDistributedZkTest.java
index 1b830ad..58d499b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsCollectionsAPIDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsCollectionsAPIDistributedZkTest.java
@@ -16,15 +16,37 @@
  */
 package org.apache.solr.cloud.hdfs;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import com.carrotsearch.randomizedtesting.annotations.Nightly;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Metric;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.CoreStatus;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
 import org.apache.solr.cloud.CollectionsAPIDistributedZkTest;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.metrics.SolrMetricManager;
 import org.apache.solr.util.BadHdfsThreadsFilter;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 @Slow
 @Nightly
@@ -59,4 +81,96 @@ public class HdfsCollectionsAPIDistributedZkTest extends CollectionsAPIDistribut
     System.clearProperty("solr.hdfs.home");
   }
 
+  /**
+   * Moves a non-leader replica of an HDFS-backed collection to another node and verifies that
+   * the new replica reuses the old data dir and that no replication requests were counted.
+   */
+  @Test
+  public void moveReplicaTest() throws Exception {
+    cluster.waitForAllNodes(5000);
+    String coll = "movereplicatest_coll";
+
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf", 2, 2);
+    create.setMaxShardsPerNode(2);
+    cloudClient.request(create);
+
+    for (int i = 0; i < 10; i++) {
+      cloudClient.add(coll, sdoc("id",String.valueOf(i)));
+      cloudClient.commit(coll);
+    }
+
+    List<Slice> slices = new ArrayList<>(cloudClient.getZkStateReader().getClusterState().getCollection(coll).getSlices());
+    Collections.shuffle(slices, random());
+    // Pick the first non-leader replica in shuffled slice order and remember the slice that
+    // owns it, so that slice and replica always belong together.
+    Slice slice = null;
+    Replica replica = null;
+    for (Slice s : slices) {
+      for (Replica r : s.getReplicas()) {
+        if (s.getLeader() != r) {
+          slice = s;
+          replica = r;
+          break;
+        }
+      }
+      if (replica != null) {
+        break;
+      }
+    }
+    assertNotNull("no non-leader replica found in collection " + coll, replica);
+    String dataDir = getDataDir(replica);
+
+    Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
+    // Iterate the shuffled copy (not the live-node set itself) so the target node is random
+    List<String> shuffledNodes = new ArrayList<>(liveNodes);
+    Collections.shuffle(shuffledNodes, random());
+    String targetNode = null;
+    for (String node : shuffledNodes) {
+      if (!replica.getNodeName().equals(node)) {
+        targetNode = node;
+        break;
+      }
+    }
+    assertNotNull(targetNode);
+
+    CollectionAdminRequest.MoveReplica moveReplica = new CollectionAdminRequest.MoveReplica(coll, replica.getName(), targetNode);
+    moveReplica.process(cloudClient);
+
+    checkNumOfCores(cloudClient, replica.getNodeName(), 0);
+    checkNumOfCores(cloudClient, targetNode, 2);
+
+    waitForState("Wait for recovery finish failed",coll, clusterShape(2,2));
+    slice = cloudClient.getZkStateReader().getClusterState().getCollection(coll).getSlice(slice.getName());
+    boolean found = false;
+    for (Replica newReplica : slice.getReplicas()) {
+      if (getDataDir(newReplica).equals(dataDir)) {
+        found = true;
+        break;
+      }
+    }
+    assertTrue(found);
+
+
+    // data dir is reused so replication will be skipped
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      SolrMetricManager manager = jetty.getCoreContainer().getMetricManager();
+      List<String> registryNames = manager.registryNames().stream()
+          .filter(s -> s.startsWith("solr.core.")).collect(Collectors.toList());
+      for (String registry : registryNames) {
+        Map<String, Metric> metrics = manager.registry(registry).getMetrics();
+        Counter counter = (Counter) metrics.get("REPLICATION./replication.requests");
+        if (counter != null) {
+          assertEquals(0, counter.getCount());
+        }
+      }
+    }
+  }
+
+
+  /** Fails the test unless the given node currently hosts exactly {@code expectedCores} cores. */
+  private void checkNumOfCores(CloudSolrClient cloudClient, String nodeName, int expectedCores) throws IOException, SolrServerException {
+    String failureMessage = nodeName + " does not have expected number of cores";
+    int actualCores = getNumOfCores(cloudClient, nodeName);
+    assertEquals(failureMessage, expectedCores, actualCores);
+  }
+
+  /** Asks the node's core admin handler for its status and returns the number of cores listed. */
+  private int getNumOfCores(CloudSolrClient cloudClient, String nodeName) throws IOException, SolrServerException {
+    String nodeBaseUrl = cloudClient.getZkStateReader().getBaseUrlForNodeName(nodeName);
+    try (HttpSolrClient nodeClient = getHttpSolrClient(nodeBaseUrl)) {
+      return CoreAdminRequest.getStatus(null, nodeClient).getCoreStatus().size();
+    }
+  }
+
+  /** Returns the data directory reported by the core status of the given replica's core. */
+  private String getDataDir(Replica replica) throws IOException, SolrServerException {
+    try (HttpSolrClient replicaClient = getHttpSolrClient(replica.getBaseUrl())) {
+      return CoreAdminRequest.getCoreStatus(replica.getCoreName(), replicaClient).getDataDirectory();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 94750c0a..0b1609d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -607,6 +607,44 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
 
   }
 
+  /**
+   * Request for the MOVEREPLICA collection action. The replica to move is given either by its
+   * name, or by a shard plus the node it should be taken from; in both forms the destination is
+   * {@code targetNode}.
+   */
+  public static class MoveReplica extends AsyncCollectionAdminRequest {
+    String collection, replica, targetNode;
+    String shard, fromNode;
+    boolean randomlyMoveReplica;
+
+    /** Moves the replica named {@code replica} of {@code collection} to {@code targetNode}. */
+    public MoveReplica(String collection, String replica, String targetNode) {
+      super(CollectionAction.MOVEREPLICA);
+      this.randomlyMoveReplica = false;
+      this.collection = collection;
+      this.targetNode = targetNode;
+      this.replica = replica;
+    }
+
+    /** Moves a replica of {@code shard} hosted on {@code fromNode} to {@code targetNode}. */
+    public MoveReplica(String collection, String shard, String fromNode, String targetNode) {
+      super(CollectionAction.MOVEREPLICA);
+      this.randomlyMoveReplica = true;
+      this.collection = collection;
+      this.targetNode = targetNode;
+      this.shard = shard;
+      this.fromNode = fromNode;
+    }
+
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams requestParams = (ModifiableSolrParams) super.getParams();
+      requestParams.set("collection", collection);
+      requestParams.set("targetNode", targetNode);
+      if (!randomlyMoveReplica) {
+        // replica-name form: only the replica parameter is sent
+        requestParams.set("replica", replica);
+        return requestParams;
+      }
+      // shard + fromNode form
+      requestParams.set("shard", shard);
+      requestParams.set("fromNode", fromNode);
+      return requestParams;
+    }
+  }
+
+
   /*
    * Returns a RebalanceLeaders object to rebalance leaders for a collection
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index f1e5a52..51db039 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -80,6 +80,7 @@ public interface CollectionParams {
     REQUESTSTATUS(false, LockLevel.NONE),
     DELETESTATUS(false, LockLevel.NONE),
     ADDREPLICA(true, LockLevel.SHARD),
+    MOVEREPLICA(true, LockLevel.SHARD),
     OVERSEERSTATUS(false, LockLevel.NONE),
     LIST(false, LockLevel.NONE),
     CLUSTERSTATUS(false, LockLevel.NONE),


Mime
View raw message