lucene-commits mailing list archives

From va...@apache.org
Subject [1/4] lucene-solr:branch_7x: SOLR-11003: Support bi-directional syncing of cdcr clusters. We still only support actively indexing into one cluster, but have the ability to switch indexing clusters and cdcr will replicate correctly
Date Mon, 06 Nov 2017 16:13:24 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 1af62a6ed -> a30c92a79
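
[Not part of this commit] The tests in this series drive cdcr entirely through the /cdcr request
handler actions (START, STOP, ENABLEBUFFER, DISABLEBUFFER, and the checkpoint actions). As a
sketch of the "switch indexing clusters" behavior the subject describes, assuming two clusters
whose solrconfig.xml already point their cdcr replicas at each other, and mirroring the
invokeCdcrAction helper in CdcrTestsUtil below, an application could hand over the active
indexing role like this:

    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.common.params.CommonParams;
    import org.apache.solr.common.params.ModifiableSolrParams;

    public class CdcrSwitchSketch {
      // Invoke a /cdcr action, the same way CdcrTestsUtil.invokeCdcrAction does below.
      static void cdcrAction(CloudSolrClient client, String action) throws Exception {
        ModifiableSolrParams params = new ModifiableSolrParams();
        params.set(CommonParams.QT, "/cdcr");    // route the request to the cdcr handler
        params.set(CommonParams.ACTION, action); // e.g. "start" or "stop"
        client.query(params);
      }

      // Make clusterB the active indexing side: stop forwarding from clusterA and
      // start forwarding from clusterB; cdcr then replicates B -> A.
      static void switchIndexingCluster(CloudSolrClient clusterA, CloudSolrClient clusterB) throws Exception {
        cdcrAction(clusterA, "stop");
        cdcrAction(clusterB, "start");
      }
    }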


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a30c92a7/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrReplicationHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrReplicationHandlerTest.java b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrReplicationHandlerTest.java
new file mode 100644
index 0000000..65826c4
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrReplicationHandlerTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.cdcr;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.LuceneTestCase.Nightly;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.cloud.ChaosMonkey;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class tests the cdcr extension to the {@link org.apache.solr.handler.ReplicationHandler} and
+ * {@link org.apache.solr.handler.IndexFetcher}.
+ */
+@Nightly
+public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Override
+  public void distribSetUp() throws Exception {
+    schemaString = "schema15.xml";      // we need a string id
+    createTargetCollection = false;     // we do not need the target cluster
+    shardCount = 1; // we need only one shard
+    // we need a persistent directory, otherwise the UpdateHandler will erase existing tlog files after restarting a node
+    System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    super.distribSetUp();
+  }
+
+  /**
+   * Test the scenario where the slave is killed from the start. The replication
+   * strategy should fetch all the missing tlog files from the leader.
+   */
+  @Test
+  @ShardsFixed(num = 2)
+  public void testFullReplication() throws Exception {
+    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+    ChaosMonkey.stop(slaves.get(0).jetty);
+
+    for (int i = 0; i < 10; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 10; j < (i * 10) + 10; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    assertNumDocs(100, SOURCE_COLLECTION);
+
+    // Restart the slave node to trigger Replication strategy
+    this.restartServer(slaves.get(0));
+
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
+  }
+
+  /**
+   * Test the scenario where the slave is killed before receiving all the documents. The replication
+   * strategy should fetch all the missing tlog files from the leader.
+   */
+  @Test
+  @ShardsFixed(num = 2)
+  public void testPartialReplication() throws Exception {
+    for (int i = 0; i < 5; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 20; j < (i * 20) + 20; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+    ChaosMonkey.stop(slaves.get(0).jetty);
+
+    for (int i = 5; i < 10; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 20; j < (i * 20) + 20; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    assertNumDocs(200, SOURCE_COLLECTION);
+
+    // Restart the slave node to trigger Replication strategy
+    this.restartServer(slaves.get(0));
+
+    // at this stage, the slave should have replicated the 5 missing tlog files
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
+  }
+
+  /**
+   * Test the scenario where the slave is killed before receiving a commit. This creates a truncated tlog
+   * file on the slave node. The replication strategy should detect this truncated file, and fetch the
+   * non-truncated file from the leader.
+   */
+  @Test
+  @ShardsFixed(num = 2)
+  public void testPartialReplicationWithTruncatedTlog() throws Exception {
+    CloudSolrClient client = createCloudClient(SOURCE_COLLECTION);
+    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+
+    try {
+      for (int i = 0; i < 10; i++) {
+        for (int j = i * 20; j < (i * 20) + 20; j++) {
+          client.add(getDoc(id, Integer.toString(j)));
+
+          // Stop the slave in the middle of a batch to create a truncated tlog on the slave
+          if (j == 45) {
+            ChaosMonkey.stop(slaves.get(0).jetty);
+          }
+
+        }
+        commit(SOURCE_COLLECTION);
+      }
+    } finally {
+      client.close();
+    }
+
+    assertNumDocs(200, SOURCE_COLLECTION);
+
+    // Restart the slave node to trigger Replication recovery
+    this.restartServer(slaves.get(0));
+
+    // at this stage, the slave should have replicated the 5 missing tlog files
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
+  }
+
+  /**
+   * Test the scenario where the slave first recovers with a PeerSync strategy, then with a Replication strategy.
+   * The PeerSync strategy will generate a single tlog file for all the missing updates on the slave node.
+   * If a Replication strategy occurs at a later stage, it should remove this tlog file generated by PeerSync
+   * and fetch the corresponding tlog files from the leader.
+   */
+  @Test
+  @ShardsFixed(num = 2)
+  public void testPartialReplicationAfterPeerSync() throws Exception {
+    for (int i = 0; i < 5; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 10; j < (i * 10) + 10; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+    ChaosMonkey.stop(slaves.get(0).jetty);
+
+    for (int i = 5; i < 10; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 10; j < (i * 10) + 10; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    assertNumDocs(100, SOURCE_COLLECTION);
+
+    // Restart the slave node to trigger PeerSync recovery
+    // (the update window between leader and slave is small enough)
+    this.restartServer(slaves.get(0));
+
+    ChaosMonkey.stop(slaves.get(0).jetty);
+
+    for (int i = 10; i < 15; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 20; j < (i * 20) + 20; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    // restart the slave node to trigger Replication recovery
+    this.restartServer(slaves.get(0));
+
+    // at this stage, the slave should have replicated the 5 missing tlog files
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
+  }
+
+  /**
+   * Test the scenario where the slave is killed while the leader is still receiving updates.
+   * The slave should buffer updates while in recovery, then replay them at the end of the recovery.
+   * If updates were properly buffered and replayed, the slave should have the same number of documents
+   * as the leader. This checks whether cdcr tlog replication interferes with buffered updates - SOLR-8263.
+   */
+  @Test
+  @ShardsFixed(num = 2)
+  public void testReplicationWithBufferedUpdates() throws Exception {
+    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+
+    AtomicInteger numDocs = new AtomicInteger(0);
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-test-update-scheduler"));
+    executor.scheduleWithFixedDelay(new UpdateThread(numDocs), 10, 10, TimeUnit.MILLISECONDS);
+
+    // Restart the slave node to trigger Replication strategy
+    this.restartServer(slaves.get(0));
+
+    // shutdown the update thread and wait for its completion
+    executor.shutdown();
+    executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+
+    // check that we have the expected number of documents in the cluster
+    assertNumDocs(numDocs.get(), SOURCE_COLLECTION);
+
+    // check that we have the expected number of documents on the slave
+    assertNumDocs(numDocs.get(), slaves.get(0));
+  }
+
+  private void assertNumDocs(int expectedNumDocs, CloudJettyRunner jetty)
+  throws InterruptedException, IOException, SolrServerException {
+    SolrClient client = createNewSolrServer(jetty.url);
+    try {
+      int cnt = 30; // timeout after 15 seconds
+      AssertionError lastAssertionError = null;
+      while (cnt > 0) {
+        try {
+          assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
+          return;
+        }
+        catch (AssertionError e) {
+          lastAssertionError = e;
+          cnt--;
+          Thread.sleep(500);
+        }
+      }
+      throw new AssertionError("Timeout while trying to assert number of documents @ " + jetty.url, lastAssertionError);
+    } finally {
+      client.close();
+    }
+  }
+
+  private class UpdateThread implements Runnable {
+
+    private AtomicInteger numDocs;
+
+    private UpdateThread(AtomicInteger numDocs) {
+      this.numDocs = numDocs;
+    }
+
+    @Override
+    public void run() {
+      try {
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int j = numDocs.get(); j < (numDocs.get() + 10); j++) {
+          docs.add(getDoc(id, Integer.toString(j)));
+        }
+        index(SOURCE_COLLECTION, docs);
+        numDocs.getAndAdd(10);
+        log.info("Sent batch of {} updates - numDocs:{}", docs.size(), numDocs);
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+  }
+
+  private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
+    List<CloudJettyRunner> jetties = new ArrayList<>(shardToJetty.get(collection).get(shard));
+    CloudJettyRunner leader = shardToLeaderJetty.get(collection).get(shard);
+    jetties.remove(leader);
+    return jetties;
+  }
+
+  /**
+   * Asserts that the update logs are in sync between the leader and slave. The leader and the slaves
+   * must have identical tlog files.
+   */
+  protected void assertUpdateLogsEquals(String collection, int numberOfTLogs) throws Exception {
+    CollectionInfo info = collectInfo(collection);
+    Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
+
+    for (String shard : shardToCoresMap.keySet()) {
+      Map<Long, Long> leaderFilesMeta = this.getFilesMeta(info.getLeader(shard).ulogDir);
+      Map<Long, Long> slaveFilesMeta = this.getFilesMeta(info.getReplicas(shard).get(0).ulogDir);
+
+      assertEquals("Incorrect number of tlog files on the leader", numberOfTLogs, leaderFilesMeta.size());
+      assertEquals("Incorrect number of tlog files on the slave", numberOfTLogs, slaveFilesMeta.size());
+
+      for (Long leaderFileVersion : leaderFilesMeta.keySet()) {
+        assertTrue("Slave is missing a tlog for version " + leaderFileVersion, slaveFilesMeta.containsKey(leaderFileVersion));
+        assertEquals("Slave's tlog file size differs for version " + leaderFileVersion, leaderFilesMeta.get(leaderFileVersion),
slaveFilesMeta.get(leaderFileVersion));
+      }
+    }
+  }
+
+  private Map<Long, Long> getFilesMeta(String dir) {
+    File file = new File(dir);
+    if (!file.isDirectory()) {
+      assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
+    }
+
+    Map<Long, Long> filesMeta = new HashMap<>();
+    for (File tlogFile : file.listFiles()) {
+      filesMeta.put(Math.abs(Long.parseLong(tlogFile.getName().substring(tlogFile.getName().lastIndexOf('.') + 1))), tlogFile.length());
+    }
+    return filesMeta;
+  }
+
+}
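
For reference (not part of the patch): getFilesMeta above assumes the cdcr tlog naming scheme in
which a version number follows the last '.' of the file name, and it normalizes the sign with
Math.abs. A tiny illustration with a hypothetical file name:

    String name = "tlog.0000000000000000005.1122334455"; // hypothetical tlog file name
    long version = Math.abs(Long.parseLong(name.substring(name.lastIndexOf('.') + 1)));
    // version == 1122334455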

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a30c92a7/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrRequestHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrRequestHandlerTest.java b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrRequestHandlerTest.java
new file mode 100644
index 0000000..237cc58
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrRequestHandlerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.cdcr;
+
+import org.apache.lucene.util.LuceneTestCase.Nightly;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.CdcrParams;
+import org.junit.Test;
+
+@Nightly
+public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
+
+  @Override
+  public void distribSetUp() throws Exception {
+    schemaString = "schema15.xml";      // we need a string id
+    createTargetCollection = false;     // we do not need the target cluster
+    super.distribSetUp();
+  }
+
+  // check that the life-cycle state is properly synchronised across nodes
+  @Test
+  @ShardsFixed(num = 2)
+  public void testLifeCycleActions() throws Exception {
+    // check initial status
+    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+
+    // send start action to first shard
+    NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+    assertEquals(CdcrParams.ProcessState.STARTED.toLower(), status.get(CdcrParams.ProcessState.getParam()));
+
+    // check status
+    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
+
+    // Restart the leader of shard 1
+    this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+
+    // check status - the node that died should have picked up the original state
+    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
+
+    // send stop action to second shard
+    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.STOP);
+    status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+    assertEquals(CdcrParams.ProcessState.STOPPED.toLower(), status.get(CdcrParams.ProcessState.getParam()));
+
+    // check status
+    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+  }
+
+  // check the checkpoint API
+  @Test
+  @ShardsFixed(num = 2)
+  public void testCheckpointActions() throws Exception {
+    // initial request on an empty index, must return -1
+    NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    assertEquals(-1L, rsp.get(CdcrParams.CHECKPOINT));
+
+    index(SOURCE_COLLECTION, getDoc(id, "a")); // shard 2
+
+    // only one document indexed in shard 2, the checkpoint must still be -1
+    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    assertEquals(-1L, rsp.get(CdcrParams.CHECKPOINT));
+
+    index(SOURCE_COLLECTION, getDoc(id, "b")); // shard 1
+
+    // a second document indexed in shard 1, the checkpoint must come from shard 2
+    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    long checkpoint1 = (Long) rsp.get(CdcrParams.CHECKPOINT);
+    long expected = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+    assertEquals(expected, checkpoint1);
+
+    index(SOURCE_COLLECTION, getDoc(id, "c")); // shard 1
+
+    // a third document indexed in shard 1, the checkpoint must still come from shard 2
+    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    assertEquals(checkpoint1, rsp.get(CdcrParams.CHECKPOINT));
+
+    index(SOURCE_COLLECTION, getDoc(id, "d")); // shard 2
+
+    // a fourth document indexed in shard 2, the checkpoint must come from shard 1
+    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    long checkpoint2 = (Long) rsp.get(CdcrParams.CHECKPOINT);
+    expected = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+    assertEquals(expected, checkpoint2);
+
+    // send a delete by query
+    deleteByQuery(SOURCE_COLLECTION, "*:*");
+
+    // all the checkpoints must come from the DBQ
+    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    long checkpoint3 = (Long) rsp.get(CdcrParams.CHECKPOINT);
+    assertTrue(checkpoint3 > 0); // ensure that checkpoints from deletes are in absolute form
+    checkpoint3 = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+    assertTrue(checkpoint3 > 0); // ensure that checkpoints from deletes are in absolute form
+    checkpoint3 = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+    assertTrue(checkpoint3 > 0); // ensure that checkpoints from deletes are in absolute form
+
+    // replication never started, lastProcessedVersion should be -1 for both shards
+    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.LASTPROCESSEDVERSION);
+    long lastVersion = (Long) rsp.get(CdcrParams.LAST_PROCESSED_VERSION);
+    assertEquals(-1L, lastVersion);
+
+    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.LASTPROCESSEDVERSION);
+    lastVersion = (Long) rsp.get(CdcrParams.LAST_PROCESSED_VERSION);
+    assertEquals(-1L, lastVersion);
+  }
+
+  // check that the buffer state is properly synchronised across nodes
+  @Test
+  @ShardsFixed(num = 2)
+  public void testBufferActions() throws Exception {
+    // check initial status
+    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+
+    // send disable buffer action to first shard
+    NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+    NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+    assertEquals(CdcrParams.BufferState.DISABLED.toLower(), status.get(CdcrParams.BufferState.getParam()));
+
+    // check status
+    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.DISABLED);
+
+    // Restart the leader of shard 1
+    this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+
+    // check status
+    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.DISABLED);
+
+    // send enable buffer action to second shard
+    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.ENABLEBUFFER);
+    status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+    assertEquals(CdcrParams.BufferState.ENABLED.toLower(), status.get(CdcrParams.BufferState.getParam()));
+
+    // check status
+    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+  }
+
+}
+
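
The checkpoint assertions above encode the intended contract: COLLECTIONCHECKPOINT returns -1 on
an empty index and otherwise the checkpoint of the shard that is furthest behind, while
SHARDCHECKPOINT reports a single shard. A minimal SolrJ sketch of reading a checkpoint (not part
of this commit; client is assumed to be a CloudSolrClient bound to the collection, and the
pattern mirrors invokeCdcrAction in CdcrTestsUtil below):

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CommonParams.QT, "/cdcr");
    params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toLower());
    QueryResponse rsp = client.query(params);
    long checkpoint = (Long) rsp.getResponse().get(CdcrParams.CHECKPOINT); // -1 on an empty index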

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a30c92a7/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrTestsUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrTestsUtil.java b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrTestsUtil.java
new file mode 100644
index 0000000..99aa471
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrTestsUtil.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.cdcr;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.LinkedHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.CdcrParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CdcrTestsUtil extends SolrTestCaseJ4 {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected static void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
+    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.START);
+    assertEquals("started", ((NamedList) response.getResponse().get("status")).get("process"));
+  }
+
+  protected static void cdcrStop(CloudSolrClient client) throws SolrServerException, IOException {
+    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.STOP);
+    assertEquals("stopped", ((NamedList) response.getResponse().get("status")).get("process"));
+  }
+
+  protected static void cdcrEnableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
+    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.ENABLEBUFFER);
+    assertEquals("enabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
+  }
+
+  protected static void cdcrDisableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
+    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.DISABLEBUFFER);
+    assertEquals("disabled", ((NamedList) response.getResponse().get("status")).get("buffer"));
+  }
+
+  protected static QueryResponse invokeCdcrAction(CloudSolrClient client, CdcrParams.CdcrAction action) throws IOException, SolrServerException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CommonParams.QT, "/cdcr");
+    params.set(CommonParams.ACTION, action.toLower());
+    return client.query(params);
+  }
+
+  protected static QueryResponse getCdcrQueue(CloudSolrClient client) throws SolrServerException, IOException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CommonParams.QT, "/cdcr");
+    params.set(CommonParams.ACTION, CdcrParams.QUEUES);
+    return client.query(params);
+  }
+
+  protected static Object getFingerPrintMaxVersion(CloudSolrClient client, String shardNames, int numDocs) throws SolrServerException, IOException, InterruptedException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CommonParams.QT, "/get");
+    params.set("fingerprint", true);
+    params.set("shards", shardNames);
+    params.set("getVersions", numDocs);
+
+    QueryResponse response = null;
+    long start = System.nanoTime();
+    while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(20, TimeUnit.SECONDS)) {
+      response = client.query(params);
+      if (response.getResponse() != null && response.getResponse().get("fingerprint") != null) {
+        return (long)((LinkedHashMap)response.getResponse().get("fingerprint")).get("maxVersionEncountered");
+      }
+      Thread.sleep(200);
+    }
+    log.error("maxVersionEncountered not found for client : " + client + "in 20 attempts");
+    return null;
+  }
+
+  protected static long waitForClusterToSync(int numDocs, CloudSolrClient clusterSolrClient) throws Exception {
+    return waitForClusterToSync(numDocs, clusterSolrClient, "*:*");
+  }
+
+  protected static long waitForClusterToSync(int numDocs, CloudSolrClient clusterSolrClient, String query) throws Exception {
+    long start = System.nanoTime();
+    QueryResponse response = null;
+    while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(120, TimeUnit.SECONDS)) {
+      clusterSolrClient.commit();
+      response = clusterSolrClient.query(new SolrQuery(query));
+      if (response.getResults().getNumFound() == numDocs) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    return response != null ? response.getResults().getNumFound() : 0;
+  }
+}
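
A hypothetical caller of these helpers, from a test in the same package (sourceClient,
targetClient, and expectedDocs are illustrative names, not part of this commit):

    CdcrTestsUtil.cdcrStart(sourceClient);          // asserts the "started" process status
    CdcrTestsUtil.cdcrDisableBuffer(targetClient);  // asserts the "disabled" buffer status
    long found = CdcrTestsUtil.waitForClusterToSync(expectedDocs, targetClient);
    assertEquals(expectedDocs, found);              // waitForClusterToSync polls for up to 120 seconds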

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a30c92a7/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrVersionReplicationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrVersionReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrVersionReplicationTest.java
new file mode 100644
index 0000000..ff9afe2
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/cdcr/CdcrVersionReplicationTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.cdcr;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.update.processor.CdcrUpdateProcessor;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CdcrVersionReplicationTest extends BaseCdcrDistributedZkTest {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final String vfield = CommonParams.VERSION_FIELD;
+  SolrClient solrServer;
+
+  public CdcrVersionReplicationTest() {
+    schemaString = "schema15.xml";      // we need a string id
+    super.createTargetCollection = false;
+  }
+
+  SolrClient createClientRandomly() throws Exception {
+    int r = random().nextInt(100);
+
+    // testing the smart cloud client (requests to leaders) is more important than testing the forwarding logic
+    if (r < 80) {
+      return createCloudClient(SOURCE_COLLECTION);
+    }
+
+    if (r < 90) {
+      return createNewSolrServer(shardToJetty.get(SOURCE_COLLECTION).get(SHARD1).get(random().nextInt(2)).url);
+    }
+
+    return createNewSolrServer(shardToJetty.get(SOURCE_COLLECTION).get(SHARD2).get(random().nextInt(2)).url);
+  }
+
+  @Test
+  @ShardsFixed(num = 4)
+  public void testCdcrDocVersions() throws Exception {
+    SolrClient client = createClientRandomly();
+    try {
+      handle.clear();
+      handle.put("timestamp", SKIPVAL);
+
+      doTestCdcrDocVersions(client);
+
+      commit(SOURCE_COLLECTION); // work around SOLR-5628
+    } finally {
+      client.close();
+    }
+  }
+
+  private void doTestCdcrDocVersions(SolrClient solrClient) throws Exception {
+    this.solrServer = solrClient;
+
+    log.info("### STARTING doCdcrTestDocVersions - Add commands, client: " + solrClient);
+
+    vadd("doc1", 10, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "10");
+    vadd("doc2", 11, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "11");
+    vadd("doc3", 10, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "10");
+    vadd("doc4", 11, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "11");
+    commit(SOURCE_COLLECTION);
+
+    // versions are preserved and verifiable both by query and by real-time get
+    doQuery(solrClient, "doc1,10,doc2,11,doc3,10,doc4,11", "q", "*:*");
+    doRealTimeGet("doc1,doc2,doc3,doc4", "10,11,10,11");
+
+    vadd("doc1", 5, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "5");
+    vadd("doc2", 10, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "10");
+    vadd("doc3", 9, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "9");
+    vadd("doc4", 8, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "8");
+
+    // lower versions are ignored
+    doRealTimeGet("doc1,doc2,doc3,doc4", "10,11,10,11");
+
+    vadd("doc1", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
+    vadd("doc2", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
+    vadd("doc3", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
+    vadd("doc4", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
+
+    // higher versions are accepted
+    doRealTimeGet("doc1,doc2,doc3,doc4", "12,12,12,12");
+
+    // non-cdcr update requests throw a version conflict exception for non-equal versions (optimistic locking feature)
+    vaddFail("doc1", 13, 409);
+    vaddFail("doc2", 13, 409);
+    vaddFail("doc3", 13, 409);
+
+    commit(SOURCE_COLLECTION);
+
+    // versions are still as they were
+    doQuery(solrClient, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");
+
+    // query all shard replicas individually
+    doQueryShardReplica(SHARD1, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");
+    doQueryShardReplica(SHARD2, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");
+
+    // optimistic locking update
+    vadd("doc4", 12);
+    commit(SOURCE_COLLECTION);
+
+    QueryResponse rsp = solrClient.query(params("qt", "/get", "ids", "doc4"));
+    long version = (long) rsp.getResults().get(0).get(vfield);
+
+    // update accepted and a new version number was generated
+    assertTrue(version > 1_000_000_000_000L);
+
+    log.info("### STARTING doCdcrTestDocVersions - Delete commands");
+
+    // send a delete update with an older version number
+    vdelete("doc1", 5, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "5");
+    // must ignore the delete
+    doRealTimeGet("doc1", "12");
+
+    // send a delete update with a higher version number
+    vdelete("doc1", 13, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "13");
+    // must be deleted
+    doRealTimeGet("doc1", "");
+
+    // send a delete update with a higher version number
+    vdelete("doc4", version + 1, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "" + (version
+ 1));
+    // must be deleted
+    doRealTimeGet("doc4", "");
+
+    commit(SOURCE_COLLECTION);
+
+    // query each shard replica individually
+    doQueryShardReplica(SHARD1, "doc2,12,doc3,12", "q", "*:*");
+    doQueryShardReplica(SHARD2, "doc2,12,doc3,12", "q", "*:*");
+
+    // version conflict thanks to optimistic locking
+    if (solrClient instanceof CloudSolrClient) // TODO: it seems that optimistic locking doesn't work with forwarding, test with shard2 client
+      vdeleteFail("doc2", 50, 409);
+
+    // cleanup after ourselves for the next run
+    // deleteByQuery should work as usual with the CDCR_UPDATE param
+    doDeleteByQuery("id:doc*", CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, Long.toString(1));
+    commit(SOURCE_COLLECTION);
+
+    // deleteByQuery with a version lower than anything else should have no effect
+    doQuery(solrClient, "doc2,12,doc3,12", "q", "*:*");
+
+    doDeleteByQuery("id:doc*", CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, Long.toString(51));
+    commit(SOURCE_COLLECTION);
+
+    // deleteByQuery with a version higher than everything else should delete all remaining docs
+    doQuery(solrClient, "", "q", "*:*");
+
+    // check that replicas are as expected too
+    doQueryShardReplica(SHARD1, "", "q", "*:*");
+    doQueryShardReplica(SHARD2, "", "q", "*:*");
+  }
+
+
+  // ------------------ auxiliary methods ------------------
+
+
+  void doQueryShardReplica(String shard, String expectedDocs, String... queryParams) throws Exception {
+    for (CloudJettyRunner jetty : shardToJetty.get(SOURCE_COLLECTION).get(shard)) {
+      doQuery(jetty.client, expectedDocs, queryParams);
+    }
+  }
+
+  void vdelete(String id, long version, String... params) throws Exception {
+    UpdateRequest req = new UpdateRequest();
+    req.deleteById(id);
+    req.setParam(vfield, Long.toString(version));
+
+    for (int i = 0; i < params.length; i += 2) {
+      req.setParam(params[i], params[i + 1]);
+    }
+    solrServer.request(req);
+  }
+
+  void vdeleteFail(String id, long version, int errCode, String... params) throws Exception {
+    boolean failed = false;
+    try {
+      vdelete(id, version, params);
+    } catch (SolrException e) {
+      failed = true;
+      if (e.getCause() instanceof SolrException && e.getCause() != e) {
+        e = (SolrException) e.getCause();
+      }
+      assertEquals(errCode, e.code());
+    } catch (SolrServerException ex) {
+      Throwable t = ex.getCause();
+      if (t instanceof SolrException) {
+        failed = true;
+        SolrException exception = (SolrException) t;
+        assertEquals(errCode, exception.code());
+      }
+    } catch (Exception e) {
+      log.error("ERROR", e);
+    }
+    assertTrue(failed);
+  }
+
+  void vadd(String id, long version, String... params) throws Exception {
+    UpdateRequest req = new UpdateRequest();
+    req.add(sdoc("id", id, vfield, version));
+    for (int i = 0; i < params.length; i += 2) {
+      req.setParam(params[i], params[i + 1]);
+    }
+    solrServer.request(req);
+  }
+
+  void vaddFail(String id, long version, int errCode, String... params) throws Exception {
+    boolean failed = false;
+    try {
+      vadd(id, version, params);
+    } catch (SolrException e) {
+      failed = true;
+      if (e.getCause() instanceof SolrException && e.getCause() != e) {
+        e = (SolrException) e.getCause();
+      }
+      assertEquals(errCode, e.code());
+    } catch (SolrServerException ex) {
+      Throwable t = ex.getCause();
+      if (t instanceof SolrException) {
+        failed = true;
+        SolrException exception = (SolrException) t;
+        assertEquals(errCode, exception.code());
+      }
+    } catch (Exception e) {
+      log.error("ERROR", e);
+    }
+    assertTrue(failed);
+  }
+
+  void doQuery(SolrClient ss, String expectedDocs, String... queryParams) throws Exception {
+
+    List<String> strs = StrUtils.splitSmart(expectedDocs, ",", true);
+    Map<String, Object> expectedIds = new HashMap<>();
+    for (int i = 0; i < strs.size(); i += 2) {
+      String id = strs.get(i);
+      String vS = strs.get(i + 1);
+      Long v = Long.valueOf(vS);
+      expectedIds.put(id, v);
+    }
+
+    QueryResponse rsp = ss.query(params(queryParams));
+    Map<String, Object> obtainedIds = new HashMap<>();
+    for (SolrDocument doc : rsp.getResults()) {
+      obtainedIds.put((String) doc.get("id"), doc.get(vfield));
+    }
+
+    assertEquals(expectedIds, obtainedIds);
+  }
+
+
+  void doRealTimeGet(String ids, String versions) throws Exception {
+    Map<String, Object> expectedIds = new HashMap<>();
+    List<String> strs = StrUtils.splitSmart(ids, ",", true);
+    List<String> verS = StrUtils.splitSmart(versions, ",", true);
+    for (int i = 0; i < strs.size(); i++) {
+      if (!verS.isEmpty()) {
+        expectedIds.put(strs.get(i), Long.valueOf(verS.get(i)));
+      }
+    }
+
+    QueryResponse rsp = solrServer.query(params("qt", "/get", "ids", ids));
+    Map<String, Object> obtainedIds = new HashMap<>();
+    for (SolrDocument doc : rsp.getResults()) {
+      obtainedIds.put((String) doc.get("id"), doc.get(vfield));
+    }
+
+    assertEquals(expectedIds, obtainedIds);
+  }
+
+  void doDeleteByQuery(String q, String... reqParams) throws Exception {
+    UpdateRequest req = new UpdateRequest();
+    req.deleteByQuery(q);
+    req.setParams(params(reqParams));
+    req.process(solrServer);
+  }
+
+}
+
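
The vadd/vdelete helpers above hinge on the CDCR_UPDATE request parameter: with it set, the
supplied _version_ is preserved instead of a fresh one being allocated, lower versions are
silently ignored, and higher versions win. A minimal sketch of the same request shape outside
the test harness (client and the literal values are illustrative):

    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "doc1");
    doc.addField("_version_", 10L);                    // the version to preserve

    UpdateRequest req = new UpdateRequest();
    req.add(doc);
    req.setParam(CdcrUpdateProcessor.CDCR_UPDATE, ""); // mark this as a cdcr-forwarded update
    req.setParam("_version_", "10");                   // also passed as a request param, as vadd() does
    req.process(client);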

