Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 31E1C200D33 for ; Wed, 8 Nov 2017 18:05:08 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 30458160BDA; Wed, 8 Nov 2017 17:05:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 27A471609E0 for ; Wed, 8 Nov 2017 18:05:07 +0100 (CET) Received: (qmail 95836 invoked by uid 500); 8 Nov 2017 17:05:06 -0000 Mailing-List: contact dev-help@brooklyn.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.apache.org Delivered-To: mailing list dev@brooklyn.apache.org Received: (qmail 95823 invoked by uid 99); 8 Nov 2017 17:05:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Nov 2017 17:05:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6E3EDDFB3D; Wed, 8 Nov 2017 17:05:05 +0000 (UTC) From: ahgittin To: dev@brooklyn.apache.org Reply-To: dev@brooklyn.apache.org References: In-Reply-To: Subject: [GitHub] brooklyn-server pull request #879: Elect primary / failover policies Content-Type: text/plain Message-Id: <20171108170505.6E3EDDFB3D@git1-us-west.apache.org> Date: Wed, 8 Nov 2017 17:05:05 +0000 (UTC) archived-at: Wed, 08 Nov 2017 17:05:08 -0000 Github user ahgittin commented on a diff in the pull request: https://github.com/apache/brooklyn-server/pull/879#discussion_r149732862 --- Diff: policy/src/main/java/org/apache/brooklyn/policy/ha/ElectPrimaryEffector.java --- @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.ha; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityInitializer; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.EffectorBody; +import org.apache.brooklyn.core.effector.EffectorTasks.EffectorBodyTaskFactory; +import org.apache.brooklyn.core.effector.Effectors; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.UserFacingException; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Iterables; + +/** +This effector will scan candidates among children or members to determine which should be noted as "primary". +The primary is selected from service-up candidates based on a numeric weight as a sensor or config on the candidates +(`ha.primary.weight`, unless overridden), with higher weights being preferred. +In the case of ties, or a new candidate emerging with a weight higher than a current healthy primary, +behaviour can be configured with `primary.selection.mode`. + +Returns a map containing a message, newPrimary, oldPrimary, and a {@link ResultCode} code. +*/ +public class ElectPrimaryEffector implements EntityInitializer, ElectPrimaryConfig { + + private static final Logger log = LoggerFactory.getLogger(ElectPrimaryEffector.class); + + public static enum ResultCode { PRIMARY_UNCHANGED, NEW_PRIMARY_ELECTED, NO_PRIMARY_AVAILABLE } + + public static final Effector EFFECTOR = Effectors.effector(Object.class, "electPrimary"). + description("Scan to detect whether there is or should be a new primary").buildAbstract(); + + private final ConfigBag paramsCreationTime; + + public ElectPrimaryEffector(ConfigBag params) { + this.paramsCreationTime = params; + } + + public ElectPrimaryEffector(Map params) { + this(ConfigBag.newInstance(params)); + } + + + // wire up the entity to call the task factory to create the task on invocation + + @Override + public void apply(@SuppressWarnings("deprecation") org.apache.brooklyn.api.entity.EntityLocal entity) { + ((EntityInternal)entity).getMutableEntityType().addEffector(makeEffector(paramsCreationTime)); + } + + public static Effector makeEffector(ConfigBag params) { + return Effectors.effector(EFFECTOR).impl(new EffectorBodyTaskFactory(new ElectPrimaryEffectorBody(params))).build(); + } + + protected static class ElectPrimaryEffectorBody extends EffectorBody { + private final ConfigBag paramsCreationTime; + + public ElectPrimaryEffectorBody(ConfigBag paramsCreationTime) { + this.paramsCreationTime = paramsCreationTime; + } + + // these are the actual tasks we do + + @Override + public Object call(ConfigBag paramsInvocationTime) { + ConfigBag params = ConfigBag.newInstanceCopying(paramsCreationTime).copy(paramsInvocationTime); + + try { + Entity newPrimary = DynamicTasks.queue("check primaries", new CheckPrimaries(params)).getUnchecked(); + + Entity currentActive = getCurrentActive(params); + if (newPrimary==null) { +// If no primary can be found, the effector will: +// * add a "primary-election" problem so that service state logic, if applicable, will know that the entity is unhealthy +// * set service up false +// * if the local entity is expected to be RUNNING, it will set actual state to ON_FIRE +// * if the local entity has no expectation, it will set actual state to STOPPED +// * demote any old primary + ServiceProblemsLogic.updateProblemsIndicator(entity(), "primary", "No primary could be found"); + entity().sensors().set(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME)), null); + entity().sensors().set(Attributes.SERVICE_UP, false); + if (Lifecycle.RUNNING.equals( entity().getAttribute(Attributes.SERVICE_STATE_EXPECTED) )) { + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE); + } else { + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STOPPED); + } + DynamicTasks.queue(Tasks.create("demote "+currentActive, new Demote(params)) ).getUnchecked(); + return MutableMap.of("code", ResultCode.NO_PRIMARY_AVAILABLE, "message", "No primary available", "primary", null); + } + + if (newPrimary.equals(currentActive)) { + // If there is a primary and it is unchanged, the effector will end. + return MutableMap.of("code", ResultCode.PRIMARY_UNCHANGED, "message", "No change required", "primary", newPrimary); + } + + log.info("Detected new primary "+newPrimary+" at "+entity()+" (previously had "+currentActive+")"); +// If a new primary is detected, the effector will: +// * set the local entity to the STARTING state +// * clear any "primary-election" problem +// * publish the new primary in a sensor called `primary` (or the sensor set in `primary.sensor.name`) +// * cancel any other ongoing promote calls, and if there is an ongoing demote call on the entity being promoted, cancel that also +// * in parallel +// * invoke `promote` (or the effector called `primary.promote.effector.name`) on the local entity or the entity being promoted +// * invoke `demote` (or the effector called `primary.promote.effector.name`) on the local entity or the entity being demoted, if an entity is being demoted +// * set service up true +// * set the local entity to the RUNNING state + + boolean wasRunning = entity().sensors().get(Attributes.SERVICE_STATE_ACTUAL) == Lifecycle.RUNNING; + if (wasRunning) { + log.debug("Transititioning "+entity()+" to starting while promoting/demoting"); + ServiceStateLogic.setExpectedState(entity(), Lifecycle.STARTING); + } + ServiceProblemsLogic.clearProblemsIndicator(entity(), "primary"); + entity().sensors().set(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME)), newPrimary); + try { + // TODO cancel other promote/demote calls + + promoteAndDemote(params, currentActive, newPrimary); + + log.debug("Promoted/demoted primary for "+entity()+", now setting service up "+(wasRunning ? "and running" : "(but not setting as 'running' because it wasn't 'running' before)")); + entity().sensors().set(Attributes.SERVICE_UP, true); + if (wasRunning) ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING); + + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + log.debug("Error promoting/demoting primary for "+entity()+" (rethrowing): "+e); + ServiceProblemsLogic.updateProblemsIndicator(entity(), "primary", e); + ServiceStateLogic.setExpectedStateRunningWithErrors(entity()); + Exceptions.propagate(e); + } + + return MutableMap.of("code", ResultCode.NEW_PRIMARY_ELECTED, "message", "New primary found", "primary", newPrimary); + + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + + if (Entities.isNoLongerManaged(entity())) { + // ignore errors if shutting down + return ""; + } + + Lifecycle expected = ServiceStateLogic.getExpectedState(entity()); + if (expected==Lifecycle.RUNNING || expected==Lifecycle.STARTING) { + // including SelectionModeStrictFailed + log.warn("Error electing new primary at "+entity()+": "+Exceptions.collapseText(e)); + ServiceProblemsLogic.updateProblemsIndicator(entity(), "primary", "Error electing primary: "+ + Exceptions.collapseText(e)); + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE); + + throw Exceptions.propagateAnnotated("Error electing primary (when "+expected.toString().toLowerCase()+")", e); + } + + throw Exceptions.propagateAnnotated("Error electing primary (when not starting/running)", e); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected void promoteAndDemote(ConfigBag params, Entity oldPrimary, Entity newPrimary) { + params.configureStringKey("oldPrimary", oldPrimary); + params.configureStringKey("newPrimary", newPrimary); + MutableList tasks = MutableList.of(); + + if (newPrimary!=null) tasks.append(Tasks.create("promote "+newPrimary, new Promote(params))); + else tasks.append(Tasks.warning("No new primary; nothing to promote", null, false)); + + if (oldPrimary!=null) tasks.append(Tasks.create("demote "+oldPrimary, new Demote(params))); + else tasks.append(Tasks.warning("No old primary; nothing to demote", null, false)); + + log.debug("Running "+tasks); + List result = DynamicTasks.queue(Tasks.parallel("promote/demote", (List>)(List) tasks)).getUnchecked(); + log.debug("Ran "+tasks+", results: "+result); + } + + private Entity getCurrentActive(ConfigBag params) { + return entity().getAttribute(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME))); + } + + protected class CheckPrimaries implements Callable { --- End diff -- made this `@VisibleForTesting public static` and have one call to it from tests now ---