Return-Path: X-Original-To: apmail-brooklyn-commits-archive@minotaur.apache.org Delivered-To: apmail-brooklyn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 332EB1122E for ; Fri, 23 May 2014 14:52:02 +0000 (UTC) Received: (qmail 82521 invoked by uid 500); 23 May 2014 14:52:02 -0000 Delivered-To: apmail-brooklyn-commits-archive@brooklyn.apache.org Received: (qmail 82500 invoked by uid 500); 23 May 2014 14:52:02 -0000 Mailing-List: contact commits-help@brooklyn.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.incubator.apache.org Delivered-To: mailing list commits@brooklyn.incubator.apache.org Received: (qmail 82493 invoked by uid 99); 23 May 2014 14:52:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 May 2014 14:52:02 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 23 May 2014 14:52:00 +0000 Received: (qmail 81757 invoked by uid 99); 23 May 2014 14:51:33 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 May 2014 14:51:33 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 46E139A46FA; Fri, 23 May 2014 14:51:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: richard@apache.org To: commits@brooklyn.incubator.apache.org Date: Fri, 23 May 2014 14:52:03 -0000 Message-Id: <3fa893a552f54b60b09e3d2940aa9432@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [32/50] [abbrv] git commit: added leave cluster call if node exits cluster + added after cluster initialization cluster commit calls X-Virus-Checked: Checked by ClamAV on apache.org added leave cluster call if node exits cluster + added after cluster initialization cluster commit calls Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/dd001766 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/dd001766 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/dd001766 Branch: refs/pull/1411/merge Commit: dd001766a080e14361cc23d48379cd7e9db919d2 Parents: 16c5abb Author: ZaidM Authored: Thu May 22 18:00:56 2014 +0100 Committer: ZaidM Committed: Thu May 22 18:06:02 2014 +0100 ---------------------------------------------------------------------- .../java/brooklyn/demo/RiakClusterExample.java | 11 ++++++++- .../brooklyn/entity/nosql/riak/RiakCluster.java | 1 + .../entity/nosql/riak/RiakClusterImpl.java | 26 +++++++++++++++++--- 3 files changed, 33 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd001766/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java ---------------------------------------------------------------------- diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java index debfb76..d332a95 100644 --- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java +++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java @@ -12,9 +12,14 @@ import brooklyn.entity.basic.AbstractApplication; import brooklyn.entity.basic.ConfigKeys; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.StartableApplication; +import brooklyn.entity.nosql.cassandra.CassandraDatacenter; import brooklyn.entity.nosql.riak.RiakCluster; +import brooklyn.entity.nosql.riak.RiakNode; import brooklyn.entity.proxying.EntitySpec; import brooklyn.launcher.BrooklynLauncher; +import brooklyn.policy.PolicySpec; +import brooklyn.policy.ha.ServiceFailureDetector; +import brooklyn.policy.ha.ServiceRestarter; import brooklyn.util.CommandLineUtil; @Catalog(name = "Riak Cluster Application", description = "Riak ring deployment blueprint") @@ -43,7 +48,11 @@ public class RiakClusterExample extends AbstractApplication { public void init() { addChild(EntitySpec.create(RiakCluster.class) - .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE))); + .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE)) + .configure(CassandraDatacenter.MEMBER_SPEC, EntitySpec.create(RiakNode.class) + .policy(PolicySpec.create(ServiceFailureDetector.class)) + .policy(PolicySpec.create(ServiceRestarter.class) + .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, ServiceFailureDetector.ENTITY_FAILED)))); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd001766/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java index 41aafd2..de65c74 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java @@ -23,4 +23,5 @@ public interface RiakCluster extends DynamicCluster { @SetFromFlag("delayBeforeAdvertisingCluster") ConfigKey DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.seconds(2 * 60)); + AttributeSensor IS_CLUSTER_INIT = Sensors.newBooleanSensor("riak.cluster.isClusterInit", "flag to determine if the cluster was already initialized"); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd001766/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java index 11ecf92..12dd5fd 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java @@ -35,10 +35,9 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { private AtomicBoolean isFirstNodeSet = new AtomicBoolean(); public void init() { - log.info("Initializing the riak cluster..."); super.init(); - - + log.info("Initializing the riak cluster..."); + setAttribute(IS_CLUSTER_INIT, false); } @Override @@ -60,6 +59,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { if (anyNode.isPresent()) { log.info("Planning and Committing cluster changes on node: {}, cluster: {}", anyNode.get().getId(), getId()); Entities.invokeEffector(this, anyNode.get(), RiakNode.COMMIT_RIAK_CLUSTER); + setAttribute(IS_CLUSTER_INIT, true); } else { log.warn("No Riak Nodes are found on the cluster: {}. Initialization Failed", getId()); setAttribute(SERVICE_STATE, Lifecycle.ON_FIRE); @@ -137,9 +137,12 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { if (anyNodeInCluster.isPresent()) { if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) { + String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME); Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName); - + if (getAttribute(IS_CLUSTER_INIT)) { + Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER); + } nodes.put(member, riakName); setAttribute(RIAK_CLUSTER_NODES, nodes); log.info("Adding riak node {}: {}; {} to cluster", new Object[]{this, member, getRiakName(member)}); @@ -152,9 +155,23 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { } else { Map nodes = getAttribute(RIAK_CLUSTER_NODES); if (nodes != null && nodes.containsKey(member)) { + final Entity memberToBeRemoved = member; + + Optional anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate() { + + @Override + public boolean apply(@Nullable Entity node) { + return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved)); + } + }); + if (anyNodeInCluster.isPresent()) { + Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved)); + } + nodes.remove(member); setAttribute(RIAK_CLUSTER_NODES, nodes); log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)}); + } } if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member); @@ -183,4 +200,5 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { private Boolean hasMemberJoinedCluster(Entity member) { return Optional.fromNullable(member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)).or(Boolean.FALSE); } + }