brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [16/51] [abbrv] [partial] brooklyn-library git commit: move subdir from incubator up a level as it is promoted to its own repo (first non-incubator commit!)
Date Mon, 01 Feb 2016 17:47:52 GMT
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
deleted file mode 100644
index 43bf4b2..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java
+++ /dev/null
@@ -1,606 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.cassandra;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.math.BigInteger;
-import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-import javax.management.ObjectName;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.location.MachineLocation;
-import org.apache.brooklyn.api.location.MachineProvisioningLocation;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.effector.EffectorBody;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.location.Machines;
-import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
-import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
-import org.apache.brooklyn.core.sensor.DependentConfiguration;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.enricher.stock.Enrichers;
-import org.apache.brooklyn.entity.java.JavaAppUtils;
-import org.apache.brooklyn.entity.java.UsesJmx;
-import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
-import org.apache.brooklyn.feed.function.FunctionFeed;
-import org.apache.brooklyn.feed.function.FunctionPollConfig;
-import org.apache.brooklyn.feed.jmx.JmxAttributePollConfig;
-import org.apache.brooklyn.feed.jmx.JmxFeed;
-import org.apache.brooklyn.feed.jmx.JmxHelper;
-import org.apache.brooklyn.feed.jmx.JmxOperationPollConfig;
-import org.apache.brooklyn.policy.enricher.RollingTimeWindowMeanEnricher;
-import org.apache.brooklyn.policy.enricher.TimeWeightedDeltaEnricher;
-import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.core.text.TemplateProcessor;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.guava.Functionals;
-import org.apache.brooklyn.util.guava.Maybe;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Splitter;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.net.HostAndPort;
-
-/**
- * Implementation of {@link CassandraNode}.
- */
-public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraNode {
-
-    private static final Logger log = LoggerFactory.getLogger(CassandraNodeImpl.class);
-
-    private final AtomicReference<Boolean> detectedCloudSensors = new AtomicReference<Boolean>(false);
-    
-    public CassandraNodeImpl() {
-    }
-    
-    @Override
-    public void init() {
-        super.init();
-        
-        // TODO PERSISTENCE WORKAROUND kept anonymous class in case referenced in persisted state
-        new EffectorBody<String>() {
-            @Override
-            public String call(ConfigBag parameters) {
-                return executeScript((String)parameters.getStringKey("commands"));
-            }
-        };
-        
-        getMutableEntityType().addEffector(EXECUTE_SCRIPT, new ExecuteScriptEffectorBody(this));
-        
-        Entities.checkRequiredUrl(this, getCassandraConfigTemplateUrl());
-        Entities.getRequiredUrlConfig(this, CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL);
-        
-        connectEnrichers();
-    }
-    
-    private static class ExecuteScriptEffectorBody extends EffectorBody<String> {
-        private final CassandraNode entity;
-        
-        public ExecuteScriptEffectorBody(CassandraNode entity) {
-            this.entity = entity;
-        }
-        @Override
-        public String call(ConfigBag parameters) {
-            return entity.executeScript((String)parameters.getStringKey("commands"));
-        }
-    }
-    
-    /**
-     * Some clouds (e.g. Rackspace) give us VMs that have two nics: one for private and one for public.
-     * If the private IP is used then it doesn't work, even for a cluster purely internal to Rackspace!
-     * 
-     * TODO Ugly. Need to understand more and find a better fix. Perhaps in Cassandra itself if necessary.
-     * Also need to investigate further:
-     *  - does it still fail if BroadcastAddress is set to private IP?
-     *  - is `openIptables` opening it up for both interfaces?
-     *  - for aws->rackspace comms between nodes (thus using the public IP), will it be listening on an accessible port?
-     *  - ideally do a check, open a server on one port on the machine, see if it is contactable on the public address;
-     *    and set that as a flag on the cloud
-     */
-    protected void setCloudPreferredSensorNames() {
-        if (detectedCloudSensors.get()) return;
-        synchronized (detectedCloudSensors) {
-            if (detectedCloudSensors.get()) return;
-
-            MachineProvisioningLocation<?> loc = getProvisioningLocation();
-            if (loc != null) {
-                try {
-                    Method method = loc.getClass().getMethod("getProvider");
-                    method.setAccessible(true);
-                    String provider = (String) method.invoke(loc);
-                    String result = "(nothing special)";
-                    if (provider!=null) {
-                        if (provider.contains("rackspace") || provider.contains("cloudservers") || provider.contains("softlayer")) {
-                            /* These clouds have 2 NICs and it has to be consistent, so use public IP here to allow external access;
-                             * (TODO internal access could be configured to improve performance / lower cost, 
-                             * if we know all nodes are visible to each other) */
-                            if (getConfig(LISTEN_ADDRESS_SENSOR)==null)
-                                config().set(LISTEN_ADDRESS_SENSOR, CassandraNode.ADDRESS.getName());
-                            if (getConfig(BROADCAST_ADDRESS_SENSOR)==null)
-                                config().set(BROADCAST_ADDRESS_SENSOR, CassandraNode.ADDRESS.getName());
-                            result = "public IP for both listen and broadcast";
-                        } else if (provider.contains("google-compute")) {
-                            /* Google nodes cannot reach themselves/each-other on the public IP,
-                             * and there is no hostname, so use private IP here */
-                            if (getConfig(LISTEN_ADDRESS_SENSOR)==null)
-                                config().set(LISTEN_ADDRESS_SENSOR, CassandraNode.SUBNET_HOSTNAME.getName());
-                            if (getConfig(BROADCAST_ADDRESS_SENSOR)==null)
-                                config().set(BROADCAST_ADDRESS_SENSOR, CassandraNode.SUBNET_HOSTNAME.getName());
-                            result = "private IP for both listen and broadcast";
-                        }
-                    }
-                    log.debug("Cassandra NICs inferred {} for {}; using location {}, based on provider {}", new Object[] {result, this, loc, provider});
-                } catch (Exception e) {
-                    log.debug("Cassandra NICs auto-detection failed for {} in location {}: {}", new Object[] {this, loc, e});
-                }
-            }
-            detectedCloudSensors.set(true);
-        }
-    }
-    
-    @Override
-    protected void preStart() {
-        super.preStart();
-        setCloudPreferredSensorNames();
-    }
-    
-    // Used for freemarker
-    public String getMajorMinorVersion() {
-        String version = getConfig(CassandraNode.SUGGESTED_VERSION);
-        if (Strings.isBlank(version)) return "";
-        List<String> versionParts = ImmutableList.copyOf(Splitter.on(".").split(version));
-        return versionParts.get(0) + (versionParts.size() > 1 ? "."+versionParts.get(1) : "");
-    }
-    
-    public String getCassandraConfigTemplateUrl() {
-        String templatedUrl = getConfig(CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL);
-        return TemplateProcessor.processTemplateContents(templatedUrl, this, ImmutableMap.<String, Object>of());
-    }
-
-    @Override public Integer getGossipPort() { return getAttribute(CassandraNode.GOSSIP_PORT); }
-    @Override public Integer getSslGossipPort() { return getAttribute(CassandraNode.SSL_GOSSIP_PORT); }
-    @Override public Integer getThriftPort() { return getAttribute(CassandraNode.THRIFT_PORT); }
-    @Override public Integer getNativeTransportPort() { return getAttribute(CassandraNode.NATIVE_TRANSPORT_PORT); }
-    @Override public String getClusterName() { return getAttribute(CassandraNode.CLUSTER_NAME); }
-    
-    @Override public int getNumTokensPerNode() {
-        return getConfig(CassandraNode.NUM_TOKENS_PER_NODE);
-    }
-
-    @Override public Set<BigInteger> getTokens() {
-        // Prefer an already-set attribute over the config.
-        // Prefer TOKENS over TOKEN.
-        Set<BigInteger> tokens = getAttribute(CassandraNode.TOKENS);
-        if (tokens == null) {
-            tokens = getConfig(CassandraNode.TOKENS);
-        }
-        return tokens;
-    }
-    
-    @Override public String getTokensAsString() {
-        // TODO check what is required when replacing failed node.
-        // with vnodes in Cassandra 2.x, don't bother supplying token
-        Set<BigInteger> tokens = getTokens();
-        if (tokens == null) return "";
-        return Joiner.on(",").join(tokens);
-    }
-    
-    @Override public String getListenAddress() {
-        String sensorName = getConfig(LISTEN_ADDRESS_SENSOR);
-        if (Strings.isNonBlank(sensorName))
-            return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked();
-        
-        String subnetAddress = getAttribute(CassandraNode.SUBNET_ADDRESS);
-        return Strings.isNonBlank(subnetAddress) ? subnetAddress : getAttribute(CassandraNode.ADDRESS);
-    }
-    
-    @Override public String getBroadcastAddress() {
-        String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR);
-        if (Strings.isNonBlank(sensorName))
-            return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked();
-        
-        String snitchName = getConfig(CassandraNode.ENDPOINT_SNITCH_NAME);
-        if (snitchName.equals("Ec2MultiRegionSnitch") || snitchName.contains("MultiCloudSnitch")) {
-            // http://www.datastax.com/documentation/cassandra/2.0/mobile/cassandra/architecture/architectureSnitchEC2MultiRegion_c.html
-            // describes that the listen_address is set to the private IP, and the broadcast_address is set to the public IP.
-            return getAttribute(CassandraNode.ADDRESS);
-        } else if (!getDriver().isClustered()) {
-            return getListenAddress();
-        } else {
-            // In other situations, prefer the hostname, so other regions can see it
-            // *Unless* hostname resolves at the target to a local-only interface which is different to ADDRESS
-            // (workaround for issue deploying to localhost)
-            String hostname = getAttribute(CassandraNode.HOSTNAME);
-            try {
-                String resolvedAddress = getDriver().getResolvedAddress(hostname);
-                if (resolvedAddress==null) {
-                    log.debug("Cassandra using broadcast address "+getListenAddress()+" for "+this+" because hostname "+hostname+" could not be resolved at remote machine");
-                    return getListenAddress();
-                }
-                if (resolvedAddress.equals("127.0.0.1")) {
-                    log.debug("Cassandra using broadcast address "+getListenAddress()+" for "+this+" because hostname "+hostname+" resolves to 127.0.0.1");
-                    return getListenAddress();                    
-                }
-                return hostname;
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                log.warn("Error resolving hostname "+hostname+" for "+this+": "+e, e);
-                return hostname;
-            }
-        }
-    }
-    /** not always the private IP, if public IP has been insisted on for broadcast, e.g. setting up a rack topology */
-    // have not confirmed this does the right thing in all clouds ... only used for rack topology however
-    public String getPrivateIp() {
-        String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR);
-        if (Strings.isNonBlank(sensorName)) {
-            return getAttribute(Sensors.newStringSensor(sensorName));
-        } else {
-            String subnetAddress = getAttribute(CassandraNode.SUBNET_ADDRESS);
-            return Strings.isNonBlank(subnetAddress) ? subnetAddress : getAttribute(CassandraNode.ADDRESS);
-        }
-    }
-    public String getPublicIp() {
-        // may need to be something else in google
-        return getAttribute(CassandraNode.ADDRESS);
-    }
-
-    @Override public String getRpcAddress() {
-        String sensorName = getConfig(RPC_ADDRESS_SENSOR);
-        if (Strings.isNonBlank(sensorName))
-            return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked();
-        return "0.0.0.0";
-    }
-    
-    @Override public String getSeeds() { 
-        Set<Entity> seeds = getConfig(CassandraNode.INITIAL_SEEDS);
-        if (seeds==null) {
-            log.warn("No seeds available when requested for "+this, new Throwable("source of no Cassandra seeds when requested"));
-            return null;
-        }
-        String snitchName = getConfig(CassandraNode.ENDPOINT_SNITCH_NAME);
-        MutableSet<String> seedsHostnames = MutableSet.of();
-        for (Entity entity : seeds) {
-            // tried removing ourselves if there are other nodes, but that is a BAD idea!
-            // blows up with a "java.lang.RuntimeException: No other nodes seen!"
-            
-            if (snitchName.equals("Ec2MultiRegionSnitch") || snitchName.contains("MultiCloudSnitch")) {
-                // http://www.datastax.com/documentation/cassandra/2.0/mobile/cassandra/architecture/architectureSnitchEC2MultiRegion_c.html
-                // says the seeds should be public IPs.
-                seedsHostnames.add(entity.getAttribute(CassandraNode.ADDRESS));
-            } else {
-                String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR);
-                if (Strings.isNonBlank(sensorName)) {
-                    seedsHostnames.add(entity.getAttribute(Sensors.newStringSensor(sensorName)));
-                } else {
-                    Maybe<String> optionalSeedHostname = Machines.findSubnetOrPublicHostname(entity);
-                    if (optionalSeedHostname.isPresent()) {
-                        String seedHostname = optionalSeedHostname.get();
-                        seedsHostnames.add(seedHostname);
-                    } else {
-                        log.warn("In node {}, seed hostname missing for {}; not including in seeds list", this, entity);
-                    }
-                }
-            }
-        }
-        
-        String result = Strings.join(seedsHostnames, ",");
-        log.info("Seeds for {}: {}", this, result);
-        return result;
-    }
-
-    // referenced by cassandra-rackdc.properties, read by some of the cassandra snitches
-    public String getDatacenterName() {
-        String name = getAttribute(CassandraNode.DATACENTER_NAME);
-        if (name == null) {
-            MachineLocation machine = getMachineOrNull();
-            MachineProvisioningLocation<?> provisioningLocation = getProvisioningLocation();
-            if (machine != null) {
-                name = machine.getConfig(CloudLocationConfig.CLOUD_REGION_ID);
-            }
-            if (name == null && provisioningLocation != null) {
-                name = provisioningLocation.getConfig(CloudLocationConfig.CLOUD_REGION_ID);
-            }
-            if (name == null) {
-                name = "UNKNOWN_DATACENTER";
-            }
-            sensors().set((AttributeSensor<String>)DATACENTER_NAME, name);
-        }
-        return name;
-    }
-
-    public String getRackName() {
-        String name = getAttribute(CassandraNode.RACK_NAME);
-        if (name == null) {
-            MachineLocation machine = getMachineOrNull();
-            MachineProvisioningLocation<?> provisioningLocation = getProvisioningLocation();
-            if (machine != null) {
-                name = machine.getConfig(CloudLocationConfig.CLOUD_AVAILABILITY_ZONE_ID);
-            }
-            if (name == null && provisioningLocation != null) {
-                name = provisioningLocation.getConfig(CloudLocationConfig.CLOUD_AVAILABILITY_ZONE_ID);
-            }
-            if (name == null) {
-                name = "UNKNOWN_RACK";
-            }
-            sensors().set((AttributeSensor<String>)RACK_NAME, name);
-        }
-        return name;
-    }
-
-    @Override
-    public Class<? extends CassandraNodeDriver> getDriverInterface() {
-        return CassandraNodeDriver.class;
-    }
-    
-    @Override
-    public CassandraNodeDriver getDriver() {
-        return (CassandraNodeDriver) super.getDriver();
-    }
-
-    private volatile JmxFeed jmxFeed;
-    private volatile FunctionFeed functionFeed;
-    private JmxFeed jmxMxBeanFeed;
-    private JmxHelper jmxHelper;
-    private ObjectName storageServiceMBean = JmxHelper.createObjectName("org.apache.cassandra.db:type=StorageService");
-    private ObjectName readStageMBean = JmxHelper.createObjectName("org.apache.cassandra.request:type=ReadStage");
-    private ObjectName mutationStageMBean = JmxHelper.createObjectName("org.apache.cassandra.request:type=MutationStage");
-    private ObjectName snitchMBean = JmxHelper.createObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo");
-
-    
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    @Override
-    protected void connectSensors() {
-        // "cassandra" isn't really a protocol, but okay for now
-        sensors().set(DATASTORE_URL, "cassandra://"+getAttribute(HOSTNAME)+":"+getAttribute(THRIFT_PORT));
-        
-        super.connectSensors();
-
-        jmxHelper = new JmxHelper(this);
-        boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);
-        
-        if (getDriver().isJmxEnabled()) {
-            jmxFeed = JmxFeed.builder()
-                    .entity(this)
-                    .period(3000, TimeUnit.MILLISECONDS)
-                    .helper(jmxHelper)
-                    .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
-                            .objectName(storageServiceMBean)
-                            .attributeName("Initialized")
-                            .onSuccess(Functions.forPredicate(Predicates.notNull()))
-                            .onException(Functions.constant(false))
-                            .suppressDuplicates(true))
-                    .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS)
-                            .objectName(storageServiceMBean)
-                            .attributeName("TokenToEndpointMap")
-                            .onSuccess(new Function<Object, Set<BigInteger>>() {
-                                @Override
-                                public Set<BigInteger> apply(@Nullable Object arg) {
-                                    Map input = (Map)arg;
-                                    if (input == null || input.isEmpty()) return null;
-                                    // FIXME does not work on aws-ec2, uses RFC1918 address
-                                    Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME)));
-                                    Set<String> tokens = Maps.filterValues(input, self).keySet();
-                                    Set<BigInteger> result = Sets.newLinkedHashSet();
-                                    for (String token : tokens) {
-                                        result.add(new BigInteger(token));
-                                    }
-                                    return result;
-                                }})
-                            .onException(Functions.<Set<BigInteger>>constant(null))
-                            .suppressDuplicates(true))
-                    .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME)
-                            .period(60, TimeUnit.SECONDS)
-                            .objectName(snitchMBean)
-                            .operationName("getDatacenter")
-                            .operationParams(ImmutableList.of(getBroadcastAddress()))
-                            .onException(Functions.<String>constant(null))
-                            .suppressDuplicates(true))
-                    .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME)
-                            .period(60, TimeUnit.SECONDS)
-                            .objectName(snitchMBean)
-                            .operationName("getRack")
-                            .operationParams(ImmutableList.of(getBroadcastAddress()))
-                            .onException(Functions.<String>constant(null))
-                            .suppressDuplicates(true))
-                    .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS)
-                            .objectName(storageServiceMBean)
-                            .attributeName("TokenToEndpointMap")
-                            .onSuccess(new Function<Object, Integer>() {
-                                @Override
-                                public Integer apply(@Nullable Object arg) {
-                                    Map input = (Map)arg;
-                                    if (input == null || input.isEmpty()) return 0;
-                                    return input.size();
-                                }
-                            })
-                            .onException(Functions.constant(-1)))
-                    .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT)
-                            .objectName(storageServiceMBean)
-                            .attributeName("LiveNodes")
-                            .onSuccess(new Function<Object, Integer>() {
-                                @Override
-                                public Integer apply(@Nullable Object arg) {
-                                    List input = (List)arg;
-                                    if (input == null || input.isEmpty()) return 0;
-                                    return input.size();
-                                }
-                            })
-                            .onException(Functions.constant(-1)))
-                    .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE)
-                            .objectName(readStageMBean)
-                            .attributeName("ActiveCount")
-                            .onException(Functions.constant((Integer)null))
-                            .enabled(retrieveUsageMetrics))
-                    .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING)
-                            .objectName(readStageMBean)
-                            .attributeName("PendingTasks")
-                            .onException(Functions.constant((Long)null))
-                            .enabled(retrieveUsageMetrics))
-                    .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED)
-                            .objectName(readStageMBean)
-                            .attributeName("CompletedTasks")
-                            .onException(Functions.constant((Long)null))
-                            .enabled(retrieveUsageMetrics))
-                    .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE)
-                            .objectName(mutationStageMBean)
-                            .attributeName("ActiveCount")
-                            .onException(Functions.constant((Integer)null))
-                            .enabled(retrieveUsageMetrics))
-                    .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING)
-                            .objectName(mutationStageMBean)
-                            .attributeName("PendingTasks")
-                            .onException(Functions.constant((Long)null))
-                            .enabled(retrieveUsageMetrics))
-                    .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED)
-                            .objectName(mutationStageMBean)
-                            .attributeName("CompletedTasks")
-                            .onException(Functions.constant((Long)null))
-                            .enabled(retrieveUsageMetrics))
-                    .build();
-            
-            jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this);
-        }
-        
-        if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) {
-            functionFeed = FunctionFeed.builder()
-                    .entity(this)
-                    .period(3000, TimeUnit.MILLISECONDS)
-                    .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY)
-                            .onException(Functions.constant(-1L))
-                            .callable(new ThriftLatencyChecker(CassandraNodeImpl.this))
-                            .enabled(retrieveUsageMetrics))
-                    .build();
-        }
-        
-        connectServiceUpIsRunning();
-    }
-    
-    protected void connectEnrichers() {
-        connectEnrichers(Duration.TEN_SECONDS);
-    }
-    
-    protected void connectEnrichers(Duration windowPeriod) {
-        JavaAppUtils.connectJavaAppServerPolicies(this);
-
-        enrichers().add(TimeWeightedDeltaEnricher.<Long>getPerSecondDeltaEnricher(this, READ_COMPLETED, READS_PER_SECOND_LAST));
-        enrichers().add(TimeWeightedDeltaEnricher.<Long>getPerSecondDeltaEnricher(this, WRITE_COMPLETED, WRITES_PER_SECOND_LAST));
-        
-        if (windowPeriod!=null) {
-            enrichers().add(new RollingTimeWindowMeanEnricher<Long>(this, THRIFT_PORT_LATENCY, 
-                    THRIFT_PORT_LATENCY_IN_WINDOW, windowPeriod));
-            enrichers().add(new RollingTimeWindowMeanEnricher<Double>(this, READS_PER_SECOND_LAST, 
-                    READS_PER_SECOND_IN_WINDOW, windowPeriod));
-            enrichers().add(new RollingTimeWindowMeanEnricher<Double>(this, WRITES_PER_SECOND_LAST, 
-                    WRITES_PER_SECOND_IN_WINDOW, windowPeriod));
-        }
-        
-        // service-up checks
-        if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) {
-            enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
-                    .from(THRIFT_PORT_LATENCY)
-                    .computing(Functionals.ifEquals(-1L).value("Thrift latency polling failed") )
-                    .build());
-        }
-        
-        if (Boolean.TRUE.equals(getConfig(UsesJmx.USE_JMX))) {
-            enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
-                    .from(SERVICE_UP_JMX)
-                    .computing(Functionals.ifEquals(false).value("JMX reports not up") )
-                    .build());
-        }
-    }
-    
-    @Override
-    public void disconnectSensors() {
-        super.disconnectSensors();
-        
-        disconnectServiceUpIsRunning();
-        if (jmxFeed != null) jmxFeed.stop();
-        if (jmxMxBeanFeed != null) jmxMxBeanFeed.stop();
-        if (jmxHelper != null) jmxHelper.terminate();
-        if (functionFeed != null) functionFeed.stop();
-    }
-
-    @Override
-    public void setToken(String token) {
-        try {
-            if (!jmxHelper.isConnected()) jmxHelper.connect();;
-            jmxHelper.operation(storageServiceMBean, "move", token);
-            log.info("Moved server {} to token {}", getId(), token);
-        } catch (IOException ioe) {
-            Throwables.propagate(ioe);
-        }
-    }
-    
-    @Override
-    public String executeScript(String commands) {
-        return getDriver().executeScriptAsync(commands).block().getStdout();
-    }
-    
-    private static class ThriftLatencyChecker implements Callable<Long> {
-        private final CassandraNode entity;
-        
-        public ThriftLatencyChecker(CassandraNode entity) {
-            this.entity = entity;
-        }
-        public Long call() {
-            Integer privatePort = entity.getThriftPort();
-            if (privatePort == null) return -1L;
-            
-            HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, privatePort);
-
-            try {
-                long start = System.currentTimeMillis();
-                Socket s = new Socket(hp.getHostText(), hp.getPort());
-                s.close();
-                long latency = System.currentTimeMillis() - start;
-                return latency;
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                if (log.isDebugEnabled())
-                    log.debug("Cassandra thrift port poll failure: "+e);
-                return -1L;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java
deleted file mode 100644
index d3230f9..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.cassandra;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.mgmt.TaskWrapper;
-import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.location.Machines;
-import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
-import org.apache.brooklyn.core.sensor.DependentConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.brooklyn.entity.database.DatastoreMixins;
-import org.apache.brooklyn.entity.java.JavaSoftwareProcessSshDriver;
-import org.apache.brooklyn.entity.java.UsesJmx;
-import org.apache.brooklyn.location.ssh.SshMachineLocation;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
-import org.apache.brooklyn.util.core.text.TemplateProcessor;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.guava.Maybe;
-import org.apache.brooklyn.util.net.Networking;
-import org.apache.brooklyn.util.os.Os;
-import org.apache.brooklyn.util.ssh.BashCommands;
-import org.apache.brooklyn.util.stream.Streams;
-import org.apache.brooklyn.util.text.Identifiers;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
-
-/**
- * Start a {@link CassandraNode} in a {@link Location} accessible over ssh.
- */
-public class CassandraNodeSshDriver extends JavaSoftwareProcessSshDriver implements CassandraNodeDriver {
-
-    private static final Logger log = LoggerFactory.getLogger(CassandraNodeSshDriver.class);
-
-    protected Maybe<String> resolvedAddressCache = Maybe.absent();
-
-    public CassandraNodeSshDriver(CassandraNodeImpl entity, SshMachineLocation machine) {
-        super(entity, machine);
-    }
-
-    @Override
-    protected String getLogFileLocation() { return Os.mergePathsUnix(getRunDir(),"cassandra.log"); }
-
-    @Override
-    public Integer getGossipPort() { return entity.getAttribute(CassandraNode.GOSSIP_PORT); }
-
-    @Override
-    public Integer getSslGossipPort() { return entity.getAttribute(CassandraNode.SSL_GOSSIP_PORT); }
-
-    @Override
-    public Integer getThriftPort() { return entity.getAttribute(CassandraNode.THRIFT_PORT); }
-
-    @Override
-    public Integer getNativeTransportPort() { return entity.getAttribute(CassandraNode.NATIVE_TRANSPORT_PORT); }
-
-    @Override
-    public String getClusterName() { return entity.getAttribute(CassandraNode.CLUSTER_NAME); }
-
-    @Override
-    public String getCassandraConfigTemplateUrl() {
-        String templatedUrl = entity.getConfig(CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL);
-        return TemplateProcessor.processTemplateContents(templatedUrl, this, ImmutableMap.<String, Object>of());
-    }
-
-    @Override
-    public String getCassandraConfigFileName() { return entity.getConfig(CassandraNode.CASSANDRA_CONFIG_FILE_NAME); }
-
-    public String getEndpointSnitchName() { return entity.getConfig(CassandraNode.ENDPOINT_SNITCH_NAME); }
-
-    public String getCassandraRackdcConfigTemplateUrl() { return entity.getConfig(CassandraNode.CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL); }
-
-    public String getCassandraRackdcConfigFileName() { return entity.getConfig(CassandraNode.CASSANDRA_RACKDC_CONFIG_FILE_NAME); }
-
-    public String getMirrorUrl() { return entity.getConfig(CassandraNode.MIRROR_URL); }
-
-    protected String getDefaultUnpackedDirectoryName() {
-        return "apache-cassandra-"+getVersion();
-    }
-
-    protected boolean isV2() {
-        String version = getVersion();
-        return version.startsWith("2.");
-    }
-
-    @Override
-    public boolean installJava() {
-        if (isV2()) {
-            return checkForAndInstallJava("1.8");
-        } else {
-            return super.installJava();
-        }
-    }
-
-    @Override
-    public void preInstall() {
-        resolver = Entities.newDownloader(this);
-        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(getDefaultUnpackedDirectoryName())));
-    }
-
-    @Override
-    public void install() {
-        List<String> urls = resolver.getTargets();
-        String saveAs = resolver.getFilename();
-
-        List<String> commands = ImmutableList.<String>builder()
-                .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
-                .add(BashCommands.INSTALL_TAR)
-                .add("tar xzfv " + saveAs)
-                .build();
-
-        newScript(INSTALLING)
-                .body.append(commands)
-                .execute();
-    }
-
-    @Override
-    public Set<Integer> getPortsUsed() {
-        return ImmutableSet.<Integer>builder()
-                .addAll(super.getPortsUsed())
-                .addAll(getPortMap().values())
-                .build();
-    }
-
-    protected Map<String, Integer> getPortMap() {
-        return ImmutableMap.<String, Integer>builder()
-                .put("jmxPort", entity.getAttribute(UsesJmx.JMX_PORT))
-                .put("rmiPort", entity.getAttribute(UsesJmx.RMI_REGISTRY_PORT))
-                .put("gossipPort", getGossipPort())
-                .put("sslGossipPort", getSslGossipPort())
-                .put("thriftPort", getThriftPort())
-                .build();
-    }
-
-    @Override
-    public void customize() {
-        log.debug("Customizing {} (Cluster {})", entity, getClusterName());
-        Networking.checkPortsValid(getPortMap());
-
-        customizeInitialSeeds();
-
-        String logFileEscaped = getLogFileLocation().replace("/", "\\/"); // escape slashes
-
-        ImmutableList.Builder<String> commands = new ImmutableList.Builder<String>()
-                .add(String.format("cp -R %s/{bin,conf,lib,interface,pylib,tools} .", getExpandedInstallDir()))
-                .add("mkdir -p data")
-                .add("mkdir -p brooklyn_commands")
-                .add(String.format("sed -i.bk 's/log4j.appender.R.File=.*/log4j.appender.R.File=%s/g' %s/conf/log4j-server.properties", logFileEscaped, getRunDir()))
-                .add(String.format("sed -i.bk '/JMX_PORT/d' %s/conf/cassandra-env.sh", getRunDir()))
-                // Script sets 180k on Linux which gives Java error:  The stack size specified is too small, Specify at least 228k
-                .add(String.format("sed -i.bk 's/-Xss180k/-Xss280k/g' %s/conf/cassandra-env.sh", getRunDir()));
-
-        newScript(CUSTOMIZING)
-                .body.append(commands.build())
-                .failOnNonZeroResultCode()
-                .execute();
-
-        // Copy the cassandra.yaml configuration file across
-        String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf", getCassandraConfigFileName());
-        copyTemplate(getCassandraConfigTemplateUrl(), destinationConfigFile);
-
-        // Copy the cassandra-rackdc.properties configuration file across
-        String rackdcDestinationFile = Os.mergePathsUnix(getRunDir(), "conf", getCassandraRackdcConfigFileName());
-        copyTemplate(getCassandraRackdcConfigTemplateUrl(), rackdcDestinationFile);
-
-        customizeCopySnitch();
-    }
-
-    protected void customizeCopySnitch() {
-        // Copy the custom snitch jar file across
-        String customSnitchJarUrl = entity.getConfig(CassandraNode.CUSTOM_SNITCH_JAR_URL);
-        if (Strings.isNonBlank(customSnitchJarUrl)) {
-            int lastSlashIndex = customSnitchJarUrl.lastIndexOf("/");
-            String customSnitchJarName = (lastSlashIndex > 0) ? customSnitchJarUrl.substring(lastSlashIndex+1) : "customBrooklynSnitch.jar";
-            String jarDestinationFile = Os.mergePathsUnix(getRunDir(), "lib", customSnitchJarName);
-            InputStream customSnitchJarStream = checkNotNull(resource.getResourceFromUrl(customSnitchJarUrl), "%s could not be loaded", customSnitchJarUrl);
-            try {
-                getMachine().copyTo(customSnitchJarStream, jarDestinationFile);
-            } finally {
-                Streams.closeQuietly(customSnitchJarStream);
-            }
-        }
-    }
-
-    protected void customizeInitialSeeds() {
-        if (entity.getConfig(CassandraNode.INITIAL_SEEDS)==null) {
-            if (isClustered()) {
-                entity.config().set(CassandraNode.INITIAL_SEEDS,
-                    DependentConfiguration.attributeWhenReady(entity.getParent(), CassandraDatacenter.CURRENT_SEEDS));
-            } else {
-                entity.config().set(CassandraNode.INITIAL_SEEDS, MutableSet.<Entity>of(entity));
-            }
-        }
-    }
-
-    @Override
-    public boolean isClustered() {
-        return entity.getParent() instanceof CassandraDatacenter;
-    }
-
-    @Override
-    public void launch() {
-        String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get();
-        Set<Entity> seeds = getEntity().getConfig(CassandraNode.INITIAL_SEEDS);
-        List<Entity> ancestors = getCassandraAncestors();
-        log.info("Launching " + entity + ": " +
-                "cluster "+getClusterName()+", " +
-                "hostname (public) " + getEntity().getAttribute(Attributes.HOSTNAME) + ", " +
-                "hostname (subnet) " + subnetHostname + ", " +
-                "seeds "+((CassandraNode)entity).getSeeds()+" (from "+seeds+")");
-
-        boolean isFirst = seeds.iterator().next().equals(entity);
-        if (isClustered() && !isFirst && CassandraDatacenter.WAIT_FOR_FIRST) {
-            // wait for the first node
-            long firstStartTime = Entities.submit(entity, DependentConfiguration.attributeWhenReady(
-                ancestors.get(ancestors.size()-1), CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC)).getUnchecked();
-            // optionally force a delay before starting subsequent nodes; see comment at CassandraCluster.DELAY_AFTER_FIRST
-            Duration toWait = Duration.millis(firstStartTime + CassandraDatacenter.DELAY_AFTER_FIRST.toMilliseconds() -  System.currentTimeMillis());
-            if (toWait.toMilliseconds()>0) {
-                log.info("Launching " + entity + ": delaying launch of non-first node by "+toWait+" to prevent schema disagreements");
-                Tasks.setBlockingDetails("Pausing to ensure first node has time to start");
-                Time.sleep(toWait);
-                Tasks.resetBlockingDetails();
-            }
-        }
-
-        List<Entity> queuedStart = null;
-        if (CassandraDatacenter.DELAY_BETWEEN_STARTS!=null && !ancestors.isEmpty()) {
-            Entity root = ancestors.get(ancestors.size()-1);
-            // TODO currently use the class as a semaphore; messy, and obviously will not federate;
-            // should develop a brooklyn framework semaphore (similar to that done on SshMachineLocation)
-            // and use it - note however the synch block is very very short so relatively safe at least
-            synchronized (CassandraNode.class) {
-                queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES);
-                if (queuedStart==null) {
-                    queuedStart = new ArrayList<Entity>();
-                    ((EntityLocal)root).sensors().set(CassandraDatacenter.QUEUED_START_NODES, queuedStart);
-                }
-                queuedStart.add(getEntity());
-                ((EntityLocal)root).sensors().set(CassandraDatacenter.QUEUED_START_NODES, queuedStart);
-            }
-            do {
-                // get it again in case it is backed by something external
-                queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES);
-                if (queuedStart.get(0).equals(getEntity())) break;
-                synchronized (queuedStart) {
-                    try {
-                        queuedStart.wait(1000);
-                    } catch (InterruptedException e) {
-                        Exceptions.propagate(e);
-                    }
-                }
-            } while (true);
-
-            // TODO should look at last start time... but instead we always wait
-            CassandraDatacenter.DELAY_BETWEEN_STARTS.countdownTimer().waitForExpiryUnchecked();
-        }
-
-        try {
-            // Relies on `bin/cassandra -p <pidfile>`, rather than us writing pid file ourselves.
-            newScript(MutableMap.of(USE_PID_FILE, false), LAUNCHING)
-                    .body.append(
-                            // log the date to attempt to debug occasional http://wiki.apache.org/cassandra/FAQ#schema_disagreement
-                            // (can be caused by machines out of synch time-wise; but in our case it seems to be caused by other things!)
-                            "echo date on cassandra server `hostname` when launching is `date`",
-                            launchEssentialCommand(),
-                            "echo after essential command")
-                    .execute();
-            if (!isClustered()) {
-                InputStream creationScript = DatastoreMixins.getDatabaseCreationScript(entity);
-                if (creationScript!=null) {
-                    Tasks.setBlockingDetails("Pausing to ensure Cassandra (singleton) has started before running creation script");
-                    Time.sleep(Duration.seconds(20));
-                    Tasks.resetBlockingDetails();
-                    executeScriptAsync(Streams.readFullyString(creationScript));
-                }
-            }
-            if (isClustered() && isFirst) {
-                for (Entity ancestor: getCassandraAncestors()) {
-                    ((EntityLocal)ancestor).sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, System.currentTimeMillis());
-                }
-            }
-        } finally {
-            if (queuedStart!=null) {
-                Entity head = queuedStart.remove(0);
-                checkArgument(head.equals(getEntity()), "first queued node was "+head+" but we are "+getEntity());
-                synchronized (queuedStart) {
-                    queuedStart.notifyAll();
-                }
-            }
-        }
-    }
-
-    /** returns cassandra-related ancestors (datacenter, fabric), with datacenter first and fabric last */
-    protected List<Entity> getCassandraAncestors() {
-        List<Entity> result = new ArrayList<Entity>();
-        Entity ancestor = getEntity().getParent();
-        while (ancestor!=null) {
-            if (ancestor instanceof CassandraDatacenter || ancestor instanceof CassandraFabric)
-                result.add(ancestor);
-            ancestor = ancestor.getParent();
-        }
-        return result;
-    }
-
-    protected String launchEssentialCommand() {
-        if (isV2()) {
-            return String.format("./bin/cassandra -p %s > ./cassandra-console.log 2>&1", getPidFile());
-        } else {
-            // TODO Could probably get rid of the nohup here, as script does equivalent itself
-            // with `exec ... <&- &`
-            return String.format("nohup ./bin/cassandra -p %s > ./cassandra-console.log 2>&1 &", getPidFile());
-        }
-    }
-
-    public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "cassandra.pid"); }
-
-    @Override
-    public boolean isRunning() {
-        return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
-    }
-
-    @Override
-    public void stop() {
-        newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected Map<String,String> getCustomJavaSystemProperties() {
-        return MutableMap.<String, String>builder()
-                .putAll(super.getCustomJavaSystemProperties())
-                .put("cassandra.config", getCassandraConfigFileName())
-                .build();
-    }
-
-    @Override
-    public Map<String, String> getShellEnvironment() {
-        return MutableMap.<String, String>builder()
-                .putAll(super.getShellEnvironment())
-                .put("CASSANDRA_HOME", getRunDir())
-                .put("CASSANDRA_CONF", Os.mergePathsUnix(getRunDir(), "conf"))
-                .renameKey("JAVA_OPTS", "JVM_OPTS")
-                .build();
-    }
-
-    @Override
-    public ProcessTaskWrapper<Integer> executeScriptAsync(String commands) {
-        String fileToRun = Os.mergePathsUnix("brooklyn_commands", "cassandra-commands-"+Identifiers.makeRandomId(8));
-        TaskWrapper<Void> task = SshEffectorTasks.put(Os.mergePathsUnix(getRunDir(), fileToRun))
-                .machine(getMachine())
-                .contents(commands)
-                .summary("copying cassandra script to execute "+fileToRun)
-                .newTask();
-        DynamicTasks.queueIfPossible(task).orSubmitAndBlock(getEntity()).andWaitForSuccess();
-        return executeScriptFromInstalledFileAsync(fileToRun);
-    }
-
-    public ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String fileToRun) {
-        ProcessTaskWrapper<Integer> task = SshEffectorTasks.ssh(
-                        "cd "+getRunDir(),
-                        scriptInvocationCommand(getThriftPort(), fileToRun))
-                .machine(getMachine())
-                .summary("executing cassandra script "+fileToRun)
-                .newTask();
-        DynamicTasks.queueIfPossible(task).orSubmitAndBlock(getEntity());
-        return task;
-    }
-
-    protected String scriptInvocationCommand(Integer optionalThriftPort, String fileToRun) {
-        return "bin/cassandra-cli " +
-                (optionalThriftPort != null ? "--port " + optionalThriftPort : "") +
-                " --file "+fileToRun;
-    }
-
-    @Override
-    public String getResolvedAddress(String hostname) {
-        return resolvedAddressCache.or(BrooklynAccessUtils.resolvedAddressSupplier(getEntity(), getMachine(), hostname));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerator.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerator.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerator.java
deleted file mode 100644
index 6401c03..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.cassandra;
-
-import java.math.BigInteger;
-import java.util.Set;
-
-public interface TokenGenerator {
-
-    BigInteger max();
-    BigInteger min();
-    BigInteger range();
-
-    void setOrigin(BigInteger shift);
-    
-    BigInteger newToken();
-    
-    BigInteger getTokenForReplacementNode(BigInteger oldToken);
-    
-    Set<BigInteger> getTokensForReplacementNode(Set<BigInteger> oldTokens);
-    
-    /**
-     * Indicates that we are starting a new cluster of the given number of nodes,
-     * so expect that number of consecutive calls to {@link #newToken()}.
-     * 
-     * @param numNewNodes
-     */
-    void growingCluster(int numNewNodes);
-
-    void shrinkingCluster(Set<BigInteger> nodesToRemove);
-    
-    void refresh(Set<BigInteger> currentNodes);
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerators.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerators.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerators.java
deleted file mode 100644
index f29a813..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/TokenGenerators.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.cassandra;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.brooklyn.util.collections.MutableList;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class TokenGenerators {
-
-    /**
-     * Sub-classes are recommended to call {@link #checkRangeValid()} at construction time.
-     */
-    public static abstract class AbstractTokenGenerator implements TokenGenerator, Serializable {
-        
-        private static final long serialVersionUID = -1884526356161711176L;
-        
-        public static final BigInteger TWO = BigInteger.valueOf(2);
-        
-        public abstract BigInteger max();
-        public abstract BigInteger min();
-        public abstract BigInteger range();
-
-        private final Set<BigInteger> currentTokens = Sets.newTreeSet();
-        private final List<BigInteger> nextTokens = Lists.newArrayList();
-        private BigInteger origin = BigInteger.ZERO;
-        
-        protected void checkRangeValid() {
-            Preconditions.checkState(range().equals(max().subtract(min()).add(BigInteger.ONE)), 
-                    "min=%s; max=%s; range=%s", min(), max(), range());
-        }
-        
-        @Override
-        public void setOrigin(BigInteger shift) {
-            this.origin = Preconditions.checkNotNull(shift, "shift");
-        }
-        
-        /**
-         * Unless we're explicitly starting a new cluster or resizing by a pre-defined number of nodes, then
-         * let Cassandra decide (i.e. return null).
-         */
-        @Override
-        public synchronized BigInteger newToken() {
-            BigInteger result = (nextTokens.isEmpty()) ? null : nextTokens.remove(0);
-            if (result != null) currentTokens.add(result);
-            return result;
-        }
-
-        @Override
-        public synchronized BigInteger getTokenForReplacementNode(BigInteger oldToken) {
-            checkNotNull(oldToken, "oldToken");
-            return normalize(oldToken.subtract(BigInteger.ONE));
-        }
-
-        @Override
-        public synchronized Set<BigInteger> getTokensForReplacementNode(Set<BigInteger> oldTokens) {
-            checkNotNull(oldTokens, "oldToken");
-            Set<BigInteger> result = Sets.newLinkedHashSet();
-            for (BigInteger oldToken : oldTokens) {
-                result.add(getTokenForReplacementNode(oldToken));
-            }
-            return result;
-        }
-        
-        @Override
-        public synchronized void growingCluster(int numNewNodes) {
-            if (currentTokens.isEmpty() && nextTokens.isEmpty()) {
-                nextTokens.addAll(generateEquidistantTokens(numNewNodes));
-            } else {
-                // simple strategy which iteratively finds best midpoint
-                for (int i=0; i<numNewNodes; i++) {
-                    nextTokens.add(generateBestNextToken());
-                }
-            }
-        }
-
-        @Override
-        public synchronized void shrinkingCluster(Set<BigInteger> nodesToRemove) {
-            currentTokens.remove(nodesToRemove);
-        }
-
-        @Override
-        public synchronized void refresh(Set<BigInteger> currentNodes) {
-            currentTokens.clear();
-            currentTokens.addAll(currentNodes);
-        }
-
-        private List<BigInteger> generateEquidistantTokens(int numTokens) {
-            List<BigInteger> result = Lists.newArrayList();
-            for (int i = 0; i < numTokens; i++) {
-                BigInteger token = range().multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(numTokens)).add(min());
-                token = normalize(token.add(origin));
-                result.add(token);
-            }
-            return result;
-        }
-        
-        private BigInteger normalize(BigInteger input) {
-            while (input.compareTo(min()) < 0)
-                input = input.add(range());
-            while (input.compareTo(max()) > 0)
-                input = input.subtract(range());
-            return input;
-        }
-        
-        private BigInteger generateBestNextToken() {
-            List<BigInteger> allTokens = MutableList.<BigInteger>of().appendAll(currentTokens).appendAll(nextTokens);
-            Collections.sort(allTokens);
-            Iterator<BigInteger> ti = allTokens.iterator();
-            
-            BigInteger thisValue = ti.next();
-            BigInteger prevValue = allTokens.get(allTokens.size()-1).subtract(range());
-            
-            BigInteger bestNewTokenSoFar = normalize(prevValue.add(thisValue).divide(TWO));
-            BigInteger biggestRangeSizeSoFar = thisValue.subtract(prevValue);
-            
-            while (ti.hasNext()) {
-                prevValue = thisValue;
-                thisValue = ti.next();
-                
-                BigInteger rangeHere = thisValue.subtract(prevValue);
-                if (rangeHere.compareTo(biggestRangeSizeSoFar) > 0) {
-                    bestNewTokenSoFar = prevValue.add(thisValue).divide(TWO);
-                    biggestRangeSizeSoFar = rangeHere;
-                }
-            }
-            return bestNewTokenSoFar;
-        }
-
-    }
-
-    public static class PosNeg63TokenGenerator extends AbstractTokenGenerator {
-        private static final long serialVersionUID = 7327403957176106754L;
-        
-        public static final BigInteger MIN_TOKEN = TWO.pow(63).negate();
-        public static final BigInteger MAX_TOKEN = TWO.pow(63).subtract(BigInteger.ONE);
-        public static final BigInteger RANGE = TWO.pow(64);
-
-        public PosNeg63TokenGenerator() {
-            checkRangeValid();
-        }
-
-        @Override public BigInteger max() { return MAX_TOKEN; }
-        @Override public BigInteger min() { return MIN_TOKEN; }
-        @Override public BigInteger range() { return RANGE; }
-    }
-    
-    /** token generator used by cassandra pre v1.2 */
-    public static class NonNeg127TokenGenerator extends AbstractTokenGenerator {
-        private static final long serialVersionUID = 1357426905711548198L;
-        
-        public static final BigInteger MIN_TOKEN = BigInteger.ZERO;
-        public static final BigInteger MAX_TOKEN = TWO.pow(127).subtract(BigInteger.ONE);
-        public static final BigInteger RANGE = TWO.pow(127);
-
-        public NonNeg127TokenGenerator() {
-            checkRangeValid();
-        }
-        
-        @Override public BigInteger max() { return MAX_TOKEN; }
-        @Override public BigInteger min() { return MIN_TOKEN; }
-        @Override public BigInteger range() { return RANGE; }
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java
deleted file mode 100644
index dfff6a4..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.couchbase;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.ImplementedBy;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.entity.group.DynamicCluster;
-import org.apache.brooklyn.util.core.flags.SetFromFlag;
-import org.apache.brooklyn.util.time.Duration;
-
-import com.google.common.reflect.TypeToken;
-
-@Catalog(name="CouchBase Cluster", description="Couchbase is an open source, distributed (shared-nothing architecture) "
-        + "NoSQL document-oriented database that is optimized for interactive applications.")
-@ImplementedBy(CouchbaseClusterImpl.class)
-public interface CouchbaseCluster extends DynamicCluster {
-
-    AttributeSensor<Integer> ACTUAL_CLUSTER_SIZE = Sensors.newIntegerSensor("coucbase.cluster.actualClusterSize", "returns the actual number of nodes in the cluster");
-
-    @SuppressWarnings("serial")
-    AttributeSensor<Set<Entity>> COUCHBASE_CLUSTER_UP_NODES = Sensors.newSensor(new TypeToken<Set<Entity>>() {
-    }, "couchbase.cluster.clusterEntities", "the set of service up nodes");
-
-    @SuppressWarnings("serial")
-    AttributeSensor<List<String>> COUCHBASE_CLUSTER_BUCKETS = Sensors.newSensor(new TypeToken<List<String>>() {
-    }, "couchbase.cluster.buckets", "Names of all the buckets the couchbase cluster");
-
-    AttributeSensor<Entity> COUCHBASE_PRIMARY_NODE = Sensors.newSensor(Entity.class, "couchbase.cluster.primaryNode", "The primary couchbase node to query and issue add-server and rebalance on");
-
-    AttributeSensor<Boolean> IS_CLUSTER_INITIALIZED = Sensors.newBooleanSensor("couchbase.cluster.isClusterInitialized", "flag to emit if the couchbase cluster was intialized");
-
-    @SetFromFlag("clusterName")
-    ConfigKey<String> CLUSTER_NAME = ConfigKeys.newStringConfigKey("couchbase.cluster.name", "Optional name for this cluster");
-
-    @SetFromFlag("intialQuorumSize")
-    ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey("couchbase.cluster.intialQuorumSize", "Initial cluster quorum size - number of initial nodes that must have been successfully started to report success (if < 0, then use value of INITIAL_SIZE)",
-            -1);
-
-    @SetFromFlag("delayBeforeAdvertisingCluster")
-    ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.TEN_SECONDS);
-
-    // TODO not sure if this is needed; previously waited 3m (SERVICE_UP_TIME_OUT) but that seems absurdly long
-    @SetFromFlag("postStartStabilizationDelay")
-    ConfigKey<Duration> NODES_STARTED_STABILIZATION_DELAY = ConfigKeys.newConfigKey(Duration.class, "couchbase.cluster.postStartStabilizationDelay", "Delay after nodes have been started before treating it as a cluster", Duration.TEN_SECONDS);
-    
-    @SetFromFlag("adminUsername")
-    ConfigKey<String> COUCHBASE_ADMIN_USERNAME = CouchbaseNode.COUCHBASE_ADMIN_USERNAME;
-
-    @SetFromFlag("adminPassword")
-    ConfigKey<String> COUCHBASE_ADMIN_PASSWORD = CouchbaseNode.COUCHBASE_ADMIN_PASSWORD;
-
-    @SuppressWarnings("serial")
-    AttributeSensor<List<String>> COUCHBASE_CLUSTER_UP_NODE_ADDRESSES = Sensors.newSensor(new TypeToken<List<String>>() {},
-            "couchbase.cluster.node.addresses", "List of host:port of all active nodes in the cluster (http admin port, and public hostname/IP)");
-    AttributeSensor<String> COUCHBASE_CLUSTER_CONNECTION_URL = Sensors.newStringSensor(
-            "couchbase.cluster.connection.url", "Couchbase-style URL to connect to the cluster (e.g. http://127.0.0.1:8091/ or couchbase://10.0.0.1,10.0.0.2/)");
-    
-    // Interesting stats
-    AttributeSensor<Double> OPS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.ops", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/ops");
-    AttributeSensor<Double> EP_BG_FETCHED_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.ep.bg.fetched", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/ep_bg_fetched");
-    AttributeSensor<Double> CURR_ITEMS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.curr.items", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/curr_items");
-    AttributeSensor<Double> VB_REPLICA_CURR_ITEMS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.vb.replica.curr.items", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/vb_replica_curr_items");
-    AttributeSensor<Double> GET_HITS_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.get.hits", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/get_hits");
-    AttributeSensor<Double> CMD_GET_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.cmd.get", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/cmd_get");
-    AttributeSensor<Double> CURR_ITEMS_TOT_PER_NODE = Sensors.newDoubleSensor("couchbase.stats.cluster.per.node.curr.items.tot", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/curr_items_tot");
-    // Although these are Double (after aggregation), they need to be coerced to Long for ByteSizeStrings rendering
-    AttributeSensor<Long> COUCH_DOCS_DATA_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.docs.data.size", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_docs_data_size");
-    AttributeSensor<Long> MEM_USED_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.mem.used", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/mem_used");
-    AttributeSensor<Long> COUCH_VIEWS_ACTUAL_DISK_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.views.actual.disk.size", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_views_actual_disk_size");
-    AttributeSensor<Long> COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.docs.actual.disk.size", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_docs_actual_disk_size");
-    AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.views.data.size", 
-            "Average across cluster for pools/nodes/<current node>/interestingStats/couch_views_data_size");
-    
-    AttributeSensor<Boolean> BUCKET_CREATION_IN_PROGRESS = Sensors.newBooleanSensor("couchbase.cluster.bucketCreationInProgress", "Indicates that a bucket is currently being created, and" +
-            "further bucket creation should be deferred");
-
-    /**
-     * createBuckets is a list of all the buckets to be created on the couchbase cluster
-     * the buckets will be created on the primary node of the cluster
-     * each map entry for a bucket should contain the following parameters:
-     * - <"bucket",(String) name of the bucket (default: default)>
-     * - <"bucket-type",(String) name of bucket type (default: couchbase)>
-     * - <"bucket-port",(Integer) the bucket port to connect to (default: 11222)>
-     * - <"bucket-ramsize",(Integer) ram size allowed for bucket (default: 200)>
-     * - <"bucket-replica",(Integer) number of replicas for the bucket (default: 1)>
-     */
-    @SuppressWarnings("serial")
-    @SetFromFlag("createBuckets")
-    ConfigKey<List<Map<String, Object>>> CREATE_BUCKETS = ConfigKeys.newConfigKey(new TypeToken<List<Map<String, Object>>>() {}, 
-            "couchbase.cluster.createBuckets", "a list of all dedicated port buckets to be created on the couchbase cluster");
-    
-    @SuppressWarnings("serial")
-    @SetFromFlag("replication")
-    ConfigKey<List<Map<String,Object>>> REPLICATION = ConfigKeys.newConfigKey(new TypeToken<List<Map<String,Object>>>() {}, 
-            "couchbase.cluster.replicationConfiguration", "List of replication rules to configure, each rule including target (id of another cluster) and mode (unidirectional or bidirectional)");
-
-    int getQuorumSize();
-}


Mime
View raw message