brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From grk...@apache.org
Subject [4/6] git commit: Adds MachineEntity class for use in server pools
Date Mon, 23 Jun 2014 14:59:47 GMT
Adds MachineEntity class for use in server pools


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/dcf011e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/dcf011e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/dcf011e7

Branch: refs/heads/master
Commit: dcf011e769c42fad1211d6ccf7f53d89c719402f
Parents: 874fa3a
Author: Sam Corbett <sam.corbett@cloudsoftcorp.com>
Authored: Wed Jun 18 18:15:00 2014 +0100
Committer: Andrew Kennedy <grkvlt@apache.org>
Committed: Mon Jun 23 15:31:20 2014 +0100

----------------------------------------------------------------------
 .../brooklyn/entity/pool/MachineEntity.java     |  19 ++
 .../brooklyn/entity/pool/MachineEntityImpl.java | 186 +++++++++++++++++++
 .../java/brooklyn/entity/pool/ServerPool.java   |   3 +-
 3 files changed, 206 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dcf011e7/software/base/src/main/java/brooklyn/entity/pool/MachineEntity.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/pool/MachineEntity.java b/software/base/src/main/java/brooklyn/entity/pool/MachineEntity.java
new file mode 100644
index 0000000..7589ecb
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/pool/MachineEntity.java
@@ -0,0 +1,19 @@
+package brooklyn.entity.pool;
+
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.time.Duration;
+
+@ImplementedBy(MachineEntityImpl.class)
+public interface MachineEntity extends SoftwareProcess {
+
+    AttributeSensor<Duration> UPTIME = Sensors.newSensor(Duration.class, "pool.machine.uptime", "Current uptime");
+    AttributeSensor<Double> LOAD_AVERAGE = Sensors.newDoubleSensor("pool.machine.loadAverage", "Current load average");
+    AttributeSensor<Double> CPU_USAGE = Sensors.newDoubleSensor("pool.machine.cpu", "Current CPU usage");
+    AttributeSensor<Long> FREE_MEMORY = Sensors.newLongSensor("pool.machine.memory.free", "Current free memory");
+    AttributeSensor<Long> TOTAL_MEMORY = Sensors.newLongSensor("pool.machine.memory.total", "Total memory");
+    AttributeSensor<Long> USED_MEMORY = Sensors.newLongSensor("pool.machine.memory.used", "Current memory usage");
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dcf011e7/software/base/src/main/java/brooklyn/entity/pool/MachineEntityImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/pool/MachineEntityImpl.java b/software/base/src/main/java/brooklyn/entity/pool/MachineEntityImpl.java
new file mode 100644
index 0000000..daf6ff3
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/pool/MachineEntityImpl.java
@@ -0,0 +1,186 @@
+package brooklyn.entity.pool;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+
+import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.entity.basic.SoftwareProcessDriver;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.software.SshEffectorTasks;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.event.feed.function.FunctionPollConfig;
+import brooklyn.location.MachineLocation;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.system.ProcessTaskWrapper;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+public class MachineEntityImpl extends SoftwareProcessImpl implements MachineEntity {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MachineEntityImpl.class);
+
+    private transient FunctionFeed sensorFeed;
+
+    @Override
+    public void init() {
+        LOG.info("Starting server pool machine with id {}", getId());
+    }
+
+    @Override
+    protected void connectSensors() {
+        super.connectSensors();
+        connectServiceUpIsRunning();
+
+        // Sensors linux-specific
+        if (!getDriver().getMachine().getMachineDetails().getOsDetails().isLinux())
+            return;
+
+        sensorFeed = FunctionFeed.builder()
+                .entity(this)
+                .period(Duration.THIRTY_SECONDS)
+                .poll(new FunctionPollConfig<Double, Double>(LOAD_AVERAGE)
+                        .onFailureOrException(Functions.constant(-1d))
+                        .callable(new Callable<Double>() {
+                            @Override
+                            public Double call() throws Exception {
+                                String output = getDriver().execCommand("uptime");
+                                String loadAverage = Strings.getFirstWordAfter(output, "load average:").replace(",", "");
+                                return Double.valueOf(loadAverage);
+                            }
+                        }))
+                .poll(new FunctionPollConfig<Double, Double>(CPU_USAGE)
+                        .onFailureOrException(Functions.constant(0d))
+                        .callable(new Callable<Double>() {
+                            @Override
+                            public Double call() throws Exception {
+                                String output = getDriver().execCommand("cat /proc/stat");
+                                List<String> cpuData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(output));
+                                Integer system = Integer.parseInt(cpuData.get(1));
+                                Integer user = Integer.parseInt(cpuData.get(3));
+                                Integer idle = Integer.parseInt(cpuData.get(4));
+                                Double cpuUsage = (double) (system + user) / (double) (system + user + idle);
+                                return cpuUsage * 100d;
+                            }
+                        }))
+                .poll(new FunctionPollConfig<Long, Long>(USED_MEMORY)
+                        .onFailureOrException(Functions.constant(-1L))
+                        .callable(new Callable<Long>() {
+                            @Override
+                            public Long call() throws Exception {
+                                String output = getDriver().execCommand("free | grep Mem:");
+                                List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(output));
+                                return Long.parseLong(memoryData.get(2));
+                            }
+                        }))
+                .poll(new FunctionPollConfig<Long, Long>(FREE_MEMORY)
+                        .onFailureOrException(Functions.constant(-1L))
+                        .callable(new Callable<Long>() {
+                            @Override
+                            public Long call() throws Exception {
+                                String output = getDriver().execCommand("free | grep Mem:");
+                                List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(output));
+                                return Long.parseLong(memoryData.get(3));
+                            }
+                        }))
+                .poll(new FunctionPollConfig<Long, Long>(TOTAL_MEMORY)
+                        .onFailureOrException(Functions.constant(-1L))
+                        .callable(new Callable<Long>() {
+                            @Override
+                            public Long call() throws Exception {
+                                String output = getDriver().execCommand("free | grep Mem:");
+                                List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(output));
+                                return Long.parseLong(memoryData.get(1));
+                            }
+                        }))
+                .build();
+
+    }
+
+    @Override
+    public void disconnectSensors() {
+        disconnectServiceUpIsRunning();
+        if (sensorFeed != null) sensorFeed.stop();
+        super.disconnectSensors();
+    }
+
+    @Override
+    public Class<?> getDriverInterface() {
+        return SoftwareProcessDriver.class;
+    }
+
+    @Override
+    public MachineEntitySshDriver getDriver() {
+        return (MachineEntitySshDriver) super.getDriver();
+    }
+
+    @Override
+    protected SoftwareProcessDriver newDriver(MachineLocation loc) {
+        return new MachineEntitySshDriver(this, (SshMachineLocation) getMachineOrNull());
+    }
+
+    private static class MachineEntitySshDriver extends AbstractSoftwareProcessSshDriver {
+
+        public MachineEntitySshDriver(EntityLocal entity, SshMachineLocation machine) {
+            super(entity, machine);
+        }
+
+        @Override
+        public void stop() {
+        }
+
+        @Override
+        public void install() {
+        }
+
+        @Override
+        public void customize() {
+        }
+
+        @Override
+        public void launch() {
+        }
+
+        @Override
+        public boolean isRunning() {
+            return true;
+        }
+
+        public String execCommand(String command) {
+            return execCommand(command, Duration.ONE_MINUTE);
+        }
+
+        public String execCommand(String command, Duration timeout) {
+            try {
+                ProcessTaskWrapper<Integer> task = SshEffectorTasks.ssh(command)
+                        .machine(getMachine())
+                        .summary(Strings.getFirstWord(command))
+                        .newTask();
+                Integer result = DynamicTasks.queueIfPossible(task)
+                        .executionContext(getEntity())
+                        .orSubmitAsync()
+                        .asTask()
+                        .get(timeout);
+                if (result != 0) {
+                    log.warn("Command failed: {}", task.getStderr());
+                    throw new IllegalStateException("Command failed, return code " + result);
+                }
+                return task.getStdout();
+            } catch (TimeoutException te) {
+                throw new IllegalStateException("Timed out running command: " + command);
+            } catch (Exception e) {
+                throw Exceptions.propagate(e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dcf011e7/software/base/src/main/java/brooklyn/entity/pool/ServerPool.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/pool/ServerPool.java b/software/base/src/main/java/brooklyn/entity/pool/ServerPool.java
index f9b6e2b..53a7185 100644
--- a/software/base/src/main/java/brooklyn/entity/pool/ServerPool.java
+++ b/software/base/src/main/java/brooklyn/entity/pool/ServerPool.java
@@ -4,7 +4,6 @@ import java.util.Map;
 
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.EmptySoftwareProcess;
 import brooklyn.entity.group.DynamicCluster;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.entity.proxying.ImplementedBy;
@@ -41,7 +40,7 @@ public interface ServerPool extends DynamicCluster, LocationOwner<ServerPoolLoca
             "pool.claimed", "The number of locations in the pool that are in use");
 
     ConfigKey<EntitySpec<?>> MEMBER_SPEC = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.MEMBER_SPEC,
-            EntitySpec.create(EmptySoftwareProcess.class));
+            EntitySpec.create(MachineEntity.class));
 
     public MachineLocation claimMachine(Map<?, ?> flags) throws NoMachinesAvailableException;
 


Mime
View raw message