brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [26/50] [abbrv] brooklyn-library git commit: sensors at riak cluster level and other minor tidies for riak and couchbase
Date Mon, 01 Feb 2016 17:47:18 GMT
sensors at riak cluster level and other minor tidies for riak and couchbase


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/21ff2059
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/21ff2059
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/21ff2059

Branch: refs/heads/0.7.0-incubating
Commit: 21ff20599391b87f825cdfdb8ab9aea9588e69d4
Parents: cce08e8
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Fri Jun 19 08:31:19 2015 -0700
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Fri Jun 19 08:32:53 2015 -0700

----------------------------------------------------------------------
 .../nosql/couchbase/CouchbaseClusterImpl.java   |  1 +
 .../nosql/couchbase/CouchbaseNodeImpl.java      |  3 --
 .../brooklyn/entity/nosql/riak/RiakCluster.java |  4 ++
 .../entity/nosql/riak/RiakClusterImpl.java      | 55 +++++++++++++++++---
 .../brooklyn/entity/nosql/riak/RiakNode.java    | 11 ++--
 .../entity/nosql/riak/RiakNodeImpl.java         |  4 ++
 6 files changed, 65 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/21ff2059/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
index dcdafb6..5c47fe7 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java
@@ -414,6 +414,7 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements
Couchbas
         if (config().getLocalRaw(UP_QUORUM_CHECK).isAbsent()) {
             // TODO Only leaving CouchbaseQuorumCheck here in case it is contained in persisted
state.
             // If so, need a transformer and then to delete it
+            @SuppressWarnings({ "unused", "hiding" })
             @Deprecated
             class CouchbaseQuorumCheck implements QuorumCheck {
                 @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/21ff2059/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
index e4b1c0a..d7439ca 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java
@@ -21,7 +21,6 @@ package brooklyn.entity.nosql.couchbase;
 import static java.lang.String.format;
 
 import java.net.URI;
-import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
@@ -37,7 +36,6 @@ import brooklyn.entity.effector.EffectorBody;
 import brooklyn.event.AttributeSensor;
 import brooklyn.event.SensorEvent;
 import brooklyn.event.SensorEventListener;
-import brooklyn.event.basic.DependentConfiguration;
 import brooklyn.event.feed.http.HttpFeed;
 import brooklyn.event.feed.http.HttpPollConfig;
 import brooklyn.event.feed.http.HttpValueFunctions;
@@ -55,7 +53,6 @@ import brooklyn.util.guava.TypeTokens;
 import brooklyn.util.http.HttpTool;
 import brooklyn.util.http.HttpToolResponse;
 import brooklyn.util.net.Urls;
-import brooklyn.util.task.DynamicTasks;
 import brooklyn.util.task.Tasks;
 import brooklyn.util.text.Strings;
 import brooklyn.util.time.Duration;

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/21ff2059/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
index 41aa6be..876ebde 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
@@ -58,4 +58,8 @@ public interface RiakCluster extends DynamicCluster {
 
     AttributeSensor<URI> RIAK_CONSOLE_URI = Attributes.MAIN_URI;
 
+    AttributeSensor<Integer> NODE_GETS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.gets.1m.perNode",
"Gets in the last minute, averaged across cluster");
+    AttributeSensor<Integer> NODE_PUTS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.puts.1m.perNode",
"Puts in the last minute, averaged across cluster");
+    AttributeSensor<Integer> NODE_OPS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.ops.1m.perNode",
"Sum of node gets and puts in the last minute, averaged across cluster");
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/21ff2059/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
index 637346e..7b256c0 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -41,10 +41,11 @@ import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
 import brooklyn.entity.group.DynamicClusterImpl;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.entity.trait.Startable;
+import brooklyn.event.AttributeSensor;
 import brooklyn.event.basic.DependentConfiguration;
-import brooklyn.location.Location;
 import brooklyn.policy.EnricherSpec;
 import brooklyn.policy.PolicySpec;
+import brooklyn.util.task.Tasks;
 import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
 
@@ -53,6 +54,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -71,13 +73,20 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster
{
     }
 
     @Override
-    public void start(Collection<? extends Location> locations) {
-        super.start(locations);
+    protected void doStart() {
+        super.doStart();
         connectSensors();
 
-        Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
+        try {
+            Duration delay = getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER);
+            Tasks.setBlockingDetails("Sleeping for "+delay+" before advertising cluster available");
+            Time.sleep(delay);
+        } finally {
+            Tasks.resetBlockingDetails();
+        }
 
         //FIXME: add a quorum to tolerate failed nodes before setting on fire.
+        @SuppressWarnings("unchecked")
         Optional<Entity> anyNode = Iterables.tryFind(getMembers(), Predicates.and(
                 Predicates.instanceOf(RiakNode.class),
                 EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER,
true),
@@ -91,8 +100,9 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster
{
     }
 
     protected EntitySpec<?> getMemberSpec() {
-        return getConfig(MEMBER_SPEC, EntitySpec.create(RiakNode.class));
-
+        EntitySpec<?> result = config().get(MEMBER_SPEC);
+        if (result!=null) return result;
+        return EntitySpec.create(RiakNode.class);
     }
 
     protected void connectSensors() {
@@ -112,6 +122,38 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster
{
                  .fromMembers()
                  .build();
         addEnricher(first);
+        
+        Map<? extends AttributeSensor<? extends Number>, ? extends AttributeSensor<?
extends Number>> enricherSetup = 
+            ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<?
extends Number>>builder()
+                .put(RiakNode.NODE_PUTS, RiakCluster.NODE_PUTS_1MIN_PER_NODE)
+                .put(RiakNode.NODE_GETS, RiakCluster.NODE_GETS_1MIN_PER_NODE)
+                .put(RiakNode.NODE_OPS, RiakCluster.NODE_OPS_1MIN_PER_NODE)
+            .build();
+        // construct sum and average over cluster
+        for (AttributeSensor<? extends Number> nodeSensor : enricherSetup.keySet())
{
+            addSummingMemberEnricher(nodeSensor);
+            addAveragingMemberEnricher(nodeSensor, enricherSetup.get(nodeSensor));
+        }
+    }
+
+    private void addAveragingMemberEnricher(AttributeSensor<? extends Number> fromSensor,
AttributeSensor<? extends Number> toSensor) {
+        addEnricher(Enrichers.builder()
+            .aggregating(fromSensor)
+            .publishing(toSensor)
+            .fromMembers()
+            .computingAverage()
+            .build()
+        );
+    }
+
+    private void addSummingMemberEnricher(AttributeSensor<? extends Number> source)
{
+        addEnricher(Enrichers.builder()
+            .aggregating(source)
+            .publishing(source)
+            .fromMembers()
+            .computingSum()
+            .build()
+        );
     }
 
     protected void onServerPoolMemberChanged(final Entity member) {
@@ -161,6 +203,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster
{
             } else {
                 if (nodes != null && nodes.containsKey(member)) {
                     DependentConfiguration.attributeWhenReady(member, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER,
Predicates.equalTo(false)).blockUntilEnded(Duration.TWO_MINUTES);
+                    @SuppressWarnings("unchecked")
                     Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(),
Predicates.and(
                             Predicates.instanceOf(RiakNode.class),
                             EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER,
true),

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/21ff2059/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
index d82c3f4..1f29366 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
@@ -126,10 +126,10 @@ public interface RiakNode extends SoftwareProcess {
     PortAttributeSensorAndConfigKey SEARCH_SOLR_PORT = new PortAttributeSensorAndConfigKey("search.solr.port",
"Solr port", "8093+");
     PortAttributeSensorAndConfigKey SEARCH_SOLR_JMX_PORT = new PortAttributeSensorAndConfigKey("search.solr.jmx_port",
"Solr port", "8985+");
 
-    AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("riak.node.gets");
-    AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.node.gets.total");
-    AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("riak.node.puts");
-    AttributeSensor<Integer> NODE_PUTS_TOTAL = Sensors.newIntegerSensor("riak.node.puts.total");
+    AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("riak.node.gets",
"Gets in the last minute");
+    AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.node.gets.total",
"Total gets since node started");
+    AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("riak.node.puts",
"Puts in the last minute");
+    AttributeSensor<Integer> NODE_PUTS_TOTAL = Sensors.newIntegerSensor("riak.node.puts.total",
"Total puts since node started");
     AttributeSensor<Integer> VNODE_GETS = Sensors.newIntegerSensor("riak.vnode.gets");
     AttributeSensor<Integer> VNODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.vnode.gets.total");
 
@@ -147,6 +147,9 @@ public interface RiakNode extends SoftwareProcess {
     @SuppressWarnings("serial")
     AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>()
{},
             "ring.members", "all the riak nodes in the ring");
+    
+    AttributeSensor<Integer> NODE_OPS = Sensors.newIntegerSensor("riak.node.ops", "Sum
of node gets and puts in the last minute");
+    AttributeSensor<Integer> NODE_OPS_TOTAL = Sensors.newIntegerSensor("riak.node.ops.total",
"Sum of node gets and puts since the node started");
 
     MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class,
"joinCluster");
     MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class,
"leaveCluster");

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/21ff2059/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
index dea8b87..590cb3a 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
+import brooklyn.enricher.Enrichers;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.SoftwareProcessImpl;
 import brooklyn.entity.webapp.WebAppServiceMethods;
@@ -78,6 +79,7 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode
{
                 maxOpenFiles, defaultMaxOpenFiles);
     }
 
+    @SuppressWarnings("rawtypes")
     public boolean isPackageDownloadUrlProvided() {
         AttributeSensorAndConfigKey[] downloadProperties = { DOWNLOAD_URL_RHEL_CENTOS, DOWNLOAD_URL_UBUNTU,
DOWNLOAD_URL_DEBIAN };
         for (AttributeSensorAndConfigKey property : downloadProperties) {
@@ -182,6 +184,8 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode
{
 
         httpFeed = httpFeedBuilder.build();
 
+        addEnricher(Enrichers.builder().combining(NODE_GETS, NODE_PUTS).computingSum().publishing(NODE_OPS).build());
+        addEnricher(Enrichers.builder().combining(NODE_GETS_TOTAL, NODE_PUTS_TOTAL).computingSum().publishing(NODE_OPS_TOTAL).build());
         WebAppServiceMethods.connectWebAppServerPolicies(this);
     }
 


Mime
View raw message