lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject lucene-solr:branch_7x: SOLR-12130: CdcrReplicationDistributedZkTest is broken into two test classes, CdcrOpsAndBoundariesTest which does not require node restarts and CdcrWithNodesRestartsTest which does. The tests themselves are made faster and more resilient to spurious failures
Date Mon, 20 Aug 2018 09:19:58 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 83d95f759 -> 569d77103


SOLR-12130: CdcrReplicationDistributedZkTest is broken into two test classes, CdcrOpsAndBoundariesTest which does not require node restarts and CdcrWithNodesRestartsTest which does. The tests themselves are made faster and more resilient to spurious failures

(cherry picked from commit 1c0f95ee044af5e0c072fabb3ad423c61f0e033b)


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/569d7710
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/569d7710
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/569d7710

Branch: refs/heads/branch_7x
Commit: 569d77103e8c38dc8f5b78b3ac88a7dde4da5f86
Parents: 83d95f7
Author: Shalin Shekhar Mangar <shalin@apache.org>
Authored: Mon Aug 20 14:49:13 2018 +0530
Committer: Shalin Shekhar Mangar <shalin@apache.org>
Committed: Mon Aug 20 14:49:51 2018 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   4 +
 .../cloud/cdcr/CdcrOpsAndBoundariesTest.java    | 321 +++++++++
 .../cdcr/CdcrReplicationDistributedZkTest.java  | 681 -------------------
 .../apache/solr/cloud/cdcr/CdcrTestsUtil.java   | 132 +++-
 .../cloud/cdcr/CdcrWithNodesRestartsTest.java   | 343 ++++++++++
 5 files changed, 796 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/569d7710/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index be17206..5471825 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -278,6 +278,10 @@ Other Changes
 
 * SOLR-12675: Make LeaderVoteWaitTimeoutTest more resilient against side effects of test methods. (shalin)
 
+* SOLR-12130: CdcrReplicationDistributedZkTest is broken into two test classes, CdcrOpsAndBoundariesTest which does
+  not require node restarts and CdcrWithNodesRestartsTest which does. The tests themselves are made faster and more
+  resilient to spurious failures. (Varun Thacker, Amrit Sarkar via shalin)
+
 ==================  7.4.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/569d7710/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrOpsAndBoundariesTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrOpsAndBoundariesTest.java b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrOpsAndBoundariesTest.java
new file mode 100644
index 0000000..53942bb
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrOpsAndBoundariesTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.cdcr;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.CdcrParams;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CdcrOpsAndBoundariesTest extends SolrTestCaseJ4 {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  MiniSolrCloudCluster target, source;
+  CloudSolrClient sourceSolrClient, targetSolrClient;
+  private static String SOURCE_COLLECTION = "cdcr-source";
+  private static String TARGET_COLLECTION = "cdcr-target";
+  private static String ALL_Q = "*:*";
+
+  @Before
+  public void before() throws Exception {
+    target = new MiniSolrCloudCluster(1, createTempDir(TARGET_COLLECTION), buildJettyConfig("/solr"));
+    target.waitForAllNodes(30);
+    System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
+    source = new MiniSolrCloudCluster(1, createTempDir(SOURCE_COLLECTION), buildJettyConfig("/solr"));
+    source.waitForAllNodes(30);
+  }
+
+  @After
+  public void after() throws Exception {
+    target.shutdown();
+    source.shutdown();
+  }
+
+  /**
+   * Check the ops statistics.
+   */
+  @Test
+  public void testOps() throws Exception {
+    createCollections();
+
+    // Start CDCR
+    CdcrTestsUtil.cdcrRestart(sourceSolrClient);
+
+    // Index documents
+    CdcrTestsUtil.indexRandomDocs(100, sourceSolrClient);
+    double opsAll = 0.0;
+    NamedList ops = null;
+
+    // calculate ops
+    int itr = 10;
+    while (itr-- > 0 && opsAll == 0.0) {
+      NamedList rsp = CdcrTestsUtil.invokeCdcrAction(sourceSolrClient, CdcrParams.CdcrAction.OPS).getResponse();
+      NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.OPERATIONS_PER_SECOND)).getVal(0);
+      ops = (NamedList) collections.get(TARGET_COLLECTION);
+      opsAll = (Double) ops.get(CdcrParams.COUNTER_ALL);
+      Thread.sleep(250); // wait for cdcr to complete and check
+    }
+    // asserts ops values
+    double opsAdds = (Double) ops.get(CdcrParams.COUNTER_ADDS);
+    assertTrue(opsAll > 0);
+    assertTrue(opsAdds > 0);
+    double opsDeletes = (Double) ops.get(CdcrParams.COUNTER_DELETES);
+    assertEquals(0, opsDeletes, 0);
+
+    // Delete 10 documents: 10-19
+    List<String> ids;
+    for (int id = 0; id < 50; id++) {
+      ids = new ArrayList<>();
+      ids.add(Integer.toString(id));
+      sourceSolrClient.deleteById(ids, 1);
+      int dbq_id = 50 + id;
+      sourceSolrClient.deleteByQuery("id:" + dbq_id, 1);
+    }
+
+    itr = 10;
+    while (itr-- > 0) {
+      NamedList rsp = CdcrTestsUtil.invokeCdcrAction(sourceSolrClient, CdcrParams.CdcrAction.OPS).getResponse();
+      NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.OPERATIONS_PER_SECOND)).getVal(0);
+      ops = (NamedList) collections.get(TARGET_COLLECTION);
+      opsAll = (Double) ops.get(CdcrParams.COUNTER_ALL);
+      Thread.sleep(250); // wait for cdcr to complete and check
+    }
+    // asserts ops values
+    opsAdds = (Double) ops.get(CdcrParams.COUNTER_ADDS);
+    opsDeletes = (Double) ops.get(CdcrParams.COUNTER_DELETES);
+    assertTrue(opsAll > 0);
+    assertTrue(opsAdds > 0);
+    assertTrue(opsDeletes > 0);
+
+    deleteCollections();
+  }
+
+  @Test
+  public void testTargetCollectionNotAvailable() throws Exception {
+    createCollections();
+
+    // send start action to first shard
+    CdcrTestsUtil.cdcrStart(sourceSolrClient);
+
+    assertNotSame(null, CdcrTestsUtil.waitForClusterToSync
+        (sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound(), targetSolrClient));
+
+    // sleep for a bit to ensure that replicator threads are started
+    Thread.sleep(3000);
+
+    target.deleteAllCollections();
+
+    CdcrTestsUtil.indexRandomDocs(6, sourceSolrClient);
+    assertEquals(6L, sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound());
+
+    // we need to wait until the replicator thread is triggered
+    int cnt = 15; // timeout after 15 seconds
+    AssertionError lastAssertionError = null;
+    while (cnt > 0) {
+      try {
+        QueryResponse rsp = CdcrTestsUtil.invokeCdcrAction(sourceSolrClient, CdcrParams.CdcrAction.ERRORS);
+        NamedList collections = (NamedList) ((NamedList) rsp.getResponse().get(CdcrParams.ERRORS)).getVal(0);
+        NamedList errors = (NamedList) collections.get(TARGET_COLLECTION);
+        assertTrue(0 < (Long) errors.get(CdcrParams.CONSECUTIVE_ERRORS));
+        NamedList lastErrors = (NamedList) errors.get(CdcrParams.LAST);
+        assertNotNull(lastErrors);
+        assertTrue(0 < lastErrors.size());
+        deleteCollections();
+        return;
+      } catch (AssertionError e) {
+        lastAssertionError = e;
+        cnt--;
+        Thread.sleep(1000);
+      }
+    }
+
+    deleteCollections();
+    throw new AssertionError("Timeout while trying to assert replication errors", lastAssertionError);
+  }
+
+  @Test
+  public void testReplicationStartStop() throws Exception {
+    createCollections();
+
+    CdcrTestsUtil.indexRandomDocs(10, sourceSolrClient);
+    CdcrTestsUtil.cdcrStart(sourceSolrClient);
+
+    assertEquals(10, CdcrTestsUtil.waitForClusterToSync
+        (sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound(), targetSolrClient));
+
+    CdcrTestsUtil.cdcrStop(sourceSolrClient);
+
+    CdcrTestsUtil.indexRandomDocs(110, sourceSolrClient);
+
+    // Start again CDCR, the source cluster should reinitialise its log readers
+    // with the latest checkpoints
+
+    CdcrTestsUtil.cdcrRestart(sourceSolrClient);
+
+    assertEquals(110, CdcrTestsUtil.waitForClusterToSync
+        (sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound(), targetSolrClient));
+
+    deleteCollections();
+  }
+
+  /**
+   * Check that batch updates with deletes
+   */
+  @Test
+  public void testBatchAddsWithDelete() throws Exception {
+    createCollections();
+
+    // Start CDCR
+    CdcrTestsUtil.cdcrRestart(sourceSolrClient);
+    // Index 50 documents
+    CdcrTestsUtil.indexRandomDocs(50, sourceSolrClient);
+
+    // Delete 10 documents: 10-19
+    List<String> ids = new ArrayList<>();
+    for (int id = 10; id < 20; id++) {
+      ids.add(Integer.toString(id));
+    }
+    sourceSolrClient.deleteById(ids, 10);
+
+    CdcrTestsUtil.indexRandomDocs(50, 60, sourceSolrClient);
+
+    // Delete 1 document: 50
+    ids = new ArrayList<>();
+    ids.add(Integer.toString(50));
+    sourceSolrClient.deleteById(ids, 10);
+
+    CdcrTestsUtil.indexRandomDocs(60, 70, sourceSolrClient);
+
+    assertEquals(59, CdcrTestsUtil.waitForClusterToSync(59, sourceSolrClient));
+    assertEquals(59, CdcrTestsUtil.waitForClusterToSync
+        (sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound(), targetSolrClient));
+
+    deleteCollections();
+  }
+
+  /**
+   * Checks that batches are correctly constructed when batch boundaries are reached.
+   */
+  @Test
+  public void testBatchBoundaries() throws Exception {
+    createCollections();
+
+    // Start CDCR
+    CdcrTestsUtil.cdcrRestart(sourceSolrClient);
+
+    log.info("Indexing documents");
+
+    CdcrTestsUtil.indexRandomDocs(1000, sourceSolrClient);
+
+    assertEquals(1000, CdcrTestsUtil.waitForClusterToSync
+        (sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound(), targetSolrClient));
+
+    deleteCollections();
+  }
+
+  /**
+   * Check resilience of replication with delete by query executed on targets
+   */
+  @Test
+  public void testResilienceWithDeleteByQueryOnTarget() throws Exception {
+    createCollections();
+
+    // Start CDCR
+    CdcrTestsUtil.cdcrRestart(sourceSolrClient);
+
+    CdcrTestsUtil.indexRandomDocs(50, sourceSolrClient);
+
+    assertEquals(50, CdcrTestsUtil.waitForClusterToSync
+        (sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound(), targetSolrClient));
+
+    sourceSolrClient.deleteByQuery(ALL_Q, 1);
+
+    assertEquals(0, CdcrTestsUtil.waitForClusterToSync(0, sourceSolrClient));
+    assertEquals(0, CdcrTestsUtil.waitForClusterToSync
+        (sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound(), targetSolrClient));
+
+    CdcrTestsUtil.indexRandomDocs(51, 101, sourceSolrClient);
+
+    assertEquals(50, CdcrTestsUtil.waitForClusterToSync
+        (sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound(), targetSolrClient));
+
+    targetSolrClient.deleteByQuery(ALL_Q, 1);
+
+    assertEquals(50, sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound());
+    assertEquals(0, CdcrTestsUtil.waitForClusterToSync(0, targetSolrClient));
+
+    CdcrTestsUtil.indexRandomDocs(102, 152, sourceSolrClient);
+
+    assertEquals(100, sourceSolrClient.query(new SolrQuery(ALL_Q)).getResults().getNumFound());
+    assertEquals(50, CdcrTestsUtil.waitForClusterToSync(50, targetSolrClient));
+
+    deleteCollections();
+  }
+
+  private void createSourceCollection() throws Exception {
+    source.uploadConfigSet(configset(SOURCE_COLLECTION), SOURCE_COLLECTION);
+    CollectionAdminRequest.createCollection(SOURCE_COLLECTION, SOURCE_COLLECTION, 1, 1)
+        .withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
+        .process(source.getSolrClient());
+    Thread.sleep(1000);
+    sourceSolrClient = source.getSolrClient();
+    sourceSolrClient.setDefaultCollection(SOURCE_COLLECTION);
+  }
+
+  private void createTargetCollection() throws Exception {
+    target.uploadConfigSet(configset(TARGET_COLLECTION), TARGET_COLLECTION);
+    CollectionAdminRequest.createCollection(TARGET_COLLECTION, TARGET_COLLECTION, 1, 1)
+        .withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
+        .process(target.getSolrClient());
+    Thread.sleep(1000);
+    targetSolrClient = target.getSolrClient();
+    targetSolrClient.setDefaultCollection(TARGET_COLLECTION);
+  }
+
+  private void deleteSourceCollection() throws Exception {
+    source.deleteAllCollections();
+  }
+
+  private void deleteTargetcollection() throws Exception {
+    target.deleteAllCollections();
+  }
+
+  private void createCollections() throws Exception {
+    createTargetCollection();
+    createSourceCollection();
+  }
+
+  private void deleteCollections() throws Exception {
+    deleteSourceCollection();
+    deleteTargetcollection();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/569d7710/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrReplicationDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrReplicationDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrReplicationDistributedZkTest.java
deleted file mode 100644
index 82da287..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrReplicationDistributedZkTest.java
+++ /dev/null
@@ -1,681 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.cdcr;
-
-import java.io.File;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.lucene.util.LuceneTestCase.BadApple;
-import org.apache.lucene.util.LuceneTestCase.Nightly;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.handler.CdcrParams;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Nightly
-@BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028, https://issues.apache.org/jira/browse/SOLR-10107")
-public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  @Override
-  public void distribSetUp() throws Exception {
-    schemaString = "schema15.xml";      // we need a string id
-    super.distribSetUp();
-  }
-
-  /**
-   * Checks that the test framework handles properly the creation and deletion of collections and the
-   * restart of servers.
-   */
-  @Test
-  @ShardsFixed(num = 4)
-  public void testDeleteCreateSourceCollection() throws Exception {
-    log.info("Indexing documents");
-
-    List<SolrInputDocument> docs = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      docs.add(getDoc(id, Integer.toString(i)));
-    }
-    index(SOURCE_COLLECTION, docs);
-    index(TARGET_COLLECTION, docs);
-
-    assertNumDocs(10, SOURCE_COLLECTION);
-    assertNumDocs(10, TARGET_COLLECTION);
-
-    log.info("Restarting leader @ source_collection:shard1");
-
-    this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
-
-    assertNumDocs(10, SOURCE_COLLECTION);
-    assertNumDocs(10, TARGET_COLLECTION);
-
-    log.info("Clearing source_collection");
-
-    this.clearSourceCollection();
-
-    assertNumDocs(0, SOURCE_COLLECTION);
-    assertNumDocs(10, TARGET_COLLECTION);
-
-    log.info("Restarting leader @ target_collection:shard1");
-
-    this.restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD1));
-
-    assertNumDocs(0, SOURCE_COLLECTION);
-    assertNumDocs(10, TARGET_COLLECTION);
-
-    log.info("Clearing target_collection");
-
-    this.clearTargetCollection();
-
-    assertNumDocs(0, SOURCE_COLLECTION);
-    assertNumDocs(0, TARGET_COLLECTION);
-
-    assertCollectionExpectations(SOURCE_COLLECTION);
-    assertCollectionExpectations(TARGET_COLLECTION);
-  }
-
-  @Test
-  @ShardsFixed(num = 4)
-  public void testTargetCollectionNotAvailable() throws Exception {
-    // send start action to first shard
-    NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
-    assertEquals(CdcrParams.ProcessState.STARTED.toLower(), status.get(CdcrParams.ProcessState.getParam()));
-
-    // check status
-    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
-
-    this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD2);
-
-    // sleep for a bit to ensure that replicator threads are started
-    Thread.sleep(3000);
-
-    // Kill all the servers of the target
-    this.deleteCollection(TARGET_COLLECTION);
-
-    // Index a few documents to trigger the replication
-    index(SOURCE_COLLECTION, getDoc(id, "a"));
-    index(SOURCE_COLLECTION, getDoc(id, "b"));
-    index(SOURCE_COLLECTION, getDoc(id, "c"));
-    index(SOURCE_COLLECTION, getDoc(id, "d"));
-    index(SOURCE_COLLECTION, getDoc(id, "e"));
-    index(SOURCE_COLLECTION, getDoc(id, "f"));
-
-    assertNumDocs(6, SOURCE_COLLECTION);
-
-    // we need to wait until the replicator thread is triggered
-    int cnt = 15; // timeout after 15 seconds
-    AssertionError lastAssertionError = null;
-    while (cnt > 0) {
-      try {
-        rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.ERRORS);
-        NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.ERRORS)).getVal(0);
-        NamedList errors = (NamedList) collections.get(TARGET_COLLECTION);
-        assertTrue(0 < (Long) errors.get(CdcrParams.CONSECUTIVE_ERRORS));
-        NamedList lastErrors = (NamedList) errors.get(CdcrParams.LAST);
-        assertNotNull(lastErrors);
-        assertTrue(0 < lastErrors.size());
-        return;
-      }
-      catch (AssertionError e) {
-        lastAssertionError = e;
-        cnt--;
-        Thread.sleep(1000);
-      }
-    }
-
-    throw new AssertionError("Timeout while trying to assert replication errors", lastAssertionError);
-  }
-
-  @Test
-  @ShardsFixed(num = 4)
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 05-Jul-2018
-  public void testReplicationStartStop() throws Exception {
-    int start = 0;
-    List<SolrInputDocument> docs = new ArrayList<>();
-    for (; start < 10; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    assertNumDocs(10, SOURCE_COLLECTION);
-    assertNumDocs(0, TARGET_COLLECTION);
-
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
-
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    commit(TARGET_COLLECTION);
-
-    assertNumDocs(10, SOURCE_COLLECTION);
-    assertNumDocs(10, TARGET_COLLECTION);
-
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    docs.clear();
-    for (; start < 110; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    assertNumDocs(110, SOURCE_COLLECTION);
-    assertNumDocs(10, TARGET_COLLECTION);
-
-    // Start again CDCR, the source cluster should reinitialise its log readers
-    // with the latest checkpoints
-
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
-
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    commit(TARGET_COLLECTION);
-
-    assertNumDocs(110, SOURCE_COLLECTION);
-    assertNumDocs(110, TARGET_COLLECTION);
-  }
-
-  /**
-   * Check that the replication manager is properly restarted after a node failure.
-   */
-  @Test
-  @ShardsFixed(num = 4)
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 05-Jul-2018
-  public void testReplicationAfterRestart() throws Exception {
-    log.info("Starting CDCR");
-
-    // send start action to first shard
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
-
-    log.info("Indexing 10 documents");
-
-    int start = 0;
-    List<SolrInputDocument> docs = new ArrayList<>();
-    for (; start < 10; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    log.info("Querying source collection");
-
-    assertNumDocs(10, SOURCE_COLLECTION);
-
-    log.info("Waiting for replication");
-
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    log.info("Querying target collection");
-
-    commit(TARGET_COLLECTION);
-    assertNumDocs(10, TARGET_COLLECTION);
-
-    log.info("Restarting shard1");
-
-    this.restartServers(shardToJetty.get(SOURCE_COLLECTION).get(SHARD1));
-
-    log.info("Indexing 100 documents");
-
-    docs.clear();
-    for (; start < 110; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    log.info("Querying source collection");
-
-    assertNumDocs(110, SOURCE_COLLECTION);
-
-    log.info("Waiting for replication");
-
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
-
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    log.info("Querying target collection");
-
-    commit(TARGET_COLLECTION);
-    assertNumDocs(110, TARGET_COLLECTION);
-  }
-
-  /**
-   * Check that the replication manager is properly started after a change of leader.
-   * This test also checks that the log readers on the new leaders are initialised with
-   * the target's checkpoint.
-   */
-  @Test
-  @ShardsFixed(num = 4)
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 05-Jul-2018
-  public void testReplicationAfterLeaderChange() throws Exception {
-    log.info("Starting CDCR");
-
-    // send start action to first shard
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
-
-    log.info("Indexing 10 documents");
-
-    int start = 0;
-    List<SolrInputDocument> docs = new ArrayList<>();
-    for (; start < 10; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    log.info("Querying source collection");
-
-    assertNumDocs(10, SOURCE_COLLECTION);
-
-    log.info("Waiting for replication");
-
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    log.info("Querying target collection");
-
-    commit(TARGET_COLLECTION);
-    assertNumDocs(10, TARGET_COLLECTION);
-
-    log.info("Restarting target leaders");
-
-    // Close all the leaders, then restart them
-    this.restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD1));
-    this.restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD2));
-
-    log.info("Restarting source leaders");
-
-    // Close all the leaders, then restart them
-    this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
-    this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2));
-
-    log.info("Checking queue size of new source leaders");
-
-    // If the log readers of the new leaders are initialised with the target's checkpoint, the
-    // queue size must be inferior to the current number of documents indexed.
-    // The queue might be not completely empty since the new target checkpoint is probably not the
-    // last document received
-    assertTrue(this.getQueueSize(SOURCE_COLLECTION, SHARD1) < 10);
-    assertTrue(this.getQueueSize(SOURCE_COLLECTION, SHARD2) < 10);
-
-    log.info("Indexing 100 documents");
-
-    docs.clear();
-    for (; start < 110; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    log.info("Querying source collection");
-
-    assertNumDocs(110, SOURCE_COLLECTION);
-
-    log.info("Waiting for replication");
-
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    log.info("Querying target collection");
-
-    commit(TARGET_COLLECTION);
-    assertNumDocs(110, TARGET_COLLECTION);
-  }
-
-  /**
-   * Check that the update logs are synchronised between leader and non-leader nodes
-   * when CDCR is on and buffer is disabled
-   */
-  @Test
-  @ShardsFixed(num = 4)
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 05-Jul-2018
-  public void testUpdateLogSynchronisation() throws Exception {
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    for (int i = 0; i < 100; i++) {
-      // will perform a commit for every document and will create one tlog file per commit
-      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
-    }
-
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
-
-    // wait a bit for the replication to complete
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    commit(TARGET_COLLECTION);
-
-    // Check that the replication was done properly
-    assertNumDocs(100, SOURCE_COLLECTION);
-    assertNumDocs(100, TARGET_COLLECTION);
-
-    // Get the number of tlog files on the replicas (should be equal to the number of documents indexed)
-    int nTlogs = getNumberOfTlogFilesOnReplicas(SOURCE_COLLECTION);
-
-    // Disable the buffer - ulog synch should start on non-leader nodes
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    int cnt = 15; // timeout after 15 seconds
-    while (cnt > 0) {
-      // Index a new document with a commit to trigger update log cleaning
-      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(50)));
-
-      // Check the update logs on non-leader nodes, the number of tlog files should decrease
-      int n = getNumberOfTlogFilesOnReplicas(SOURCE_COLLECTION);
-      if (n < nTlogs) return;
-
-      cnt--;
-      Thread.sleep(1000);
-    }
-
-    throw new AssertionError("Timeout while trying to assert update logs @ source_collection");
-  }
-
-  /**
-   * Check that the buffer is always activated on non-leader nodes.
-   */
-  @Test
-  @ShardsFixed(num = 4)
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 05-Jul-2018
-  public void testBufferOnNonLeader() throws Exception {
-    // buffering is enabled by default, so disable it
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    // Start CDCR
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    // Index documents
-    for (int i = 0; i < 200; i++) {
-      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i))); // will perform a commit for every document
-    }
-
-    // And immediately, close all the leaders, then restart them. It is likely that the replication will not be
-    // performed fully, and therefore be continued by the new leader
-    // At this stage, the new leader must have been elected
-    this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
-    this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2));
-
-    // wait a bit for the replication to complete
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    // Commit to make the documents visible on the target
-    commit(TARGET_COLLECTION);
-
-    // If the non-leader node were buffering updates, then the replication must be complete
-    assertNumDocs(200, SOURCE_COLLECTION);
-    assertNumDocs(200, TARGET_COLLECTION);
-  }
-
-  /**
-   * Check the ops statistics.
-   */
-  @Test
-  @ShardsFixed(num = 4)
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 05-Jul-2018
-  public void testOps() throws Exception {
-    // Index documents
-    List<SolrInputDocument> docs = new ArrayList<>();
-    for (int i = 0; i < 200; i++) {
-      docs.add(getDoc(id, Integer.toString(i)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    // Start CDCR
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    // wait a bit for the replication to complete
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    NamedList rsp = this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.OPS);
-    NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.OPERATIONS_PER_SECOND)).getVal(0);
-    NamedList ops = (NamedList) collections.get(TARGET_COLLECTION);
-    double opsAll = (Double) ops.get(CdcrParams.COUNTER_ALL);
-    double opsAdds = (Double) ops.get(CdcrParams.COUNTER_ADDS);
-    assertTrue(opsAll > 0);
-    assertEquals(opsAll, opsAdds, 0);
-
-    double opsDeletes = (Double) ops.get(CdcrParams.COUNTER_DELETES);
-    assertEquals(0, opsDeletes, 0);
-  }
-
-  /**
-   * Check that batch updates with deletes
-   */
-  @Test
-  @ShardsFixed(num = 4)
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 05-Jul-2018
-  public void testBatchAddsWithDelete() throws Exception {
-    // Index 50 documents
-    int start = 0;
-    List<SolrInputDocument> docs = new ArrayList<>();
-    for (; start < 50; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    // Delete 10 documents: 10-19
-    List<String> ids = new ArrayList<>();
-    for (int id = 10; id < 20; id++) {
-      ids.add(Integer.toString(id));
-    }
-    deleteById(SOURCE_COLLECTION, ids);
-
-    // Index 10 documents
-    docs = new ArrayList<>();
-    for (; start < 60; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    // Delete 1 document: 50
-    ids = new ArrayList<>();
-    ids.add(Integer.toString(50));
-    deleteById(SOURCE_COLLECTION, ids);
-
-    // Index 10 documents
-    docs = new ArrayList<>();
-    for (; start < 70; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    // Start CDCR
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-    this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD1);
-    this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD2);
-
-    // wait a bit for the replication to complete
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    commit(TARGET_COLLECTION);
-
-    // If the non-leader node were buffering updates, then the replication must be complete
-    assertNumDocs(59, SOURCE_COLLECTION);
-    assertNumDocs(59, TARGET_COLLECTION);
-  }
-
-  /**
-   * Checks that batches are correctly constructed when batch boundaries are reached.
-   */
-  @Test
-  @ShardsFixed(num = 4)
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 05-Jul-2018
-  public void testBatchBoundaries() throws Exception {
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    log.info("Indexing documents");
-
-    List<SolrInputDocument> docs = new ArrayList<>();
-    for (int i = 0; i < 128; i++) { // should create two full batches (default batch = 64)
-      docs.add(getDoc(id, Integer.toString(i)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    assertNumDocs(128, SOURCE_COLLECTION);
-
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
-
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    commit(TARGET_COLLECTION);
-
-    assertNumDocs(128, SOURCE_COLLECTION);
-    assertNumDocs(128, TARGET_COLLECTION);
-  }
-
-  /**
-   * Check resilience of replication with delete by query executed on targets
-   */
-  @Test
-  @ShardsFixed(num = 4)
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 05-Jul-2018
-  public void testResilienceWithDeleteByQueryOnTarget() throws Exception {
-    // Index 50 documents
-    int start = 0;
-    List<SolrInputDocument> docs = new ArrayList<>();
-    for (; start < 50; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    // Start CDCR
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
-
-    // wait a bit for the replication to complete
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    commit(TARGET_COLLECTION);
-
-    // If the non-leader node were buffering updates, then the replication must be complete
-    assertNumDocs(50, SOURCE_COLLECTION);
-    assertNumDocs(50, TARGET_COLLECTION);
-
-    deleteByQuery(SOURCE_COLLECTION, "*:*");
-    deleteByQuery(TARGET_COLLECTION, "*:*");
-
-    assertNumDocs(0, SOURCE_COLLECTION);
-    assertNumDocs(0, TARGET_COLLECTION);
-
-    docs.clear();
-    for (; start < 100; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    // wait a bit for the replication to complete
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    commit(TARGET_COLLECTION);
-
-    assertNumDocs(50, SOURCE_COLLECTION);
-    assertNumDocs(50, TARGET_COLLECTION);
-
-    deleteByQuery(TARGET_COLLECTION, "*:*");
-
-    assertNumDocs(50, SOURCE_COLLECTION);
-    assertNumDocs(0, TARGET_COLLECTION);
-
-    // Restart CDCR
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
-    docs.clear();
-    for (; start < 150; start++) {
-      docs.add(getDoc(id, Integer.toString(start)));
-    }
-    index(SOURCE_COLLECTION, docs);
-
-    // wait a bit for the replication to complete
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
-    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
-
-    commit(TARGET_COLLECTION);
-
-    assertNumDocs(100, SOURCE_COLLECTION);
-    assertNumDocs(50, TARGET_COLLECTION);
-  }
-
-  private int numberOfFiles(String dir) {
-    File file = new File(dir);
-    if (!file.isDirectory()) {
-      assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
-    }
-    log.debug("Update log dir {} contains: {}", dir, file.listFiles());
-    return file.listFiles().length;
-  }
-
-  private int getNumberOfTlogFilesOnReplicas(String collection) throws Exception {
-    CollectionInfo info = collectInfo(collection);
-    Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
-
-    int count = 0;
-
-    for (String shard : shardToCoresMap.keySet()) {
-      for (int i = 0; i < replicationFactor - 1; i++) {
-        count += numberOfFiles(info.getReplicas(shard).get(i).ulogDir);
-      }
-    }
-
-    return count;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/569d7710/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrTestsUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrTestsUtil.java b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrTestsUtil.java
index 6a186fd..5207cd5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrTestsUtil.java
+++ b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrTestsUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.solr.cloud.cdcr;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.LinkedHashMap;
@@ -25,9 +26,15 @@ import java.util.concurrent.TimeUnit;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.ChaosMonkey;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
@@ -35,14 +42,20 @@ import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.CdcrParams;
 import org.apache.solr.util.TimeOut;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CdcrTestsUtil extends SolrTestCaseJ4{
+public class CdcrTestsUtil extends SolrTestCaseJ4 {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  /** Stops and immediately restarts the CDCR process on the cluster behind {@code client}. */
+  protected static void cdcrRestart(CloudSolrClient client) throws SolrServerException, IOException {
+    cdcrStop(client);
+    cdcrStart(client);
+  }
+
   protected static void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
     QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.START);
     assertEquals("started", ((NamedList) response.getResponse().get("status")).get("process"));
@@ -89,7 +102,7 @@ public class CdcrTestsUtil extends SolrTestCaseJ4{
     while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(20, TimeUnit.SECONDS)) {
       response = client.query(params);
       if (response.getResponse() != null && response.getResponse().get("fingerprint") != null) {
-        return (long)((LinkedHashMap)response.getResponse().get("fingerprint")).get("maxVersionEncountered");
+        return (long) ((LinkedHashMap) response.getResponse().get("fingerprint")).get("maxVersionEncountered");
       }
       Thread.sleep(200);
     }
@@ -97,6 +110,10 @@ public class CdcrTestsUtil extends SolrTestCaseJ4{
     return null;
   }
 
+  protected static long waitForClusterToSync(long numDocs, CloudSolrClient clusterSolrClient) throws Exception {
+    return waitForClusterToSync((int) numDocs, clusterSolrClient, "*:*");
+  }
+
   protected static long waitForClusterToSync(int numDocs, CloudSolrClient clusterSolrClient) throws Exception {
     return waitForClusterToSync(numDocs, clusterSolrClient, "*:*");
   }
@@ -112,7 +129,7 @@ public class CdcrTestsUtil extends SolrTestCaseJ4{
       }
       Thread.sleep(1000);
     }
-    return response != null ? response.getResults().getNumFound() : 0;
+    return response != null ? response.getResults().getNumFound() : null;
   }
 
   protected static boolean assertShardInSync(String collection, String shard, CloudSolrClient client) throws IOException, SolrServerException {
@@ -146,4 +163,111 @@ public class CdcrTestsUtil extends SolrTestCaseJ4{
     }
     return false;
   }
-}
+
+  /**
+   * Indexes documents with integer ids, issuing a hard commit per document.
+   * NOTE(review): despite the parameter names, {@code count} is used as the EXCLUSIVE UPPER
+   * BOUND of the id loop, not as a number of documents — with a non-zero {@code start} only
+   * (count - start) docs are indexed. A {@code count} of 0 falls back to a default batch
+   * (100 nightly, 10 otherwise). Confirm against all callers before changing this contract.
+   */
+  public static void indexRandomDocs(Integer start, Integer count, CloudSolrClient solrClient) throws Exception {
+    // ADD operation on cluster 1
+    int docs = 0;
+    if (count == 0) {
+      docs = (TEST_NIGHTLY ? 100 : 10);
+    } else {
+      docs = count;
+    }
+    for (int k = start; k < docs; k++) {
+      UpdateRequest req = new UpdateRequest();
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", k);
+      req.add(doc);
+
+      // commit after every add; waitFlush=true, waitSearcher=true
+      req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+      req.process(solrClient);
+    }
+  }
+
+  /** Indexes docs with ids 0..count-1 (or a default batch when count is 0) into the cluster behind {@code solrClient}. */
+  public static void indexRandomDocs(Integer count, CloudSolrClient solrClient) throws Exception {
+    indexRandomDocs(0, count, solrClient);
+  }
+
+  /**
+   * Adds a single document to {@code collection} and commits. The commit always happens;
+   * {@code doCommit} only selects the waitSearcher flag of the commit
+   * (commit(waitFlush=true, waitSearcher=doCommit)), so the original if/else — whose two
+   * branches differed only in that flag — collapses to a single call.
+   */
+  public static void index(MiniSolrCloudCluster cluster, String collection, SolrInputDocument doc, boolean doCommit) throws IOException, SolrServerException {
+    // try-with-resources replaces the manual try/finally close
+    try (CloudSolrClient client = createCloudClient(cluster, collection)) {
+      client.add(doc);
+      client.commit(true, doCommit);
+    }
+  }
+
+  /** Adds a single document and commits with waitFlush=true, waitSearcher=false. */
+  public static void index(MiniSolrCloudCluster cluster, String collection, SolrInputDocument doc) throws IOException, SolrServerException {
+    index(cluster, collection, doc, false);
+  }
+
+  /**
+   * Creates a new CloudSolrClient pointed at the cluster's ZooKeeper; the caller is
+   * responsible for closing it. The random boolean is passed through to
+   * getCloudSolrClient (presumably the shardLeadersOnly flag — confirm in SolrTestCaseJ4).
+   */
+  public static CloudSolrClient createCloudClient(MiniSolrCloudCluster cluster, String defaultCollection) {
+    CloudSolrClient server = getCloudSolrClient(cluster.getZkServer().getZkAddress(), random().nextBoolean());
+    if (defaultCollection != null) server.setDefaultCollection(defaultCollection);
+    return server;
+  }
+
+
+  /**
+   * Restarts the jetty at position {@code index} in the cluster. The "collection" system
+   * property is set for the duration of the restart (consumed elsewhere — presumably by the
+   * core configuration of the restarting node; confirm before removing).
+   */
+  public static void restartClusterNode(MiniSolrCloudCluster cluster, String collection, int index) throws Exception {
+    System.setProperty("collection", collection);
+    restartNode(cluster.getJettySolrRunner(index));
+    System.clearProperty("collection");
+  }
+
+  /**
+   * Restarts every jetty in the cluster, one at a time, with the "collection" system
+   * property set around the whole sequence (see restartClusterNode).
+   */
+  public static void restartClusterNodes(MiniSolrCloudCluster cluster, String collection) throws Exception {
+    System.setProperty("collection", collection);
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      restartNode(jetty);
+    }
+    System.clearProperty("collection");
+  }
+
+  /**
+   * Stops and starts the given jetty via ChaosMonkey, then sleeps a fixed 10s.
+   * NOTE(review): the sleep is a blunt stand-in for an explicit wait-for-recovery; the node
+   * is not guaranteed to have rejoined/recovered when this returns.
+   */
+  public static void restartNode(JettySolrRunner jetty) throws Exception {
+    ChaosMonkey.stop(jetty);
+    ChaosMonkey.start(jetty);
+    Thread.sleep(10000);
+  }
+
+  public static int numberOfFiles(String dir) {
+    File file = new File(dir);
+    if (!file.isDirectory()) {
+      assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
+    }
+    log.debug("Update log dir {} contains: {}", dir, file.listFiles());
+    return file.listFiles().length;
+  }
+
+  /** Sums the tlog file counts across every core of every node in the cluster. */
+  public static int getNumberOfTlogFilesOnReplicas(MiniSolrCloudCluster cluster) throws Exception {
+    int count = 0;
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      for (SolrCore core : jetty.getCoreContainer().getCores()) {
+        count += numberOfFiles(core.getUlogDir() + "/tlog");
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Returns the name of a node that is not the shard1 leader of {@code collection}.
+   * NOTE(review): if every node's name matches the leader (e.g. a single-node cluster, or
+   * getLeaderNode returned ""), this falls back to the first node — which may itself be the
+   * leader.
+   */
+  public static String getNonLeaderNode(MiniSolrCloudCluster cluster, String collection) throws Exception {
+    String leaderNode = getLeaderNode(cluster, collection);
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      if (!jetty.getNodeName().equals(leaderNode)) {
+        return jetty.getNodeName();
+      }
+    }
+    return cluster.getJettySolrRunners().get(0).getNodeName();
+  }
+
+  public static String getLeaderNode(MiniSolrCloudCluster cluster, String collection) throws Exception {
+    for (Replica replica : cluster.getSolrClient().getClusterStateProvider().getCollection(collection).getReplicas()) {
+      if (cluster.getSolrClient().getClusterStateProvider().getCollection(collection).getLeader("shard1") == replica) {
+        return replica.getNodeName();
+      }
+    }
+    return "";
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/569d7710/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrWithNodesRestartsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrWithNodesRestartsTest.java b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrWithNodesRestartsTest.java
new file mode 100644
index 0000000..c8d5a32
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrWithNodesRestartsTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.cdcr;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CdcrWithNodesRestartsTest extends SolrTestCaseJ4 {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  MiniSolrCloudCluster target, source;
+  CloudSolrClient sourceSolrClient, targetSolrClient;
+  private static String SOURCE_COLLECTION = "cdcr-source";
+  private static String TARGET_COLLECTION = "cdcr-target";
+  private static String ALL_Q = "*:*";
+
+  @Before
+  public void before() throws Exception {
+    target = new MiniSolrCloudCluster(2, createTempDir(TARGET_COLLECTION), buildJettyConfig("/solr"));
+    target.waitForAllNodes(30);
+    System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
+    source = new MiniSolrCloudCluster(2, createTempDir(SOURCE_COLLECTION), buildJettyConfig("/solr"));
+    source.waitForAllNodes(30);
+  }
+
+  @After
+  public void after() throws Exception {
+    target.shutdown();
+    source.shutdown();
+  }
+
+  @Test
+  public void testBufferOnNonLeader() throws Exception {
+    createCollections();
+    CdcrTestsUtil.cdcrDisableBuffer(sourceSolrClient);
+    CdcrTestsUtil.cdcrStart(sourceSolrClient);
+    Thread.sleep(2000);
+
+    // index 100 docs
+    for (int i = 0; i < 100; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "doc_" + i);
+      CdcrTestsUtil.index(source, "cdcr-source", doc);
+      sourceSolrClient.commit();
+    }
+    Thread.sleep(2000);
+
+    // restart all the nodes at source cluster, one by one
+    CdcrTestsUtil.restartClusterNodes(source, SOURCE_COLLECTION);
+
+    //verify cdcr has replicated docs
+    QueryResponse response = sourceSolrClient.query(new SolrQuery(ALL_Q));
+    assertEquals("source docs mismatch", 100, response.getResults().getNumFound());
+    assertEquals("target docs mismatch", 100, CdcrTestsUtil.waitForClusterToSync(100, targetSolrClient));
+    CdcrTestsUtil.assertShardInSync(SOURCE_COLLECTION, "shard1", sourceSolrClient);
+    CdcrTestsUtil.assertShardInSync(TARGET_COLLECTION, "shard1", targetSolrClient);
+
+    CdcrTestsUtil.cdcrStop(sourceSolrClient);
+    CdcrTestsUtil.cdcrStop(targetSolrClient);
+
+    deleteCollections();
+  }
+
+  @Test
+  public void testUpdateLogSynchronisation() throws Exception {
+    createCollections();
+    CdcrTestsUtil.cdcrStart(sourceSolrClient);
+    Thread.sleep(2000);
+
+    // index 100 docs
+    for (int i = 0; i < 100; i++) {
+      // will perform a commit for every document and will create one tlog file per commit
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "doc_" + i);
+      CdcrTestsUtil.index(source, "cdcr-source", doc, true);
+    }
+    Thread.sleep(2000);
+
+    //verify cdcr has replicated docs
+    QueryResponse response = sourceSolrClient.query(new SolrQuery(ALL_Q));
+    assertEquals("source docs mismatch", 100, response.getResults().getNumFound());
+    assertEquals("target docs mismatch", 100, CdcrTestsUtil.waitForClusterToSync(100, targetSolrClient));
+
+    // Get the number of tlog files on the replicas (should be equal to the number of documents indexed)
+    int nTlogs = CdcrTestsUtil.getNumberOfTlogFilesOnReplicas(source);
+
+    // Disable the buffer - ulog synch should start on non-leader nodes
+    CdcrTestsUtil.cdcrDisableBuffer(sourceSolrClient);
+    Thread.sleep(2000);
+
+    int cnt = 15; // timeout after 15 seconds
+    int n = 0;
+    while (cnt > 0) {
+      // Index a new document with a commit to trigger update log cleaning
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "doc_" + random().nextLong());
+      CdcrTestsUtil.index(source, "cdcr-source", doc, true);
+
+      // Check the update logs on non-leader nodes, the number of tlog files should decrease
+      n = CdcrTestsUtil.getNumberOfTlogFilesOnReplicas(source);
+      if (n < nTlogs) {
+        cnt = Integer.MIN_VALUE;
+        break;
+      }
+      cnt--;
+      Thread.sleep(1000);
+    }
+    if (cnt == 0) {
+      throw new AssertionError("Timeout while trying to assert update logs @ source_collection, " + n + " " + nTlogs);
+    }
+
+    CdcrTestsUtil.cdcrStop(sourceSolrClient);
+    CdcrTestsUtil.cdcrStop(targetSolrClient);
+
+    deleteCollections();
+  }
+
+  @Test
+  public void testReplicationAfterRestart() throws Exception {
+    createCollections();
+    CdcrTestsUtil.cdcrStart(sourceSolrClient); // start CDCR
+    Thread.sleep(2000);
+
+    //index 100 docs
+    for (int i = 0; i < 100; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "doc_" + i);
+      CdcrTestsUtil.index(source, "cdcr-source", doc);
+      sourceSolrClient.commit();
+    }
+    Thread.sleep(2000);
+
+    // verify cdcr has replicated docs
+    QueryResponse response = sourceSolrClient.query(new SolrQuery(ALL_Q));
+    assertEquals("source docs mismatch", 100, response.getResults().getNumFound());
+    assertEquals("target docs mismatch", 100, CdcrTestsUtil.waitForClusterToSync(100, targetSolrClient));
+    CdcrTestsUtil.assertShardInSync("cdcr-source", "shard1", sourceSolrClient);
+
+    // restart all the source cluster nodes
+    CdcrTestsUtil.restartClusterNodes(source, "cdcr-source");
+    sourceSolrClient = source.getSolrClient();
+    sourceSolrClient.setDefaultCollection("cdcr-source");
+
+    // verify still the docs are there
+    response = sourceSolrClient.query(new SolrQuery(ALL_Q));
+    assertEquals("source docs mismatch", 100, response.getResults().getNumFound());
+    assertEquals("target docs mismatch", 100, CdcrTestsUtil.waitForClusterToSync(100, targetSolrClient));
+
+    // index 100 more
+    for (int i = 100; i < 200; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "doc_" + i);
+      CdcrTestsUtil.index(source, "cdcr-source", doc);
+      sourceSolrClient.commit();
+    }
+    Thread.sleep(2000);
+
+    // verify still the docs are there
+    response = sourceSolrClient.query(new SolrQuery(ALL_Q));
+    assertEquals("source docs mismatch", 200, response.getResults().getNumFound());
+    assertEquals("target docs mismatch", 200, CdcrTestsUtil.waitForClusterToSync(200, targetSolrClient));
+
+    CdcrTestsUtil.cdcrStop(sourceSolrClient);
+    CdcrTestsUtil.cdcrStop(targetSolrClient);
+
+    deleteCollections();
+  }
+
+  @Test
+  public void testReplicationAfterLeaderChange() throws Exception {
+    createCollections();
+    CdcrTestsUtil.cdcrStart(sourceSolrClient);
+    Thread.sleep(2000);
+
+    // index 100 docs
+    for (int i = 0; i < 100; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "doc_" + i);
+      CdcrTestsUtil.index(source, "cdcr-source", doc);
+      sourceSolrClient.commit();
+    }
+    Thread.sleep(2000);
+
+    // verify cdcr has replicated docs
+    QueryResponse response = sourceSolrClient.query(new SolrQuery(ALL_Q));
+    assertEquals("source docs mismatch", 100, response.getResults().getNumFound());
+    assertEquals("target docs mismatch", 100, CdcrTestsUtil.waitForClusterToSync(100, targetSolrClient));
+    CdcrTestsUtil.assertShardInSync("cdcr-source", "shard1", sourceSolrClient);
+
+    // restart one of the source cluster nodes
+    CdcrTestsUtil.restartClusterNode(source, "cdcr-source", 0);
+    sourceSolrClient = source.getSolrClient();
+    sourceSolrClient.setDefaultCollection("cdcr-source");
+
+    // add `100 more docs, 200 until now
+    for (int i = 100; i < 200; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "doc_" + i);
+      CdcrTestsUtil.index(source, "cdcr-source", doc);
+      sourceSolrClient.commit();
+    }
+    Thread.sleep(2000);
+
+    // verify cdcr has replicated docs
+    response = sourceSolrClient.query(new SolrQuery(ALL_Q));
+    assertEquals("source docs mismatch", 200, response.getResults().getNumFound());
+    assertEquals("target docs mismatch", 200, CdcrTestsUtil.waitForClusterToSync(200, targetSolrClient));
+
+    // restart the other source cluster node
+    CdcrTestsUtil.restartClusterNode(source, "cdcr-source", 1);
+    sourceSolrClient = source.getSolrClient();
+    sourceSolrClient.setDefaultCollection("cdcr-source");
+
+    // add `100 more docs, 300 until now
+    for (int i = 200; i < 300; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "doc_" + i);
+      CdcrTestsUtil.index(source, "cdcr-source", doc);
+      sourceSolrClient.commit();
+    }
+    Thread.sleep(2000);
+
+    // verify cdcr has replicated docs
+    response = sourceSolrClient.query(new SolrQuery(ALL_Q));
+    assertEquals("source docs mismatch", 300, response.getResults().getNumFound());
+    assertEquals("target docs mismatch", 300, CdcrTestsUtil.waitForClusterToSync(300, targetSolrClient));
+
+    // add a replica to 'target' collection
+    CollectionAdminRequest.addReplicaToShard(TARGET_COLLECTION, "shard1").
+        setNode(CdcrTestsUtil.getNonLeaderNode(target, TARGET_COLLECTION)).process(targetSolrClient);
+    Thread.sleep(2000);
+
+    // restart one of the target nodes
+    CdcrTestsUtil.restartClusterNode(source, "cdcr-target", 0);
+    targetSolrClient = target.getSolrClient();
+    targetSolrClient.setDefaultCollection("cdcr-target");
+
+    // add `100 more docs, 400 until now
+    for (int i = 300; i < 400; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "doc_" + i);
+      CdcrTestsUtil.index(source, "cdcr-source", doc);
+      sourceSolrClient.commit();
+    }
+    Thread.sleep(2000);
+
+    // verify cdcr has replicated docs
+    response = sourceSolrClient.query(new SolrQuery(ALL_Q));
+    assertEquals("source docs mismatch", 400, response.getResults().getNumFound());
+    assertEquals("target docs mismatch", 400, CdcrTestsUtil.waitForClusterToSync(400, targetSolrClient));
+
+    // restart the other target cluster node
+    CdcrTestsUtil.restartClusterNode(source, "cdcr-target", 1);
+    targetSolrClient = target.getSolrClient();
+    targetSolrClient.setDefaultCollection("cdcr-target");
+
+    // add `100 more docs, 500 until now
+    for (int i = 400; i < 500; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "doc_" + i);
+      CdcrTestsUtil.index(source, "cdcr-source", doc);
+      sourceSolrClient.commit();
+    }
+    Thread.sleep(2000);
+
+    // verify cdcr has replicated docs
+    response = sourceSolrClient.query(new SolrQuery(ALL_Q));
+    assertEquals("source docs mismatch", 500, response.getResults().getNumFound());
+    assertEquals("target docs mismatch", 500, CdcrTestsUtil.waitForClusterToSync(500, targetSolrClient));
+
+    CdcrTestsUtil.cdcrStop(sourceSolrClient);
+    CdcrTestsUtil.cdcrStop(targetSolrClient);
+
+    deleteCollections();
+  }
+
+  private void createSourceCollection() throws Exception {
+    source.uploadConfigSet(configset("cdcr-source"), "cdcr-source");
+    CollectionAdminRequest.createCollection("cdcr-source", "cdcr-source", 1, 2)
+        .withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
+        .process(source.getSolrClient());
+    Thread.sleep(1000);
+    sourceSolrClient = source.getSolrClient();
+    sourceSolrClient.setDefaultCollection("cdcr-source");
+  }
+
+  private void createTargetCollection() throws Exception {
+    target.uploadConfigSet(configset("cdcr-target"), "cdcr-target");
+    CollectionAdminRequest.createCollection("cdcr-target", "cdcr-target", 1, 1)
+        .withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
+        .process(target.getSolrClient());
+    Thread.sleep(1000);
+    targetSolrClient = target.getSolrClient();
+    targetSolrClient.setDefaultCollection("cdcr-target");
+  }
+
+  private void deleteSourceCollection() throws Exception {
+    source.deleteAllCollections();
+  }
+
+  private void deleteTargetcollection() throws Exception {
+    target.deleteAllCollections();
+  }
+
+  private void createCollections() throws Exception {
+    createTargetCollection();
+    createSourceCollection();
+  }
+
+  private void deleteCollections() throws Exception {
+    deleteSourceCollection();
+    deleteTargetcollection();
+  }
+
+}
\ No newline at end of file


Mime
View raw message