brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rich...@apache.org
Subject [14/50] [abbrv] git commit: fixed Riak ring not committing nodes after they join the cluster
Date Fri, 23 May 2014 14:51:45 GMT
fixed Riak ring not committing nodes after they join the cluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/16c5abb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/16c5abb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/16c5abb4

Branch: refs/pull/1411/merge
Commit: 16c5abb4d978057aaf071eeba181c276f3ba8142
Parents: bbd7582
Author: ZaidM <zaid.mohsin@cloudsoftcorp.com>
Authored: Thu May 22 12:02:17 2014 +0100
Committer: ZaidM <zaid.mohsin@cloudsoftcorp.com>
Committed: Thu May 22 12:02:17 2014 +0100

----------------------------------------------------------------------
 .../java/brooklyn/demo/RiakClusterExample.java  | 24 ++++----
 .../brooklyn/entity/nosql/riak/RiakCluster.java |  8 +++
 .../entity/nosql/riak/RiakClusterImpl.java      | 55 +++++++++++++-----
 .../brooklyn/entity/nosql/riak/RiakNode.java    | 48 +++++++++-------
 .../entity/nosql/riak/RiakNodeDriver.java       |  2 +
 .../entity/nosql/riak/RiakNodeImpl.java         |  3 +
 .../entity/nosql/riak/RiakNodeSshDriver.java    | 59 ++++++++++++--------
 .../nosql/riak/RiakClusterEc2LiveTest.java      |  7 ++-
 .../riak/RiakNodeGoogleComputeLiveTest.java     |  7 ++-
 9 files changed, 139 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
index 3f287f2..debfb76 100644
--- a/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
+++ b/examples/simple-nosql-cluster/src/main/java/brooklyn/demo/RiakClusterExample.java
@@ -20,28 +20,30 @@ import brooklyn.util.CommandLineUtil;
 @Catalog(name = "Riak Cluster Application", description = "Riak ring deployment blueprint")
 public class RiakClusterExample extends AbstractApplication {
 
+    public static final String DEFAULT_LOCATION_SPEC = "aws-ec2:us-east-1";
+
     @CatalogConfig(label = "Riak Ring Size")
     public static final ConfigKey<Integer> RIAK_RING_SIZE = ConfigKeys.newConfigKey(
-            "riak.ring.size", "Initial size of the Riak Ring", 2);
+            "riak.ring.size", "Initial size of the Riak Ring", 4);
 
-    public void init() {
-        addChild(EntitySpec.create(RiakCluster.class)
-                .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE)) );
-    }
-    
     public static void main(String[] argv) {
         List<String> args = Lists.newArrayList(argv);
-        String port =  CommandLineUtil.getCommandLineOption(args, "--port", "8081+");
-        String location = CommandLineUtil.getCommandLineOption(args, "--location", "jclouds:aws-ec2");
-        Preconditions.checkArgument(args.isEmpty(), "Unsupported args: "+args);
+        String port = CommandLineUtil.getCommandLineOption(args, "--port", "8081+");
+        String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION_SPEC);
+        Preconditions.checkArgument(args.isEmpty(), "Unsupported args: " + args);
 
         BrooklynLauncher launcher = BrooklynLauncher.newInstance()
-            .application(EntitySpec.create(StartableApplication.class, RiakClusterExample.class))
+                .application(EntitySpec.create(StartableApplication.class, RiakClusterExample.class))
                 .webconsolePort(port)
                 .location(location)
                 .start();
 
         Entities.dumpInfo(launcher.getApplications());
     }
-    
+
+    public void init() {
+        addChild(EntitySpec.create(RiakCluster.class)
+                .configure(RiakCluster.INITIAL_SIZE, getConfig(RIAK_RING_SIZE)));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
index 29a1883..41aafd2 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakCluster.java
@@ -4,15 +4,23 @@ import java.util.Map;
 
 import com.google.common.reflect.TypeToken;
 
+import brooklyn.config.ConfigKey;
 import brooklyn.entity.Entity;
+import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.entity.group.DynamicCluster;
 import brooklyn.entity.proxying.ImplementedBy;
 import brooklyn.event.AttributeSensor;
 import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.time.Duration;
 
 @ImplementedBy(RiakClusterImpl.class)
 public interface RiakCluster extends DynamicCluster {
 
     AttributeSensor<Map<Entity, String>> RIAK_CLUSTER_NODES = Sensors.newSensor(new TypeToken<Map<Entity, String>>() {
     }, "riak.cluster.nodes", "Names of all active Riak nodes in the cluster <Entity,Riak Name>");
+
+    @SetFromFlag("delayBeforeAdvertisingCluster")
+    ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.seconds(2 * 60));
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
index 5508d5e..11ecf92 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -11,21 +11,23 @@ import javax.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.Lifecycle;
 import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
 import brooklyn.entity.group.DynamicClusterImpl;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.entity.trait.Startable;
 import brooklyn.location.Location;
 import brooklyn.util.collections.MutableMap;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
+import brooklyn.util.time.Time;
 
 
 public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
@@ -35,20 +37,42 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
     public void init() {
         log.info("Initializing the riak cluster...");
         super.init();
+
+
     }
 
     @Override
     public void start(Collection<? extends Location> locations) {
         super.start(locations);
         connectSensors();
+
+        Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
+
+        //FIXME: add a quorum to tolerate failed nodes before setting on fire.
+        Optional<Entity> anyNode = Iterables.tryFind(getMembers(), new Predicate<Entity>() {
+
+            @Override
+            public boolean apply(@Nullable Entity entity) {
+                return (entity instanceof RiakNode && hasMemberJoinedCluster(entity) && entity.getAttribute(RiakNode.SERVICE_UP));
+            }
+        });
+
+        if (anyNode.isPresent()) {
+            log.info("Planning and Committing cluster changes on node: {}, cluster: {}", anyNode.get().getId(), getId());
+            Entities.invokeEffector(this, anyNode.get(), RiakNode.COMMIT_RIAK_CLUSTER);
+        } else {
+            log.warn("No Riak Nodes are found on the cluster: {}. Initialization Failed", getId());
+            setAttribute(SERVICE_STATE, Lifecycle.ON_FIRE);
+        }
     }
 
+
     protected EntitySpec<?> getMemberSpec() {
         EntitySpec<?> result = super.getMemberSpec();
-        if (result!=null) return result;
+        if (result != null) return result;
         return EntitySpec.create(RiakNode.class);
     }
-    
+
     protected void connectSensors() {
 
         Map<String, Object> flags = MutableMap.<String, Object>builder()
@@ -81,7 +105,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster
{
         if (belongsInServerPool(member)) {
             // TODO can we discover the nodes by asking the riak cluster, rather than assuming
what we add will be in there?
             // TODO and can we do join as part of node starting?
-            
+
             Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES);
             if (nodes == null) nodes = Maps.newLinkedHashMap();
             String riakName = getRiakName(member);
@@ -94,7 +118,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster
{
                     nodes.put(member, riakName);
                     setAttribute(RIAK_CLUSTER_NODES, nodes);
 
-                    ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_IN_CLUSTER,
Boolean.TRUE);
+                    ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER,
Boolean.TRUE);
                     isFirstNodeSet.set(true);
 
                     log.info("Adding riak node {}: {}; {} to cluster", new Object[]{this,
member, getRiakName(member)});
@@ -106,12 +130,12 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster
{
                     Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(),
new Predicate<Entity>() {
                         @Override
                         public boolean apply(@Nullable Entity node) {
-                            return (node instanceof RiakNode && isMemberInCluster(node));
+                            return (node instanceof RiakNode && hasMemberJoinedCluster(node));
                         }
                     });
 
                     if (anyNodeInCluster.isPresent()) {
-                        if (!nodes.containsKey(member) && !isMemberInCluster(member))
{
+                        if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member))
{
 
                             String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
                             Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER,
anyNodeName);
@@ -142,7 +166,8 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster
{
             return false;
         }
         if (!getMembers().contains(member)) {
-            if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, eliminating
because not member", this, member);
+            if (log.isTraceEnabled())
+                log.trace("Members of {}, checking {}, eliminating because not member", this,
member);
 
             return false;
         }
@@ -155,7 +180,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster
{
         return node.getAttribute(RiakNode.RIAK_NODE_NAME);
     }
 
-    private Boolean isMemberInCluster(Entity member) {
-        return Optional.fromNullable(member.getAttribute(RiakNode.RIAK_NODE_IN_CLUSTER)).or(Boolean.FALSE);
+    private Boolean hasMemberJoinedCluster(Entity member) {
+        return Optional.fromNullable(member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)).or(Boolean.FALSE);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
index 046f697..d77e73b 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java
@@ -32,9 +32,9 @@ public interface RiakNode extends SoftwareProcess {
             "riak.appConfig.templateUrl", "Template file (in freemarker format) for the app.config
config file",
             "classpath://brooklyn/entity/nosql/riak/app.config");
 
-    @SetFromFlag("riakNodeInCluster")
-    AttributeSensor<Boolean> RIAK_NODE_IN_CLUSTER = Sensors.newBooleanSensor(
-            "riak.node.inCluster", "Flag to indicate wether the node is a cluster member");
+    @SetFromFlag("riakNodeHasJoinedCluster")
+    AttributeSensor<Boolean> RIAK_NODE_HAS_JOINED_CLUSTER = Sensors.newBooleanSensor(
+            "riak.node.riakNodeHasJoinedCluster", "Flag to indicate wether the Riak node has joined a cluster member");
 
     @SetFromFlag("riakNodeName")
     AttributeSensor<String> RIAK_NODE_NAME = Sensors.newStringSensor("riak.node", "Returns
the riak node name as defined in vm.args");
@@ -49,42 +49,45 @@ public interface RiakNode extends SoftwareProcess {
     PortAttributeSensorAndConfigKey EPMD_LISTENER_PORT = new PortAttributeSensorAndConfigKey("riak.epmdListenerPort",
"Erlang Port Mapper Daemon Listener Port", "4369");
     PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_START = new PortAttributeSensorAndConfigKey("riak.erlangPortRangeStart",
"Erlang Port Range Start", "6000+");
     PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("riak.erlangPortRangeEnd",
"Erlang Port Range End", "7999+");
-
-    // accessors, for use from template file
-    Integer getRiakWebPort();
-    Integer getRiakPbPort();
-    Integer getHandoffListenerPort();
-    Integer getEpmdListenerPort();
-    Integer getErlangPortRangeStart();
-    Integer getErlangPortRangeEnd();
-    
-    //Sensors for Riak Node Counters (within 1 minute window or lifetime of node.
-    //http://docs.basho.com/riak/latest/ops/running/stats-and-monitoring/#Statistics-from-Riak
-
     AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("node.gets");
     AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("node.gets.total");
     AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("node.puts");
     AttributeSensor<Integer> NODE_PUTS_TOTAL = Sensors.newIntegerSensor("node.puts.total");
     AttributeSensor<Integer> VNODE_GETS = Sensors.newIntegerSensor("vnode.gets");
     AttributeSensor<Integer> VNODE_GETS_TOTAL = Sensors.newIntegerSensor("vnode.gets.total");
+
+    //Sensors for Riak Node Counters (within 1 minute window or lifetime of node.
+    //http://docs.basho.com/riak/latest/ops/running/stats-and-monitoring/#Statistics-from-Riak
     AttributeSensor<Integer> VNODE_PUTS = Sensors.newIntegerSensor("vnode.puts");
     AttributeSensor<Integer> VNODE_PUTS_TOTAL = Sensors.newIntegerSensor("vnode.puts.total");
-
     AttributeSensor<Integer> READ_REPAIRS_TOTAL = Sensors.newIntegerSensor("read.repairs.total");
     AttributeSensor<Integer> COORD_REDIRS_TOTAL = Sensors.newIntegerSensor("coord.redirs.total");
-
     //Additional Riak node counters
     AttributeSensor<Integer> MEMORY_PROCESSES_USED = Sensors.newIntegerSensor("memory.processes.used");
     AttributeSensor<Integer> SYS_PROCESS_COUNT = Sensors.newIntegerSensor("sys.process.count");
     AttributeSensor<Integer> PBC_CONNECTS = Sensors.newIntegerSensor("pbc.connects");
     AttributeSensor<Integer> PBC_ACTIVE = Sensors.newIntegerSensor("pbc.active");
-
     @SuppressWarnings("serial")
-    AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>()
{}, 
-        "ring.members", "all the riak nodes in the ring");
-
+    AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>()
{
+                                                                   },
+            "ring.members", "all the riak nodes in the ring"
+    );
     public static final MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class,
"joinCluster");
     public static final MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class,
"leaveCluster");
+    public static final MethodEffector<Void> COMMIT_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class,
"commitCluster");
+
+    // accessors, for use from template file
+    Integer getRiakWebPort();
+
+    Integer getRiakPbPort();
+
+    Integer getHandoffListenerPort();
+
+    Integer getEpmdListenerPort();
+
+    Integer getErlangPortRangeStart();
+
+    Integer getErlangPortRangeEnd();
 
     @Effector(description = "add this riak node to the riak cluster")
     public void joinCluster(@EffectorParam(name = "nodeName") String nodeName);
@@ -95,4 +98,7 @@ public interface RiakNode extends SoftwareProcess {
     @Effector(description = "recover a failed riak node and join it back to the cluster (by
passing it a working node on the cluster 'node')")
     public void recoverFailedNode(@EffectorParam(name = "nodeName") String nodeName);
 
+    @Effector(description = "commit changes made to a Riak cluster")
+    public void commitCluster();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
index f894d66..7a09342 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java
@@ -11,4 +11,6 @@ public interface RiakNodeDriver extends SoftwareProcessDriver {
     public void leaveCluster();
 
     public void recoverFailedNode(String nodeName);
+
+    public void commitCluster();
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
index 5fe0cac..f5e2dac 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java
@@ -151,6 +151,9 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode {
     }
 
     @Override
+    public void commitCluster(){ getDriver().commitCluster();}
+
+    @Override
     public void recoverFailedNode(String nodeName) {
         getDriver().recoverFailedNode(nodeName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
index 9676369..fc36198 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
@@ -1,11 +1,6 @@
 package brooklyn.entity.nosql.riak;
 
-import static brooklyn.util.ssh.BashCommands.INSTALL_CURL;
-import static brooklyn.util.ssh.BashCommands.INSTALL_TAR;
-import static brooklyn.util.ssh.BashCommands.alternatives;
-import static brooklyn.util.ssh.BashCommands.chainGroup;
-import static brooklyn.util.ssh.BashCommands.commandToDownloadUrlAs;
-import static brooklyn.util.ssh.BashCommands.sudo;
+import static brooklyn.util.ssh.BashCommands.*;
 import static java.lang.String.format;
 
 import java.util.List;
@@ -14,6 +9,11 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
 import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
 import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.Entities;
@@ -26,11 +26,6 @@ import brooklyn.util.collections.MutableMap;
 import brooklyn.util.net.Urls;
 import brooklyn.util.task.DynamicTasks;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
 // TODO: Alter -env ERL_CRASH_DUMP path in vm.args
 public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implements RiakNodeDriver
{
 
@@ -53,10 +48,10 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver
implemen
         MutableMap<String, String> result = MutableMap.copyOf(super.getShellEnvironment());
         // how to change epmd port, according to 
         // http://serverfault.com/questions/582787/how-to-change-listening-interface-of-rabbitmqs-epmd-port-4369
-        result.put("ERL_EPMD_PORT", ""+getEntity().getEpmdListenerPort());
+        result.put("ERL_EPMD_PORT", "" + getEntity().getEpmdListenerPort());
         return result;
     }
-    
+
     @Override
     public void install() {
         DownloadResolver resolver = Entities.newDownloader(this);
@@ -129,7 +124,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver
implemen
         isRiakOnPath = isPackageInstall ? isRiakOnPath() : true;
 
         OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
-        
+
         List<String> commands = Lists.newLinkedList();
 
         String vmArgsTemplate = processTemplate(entity.getConfig(RiakNode.RIAK_VM_ARGS_TEMPLATE_URL));
@@ -249,12 +244,10 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver
implemen
         if (getRiakName().equals(nodeName)) {
             log.warn("cannot join riak node: {} to itself", nodeName);
         } else {
-            if (!isInCluster()) {
+            if (!hasJoinedCluster()) {
 
                 ScriptHelper joinClusterScript = newScript("joinCluster")
                         .body.append(format("%s cluster join %s", getRiakAdminCmd(), nodeName))
-                        .body.append(format("%s cluster plan", getRiakAdminCmd()))
-                        .body.append(format("%s cluster commit", getRiakAdminCmd()))
                         .failOnNonZeroResultCode();
 
                 if (!isRiakOnPath) {
@@ -265,7 +258,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver
implemen
 
                 joinClusterScript.execute();
 
-                entity.setAttribute(RiakNode.RIAK_NODE_IN_CLUSTER, Boolean.TRUE);
+                entity.setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);
             } else {
                 log.warn("entity {}: is already in the riak cluster", entity.getId());
             }
@@ -278,7 +271,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver
implemen
         //FIXME: find a way to batch commit the changes, instead of committing for every
operation.
         //FIXME: find a way to check if the node is the last in the cluster to avoid removing
the only member and getting "last node error"
 
-        if (isInCluster()) {
+        if (hasJoinedCluster()) {
             ScriptHelper leaveClusterScript = newScript("leaveCluster")
                     .body.append(format("%s cluster leave", getRiakAdminCmd()))
                     .body.append(format("%s cluster plan", getRiakAdminCmd()))
@@ -292,7 +285,27 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver
implemen
 
             leaveClusterScript.execute();
 
-            entity.setAttribute(RiakNode.RIAK_NODE_IN_CLUSTER, Boolean.FALSE);
+            entity.setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.FALSE);
+        } else {
+            log.warn("entity {}: is not in the riak cluster", entity.getId());
+        }
+    }
+
+    @Override
+    public void commitCluster() {
+
+        if (hasJoinedCluster()) {
+            ScriptHelper commitClusterScript = newScript("commitCluster")
+                    .body.append(format("%s cluster plan", getRiakAdminCmd()))
+                    .body.append(format("%s cluster commit", getRiakAdminCmd()));
+
+            if (!isRiakOnPath) {
+                Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
+                log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
+                commitClusterScript.environmentVariablesReset(newPathVariable);
+            }
+            commitClusterScript.execute();
+
         } else {
             log.warn("entity {}: is not in the riak cluster", entity.getId());
         }
@@ -305,7 +318,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver
implemen
         //argument passed 'node' is any working node in the riak cluster
         //following the instruction from: http://docs.basho.com/riak/latest/ops/running/recovery/failed-node/
 
-        if (isInCluster()) {
+        if (hasJoinedCluster()) {
             String failedNodeName = getRiakName();
 
 
@@ -350,8 +363,8 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver
implemen
             throw new IllegalArgumentException("Subnet address is not set.");
     }
 
-    private Boolean isInCluster() {
-        return Optional.fromNullable(entity.getAttribute(RiakNode.RIAK_NODE_IN_CLUSTER)).or(Boolean.FALSE);
+    private Boolean hasJoinedCluster() {
+        return Optional.fromNullable(entity.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)).or(Boolean.FALSE);
     }
 
     private boolean isRiakOnPath() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
index 691e1cb..6a42578 100644
--- a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
+++ b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java
@@ -29,8 +29,11 @@ public class RiakClusterEc2LiveTest extends AbstractEc2LiveTest {
         RiakNode first = (RiakNode) Iterables.get(cluster.getMembers(), 0);
         RiakNode second = (RiakNode) Iterables.get(cluster.getMembers(), 1);
 
-        EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.RIAK_NODE_IN_CLUSTER,
true);
-        EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.RIAK_NODE_IN_CLUSTER,
true);
+        EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.SERVICE_UP, true);
+        EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.SERVICE_UP, true);
+
+        EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER,
true);
+        EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER,
true);
     }
 
     @Test(enabled = false)

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16c5abb4/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeGoogleComputeLiveTest.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeGoogleComputeLiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeGoogleComputeLiveTest.java
index 98da4b0..544ddd2 100644
--- a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeGoogleComputeLiveTest.java
+++ b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakNodeGoogleComputeLiveTest.java
@@ -23,8 +23,11 @@ public class RiakNodeGoogleComputeLiveTest extends AbstractGoogleComputeLiveTest
         RiakNode first = (RiakNode) Iterables.get(cluster.getMembers(), 0);
         RiakNode second = (RiakNode) Iterables.get(cluster.getMembers(), 1);
 
-        EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.RIAK_NODE_IN_CLUSTER,
true);
-        EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.RIAK_NODE_IN_CLUSTER,
true);
+        EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.SERVICE_UP, true);
+        EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.SERVICE_UP, true);
+
+        EntityTestUtils.assertAttributeEqualsEventually(first, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER,
true);
+        EntityTestUtils.assertAttributeEqualsEventually(second, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER,
true);
 
     }
 


Mime
View raw message