lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [1/2] lucene-solr:branch_7x: SOLR-11445: Overseer should not hang when process bad message
Date Fri, 13 Oct 2017 01:18:37 GMT
Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 75498a613 -> 2795287b0


SOLR-11445: Overseer should not hang when process bad message


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/fc981dd5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/fc981dd5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/fc981dd5

Branch: refs/heads/branch_7x
Commit: fc981dd5acee6962ff1d35e8238af4e47829e4b5
Parents: 75498a6
Author: Cao Manh Dat <datcm@apache.org>
Authored: Thu Oct 12 15:08:24 2017 +0700
Committer: Cao Manh Dat <datcm@apache.org>
Committed: Fri Oct 13 08:17:48 2017 +0700

----------------------------------------------------------------------
 .../java/org/apache/solr/cloud/Overseer.java    | 56 ++++++++++---------
 .../org/apache/solr/cloud/OverseerTest.java     | 57 ++++++++++++++++++++
 2 files changed, 88 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fc981dd5/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index e70226e..1e6b088 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -158,7 +158,15 @@ public class Overseer implements Closeable {
                 log.debug("processMessage: workQueueSize: {}, message = {}", workQueue.getStats().getQueueLength(),
message);
                 // force flush to ZK after each message because there is no fallback if workQueue
items
                 // are removed from workQueue but fail to be written to ZK
-                clusterState = processQueueItem(message, clusterState, zkStateWriter, false,
null);
+                try {
+                  clusterState = processQueueItem(message, clusterState, zkStateWriter, false,
null);
+                } catch (Exception e) {
+                  if (isBadMessage(e)) {
+                    log.warn("Exception when process message = {}, consider as bad message
and poll out from the queue", message);
+                    workQueue.poll();
+                  }
+                  throw e;
+                }
                 workQueue.poll(); // poll-ing removes the element we got by peek-ing
                 data = workQueue.peek();
               }
@@ -166,33 +174,28 @@ public class Overseer implements Closeable {
               if (hadWorkItems) {
                 clusterState = zkStateWriter.writePendingUpdates();
               }
-            } catch (KeeperException e) {
-              if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
-                log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
-                return;
-              }
-              log.error("Exception in Overseer work queue loop", e);
+            } catch (KeeperException.SessionExpiredException e) {
+              log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
+              return;
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return;
             } catch (Exception e) {
-              log.error("Exception in Overseer work queue loop", e);
+              log.error("Exception in Overseer when process message from work queue, retrying",
e);
+              refreshClusterState = true;
+              continue;
             }
           }
 
           byte[] head = null;
           try {
             head = stateUpdateQueue.peek(true);
-          } catch (KeeperException e) {
-            if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
-              log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
-              return;
-            }
-            log.error("Exception in Overseer main queue loop", e);
+          } catch (KeeperException.SessionExpiredException e) {
+            log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
+            return;
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return;
-
           } catch (Exception e) {
             log.error("Exception in Overseer main queue loop", e);
           }
@@ -236,16 +239,9 @@ public class Overseer implements Closeable {
             // clean work queue
             while (workQueue.poll() != null);
 
-          } catch (KeeperException.BadVersionException bve) {
-            log.warn("Bad version writing to ZK using compare-and-set, will force refresh
cluster state: {}", bve.getMessage());
-            refreshClusterState = true;
-          } catch (KeeperException e) {
-            if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
-              log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
-              return;
-            }
-            log.error("Exception in Overseer main queue loop", e);
-            refreshClusterState = true; // force refresh state in case of all errors
+          } catch (KeeperException.SessionExpiredException e) {
+            log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
+            return;
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return;
@@ -261,6 +257,16 @@ public class Overseer implements Closeable {
       }
     }
 
+    // Return true if the exception thrown while processing a message (e.g. by ZkStateWriter) marks it as 'bad', i.e. an invalid
+    // state the message can never succeed in; such a message is polled out of the queue. 'Bad' means ZK NONODE/NODEEXISTS, or any non-KeeperException except InterruptedException.
+    private boolean isBadMessage(Exception e) {
+      if (e instanceof KeeperException) {
+        KeeperException ke = (KeeperException) e;
+        return ke.code() == KeeperException.Code.NONODE || ke.code() == KeeperException.Code.NODEEXISTS;
+      }
+      return !(e instanceof InterruptedException);
+    }
+
     private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState,
ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback)
throws Exception {
       final String operation = message.getStr(QUEUE_OPERATION);
       List<ZkWriteCommand> zkWriteCommands = null;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/fc981dd5/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 030be51..62f97cb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -689,6 +689,63 @@ public class OverseerTest extends SolrTestCaseJ4 {
       }
     }
   }
+
+  @Test
+  public void testExceptionWhenFlushClusterState() throws Exception {
+    String zkDir = createTempDir("zkData").toFile().getAbsolutePath();
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient controllerClient = null;
+    SolrZkClient overseerClient = null;
+    ZkStateReader reader = null;
+
+    try {
+      server.run();
+      controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+      ZkController.createClusterZkNodes(controllerClient);
+
+      reader = new ZkStateReader(controllerClient);
+      reader.createClusterStateWatchersAndUpdate();
+
+      // We did not create /collections, so this message will cause an exception when the Overseer tries to flush the cluster state
+      ZkNodeProps badMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
+          "name", "collection1",
+          ZkStateReader.REPLICATION_FACTOR, "1",
+          ZkStateReader.NUM_SHARDS_PROP, "1",
+          DocCollection.STATE_FORMAT, "2",
+          "createNodeSet", "");
+      ZkNodeProps goodMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
+          "name", "collection2",
+          ZkStateReader.REPLICATION_FACTOR, "1",
+          ZkStateReader.NUM_SHARDS_PROP, "1",
+          DocCollection.STATE_FORMAT, "1",
+          "createNodeSet", "");
+      ZkDistributedQueue workQueue = Overseer.getInternalWorkQueue(controllerClient, new Overseer.Stats());
+      workQueue.offer(Utils.toJSON(badMessage));
+      workQueue.offer(Utils.toJSON(goodMessage));
+      overseerClient = electNewOverseer(server.getZkAddress());
+      waitForCollections(reader, "collection2");
+      // Same bad/good pair, this time through the state update queue: the bad message must not block collection3
+      ZkDistributedQueue q = Overseer.getStateUpdateQueue(controllerClient);
+      q.offer(Utils.toJSON(badMessage));
+      q.offer(Utils.toJSON(goodMessage.plus("name", "collection3")));
+      waitForCollections(reader, "collection2", "collection3");
+      assertNotNull(reader.getClusterState().getCollectionOrNull("collection2"));
+      assertNotNull(reader.getClusterState().getCollectionOrNull("collection3"));
+      // Both bad messages must have been removed rather than left blocking the head of either queue
+      assertTrue(workQueue.peek() == null);
+      assertTrue(q.peek() == null);
+    } finally {
+      close(overseerClient);
+      close(controllerClient);
+      close(reader);
+      server.shutdown();
+    }
+  }
   
   @Test
   public void testShardLeaderChange() throws Exception {


Mime
View raw message