brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aleds...@apache.org
Subject [05/29] git commit: brooklyn cluster upgrade should take extra config, and then set them as config after we've failed over, and has nice names of phases, plus refactor/tidy
Date Mon, 03 Nov 2014 15:51:49 GMT
brooklyn cluster upgrade should take extra config, and then set them as config after we've failed
over, and has nice names of phases, plus refactor/tidy


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/294c2176
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/294c2176
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/294c2176

Branch: refs/heads/master
Commit: 294c2176ce258078c7e61ed38eeac8013d45a7a8
Parents: 78d8cc3
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Sun Oct 26 14:12:33 2014 +0000
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Fri Oct 31 09:38:19 2014 -0500

----------------------------------------------------------------------
 .../brooklyn/entity/proxying/EntitySpec.java    |   7 +
 .../entity/brooklynnode/BrooklynCluster.java    |  15 +-
 .../brooklynnode/BrooklynClusterImpl.java       |   1 +
 .../BrooklynClusterUpgradeEffectorBody.java     | 150 ++++++++++---------
 .../BrooklynNodeUpgradeEffectorBody.java        |  81 ++++------
 5 files changed, 126 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/294c2176/api/src/main/java/brooklyn/entity/proxying/EntitySpec.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/entity/proxying/EntitySpec.java b/api/src/main/java/brooklyn/entity/proxying/EntitySpec.java
index e8acea2..7d65121 100644
--- a/api/src/main/java/brooklyn/entity/proxying/EntitySpec.java
+++ b/api/src/main/java/brooklyn/entity/proxying/EntitySpec.java
@@ -42,6 +42,7 @@ import brooklyn.policy.EnricherSpec;
 import brooklyn.policy.Policy;
 import brooklyn.policy.PolicySpec;
 
+import com.google.common.annotations.Beta;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -229,6 +230,12 @@ public class EntitySpec<T extends Entity> extends AbstractBrooklynObjectSpec<T,E
     public Map<ConfigKey<?>, Object> getConfig() {
         return Collections.unmodifiableMap(config);
     }
+    
+    /** @return Live instance of the config map, for instance in case mass clearances are
desired */
+    @Beta
+    public Map<ConfigKey<?>, Object> getConfigLive() {
+        return config;
+    }
         
     public List<PolicySpec<?>> getPolicySpecs() {
         return policySpecs;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/294c2176/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
index 30a46bd..7b5c095 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
@@ -18,10 +18,12 @@
  */
 package brooklyn.entity.brooklynnode;
 
+import java.util.Map;
+
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.Effector;
 import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.brooklynnode.effector.BrooklynNodeUpgradeEffectorBody;
 import brooklyn.entity.effector.Effectors;
 import brooklyn.entity.group.DynamicCluster;
 import brooklyn.entity.proxying.ImplementedBy;
@@ -45,9 +47,16 @@ public interface BrooklynCluster extends DynamicCluster {
     public static final Effector<Void> SELECT_MASTER = SelectMasterEffector.SELECT_MASTER;
 
     public interface UpgradeClusterEffector {
+        public static final ConfigKey<String> DOWNLOAD_URL = BrooklynNode.DOWNLOAD_URL.getConfigKey();
+        public static final ConfigKey<Map<String,Object>> EXTRA_CONFIG = BrooklynNodeUpgradeEffectorBody.EXTRA_CONFIG;
+
         Effector<Void> UPGRADE_CLUSTER = Effectors.effector(Void.class, "upgradeCluster")
-                .description("Upgrade the cluster with new distribution version")
-                .parameter(SoftwareProcess.DOWNLOAD_URL.getConfigKey())
+                .description("Upgrade the cluster with new distribution version, "
+                    + "by provisioning new nodes with the new version, failing over, "
+                    + "and then deprovisioning the original nodes")
+                .parameter(BrooklynNode.SUGGESTED_VERSION)
+                .parameter(DOWNLOAD_URL)
+                .parameter(EXTRA_CONFIG)
                 .buildAbstract();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/294c2176/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
index 0d1afb6..4247ff3 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
@@ -48,6 +48,7 @@ public class BrooklynClusterImpl extends DynamicClusterImpl implements BrooklynC
     private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class);
 
     static {
+        // XXX not needed or wanted
         RendererHints.register(MASTER_NODE, RendererHints.namedActionWithUrl());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/294c2176/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
index 8215a0b..331f363 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
@@ -23,13 +23,19 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.Effector;
 import brooklyn.entity.Entity;
 import brooklyn.entity.Group;
+import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.EntityTasks;
 import brooklyn.entity.basic.Lifecycle;
 import brooklyn.entity.brooklynnode.BrooklynCluster;
 import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
@@ -40,25 +46,28 @@ import brooklyn.entity.effector.EffectorBody;
 import brooklyn.entity.effector.Effectors;
 import brooklyn.entity.group.DynamicCluster;
 import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.event.AttributeSensor;
+import brooklyn.management.TaskAdaptable;
 import brooklyn.management.ha.HighAvailabilityMode;
 import brooklyn.management.ha.ManagementNodeState;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.config.ConfigBag;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.net.Urls;
-import brooklyn.util.repeat.Repeater;
 import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.Tasks;
 import brooklyn.util.time.Duration;
 
 import com.google.api.client.util.Preconditions;
-import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterables;
 
 public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> implements
UpgradeClusterEffector {
-    public static final Effector<Void> UPGRADE_CLUSTER = Effectors.effector(UpgradeClusterEffector.UPGRADE_CLUSTER).impl(new
BrooklynClusterUpgradeEffectorBody()).build();
+    
+    private static final Logger log = LoggerFactory.getLogger(BrooklynClusterUpgradeEffectorBody.class);
+    
+    public static final Effector<Void> UPGRADE_CLUSTER = Effectors.effector(UpgradeClusterEffector.UPGRADE_CLUSTER)
+        .impl(new BrooklynClusterUpgradeEffectorBody()).build();
 
     private AtomicBoolean upgradeInProgress = new AtomicBoolean();
 
@@ -71,19 +80,35 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void>
imple
         EntitySpec<?> memberSpec = entity().getConfig(BrooklynCluster.MEMBER_SPEC);
         Preconditions.checkNotNull(memberSpec, BrooklynCluster.MEMBER_SPEC.getName() + "
is required for " + UpgradeClusterEffector.class.getName());
 
-        Map<ConfigKey<?>, Object> specCfg = memberSpec.getConfig();
-        String oldDownloadUrl = (String) specCfg.get(BrooklynNode.DOWNLOAD_URL);
-        String oldUploadUrl = (String) specCfg.get(BrooklynNode.DISTRO_UPLOAD_URL);
-        String newDownloadUrl = parameters.get(BrooklynNode.DOWNLOAD_URL.getConfigKey());
-        String newUploadUrl = inferUploadUrl(newDownloadUrl);
+        ConfigBag specCfg = ConfigBag.newInstance( memberSpec.getConfig() );
+        ConfigBag origSpecCfg = ConfigBag.newInstanceCopying(specCfg);
+        log.debug("Upgrading "+entity()+", changing "+BrooklynCluster.MEMBER_SPEC+" from
"+memberSpec+" / "+origSpecCfg);
+        
         try {
-            memberSpec.configure(BrooklynNode.DOWNLOAD_URL, newUploadUrl);
-            memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, newUploadUrl);
+            String newDownloadUrl = parameters.get(DOWNLOAD_URL);
+            specCfg.putIfNotNull(DOWNLOAD_URL, newDownloadUrl);
+            specCfg.put(BrooklynNode.DISTRO_UPLOAD_URL, inferUploadUrl(newDownloadUrl));
+            
+            specCfg.putAll(ConfigBag.newInstance(parameters.get(EXTRA_CONFIG)).getAllConfigAsConfigKeyMap());
+            
+            Map<ConfigKey<?>, Object> cfgLive = memberSpec.getConfigLive();
+            cfgLive.clear();
+            cfgLive.putAll(specCfg.getAllConfigAsConfigKeyMap());
+            // not necessary, but good practice
+            entity().setConfig(BrooklynCluster.MEMBER_SPEC, memberSpec);
+            log.debug("Upgrading "+entity()+", new "+BrooklynCluster.MEMBER_SPEC+": "+memberSpec+"
/ "+specCfg);
+            
             upgrade(parameters);
         } catch (Exception e) {
-            memberSpec.configure(BrooklynNode.DOWNLOAD_URL, oldDownloadUrl);
-            memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, oldUploadUrl);
+            log.debug("Upgrading "+entity()+" failed, will rethrow after restoring "+BrooklynCluster.MEMBER_SPEC+"
to: "+origSpecCfg);
+            Map<ConfigKey<?>, Object> cfgLive = memberSpec.getConfigLive();
+            cfgLive.clear();
+            cfgLive.putAll(origSpecCfg.getAllConfigAsConfigKeyMap());
+            // not necessary, but good practice
+            entity().setConfig(BrooklynCluster.MEMBER_SPEC, memberSpec);
+            
             throw Exceptions.propagate(e);
+            
         } finally {
             upgradeInProgress.set(false);
         }
@@ -91,6 +116,7 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void>
imple
     }
 
     private String inferUploadUrl(String newDownloadUrl) {
+        if (newDownloadUrl==null) return null;
         boolean isLocal = "file".equals(Urls.getProtocol(newDownloadUrl)) || new File(newDownloadUrl).exists();
         if (isLocal) {
             return newDownloadUrl;
@@ -99,22 +125,30 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void>
imple
         }
     }
 
-    private void upgrade(ConfigBag parameters) {
-        //TODO might be worth separating each step in a task for better UI
-        //TODO currently this will fight with auto-scaler policies; you should turn them
off
+    protected void upgrade(ConfigBag parameters) {
+        //TODO currently this will fight with auto-scaler policies; they must be turned off
for upgrade to work
 
         Group cluster = (Group)entity();
         Collection<Entity> initialMembers = cluster.getMembers();
         int initialClusterSize = initialMembers.size();
+        
+        if (!BrooklynNodeUpgradeEffectorBody.isPersistenceModeEnabled(cluster)) {
+            // would could try a `forcePersistNow`, but that's sloppy; 
+            // for now, require HA/persistence for upgrading 
+            DynamicTasks.queue( Tasks.warning("Check persistence", 
+                new IllegalStateException("Persistence does not appear to be enabled at this
cluster. "
+                + "Cluster upgrade will not succeed unless a custom launch script enables
it.")) );
+        }
 
         //1. Initially create a single node to check if it will launch successfully
-        Entity initialNode = Iterables.getOnlyElement(createNodes(1));
+        TaskAdaptable<Collection<Entity>> initialNodeTask = DynamicTasks.queue(newCreateNodesTask(1,
"Creating first upgraded version node"));
 
         //2. If everything is OK with the first node launch the rest as well
-        Collection<Entity> remainingNodes = createNodes(initialClusterSize - 1);
+        TaskAdaptable<Collection<Entity>> remainingNodesTask = DynamicTasks.queue(newCreateNodesTask(initialClusterSize
- 1, "Creating remaining upgraded version nodes ("+(initialClusterSize - 1)+")"));
 
         //3. Once we have all nodes running without errors switch master
-        DynamicTasks.queue(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, MutableMap.of(SelectMasterEffector.NEW_MASTER_ID,
initialNode.getId()))).asTask().getUnchecked();
+        DynamicTasks.queue(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, MutableMap.of(SelectMasterEffector.NEW_MASTER_ID,

+            Iterables.getOnlyElement(initialNodeTask.asTask().getUnchecked()).getId()))).asTask().getUnchecked();
 
         //4. Stop the nodes which were running at the start of the upgrade call, but keep
them around.
         //   Should we create a quarantine-like zone for old stopped version?
@@ -125,34 +159,46 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void>
imple
         //version on this cluster before the above select-master call, and then delete any
which are running the old
         //version (would require tracking the version number at the entity)
         HashSet<Entity> oldMembers = new HashSet<Entity>(initialMembers);
-        oldMembers.removeAll(remainingNodes);
-        oldMembers.remove(initialNode);
+        oldMembers.removeAll(remainingNodesTask.asTask().getUnchecked());
+        oldMembers.removeAll(initialNodeTask.asTask().getUnchecked());
         DynamicTasks.queue(Effectors.invocation(BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, Collections.emptyMap(),
oldMembers)).asTask().getUnchecked();
     }
 
-    private Collection<Entity> createNodes(int nodeCnt) {
+    private TaskAdaptable<Collection<Entity>> newCreateNodesTask(int size, String
name) {
+        return Tasks.<Collection<Entity>>builder().name(name).body(new CreateNodesCallable(size)).build();
+    }
+
+    protected class CreateNodesCallable implements Callable<Collection<Entity>>
{
+        private final int size;
+        public CreateNodesCallable(int size) {
+            this.size = size;
+        }
+        @Override
+        public Collection<Entity> call() throws Exception {
+            return createNodes(size);
+        }
+    }
+
+    protected Collection<Entity> createNodes(int nodeCnt) {
         DynamicCluster cluster = (DynamicCluster)entity();
 
         //1. Create the nodes
         Collection<Entity> newNodes = cluster.resizeByDelta(nodeCnt);
 
-        //2. Wait for them to be RUNNING
-        waitAttributeNotEqualTo(
-                newNodes,
-                BrooklynNode.SERVICE_STATE_ACTUAL, Lifecycle.STARTING);
+        //2. Wait for them to be RUNNING (or at least STARTING to have completed)
+        // (should already be the case, because above is synchronous and, we think, it will
fail if start does not succeed)
+        DynamicTasks.queue(EntityTasks.awaitingAttribute(newNodes, Attributes.SERVICE_STATE_ACTUAL,

+                Predicates.not(Predicates.equalTo(Lifecycle.STARTING)), Duration.minutes(30)));
 
         //3. Set HOT_STANDBY in case it is not enabled on the command line ...
         DynamicTasks.queue(Effectors.invocation(
                 BrooklynNode.SET_HA_MODE,
                 MutableMap.of(SetHAModeEffector.MODE, HighAvailabilityMode.HOT_STANDBY),

                 newNodes)).asTask().getUnchecked();
-
-        //4. ... and wait until all of the nodes change state
-        //TODO if the REST call is blocking this is not needed
-        waitAttributeEqualTo(
-                newNodes,
-                BrooklynNode.MANAGEMENT_NODE_STATE,
-                ManagementNodeState.HOT_STANDBY);
+        //... and wait until all of the nodes change state
+        // TODO fail quicker if state changes to FAILED
+        DynamicTasks.queue(EntityTasks.awaitingAttribute(newNodes, BrooklynNode.MANAGEMENT_NODE_STATE,

+                Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.FIVE_MINUTES));
 
         //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out
ON_FIRE as well)
         Collection<Entity> failedNodes = Collections2.filter(newNodes, EntityPredicates.attributeEqualTo(BrooklynNode.SERVICE_UP,
Boolean.FALSE));
@@ -162,42 +208,4 @@ public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void>
imple
         return newNodes;
     }
 
-    private <T> void waitAttributeEqualTo(Collection<Entity> nodes, AttributeSensor<T>
sensor, T value) {
-        waitPredicate(
-                nodes, 
-                EntityPredicates.attributeEqualTo(sensor, value),
-                "Waiting for nodes " + nodes + ", sensor " + sensor + " to be " + value,
-                "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to
change to " + value);
-    }
-
-    private <T> void waitAttributeNotEqualTo(Collection<Entity> nodes, AttributeSensor<T>
sensor, T value) {
-        waitPredicate(
-                nodes, 
-                EntityPredicates.attributeNotEqualTo(sensor, value),
-                "Waiting for nodes " + nodes + ", sensor " + sensor + " to change from "
+ value,
-                "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to
change from " + value);
-    }
-
-    private <T extends Entity> void waitPredicate(Collection<T> nodes, Predicate<T>
waitPredicate, String blockingMsg, String errorMsg) {
-        Tasks.setBlockingDetails(blockingMsg);
-        boolean pollSuccess = Repeater.create(blockingMsg)
-            .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
-            .limitTimeTo(Duration.ONE_HOUR)
-            .until(nodes, allMatch(waitPredicate))
-            .run();
-        Tasks.resetBlockingDetails();
-
-        if (!pollSuccess) {
-            throw new IllegalStateException(errorMsg);
-        }
-    }
-
-    public static <T> Predicate<Collection<T>> allMatch(final Predicate<T>
predicate) {
-        return new Predicate<Collection<T>>() {
-            @Override
-            public boolean apply(Collection<T> input) {
-                return Iterables.all(input, predicate);
-            }
-        };
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/294c2176/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
index a6433aa..299bd5b 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
@@ -19,7 +19,6 @@
 package brooklyn.entity.brooklynnode.effector;
 
 import java.util.Map;
-import java.util.concurrent.Callable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,33 +28,27 @@ import brooklyn.entity.Effector;
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.EntityTasks;
 import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.brooklynnode.BrooklynCluster;
 import brooklyn.entity.brooklynnode.BrooklynNode;
 import brooklyn.entity.effector.EffectorBody;
 import brooklyn.entity.effector.Effectors;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.entity.software.SshEffectorTasks;
-import brooklyn.event.AttributeSensor;
 import brooklyn.event.basic.MapConfigKey;
-import brooklyn.management.TaskAdaptable;
 import brooklyn.management.ha.HighAvailabilityMode;
 import brooklyn.management.ha.ManagementNodeState;
 import brooklyn.util.config.ConfigBag;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.exceptions.ReferenceWithError;
-import brooklyn.util.guava.Functionals;
 import brooklyn.util.net.Urls;
-import brooklyn.util.repeat.Repeater;
 import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.Tasks;
 import brooklyn.util.text.Identifiers;
 import brooklyn.util.text.Strings;
 import brooklyn.util.time.Duration;
 
-import com.google.common.base.Functions;
+import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.reflect.TypeToken;
 
@@ -70,21 +63,28 @@ public class BrooklynNodeUpgradeEffectorBody extends EffectorBody<Void>
{
     private static final Logger log = LoggerFactory.getLogger(BrooklynNodeUpgradeEffectorBody.class);
     
     public static final ConfigKey<String> DOWNLOAD_URL = BrooklynNode.DOWNLOAD_URL.getConfigKey();
-    public static final ConfigKey<Map<String,Object>> EXTRA_CONFIG = MapConfigKey.builder(new
TypeToken<Map<String,Object>>() {}).name("extraConfig").description("Additional
new config to set on this entity as part of upgrading").build();
+    public static final ConfigKey<Map<String,Object>> EXTRA_CONFIG = MapConfigKey.builder(new
TypeToken<Map<String,Object>>() {})
+        .name("extraConfig")
+        .description("Additional new config to set on the BrooklynNode as part of upgrading")
+        .build();
 
     public static final Effector<Void> UPGRADE = Effectors.effector(Void.class, "upgrade")
-        .description("Changes the Brooklyn build used to run this node, by spawning a dry-run
node then copying the installed files across. "
+        .description("Changes the Brooklyn build used to run this node, "
+            + "by spawning a dry-run node then copying the installed files across. "
             + "This node must be running for persistence for in-place upgrading to work.")
-        .parameter(BrooklynNode.SUGGESTED_VERSION).parameter(DOWNLOAD_URL).parameter(EXTRA_CONFIG)
+        .parameter(BrooklynNode.SUGGESTED_VERSION)
+        .parameter(DOWNLOAD_URL)
+        .parameter(EXTRA_CONFIG)
         .impl(new BrooklynNodeUpgradeEffectorBody()).build();
     
     @Override
     public Void call(ConfigBag parametersO) {
         if (!isPersistenceModeEnabled(entity())) {
             // would could try a `forcePersistNow`, but that's sloppy; 
-            // for now, require HA/persistence for upgrading 
-            DynamicTasks.queue( Tasks.warning("Persistence does not appear to be enabled
at this node. "
-                + "In-place upgrade is unlikely to succeed.", null) );
+            // for now, require HA/persistence for upgrading
+            DynamicTasks.queue( Tasks.warning("Check persistence", 
+                new IllegalStateException("Persistence does not appear to be enabled at this
cluster. "
+                + "In-place node upgrade will not succeed unless a custom launch script enables
it.")) );
         }
 
         ConfigBag parameters = ConfigBag.newInstanceCopying(parametersO);
@@ -114,6 +114,7 @@ public class BrooklynNodeUpgradeEffectorBody extends EffectorBody<Void>
{
             .configure(parameters.getAllConfig()));
 
         //force this to start as hot-standby
+        // TODO alternatively could use REST API as in BrooklynClusterUpgradeEffectorBody
         String launchParameters = dryRunChild.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
         if (Strings.isBlank(launchParameters)) launchParameters = "";
         else launchParameters += " ";
@@ -127,8 +128,8 @@ public class BrooklynNodeUpgradeEffectorBody extends EffectorBody<Void>
{
         DynamicTasks.queue(Effectors.invocation(dryRunChild, BrooklynNode.START, ConfigBag.EMPTY));
 
         // 2 confirm hot standby status
-        DynamicTasks.queue(newWaitForAttributeTask(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE,

-            Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.TWO_MINUTES));
+        DynamicTasks.queue(EntityTasks.awaitingAttribute(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE,

+            Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.FIVE_MINUTES));
 
         // 3 stop new version
         // 4 stop old version
@@ -166,47 +167,19 @@ public class BrooklynNodeUpgradeEffectorBody extends EffectorBody<Void>
{
         return null;
     }
 
-    private boolean isPersistenceModeEnabled(EntityInternal entity) {
+    @Beta
+    static boolean isPersistenceModeEnabled(Entity entity) {
         // TODO when there are PERSIST* options in BrooklynNode, look at them here!
         // or, better, have a sensor for persistence
-        String params = entity.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
+        String params = null;
+        if (entity instanceof BrooklynCluster) {
+            EntitySpec<?> spec = entity.getConfig(BrooklynCluster.MEMBER_SPEC);
+            params = Strings.toString( spec.getConfig().get(BrooklynNode.EXTRA_LAUNCH_PARAMETERS)
);
+        }
+        if (params==null) params = entity.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
         if (params==null) return false;
         if (params.indexOf("persist")==0) return false;
         return true;
     }
 
-    private static class WaitForRepeaterCallable implements Callable<Boolean> {
-        protected Repeater repeater;
-        protected boolean requireTrue;
-
-        public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) {
-            this.repeater = repeater;
-            this.requireTrue = requireTrue;
-        }
-
-        @Override
-        public Boolean call() {
-            ReferenceWithError<Boolean> result = repeater.runKeepingError();
-            if (Boolean.TRUE.equals(result.getWithoutError()))
-                return true;
-            if (result.hasError()) 
-                throw Exceptions.propagate(result.getError());
-            if (requireTrue)
-                throw new IllegalStateException("timeout - "+repeater.getDescription());
-            return false;
-        }
-    }
-
-    private static <T> TaskAdaptable<Boolean> newWaitForAttributeTask(Entity
node, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
-        return awaiting( Repeater.create("waiting on "+node+" "+sensor.getName()+" "+condition)
-                    .backoff(Duration.millis(10), 1.5, Duration.millis(200))
-                    .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout)
-                    .until(Functionals.callable(Functions.forPredicate(EntityPredicates.attributeSatisfies(sensor,
condition)), node)),
-                    true);
-    }
-
-    private static TaskAdaptable<Boolean> awaiting(Repeater repeater, boolean requireTrue)
{
-        return Tasks.<Boolean>builder().name(repeater.getDescription()).body(new WaitForRepeaterCallable(repeater,
requireTrue)).build();
-    }
-
 }


Mime
View raw message