lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [1/2] lucene-solr:jira/solr-11702: SOLR-11702: Fix bug in case of reconnect to ZK
Date Tue, 09 Jan 2018 08:10:59 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-11702 9fd8c4494 -> 030fcae48


SOLR-11702: Fix bug in case of reconnect to ZK


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/09b70276
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/09b70276
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/09b70276

Branch: refs/heads/jira/solr-11702
Commit: 09b7027610b5111aee407e396dcb309301d14c15
Parents: 9fd8c44
Author: Cao Manh Dat <datcm@apache.org>
Authored: Tue Jan 9 15:10:00 2018 +0700
Committer: Cao Manh Dat <datcm@apache.org>
Committed: Tue Jan 9 15:10:00 2018 +0700

----------------------------------------------------------------------
 .../org/apache/solr/cloud/ZkController.java     |  9 ++--
 .../org/apache/solr/cloud/ZkShardTerms.java     | 57 ++++++++++++--------
 .../org/apache/solr/cloud/ZkShardTermsTest.java | 57 +++++++++++++++++++-
 3 files changed, 97 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09b70276/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 2e495d3..b3d6bb1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1036,8 +1036,11 @@ public class ZkController {
       final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
       assert coreZkNodeName != null : "we should have a coreNodeName by now";
 
+      ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
       if ("new".equals(desc.getCoreProperty("lirVersion", "new"))) {
-        getShardTerms(collection, cloudDesc.getShardId()).registerTerm(coreZkNodeName);
+        // this call is useful in case of reconnecting to ZK
+        shardTerms.refreshTerms(true);
+        shardTerms.registerTerm(coreZkNodeName);
       }
       String shardId = cloudDesc.getShardId();
       Map<String,Object> props = new HashMap<>();
@@ -1131,7 +1134,7 @@ public class ZkController {
         }
 
         if ("new".equals(desc.getCoreProperty("lirVersion", "new"))) {
-          getShardTerms(collection, shardId).addListener(new RecoveringCoreTermWatcher(core));
+          shardTerms.addListener(new RecoveringCoreTermWatcher(core));
         }
         core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
       }
@@ -1464,7 +1467,7 @@ public class ZkController {
     return getCollectionTerms(collection).getShard(shardId);
   }
 
-  public ZkCollectionTerms getCollectionTerms(String collection) {
+  private ZkCollectionTerms getCollectionTerms(String collection) {
     synchronized (collectionToTerms) {
       if (!collectionToTerms.containsKey(collection)) collectionToTerms.put(collection, new
ZkCollectionTerms(collection, zkClient));
       return collectionToTerms.get(collection);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09b70276/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index f1f6ae7..d0a8b63 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -83,7 +83,7 @@ public class ZkShardTerms implements AutoCloseable{
     this.shard = shard;
     this.zkClient = zkClient;
     ensureTermNodeExist();
-    refreshTerms();
+    refreshTerms(false);
     ObjectReleaseTracker.track(this);
   }
 
@@ -91,7 +91,6 @@ public class ZkShardTerms implements AutoCloseable{
    * Ensure that leader's term is lower than some replica's terms
    * @param leader coreNodeName of leader
    * @param replicasInLowerTerms replicas which should their term should be lower than leader's
term
-   * @return
    */
   public void ensureTermsIsHigher(String leader, Set<String> replicasInLowerTerms)
{
     Terms newTerms;
@@ -120,7 +119,7 @@ public class ZkShardTerms implements AutoCloseable{
 
   public void close() {
     // no watcher will be registered
-    numWatcher.addAndGet(1);
+    numWatcher.set(2);
     ObjectReleaseTracker.release(this);
   }
 
@@ -163,7 +162,7 @@ public class ZkShardTerms implements AutoCloseable{
   }
 
   /**
-   * Register a repilca's term (term value will be 0).
+   * Register a replica's term (term value will be 0).
    * If a term is already associate with this replica do nothing
    * @param coreNodeName of the replica
    */
@@ -192,6 +191,11 @@ public class ZkShardTerms implements AutoCloseable{
     }
   }
 
+  // package private for testing, only used by tests
+  int getNumWatcher() {
+    return numWatcher.get();
+  }
+
   /**
    * Set new terms to ZK.
    * In case of correspond ZK term node is not created, create it
@@ -221,7 +225,7 @@ public class ZkShardTerms implements AutoCloseable{
       return true;
     } catch (KeeperException.BadVersionException e) {
       log.info("Failed to save terms, version is not match, retrying");
-      refreshTerms();
+      refreshTerms(false);
     } catch (KeeperException.NoNodeException e) {
       throw e;
     } catch (Exception e) {
@@ -264,27 +268,36 @@ public class ZkShardTerms implements AutoCloseable{
    * Fetch the latest terms from ZK.
    * This method will atomically register a watcher to the correspond ZK term node,
    * so {@link ZkShardTerms#terms} will stay up to date.
+   * @param earlyStop early stop if the ZK node is already watched
    */
-  private void refreshTerms() {
-    try {
+  public void refreshTerms(boolean earlyStop) {
+    Terms newTerms;
+    synchronized (numWatcher) {
+      // This block is synchronized because we want only one and at least one valid watcher
is watching the term node.
       Watcher watcher = null;
-      if (numWatcher.compareAndSet(0, 1)) {
-        watcher = event -> {
-          numWatcher.decrementAndGet();
-          refreshTerms();
-        };
-      }
+      try {
+        if (numWatcher.compareAndSet(0, 1)) {
+          watcher = event -> {
+            numWatcher.compareAndSet(1, 0);
+            refreshTerms(false);
+          };
+        }
 
-      Stat stat = new Stat();
-      byte[] data = zkClient.getData(znodePath, watcher, stat, true);
-      Terms newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
-      setNewTerms(newTerms);
-    } catch (InterruptedException e) {
-      Thread.interrupted();
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard
term for collection:" + collection, e);
-    } catch (KeeperException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard
term for collection:" + collection, e);
+        if (earlyStop && watcher == null) return;
+
+        Stat stat = new Stat();
+        byte[] data = zkClient.getData(znodePath, watcher, stat, true);
+        newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard
term for collection:" + collection, e);
+      } catch (KeeperException e) {
+        // if an keeper exception is thrown, the watcher will never be called
+        if (watcher != null) numWatcher.compareAndSet(1, 0);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard
term for collection:" + collection, e);
+      }
     }
+    setNewTerms(newTerms);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09b70276/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
index fc96687..8d8db94 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -23,18 +23,25 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.DefaultConnectionStrategy;
+import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -184,6 +191,54 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     replicaTerms.close();
   }
 
+  public void testCoreTermWatcherOnLosingZKConnection() throws InterruptedException, IOException,
KeeperException, TimeoutException {
+    String collection = "testCoreTermWatcherOnLosingZKConnection";
+
+    String zkDir = createTempDir("zkData").toFile().getAbsolutePath();
+    ZkTestServer server = new ZkTestServer(zkDir);
+    try {
+      server.run();
+      try (SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), 1500)) {
+        zkClient.makePath("/", true);
+        zkClient.makePath("/collections", true);
+      }
+
+      try (SolrZkClient leaderZkClient = new SolrZkClient(server.getZkAddress(), 1500);
+           ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", leaderZkClient))
{
+        leaderTerms.registerTerm("leader");
+        AtomicInteger count = new AtomicInteger(0);
+        Set<ZkShardTerms> shardTerms = new HashSet<>();
+        OnReconnect onReconnect = () -> {
+          log.info("On reconnect {}", shardTerms);
+          shardTerms.iterator().next().refreshTerms(true);
+        };
+        try (SolrZkClient replicaZkClient = new SolrZkClient(server.getZkAddress(), 1500,
1500, new DefaultConnectionStrategy(), onReconnect);
+             ZkShardTerms replicaTerms = new ZkShardTerms(collection, "shard1", replicaZkClient))
{
+          shardTerms.add(replicaTerms);
+          replicaTerms.addListener(terms -> {
+            count.incrementAndGet();
+            return true;
+          });
+          replicaTerms.registerTerm("replica");
+          waitFor(1, count::get);
+          server.expire(replicaZkClient.getSolrZooKeeper().getSessionId());
+          leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
+          replicaZkClient.getConnectionManager().waitForDisconnected(10000);
+          replicaZkClient.getConnectionManager().waitForConnected(10000);
+          waitFor(2, count::get);
+          waitFor(1, replicaTerms::getNumWatcher);
+          replicaTerms.setEqualsToMax("replica");
+          waitFor(3, count::get);
+          waitFor(1L, () -> leaderTerms.getTerms().get("replica"));
+          leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
+          waitFor(4, count::get);
+        }
+      }
+    } finally {
+      server.shutdown();
+    }
+  }
+
   public void testEnsureTermsIsHigher() {
     Map<String, Long> map = new HashMap<>();
     map.put("leader", 0L);
@@ -193,7 +248,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
   }
 
   private <T> void waitFor(T expected, Supplier<T> supplier) throws InterruptedException
{
-    TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
+    TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
     while (!timeOut.hasTimedOut()) {
       if (expected == supplier.get()) return;
       Thread.sleep(100);


Mime
View raw message