Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 78819200D2F for ; Tue, 17 Oct 2017 17:53:24 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 76B28160BEC; Tue, 17 Oct 2017 15:53:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9FDEB160BEA for ; Tue, 17 Oct 2017 17:53:23 +0200 (CEST) Received: (qmail 45157 invoked by uid 500); 17 Oct 2017 15:53:17 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 43472 invoked by uid 99); 17 Oct 2017 15:53:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Oct 2017 15:53:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EF29ADFBC7; Tue, 17 Oct 2017 15:53:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ab@apache.org To: commits@lucene.apache.org Date: Tue, 17 Oct 2017 15:53:37 -0000 Message-Id: <72d8188987d8450a814dbb62eaac07da@git.apache.org> In-Reply-To: <7a1b15201e484e89871c53666dad8ab2@git.apache.org> References: <7a1b15201e484e89871c53666dad8ab2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [24/50] [abbrv] lucene-solr:jira/solr-11320: SOLR-11445: Overseer should not hang when process bad message archived-at: Tue, 17 Oct 2017 15:53:24 -0000 SOLR-11445: Overseer should not hang when process bad message Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/df5fefb0 Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/df5fefb0 Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/df5fefb0 Branch: refs/heads/jira/solr-11320 Commit: df5fefb0dbfca1783126798df9c7e303d40ace16 Parents: b21721f Author: Cao Manh Dat Authored: Thu Oct 12 15:08:24 2017 +0700 Committer: Cao Manh Dat Committed: Thu Oct 12 15:08:24 2017 +0700 ---------------------------------------------------------------------- .../java/org/apache/solr/cloud/Overseer.java | 56 ++++++++++--------- .../org/apache/solr/cloud/OverseerTest.java | 57 ++++++++++++++++++++ 2 files changed, 88 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/df5fefb0/solr/core/src/java/org/apache/solr/cloud/Overseer.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 1dafa7d..3589fd8 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -159,7 +159,15 @@ public class Overseer implements Closeable { log.debug("processMessage: workQueueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message); // force flush to ZK after each message because there is no fallback if workQueue items // are removed from workQueue but fail to be written to ZK - clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null); + try { + clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null); + } catch (Exception e) { + if (isBadMessage(e)) { + log.warn("Exception when process message = {}, consider as bad message and poll out from the queue", message); + workQueue.poll(); + } + throw e; + } workQueue.poll(); // poll-ing removes the element we got by peek-ing data = workQueue.peek(); } @@ -167,33 +175,28 @@ public class Overseer implements Closeable { if (hadWorkItems) { clusterState = zkStateWriter.writePendingUpdates(); } - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED) { - log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e); - return; - } - log.error("Exception in Overseer work queue loop", e); + } catch (KeeperException.SessionExpiredException e) { + log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e); + return; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } catch (Exception e) { - log.error("Exception in Overseer work queue loop", e); + log.error("Exception in Overseer when process message from work queue, retrying", e); + refreshClusterState = true; + continue; } } byte[] head = null; try { head = stateUpdateQueue.peek(true); - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED) { - log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); - return; - } - log.error("Exception in Overseer main queue loop", e); + } catch (KeeperException.SessionExpiredException e) { + log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); + return; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; - } catch (Exception e) { log.error("Exception in Overseer main queue loop", e); } @@ -237,16 +240,9 @@ public class Overseer implements Closeable { // clean work queue while (workQueue.poll() != null); - } catch (KeeperException.BadVersionException bve) { - log.warn("Bad version writing to ZK using compare-and-set, will force refresh cluster state: {}", bve.getMessage()); - refreshClusterState = true; - } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED) { - log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); - return; - } - log.error("Exception in Overseer main queue loop", e); - refreshClusterState = true; // force refresh state in case of all errors + } catch (KeeperException.SessionExpiredException e) { + log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e); + return; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; @@ -262,6 +258,16 @@ public class Overseer implements Closeable { } } + // Return true whenever the exception thrown by ZkStateWriter is correspond + // to a invalid state or 'bad' message (in this case, we should remove that message from queue) + private boolean isBadMessage(Exception e) { + if (e instanceof KeeperException) { + KeeperException ke = (KeeperException) e; + return ke.code() == KeeperException.Code.NONODE || ke.code() == KeeperException.Code.NODEEXISTS; + } + return !(e instanceof InterruptedException); + } + private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception { final String operation = message.getStr(QUEUE_OPERATION); List zkWriteCommands = null; http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/df5fefb0/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index 030be51..62f97cb 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -689,6 +689,63 @@ public class OverseerTest extends SolrTestCaseJ4 { } } } + + @Test + public void testExceptionWhenFlushClusterState() throws Exception { + String zkDir = createTempDir("zkData").toFile().getAbsolutePath(); + + ZkTestServer server = new ZkTestServer(zkDir); + + SolrZkClient controllerClient = null; + SolrZkClient overseerClient = null; + ZkStateReader reader = null; + + try { + server.run(); + controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT); + + AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); + AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); + ZkController.createClusterZkNodes(controllerClient); + + reader = new ZkStateReader(controllerClient); + reader.createClusterStateWatchersAndUpdate(); + + // We did not create /collections -> this message will cause exception when Overseer try to flush the clusterstate + ZkNodeProps badMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), + "name", "collection1", + ZkStateReader.REPLICATION_FACTOR, "1", + ZkStateReader.NUM_SHARDS_PROP, "1", + DocCollection.STATE_FORMAT, "2", + "createNodeSet", ""); + ZkNodeProps goodMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), + "name", "collection2", + ZkStateReader.REPLICATION_FACTOR, "1", + ZkStateReader.NUM_SHARDS_PROP, "1", + DocCollection.STATE_FORMAT, "1", + "createNodeSet", ""); + ZkDistributedQueue workQueue = Overseer.getInternalWorkQueue(controllerClient, new Overseer.Stats()); + workQueue.offer(Utils.toJSON(badMessage)); + workQueue.offer(Utils.toJSON(goodMessage)); + overseerClient = electNewOverseer(server.getZkAddress()); + waitForCollections(reader, "collection2"); + + ZkDistributedQueue q = Overseer.getStateUpdateQueue(controllerClient); + q.offer(Utils.toJSON(badMessage)); + q.offer(Utils.toJSON(goodMessage.plus("name", "collection3"))); + waitForCollections(reader, "collection2", "collection3"); + assertNotNull(reader.getClusterState().getCollectionOrNull("collection2")); + assertNotNull(reader.getClusterState().getCollectionOrNull("collection3")); + + assertTrue(workQueue.peek() == null); + assertTrue(q.peek() == null); + } finally { + close(overseerClient); + close(controllerClient); + close(reader); + server.shutdown(); + } + } @Test public void testShardLeaderChange() throws Exception {