brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rich...@apache.org
Subject [18/21] git commit: added leave cluster call if node exits cluster + added after cluster initialization cluster commit calls
Date Wed, 28 May 2014 16:05:34 GMT
Added a leave-cluster call for when a node exits the cluster, and added cluster commit calls
after cluster initialization.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/66461f6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/66461f6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/66461f6c

Branch: refs/heads/master
Commit: 66461f6c2cc7d4a6b9e3eb3b4c13da32a0044541
Parents: 11cf342
Author: ZaidM <zaid.mohsin@cloudsoftcorp.com>
Authored: Thu May 22 18:00:56 2014 +0100
Committer: Andrew Kennedy <andrew.kennedy@cloudsoftcorp.com>
Committed: Wed May 28 16:48:25 2014 +0100

----------------------------------------------------------------------
 .../java/brooklyn/demo/RiakClusterExample.java  | 11 ++++++++-
 .../brooklyn/entity/nosql/riak/RiakCluster.java |  1 +
 .../entity/nosql/riak/RiakClusterImpl.java      | 26 +++++++++++++++++---
 3 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/66461f6c/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
index debfb76..d332a95 100644
--- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
+++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
@@ -12,9 +12,14 @@ import brooklyn.entity.basic.AbstractApplication;
 import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.StartableApplication;
+import brooklyn.entity.nosql.cassandra.CassandraDatacenter;
 import brooklyn.entity.nosql.riak.RiakCluster;
+import brooklyn.entity.nosql.riak.RiakNode;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.launcher.BrooklynLauncher;
+import brooklyn.policy.PolicySpec;
+import brooklyn.policy.ha.ServiceFailureDetector;
+import brooklyn.policy.ha.ServiceRestarter;
 import brooklyn.util.CommandLineUtil;
 
 @Catalog(name = "Riak Cluster Application", description = "Riak ring deployment blueprint")
@@ -43,7 +48,11 @@ public class RiakClusterExample extends AbstractApplication {
 
     public void init() {
         addChild(EntitySpec.create(RiakCluster.class)
-                .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE)));
+                .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE))
+                .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(RiakNode.class)
+                        .policy(PolicySpec.create(ServiceFailureDetector.class))
+                        .policy(PolicySpec.create(ServiceRestarter.class)
+                                .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, ServiceFailureDetector.ENTITY_FAILED))));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/66461f6c/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
index 41aafd2..de65c74 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
@@ -23,4 +23,5 @@ public interface RiakCluster extends DynamicCluster {
     @SetFromFlag("delayBeforeAdvertisingCluster")
     ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.seconds(2 * 60));
 
+    AttributeSensor<Boolean> IS_CLUSTER_INIT = Sensors.newBooleanSensor("riak.cluster.isClusterInit", "flag to determine if the cluster was already initialized");
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/66461f6c/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
index 9604c28..3db99d8 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -35,10 +35,9 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
     private AtomicBoolean isFirstNodeSet = new AtomicBoolean();
 
     public void init() {
-        log.info("Initializing the riak cluster...");
         super.init();
-
-
+        log.info("Initializing the riak cluster...");
+        setAttribute(IS_CLUSTER_INIT, false);
     }
 
     @Override
@@ -60,6 +59,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
         if (anyNode.isPresent()) {
             log.info("Planning and Committing cluster changes on node: {}, cluster: {}", anyNode.get().getId(), getId());
             Entities.invokeEffector(this, anyNode.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+            setAttribute(IS_CLUSTER_INIT, true);
         } else {
             log.warn("No Riak Nodes are found on the cluster: {}. Initialization Failed", getId());
             setAttribute(SERVICE_STATE, Lifecycle.ON_FIRE);
@@ -119,9 +119,12 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
                     if (anyNodeInCluster.isPresent()) {
                         if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) {
 
+
                             String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
                             Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName);
-
+                            if (getAttribute(IS_CLUSTER_INIT)) {
+                                Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+                            }
                             nodes.put(member, riakName);
                             setAttribute(RIAK_CLUSTER_NODES, nodes);
                             log.info("Adding riak node {}: {}; {} to cluster", new Object[]{this, member, getRiakName(member)});
@@ -134,9 +137,23 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
         } else {
             Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES);
             if (nodes != null && nodes.containsKey(member)) {
+                final Entity memberToBeRemoved = member;
+
+                Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() {
+
+                    @Override
+                    public boolean apply(@Nullable Entity node) {
+                        return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved));
+                    }
+                });
+                if (anyNodeInCluster.isPresent()) {
+                    Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved));
+                }
+
                 nodes.remove(member);
                 setAttribute(RIAK_CLUSTER_NODES, nodes);
                 log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)});
+
             }
         }
         if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member);
@@ -174,4 +191,5 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
             ((RiakClusterImpl) super.entity).onServerPoolMemberChanged(entity);
         }
     }
+
 }


Mime
View raw message