Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4193D200D49 for ; Thu, 9 Nov 2017 22:19:18 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 401C7160C05; Thu, 9 Nov 2017 21:19:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A35A1160BEF for ; Thu, 9 Nov 2017 22:19:16 +0100 (CET) Received: (qmail 4328 invoked by uid 500); 9 Nov 2017 21:19:10 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 3786 invoked by uid 99); 9 Nov 2017 21:19:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Nov 2017 21:19:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CFC30F5E05; Thu, 9 Nov 2017 21:19:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ab@apache.org To: commits@lucene.apache.org Date: Thu, 09 Nov 2017 21:19:56 -0000 Message-Id: <9e40608f4e5249278bb6f714448a395d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [50/50] [abbrv] lucene-solr:jira/solr-11285-sim: SOLR-11285: More refactoring to support running OverseerTriggerThread using simulated components. archived-at: Thu, 09 Nov 2017 21:19:18 -0000 SOLR-11285: More refactoring to support running OverseerTriggerThread using simulated components. Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/73b1a95b Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/73b1a95b Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/73b1a95b Branch: refs/heads/jira/solr-11285-sim Commit: 73b1a95b7d8def8fe35d0a14231b2d8569f46626 Parents: 93a880d Author: Andrzej Bialecki Authored: Thu Nov 9 22:17:50 2017 +0100 Committer: Andrzej Bialecki Committed: Thu Nov 9 22:17:50 2017 +0100 ---------------------------------------------------------------------- .../java/org/apache/solr/cloud/Overseer.java | 7 +- .../solr/cloud/autoscaling/AutoScaling.java | 14 +-- .../cloud/autoscaling/ExecutePlanAction.java | 12 +-- .../cloud/autoscaling/HttpTriggerListener.java | 6 +- .../autoscaling/OverseerTriggerThread.java | 20 ++--- .../cloud/autoscaling/SystemLogListener.java | 6 +- .../cloud/autoscaling/TriggerListenerBase.java | 6 +- .../autoscaling/TriggerIntegrationTest.java | 4 +- .../solr/cloud/autoscaling/sim/ActionError.java | 24 +++++ .../cloud/autoscaling/sim/RandomThrottle.java | 51 +++++++++++ .../cloud/autoscaling/sim/SimCloudManager.java | 18 +++- .../sim/SimClusterStateProvider.java | 95 +++++++++++++++++++- .../autoscaling/sim/SimDistribStateManager.java | 4 - .../autoscaling/sim/SimNodeStateProvider.java | 5 ++ .../sim/TestClusterStateProvider.java | 10 ++- .../apache/solr/cloud/rule/RuleEngineTest.java | 3 + .../DelegatingNodeStateProvider.java | 11 +++ .../cloud/autoscaling/DistribStateManager.java | 9 +- .../cloud/autoscaling/NodeStateProvider.java | 4 +- .../client/solrj/impl/ClusterStateProvider.java | 4 +- .../solrj/impl/SolrClientNodeStateProvider.java | 5 ++ .../solrj/impl/ZkDistribStateManager.java | 4 + 22 files changed, 263 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/java/org/apache/solr/cloud/Overseer.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 3b65d6f..eed8d41 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -49,7 +49,6 @@ import org.apache.solr.common.util.ObjectReleaseTracker; import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Utils; import org.apache.solr.core.CloudConfig; -import org.apache.solr.core.CoreContainer; import org.apache.solr.handler.admin.CollectionsHandler; import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.update.UpdateShardHandler; @@ -442,7 +441,7 @@ public class Overseer implements SolrCloseable { } - static class OverseerThread extends Thread implements Closeable { + public static class OverseerThread extends Thread implements Closeable { protected volatile boolean isClosed; private Closeable thread; @@ -544,10 +543,6 @@ public class Overseer implements SolrCloseable { return zkController; } - public CoreContainer getCoreContainer() { - return zkController.getCoreContainer(); - } - public SolrCloudManager getSolrCloudManager() { return zkController.getSolrCloudManager(); } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java index e61536b..039067c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java @@ -133,13 +133,13 @@ public class AutoScaling { */ public static class TriggerFactoryImpl extends TriggerFactory { - private final SolrCloudManager dataProvider; + private final SolrCloudManager cloudManager; private final SolrResourceLoader loader; - public TriggerFactoryImpl(SolrResourceLoader loader, SolrCloudManager dataProvider) { - Objects.requireNonNull(dataProvider); + public TriggerFactoryImpl(SolrResourceLoader loader, SolrCloudManager cloudManager) { + Objects.requireNonNull(cloudManager); Objects.requireNonNull(loader); - this.dataProvider = dataProvider; + this.cloudManager = cloudManager; this.loader = loader; } @@ -150,11 +150,11 @@ public class AutoScaling { } switch (type) { case NODEADDED: - return new NodeAddedTrigger(name, props, loader, dataProvider); + return new NodeAddedTrigger(name, props, loader, cloudManager); case NODELOST: - return new NodeLostTrigger(name, props, loader, dataProvider); + return new NodeLostTrigger(name, props, loader, cloudManager); case SEARCHRATE: - return new SearchRateTrigger(name, props, loader, dataProvider); + return new SearchRateTrigger(name, props, loader, cloudManager); default: throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name); } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java index ebe0660..4c1a53d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java @@ -56,7 +56,7 @@ public class ExecutePlanAction extends TriggerActionBase { @Override public void process(TriggerEvent event, ActionContext context) throws Exception { log.debug("-- processing event: {} with context properties: {}", event, context.getProperties()); - SolrCloudManager dataProvider = context.getCloudManager(); + SolrCloudManager cloudManager = context.getCloudManager(); List operations = (List) context.getProperty("operations"); if (operations == null || operations.isEmpty()) { log.info("No operations to execute for event: {}", event); @@ -73,22 +73,22 @@ public class ExecutePlanAction extends TriggerActionBase { // waitForFinalState so that the end effects of operations are visible req.setWaitForFinalState(true); String asyncId = event.getSource() + '/' + event.getId() + '/' + counter; - String znode = saveAsyncId(dataProvider.getDistribStateManager(), event, asyncId); + String znode = saveAsyncId(cloudManager.getDistribStateManager(), event, asyncId); log.debug("Saved requestId: {} in znode: {}", asyncId, znode); // TODO: find a better way of using async calls using dataProvider API !!! req.setAsyncId(asyncId); - SolrResponse asyncResponse = dataProvider.request(req); + SolrResponse asyncResponse = cloudManager.request(req); if (asyncResponse.getResponse().get("error") != null) { throw new IOException("" + asyncResponse.getResponse().get("error")); } asyncId = (String)asyncResponse.getResponse().get("requestid"); - CollectionAdminRequest.RequestStatusResponse statusResponse = waitForTaskToFinish(dataProvider, asyncId, + CollectionAdminRequest.RequestStatusResponse statusResponse = waitForTaskToFinish(cloudManager, asyncId, DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS); if (statusResponse != null) { RequestStatusState state = statusResponse.getRequestStatus(); if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED || state == RequestStatusState.NOT_FOUND) { try { - dataProvider.getDistribStateManager().removeData(znode, -1); + cloudManager.getDistribStateManager().removeData(znode, -1); } catch (Exception e) { log.warn("Unexpected exception while trying to delete znode: " + znode, e); } @@ -96,7 +96,7 @@ public class ExecutePlanAction extends TriggerActionBase { response = statusResponse; } } else { - response = dataProvider.request(operation); + response = cloudManager.request(operation); } NamedList result = response.getResponse(); context.getProperties().compute("responses", (s, o) -> { http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java index 2003cb8..0388472 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java @@ -66,8 +66,8 @@ public class HttpTriggerListener extends TriggerListenerBase { private boolean followRedirects; @Override - public void init(SolrCloudManager dataProvider, AutoScalingConfig.TriggerListenerConfig config) { - super.init(dataProvider, config); + public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) { + super.init(cloudManager, config); urlTemplate = (String)config.properties.get("url"); payloadTemplate = (String)config.properties.get("payload"); contentType = (String)config.properties.get("contentType"); @@ -148,7 +148,7 @@ public class HttpTriggerListener extends TriggerListenerBase { }); headers.put("Content-Type", type); try { - dataProvider.httpRequest(url, SolrRequest.METHOD.POST, headers, payload, timeout, followRedirects); + cloudManager.httpRequest(url, SolrRequest.METHOD.POST, headers, payload, timeout, followRedirects); } catch (IOException e) { LOG.warn("Exception sending request for event " + event, e); } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java index 3171404..7a9390b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java @@ -57,7 +57,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final SolrCloudManager dataProvider; + private final SolrCloudManager cloudManager; private final CloudConfig cloudConfig; @@ -80,11 +80,11 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable { private AutoScalingConfig autoScalingConfig; - public OverseerTriggerThread(SolrResourceLoader loader, SolrCloudManager dataProvider, CloudConfig cloudConfig) { - this.dataProvider = dataProvider; + public OverseerTriggerThread(SolrResourceLoader loader, SolrCloudManager cloudManager, CloudConfig cloudConfig) { + this.cloudManager = cloudManager; this.cloudConfig = cloudConfig; - scheduledTriggers = new ScheduledTriggers(loader, dataProvider); - triggerFactory = new AutoScaling.TriggerFactoryImpl(loader, dataProvider); + scheduledTriggers = new ScheduledTriggers(loader, cloudManager); + triggerFactory = new AutoScaling.TriggerFactoryImpl(loader, cloudManager); } @Override @@ -114,11 +114,11 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable { // we automatically add a trigger for auto add replicas if it does not exists already while (!isClosed) { try { - AutoScalingConfig autoScalingConfig = dataProvider.getDistribStateManager().getAutoScalingConfig(); + AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig(); AutoScalingConfig withAutoAddReplicasTrigger = withAutoAddReplicasTrigger(autoScalingConfig); if (withAutoAddReplicasTrigger.equals(autoScalingConfig)) break; log.debug("Adding .autoAddReplicas trigger"); - dataProvider.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(withAutoAddReplicasTrigger), withAutoAddReplicasTrigger.getZkVersion()); + cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(withAutoAddReplicasTrigger), withAutoAddReplicasTrigger.getZkVersion()); break; } catch (BadVersionException bve) { // somebody else has changed the configuration so we must retry @@ -225,7 +225,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable { throw new IllegalStateException("Caught AlreadyClosedException from ScheduledTriggers, but we're not closed yet!", e); } } - DistribStateManager stateManager = dataProvider.getDistribStateManager(); + DistribStateManager stateManager = cloudManager.getDistribStateManager(); if (cleanOldNodeLostMarkers) { log.debug("-- clean old nodeLost markers"); try { @@ -259,7 +259,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable { private void removeNodeMarker(String path, String nodeName) { path = path + "/" + nodeName; try { - dataProvider.getDistribStateManager().removeData(path, -1); + cloudManager.getDistribStateManager().removeData(path, -1); log.debug(" -- deleted " + path); } catch (NoSuchElementException e) { // ignore @@ -297,7 +297,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable { if (isClosed) { return; } - AutoScalingConfig currentConfig = dataProvider.getDistribStateManager().getAutoScalingConfig(watcher); + AutoScalingConfig currentConfig = cloudManager.getDistribStateManager().getAutoScalingConfig(watcher); log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, currentConfig.getZkVersion()); if (znodeVersion >= currentConfig.getZkVersion()) { // protect against reordered watcher fires by ensuring that we only move forward http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java index 34761f2..3282075 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java @@ -72,8 +72,8 @@ public class SystemLogListener extends TriggerListenerBase { private boolean enabled = true; @Override - public void init(SolrCloudManager dataProvider, AutoScalingConfig.TriggerListenerConfig config) { - super.init(dataProvider, config); + public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) { + super.init(cloudManager, config); collection = (String)config.properties.getOrDefault(CollectionAdminParams.COLLECTION, CollectionAdminParams.SYSTEM_COLL); enabled = Boolean.parseBoolean(String.valueOf(config.properties.getOrDefault("enabled", true))); } @@ -119,7 +119,7 @@ public class SystemLogListener extends TriggerListenerBase { UpdateRequest req = new UpdateRequest(); req.add(doc); req.setParam(CollectionAdminParams.COLLECTION, collection); - dataProvider.request(req); + cloudManager.request(req); } catch (Exception e) { if ((e instanceof SolrException) && e.getMessage().contains("Collection not found")) { // relatively benign http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java ---------------------------------------------------------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java index 507c77d..61a95db 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java @@ -27,11 +27,11 @@ import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager; public abstract class TriggerListenerBase implements TriggerListener { protected AutoScalingConfig.TriggerListenerConfig config; - protected SolrCloudManager dataProvider; + protected SolrCloudManager cloudManager; @Override - public void init(SolrCloudManager dataProvider, AutoScalingConfig.TriggerListenerConfig config) { - this.dataProvider = dataProvider; + public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) { + this.cloudManager = cloudManager; this.config = config; } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java index e64f588..4b98618 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java @@ -979,8 +979,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase { public static class TestTriggerListener extends TriggerListenerBase { @Override - public void init(SolrCloudManager dataProvider, AutoScalingConfig.TriggerListenerConfig config) { - super.init(dataProvider, config); + public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) { + super.init(cloudManager, config); listenerCreated.countDown(); } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/ActionError.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/ActionError.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/ActionError.java new file mode 100644 index 0000000..c10b42e --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/ActionError.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.autoscaling.sim; + +/** + * + */ +public interface ActionError { + boolean shouldFail(String... args); +} http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/RandomThrottle.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/RandomThrottle.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/RandomThrottle.java new file mode 100644 index 0000000..1b8981a --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/RandomThrottle.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud.autoscaling.sim; + +import org.apache.commons.math3.distribution.RealDistribution; +import org.apache.commons.math3.distribution.UniformRealDistribution; + +/** + * + */ +public class RandomThrottle { + + private final int minMs; + private final RealDistribution distribution; + + public RandomThrottle(int minMs, int maxMs) { + this.minMs = minMs; + this.distribution = new UniformRealDistribution(0, maxMs - minMs); + } + + public RandomThrottle(int minMs, RealDistribution distribution) { + this.minMs = minMs; + this.distribution = distribution; + } + + public void throttle() { + double random = distribution.sample(); + if (random < 0) { + random = 0; + } + try { + Thread.sleep(minMs + Math.round(random)); + } catch (InterruptedException e) { + // do nothing + } + } +} http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java index 9b1ae39..c31819a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java @@ -29,6 +29,11 @@ import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager; import org.apache.solr.client.solrj.cloud.autoscaling.NodeStateProvider; import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager; import org.apache.solr.client.solrj.impl.ClusterStateProvider; +import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.autoscaling.OverseerTriggerThread; +import org.apache.solr.common.util.IOUtils; +import org.apache.solr.core.CloudConfig; +import org.apache.solr.core.SolrResourceLoader; /** * Simulated {@link SolrCloudManager}. @@ -42,12 +47,19 @@ public class SimCloudManager implements SolrCloudManager { private SolrClient solrClient; private final SimHttpServer httpServer; + private Overseer.OverseerThread triggerThread; + public SimCloudManager() { this.stateManager = new SimDistribStateManager(); this.clusterStateProvider = new SimClusterStateProvider(); this.nodeStateProvider = new SimNodeStateProvider(this.clusterStateProvider, null); this.queueFactory = new SimDistributedQueueFactory(); this.httpServer = new SimHttpServer(); + ThreadGroup triggerThreadGroup = new ThreadGroup("Simulated Overseer autoscaling triggers"); + OverseerTriggerThread trigger = new OverseerTriggerThread(new SolrResourceLoader(), this, + new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build()); + triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread"); + triggerThread.start(); } public void setSolrClient(SolrClient solrClient) { @@ -116,6 +128,10 @@ public class SimCloudManager implements SolrCloudManager { @Override public void close() throws IOException { - + IOUtils.closeQuietly(clusterStateProvider); + IOUtils.closeQuietly(nodeStateProvider); + IOUtils.closeQuietly(stateManager); + IOUtils.closeQuietly(triggerThread); + triggerThread.interrupt(); } } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java index d6180df..37888f3 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -28,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import org.apache.solr.client.solrj.SolrRequest; @@ -35,18 +38,24 @@ import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo; import org.apache.solr.client.solrj.impl.ClusterStateProvider; import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.SolrResponseBase; +import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionAdminParams; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; +import org.apache.solr.util.DefaultSolrThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +65,8 @@ import org.slf4j.LoggerFactory; public class SimClusterStateProvider implements ClusterStateProvider { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String REPLICA_INFO_PROP = "__ri__"; + private final Map> nodeReplicaMap = new ConcurrentHashMap<>(); private final Set liveNodes = ConcurrentHashMap.newKeySet(); @@ -63,6 +74,10 @@ public class SimClusterStateProvider implements ClusterStateProvider { private final Map> collProperties = new ConcurrentHashMap<>(); private final Map>> sliceProperties = new ConcurrentHashMap<>(); + private final List systemColl = Collections.synchronizedList(new ArrayList<>()); + + private final ExecutorService simStateProviderPool; + private final ReentrantLock lock = new ReentrantLock(); /** @@ -70,7 +85,7 @@ public class SimClusterStateProvider implements ClusterStateProvider { * to ensure proper behavior, otherwise it will behave as a cluster with zero live nodes and zero replicas. */ public SimClusterStateProvider() { - + simStateProviderPool = ExecutorUtil.newMDCAwareCachedThreadPool(new DefaultSolrThreadFactory("simStateProviderPool")); } // ============== SIMULATOR SETUP METHODS ==================== @@ -133,7 +148,9 @@ public class SimClusterStateProvider implements ClusterStateProvider { r.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()); }); } - return liveNodes.remove(nodeId); + boolean res = liveNodes.remove(nodeId); + simStateProviderPool.submit(() -> simRunLeaderElection()); + return res; } finally { lock.unlock(); } @@ -153,6 +170,7 @@ public class SimClusterStateProvider implements ClusterStateProvider { try { List replicas = nodeReplicaMap.computeIfAbsent(nodeId, n -> new ArrayList<>()); replicas.add(replicaInfo); + simStateProviderPool.submit(() -> simRunLeaderElection()); } finally { lock.unlock(); } @@ -166,6 +184,7 @@ public class SimClusterStateProvider implements ClusterStateProvider { for (int i = 0; i < replicas.size(); i++) { if (coreNodeName.equals(replicas.get(i).getCore())) { replicas.remove(i); + simStateProviderPool.submit(() -> simRunLeaderElection()); return; } } @@ -173,7 +192,56 @@ public class SimClusterStateProvider implements ClusterStateProvider { } finally { lock.unlock(); } + } + public synchronized void simRunLeaderElection() { + try { + ClusterState state = getClusterState(); + state.forEachCollection(dc -> { + dc.getSlices().forEach(s -> { + Replica leader = s.getLeader(); + if (leader == null || !liveNodes.contains(leader.getNodeName())) { + LOG.info("Running leader election for " + dc.getName() + " / " + s.getName()); + if (s.getReplicas().isEmpty()) { // no replicas - punt + return; + } + // mark all replicas as non-leader (probably not necessary) and collect all active and live + List active = new ArrayList<>(); + s.getReplicas().forEach(r -> { + AtomicReference riRef = new AtomicReference<>(); + // find our ReplicaInfo for this replica + nodeReplicaMap.get(r.getNodeName()).forEach(info -> { + if (info.getName().equals(r.getName())) { + riRef.set(info); + } + }); + ReplicaInfo ri = riRef.get(); + if (ri == null) { + throw new RuntimeException("-- could not find ReplicaInfo for replica " + r); + } + ri.getVariables().remove(ZkStateReader.LEADER_PROP); + if (r.isActive(liveNodes)) { + active.add(ri); + } else { // if it's on a node that is not live mark it down + if (!liveNodes.contains(r.getNodeName())) { + ri.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()); + } + } + }); + if (active.isEmpty()) { + LOG.warn("-- can't find any active replicas for " + dc.getName() + " / " + s.getName()); + return; + } + Collections.shuffle(active); + ReplicaInfo ri = active.get(0); + ri.getVariables().put(ZkStateReader.LEADER_PROP, "true"); + LOG.info("-- elected new leader for " + dc.getName() + " / " + s.getName() + ": " + ri); + } + }); + }); + } catch (Exception e) { + throw new RuntimeException("simRunLeaderElection failed!", e); + } } public void simDeleteCollection(String collection) throws Exception { @@ -205,6 +273,14 @@ public class SimClusterStateProvider implements ClusterStateProvider { } } + public void simClearSystemCollection() { + systemColl.clear(); + } + + public List simGetSystemCollection() { + return systemColl; + } + // todo: maybe hook up DistribStateManager /clusterstate.json watchers? public void simSetClusterProperties(Map properties) { clusterProperties.clear(); @@ -260,6 +336,19 @@ public class SimClusterStateProvider implements ClusterStateProvider { } public SolrResponse simHandleSolrRequest(SolrRequest req) throws IOException { + LOG.info("--- got SolrRequest: " + req); + if (req instanceof UpdateRequest) { + // support only updates to the system collection + UpdateRequest ureq = (UpdateRequest)req; + if (ureq.getCollection() == null || !ureq.getCollection().equals(CollectionAdminParams.SYSTEM_COLL)) { + throw new UnsupportedOperationException("Only .system updates are supported but got: " + req); + } + List docs = ureq.getDocuments(); + if (docs != null) { + systemColl.addAll(docs); + } + return new UpdateResponse(); + } // support only a specific subset of collection admin ops if (!(req instanceof CollectionAdminRequest)) { throw new UnsupportedOperationException("Only CollectionAdminRequest-s are supported: " + req.getClass().getName()); @@ -417,6 +506,6 @@ public class SimClusterStateProvider implements ClusterStateProvider { @Override public void close() throws IOException { - + simStateProviderPool.shutdownNow(); } } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java index 768b913..e3e10d1 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java @@ -185,10 +185,6 @@ public class SimDistribStateManager implements DistribStateManager { } - public interface ActionError { - boolean shouldFail(String path); - } - // shared state across all instances private static Node sharedRoot = createNewRootNode(); private static final ReentrantLock multiLock = new ReentrantLock(); http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java index 77d6056..cbac7cb 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java @@ -1,5 +1,6 @@ package org.apache.solr.cloud.autoscaling.sim; +import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; @@ -66,4 +67,8 @@ public class SimNodeStateProvider implements NodeStateProvider { return res; } + @Override + public void close() throws IOException { + + } } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java index 73753bb..6a101a4 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java @@ -27,6 +27,7 @@ import org.apache.solr.common.cloud.rule.ImplicitSnitch; import org.apache.solr.common.params.CollectionAdminParams; import org.apache.solr.common.util.Utils; import org.apache.zookeeper.Watcher; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -54,7 +55,7 @@ public class TestClusterStateProvider extends SolrCloudTestCase { @BeforeClass public static void setupCluster() throws Exception { simulated = random().nextBoolean(); - //simulated = true; + simulated = true; LOG.info("####### Using simulated component? " + simulated); configureCluster(NODE_COUNT) @@ -65,6 +66,13 @@ public class TestClusterStateProvider extends SolrCloudTestCase { init(); } + @AfterClass + public static void closeCloudManager() throws Exception { + if (simulated && cloudManager != null) { + cloudManager.close(); + } + } + private static void init() throws Exception { SolrCloudManager realManager = cluster.getJettySolrRunner(cluster.getJettySolrRunners().size() - 1).getCoreContainer() .getZkController().getSolrCloudManager(); http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java ---------------------------------------------------------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java index 76c5c0f..626374c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java @@ -201,6 +201,9 @@ public class RuleEngineTest extends SolrTestCaseJ4{ public NodeStateProvider getNodeStateProvider() { return new NodeStateProvider() { @Override + public void close() throws IOException { } + + @Override public Map getNodeValues(String node, Collection tags) { return (Map) MockSnitch.nodeVsTags.get(node); } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java ---------------------------------------------------------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java index 8b717f8..9ffde0f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java @@ -17,6 +17,7 @@ package org.apache.solr.client.solrj.cloud.autoscaling; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; @@ -40,4 +41,14 @@ public class DelegatingNodeStateProvider implements NodeStateProvider { public Map>> getReplicaInfo(String node, Collection keys) { return delegate.getReplicaInfo(node, keys); } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public boolean isClosed() { + return delegate.isClosed(); + } } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java ---------------------------------------------------------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java index 6b89204..c6a348e 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java @@ -16,11 +16,11 @@ */ package org.apache.solr.client.solrj.cloud.autoscaling; -import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; +import org.apache.solr.common.SolrCloseable; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; @@ -30,7 +30,7 @@ import org.apache.zookeeper.Watcher; /** * This interface represents a distributed state repository. */ -public interface DistribStateManager extends Closeable { +public interface DistribStateManager extends SolrCloseable { // state accessors @@ -69,9 +69,4 @@ public interface DistribStateManager extends Closeable { default AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException { return getAutoScalingConfig(null); } - - @Override - default void close() throws IOException { - - } } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java ---------------------------------------------------------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java index dbf6836..68dfa39 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java @@ -20,10 +20,12 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.solr.common.SolrCloseable; + /** * This interface models the access to node and replica information. */ -public interface NodeStateProvider { +public interface NodeStateProvider extends SolrCloseable { /** * Get the value of each tag for a given node * http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ClusterStateProvider.java ---------------------------------------------------------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ClusterStateProvider.java index c285452..3041a13 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ClusterStateProvider.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ClusterStateProvider.java @@ -16,15 +16,15 @@ */ package org.apache.solr.client.solrj.impl; -import java.io.Closeable; import java.io.IOException; import java.util.Map; import java.util.List; import java.util.Set; +import org.apache.solr.common.SolrCloseable; import org.apache.solr.common.cloud.ClusterState; -public interface ClusterStateProvider extends Closeable { +public interface ClusterStateProvider extends SolrCloseable { /** * Obtain the state of the collection (cluster status). http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java ---------------------------------------------------------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java index 818bd65..c90dce9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java @@ -156,6 +156,11 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter } } + @Override + public void close() throws IOException { + + } + //uses metrics API to get node information static class AutoScalingSnitch extends ImplicitSnitch { @Override http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73b1a95b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java ---------------------------------------------------------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java index deabd08..6abc4ba 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java @@ -167,4 +167,8 @@ public class ZkDistribStateManager implements DistribStateManager { return new AutoScalingConfig(map); } + @Override + public void close() throws IOException { + + } }