brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From grk...@apache.org
Subject [2/6] git commit: Update MachineEntity with attributes class from Docker
Date Mon, 23 Jun 2014 14:59:45 GMT
Update MachineEntity with attributes class from Docker


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/28c03c73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/28c03c73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/28c03c73

Branch: refs/heads/master
Commit: 28c03c738036a45f5946ae982444b1dd921202d4
Parents: c8e0013
Author: Andrew Kennedy <andrew.kennedy@cloudsoftcorp.com>
Authored: Thu Jun 19 23:26:55 2014 +0100
Committer: Andrew Kennedy <grkvlt@apache.org>
Committed: Mon Jun 23 15:31:20 2014 +0100

----------------------------------------------------------------------
 .../brooklyn/entity/pool/MachineAttributes.java |  83 +++++++++
 .../brooklyn/entity/pool/MachineEntity.java     |  34 +++-
 .../brooklyn/entity/pool/MachineEntityImpl.java | 171 ++++++++-----------
 3 files changed, 180 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/28c03c73/software/base/src/main/java/brooklyn/entity/pool/MachineAttributes.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/pool/MachineAttributes.java b/software/base/src/main/java/brooklyn/entity/pool/MachineAttributes.java
new file mode 100644
index 0000000..d7a661a
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/pool/MachineAttributes.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2014 by Cloudsoft Corporation Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.pool;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import brooklyn.config.render.RendererHints;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.math.MathFunctions;
+import brooklyn.util.text.ByteSizeStrings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Function;
+
+public class MachineAttributes {
+
+    /** Do not instantiate. */
+    private MachineAttributes() { }
+
+    /*
+     * Sensor attributes for machines.
+     */
+
+    public static final AttributeSensor<Duration> UPTIME = Sensors.newSensor(Duration.class, "machine.uptime", "Current uptime");
+    public static final AttributeSensor<Double> LOAD_AVERAGE = Sensors.newDoubleSensor("machine.loadAverage", "Current load average");
+
+    public static final AttributeSensor<Double> CPU_USAGE = Sensors.newDoubleSensor("machine.cpu", "Current CPU usage");
+    public static final AttributeSensor<Double> AVERAGE_CPU_USAGE = Sensors.newDoubleSensor("cpu.average", "Average CPU usage across the cluster");
+
+    public static final AttributeSensor<Long> FREE_MEMORY = Sensors.newLongSensor("machine.memory.free", "Current free memory");
+    public static final AttributeSensor<Long> TOTAL_MEMORY = Sensors.newLongSensor("machine.memory.total", "Total memory");
+    public static final AttributeSensor<Long> USED_MEMORY = Sensors.newLongSensor("machine.memory.used", "Current memory usage");
+    public static final AttributeSensor<Double> USED_MEMORY_DELTA_PER_SECOND_LAST = Sensors.newDoubleSensor("memory.used.delta", "Change in memory usage per second");
+    public static final AttributeSensor<Double> USED_MEMORY_DELTA_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("memory.used.windowed", "Average change in memory usage over 30s");
+
+    private static AtomicBoolean initialized = new AtomicBoolean(false);
+
+    /** Setup renderer hints. */
+    public static void init() {
+        if (initialized.getAndSet(true)) return;
+
+        final Function<Double, Long> longValue = new Function<Double, Long>() {
+            @Override
+            public Long apply(@Nullable Double input) {
+                if (input == null) return null;
+                return input.longValue();
+            }
+        };
+
+        RendererHints.register(UPTIME, RendererHints.displayValue(Time.toTimeStringRounded()));
+
+        RendererHints.register(CPU_USAGE, RendererHints.displayValue(MathFunctions.percent(2)));
+        RendererHints.register(AVERAGE_CPU_USAGE, RendererHints.displayValue(MathFunctions.percent(2)));
+
+        RendererHints.register(FREE_MEMORY, RendererHints.displayValue(ByteSizeStrings.metric()));
+        RendererHints.register(TOTAL_MEMORY, RendererHints.displayValue(ByteSizeStrings.metric()));
+        RendererHints.register(USED_MEMORY, RendererHints.displayValue(ByteSizeStrings.metric()));
+        RendererHints.register(USED_MEMORY_DELTA_PER_SECOND_LAST, RendererHints.displayValue(Functionals.chain(longValue, ByteSizeStrings.metric())));
+        RendererHints.register(USED_MEMORY_DELTA_PER_SECOND_IN_WINDOW, RendererHints.displayValue(Functionals.chain(longValue, ByteSizeStrings.metric())));
+    }
+
+    static {
+        init();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/28c03c73/software/base/src/main/java/brooklyn/entity/pool/MachineEntity.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/pool/MachineEntity.java b/software/base/src/main/java/brooklyn/entity/pool/MachineEntity.java
index 7589ecb..219c9cd 100644
--- a/software/base/src/main/java/brooklyn/entity/pool/MachineEntity.java
+++ b/software/base/src/main/java/brooklyn/entity/pool/MachineEntity.java
@@ -1,19 +1,39 @@
 package brooklyn.entity.pool;
 
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.annotation.EffectorParam;
+import brooklyn.entity.basic.MethodEffector;
 import brooklyn.entity.basic.SoftwareProcess;
 import brooklyn.entity.proxying.ImplementedBy;
 import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.Sensors;
 import brooklyn.util.time.Duration;
 
 @ImplementedBy(MachineEntityImpl.class)
 public interface MachineEntity extends SoftwareProcess {
 
-    AttributeSensor<Duration> UPTIME = Sensors.newSensor(Duration.class, "pool.machine.uptime", "Current uptime");
-    AttributeSensor<Double> LOAD_AVERAGE = Sensors.newDoubleSensor("pool.machine.loadAverage", "Current load average");
-    AttributeSensor<Double> CPU_USAGE = Sensors.newDoubleSensor("pool.machine.cpu", "Current CPU usage");
-    AttributeSensor<Long> FREE_MEMORY = Sensors.newLongSensor("pool.machine.memory.free", "Current free memory");
-    AttributeSensor<Long> TOTAL_MEMORY = Sensors.newLongSensor("pool.machine.memory.total", "Total memory");
-    AttributeSensor<Long> USED_MEMORY = Sensors.newLongSensor("pool.machine.memory.used", "Current memory usage");
+    AttributeSensor<Duration> UPTIME = MachineAttributes.UPTIME;
+    AttributeSensor<Double> LOAD_AVERAGE = MachineAttributes.LOAD_AVERAGE;
+    AttributeSensor<Double> CPU_USAGE = MachineAttributes.CPU_USAGE;
+    AttributeSensor<Long> FREE_MEMORY = MachineAttributes.FREE_MEMORY;
+    AttributeSensor<Long> TOTAL_MEMORY = MachineAttributes.TOTAL_MEMORY;
+    AttributeSensor<Long> USED_MEMORY = MachineAttributes.USED_MEMORY;
+
+    MethodEffector<String> EXEC_COMMAND = new MethodEffector<String>(MachineEntity.class, "execCommand");
+    MethodEffector<String> EXEC_COMMAND_TIMEOUT = new MethodEffector<String>(MachineEntity.class, "execCommandTimeout");
+
+    /**
+     * Execute a command and return the output.
+     */
+    @Effector(description="Execute a command and return the output")
+    String execCommand(
+            @EffectorParam(name="command", description="Command") String command);
+
+    /**
+     * Execute a command and return the output, or throw an exception after a timeout.
+     */
+    @Effector(description="Execute a command and return the output")
+    String execCommandTimeout(
+            @EffectorParam(name="command", description="Command") String command,
+            @EffectorParam(name="timeout", description="Timeout") Duration timeout);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/28c03c73/software/base/src/main/java/brooklyn/entity/pool/MachineEntityImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/pool/MachineEntityImpl.java b/software/base/src/main/java/brooklyn/entity/pool/MachineEntityImpl.java
index daf6ff3..4fc87dc 100644
--- a/software/base/src/main/java/brooklyn/entity/pool/MachineEntityImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/pool/MachineEntityImpl.java
@@ -1,23 +1,18 @@
 package brooklyn.entity.pool;
 
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Functions;
-import com.google.common.base.Splitter;
-
-import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
-import brooklyn.entity.basic.EntityLocal;
-import brooklyn.entity.basic.SoftwareProcessDriver;
+import brooklyn.entity.basic.EmptySoftwareProcessDriver;
 import brooklyn.entity.basic.SoftwareProcessImpl;
 import brooklyn.entity.software.SshEffectorTasks;
-import brooklyn.event.feed.function.FunctionFeed;
-import brooklyn.event.feed.function.FunctionPollConfig;
-import brooklyn.location.MachineLocation;
+import brooklyn.event.feed.ssh.SshFeed;
+import brooklyn.event.feed.ssh.SshPollConfig;
+import brooklyn.event.feed.ssh.SshPollValue;
+import brooklyn.location.basic.Machines;
 import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.task.DynamicTasks;
@@ -25,11 +20,19 @@ import brooklyn.util.task.system.ProcessTaskWrapper;
 import brooklyn.util.text.Strings;
 import brooklyn.util.time.Duration;
 
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+
 public class MachineEntityImpl extends SoftwareProcessImpl implements MachineEntity {
 
     private static final Logger LOG = LoggerFactory.getLogger(MachineEntityImpl.class);
 
-    private transient FunctionFeed sensorFeed;
+    static {
+    	MachineAttributes.init();
+    }
+
+    private transient SshFeed sensorFeed;
 
     @Override
     public void init() {
@@ -42,63 +45,61 @@ public class MachineEntityImpl extends SoftwareProcessImpl implements MachineEnt
         connectServiceUpIsRunning();
 
         // Sensors linux-specific
-        if (!getDriver().getMachine().getMachineDetails().getOsDetails().isLinux())
-            return;
+        if (!getMachine().getMachineDetails().getOsDetails().isLinux()) return;
 
-        sensorFeed = FunctionFeed.builder()
+        sensorFeed = SshFeed.builder()
                 .entity(this)
                 .period(Duration.THIRTY_SECONDS)
-                .poll(new FunctionPollConfig<Double, Double>(LOAD_AVERAGE)
+                .poll(new SshPollConfig<Double>(LOAD_AVERAGE)
+                		.command("uptime")
                         .onFailureOrException(Functions.constant(-1d))
-                        .callable(new Callable<Double>() {
+                        .onSuccess(new Function<SshPollValue, Double>() {
                             @Override
-                            public Double call() throws Exception {
-                                String output = getDriver().execCommand("uptime");
-                                String loadAverage = Strings.getFirstWordAfter(output, "load average:").replace(",", "");
+                            public Double apply(SshPollValue input) {
+                                String loadAverage = Strings.getFirstWordAfter(input.getStdout(), "load average:").replace(",", "");
                                 return Double.valueOf(loadAverage);
                             }
                         }))
-                .poll(new FunctionPollConfig<Double, Double>(CPU_USAGE)
+                .poll(new SshPollConfig<Double>(CPU_USAGE)
+                        .command("cat /proc/stat")
                         .onFailureOrException(Functions.constant(0d))
-                        .callable(new Callable<Double>() {
+                        .onSuccess(new Function<SshPollValue, Double>() {
                             @Override
-                            public Double call() throws Exception {
-                                String output = getDriver().execCommand("cat /proc/stat");
-                                List<String> cpuData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(output));
+                            public Double apply(SshPollValue input) {
+                                List<String> cpuData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout()));
                                 Integer system = Integer.parseInt(cpuData.get(1));
                                 Integer user = Integer.parseInt(cpuData.get(3));
                                 Integer idle = Integer.parseInt(cpuData.get(4));
-                                Double cpuUsage = (double) (system + user) / (double) (system + user + idle);
-                                return cpuUsage * 100d;
+                                return (double) (system + user) / (double) (system + user + idle);
                             }
                         }))
-                .poll(new FunctionPollConfig<Long, Long>(USED_MEMORY)
+                .poll(new SshPollConfig<Long>(USED_MEMORY)
+                		.command("free | grep Mem:")
                         .onFailureOrException(Functions.constant(-1L))
-                        .callable(new Callable<Long>() {
+                        .onSuccess(new Function<SshPollValue, Long>() {
                             @Override
-                            public Long call() throws Exception {
-                                String output = getDriver().execCommand("free | grep Mem:");
-                                List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(output));
+                            public Long apply(SshPollValue input) {
+                                List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout()));
                                 return Long.parseLong(memoryData.get(2));
                             }
                         }))
-                .poll(new FunctionPollConfig<Long, Long>(FREE_MEMORY)
+                .poll(new SshPollConfig<Long>(FREE_MEMORY)
+                		.command("free | grep Mem:")
                         .onFailureOrException(Functions.constant(-1L))
-                        .callable(new Callable<Long>() {
+                        .onSuccess(new Function<SshPollValue, Long>() {
                             @Override
-                            public Long call() throws Exception {
-                                String output = getDriver().execCommand("free | grep Mem:");
-                                List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(output));
+                            public Long apply(SshPollValue input) {
+                                List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout()));
                                 return Long.parseLong(memoryData.get(3));
                             }
                         }))
-                .poll(new FunctionPollConfig<Long, Long>(TOTAL_MEMORY)
+                .poll(new SshPollConfig<Long>(TOTAL_MEMORY)
+                		.command("free | grep Mem:")
                         .onFailureOrException(Functions.constant(-1L))
-                        .callable(new Callable<Long>() {
+                        .onSuccess(new Function<SshPollValue, Long>() {
                             @Override
-                            public Long call() throws Exception {
-                                String output = getDriver().execCommand("free | grep Mem:");
-                                List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(output));
+                            public Long apply(SshPollValue input) {
+                                List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout()));
                                 return Long.parseLong(memoryData.get(1));
                             }
                         }))
@@ -113,73 +114,41 @@ public class MachineEntityImpl extends SoftwareProcessImpl implements MachineEnt
         super.disconnectSensors();
     }
 
-    @Override
-    public Class<?> getDriverInterface() {
-        return SoftwareProcessDriver.class;
-    }
+	@Override
+	public Class<?> getDriverInterface() {
+		return EmptySoftwareProcessDriver.class;
+	}
 
-    @Override
-    public MachineEntitySshDriver getDriver() {
-        return (MachineEntitySshDriver) super.getDriver();
+    public SshMachineLocation getMachine() {
+    	return Machines.findUniqueSshMachineLocation(getLocations()).get();
     }
 
     @Override
-    protected SoftwareProcessDriver newDriver(MachineLocation loc) {
-        return new MachineEntitySshDriver(this, (SshMachineLocation) getMachineOrNull());
+    public String execCommand(String command) {
+        return execCommandTimeout(command, Duration.ONE_MINUTE);
     }
 
-    private static class MachineEntitySshDriver extends AbstractSoftwareProcessSshDriver {
-
-        public MachineEntitySshDriver(EntityLocal entity, SshMachineLocation machine) {
-            super(entity, machine);
-        }
-
-        @Override
-        public void stop() {
-        }
-
-        @Override
-        public void install() {
-        }
-
-        @Override
-        public void customize() {
-        }
-
-        @Override
-        public void launch() {
-        }
-
-        @Override
-        public boolean isRunning() {
-            return true;
-        }
-
-        public String execCommand(String command) {
-            return execCommand(command, Duration.ONE_MINUTE);
-        }
-
-        public String execCommand(String command, Duration timeout) {
-            try {
-                ProcessTaskWrapper<Integer> task = SshEffectorTasks.ssh(command)
-                        .machine(getMachine())
-                        .summary(Strings.getFirstWord(command))
-                        .newTask();
-                Integer result = DynamicTasks.queueIfPossible(task)
-                        .executionContext(getEntity())
-                        .orSubmitAsync()
-                        .asTask()
-                        .get(timeout);
-                if (result != 0) {
-                    log.warn("Command failed: {}", task.getStderr());
-                    throw new IllegalStateException("Command failed, return code " + result);
-                }
-                return task.getStdout();
-            } catch (TimeoutException te) {
-                throw new IllegalStateException("Timed out running command: " + command);
-            } catch (Exception e) {
-                throw Exceptions.propagate(e);
+    @Override
+    public String execCommandTimeout(String command, Duration timeout) {
+        try {
+            ProcessTaskWrapper<Integer> task = SshEffectorTasks.ssh(command)
+                    .machine(getMachine())
+                    .summary(Strings.getFirstWord(command))
+                    .newTask();
+            Integer result = DynamicTasks.queueIfPossible(task)
+                    .executionContext(this)
+                    .orSubmitAsync()
+                    .asTask()
+                    .get(timeout);
+            if (result != 0) {
+                LOG.warn("Command failed: {}", task.getStderr());
+                throw new IllegalStateException("Command failed, return code " + result);
             }
+            return task.getStdout();
+        } catch (TimeoutException te) {
+            throw new IllegalStateException("Timed out running command: " + command);
+        } catch (Exception e) {
+            throw Exceptions.propagate(e);
         }
     }
 


Mime
View raw message