brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [1/5] brooklyn-server git commit: add InvokeEffectorOnSensorChange and let SSH run over members
Date Fri, 26 Aug 2016 10:54:58 GMT
Repository: brooklyn-server
Updated Branches:
  refs/heads/master 41f723f86 -> 9238a1081


add InvokeEffectorOnSensorChange and let SSH run over members

helps w DNS, likely many others


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/2ab4e5c1
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/2ab4e5c1
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/2ab4e5c1

Branch: refs/heads/master
Commit: 2ab4e5c10a6df18353cee5e3c4b51a6ddec76c0f
Parents: 8ff627c
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Thu Aug 25 16:16:23 2016 +0100
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Thu Aug 25 16:17:41 2016 +0100

----------------------------------------------------------------------
 .../core/effector/ssh/SshCommandEffector.java   | 97 +++++++++++++++++---
 .../SshCommandMembershipTrackingPolicy.java     | 20 +++-
 .../policy/InvokeEffectorOnSensorChange.java    | 71 ++++++++++++++
 3 files changed, 171 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2ab4e5c1/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshCommandEffector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshCommandEffector.java
b/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshCommandEffector.java
index 957d68e..8d7ca13 100644
--- a/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshCommandEffector.java
+++ b/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshCommandEffector.java
@@ -18,15 +18,14 @@
  */
 package org.apache.brooklyn.core.effector.ssh;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Maps;
-
 import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.effector.ParameterType;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.config.MapConfigKey;
@@ -34,13 +33,25 @@ import org.apache.brooklyn.core.effector.AddEffector;
 import org.apache.brooklyn.core.effector.EffectorBody;
 import org.apache.brooklyn.core.effector.Effectors;
 import org.apache.brooklyn.core.effector.Effectors.EffectorBuilder;
+import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks.SshEffectorTaskFactory;
+import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.location.Locations;
 import org.apache.brooklyn.core.sensor.ssh.SshCommandSensor;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.json.ShellEnvironmentSerializer;
+import org.apache.brooklyn.util.core.task.TaskBuilder;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Maps;
 
 public final class SshCommandEffector extends AddEffector {
 
@@ -48,6 +59,16 @@ public final class SshCommandEffector extends AddEffector {
     public static final ConfigKey<String> EFFECTOR_EXECUTION_DIR = SshCommandSensor.SENSOR_EXECUTION_DIR;
     public static final MapConfigKey<Object> EFFECTOR_SHELL_ENVIRONMENT = BrooklynConfigKeys.SHELL_ENVIRONMENT;
 
+    public static enum ExecutionTarget {
+        ENTITY,
+        MEMBERS,
+        CHILDREN
+    }
+    public static final ConfigKey<ExecutionTarget> EXECUTION_TARGET = ConfigKeys.newConfigKey(ExecutionTarget.class,
"executionTarget", 
+        "Where this command should run; by default on this 'entity'; alternatively on all
'children' or all 'members' (if it's a group); "
+        + "in the latter cases the sets are filtered by entities which have a machine and
are not stopping.",
+        ExecutionTarget.ENTITY);
+
     public SshCommandEffector(ConfigBag params) {
         super(newEffectorBuilder(params).build());
     }
@@ -65,16 +86,60 @@ public final class SshCommandEffector extends AddEffector {
     protected static class Body extends EffectorBody<String> {
         private final Effector<?> effector;
         private final String command;
+        private final Map<String, Object> shellEnv;
         private final String executionDir;
+        private final ExecutionTarget executionTarget;
 
         public Body(Effector<?> eff, ConfigBag params) {
             this.effector = eff;
             this.command = Preconditions.checkNotNull(params.get(EFFECTOR_COMMAND), "SSH
command must be supplied when defining this effector");
+            this.shellEnv = params.get(EFFECTOR_SHELL_ENVIRONMENT);
             this.executionDir = params.get(EFFECTOR_EXECUTION_DIR);
+            this.executionTarget = params.get(EXECUTION_TARGET);
         }
 
         @Override
         public String call(ConfigBag params) {
+            switch (executionTarget) {
+            case ENTITY:
+                return callOne(params);
+            case MEMBERS:
+                return callMany(((Group)entity()).getMembers(), params);
+            case CHILDREN:
+                return callMany(entity().getChildren(), params);
+            default:
+                throw new IllegalStateException("Unknown value passed as execution target:
" + executionTarget);
+            }
+        }
+        
+        public String callOne(ConfigBag params) {
+            return queue(
+                makePartialTaskFactory(params)
+                    .summary("effector "+effector.getName()+" ssh call")
+                ).get();
+        }
+        public String callMany(Collection<Entity> targets, ConfigBag params) {
+            TaskBuilder<Object> ptb = Tasks.builder().parallel(true).displayName("effector
"+effector.getName()+" ssh to targets");
+            for (Entity target: targets) {
+                if (Entities.isNoLongerManaged(target)) continue;
+                
+                Lifecycle state = target.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
+                if (state==Lifecycle.STOPPING || state==Lifecycle.STOPPED) continue;
+
+                Maybe<SshMachineLocation> machine = Locations.findUniqueSshMachineLocation(target.getLocations());
+                if (machine.isAbsent()) continue;
+                
+                SshEffectorTaskFactory<String> t = makePartialTaskFactory(params);
+                t.summary("effector "+effector.getName()+" at "+target); 
+                t.machine( machine.get() );
+                
+                ptb.add(t.newTask());
+            }
+            queue(ptb.build()).getUnchecked();
+            return null;
+        }
+        
+        public SshEffectorTaskFactory<String> makePartialTaskFactory(ConfigBag params)
{
             String sshCommand = SshCommandSensor.makeCommandExecutingInDirectory(command,
executionDir, entity());
 
             MutableMap<String, Object> env = MutableMap.of();
@@ -84,33 +149,39 @@ public final class SshCommandEffector extends AddEffector {
                 env.addIfNotNull(param.getName(), params.get(Effectors.asConfigKey(param)));
             }
 
-            // Set things from the entities defined shell environment, if applicable
+            // Set things from the entity's defined shell environment, if applicable
             env.putAll(entity().config().get(BrooklynConfigKeys.SHELL_ENVIRONMENT));
 
-            // Add the shell environment entries from our configuration
-            Map<String, Object> effectorEnv = params.get(EFFECTOR_SHELL_ENVIRONMENT);
-            if (effectorEnv != null) env.putAll(effectorEnv);
-
             // Set the parameters we've been passed. This will repeat declared parameters
but to no harm,
             // it may pick up additional values (could be a flag defining whether this is
permitted or not.)
             // Make sure we do not include the shell.env here again, by filtering it out.
             env.putAll(Maps.filterKeys(params.getAllConfig(), Predicates.not(Predicates.equalTo(EFFECTOR_SHELL_ENVIRONMENT.getName()))));
 
+            // Add the shell environment entries from the effector configuration
+            if (shellEnv != null) env.putAll(shellEnv);
+            
+            // Add the shell environment entries from our invocation
+            Map<String, Object> effectorEnv = params.get(EFFECTOR_SHELL_ENVIRONMENT);
+            if (effectorEnv != null) env.putAll(effectorEnv);
+            
             // Try to resolve the configuration in the env Map
             try {
-                env = (MutableMap<String, Object>) Tasks.resolveDeepValue(env, Object.class,
entity().getExecutionContext());
+                env = MutableMap.copyOf(resolveEnv(env));
             } catch (InterruptedException | ExecutionException e) {
                 Exceptions.propagateIfFatal(e);
             }
 
             // Execute the effector with the serialized environment strings
             ShellEnvironmentSerializer serializer = new ShellEnvironmentSerializer(entity().getManagementContext());
-            SshEffectorTasks.SshEffectorTaskFactory<String> task = SshEffectorTasks.ssh(sshCommand)
+            
+            return SshEffectorTasks.ssh(sshCommand)
                     .requiringZeroAndReturningStdout()
-                    .summary("effector "+effector.getName())
                     .environmentVariables(serializer.serialize(env));
+        }
 
-            return queue(task).get();
+        @SuppressWarnings("unchecked")
+        private Map<String, Object> resolveEnv(MutableMap<String, Object> env)
throws ExecutionException, InterruptedException {
+            return (Map<String, Object>) Tasks.resolveDeepValue(env, Object.class,
entity().getExecutionContext());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2ab4e5c1/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
b/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
index 432364c..7810ce4 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/SshCommandMembershipTrackingPolicy.java
@@ -28,8 +28,11 @@ import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.config.MapConfigKey;
 import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
+import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
 import org.apache.brooklyn.core.location.Locations;
 import org.apache.brooklyn.core.location.Machines;
 import org.apache.brooklyn.core.sensor.ssh.SshCommandSensor;
@@ -61,11 +64,16 @@ import com.google.common.base.Preconditions;
  * executed. This can be one of: the {@link ExecutionTarget#ENTITY owing entity};
  * the {@link ExecutionTarget#MEMBER member} that was updated; or
  * {@link ExecutionTarget#ALL_MEMBERS all members} of the group.
+ * 
+ * @deprecated introduced and removed in 0.10.0-snapshot; use a combination of 
+ * InvokeEffectorOnSensorChange and SshCommandEffector instead
+ * (much simpler semantics as you normally will want to listen to a local sensor
+ * (which in turn is aggregated over members) instead of children sensors directly).
+ * If there is a need for {@link #EVENT_TYPE} or {@link #MEMBER_ID} then we'll want
+ * InvokeEffectorOnSensorChange to be able to subscribe to members/children etc as well,
+ * and pass parameters based on source.entityId and source.sensor.
  */
-// TODO might make sense to split up behaviour into two classes,
-// an InvokeEffectorMembershipTrackingPolicy and an SshMultiEntityCommandEffector -- 
-// where the latter has the configurable target introduced here
-@Beta
+@Deprecated
 public class SshCommandMembershipTrackingPolicy extends AbstractMembershipTrackingPolicy
{
 
     private static final Logger LOG = LoggerFactory.getLogger(SshCommandMembershipTrackingPolicy.class);
@@ -148,6 +156,10 @@ public class SshCommandMembershipTrackingPolicy extends AbstractMembershipTracki
 
     @SuppressWarnings("unchecked")
     public void execute(Entity target, String command, String type, String memberId) {
+        if (Entities.isNoLongerManaged(target)) return;
+        Lifecycle state = target.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
+        if (state==Lifecycle.STOPPING || state==Lifecycle.STOPPED) return;
+        
         Collection<? extends Location> locations = Locations.getLocationsCheckingAncestors(target.getLocations(),
target);
         Maybe<SshMachineLocation> machine = Machines.findUniqueMachineLocation(locations,
SshMachineLocation.class);
         if (machine.isAbsentOrNull()) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2ab4e5c1/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
new file mode 100644
index 0000000..8a81f49
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.policy.AbstractPolicy;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Invokes the given effector when the policy changes.
+ * 
+ * TODO
+ * * support parameters
+ * * support conditions
+ * * allow to be triggered by sensors on members
+ */
+public class InvokeEffectorOnSensorChange extends AbstractPolicy implements SensorEventListener<Object>
{
+    
+    private static final Logger LOG = LoggerFactory.getLogger(InvokeEffectorOnSensorChange.class);
+
+    public static final ConfigKey<Object> SENSOR = ConfigKeys.newConfigKey(Object.class,

+            "sensor", "Sensor to be monitored, as string or sensor type");
+
+    public static final ConfigKey<String> EFFECTOR = ConfigKeys.newStringConfigKey(
+            "effector", "Name of effector to invoke");
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        Preconditions.checkNotNull(getConfig(EFFECTOR), EFFECTOR);
+        Object sensor = Preconditions.checkNotNull(getConfig(SENSOR), SENSOR);
+        if (sensor instanceof String) sensor = Sensors.newSensor(Object.class, (String)sensor);
+        subscriptions().subscribe(entity, (Sensor<?>)sensor, this);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void onEvent(SensorEvent<Object> event) {
+        entity.invoke((Effector)entity.getEntityType().getEffectorByName( getConfig(EFFECTOR)
).get(), (Map)MutableMap.of());
+    }
+    
+}


Mime
View raw message