brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aleds...@apache.org
Subject [11/26] incubator-brooklyn git commit: [BROOKLYN-162] Renaming of the NoSQL packages
Date Thu, 06 Aug 2015 16:32:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakClusterImpl.java
new file mode 100644
index 0000000..49bd515
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakClusterImpl.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.riak;
+
+import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.DependentConfiguration;
+import brooklyn.policy.EnricherSpec;
+import brooklyn.policy.PolicySpec;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster {
+
+    private static final Logger log = LoggerFactory.getLogger(RiakClusterImpl.class);
+
+    private transient Object mutex = new Object[0];
+
+    public void init() {
+        super.init();
+        log.info("Initializing the riak cluster...");
+        setAttribute(IS_CLUSTER_INIT, false);
+    }
+
+    @Override
+    protected void doStart() {
+        super.doStart();
+        connectSensors();
+
+        try {
+            Duration delay = getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER);
+            Tasks.setBlockingDetails("Sleeping for "+delay+" before advertising cluster available");
+            Time.sleep(delay);
+        } finally {
+            Tasks.resetBlockingDetails();
+        }
+
+        //FIXME: add a quorum to tolerate failed nodes before setting on fire.
+        @SuppressWarnings("unchecked")
+        Optional<Entity> anyNode = Iterables.tryFind(getMembers(), Predicates.and(
+                Predicates.instanceOf(RiakNode.class),
+                EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true),
+                EntityPredicates.attributeEqualTo(RiakNode.SERVICE_UP, true)));
+        if (anyNode.isPresent()) {
+            setAttribute(IS_CLUSTER_INIT, true);
+        } else {
+            log.warn("No Riak Nodes are found on the cluster: {}. Initialization Failed", getId());
+            ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
+        }
+    }
+
+    protected EntitySpec<?> getMemberSpec() {
+        EntitySpec<?> result = config().get(MEMBER_SPEC);
+        if (result!=null) return result;
+        return EntitySpec.create(RiakNode.class);
+    }
+
+    protected void connectSensors() {
+        addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName("Controller targets tracker")
+                .configure("sensorsToTrack", ImmutableSet.of(RiakNode.SERVICE_UP))
+                .configure("group", this));
+
+        EnricherSpec<?> first = Enrichers.builder()
+                 .aggregating(Attributes.MAIN_URI)
+                 .publishing(Attributes.MAIN_URI)
+                 .computing(new Function<Collection<URI>,URI>() {
+                    @Override
+                    public URI apply(Collection<URI> input) {
+                        return input.iterator().next();
+                    } })
+                 .fromMembers()
+                 .build();
+        addEnricher(first);
+        
+        Map<? extends AttributeSensor<? extends Number>, ? extends AttributeSensor<? extends Number>> enricherSetup = 
+            ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<? extends Number>>builder()
+                .put(RiakNode.NODE_PUTS, RiakCluster.NODE_PUTS_1MIN_PER_NODE)
+                .put(RiakNode.NODE_GETS, RiakCluster.NODE_GETS_1MIN_PER_NODE)
+                .put(RiakNode.NODE_OPS, RiakCluster.NODE_OPS_1MIN_PER_NODE)
+            .build();
+        // construct sum and average over cluster
+        for (AttributeSensor<? extends Number> nodeSensor : enricherSetup.keySet()) {
+            addSummingMemberEnricher(nodeSensor);
+            addAveragingMemberEnricher(nodeSensor, enricherSetup.get(nodeSensor));
+        }
+    }
+
+    private void addAveragingMemberEnricher(AttributeSensor<? extends Number> fromSensor, AttributeSensor<? extends Number> toSensor) {
+        addEnricher(Enrichers.builder()
+            .aggregating(fromSensor)
+            .publishing(toSensor)
+            .fromMembers()
+            .computingAverage()
+            .build()
+        );
+    }
+
+    private void addSummingMemberEnricher(AttributeSensor<? extends Number> source) {
+        addEnricher(Enrichers.builder()
+            .aggregating(source)
+            .publishing(source)
+            .fromMembers()
+            .computingSum()
+            .build()
+        );
+    }
+
+    protected void onServerPoolMemberChanged(final Entity member) {
+        synchronized (mutex) {
+            log.trace("For {}, considering membership of {} which is in locations {}", new Object[]{ this, member, member.getLocations() });
+
+            Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES);
+            if (belongsInServerPool(member)) {
+                // TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there?
+                // TODO and can we do join as part of node starting?
+
+                if (nodes == null) {
+                    nodes = Maps.newLinkedHashMap();
+                }
+                String riakName = getRiakName(member);
+                Preconditions.checkNotNull(riakName);
+
+                // flag a first node to be the first node in the riak cluster.
+                Boolean firstNode = getAttribute(IS_FIRST_NODE_SET);
+                if (!Boolean.TRUE.equals(firstNode)) {
+                    setAttribute(IS_FIRST_NODE_SET, Boolean.TRUE);
+
+                    nodes.put(member, riakName);
+                    setAttribute(RIAK_CLUSTER_NODES, nodes);
+
+                    ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);
+
+                    log.info("Added initial Riak node {}: {}; {} to new cluster", new Object[] { this, member, getRiakName(member) });
+                } else {
+                    // TODO: be wary of erroneous nodes but are still flagged 'in cluster'
+                    // add the new node to be part of the riak cluster.
+                    Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and(
+                            Predicates.instanceOf(RiakNode.class),
+                            EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true)));
+                    if (anyNodeInCluster.isPresent()) {
+                        if (!nodes.containsKey(member) && member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER) == null) {
+                            String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME);
+                            Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName).blockUntilEnded();
+                            nodes.put(member, riakName);
+                            setAttribute(RIAK_CLUSTER_NODES, nodes);
+                            log.info("Added Riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) });
+                        }
+                    } else {
+                        log.error("isFirstNodeSet, but no cluster members found to add {}", member.getId());
+                    }
+                }
+            } else {
+                if (nodes != null && nodes.containsKey(member)) {
+                    DependentConfiguration.attributeWhenReady(member, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Predicates.equalTo(false)).blockUntilEnded(Duration.TWO_MINUTES);
+                    @SuppressWarnings("unchecked")
+                    Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and(
+                            Predicates.instanceOf(RiakNode.class),
+                            EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true),
+                            Predicates.not(Predicates.equalTo(member))));
+                    if (anyNodeInCluster.isPresent()) {
+                        Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.REMOVE_FROM_CLUSTER, getRiakName(member)).blockUntilEnded();
+                    }
+                    nodes.remove(member);
+                    setAttribute(RIAK_CLUSTER_NODES, nodes);
+                    log.info("Removed Riak node {}: {}; {} from cluster", new Object[]{ this, member, getRiakName(member) });
+                }
+            }
+
+            ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES);
+
+            calculateClusterAddresses();
+        }
+    }
+
+    private void calculateClusterAddresses() {
+        List<String> addresses = Lists.newArrayList();
+        List<String> addressesPbPort = Lists.newArrayList();
+        for (Entity entity : this.getMembers()) {
+            if (entity instanceof RiakNode && entity.getAttribute(Attributes.SERVICE_UP)) {
+                RiakNode riakNode = (RiakNode) entity;
+                addresses.add(riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute(RiakNode.RIAK_WEB_PORT));
+                addressesPbPort.add(riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute(RiakNode.RIAK_PB_PORT));
+            }
+        }
+        setAttribute(RiakCluster.NODE_LIST, Joiner.on(",").join(addresses));
+        setAttribute(RiakCluster.NODE_LIST_PB_PORT, Joiner.on(",").join(addressesPbPort));
+    }
+
+    protected boolean belongsInServerPool(Entity member) {
+        if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) {
+            log.trace("Members of {}, checking {}, eliminating because not up", this, member);
+            return false;
+        }
+        if (!getMembers().contains(member)) {
+            log.trace("Members of {}, checking {}, eliminating because not member", this, member);
+            return false;
+        }
+        log.trace("Members of {}, checking {}, approving", this, member);
+
+        return true;
+    }
+
+    private String getRiakName(Entity node) {
+        return node.getAttribute(RiakNode.RIAK_NODE_NAME);
+    }
+
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override
+        protected void onEntityEvent(EventType type, Entity entity) {
+            ((RiakClusterImpl) super.entity).onServerPoolMemberChanged(entity);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNode.java
new file mode 100644
index 0000000..9542840
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNode.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.riak;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.annotation.EffectorParam;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.MethodEffector;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.UsesJava;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.AttributeSensorAndConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.reflect.TypeToken;
+
+@Catalog(name="Riak Node", description="Riak is a distributed NoSQL key-value data store that offers "
+        + "extremely high availability, fault tolerance, operational simplicity and scalability.")
+@ImplementedBy(RiakNodeImpl.class)
+public interface RiakNode extends SoftwareProcess, UsesJava {
+
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION,
+            "Version to install (Default 2.0.5)", "2.0.5");
+
+    @SetFromFlag("optimizeNetworking")
+    ConfigKey<Boolean> OPTIMIZE_HOST_NETWORKING  = ConfigKeys.newBooleanConfigKey("riak.networking.optimize", "Optimize host networking when running in a VM", Boolean.TRUE);
+
+    // vm.args and app.config are used for pre-version 2.0.0. Later versions use the (simplified) riak.conf
+    // see https://github.com/joedevivo/ricon/blob/master/cuttlefish.md
+    @SetFromFlag("vmArgsTemplateUrl")
+    ConfigKey<String> RIAK_VM_ARGS_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
+            "riak.vmArgs.templateUrl", "Template file (in freemarker format) for the vm.args config file",
+            "classpath://org/apache/brooklyn/entity/nosql/riak/vm.args");
+    @SetFromFlag("appConfigTemplateUrl")
+    ConfigKey<String> RIAK_APP_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
+            "riak.appConfig.templateUrl", "Template file (in freemarker format) for the app.config config file",
+            "classpath://org/apache/brooklyn/entity/nosql/riak/app.config");
+    @SetFromFlag("appConfigTemplateUrlLinux")
+    ConfigKey<String> RIAK_CONF_TEMPLATE_URL_LINUX = ConfigKeys.newStringConfigKey(
+            "riak.riakConf.templateUrl.linux", "Template file (in freemarker format) for the app.config config file",
+            "classpath://org/apache/brooklyn/entity/nosql/riak/riak.conf");
+    @SetFromFlag("appConfigTemplateUrlMac")
+    ConfigKey<String> RIAK_CONF_TEMPLATE_URL_MAC = ConfigKeys.newStringConfigKey(
+            "riak.riakConf.templateUrl.mac", "Template file (in freemarker format) for the app.config config file",
+            "classpath://org/apache/brooklyn/entity/nosql/riak/riak-mac.conf");
+
+    ConfigKey<String> RIAK_CONF_ADDITIONAL_CONTENT = ConfigKeys.newStringConfigKey(
+            "riak.riakConf.additionalContent", "Template file (in freemarker format) for setting up additional settings in the riak.conf file", "");
+    
+    // maxOpenFiles' default value (65536) is based on the Basho's recommendation - http://docs.basho.com/riak/latest/ops/tuning/open-files-limit/ 
+    @SetFromFlag("maxOpenFiles")
+    ConfigKey<Integer> RIAK_MAX_OPEN_FILES = ConfigKeys.newIntegerConfigKey(
+            "riak.max.open.files", "Number of the open files required by Riak", 65536);
+    
+    @SetFromFlag("downloadUrlRhelCentos")
+    AttributeSensorAndConfigKey<String, String> DOWNLOAD_URL_RHEL_CENTOS = ConfigKeys.newTemplateSensorAndConfigKey("download.url.rhelcentos",
+            "URL pattern for downloading the linux RPM installer (will substitute things like ${version} automatically)",
+            "http://s3.amazonaws.com/downloads.basho.com/riak/${entity.majorVersion}/${entity.fullVersion}/rhel/" +
+                    "${entity.osMajorVersion}/riak-${entity.fullVersion}-1.el${entity.osMajorVersion}.x86_64.rpm");
+
+    @SetFromFlag("downloadUrlUbuntu")
+    AttributeSensorAndConfigKey<String, String> DOWNLOAD_URL_UBUNTU = ConfigKeys.newTemplateSensorAndConfigKey("download.url.ubuntu",
+            "URL pattern for downloading the linux Ubuntu installer (will substitute things like ${version} automatically)",
+            "http://s3.amazonaws.com/downloads.basho.com/riak/${entity.majorVersion}/${entity.fullVersion}/ubuntu/" +
+                    "$OS_RELEASE/riak_${entity.fullVersion}-1_amd64.deb");
+
+    @SetFromFlag("downloadUrlDebian")
+    AttributeSensorAndConfigKey<String, String> DOWNLOAD_URL_DEBIAN = ConfigKeys.newTemplateSensorAndConfigKey("download.url.debian",
+            "URL pattern for downloading the linux Debian installer (will substitute things like ${version} automatically)",
+            "http://s3.amazonaws.com/downloads.basho.com/riak/${entity.majorVersion}/${entity.fullVersion}/debian/" +
+                    "$OS_RELEASE/riak_${entity.fullVersion}-1_amd64.deb");
+
+    @SetFromFlag("downloadUrlMac")
+    AttributeSensorAndConfigKey<String, String> DOWNLOAD_URL_MAC = ConfigKeys.newTemplateSensorAndConfigKey("download.url.mac",
+            "URL pattern for downloading the MAC binaries tarball (will substitute things like ${version} automatically)",
+            "http://s3.amazonaws.com/downloads.basho.com/riak/${entity.majorVersion}/${entity.fullVersion}/osx/10.8/riak-${entity.fullVersion}-OSX-x86_64.tar.gz");
+
+    // NB these two needed for clients to access
+    @SetFromFlag("riakWebPort")
+    PortAttributeSensorAndConfigKey RIAK_WEB_PORT = new PortAttributeSensorAndConfigKey("riak.webPort", "Riak Web Port", "8098+");
+
+    @SetFromFlag("riakPbPort")
+    PortAttributeSensorAndConfigKey RIAK_PB_PORT = new PortAttributeSensorAndConfigKey("riak.pbPort", "Riak Protocol Buffers Port", "8087+");
+
+    AttributeSensor<Boolean> RIAK_PACKAGE_INSTALL = Sensors.newBooleanSensor(
+            "riak.install.package", "Flag to indicate whether Riak was installed using an OS package");
+    AttributeSensor<Boolean> RIAK_ON_PATH = Sensors.newBooleanSensor(
+            "riak.install.onPath", "Flag to indicate whether Riak is available on the PATH");
+
+    AttributeSensor<Boolean> RIAK_NODE_HAS_JOINED_CLUSTER = Sensors.newBooleanSensor(
+            "riak.node.riakNodeHasJoinedCluster", "Flag to indicate whether the Riak node has joined a cluster member");
+
+    AttributeSensor<String> RIAK_NODE_NAME = Sensors.newStringSensor("riak.node", "Returns the riak node name as defined in vm.args");
+
+    // these needed for nodes to talk to each other, but not clients (so ideally set up in the security group for internal access)
+    PortAttributeSensorAndConfigKey HANDOFF_LISTENER_PORT = new PortAttributeSensorAndConfigKey("handoffListenerPort", "Handoff Listener Port", "8099+");
+    PortAttributeSensorAndConfigKey EPMD_LISTENER_PORT = new PortAttributeSensorAndConfigKey("epmdListenerPort", "Erlang Port Mapper Daemon Listener Port", "4369");
+    PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_START = new PortAttributeSensorAndConfigKey("erlangPortRangeStart", "Erlang Port Range Start", "6000+");
+    PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("erlangPortRangeEnd", "Erlang Port Range End", "7999+");
+
+    @SetFromFlag("searchEnabled")
+    ConfigKey<Boolean> SEARCH_ENABLED = ConfigKeys.newBooleanConfigKey("riak.search", "Deploy Solr and configure Riak to use it", false);
+
+    /**
+     * http://docs.basho.com/riak/latest/dev/using/search/
+     * Solr is powered by Riak's Yokozuna engine and it is used through the riak webport
+     * So SEARCH_SOLR_PORT shouldn't be exposed
+     */
+    ConfigKey<Integer> SEARCH_SOLR_PORT = ConfigKeys.newIntegerConfigKey("search.solr.port", "Solr port", 8983);
+    ConfigKey<Integer> SEARCH_SOLR_JMX_PORT = ConfigKeys.newIntegerConfigKey("search.solr.jmx_port", "Solr port", 8985);
+
+    AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("riak.node.gets", "Gets in the last minute");
+    AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.node.gets.total", "Total gets since node started");
+    AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("riak.node.puts", "Puts in the last minute");
+    AttributeSensor<Integer> NODE_PUTS_TOTAL = Sensors.newIntegerSensor("riak.node.puts.total", "Total puts since node started");
+    AttributeSensor<Integer> VNODE_GETS = Sensors.newIntegerSensor("riak.vnode.gets");
+    AttributeSensor<Integer> VNODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.vnode.gets.total");
+
+    //Sensors for Riak Node Counters (within 1 minute window or lifetime of node.
+    //http://docs.basho.com/riak/latest/ops/running/stats-and-monitoring/#Statistics-from-Riak
+    AttributeSensor<Integer> VNODE_PUTS = Sensors.newIntegerSensor("riak.vnode.puts");
+    AttributeSensor<Integer> VNODE_PUTS_TOTAL = Sensors.newIntegerSensor("riak.vnode.puts.total");
+    AttributeSensor<Integer> READ_REPAIRS_TOTAL = Sensors.newIntegerSensor("riak.read.repairs.total");
+    AttributeSensor<Integer> COORD_REDIRS_TOTAL = Sensors.newIntegerSensor("riak.coord.redirs.total");
+    //Additional Riak node counters
+    AttributeSensor<Integer> MEMORY_PROCESSES_USED = Sensors.newIntegerSensor("riak.memory.processes.used");
+    AttributeSensor<Integer> SYS_PROCESS_COUNT = Sensors.newIntegerSensor("riak.sys.process.count");
+    AttributeSensor<Integer> PBC_CONNECTS = Sensors.newIntegerSensor("riak.pbc.connects");
+    AttributeSensor<Integer> PBC_ACTIVE = Sensors.newIntegerSensor("riak.pbc.active");
+    @SuppressWarnings("serial")
+    AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>() {},
+            "ring.members", "all the riak nodes in the ring");
+    
+    AttributeSensor<Integer> NODE_OPS = Sensors.newIntegerSensor("riak.node.ops", "Sum of node gets and puts in the last minute");
+    AttributeSensor<Integer> NODE_OPS_TOTAL = Sensors.newIntegerSensor("riak.node.ops.total", "Sum of node gets and puts since the node started");
+
+    MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster");
+    MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster");
+    MethodEffector<Void> REMOVE_FROM_CLUSTER = new MethodEffector<Void>(RiakNode.class, "removeNode");
+
+    AttributeSensor<Integer> RIAK_NODE_GET_FSM_TIME_MEAN = Sensors.newIntegerSensor("riak.node_get_fsm_time_mean", "Time between reception of client read request and subsequent response to client");
+    AttributeSensor<Integer> RIAK_NODE_PUT_FSM_TIME_MEAN = Sensors.newIntegerSensor("riak.node_put_fsm_time_mean", "Time between reception of client write request and subsequent response to client");
+    AttributeSensor<Integer> RIAK_OBJECT_COUNTER_MERGE_TIME_MEAN = Sensors.newIntegerSensor("riak.object_counter_merge_time_mean", "Time it takes to perform an Update Counter operation");
+    AttributeSensor<Integer> RIAK_OBJECT_SET_MERGE_TIME_MEAN = Sensors.newIntegerSensor("riak.object_set_merge_time_mean", "Time it takes to perform an Update Set operation");
+    AttributeSensor<Integer> RIAK_OBJECT_MAP_MERGE_TIME_MEAN = Sensors.newIntegerSensor("riak.object_map_merge_time_mean", "Time it takes to perform an Update Map operation");
+    AttributeSensor<Integer> RIAK_CONSISTENT_GET_TIME_MEAN = Sensors.newIntegerSensor("riak.consistent_get_time_mean", "Strongly consistent read latency");
+    AttributeSensor<Integer> RIAK_CONSISTENT_PUT_TIME_MEAN = Sensors.newIntegerSensor("riak.consistent_put_time_mean", "Strongly consistent write latency");
+
+    List<AttributeSensor<Integer>> ONE_MINUTE_SENSORS = ImmutableList.of(RIAK_NODE_GET_FSM_TIME_MEAN, RIAK_NODE_PUT_FSM_TIME_MEAN,
+            RIAK_OBJECT_COUNTER_MERGE_TIME_MEAN, RIAK_OBJECT_SET_MERGE_TIME_MEAN, RIAK_OBJECT_MAP_MERGE_TIME_MEAN,
+            RIAK_CONSISTENT_GET_TIME_MEAN, RIAK_CONSISTENT_PUT_TIME_MEAN);
+
+    AttributeSensor<URI> RIAK_CONSOLE_URI = Attributes.MAIN_URI;
+
+    // accessors, for use from template file
+    Integer getRiakWebPort();
+
+    Integer getRiakPbPort();
+
+    Integer getHandoffListenerPort();
+
+    Integer getEpmdListenerPort();
+
+    Integer getErlangPortRangeStart();
+
+    Integer getErlangPortRangeEnd();
+
+    Boolean isSearchEnabled();
+
+    Integer getSearchSolrPort();
+
+    Integer getSearchSolrJmxPort();
+
+    String getFullVersion();
+
+    String getMajorVersion();
+
+    String getOsMajorVersion();
+
+    // TODO add commitCluster() effector and add effectors joinCluster, leaveCluster, removeNode, recoverFailedNode which do not execute commitCluster()
+    // the commit where the commitCluster effector was available is adbf2dc1cb5df98b1e52d3ab35fa6bb4983b722f
+
+    @Effector(description = "Join the Riak cluster on the given node")
+    void joinCluster(@EffectorParam(name = "nodeName") String nodeName);
+
+    @Effector(description = "Leave the Riak cluster")
+    void leaveCluster();
+
+    @Effector(description = "Remove the given node from the Riak cluster")
+    void removeNode(@EffectorParam(name = "nodeName") String nodeName);
+
+    @Effector(description = "Recover and join the Riak cluster on the given node")
+    void recoverFailedNode(@EffectorParam(name = "nodeName") String nodeName);
+
+    @Effector(description = "Create or modify a bucket type before activation")
+    void bucketTypeCreate(@EffectorParam(name = "bucketTypeName") String bucketTypeName,
+                          @EffectorParam(name = "bucketTypeProperties") String bucketTypeProperties);
+
+    @Effector(description = "List all currently available bucket types and their activation status")
+    List<String> bucketTypeList();
+
+    @Effector(description = "Display the status and properties of a specific bucket type")
+    List<String> bucketTypeStatus(@EffectorParam(name = "bucketTypeName") String bucketTypeName);
+
+    @Effector(description = "Update a bucket type after activation")
+    void bucketTypeUpdate(@EffectorParam(name = "bucketTypeName") String bucketTypeName,
+                          @EffectorParam(name = "bucketTypeProperties") String bucketTypeProperties);
+
+    @Effector(description = "Activate a bucket type")
+    void bucketTypeActivate(@EffectorParam(name = "bucketTypeName") String bucketTypeName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeDriver.java
new file mode 100644
index 0000000..b9339cf
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeDriver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.riak;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+
+import java.util.List;
+
+public interface RiakNodeDriver extends SoftwareProcessDriver {
+
+    String getRiakEtcDir();
+
+    void joinCluster(String nodeName);
+
+    void leaveCluster();
+
+    void removeNode(String nodeName);
+
+    void recoverFailedNode(String nodeName);
+
+    String getOsMajorVersion();
+
+    void bucketTypeCreate(String bucketTypeName, String bucketTypeProperties);
+
+    List<String> bucketTypeList();
+
+    List<String> bucketTypeStatus(String bucketTypeName);
+
+    void bucketTypeUpdate(String bucketTypeName, String bucketTypeProperties);
+
+    void bucketTypeActivate(String bucketTypeName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeImpl.java
new file mode 100644
index 0000000..d631516
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeImpl.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.riak;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.webapp.WebAppServiceMethods;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.AttributeSensorAndConfigKey;
+import brooklyn.event.feed.http.HttpFeed;
+import brooklyn.event.feed.http.HttpPollConfig;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.location.MachineProvisioningLocation;
+import brooklyn.location.access.BrooklynAccessUtils;
+import brooklyn.location.cloud.CloudLocationConfig;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Range;
+import com.google.common.net.HostAndPort;
+
+public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode {
+
+    private volatile HttpFeed httpFeed;
+
+    @Override
+    public RiakNodeDriver getDriver() {
+        return (RiakNodeDriver) super.getDriver();
+    }
+
+    @Override
+    public Class<RiakNodeDriver> getDriverInterface() {
+        return RiakNodeDriver.class;
+    }
+
+    @Override
+    public void init() {
+        super.init();
+        // fail fast if config files not avail
+        Entities.getRequiredUrlConfig(this, RIAK_VM_ARGS_TEMPLATE_URL);
+        Entities.getRequiredUrlConfig(this, RIAK_APP_CONFIG_TEMPLATE_URL);
+        
+        Integer defaultMaxOpenFiles = RIAK_MAX_OPEN_FILES.getDefaultValue();
+        Integer maxOpenFiles = getConfig(RiakNode.RIAK_MAX_OPEN_FILES);
+        Preconditions.checkArgument(maxOpenFiles >= defaultMaxOpenFiles , "Specified number of open files : %s : is less than the required minimum",
+                maxOpenFiles, defaultMaxOpenFiles);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public boolean isPackageDownloadUrlProvided() {
+        AttributeSensorAndConfigKey[] downloadProperties = { DOWNLOAD_URL_RHEL_CENTOS, DOWNLOAD_URL_UBUNTU, DOWNLOAD_URL_DEBIAN };
+        for (AttributeSensorAndConfigKey property : downloadProperties) {
+            if (!((ConfigurationSupportInternal) config()).getRaw(property).isAbsent()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    protected Map<String, Object> obtainProvisioningFlags(@SuppressWarnings("rawtypes") MachineProvisioningLocation location) {
+        ConfigBag result = ConfigBag.newInstance(super.obtainProvisioningFlags(location));
+        result.configure(CloudLocationConfig.OS_64_BIT, true);
+        return result.getAllConfig();
+    }
+
+    @Override
+    protected Collection<Integer> getRequiredOpenPorts() {
+        // TODO this creates a huge list of inbound ports; much better to define on a security group using range syntax!
+        int erlangRangeStart = getConfig(ERLANG_PORT_RANGE_START).iterator().next();
+        int erlangRangeEnd = getConfig(ERLANG_PORT_RANGE_END).iterator().next();
+
+        Set<Integer> ports = MutableSet.copyOf(super.getRequiredOpenPorts());
+        Set<Integer> erlangPorts = ContiguousSet.create(Range.open(erlangRangeStart, erlangRangeEnd), DiscreteDomain.integers());
+        ports.addAll(erlangPorts);
+
+        return ports;
+    }
+
+    @Override
+    public void connectSensors() {
+        super.connectSensors();
+        connectServiceUpIsRunning();
+        HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getRiakWebPort());
+
+        HttpFeed.Builder httpFeedBuilder = HttpFeed.builder()
+                .entity(this)
+                .period(500, TimeUnit.MILLISECONDS)
+                .baseUri(String.format("http://%s/stats", accessible.toString()))
+                .poll(new HttpPollConfig<Integer>(NODE_GETS)
+                        .onSuccess(HttpValueFunctions.jsonContents("node_gets", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(NODE_GETS_TOTAL)
+                        .onSuccess(HttpValueFunctions.jsonContents("node_gets_total", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(NODE_PUTS)
+                        .onSuccess(HttpValueFunctions.jsonContents("node_puts", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(NODE_PUTS_TOTAL)
+                        .onSuccess(HttpValueFunctions.jsonContents("node_puts_total", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(VNODE_GETS)
+                        .onSuccess(HttpValueFunctions.jsonContents("vnode_gets", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(VNODE_GETS_TOTAL)
+                        .onSuccess(HttpValueFunctions.jsonContents("vnode_gets_total", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(VNODE_PUTS)
+                        .onSuccess(HttpValueFunctions.jsonContents("vnode_puts", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(VNODE_PUTS_TOTAL)
+                        .onSuccess(HttpValueFunctions.jsonContents("vnode_puts_total", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(READ_REPAIRS_TOTAL)
+                        .onSuccess(HttpValueFunctions.jsonContents("read_repairs_total", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(COORD_REDIRS_TOTAL)
+                        .onSuccess(HttpValueFunctions.jsonContents("coord_redirs_total", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(MEMORY_PROCESSES_USED)
+                        .onSuccess(HttpValueFunctions.jsonContents("memory_processes_used", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(SYS_PROCESS_COUNT)
+                        .onSuccess(HttpValueFunctions.jsonContents("sys_process_count", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(PBC_CONNECTS)
+                        .onSuccess(HttpValueFunctions.jsonContents("pbc_connects", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<Integer>(PBC_ACTIVE)
+                        .onSuccess(HttpValueFunctions.jsonContents("pbc_active", Integer.class))
+                        .onFailureOrException(Functions.constant(-1)))
+                .poll(new HttpPollConfig<List<String>>(RING_MEMBERS)
+                        .onSuccess(Functionals.chain(
+                                HttpValueFunctions.jsonContents("ring_members", String[].class),
+                                new Function<String[], List<String>>() {
+                                    @Nullable
+                                    @Override
+                                    public List<String> apply(@Nullable String[] strings) {
+                                        return Arrays.asList(strings);
+                                    }
+                                }
+                        ))
+                        .onFailureOrException(Functions.constant(Arrays.asList(new String[0]))));
+
+        for (AttributeSensor<Integer> sensor : ONE_MINUTE_SENSORS) {
+            httpFeedBuilder.poll(new HttpPollConfig<Integer>(sensor)
+                    .period(Duration.ONE_MINUTE)
+                    .onSuccess(HttpValueFunctions.jsonContents(sensor.getName().substring(5), Integer.class))
+                    .onFailureOrException(Functions.constant(-1)));
+        }
+
+        httpFeed = httpFeedBuilder.build();
+
+        addEnricher(Enrichers.builder().combining(NODE_GETS, NODE_PUTS).computingSum().publishing(NODE_OPS).build());
+        addEnricher(Enrichers.builder().combining(NODE_GETS_TOTAL, NODE_PUTS_TOTAL).computingSum().publishing(NODE_OPS_TOTAL).build());
+        WebAppServiceMethods.connectWebAppServerPolicies(this);
+    }
+
+    @Override
+    public void disconnectSensors() {
+        super.disconnectSensors();
+        if (httpFeed != null) {
+            httpFeed.stop();
+        }
+        disconnectServiceUpIsRunning();
+    }
+
+    @Override
+    public void joinCluster(String nodeName) {
+        getDriver().joinCluster(nodeName);
+    }
+
+    @Override
+    public void leaveCluster() {
+        getDriver().leaveCluster();
+    }
+
+    @Override
+    public void removeNode(String nodeName) {
+        getDriver().removeNode(nodeName);
+    }
+
+    @Override
+    public void bucketTypeCreate(String bucketTypeName, String bucketTypeProperties) {
+        getDriver().bucketTypeCreate(bucketTypeName, bucketTypeProperties);
+    }
+
+    @Override
+    public List<String> bucketTypeList() {
+        return getDriver().bucketTypeList();
+    }
+
+    @Override
+    public List<String> bucketTypeStatus(String bucketTypeName) {
+        return getDriver().bucketTypeStatus(bucketTypeName);
+    }
+
+    @Override
+    public void bucketTypeUpdate(String bucketTypeName, String bucketTypeProperties) {
+        getDriver().bucketTypeUpdate(bucketTypeName, bucketTypeProperties);
+    }
+
+    @Override
+    public void bucketTypeActivate(String bucketTypeName) {
+        getDriver().bucketTypeActivate(bucketTypeName);
+    }
+
+    @Override
+    public void recoverFailedNode(String nodeName) {
+        getDriver().recoverFailedNode(nodeName);
+    }
+
+    @Override
+    public Integer getRiakWebPort() {
+        return getAttribute(RiakNode.RIAK_WEB_PORT);
+    }
+
+    @Override
+    public Integer getRiakPbPort() {
+        return getAttribute(RiakNode.RIAK_PB_PORT);
+    }
+
+    @Override
+    public Integer getHandoffListenerPort() {
+        return getAttribute(RiakNode.HANDOFF_LISTENER_PORT);
+    }
+
+    @Override
+    public Integer getEpmdListenerPort() {
+        return getAttribute(RiakNode.EPMD_LISTENER_PORT);
+    }
+
+    @Override
+    public Integer getErlangPortRangeStart() {
+        return getAttribute(RiakNode.ERLANG_PORT_RANGE_START);
+    }
+
+    @Override
+    public Integer getErlangPortRangeEnd() {
+        return getAttribute(RiakNode.ERLANG_PORT_RANGE_END);
+    }
+
+    @Override
+    public Boolean isSearchEnabled() {
+        return getConfig(RiakNode.SEARCH_ENABLED);
+    }
+
+    @Override
+    public Integer getSearchSolrPort() {
+        return getConfig(RiakNode.SEARCH_SOLR_PORT);
+    }
+
+    @Override
+    public Integer getSearchSolrJmxPort() {
+        return getConfig(RiakNode.SEARCH_SOLR_JMX_PORT);
+    }
+
+    @Override
+    public String getMajorVersion() {
+        return getFullVersion().substring(0, 3);
+    }
+
+    @Override
+    public String getFullVersion() {
+        return getConfig(RiakNode.SUGGESTED_VERSION);
+    }
+
+    @Override
+    public String getOsMajorVersion() {
+        return getDriver().getOsMajorVersion();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
new file mode 100644
index 0000000..1815673
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.riak;
+
+import static brooklyn.util.ssh.BashCommands.INSTALL_CURL;
+import static brooklyn.util.ssh.BashCommands.INSTALL_TAR;
+import static brooklyn.util.ssh.BashCommands.addSbinPathCommand;
+import static brooklyn.util.ssh.BashCommands.alternatives;
+import static brooklyn.util.ssh.BashCommands.chainGroup;
+import static brooklyn.util.ssh.BashCommands.commandToDownloadUrlAs;
+import static brooklyn.util.ssh.BashCommands.ifExecutableElse;
+import static brooklyn.util.ssh.BashCommands.ifNotExecutable;
+import static brooklyn.util.ssh.BashCommands.ok;
+import static brooklyn.util.ssh.BashCommands.sudo;
+import static brooklyn.util.text.StringEscapes.BashStringEscapes.escapeLiteralForDoubleQuotedBash;
+import static java.lang.String.format;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.lifecycle.ScriptHelper;
+import brooklyn.entity.software.SshEffectorTasks;
+import brooklyn.location.OsDetails;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.net.Urls;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.ssh.SshTasks;
+import brooklyn.util.text.Strings;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+// TODO: Alter -env ERL_CRASH_DUMP path in vm.args
+public class RiakNodeSshDriver extends JavaSoftwareProcessSshDriver implements RiakNodeDriver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RiakNodeSshDriver.class);
+    private static final String sbinPath = "$PATH:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin";
+    private static final String INSTALLING_FALLBACK = INSTALLING + "_fallback";
+
+    public RiakNodeSshDriver(final RiakNodeImpl entity, final SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    protected String getLogFileLocation() {
+        return "/var/log/riak/solr.log";
+    }
+
+    @Override
+    public RiakNodeImpl getEntity() {
+        return RiakNodeImpl.class.cast(super.getEntity());
+    }
+
+    @Override
+    public Map<String, String> getShellEnvironment() {
+        MutableMap<String, String> result = MutableMap.copyOf(super.getShellEnvironment());
+        // how to change epmd port, according to
+        // http://serverfault.com/questions/582787/how-to-change-listening-interface-of-rabbitmqs-epmd-port-4369
+        if (getEntity().getEpmdListenerPort() != null) {
+            result.put("ERL_EPMD_PORT", Integer.toString(getEntity().getEpmdListenerPort()));
+        }
+        result.put("WAIT_FOR_ERLANG", "60");
+        return result;
+    }
+
+    @Override
+    public void preInstall() {
+        resolver = Entities.newDownloader(this);
+        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("riak-%s", getVersion()))));
+
+        // Set package install attribute
+        OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
+        if (osDetails.isLinux()) {
+            entity.setAttribute(RiakNode.RIAK_PACKAGE_INSTALL, true);
+        } else if (osDetails.isMac()) {
+            entity.setAttribute(RiakNode.RIAK_PACKAGE_INSTALL, false);
+        }
+    }
+
+    @Override
+    public void install() {
+        if (entity.getConfig(Attributes.DOWNLOAD_URL) != null) {
+            LOG.warn("Ignoring download.url {}, use download.url.rhelcentos or download.url.mac", entity.getConfig(Attributes.DOWNLOAD_URL));
+        }
+
+        OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
+        List<String> commands = Lists.newLinkedList();
+        if (osDetails.isLinux()) {
+            if (getEntity().isPackageDownloadUrlProvided()) {
+                commands.addAll(installLinuxFromPackageUrl());
+            } else {
+                commands.addAll(installFromPackageCloud());
+            }
+        } else if (osDetails.isMac()) {
+            commands.addAll(installMac());
+        } else if (osDetails.isWindows()) {
+            throw new UnsupportedOperationException("RiakNode not supported on Windows instances");
+        } else {
+            throw new IllegalStateException("Machine was not detected as linux, mac or windows! Installation does not know how to proceed with " +
+                    getMachine() + ". Details: " + getMachine().getMachineDetails().getOsDetails());
+        }
+
+        int result = newScript(INSTALLING)
+                .body.append(commands)
+                .failIfBodyEmpty()
+                .execute();
+
+        if (result != 0 && osDetails.isLinux()) {
+            result = newScript(INSTALLING_FALLBACK)
+                    .body.append(installLinuxFromPackageUrl())
+                    .execute();
+        }
+
+        if (result != 0) {
+            throw new IllegalStateException(String.format("Install failed with result %d", result));
+        }
+    }
+
+    private List<String> installLinuxFromPackageUrl() {
+        DynamicTasks.queueIfPossible(SshTasks.dontRequireTtyForSudo(getMachine(), SshTasks.OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL)).orSubmitAndBlock();
+
+        String expandedInstallDir = getExpandedInstallDir();
+        String installBin = Urls.mergePaths(expandedInstallDir, "bin");
+        String saveAsYum = "riak.rpm";
+        String saveAsApt = "riak.deb";
+        OsDetails osDetails = getMachine().getOsDetails();
+
+        String downloadUrl;
+        String osReleaseCmd;
+        if ("debian".equalsIgnoreCase(osDetails.getName())) {
+            // TODO osDetails.getName() is returning "linux", instead of debian/ubuntu on AWS with jenkins image,
+            //      running as integration test targetting localhost.
+            // TODO Debian support (default debian image fails with 'sudo: command not found')
+            downloadUrl = (String)entity.getAttribute(RiakNode.DOWNLOAD_URL_DEBIAN);
+            osReleaseCmd = osDetails.getVersion().substring(0, osDetails.getVersion().indexOf("."));
+        } else {
+            // assume Ubuntu
+            downloadUrl = (String)entity.getAttribute(RiakNode.DOWNLOAD_URL_UBUNTU);
+            osReleaseCmd = "`lsb_release -sc` && " +
+                    "export OS_RELEASE=`([[ \"lucid natty precise\" =~ (^| )\\$OS_RELEASE($| ) ]] && echo $OS_RELEASE || echo precise)`";
+        }
+        String apt = chainGroup(
+                //debian fix
+                "export PATH=" + sbinPath,
+                "which apt-get",
+                ok(sudo("apt-get -y --allow-unauthenticated install logrotate libpam0g-dev libssl0.9.8")),
+                "export OS_NAME=" + Strings.toLowerCase(osDetails.getName()),
+                "export OS_RELEASE=" + osReleaseCmd,
+                String.format("wget -O %s %s", saveAsApt, downloadUrl),
+                sudo(String.format("dpkg -i %s", saveAsApt)));
+        String yum = chainGroup(
+                "which yum",
+                ok(sudo("yum -y install openssl")),
+                String.format("wget -O %s %s", saveAsYum, entity.getAttribute(RiakNode.DOWNLOAD_URL_RHEL_CENTOS)),
+                sudo(String.format("yum localinstall -y %s", saveAsYum)));
+        return ImmutableList.<String>builder()
+                .add("mkdir -p " + installBin)
+                .add(INSTALL_CURL)
+                .add(alternatives(apt, yum))
+                .add("ln -s `which riak` " + Urls.mergePaths(installBin, "riak"))
+                .add("ln -s `which riak-admin` " + Urls.mergePaths(installBin, "riak-admin"))
+                .build();
+    }
+
+    private List<String> installFromPackageCloud() {
+        OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
+        return ImmutableList.<String>builder()
+                .add(osDetails.getName().toLowerCase().contains("debian") ? addSbinPathCommand() : "")
+                .add(ifNotExecutable("curl", INSTALL_CURL))
+                .addAll(ifExecutableElse("yum", installDebianBased(), installRpmBased()))
+                .build();
+    }
+
+    private ImmutableList<String> installDebianBased() {
+        return ImmutableList.<String>builder()
+                .add("curl https://packagecloud.io/install/repositories/basho/riak/script.deb.sh | " + BashCommands.sudo("bash"))
+                .add(BashCommands.sudo("apt-get install --assume-yes riak=" + getEntity().getFullVersion() + "-1"))
+                .build();
+    }
+
+    private ImmutableList<String> installRpmBased() {
+        return ImmutableList.<String>builder()
+                .add("curl https://packagecloud.io/install/repositories/basho/riak/script.rpm.sh | " + BashCommands.sudo("bash"))
+                .add(BashCommands.sudo("yum install -y riak-" + getEntity().getFullVersion() + "*"))
+                .build();
+    }
+
+    protected List<String> installMac() {
+        String saveAs = resolver.getFilename();
+        String url = entity.getAttribute(RiakNode.DOWNLOAD_URL_MAC);
+        return ImmutableList.<String>builder()
+                .add(INSTALL_TAR)
+                .add(INSTALL_CURL)
+                .add(commandToDownloadUrlAs(url, saveAs))
+                .add("tar xzvf " + saveAs)
+                .build();
+    }
+
+    @Override
+    public void customize() {
+        checkRiakOnPath();
+
+        //create entity's runDir
+        newScript(CUSTOMIZING).execute();
+
+        OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
+
+        List<String> commands = Lists.newLinkedList();
+        commands.add(sudo("mkdir -p " + getRiakEtcDir()));
+
+        if (isVersion1()) {
+            String vmArgsTemplate = processTemplate(entity.getConfig(RiakNode.RIAK_VM_ARGS_TEMPLATE_URL));
+            String saveAsVmArgs = Urls.mergePaths(getRunDir(), "vm.args");
+            DynamicTasks.queue(SshEffectorTasks.put(saveAsVmArgs).contents(vmArgsTemplate));
+            commands.add(sudo("mv " + saveAsVmArgs + " " + getRiakEtcDir()));
+
+            String appConfigTemplate = processTemplate(entity.getConfig(RiakNode.RIAK_APP_CONFIG_TEMPLATE_URL));
+            String saveAsAppConfig = Urls.mergePaths(getRunDir(), "app.config");
+            DynamicTasks.queue(SshEffectorTasks.put(saveAsAppConfig).contents(appConfigTemplate));
+            commands.add(sudo("mv " + saveAsAppConfig + " " + getRiakEtcDir()));
+        } else {
+            String templateUrl = osDetails.isMac() ? entity.getConfig(RiakNode.RIAK_CONF_TEMPLATE_URL_MAC) :
+                    entity.getConfig(RiakNode.RIAK_CONF_TEMPLATE_URL_LINUX);
+            String riakConfContent = processTemplate(templateUrl);
+            String saveAsRiakConf = Urls.mergePaths(getRunDir(), "riak.conf");
+
+            if(Strings.isNonBlank(entity.getConfig(RiakNode.RIAK_CONF_ADDITIONAL_CONTENT))) {
+                String additionalConfigContent = processTemplateContents(entity.getConfig(RiakNode.RIAK_CONF_ADDITIONAL_CONTENT));
+                riakConfContent += "\n## Brooklyn note: additional config\n";
+                riakConfContent += additionalConfigContent;
+            }
+
+            DynamicTasks.queue(SshEffectorTasks.put(saveAsRiakConf).contents(riakConfContent));
+            commands.add(sudo("mv " + saveAsRiakConf + " " + getRiakEtcDir()));
+        }
+
+        //increase open file limit (default min for riak is: 4096)
+        //TODO: detect the actual limit then do the modification.
+        //TODO: modify ulimit for linux distros
+        //    commands.add(sudo("launchctl limit maxfiles 4096 32768"));
+        if (osDetails.isMac()) {
+            commands.add("ulimit -n 4096");
+        }
+
+        if (osDetails.isLinux() && isVersion1()) {
+            commands.add(sudo("chown -R riak:riak " + getRiakEtcDir()));
+        }
+
+        // TODO platform_*_dir
+        // TODO riak config log
+
+        ScriptHelper customizeScript = newScript(CUSTOMIZING)
+                .failOnNonZeroResultCode()
+                .body.append(commands);
+
+        if (!isRiakOnPath()) {
+            addRiakOnPath(customizeScript);
+        }
+        customizeScript.failOnNonZeroResultCode().execute();
+
+        if (osDetails.isLinux()) {
+            ImmutableMap<String, String> sysctl = ImmutableMap.<String, String>builder()
+                    .put("vm.swappiness", "0")
+                    .put("net.core.somaxconn", "40000")
+                    .put("net.ipv4.tcp_max_syn_backlog", "40000")
+                    .put("net.ipv4.tcp_sack",  "1")
+                    .put("net.ipv4.tcp_window_scaling",  "15")
+                    .put("net.ipv4.tcp_fin_timeout",     "1")
+                    .put("net.ipv4.tcp_keepalive_intvl", "30")
+                    .put("net.ipv4.tcp_tw_reuse",        "1")
+                    .put("net.ipv4.tcp_moderate_rcvbuf", "1")
+                    .build();
+
+            ScriptHelper optimize = newScript(CUSTOMIZING + "network")
+                .body.append(sudo("sysctl " + Joiner.on(' ').withKeyValueSeparator("=").join(sysctl)));
+
+            Optional<Boolean> enable = Optional.fromNullable(entity.getConfig(RiakNode.OPTIMIZE_HOST_NETWORKING));
+            if (!enable.isPresent()) optimize.inessential();
+            if (enable.or(true)) optimize.execute();
+        }
+
+        //set the riak node name
+        entity.setAttribute(RiakNode.RIAK_NODE_NAME, format("riak@%s", getSubnetHostname()));
+    }
+
+    @Override
+    public void launch() {
+        List<String> commands = Lists.newLinkedList();
+
+        if (isPackageInstall()) {
+            commands.add(addSbinPathCommand());
+            commands.add(sudo(format("sh -c \"ulimit -n %s && service riak start\"", maxOpenFiles())));
+        } else {
+            // NOTE: See instructions at http://superuser.com/questions/433746/is-there-a-fix-for-the-too-many-open-files-in-system-error-on-os-x-10-7-1
+            // for increasing the system limit for number of open files
+            commands.add("ulimit -n 65536 || true"); // `BashCommands.ok` will put this in parentheses, which will set ulimit -n in the subshell
+            commands.add(format("%s start >/dev/null 2>&1 < /dev/null &", getRiakCmd()));
+        }
+
+        ScriptHelper launchScript = newScript(LAUNCHING)
+                .body.append(commands);
+
+        if (!isRiakOnPath()) {
+            addRiakOnPath(launchScript);
+        }
+        launchScript.failOnNonZeroResultCode().execute();
+
+        String mainUri = String.format("http://%s:%s/admin", entity.getAttribute(Attributes.HOSTNAME), entity.getAttribute(RiakNode.RIAK_WEB_PORT));
+        entity.setAttribute(Attributes.MAIN_URI, URI.create(mainUri));
+    }
+
+    @Override
+    public void stop() {
+        leaveCluster();
+
+        String command = format("%s stop", getRiakCmd());
+        command = isPackageInstall() ? sudo(command) : command;
+
+        ScriptHelper stopScript = newScript(ImmutableMap.of(USE_PID_FILE, false), STOPPING)
+                .body.append(command);
+
+        if (!isRiakOnPath()) {
+            addRiakOnPath(stopScript);
+        }
+
+        int result = stopScript.failOnNonZeroResultCode().execute();
+        if (result != 0) {
+            newScript(ImmutableMap.of(USE_PID_FILE, false), STOPPING).execute();
+        }
+    }
+
+    @Override
+    public boolean isRunning() {
+        // Version 2.0.0 requires sudo for `riak ping`
+        ScriptHelper checkRunningScript = newScript(CHECK_RUNNING)
+                .body.append(sudo(format("%s ping", getRiakCmd())));
+
+        if (!isRiakOnPath()) {
+            addRiakOnPath(checkRunningScript);
+        }
+        return (checkRunningScript.execute() == 0);
+    }
+
+    public boolean isPackageInstall() {
+        return entity.getAttribute(RiakNode.RIAK_PACKAGE_INSTALL);
+    }
+
+    public boolean isRiakOnPath() {
+        return entity.getAttribute(RiakNode.RIAK_ON_PATH);
+    }
+
+    public String getRiakEtcDir() {
+        return isPackageInstall() ? "/etc/riak" : Urls.mergePaths(getExpandedInstallDir(), "etc");
+    }
+
+    protected String getRiakCmd() {
+        return isPackageInstall() ? "riak" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak");
+    }
+
+    protected String getRiakAdminCmd() {
+        return isPackageInstall() ? "riak-admin" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin");
+    }
+
+    // TODO find a way to batch commit the changes, instead of committing for every operation.
+
+    @Override
+    public void joinCluster(String nodeName) {
+        if (getRiakName().equals(nodeName)) {
+            log.warn("Cannot join Riak node: {} to itself", nodeName);
+        } else {
+            if (!hasJoinedCluster()) {
+                ScriptHelper joinClusterScript = newScript("joinCluster")
+                        .body.append(sudo(format("%s cluster join %s", getRiakAdminCmd(), nodeName)))
+                        .body.append(sudo(format("%s cluster plan", getRiakAdminCmd())))
+                        .body.append(sudo(format("%s cluster commit", getRiakAdminCmd())))
+                        .failOnNonZeroResultCode();
+
+                if (!isRiakOnPath()) {
+                    addRiakOnPath(joinClusterScript);
+                }
+
+                joinClusterScript.execute();
+
+                entity.setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE);
+            } else {
+                log.warn("entity {}: is already in the riak cluster", entity.getId());
+            }
+        }
+    }
+
+    @Override
+    public void leaveCluster() {
+        if (hasJoinedCluster()) {
+            ScriptHelper leaveClusterScript = newScript("leaveCluster")
+                    .body.append(sudo(format("%s cluster leave", getRiakAdminCmd())))
+                    .body.append(sudo(format("%s cluster plan", getRiakAdminCmd())))
+                    .body.append(sudo(format("%s cluster commit", getRiakAdminCmd())))
+                    .failOnNonZeroResultCode();
+
+            if (!isRiakOnPath()) {
+                addRiakOnPath(leaveClusterScript);
+            }
+
+            leaveClusterScript.execute();
+
+            entity.setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.FALSE);
+        } else {
+            log.warn("entity {}: has already left the riak cluster", entity.getId());
+        }
+    }
+
+    @Override
+    public void removeNode(String nodeName) {
+        ScriptHelper removeNodeScript = newScript("removeNode")
+                .body.append(sudo(format("%s cluster force-remove %s", getRiakAdminCmd(), nodeName)))
+                .body.append(sudo(format("%s down %s", getRiakAdminCmd(), nodeName)))
+                .body.append(sudo(format("%s cluster plan", getRiakAdminCmd())))
+                .body.append(sudo(format("%s cluster commit", getRiakAdminCmd())))
+                .failOnNonZeroResultCode();
+
+        if (!isRiakOnPath()) {
+            addRiakOnPath(removeNodeScript);
+        }
+
+        removeNodeScript.execute();
+    }
+
+    @Override
+    public void bucketTypeCreate(String bucketTypeName, String bucketTypeProperties) {
+        ScriptHelper bucketTypeCreateScript = newScript("bucket-type_create " + bucketTypeName)
+                .body.append(sudo(format("%s bucket-type create %s %s",
+                        getRiakAdminCmd(),
+                        bucketTypeName,
+                        escapeLiteralForDoubleQuotedBash(bucketTypeProperties))));
+        if(!isRiakOnPath()) {
+            addRiakOnPath(bucketTypeCreateScript);
+        }
+        bucketTypeCreateScript.body.append(sudo(format("%s bucket-type activate %s", getRiakAdminCmd(), bucketTypeName)))
+                .failOnNonZeroResultCode();
+
+        bucketTypeCreateScript.execute();
+    }
+
+    @Override
+    public List<String> bucketTypeList() {
+        ScriptHelper bucketTypeListScript = newScript("bucket-types_list")
+                .body.append(sudo(format("%s bucket-type list", getRiakAdminCmd())))
+                .gatherOutput()
+                .noExtraOutput()
+                .failOnNonZeroResultCode();
+        if (!isRiakOnPath()) {
+            addRiakOnPath(bucketTypeListScript);
+        }
+        bucketTypeListScript.execute();
+        String stdout = bucketTypeListScript.getResultStdout();
+        return Arrays.asList(stdout.split("[\\r\\n]+"));
+    }
+
+    @Override
+    public List<String> bucketTypeStatus(String bucketTypeName) {
+        ScriptHelper bucketTypeStatusScript = newScript("bucket-type_status")
+                .body.append(sudo(format("%s bucket-type status %s", getRiakAdminCmd(), bucketTypeName)))
+                .gatherOutput()
+                .noExtraOutput()
+                .failOnNonZeroResultCode();
+        if (!isRiakOnPath()) {
+            addRiakOnPath(bucketTypeStatusScript);
+        }
+        bucketTypeStatusScript.execute();
+        String stdout = bucketTypeStatusScript.getResultStdout();
+        return Arrays.asList(stdout.split("[\\r\\n]+"));
+    }
+
+    @Override
+    public void bucketTypeUpdate(String bucketTypeName, String bucketTypeProperties) {
+        ScriptHelper bucketTypeStatusScript = newScript("bucket-type_update")
+                .body.append(sudo(format("%s bucket-type update %s %s",
+                        getRiakAdminCmd(),
+                        bucketTypeName,
+                        escapeLiteralForDoubleQuotedBash(bucketTypeProperties))))
+                .failOnNonZeroResultCode();
+        if (!isRiakOnPath()) {
+            addRiakOnPath(bucketTypeStatusScript);
+        }
+        bucketTypeStatusScript.execute();
+    }
+
+    @Override
+    public void bucketTypeActivate(String bucketTypeName) {
+        ScriptHelper bucketTypeStatusScript = newScript("bucket-type_activate")
+                .body.append(sudo(format("%s bucket-type activate %s", getRiakAdminCmd(), bucketTypeName)))
+                .failOnNonZeroResultCode();
+        if (!isRiakOnPath()) {
+            addRiakOnPath(bucketTypeStatusScript);
+        }
+        bucketTypeStatusScript.execute();
+    }
+
+    @Override
+    public void recoverFailedNode(String nodeName) {
+        //TODO find ways to detect a faulty/failed node
+        //argument passed 'node' is any working node in the riak cluster
+        //following the instruction from: http://docs.basho.com/riak/latest/ops/running/recovery/failed-node/
+
+        if (hasJoinedCluster()) {
+            String failedNodeName = getRiakName();
+
+
+            String stopCommand = format("%s stop", getRiakCmd());
+            stopCommand = isPackageInstall() ? sudo(stopCommand) : stopCommand;
+
+            String startCommand = format("%s start > /dev/null 2>&1 < /dev/null &", getRiakCmd());
+            startCommand = isPackageInstall() ? sudo(startCommand) : startCommand;
+
+            ScriptHelper recoverNodeScript = newScript("recoverNode")
+                    .body.append(stopCommand)
+                    .body.append(format("%s down %s", getRiakAdminCmd(), failedNodeName))
+                    .body.append(sudo(format("rm -rf %s", getRingStateDir())))
+                    .body.append(startCommand)
+                    .body.append(sudo(format("%s cluster join %s", getRiakAdminCmd(), nodeName)))
+                    .body.append(sudo(format("%s cluster plan", getRiakAdminCmd())))
+                    .body.append(sudo(format("%s cluster commit", getRiakAdminCmd())))
+                    .failOnNonZeroResultCode();
+
+            if (!isRiakOnPath()) {
+                addRiakOnPath(recoverNodeScript);
+            }
+
+            recoverNodeScript.execute();
+
+        } else {
+            log.warn("entity {}: is not in the riak cluster", entity.getId());
+        }
+    }
+
+    @Override
+    public void setup() {
+        if(entity.getConfig(RiakNode.SEARCH_ENABLED)) {
+            // JavaSoftwareProcessSshDriver.setup() is called in order to install java
+            super.setup();
+        }
+    }
+
+    private Boolean hasJoinedCluster() {
+        return Boolean.TRUE.equals(entity.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER));
+    }
+
+    protected void checkRiakOnPath() {
+        boolean riakOnPath = newScript("riakOnPath")
+                .body.append("which riak")
+                .execute() == 0;
+        entity.setAttribute(RiakNode.RIAK_ON_PATH, riakOnPath);
+    }
+
+    private String getRiakName() {
+        return entity.getAttribute(RiakNode.RIAK_NODE_NAME);
+    }
+
+    private String getRingStateDir() {
+        //TODO: check for non-package install.
+        return isPackageInstall() ? "/var/lib/riak/ring" : Urls.mergePaths(getExpandedInstallDir(), "lib/ring");
+    }
+
+    protected boolean isVersion1() {
+        return getVersion().startsWith("1.");
+    }
+
+    @Override
+    public String getOsMajorVersion() {
+        OsDetails osDetails = getMachine().getMachineDetails().getOsDetails();
+        String osVersion = osDetails.getVersion();
+        return osVersion.contains(".") ? osVersion.substring(0, osVersion.indexOf(".")) : osVersion;
+    }
+
+    private void addRiakOnPath(ScriptHelper scriptHelper) {
+        Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath);
+//        log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable);
+        scriptHelper.environmentVariablesReset(newPathVariable);
+    }
+
+    public Integer maxOpenFiles() {
+        return entity.getConfig(RiakNode.RIAK_MAX_OPEN_FILES);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServer.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServer.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServer.java
new file mode 100644
index 0000000..f04231a
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.solr;
+
+import java.util.Map;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynConfigKeys;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.UsesJava;
+import brooklyn.entity.java.UsesJavaMXBeans;
+import brooklyn.entity.java.UsesJmx;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.location.basic.PortRanges;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.Maps;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a Solr node.
+ */
+@Catalog(name="Apache Solr Node", description="Solr is the popular, blazing fast open source enterprise search " +
+        "platform from the Apache Lucene project.", iconUrl="classpath:///solr-logo.jpeg")
+@ImplementedBy(SolrServerImpl.class)
+public interface SolrServer extends SoftwareProcess, UsesJava, UsesJmx, UsesJavaMXBeans {
+
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "4.7.2");
+
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
+            SoftwareProcess.DOWNLOAD_URL, "${driver.mirrorUrl}/${version}/solr-${version}.tgz");
+
+    /** download mirror, if desired */
+    @SetFromFlag("mirrorUrl")
+    ConfigKey<String> MIRROR_URL = ConfigKeys.newStringConfigKey("solr.install.mirror.url", "URL of mirror",
+            "http://mirrors.ukfast.co.uk/sites/ftp.apache.org/lucene/solr/");
+
+    @SetFromFlag("solrPort")
+    PortAttributeSensorAndConfigKey SOLR_PORT = new PortAttributeSensorAndConfigKey("solr.http.port", "Solr HTTP communications port",
+            PortRanges.fromString("8983+"));
+
+    @SetFromFlag("solrConfigTemplateUrl")
+    ConfigKey<String> SOLR_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
+            "solr.config.templateUrl", "Template file (in freemarker format) for the solr.xml config file", 
+            "classpath://org/apache/brooklyn/entity/nosql/solr/solr.xml");
+
+    @SetFromFlag("coreConfigMap")
+    ConfigKey<Map<String, String>> SOLR_CORE_CONFIG = ConfigKeys.newConfigKey(new TypeToken<Map<String, String>>() { },
+            "solr.core.config", "Map of core names to core configuration archive URL",
+            Maps.<String, String>newHashMap());
+
+    ConfigKey<Duration> START_TIMEOUT = ConfigKeys.newConfigKeyWithDefault(BrooklynConfigKeys.START_TIMEOUT, Duration.FIVE_MINUTES);
+
+    /* Accessors used from template */
+
+    Integer getSolrPort();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerDriver.java
new file mode 100644
index 0000000..72e1049
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerDriver.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.solr;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+
+public interface SolrServerDriver extends JavaSoftwareProcessDriver {
+
+    Integer getSolrPort();
+
+    String getSolrConfigTemplateUrl();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerImpl.java
new file mode 100644
index 0000000..a42d15b
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.solr;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.event.feed.http.HttpFeed;
+import brooklyn.event.feed.http.HttpPollConfig;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.location.access.BrooklynAccessUtils;
+import com.google.common.base.Functions;
+import com.google.common.net.HostAndPort;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link SolrServer}.
+ */
+public class SolrServerImpl extends SoftwareProcessImpl implements SolrServer {
+
+    @Override
+    public Integer getSolrPort() {
+        return getAttribute(SolrServer.SOLR_PORT);
+    }
+
+    @Override
+    public Class<SolrServerDriver> getDriverInterface() {
+        return SolrServerDriver.class;
+    }
+
+    private volatile HttpFeed httpFeed;
+
+    @Override 
+    protected void connectSensors() {
+        super.connectSensors();
+
+        HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getSolrPort());
+
+        String solrUri = String.format("http://%s:%d/solr", hp.getHostText(), hp.getPort());
+        setAttribute(Attributes.MAIN_URI, URI.create(solrUri));
+
+        httpFeed = HttpFeed.builder()
+                .entity(this)
+                .period(500, TimeUnit.MILLISECONDS)
+                .baseUri(solrUri)
+                .poll(new HttpPollConfig<Boolean>(SERVICE_UP)
+                        .onSuccess(HttpValueFunctions.responseCodeEquals(200))
+                        .onFailureOrException(Functions.constant(false)))
+                .build();
+    }
+
+    @Override
+    public void disconnectSensors() {
+        super.disconnectSensors();
+
+        if (httpFeed != null) httpFeed.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerSshDriver.java
new file mode 100644
index 0000000..f05624e
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/solr/SolrServerSshDriver.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.solr;
+
+import static java.lang.String.format;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.entity.java.UsesJmx;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
+import brooklyn.entity.basic.Entities;
+import brooklyn.location.Location;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.file.ArchiveUtils;
+import brooklyn.util.net.Networking;
+import brooklyn.util.net.Urls;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+import brooklyn.util.stream.Streams;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+/**
+ * Start a {@link SolrServer} in a {@link Location} accessible over ssh.
+ */
+public class SolrServerSshDriver extends JavaSoftwareProcessSshDriver implements SolrServerDriver {
+
+    private static final Logger log = LoggerFactory.getLogger(SolrServerSshDriver.class);
+
+    public SolrServerSshDriver(SolrServerImpl entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    public Integer getSolrPort() { return entity.getAttribute(SolrServer.SOLR_PORT); }
+
+    @Override
+    public String getSolrConfigTemplateUrl() { return entity.getConfig(SolrServer.SOLR_CONFIG_TEMPLATE_URL); }
+
+    public String getMirrorUrl() { return entity.getConfig(SolrServer.MIRROR_URL); }
+
+    public String getPidFile() { return Os.mergePaths(getRunDir(), "solr.pid"); }
+
+    @Override
+    public void preInstall() {
+        resolver = Entities.newDownloader(this);
+        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("solr-%s", getVersion()))));
+    }
+
+    @Override
+    public void install() {
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+
+        List<String> commands = ImmutableList.<String>builder()
+                .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
+                .add(BashCommands.INSTALL_TAR)
+                .add("tar xzfv " + saveAs)
+                .build();
+
+        newScript(INSTALLING)
+                .failOnNonZeroResultCode()
+                .body.append(commands)
+                .execute();
+    }
+
+    public Set<Integer> getPortsUsed() {
+        Set<Integer> result = Sets.newLinkedHashSet(super.getPortsUsed());
+        result.addAll(getPortMap().values());
+        return result;
+    }
+
+    private Map<String, Integer> getPortMap() {
+        return ImmutableMap.<String, Integer>builder()
+                .put("solrPort", getSolrPort())
+                .put("jmxPort", entity.getAttribute(UsesJmx.JMX_PORT))
+                .put("rmiPort", entity.getAttribute(UsesJmx.RMI_REGISTRY_PORT))
+                .build();
+    }
+
+    @Override
+    public void customize() {
+        log.debug("Customizing {}", entity);
+        Networking.checkPortsValid(getPortMap());
+
+        ImmutableList.Builder<String> commands = new ImmutableList.Builder<String>()
+                .add("mkdir contrib")
+                .add("mkdir solr")
+                .add(String.format("cp -R %s/example/{etc,contexts,lib,logs,resources,webapps} .", getExpandedInstallDir()))
+                .add(String.format("cp %s/example/start.jar .", getExpandedInstallDir()))
+                .add(String.format("cp %s/dist/*.jar lib/", getExpandedInstallDir()))
+                .add(String.format("cp %s/contrib/*/lib/*.jar contrib/", getExpandedInstallDir()));
+
+        newScript(CUSTOMIZING)
+                .body.append(commands.build())
+                .execute();
+
+        // Copy the solr.xml configuration file across
+        String configFileContents = processTemplate(getSolrConfigTemplateUrl());
+        String destinationConfigFile = String.format("%s/solr/solr.xml", getRunDir());
+        getMachine().copyTo(Streams.newInputStreamWithContents(configFileContents), destinationConfigFile);
+
+        // Copy the core definitions across
+        Map<String, String> coreConfig = entity.getConfig(SolrServer.SOLR_CORE_CONFIG);
+        for (String core : coreConfig.keySet()) {
+            String url = coreConfig.get(core);
+            String solr = Urls.mergePaths(getRunDir(), "solr");
+            ArchiveUtils.deploy(url, getMachine(), solr);
+        }
+    }
+
+    @Override
+    public void launch() {
+        newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
+                .body.append("nohup java $JAVA_OPTS -jar start.jar > ./logs/console.log 2>&1 &")
+                .execute();
+    }
+
+    @Override
+    public boolean isRunning() {
+        return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
+    }
+
+    @Override
+    public void stop() {
+        newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
+    }
+
+    @Override
+    protected String getLogFileLocation() {
+        return Urls.mergePaths(getRunDir(), "solr", "logs", "solr.log");
+    }
+}


Mime
View raw message