lucene-commits mailing list archives

From da...@apache.org
Subject lucene-solr:jira/solr-11702: SOLR-11702: Recover testReplicasInLIRNoLeader
Date Wed, 10 Jan 2018 07:42:56 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-11702 2e26d30f9 -> 583a4a549


SOLR-11702: Recover testReplicasInLIRNoLeader


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/583a4a54
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/583a4a54
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/583a4a54

Branch: refs/heads/jira/solr-11702
Commit: 583a4a54960e19fbd12df1f08f4f18b6662ceaad
Parents: 2e26d30
Author: Cao Manh Dat <datcm@apache.org>
Authored: Wed Jan 10 14:42:43 2018 +0700
Committer: Cao Manh Dat <datcm@apache.org>
Committed: Wed Jan 10 14:42:43 2018 +0700

----------------------------------------------------------------------
 .../org/apache/solr/cloud/ForceLeaderTest.java  | 229 ++++++++++++++++++-
 1 file changed, 226 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/583a4a54/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index d31e822..9ce3036 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -18,8 +18,10 @@ package org.apache.solr.cloud;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
@@ -45,6 +47,9 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.junit.Ignore;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,9 +59,121 @@ public class ForceLeaderTest extends HttpPartitionTest {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final boolean onlyLeaderIndexes = random().nextBoolean();
 
+  @Test
   @Override
-  protected boolean useTlogReplicas() {
-    return onlyLeaderIndexes;
+  @Ignore
+  public void test() throws Exception {
+
+  }
+
+  /***
+   * Tests that FORCELEADER can get an active leader after the leader puts all replicas in LIR and itself goes down,
+   * hence resulting in a leaderless shard.
+   */
+  @Test
+  @Slow
+  //TODO remove in SOLR-11812
+  public void testReplicasInLIRNoLeader() throws Exception {
+    handle.put("maxScore", SKIPVAL);
+    handle.put("timestamp", SKIPVAL);
+
+    String testCollectionName = "forceleader_test_collection";
+    createOldLirCollection(testCollectionName, 3);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    try {
+      List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
+      assertEquals("Expected 2 replicas for collection " + testCollectionName
+          + " but found " + notLeaders.size() + "; clusterState: "
+          + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
+
+      Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
+      JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+      ZkController zkController = notLeader0.getCoreContainer().getZkController();
+
+      putNonLeadersIntoLIR(testCollectionName, SHARD1, zkController, leader, notLeaders);
+
+      cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
+      ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+      int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+      assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
+          "; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
+
+      int numReplicasOnLiveNodes = 0;
+      for (Replica rep : clusterState.getCollection(testCollectionName).getSlice(SHARD1).getReplicas()) {
+        if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
+          numReplicasOnLiveNodes++;
+        }
+      }
+      assertEquals(2, numReplicasOnLiveNodes);
+      log.info("Before forcing leader: " + printClusterStateInfo());
+      // Assert there is no leader yet
+      assertNull("Expected no leader right now. State: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1),
+          clusterState.getCollection(testCollectionName).getSlice(SHARD1).getLeader());
+
+      assertSendDocFails(3);
+
+      doForceLeader(cloudClient, testCollectionName, SHARD1);
+
+      // By now we have an active leader. Wait for recoveries to begin
+      waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
+
+      cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
+      clusterState = cloudClient.getZkStateReader().getClusterState();
+      log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
+      // we have a leader
+      Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
+      assertNotNull(newLeader);
+      // leader is active
+      assertEquals(State.ACTIVE, newLeader.getState());
+
+      numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+      assertEquals(2, numActiveReplicas);
+
+      // Assert that indexing works again
+      log.info("Sending doc 4...");
+      sendDoc(4);
+      log.info("Committing...");
+      cloudClient.commit();
+      log.info("Doc 4 sent and commit issued");
+
+      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
+      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 4, 4);
+
+      // Docs 1 and 4 should be here. 2 was lost during the partition, 3 had failed to be indexed.
+      log.info("Checking doc counts...");
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.add("q", "*:*");
+      assertEquals("Expected only 2 documents in the index", 2, cloudClient.query(params).getResults().getNumFound());
+
+      bringBackOldLeaderAndSendDoc(testCollectionName, leader, notLeaders, 5);
+    } finally {
+      log.info("Cleaning up after the test.");
+      // try to clean up
+      attemptCollectionDelete(cloudClient, testCollectionName);
+    }
+  }
+
+  private void createOldLirCollection(String collection, int numReplicas) throws IOException, SolrServerException {
+    if (onlyLeaderIndexes) {
+      CollectionAdminRequest
+          .createCollection(collection, "conf1", 1, 0, numReplicas, 0)
+          .setCreateNodeSet("")
+          .process(cloudClient);
+    } else {
+      CollectionAdminRequest.createCollection(collection, "conf1", 1, numReplicas)
+          .setCreateNodeSet("")
+          .process(cloudClient);
+    }
+    Properties oldLir = new Properties();
+    oldLir.setProperty("lirVersion", "old");
+    for (int i = 0; i < numReplicas; i++) {
+      // this is the only way to create replicas that run the old LIR implementation
+      CollectionAdminRequest
+          .addReplicaToShard(collection, "shard1", onlyLeaderIndexes ? Replica.Type.TLOG : Replica.Type.NRT)
+          .setProperties(oldLir)
+          .process(cloudClient);
+    }
   }
 
   /**
@@ -69,7 +186,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
     handle.put("timestamp", SKIPVAL);
 
     String testCollectionName = "forceleader_last_published";
-    createCollection(testCollectionName, "conf1", 1, 3, 1);
+    createOldLirCollection(testCollectionName, 3);
     cloudClient.setDefaultCollection(testCollectionName);
     log.info("Collection created: " + testCollectionName);
 
@@ -169,6 +286,102 @@ public class ForceLeaderTest extends HttpPartitionTest {
     }
   }
 
+  void putNonLeadersIntoLIR(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
+    SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
+    for (int i = 0; i < notLeaders.size(); i++)
+      nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
+
+    sendDoc(1);
+
+    // ok, now introduce a network partition between the leader and both replicas
+    log.info("Closing proxies for the non-leader replicas...");
+    for (SocketProxy proxy : nonLeaderProxies)
+      proxy.close();
+
+    // indexing during a partition
+    log.info("Sending a doc during the network partition...");
+    sendDoc(2);
+
+    // Wait a little
+    Thread.sleep(2000);
+
+    // Kill the leader
+    log.info("Killing leader for shard1 of " + collectionName + " on node " + leader.getNodeName()
+ "");
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+    getProxyForReplica(leader).close();
+    leaderJetty.stop();
+
+    // Wait for a steady state, till LIR flags have been set and the shard is leaderless
+    log.info("Sleep and periodically wake up to check for state...");
+    for (int i = 0; i < 20; i++) {
+      Thread.sleep(1000);
+      State lirStates[] = new State[notLeaders.size()];
+      for (int j = 0; j < notLeaders.size(); j++)
+        lirStates[j] = zkController.getLeaderInitiatedRecoveryState(collectionName, shard, notLeaders.get(j).getName());
+
+      ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+      boolean allDown = true;
+      for (State lirState : lirStates)
+        if (Replica.State.DOWN.equals(lirState) == false)
+          allDown = false;
+      if (allDown && clusterState.getCollection(collectionName).getSlice(shard).getLeader() == null) {
+        break;
+      }
+      log.warn("Attempt " + i + ", waiting on for 1 sec to settle down in the steady state.
State: " +
+          printClusterStateInfo(collectionName));
+      log.warn("LIR state: " + getLIRState(zkController, collectionName, shard));
+    }
+    log.info("Waking up...");
+
+    // remove the network partition
+    log.info("Reopening the proxies for the non-leader replicas...");
+    for (SocketProxy proxy : nonLeaderProxies)
+      proxy.reopen();
+
+    log.info("LIR state: " + getLIRState(zkController, collectionName, shard));
+
+    State lirStates[] = new State[notLeaders.size()];
+    for (int j = 0; j < notLeaders.size(); j++)
+      lirStates[j] = zkController.getLeaderInitiatedRecoveryState(collectionName, shard, notLeaders.get(j).getName());
+    for (State lirState : lirStates)
+      assertTrue("Expected that the replicas would be in LIR state by now. LIR states: "+Arrays.toString(lirStates),
+          Replica.State.DOWN == lirState || Replica.State.RECOVERING == lirState);
+  }
+
+  protected void bringBackOldLeaderAndSendDoc(String collection, Replica leader, List<Replica> notLeaders, int docid) throws Exception {
+    // Bring back the leader which was stopped
+    log.info("Bringing back originally killed leader...");
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+    getProxyForReplica(leader).reopen();
+    leaderJetty.start();
+    waitForRecoveriesToFinish(collection, cloudClient.getZkStateReader(), true);
+    cloudClient.getZkStateReader().forceUpdateCollection(collection);
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    log.info("After bringing back leader: " + clusterState.getCollection(collection).getSlice(SHARD1));
+    int numActiveReplicas = getNumberOfActiveReplicas(clusterState, collection, SHARD1);
+    assertEquals(1+notLeaders.size(), numActiveReplicas);
+    log.info("Sending doc "+docid+"...");
+    sendDoc(docid);
+    log.info("Committing...");
+    cloudClient.commit();
+    log.info("Doc "+docid+" sent and commit issued");
+    assertDocsExistInAllReplicas(notLeaders, collection, docid, docid);
+    assertDocsExistInAllReplicas(Collections.singletonList(leader), collection, docid, docid);
+  }
+
+  protected String getLIRState(ZkController zkController, String collection, String shard) throws KeeperException, InterruptedException {
+    StringBuilder sb = new StringBuilder();
+    String path = zkController.getLeaderInitiatedRecoveryZnodePath(collection, shard);
+    if (path == null)
+      return null;
+    try {
+      zkController.getZkClient().printLayout(path, 4, sb);
+    } catch (NoNodeException ex) {
+      return null;
+    }
+    return sb.toString();
+  }
+
   @Override
   protected int sendDoc(int docId) throws Exception {
     SolrInputDocument doc = new SolrInputDocument();
@@ -183,5 +396,15 @@ public class ForceLeaderTest extends HttpPartitionTest {
     client.request(forceLeader);
   }
 
+  protected int getNumberOfActiveReplicas(ClusterState clusterState, String collection, String sliceId) {
+    int numActiveReplicas = 0;
+    // count replicas that are currently in the ACTIVE state
+    for (Replica rep : clusterState.getCollection(collection).getSlice(sliceId).getReplicas()) {
+      if (rep.getState().equals(State.ACTIVE)) {
+        numActiveReplicas++;
+      }
+    }
+    return numActiveReplicas;
+  }
 }
 

