Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A9EBC200D41 for ; Tue, 7 Nov 2017 13:35:24 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A84EA1609C8; Tue, 7 Nov 2017 12:35:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9EC92160C00 for ; Tue, 7 Nov 2017 13:35:23 +0100 (CET) Received: (qmail 90834 invoked by uid 500); 7 Nov 2017 12:35:22 -0000 Mailing-List: contact dev-help@brooklyn.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.apache.org Delivered-To: mailing list dev@brooklyn.apache.org Received: (qmail 90553 invoked by uid 99); 7 Nov 2017 12:35:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Nov 2017 12:35:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 28E36DFDCE; Tue, 7 Nov 2017 12:35:22 +0000 (UTC) From: aledsage To: dev@brooklyn.apache.org Reply-To: dev@brooklyn.apache.org References: In-Reply-To: Subject: [GitHub] brooklyn-server pull request #879: Elect primary / failover policies Content-Type: text/plain Message-Id: <20171107123522.28E36DFDCE@git1-us-west.apache.org> Date: Tue, 7 Nov 2017 12:35:22 +0000 (UTC) archived-at: Tue, 07 Nov 2017 12:35:24 -0000 Github user aledsage commented on a diff in the pull request: https://github.com/apache/brooklyn-server/pull/879#discussion_r149335124 --- Diff: policy/src/main/java/org/apache/brooklyn/policy/ha/ElectPrimaryEffector.java --- @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.ha; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityInitializer; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.EffectorBody; +import org.apache.brooklyn.core.effector.EffectorTasks.EffectorBodyTaskFactory; +import org.apache.brooklyn.core.effector.Effectors; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.UserFacingException; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Iterables; + +/** +This effector will scan candidates among children or members to determine which should be noted as "primary". +The primary is selected from service-up candidates based on a numeric weight as a sensor or config on the candidates +(`ha.primary.weight`, unless overridden), with higher weights being preferred. +In the case of ties, or a new candidate emerging with a weight higher than a current healthy primary, +behaviour can be configured with `primary.selection.mode`. + +Returns a map containing a message, newPrimary, oldPrimary, and a {@link ResultCode} code. +*/ +public class ElectPrimaryEffector implements EntityInitializer, ElectPrimaryConfig { + + private static final Logger log = LoggerFactory.getLogger(ElectPrimaryEffector.class); + + public static enum ResultCode { PRIMARY_UNCHANGED, NEW_PRIMARY_ELECTED, NO_PRIMARY_AVAILABLE } + + public static final Effector EFFECTOR = Effectors.effector(Object.class, "electPrimary"). + description("Scan to detect whether there is or should be a new primary").buildAbstract(); + + private final ConfigBag paramsCreationTime; + + public ElectPrimaryEffector(ConfigBag params) { + this.paramsCreationTime = params; + } + + public ElectPrimaryEffector(Map params) { + this(ConfigBag.newInstance(params)); + } + + + // wire up the entity to call the task factory to create the task on invocation + + @Override + public void apply(@SuppressWarnings("deprecation") org.apache.brooklyn.api.entity.EntityLocal entity) { + ((EntityInternal)entity).getMutableEntityType().addEffector(makeEffector(paramsCreationTime)); + } + + public static Effector makeEffector(ConfigBag params) { + return Effectors.effector(EFFECTOR).impl(new EffectorBodyTaskFactory(new ElectPrimaryEffectorBody(params))).build(); + } + + protected static class ElectPrimaryEffectorBody extends EffectorBody { + private final ConfigBag paramsCreationTime; + + public ElectPrimaryEffectorBody(ConfigBag paramsCreationTime) { + this.paramsCreationTime = paramsCreationTime; + } + + // these are the actual tasks we do + + @Override + public Object call(ConfigBag paramsInvocationTime) { + ConfigBag params = ConfigBag.newInstanceCopying(paramsCreationTime).copy(paramsInvocationTime); + + try { + Entity newPrimary = DynamicTasks.queue("check primaries", new CheckPrimaries(params)).getUnchecked(); + + Entity currentActive = getCurrentActive(params); + if (newPrimary==null) { +// If no primary can be found, the effector will: +// * add a "primary-election" problem so that service state logic, if applicable, will know that the entity is unhealthy +// * set service up false +// * if the local entity is expected to be RUNNING, it will set actual state to ON_FIRE +// * if the local entity has no expectation, it will set actual state to STOPPED +// * demote any old primary + ServiceProblemsLogic.updateProblemsIndicator(entity(), "primary", "No primary could be found"); + entity().sensors().set(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME)), null); + entity().sensors().set(Attributes.SERVICE_UP, false); + if (Lifecycle.RUNNING.equals( entity().getAttribute(Attributes.SERVICE_STATE_EXPECTED) )) { + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE); + } else { + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STOPPED); + } + DynamicTasks.queue(Tasks.create("demote "+currentActive, new Demote(params)) ).getUnchecked(); + return MutableMap.of("code", ResultCode.NO_PRIMARY_AVAILABLE, "message", "No primary available", "primary", null); + } + + if (newPrimary.equals(currentActive)) { + // If there is a primary and it is unchanged, the effector will end. + return MutableMap.of("code", ResultCode.PRIMARY_UNCHANGED, "message", "No change required", "primary", newPrimary); + } + + log.info("Detected new primary "+newPrimary+" at "+entity()+" (previously had "+currentActive+")"); +// If a new primary is detected, the effector will: +// * set the local entity to the STARTING state +// * clear any "primary-election" problem +// * publish the new primary in a sensor called `primary` (or the sensor set in `primary.sensor.name`) +// * cancel any other ongoing promote calls, and if there is an ongoing demote call on the entity being promoted, cancel that also +// * in parallel +// * invoke `promote` (or the effector called `primary.promote.effector.name`) on the local entity or the entity being promoted +// * invoke `demote` (or the effector called `primary.promote.effector.name`) on the local entity or the entity being demoted, if an entity is being demoted +// * set service up true +// * set the local entity to the RUNNING state + + boolean wasRunning = entity().sensors().get(Attributes.SERVICE_STATE_ACTUAL) == Lifecycle.RUNNING; + if (wasRunning) { + log.debug("Transititioning "+entity()+" to starting while promoting/demoting"); + ServiceStateLogic.setExpectedState(entity(), Lifecycle.STARTING); + } + ServiceProblemsLogic.clearProblemsIndicator(entity(), "primary"); + entity().sensors().set(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME)), newPrimary); + try { + // TODO cancel other promote/demote calls + + promoteAndDemote(params, currentActive, newPrimary); + + log.debug("Promoted/demoted primary for "+entity()+", now setting service up "+(wasRunning ? "and running" : "(but not setting as 'running' because it wasn't 'running' before)")); + entity().sensors().set(Attributes.SERVICE_UP, true); + if (wasRunning) ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING); + + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + log.debug("Error promoting/demoting primary for "+entity()+" (rethrowing): "+e); + ServiceProblemsLogic.updateProblemsIndicator(entity(), "primary", e); + ServiceStateLogic.setExpectedStateRunningWithErrors(entity()); + Exceptions.propagate(e); + } + + return MutableMap.of("code", ResultCode.NEW_PRIMARY_ELECTED, "message", "New primary found", "primary", newPrimary); + + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + + if (Entities.isNoLongerManaged(entity())) { + // ignore errors if shutting down + return ""; + } + + Lifecycle expected = ServiceStateLogic.getExpectedState(entity()); + if (expected==Lifecycle.RUNNING || expected==Lifecycle.STARTING) { + // including SelectionModeStrictFailed + log.warn("Error electing new primary at "+entity()+": "+Exceptions.collapseText(e)); + ServiceProblemsLogic.updateProblemsIndicator(entity(), "primary", "Error electing primary: "+ + Exceptions.collapseText(e)); + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE); + + throw Exceptions.propagateAnnotated("Error electing primary (when "+expected.toString().toLowerCase()+")", e); + } + + throw Exceptions.propagateAnnotated("Error electing primary (when not starting/running)", e); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected void promoteAndDemote(ConfigBag params, Entity oldPrimary, Entity newPrimary) { + params.configureStringKey("oldPrimary", oldPrimary); + params.configureStringKey("newPrimary", newPrimary); + MutableList tasks = MutableList.of(); + + if (newPrimary!=null) tasks.append(Tasks.create("promote "+newPrimary, new Promote(params))); + else tasks.append(Tasks.warning("No new primary; nothing to promote", null, false)); + + if (oldPrimary!=null) tasks.append(Tasks.create("demote "+oldPrimary, new Demote(params))); + else tasks.append(Tasks.warning("No old primary; nothing to demote", null, false)); + + log.debug("Running "+tasks); + List result = DynamicTasks.queue(Tasks.parallel("promote/demote", (List>)(List) tasks)).getUnchecked(); + log.debug("Ran "+tasks+", results: "+result); + } + + private Entity getCurrentActive(ConfigBag params) { + return entity().getAttribute(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME))); + } + + protected class CheckPrimaries implements Callable { + final ConfigBag params; + public CheckPrimaries(ConfigBag params) { this.params = params; } + + @Override + public Entity call() throws Exception { + Stopwatch elapsedTime = Stopwatch.createStarted(); + boolean extendedForNonViableRunning = false; + outer: while (true) { + TargetMode target = params.get(TARGET_MODE); + Iterable candidates = + target==TargetMode.CHILDREN ? entity().getChildren() : + target==TargetMode.MEMBERS ? ((Group)entity()).getMembers() : + // auto - prefer members + entity() instanceof Group ? ((Group)entity()).getMembers() : entity().getChildren(); + + SelectionMode mode = params.get(SELECTION_MODE); + Entity currentActive = getCurrentActive(params); + if (mode==SelectionMode.FAILOVER && currentActive!=null && Iterables.contains(candidates, currentActive) && isViable(currentActive)) { + return currentActive; + } + // find best live and/or time to wait + Duration delayForBest = Duration.ZERO; + Duration period = Duration.millis(10); + + if (candidates.iterator().hasNext()) { + List weightedEntities = MutableList.of(); + for (Entity candidate: candidates) { + weightedEntities.add(new WeightedEntity(candidate)); + } + Collections.sort(weightedEntities); + WeightedEntity anyBestTheoreticalW = weightedEntities.iterator().next(); + WeightedEntity bestLive = null; + for (WeightedEntity w: weightedEntities) { + if (w.score < -0.00001) { + // this is disallowed, as are all following + break; + } + if (Math.abs(anyBestTheoreticalW.score - w.score) >= 0.0000000001) { + if (bestLive==null && !delayForBest.isLongerThan(elapsedTime)) { + // no viable highest-score and can't wait any longer; + // take the next highest that we find + if (isViable(w.entity)) { + log.debug("Theoretical best primary at "+entity()+" ("+anyBestTheoreticalW+", maybe others) not available, using next best: "+w); + return w.entity; + } else { + continue; + } + } else { + // don't bother looking at non-highest scoring nodes, we know what to do + break; + } + } + + // examining the highest scoring ones + + if (isViable(w.entity)) { + log.debug("Viable best primary at "+entity()+" detected: "+w); + if (bestLive==null) { + bestLive = w; + } else if (mode==SelectionMode.STRICT) { + // in strict mode, should only have one node with the best score, + // so this is a failure. we might want to allow some grace if a + // method is promoting a new one by changing (non-atomically) which + // single entity has a weight 1 where others are 0. + // but preferred way to avoid that is to increment weight at new primary instead. + throw new SelectionModeStrictFailed(w.entity, bestLive.entity, bestLive.score); + } else if (w.entity.equals(currentActive)) { + // always prefer current active if it is best + // (safe to bail out here but we won't yet) + bestLive = w; + } else { + // two equally good bests, neither current active + // could prefer either but go with the first (no-op here) + } + + } else { + // this best not viable - determine why and how long to wait + Lifecycle state = w.entity.getAttribute(Attributes.SERVICE_STATE_ACTUAL); + log.debug("Theoretical best primary at "+entity()+": "+w.entity+" "+state+" (not viable); may re-check"); + if (state==Lifecycle.STARTING) { + delayForBest = Duration.max(delayForBest, entity().config().get(BEST_STARTING_WAIT_TIMEOUT)); + + } else if (state==Lifecycle.RUNNING) { + + // running, not on fire, but not viable - either a race (has just become viable) + // or a coding error (caller did not set "up" correctly); give extra 5s if we haven't already + if (!extendedForNonViableRunning) { + delayForBest = Duration.max(delayForBest, Duration.of(elapsedTime).add(entity().config().get(BEST_WAIT_TIMEOUT))); + extendedForNonViableRunning = true; + } + } else { + delayForBest = Duration.max(delayForBest, entity().config().get(BEST_WAIT_TIMEOUT)); + } + } + } + + // finished looking at all nodes, or all theoretical bests and found one + if (bestLive!=null) { + // found a best live (preferring current if viable, + // but doesn't wait for current) + return bestLive.entity; + } + } else { + delayForBest = entity().config().get(BEST_WAIT_TIMEOUT); + } + Duration delay = delayForBest.subtract(Duration.of(elapsedTime)); + if (delay.isPositive()) { + delay = Duration.min(delay, period); + period = Duration.min(Duration.ONE_SECOND, period.multiply(1.5)); + log.debug("Delaying "+delay+" ("+delayForBest+" allowed, "+Duration.of(elapsedTime)+" elapsed) then rechecking for best primary at "+entity()); + // there was a theoretical best that wasn't started + Time.sleep(delay); + continue outer; + } + + // none viable or worth waiting for + return null; + } + + } + + private class WeightedEntity implements Comparable { + public final Entity entity; + public final double score; + + public WeightedEntity(Entity candidate) { + this.entity = candidate; + this.score = score(candidate); + } + + @Override + public int compareTo(WeightedEntity o) { + double v = o.score - this.score; + return (v > 0.00000001 ? 1 : v<-0.00000001 ? -1 : 0); + } + @Override + public String toString() { + return entity+":"+score; + } + } + + protected boolean isViable(Entity candidate) { + if (!Lifecycle.RUNNING.equals( candidate.getAttribute(Attributes.SERVICE_STATE_ACTUAL) )) return false; + if (!Boolean.TRUE.equals( candidate.getAttribute(Attributes.SERVICE_UP) )) return false; + if (score(candidate) <= -0.000000001) return false; + return true; + } + + private double score(Entity candidate) { + Double s = candidate.getAttribute(Sensors.newDoubleSensor(params.get(PRIMARY_WEIGHT_NAME))); + if (s!=null) return s; + s = candidate.getConfig(ConfigKeys.newDoubleConfigKey(params.get(PRIMARY_WEIGHT_NAME))); + if (s!=null) return s; + return 0; + } + } + + protected class Promote implements Callable { + final ConfigBag params; + public Promote(ConfigBag params) { this.params = params; } + + @Override + public Object call() throws Exception { + String promoteEffectorName = params.get(PROMOTE_EFFECTOR_NAME); + Effector eff = entity().getEffector(promoteEffectorName); + if (eff!=null) { + return DynamicTasks.queue( Effectors.invocation(entity(), eff, params) ).asTask().getUnchecked(); + } + EntityInternal newPrimary = (EntityInternal)params.getStringKey("newPrimary"); + if (newPrimary==null) { + return "Nothing to demote; no new primary"; --- End diff -- Should be `Nothing to promote; no new primary` ---