brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aleds...@apache.org
Subject [24/26] incubator-brooklyn git commit: [BROOKLYN-162] Renaming of the NoSQL packages
Date Thu, 06 Aug 2015 16:32:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
deleted file mode 100644
index 603fb6d..0000000
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.cassandra;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.enricher.Enrichers;
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.DynamicGroup;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityPredicates;
-import brooklyn.entity.basic.Lifecycle;
-import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
-import brooklyn.entity.effector.EffectorBody;
-import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
-import brooklyn.entity.group.DynamicClusterImpl;
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.SensorEvent;
-import brooklyn.event.SensorEventListener;
-import brooklyn.location.Location;
-import brooklyn.location.basic.Machines;
-import brooklyn.policy.PolicySpec;
-import brooklyn.util.ResourceUtils;
-import brooklyn.util.collections.MutableList;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.collections.MutableSet;
-import brooklyn.util.config.ConfigBag;
-import brooklyn.util.text.Strings;
-import brooklyn.util.time.Time;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.common.net.HostAndPort;
-
-/**
- * Implementation of {@link CassandraDatacenter}.
- * <p>
- * Several subtleties to note:
- * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port
- *   (so we wait for thrift port to be contactable)
- * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema
- *   (each up to 1m; often very close to the 1m) 
- */
-public class CassandraDatacenterImpl extends DynamicClusterImpl implements CassandraDatacenter {
-
-    /*
-     * TODO Seed management is hard!
-     *  - The ServiceRestarter is not doing customize(), so is not refreshing the seeds in cassandra.yaml.
-     *    If we have two nodes that were seeds for each other and they both restart at the same time, we'll have a split brain.
-     */
-    
-    private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterImpl.class);
-
-    // Mutex for synchronizing during re-size operations
-    private final Object mutex = new Object[0];
-
-    private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() {
-        // Mutex for (re)calculating our seeds
-        // TODO is this very dangerous?! Calling out to SeedTracker, which calls out to alien getAttribute()/getConfig(). But I think that's ok.
-        // TODO might not need mutex? previous race was being caused by something else, other than concurrent calls!
-        private final Object seedMutex = new Object();
-        
-        @Override
-        public Set<Entity> get() {
-            synchronized (seedMutex) {
-                boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS));
-                int quorumSize = getSeedQuorumSize();
-                Set<Entity> potentialSeeds = gatherPotentialSeeds();
-                Set<Entity> potentialRunningSeeds = gatherPotentialRunningSeeds();
-                boolean stillWaitingForQuorum = (!hasPublishedSeeds) && (potentialSeeds.size() < quorumSize);
-                
-                if (stillWaitingForQuorum) {
-                    if (log.isDebugEnabled()) log.debug("Not refreshed seeds of cluster {}, because still waiting for quorum (need {}; have {} potentials)", new Object[] {CassandraDatacenterImpl.class, quorumSize, potentialSeeds.size()});
-                    return ImmutableSet.of();
-                } else if (hasPublishedSeeds) {
-                    Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS);
-                    if (getAttribute(SERVICE_STATE_ACTUAL) == Lifecycle.STARTING) {
-                        if (Sets.intersection(currentSeeds, potentialSeeds).isEmpty()) {
-                            log.warn("Cluster {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraDatacenterImpl.this, currentSeeds});
-                        }
-                        return currentSeeds;
-                    } else if (potentialRunningSeeds.isEmpty()) {
-                        // TODO Could be race where nodes have only just returned from start() and are about to 
-                        // transition to serviceUp; so don't just abandon all our seeds!
-                        log.warn("Cluster {} has no running seeds (yet?); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraDatacenterImpl.this});
-                        return currentSeeds;
-                    } else {
-                        Set<Entity> result = trim(quorumSize, potentialRunningSeeds);
-                        log.debug("Cluster {} updating seeds: chosen={}; potentialRunning={}", new Object[] {CassandraDatacenterImpl.this, result, potentialRunningSeeds});
-                        return result;
-                    }
-                } else {
-                    Set<Entity> result = trim(quorumSize, potentialSeeds);
-                    if (log.isDebugEnabled()) log.debug("Cluster {} has reached seed quorum: seeds={}", new Object[] {CassandraDatacenterImpl.this, result});
-                    return result;
-                }
-            }
-        }
-        private Set<Entity> trim(int num, Set<Entity> contenders) {
-            // Prefer existing seeds wherever possible; otherwise accept any other contenders
-            Set<Entity> currentSeeds = (getAttribute(CURRENT_SEEDS) != null) ? getAttribute(CURRENT_SEEDS) : ImmutableSet.<Entity>of();
-            Set<Entity> result = Sets.newLinkedHashSet();
-            result.addAll(Sets.intersection(currentSeeds, contenders));
-            result.addAll(contenders);
-            return ImmutableSet.copyOf(Iterables.limit(result, num));
-        }
-    };
-    
-    protected SeedTracker seedTracker = new SeedTracker();
-    protected TokenGenerator tokenGenerator = null;
-
-    public CassandraDatacenterImpl() {
-    }
-
-    @Override
-    public void init() {
-        super.init();
-
-        /*
-         * subscribe to hostname, and keep an accurate set of current seeds in a sensor;
-         * then at nodes we set the initial seeds to be the current seeds when ready (non-empty)
-         */
-        subscribeToMembers(this, Attributes.HOSTNAME, new SensorEventListener<String>() {
-            @Override
-            public void onEvent(SensorEvent<String> event) {
-                seedTracker.onHostnameChanged(event.getSource(), event.getValue());
-            }
-        });
-        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
-            @Override public void onEvent(SensorEvent<Entity> event) {
-                seedTracker.onMemberRemoved(event.getValue());
-            }
-        });
-        subscribeToMembers(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() {
-            @Override
-            public void onEvent(SensorEvent<Boolean> event) {
-                seedTracker.onServiceUpChanged(event.getSource(), event.getValue());
-            }
-        });
-        subscribeToMembers(this, Attributes.SERVICE_STATE_ACTUAL, new SensorEventListener<Lifecycle>() {
-            @Override
-            public void onEvent(SensorEvent<Lifecycle> event) {
-                // trigger a recomputation also when lifecycle state changes, 
-                // because it might not have ruled a seed as inviable when service up went true 
-                // because service state was not yet running
-                seedTracker.onServiceUpChanged(event.getSource(), Lifecycle.RUNNING==event.getValue());
-            }
-        });
-        
-        // Track the datacenters for this cluster
-        subscribeToMembers(this, CassandraNode.DATACENTER_NAME, new SensorEventListener<String>() {
-            @Override
-            public void onEvent(SensorEvent<String> event) {
-                Entity member = event.getSource();
-                String dcName = event.getValue();
-                if (dcName != null) {
-                    Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
-                    Multimap<String, Entity> mutableDatacenterUsage = (datacenterUsage == null) ? LinkedHashMultimap.<String, Entity>create() : LinkedHashMultimap.create(datacenterUsage);
-                    Optional<String> oldDcName = getKeyOfVal(mutableDatacenterUsage, member);
-                    if (!(oldDcName.isPresent() && dcName.equals(oldDcName.get()))) {
-                        mutableDatacenterUsage.values().remove(member);
-                        mutableDatacenterUsage.put(dcName, member);
-                        setAttribute(DATACENTER_USAGE, mutableDatacenterUsage);
-                        setAttribute(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
-                    }
-                }
-            }
-            private <K,V> Optional<K> getKeyOfVal(Multimap<K,V> map, V val) {
-                for (Map.Entry<K,V> entry : map.entries()) {
-                    if (Objects.equal(val, entry.getValue())) {
-                        return Optional.of(entry.getKey());
-                    }
-                }
-                return Optional.absent();
-            }
-        });
-        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
-            @Override public void onEvent(SensorEvent<Entity> event) {
-                Entity entity = event.getSource();
-                Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
-                if (datacenterUsage != null && datacenterUsage.containsValue(entity)) {
-                    Multimap<String, Entity> mutableDatacenterUsage = LinkedHashMultimap.create(datacenterUsage);
-                    mutableDatacenterUsage.values().remove(entity);
-                    setAttribute(DATACENTER_USAGE, mutableDatacenterUsage);
-                    setAttribute(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
-                }
-            }
-        });
-        
-        getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody<String>() {
-            @Override
-            public String call(ConfigBag parameters) {
-                return executeScript((String)parameters.getStringKey("commands"));
-            }
-        });
-    }
-    
-    protected Supplier<Set<Entity>> getSeedSupplier() {
-        Supplier<Set<Entity>> seedSupplier = getConfig(SEED_SUPPLIER);
-        return (seedSupplier == null) ? defaultSeedSupplier : seedSupplier;
-    }
-    
-    protected boolean useVnodes() {
-        return Boolean.TRUE.equals(getConfig(USE_VNODES));
-    }
-    
-    protected synchronized TokenGenerator getTokenGenerator() {
-        if (tokenGenerator!=null) 
-            return tokenGenerator;
-        
-        try {
-            tokenGenerator = getConfig(TOKEN_GENERATOR_CLASS).newInstance();
-            
-            BigInteger shift = getConfig(TOKEN_SHIFT);
-            if (shift==null) 
-                shift = BigDecimal.valueOf(Math.random()).multiply(
-                    new BigDecimal(tokenGenerator.range())).toBigInteger();
-            tokenGenerator.setOrigin(shift);
-            
-            return tokenGenerator;
-        } catch (Exception e) {
-            throw Throwables.propagate(e);
-        }        
-    }
-    
-    protected int getSeedQuorumSize() {
-        Integer quorumSize = getConfig(INITIAL_QUORUM_SIZE);
-        if (quorumSize!=null && quorumSize>0)
-            return quorumSize;
-        // default 2 is recommended, unless initial size is smaller
-        return Math.min(Math.max(getConfig(INITIAL_SIZE), 1), DEFAULT_SEED_QUORUM);
-    }
-
-    @Override
-    public Set<Entity> gatherPotentialSeeds() {
-        return seedTracker.gatherPotentialSeeds();
-    }
-
-    @Override
-    public Set<Entity> gatherPotentialRunningSeeds() {
-        return seedTracker.gatherPotentialRunningSeeds();
-    }
-
-    /**
-     * Sets the default {@link #MEMBER_SPEC} to describe the Cassandra nodes.
-     */
-    @Override
-    protected EntitySpec<?> getMemberSpec() {
-        return getConfig(MEMBER_SPEC, EntitySpec.create(CassandraNode.class));
-    }
-
-    @Override
-    public String getClusterName() {
-        return getAttribute(CLUSTER_NAME);
-    }
-
-    @Override
-    public Collection<Entity> grow(int delta) {
-        if (useVnodes()) {
-            // nothing to do for token generator
-        } else {
-            if (getCurrentSize() == 0) {
-                getTokenGenerator().growingCluster(delta);
-            }
-        }
-        return super.grow(delta);
-    }
-    
-    @SuppressWarnings("deprecation")
-    @Override
-    protected Entity createNode(@Nullable Location loc, Map<?,?> flags) {
-        Map<Object, Object> allflags = MutableMap.copyOf(flags);
-        
-        if ((flags.containsKey(CassandraNode.TOKEN) || flags.containsKey("token")) || (flags.containsKey(CassandraNode.TOKENS) || flags.containsKey("tokens"))) {
-            // leave token config as-is
-        } else if (!useVnodes()) {
-            BigInteger token = getTokenGenerator().newToken();
-            allflags.put(CassandraNode.TOKEN, token);
-        }
-
-        if ((flags.containsKey(CassandraNode.NUM_TOKENS_PER_NODE) || flags.containsKey("numTokensPerNode"))) {
-            // leave num_tokens as-is
-        } else if (useVnodes()) {
-            Integer numTokensPerNode = getConfig(NUM_TOKENS_PER_NODE);
-            allflags.put(CassandraNode.NUM_TOKENS_PER_NODE, numTokensPerNode);
-        } else {
-            allflags.put(CassandraNode.NUM_TOKENS_PER_NODE, 1);
-        }
-        
-        return super.createNode(loc, allflags);
-    }
-
-    @Override
-    protected Entity replaceMember(Entity member, Location memberLoc, Map<?, ?> extraFlags) {
-        Set<BigInteger> oldTokens = ((CassandraNode) member).getTokens();
-        Set<BigInteger> newTokens = (oldTokens != null && oldTokens.size() > 0) ? getTokenGenerator().getTokensForReplacementNode(oldTokens) : null;
-        return super.replaceMember(member, memberLoc,  MutableMap.copyOf(extraFlags).add(CassandraNode.TOKENS, newTokens));
-    }
-
-    @Override
-    public void start(Collection<? extends Location> locations) {
-        Machines.warnIfLocalhost(locations, "CassandraCluster does not support multiple nodes on localhost, " +
-                "due to assumptions Cassandra makes about the use of the same port numbers used across the cluster.");
-
-        // force this to be set - even if it is using the default
-        setAttribute(CLUSTER_NAME, getConfig(CLUSTER_NAME));
-        
-        super.start(locations);
-
-        connectSensors();
-
-        // TODO wait until all nodes which we think are up are consistent 
-        // i.e. all known nodes use the same schema, as reported by
-        // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli");
-        // once we've done that we can revert to using 2 seed nodes.
-        // see CassandraCluster.DEFAULT_SEED_QUORUM
-        // (also ensure the cluster is ready if we are about to run a creation script)
-        Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
-
-        String scriptUrl = getConfig(CassandraNode.CREATION_SCRIPT_URL);
-        if (Strings.isNonEmpty(scriptUrl)) {
-            executeScript(new ResourceUtils(this).getResourceAsString(scriptUrl));
-        }
-
-        update();
-    }
-
-    protected void connectSensors() {
-        connectEnrichers();
-        
-        addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
-                .displayName("Cassandra Cluster Tracker")
-                .configure("sensorsToTrack", ImmutableSet.of(Attributes.SERVICE_UP, Attributes.HOSTNAME, CassandraNode.THRIFT_PORT))
-                .configure("group", this));
-    }
-
-    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
-        @Override
-        protected void onEntityChange(Entity member) {
-            if (log.isDebugEnabled()) log.debug("Node {} updated in Cluster {}", member, this);
-            ((CassandraDatacenterImpl)entity).update();
-        }
-        @Override
-        protected void onEntityAdded(Entity member) {
-            if (log.isDebugEnabled()) log.debug("Node {} added to Cluster {}", member, this);
-            ((CassandraDatacenterImpl)entity).update();
-        }
-        @Override
-        protected void onEntityRemoved(Entity member) {
-            if (log.isDebugEnabled()) log.debug("Node {} removed from Cluster {}", member, this);
-            ((CassandraDatacenterImpl)entity).update();
-        }
-    };
-
-    @SuppressWarnings("unchecked")
-    protected void connectEnrichers() {
-        List<? extends List<? extends AttributeSensor<? extends Number>>> summingEnricherSetup = ImmutableList.of(
-                ImmutableList.of(CassandraNode.READ_ACTIVE, READ_ACTIVE),
-                ImmutableList.of(CassandraNode.READ_PENDING, READ_PENDING),
-                ImmutableList.of(CassandraNode.WRITE_ACTIVE, WRITE_ACTIVE),
-                ImmutableList.of(CassandraNode.WRITE_PENDING, WRITE_PENDING)
-        );
-        
-        List<? extends List<? extends AttributeSensor<? extends Number>>> averagingEnricherSetup = ImmutableList.of(
-                ImmutableList.of(CassandraNode.READS_PER_SECOND_LAST, READS_PER_SECOND_LAST_PER_NODE),
-                ImmutableList.of(CassandraNode.WRITES_PER_SECOND_LAST, WRITES_PER_SECOND_LAST_PER_NODE),
-                ImmutableList.of(CassandraNode.WRITES_PER_SECOND_IN_WINDOW, WRITES_PER_SECOND_IN_WINDOW_PER_NODE),
-                ImmutableList.of(CassandraNode.READS_PER_SECOND_IN_WINDOW, READS_PER_SECOND_IN_WINDOW_PER_NODE),
-                ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY, THRIFT_PORT_LATENCY_PER_NODE),
-                ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY_IN_WINDOW, THRIFT_PORT_LATENCY_IN_WINDOW_PER_NODE),
-                ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_LAST, PROCESS_CPU_TIME_FRACTION_LAST_PER_NODE),
-                ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_IN_WINDOW, PROCESS_CPU_TIME_FRACTION_IN_WINDOW_PER_NODE)
-        );
-        
-        for (List<? extends AttributeSensor<? extends Number>> es : summingEnricherSetup) {
-            AttributeSensor<? extends Number> t = es.get(0);
-            AttributeSensor<? extends Number> total = es.get(1);
-            addEnricher(Enrichers.builder()
-                    .aggregating(t)
-                    .publishing(total)
-                    .fromMembers()
-                    .computingSum()
-                    .defaultValueForUnreportedSensors(null)
-                    .valueToReportIfNoSensors(null)
-                    .build());
-        }
-        
-        for (List<? extends AttributeSensor<? extends Number>> es : averagingEnricherSetup) {
-            AttributeSensor<Number> t = (AttributeSensor<Number>) es.get(0);
-            AttributeSensor<Double> average = (AttributeSensor<Double>) es.get(1);
-            addEnricher(Enrichers.builder()
-                    .aggregating(t)
-                    .publishing(average)
-                    .fromMembers()
-                    .computingAverage()
-                    .defaultValueForUnreportedSensors(null)
-                    .valueToReportIfNoSensors(null)
-                    .build());
-
-        }
-    }
-
-    @Override
-    public void stop() {
-        disconnectSensors();
-        
-        super.stop();
-    }
-    
-    protected void disconnectSensors() {
-    }
-
-    @Override
-    public void update() {
-        synchronized (mutex) {
-            // Update our seeds, as necessary
-            seedTracker.refreshSeeds();
-            
-            // Choose the first available cluster member to set host and port (and compute one-up)
-            Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
-
-            if (upNode.isPresent()) {
-                setAttribute(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME));
-                setAttribute(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT));
-
-                List<String> currentNodes = getAttribute(CASSANDRA_CLUSTER_NODES);
-                Set<String> oldNodes = (currentNodes != null) ? ImmutableSet.copyOf(currentNodes) : ImmutableSet.<String>of();
-                Set<String> newNodes = MutableSet.<String>of();
-                for (Entity member : getMembers()) {
-                    if (member instanceof CassandraNode && Boolean.TRUE.equals(member.getAttribute(SERVICE_UP))) {
-                        String hostname = member.getAttribute(Attributes.HOSTNAME);
-                        Integer thriftPort = member.getAttribute(CassandraNode.THRIFT_PORT);
-                        if (hostname != null && thriftPort != null) {
-                            newNodes.add(HostAndPort.fromParts(hostname, thriftPort).toString());
-                        }
-                    }
-                }
-                if (Sets.symmetricDifference(oldNodes, newNodes).size() > 0) {
-                    setAttribute(CASSANDRA_CLUSTER_NODES, MutableList.copyOf(newNodes));
-                }
-            } else {
-                setAttribute(HOSTNAME, null);
-                setAttribute(THRIFT_PORT, null);
-                setAttribute(CASSANDRA_CLUSTER_NODES, Collections.<String>emptyList());
-            }
-
-            ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyList(this, CASSANDRA_CLUSTER_NODES);
-        }
-    }
-    
-    /**
-     * For tracking our seeds. This gets fiddly! High-level logic is:
-     * <ul>
-     *   <li>If we have never reached quorum (i.e. have never published seeds), then continue to wait for quorum;
-     *       because entity-startup may be blocking for this. This is handled by the seedSupplier.
-     *   <li>If we previously reached quorum (i.e. have previously published seeds), then always update;
-     *       we never want stale/dead entities listed in our seeds.
-     *   <li>If an existing seed looks unhealthy, then replace it.
-     *   <li>If a new potential seed becomes available (and we're in need of more), then add it.
-     * </ul>
-     * 
-     * Also note that {@link CassandraFabric} can take over, because it knows about multiple sub-clusters!
-     * It will provide a different {@link CassandraDatacenter#SEED_SUPPLIER}. Each time we think that our seeds
-     * need to change, we call that. The fabric will call into {@link CassandraDatacenterImpl#gatherPotentialSeeds()}
-     * to find out what's available.
-     * 
-     * @author aled
-     */
-    protected class SeedTracker {
-        private final Map<Entity, Boolean> memberUpness = Maps.newLinkedHashMap();
-        
-        public void onMemberRemoved(Entity member) {
-            Set<Entity> seeds = getSeeds();
-            boolean maybeRemove = seeds.contains(member);
-            memberUpness.remove(member);
-            
-            if (maybeRemove) {
-                refreshSeeds();
-            } else {
-                if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} removed)", new Object[] {CassandraDatacenterImpl.this, member});
-                return;
-            }
-        }
-        public void onHostnameChanged(Entity member, String hostname) {
-            Set<Entity> seeds = getSeeds();
-            int quorum = getSeedQuorumSize();
-            boolean isViable = isViableSeed(member);
-            boolean maybeAdd = isViable && seeds.size() < quorum;
-            boolean maybeRemove = seeds.contains(member) && !isViable;
-            
-            if (maybeAdd || maybeRemove) {
-                refreshSeeds();
-            } else {
-                if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed hostname {})", new Object[] {CassandraDatacenterImpl.this, member, hostname});
-                return;
-            }
-        }
-        public void onServiceUpChanged(Entity member, Boolean serviceUp) {
-            Boolean oldVal = memberUpness.put(member, serviceUp);
-            if (Objects.equal(oldVal, serviceUp)) {
-                if (log.isTraceEnabled()) log.trace("Ignoring duplicate service-up in "+CassandraDatacenterImpl.this+" for "+member+", "+serviceUp);
-            }
-            Set<Entity> seeds = getSeeds();
-            int quorum = getSeedQuorumSize();
-            boolean isViable = isViableSeed(member);
-            boolean maybeAdd = isViable && seeds.size() < quorum;
-            boolean maybeRemove = seeds.contains(member) && !isViable;
-            
-            if (log.isDebugEnabled())
-                log.debug("Considering refresh of seeds for "+CassandraDatacenterImpl.this+" because "+member+" is now "+serviceUp+" ("+isViable+" / "+maybeAdd+" / "+maybeRemove+")");
-            if (maybeAdd || maybeRemove) {
-                refreshSeeds();
-            } else {
-                if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed serviceUp {})", new Object[] {CassandraDatacenterImpl.this, member, serviceUp});
-                return;
-            }
-        }
-        protected Set<Entity> getSeeds() {
-            Set<Entity> result = getAttribute(CURRENT_SEEDS);
-            return (result == null) ? ImmutableSet.<Entity>of() : result;
-        }
-        public void refreshSeeds() {
-            Set<Entity> oldseeds = getAttribute(CURRENT_SEEDS);
-            Set<Entity> newseeds = getSeedSupplier().get();
-            if (Objects.equal(oldseeds, newseeds)) {
-                if (log.isTraceEnabled()) log.debug("Seed refresh no-op for cluster {}: still={}", new Object[] {CassandraDatacenterImpl.this, oldseeds});
-            } else {
-                if (log.isDebugEnabled()) log.debug("Refreshing seeds of cluster {}: now={}; old={}", new Object[] {this, newseeds, oldseeds});
-                setAttribute(CURRENT_SEEDS, newseeds);
-                if (newseeds != null && newseeds.size() > 0) {
-                    setAttribute(HAS_PUBLISHED_SEEDS, true);
-                }
-            }
-        }
-        public Set<Entity> gatherPotentialSeeds() {
-            Set<Entity> result = Sets.newLinkedHashSet();
-            for (Entity member : getMembers()) {
-                if (isViableSeed(member)) {
-                    result.add(member);
-                }
-            }
-            if (log.isTraceEnabled()) log.trace("Viable seeds in Cluster {}: {}", new Object[] {result});
-            return result;
-        }
-        public Set<Entity> gatherPotentialRunningSeeds() {
-            Set<Entity> result = Sets.newLinkedHashSet();
-            for (Entity member : getMembers()) {
-                if (isRunningSeed(member)) {
-                    result.add(member);
-                }
-            }
-            if (log.isTraceEnabled()) log.trace("Viable running seeds in Cluster {}: {}", new Object[] {result});
-            return result;
-        }
-        public boolean isViableSeed(Entity member) {
-            // TODO would be good to reuse the better logic in ServiceFailureDetector
-            // (e.g. if that didn't just emit a notification but set a sensor as well?)
-            boolean managed = Entities.isManaged(member);
-            String hostname = member.getAttribute(Attributes.HOSTNAME);
-            boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
-            Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
-            boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED);
-            boolean result = (hostname != null && !hasFailed);
-            if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, this, result, hostname, serviceUp, serviceState, hasFailed});
-            return result;
-        }
-        public boolean isRunningSeed(Entity member) {
-            boolean viableSeed = isViableSeed(member);
-            boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
-            Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
-            boolean result = viableSeed && serviceUp && serviceState == Lifecycle.RUNNING;
-            if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: runningSeed={}; viableSeed={}; serviceUp={}; serviceState={}", new Object[] {member, this, result, viableSeed, serviceUp, serviceState});
-            return result;
-        }
-    }
-    
-    @Override
-    public String executeScript(String commands) {
-        Entity someChild = Iterables.getFirst(getMembers(), null);
-        if (someChild==null)
-            throw new IllegalStateException("No Cassandra nodes available");
-        // FIXME cross-entity method-style calls such as below do not set up a queueing context (DynamicSequentialTask) 
-//        return ((CassandraNode)someChild).executeScript(commands);
-        return Entities.invokeEffector(this, someChild, CassandraNode.EXECUTE_SCRIPT, MutableMap.of("commands", commands)).getUnchecked();
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java
deleted file mode 100644
index 8dc0f28..0000000
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabric.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.cassandra;
-
-import java.util.Set;
-
-import org.apache.brooklyn.catalog.Catalog;
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.Entity;
-import brooklyn.entity.annotation.Effector;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.MethodEffector;
-import brooklyn.entity.group.DynamicFabric;
-import brooklyn.entity.proxying.ImplementedBy;
-import brooklyn.event.AttributeSensor;
-import brooklyn.location.Location;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Multimap;
-import com.google.common.reflect.TypeToken;
-
-/**
- * A fabric of {@link CassandraNode}s, which forms a cluster spanning multiple locations.
- * <p>
- * Each {@link CassandraDatacenter} child instance is actually just a part of the whole cluster. It consists of the
- * nodes in that single location (which normally corresponds to a "datacenter" in Cassandra terminology).
- */
-@Catalog(name="Apache Cassandra Database Fabric", description="Cassandra is a highly scalable, eventually " +
-        "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
-        "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
-@ImplementedBy(CassandraFabricImpl.class)
-public interface CassandraFabric extends DynamicFabric {
-
-    ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey(
-            "fabric.initial.quorumSize",
-            "Initial fabric quorum size - number of initial nodes that must have been successfully started " +
-            "to report success (if less than 0, then use a value based on INITIAL_SIZE of clusters)",
-            -1);
-    
-    @SuppressWarnings("serial")
-    ConfigKey<Function<Location, String>> DATA_CENTER_NAMER = ConfigKeys.newConfigKey(new TypeToken<Function<Location, String>>(){}, 
-            "cassandra.fabric.datacenter.namer",
-            "Function used to provide the cassandra.replication.datacenterName for a given location");
-
-    int DEFAULT_SEED_QUORUM = 5;
-    
-    AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = CassandraDatacenter.DATACENTER_USAGE;
-
-    AttributeSensor<Set<String>> DATACENTERS = CassandraDatacenter.DATACENTERS;
-
-    AttributeSensor<Set<Entity>> CURRENT_SEEDS = CassandraDatacenter.CURRENT_SEEDS;
-
-    AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = CassandraDatacenter.HAS_PUBLISHED_SEEDS;
-
-    AttributeSensor<String> HOSTNAME = CassandraDatacenter.HOSTNAME;
-
-    AttributeSensor<Integer> THRIFT_PORT = CassandraDatacenter.THRIFT_PORT;
-
-    MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraFabric.class, "update");
-
-    @Effector(description="Updates the cluster members")
-    void update();
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
deleted file mode 100644
index 2f874e6..0000000
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.cassandra;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.DynamicGroup;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.EntityPredicates;
-import brooklyn.entity.basic.Lifecycle;
-import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
-import brooklyn.entity.group.DynamicFabricImpl;
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.event.SensorEvent;
-import brooklyn.event.SensorEventListener;
-import brooklyn.location.Location;
-import brooklyn.policy.PolicySpec;
-import brooklyn.util.collections.CollectionFunctionals;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.collections.MutableSet;
-import brooklyn.util.time.Time;
-
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-/**
- * Implementation of {@link CassandraFabric}.
- * <p>
- * Several subtleties to note:
- * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port
- *   (so we wait for thrift port to be contactable)
- * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema
- *   (each up to 1m; often very close to the 1m) 
- */
-public class CassandraFabricImpl extends DynamicFabricImpl implements CassandraFabric {
-
-    private static final Logger log = LoggerFactory.getLogger(CassandraFabricImpl.class);
-
-    // Mutex for synchronizing during re-size operations
-    private final Object mutex = new Object[0];
-
-    private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() {
-        @Override public Set<Entity> get() {
-            // TODO Remove duplication from CassandraClusterImpl.defaultSeedSupplier
-            Set<Entity> seeds = getAttribute(CURRENT_SEEDS);
-            boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS));
-            int quorumSize = getSeedQuorumSize();
-            
-            // update seeds if we're not quorate; note this may not work for dynamically adding new datacenters
-            // as we do not take a new seed from the new datacenter
-            if (seeds == null || seeds.size() < quorumSize || containsDownEntity(seeds)) {
-                Set<Entity> newseeds;
-                Map<CassandraDatacenter,Set<Entity>> potentialSeeds = MutableMap.of();
-                int potentialSeedCount = 0;
-                for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
-                    Set<Entity> dcPotentialSeeds = member.gatherPotentialSeeds();
-                    potentialSeeds.put(member, dcPotentialSeeds);
-                    potentialSeedCount += dcPotentialSeeds.size();
-                }
-                
-                if (hasPublishedSeeds) {
-                    Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS);
-                    Lifecycle serviceState = getAttribute(SERVICE_STATE_ACTUAL);
-                    if (serviceState == Lifecycle.STARTING) {
-                        if (Sets.intersection(currentSeeds, ImmutableSet.copyOf(Iterables.concat(potentialSeeds.values()))).isEmpty()) {
-                            log.warn("Fabric {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraFabricImpl.this, currentSeeds});
-                        }
-                        newseeds = currentSeeds;
-                    } else if (serviceState == Lifecycle.STOPPING || serviceState == Lifecycle.STOPPED) {
-                        if (log.isTraceEnabled()) log.trace("Fabric {} ignoring any potential seed-changes, because {}: seeds={}", new Object[] {CassandraFabricImpl.this, serviceState, currentSeeds});
-                        newseeds = currentSeeds;
-                    } else if (potentialSeedCount == 0) {
-                        // TODO Could be race where nodes have only just returned from start() and are about to 
-                        // transition to serviceUp; so don't just abandon all our seeds!
-                        log.warn("Fabric {} has no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this});
-                        newseeds = currentSeeds;
-                    } else if (!allNonEmpty(potentialSeeds.values())) {
-                        log.warn("Fabric {} has datacenter with no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this});
-                        newseeds = currentSeeds;
-                    } else {
-                        Set<Entity> result = selectSeeds(quorumSize, potentialSeeds);
-                        if (log.isDebugEnabled() && !Objects.equal(seeds, result)) {
-                            log.debug("Fabric {} updating seeds: chosen={}; potential={}", new Object[] {CassandraFabricImpl.this, result, potentialSeeds});
-                        }
-                        newseeds = result;
-                    }
-                } else if (potentialSeedCount < quorumSize) {
-                    if (log.isDebugEnabled()) log.debug("Not setting seeds of fabric {} yet, because still waiting for quorum (need {}; have {} potentials from {} members)", new Object[] {CassandraFabricImpl.this, quorumSize, potentialSeedCount, getMembers()});
-                    newseeds = ImmutableSet.of();
-                } else if (!allNonEmpty(potentialSeeds.values())) {
-                    if (log.isDebugEnabled()) {
-                        Map<CassandraDatacenter, Integer> datacenterCounts = Maps.transformValues(potentialSeeds, CollectionFunctionals.sizeFunction());
-                        log.debug("Not setting seeds of fabric {} yet, because not all datacenters have seeds (sizes are {})", new Object[] {CassandraFabricImpl.this, datacenterCounts});
-                    }
-                    newseeds = ImmutableSet.of();
-                } else {
-                    // yay, we're quorate
-                    Set<Entity> result = selectSeeds(quorumSize, potentialSeeds);
-                    log.info("Fabric {} has reached seed quorum: seeds={}", new Object[] {CassandraFabricImpl.this, result});
-                    newseeds = result;
-                }
-                
-                if (!Objects.equal(seeds, newseeds)) {
-                    setAttribute(CURRENT_SEEDS, newseeds);
-                    
-                    if (newseeds != null && newseeds.size() > 0) {
-                        setAttribute(HAS_PUBLISHED_SEEDS, true);
-                        
-                        // Need to tell every datacenter that seeds are ready.
-                        // Otherwise a datacenter might get no more changes (e.g. to nodes' hostnames etc), 
-                        // and not call seedSupplier.get() again.
-                        for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
-                            member.update();
-                        }
-                    }
-                    return newseeds;
-                } else {
-                    return seeds;
-                }
-            } else {
-                if (log.isTraceEnabled()) log.trace("Not refresheed seeds of fabric {}, because have quorum {} (of {} members), and none are down: seeds={}", 
-                        new Object[] {CassandraFabricImpl.class, quorumSize, getMembers().size(), seeds});
-                return seeds;
-            }
-        }
-        private boolean allNonEmpty(Collection<? extends Collection<Entity>> contenders) {
-            for (Collection<Entity> contender: contenders)
-                if (contender.isEmpty()) return false;
-            return true;
-        }
-        private Set<Entity> selectSeeds(int num, Map<CassandraDatacenter,? extends Collection<Entity>> contenders) {
-            // Returns at most num seeds: prefers existing seeds that are still contenders, then one contender
-            // from each datacenter not yet represented in the result, then any other contenders.
-            // The contenders must be flattened before intersecting: values() holds collections, not entities.
-            Set<Entity> currentSeeds = (getAttribute(CURRENT_SEEDS) != null) ? getAttribute(CURRENT_SEEDS) : ImmutableSet.<Entity>of();
-            MutableSet<Entity> result = MutableSet.of();
-            result.addAll(Sets.intersection(currentSeeds, ImmutableSet.copyOf(Iterables.concat(contenders.values()))));
-            for (CassandraDatacenter cluster : contenders.keySet()) {
-                Set<Entity> contendersInCluster = Sets.newLinkedHashSet(contenders.get(cluster));
-                if (contendersInCluster.size() > 0 && Sets.intersection(result, contendersInCluster).isEmpty()) {
-                    result.add(Iterables.getFirst(contendersInCluster, null));
-                }
-            }
-            result.addAll(Iterables.concat(contenders.values()));
-            return ImmutableSet.copyOf(Iterables.limit(result, num));
-        }
-        private boolean containsDownEntity(Set<Entity> seeds) {
-            for (Entity seed : seeds) {
-                if (!isViableSeed(seed)) {
-                    return true;
-                }
-            }
-            return false;
-        }
-        public boolean isViableSeed(Entity member) {
-            // TODO remove duplication from CassandraClusterImpl.SeedTracker.isViableSeed
-            boolean managed = Entities.isManaged(member);
-            String hostname = member.getAttribute(Attributes.HOSTNAME);
-            boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
-            Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
-            boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED);
-            boolean result = (hostname != null && !hasFailed);
-            if (log.isTraceEnabled()) log.trace("Node {} in Fabric {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, CassandraFabricImpl.this, result, hostname, serviceUp, serviceState, hasFailed});
-            return result;
-        }
-    };
-
-    public CassandraFabricImpl() {
-    }
-
-    @Override
-    public void init() {
-        super.init();
-
-        if (!getConfigRaw(CassandraDatacenter.SEED_SUPPLIER, true).isPresentAndNonNull())
-            setConfig(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
-        
-        // track members
-        addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
-                .displayName("Cassandra Fabric Tracker")
-                .configure("group", this));
-
-        // Track first node's startup
-        subscribeToMembers(this, CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, new SensorEventListener<Long>() {
-            @Override
-            public void onEvent(SensorEvent<Long> event) {
-                Long oldval = getAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC);
-                Long newval = event.getValue();
-                if (oldval == null && newval != null) {
-                    setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval);
-                    for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
-                        ((EntityInternal)member).setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval);
-                    }
-                }
-            }
-        });
-        
-        // Track the datacenters for this cluster
-        subscribeToMembers(this, CassandraDatacenter.DATACENTER_USAGE, new SensorEventListener<Multimap<String,Entity>>() {
-            @Override
-            public void onEvent(SensorEvent<Multimap<String,Entity>> event) {
-                Multimap<String, Entity> usage = calculateDatacenterUsage();
-                setAttribute(DATACENTER_USAGE, usage);
-                setAttribute(DATACENTERS, usage.keySet());
-            }
-        });
-        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
-            @Override public void onEvent(SensorEvent<Entity> event) {
-                Multimap<String, Entity> usage = calculateDatacenterUsage();
-                setAttribute(DATACENTER_USAGE, usage);
-                setAttribute(DATACENTERS, usage.keySet());
-            }
-        });
-    }
-
-    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
-        @Override
-        protected void onEntityChange(Entity member) {
-            if (log.isDebugEnabled()) log.debug("Location {} updated in Fabric {}", member, entity);
-            ((CassandraFabricImpl)entity).update();
-        }
-        @Override
-        protected void onEntityAdded(Entity member) {
-            if (log.isDebugEnabled()) log.debug("Location {} added to Fabric {}", member, entity);
-            ((CassandraFabricImpl)entity).update();
-        }
-        @Override
-        protected void onEntityRemoved(Entity member) {
-            if (log.isDebugEnabled()) log.debug("Location {} removed from Fabric {}", member, entity);
-            ((CassandraFabricImpl)entity).update();
-        }
-    };
-
-    protected int getSeedQuorumSize() {
-        Integer quorumSize = getConfig(INITIAL_QUORUM_SIZE);
-        if (quorumSize!=null && quorumSize>0)
-            return quorumSize;
-
-        int initialSizeSum = 0;
-        for (CassandraDatacenter cluster : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
-            initialSizeSum += cluster.getConfig(CassandraDatacenter.INITIAL_SIZE);
-        }
-        if (initialSizeSum>5) initialSizeSum /= 2;
-        else if (initialSizeSum>3) initialSizeSum -= 2;
-        else if (initialSizeSum>2) initialSizeSum -= 1;
-        
-        return Math.min(Math.max(initialSizeSum, 1), CassandraFabric.DEFAULT_SEED_QUORUM);
-    }
-
-    /**
-     * Sets the default {@link #MEMBER_SPEC} to describe the Cassandra sub-clusters.
-     */
-    @Override
-    protected EntitySpec<?> getMemberSpec() {
-        // Need to set the seedSupplier, even if the caller has overridden the CassandraCluster config
-        // (unless they've explicitly overridden the seedSupplier as well!)
-        // TODO probably don't need to anymore, as it is set on the Fabric here -- just make sure there is a default!
-        EntitySpec<?> custom = getConfig(MEMBER_SPEC);
-        if (custom == null) {
-            return EntitySpec.create(CassandraDatacenter.class)
-                    .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
-        } else if (custom.getConfig().containsKey(CassandraDatacenter.SEED_SUPPLIER) || custom.getFlags().containsKey("seedSupplier")) {
-            return custom;
-        } else {
-            return EntitySpec.create(custom)
-                    .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
-        }
-    }
-    
-    @Override
-    protected Entity createCluster(Location location, Map flags) {
-        Function<Location, String> dataCenterNamer = getConfig(DATA_CENTER_NAMER);
-        if (dataCenterNamer != null) {
-            flags = ImmutableMap.builder()
-                .putAll(flags)
-                .put(CassandraNode.DATACENTER_NAME, dataCenterNamer.apply(location))
-                .build();
-        }
-        return super.createCluster(location, flags);
-    }
-
-    /**
-     * Prefers one node per location, and then others from anywhere.
-     * Then trims result down to the "quorumSize".
-     */
-    public Supplier<Set<Entity>> getSeedSupplier() {
-        return defaultSeedSupplier;
-    }
-
-    @Override
-    public void start(Collection<? extends Location> locations) {
-        super.start(locations);
-
-        connectSensors();
-
-        // TODO wait until all nodes which we think are up are consistent 
-        // i.e. all known nodes use the same schema, as reported by
-        // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli");
-        // once we've done that we can revert to using 2 seed nodes.
-        // see CassandraCluster.DEFAULT_SEED_QUORUM
-        Time.sleep(getConfig(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER));
-
-        update();
-    }
-
-    protected void connectSensors() {
-        connectEnrichers();
-    }
-    
-    protected void connectEnrichers() {
-        // TODO Aggregate across sub-clusters
-
-        subscribeToMembers(this, SERVICE_UP, new SensorEventListener<Boolean>() {
-            @Override public void onEvent(SensorEvent<Boolean> event) {
-                setAttribute(SERVICE_UP, calculateServiceUp());
-            }
-        });
-    }
-
-    @Override
-    public void stop() {
-        disconnectSensors();
-        
-        super.stop();
-    }
-    
-    protected void disconnectSensors() {
-    }
-
-    protected boolean calculateServiceUp() {
-        Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
-        return upNode.isPresent();
-    }
-
-    protected Multimap<String, Entity> calculateDatacenterUsage() {
-        Multimap<String, Entity> result = LinkedHashMultimap.<String, Entity>create();
-        for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
-            Multimap<String, Entity> memberUsage = member.getAttribute(CassandraDatacenter.DATACENTER_USAGE);
-            if (memberUsage != null) result.putAll(memberUsage);
-        }
-        return result;
-    }
-
-    @Override
-    public void update() {
-        synchronized (mutex) {
-            for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
-                member.update();
-            }
-
-            calculateServiceUp();
-
-            // Choose the first available location to set host and port (and compute one-up)
-            Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
-
-            if (upNode.isPresent()) {
-                setAttribute(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME));
-                setAttribute(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNode.java
deleted file mode 100644
index aa8d445..0000000
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNode.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.cassandra;
-
-import java.math.BigInteger;
-import java.util.Set;
-
-import org.apache.brooklyn.catalog.Catalog;
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.Effector;
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.BrooklynConfigKeys;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.database.DatastoreMixins;
-import brooklyn.entity.java.UsesJavaMXBeans;
-import brooklyn.entity.java.UsesJmx;
-import brooklyn.entity.proxying.ImplementedBy;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
-import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
-import brooklyn.event.basic.Sensors;
-import brooklyn.location.basic.PortRanges;
-import brooklyn.util.flags.SetFromFlag;
-import brooklyn.util.time.Duration;
-
-import com.google.common.reflect.TypeToken;
-
-/**
- * An {@link brooklyn.entity.Entity} that represents a Cassandra node in a {@link CassandraDatacenter}.
- */
-@Catalog(name="Apache Cassandra Node", description="Cassandra is a highly scalable, eventually " +
-        "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
-        "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
-@ImplementedBy(CassandraNodeImpl.class)
-public interface CassandraNode extends DatastoreMixins.DatastoreCommon, SoftwareProcess, UsesJmx, UsesJavaMXBeans, DatastoreMixins.HasDatastoreUrl, DatastoreMixins.CanExecuteScript {
-
-    @SetFromFlag("version")
-    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "1.2.16");
-    // when this changes remember to put a copy under releng2:/var/www/developer/brooklyn/repository/ !
-    // TODO experiment with supporting 2.0.x
-
-    @SetFromFlag("downloadUrl")
-    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
-            SoftwareProcess.DOWNLOAD_URL, "${driver.mirrorUrl}/${version}/apache-cassandra-${version}-bin.tar.gz");
-
-    /** download mirror, if desired */
-    @SetFromFlag("mirrorUrl")
-    ConfigKey<String> MIRROR_URL = new BasicConfigKey<String>(String.class, "cassandra.install.mirror.url", "URL of mirror", 
-        "http://www.mirrorservice.org/sites/ftp.apache.org/cassandra"
-        // for older versions, but slower:
-//        "http://archive.apache.org/dist/cassandra/"
-        );
-
-    @SetFromFlag("tgzUrl")
-    ConfigKey<String> TGZ_URL = new BasicConfigKey<String>(String.class, "cassandra.install.tgzUrl", "URL of TGZ download file");
-
-    @SetFromFlag("clusterName")
-    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = CassandraDatacenter.CLUSTER_NAME;
-
-    @SetFromFlag("snitchName")
-    ConfigKey<String> ENDPOINT_SNITCH_NAME = CassandraDatacenter.ENDPOINT_SNITCH_NAME;
-
-    @SetFromFlag("gossipPort")
-    PortAttributeSensorAndConfigKey GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.gossip.port", "Cassandra Gossip communications port", PortRanges.fromString("7000+"));
-
-    @SetFromFlag("sslGgossipPort")
-    PortAttributeSensorAndConfigKey SSL_GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.ssl-gossip.port", "Cassandra Gossip SSL communications port", PortRanges.fromString("7001+"));
-
-    @SetFromFlag("thriftPort")
-    PortAttributeSensorAndConfigKey THRIFT_PORT = new PortAttributeSensorAndConfigKey("cassandra.thrift.port", "Cassandra Thrift RPC port", PortRanges.fromString("9160+"));
-
-    @SetFromFlag("nativePort")
-    PortAttributeSensorAndConfigKey NATIVE_TRANSPORT_PORT = new PortAttributeSensorAndConfigKey("cassandra.native.port", "Cassandra Native Transport port", PortRanges.fromString("9042+"));
-
-    @SetFromFlag("rmiRegistryPort")
-    // cassandra nodetool and others want 7199 - not required, but useful
-    PortAttributeSensorAndConfigKey RMI_REGISTRY_PORT = new PortAttributeSensorAndConfigKey(UsesJmx.RMI_REGISTRY_PORT, 
-        PortRanges.fromInteger(7199));
-
-    // some of the cassandra tooling (eg nodetool) use RMI, but we want JMXMP, so do both!
-    ConfigKey<JmxAgentModes> JMX_AGENT_MODE = ConfigKeys.newConfigKeyWithDefault(UsesJmx.JMX_AGENT_MODE, JmxAgentModes.JMXMP_AND_RMI);
-    
-    @SetFromFlag("customSnitchJarUrl")
-    ConfigKey<String> CUSTOM_SNITCH_JAR_URL = ConfigKeys.newStringConfigKey("cassandra.config.customSnitchUrl", 
-            "URL for a jar file to be uploaded (e.g. \"classpath://brooklyn/entity/nosql/cassandra/cassandra-multicloud-snitch.jar\"); defaults to null which means nothing to upload", 
-            null);
-
-    @SetFromFlag("cassandraConfigTemplateUrl")
-    ConfigKey<String> CASSANDRA_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
-            "cassandra.config.templateUrl", "A URL (in freemarker format) for a cassandra.yaml config file (in freemarker format)", 
-            "classpath://brooklyn/entity/nosql/cassandra/cassandra-${entity.majorMinorVersion}.yaml");
-
-    @SetFromFlag("cassandraConfigFileName")
-    ConfigKey<String> CASSANDRA_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey(
-            "cassandra.config.fileName", "Name for the copied config file", "cassandra.yaml");
-
-    @SetFromFlag("cassandraRackdcConfigTemplateUrl")
-    ConfigKey<String> CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
-            "cassandra.config.rackdc.templateUrl", "Template file (in freemarker format) for the cassandra-rackdc.properties config file", 
-            "classpath://brooklyn/entity/nosql/cassandra/cassandra-rackdc.properties");
-
-    @SetFromFlag("cassandraRackdcConfigFileName")
-    ConfigKey<String> CASSANDRA_RACKDC_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey(
-            "cassandra.config.rackdc.fileName", "Name for the copied rackdc config file (used for configuring replication, when a suitable snitch is used)", "cassandra-rackdc.properties");
-    
-    @SetFromFlag("datacenterName")
-    BasicAttributeSensorAndConfigKey<String> DATACENTER_NAME = new BasicAttributeSensorAndConfigKey<String>(
-            String.class, "cassandra.replication.datacenterName", "Datacenter name (used for configuring replication, when a suitable snitch is used)", 
-            null);
-
-    @SetFromFlag("rackName")
-    BasicAttributeSensorAndConfigKey<String> RACK_NAME = new BasicAttributeSensorAndConfigKey<String>(
-            String.class, "cassandra.replication.rackName", "Rack name (used for configuring replication, when a suitable snitch is used)", 
-            null);
-
-    ConfigKey<Integer> NUM_TOKENS_PER_NODE = ConfigKeys.newIntegerConfigKey("cassandra.numTokensPerNode",
-            "Number of tokens per node; if using vnodes, should set this to a value like 256",
-            1); // third argument is the default value: a single token per node
-    
-    /**
-     * @deprecated since 0.7; use {@link #TOKENS}
-     */
-    @SetFromFlag("token")
-    @Deprecated
-    BasicAttributeSensorAndConfigKey<BigInteger> TOKEN = new BasicAttributeSensorAndConfigKey<BigInteger>(
-            BigInteger.class, "cassandra.token", "Cassandra Token");
-
-    @SetFromFlag("tokens")
-    BasicAttributeSensorAndConfigKey<Set<BigInteger>> TOKENS = new BasicAttributeSensorAndConfigKey<Set<BigInteger>>(
-            new TypeToken<Set<BigInteger>>() {}, "cassandra.tokens", "Cassandra Tokens");
-
-    AttributeSensor<Integer> PEERS = Sensors.newIntegerSensor( "cassandra.peers", "Number of peers in cluster");
-
-    AttributeSensor<Integer> LIVE_NODE_COUNT = Sensors.newIntegerSensor( "cassandra.liveNodeCount", "Number of live nodes in cluster");
-
-    /* Metrics for read/write performance. */
-
-    AttributeSensor<Long> READ_PENDING = Sensors.newLongSensor("cassandra.read.pending", "Current pending ReadStage tasks");
-    AttributeSensor<Integer> READ_ACTIVE = Sensors.newIntegerSensor("cassandra.read.active", "Current active ReadStage tasks");
-    AttributeSensor<Long> READ_COMPLETED = Sensors.newLongSensor("cassandra.read.completed", "Total completed ReadStage tasks");
-    AttributeSensor<Long> WRITE_PENDING = Sensors.newLongSensor("cassandra.write.pending", "Current pending MutationStage tasks");
-    AttributeSensor<Integer> WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.write.active", "Current active MutationStage tasks");
-    AttributeSensor<Long> WRITE_COMPLETED = Sensors.newLongSensor("cassandra.write.completed", "Total completed MutationStage tasks");
-    
-    AttributeSensor<Boolean> SERVICE_UP_JMX = Sensors.newBooleanSensor("cassandra.service.jmx.up", "Whether JMX is up for this service");
-    AttributeSensor<Long> THRIFT_PORT_LATENCY = Sensors.newLongSensor("cassandra.thrift.latency", "Latency for thrift port connection (ms) or null if down");
-
-    AttributeSensor<Double> READS_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.reads.perSec.last", "Reads/sec (last datapoint)");
-    AttributeSensor<Double> WRITES_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.write.perSec.last", "Writes/sec (last datapoint)"); // NOTE(review): name says "write" where the sibling sensors say "reads"/"writes"; possibly a typo, but renaming would change the published sensor name - confirm before touching
-
-    AttributeSensor<Double> THRIFT_PORT_LATENCY_IN_WINDOW = Sensors.newDoubleSensor("cassandra.thrift.latency.windowed", "Latency for thrift port (ms, averaged over time window)");
-    AttributeSensor<Double> READS_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.reads.perSec.windowed", "Reads/sec (over time window)");
-    AttributeSensor<Double> WRITES_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.writes.perSec.windowed", "Writes/sec (over time window)");
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    ConfigKey<Set<Entity>> INITIAL_SEEDS = (ConfigKey)ConfigKeys.newConfigKey(Set.class, "cassandra.cluster.seeds.initial", 
-            "List of cluster nodes to seed this node"); // Set.class carries no element type, hence the raw ConfigKey cast and the suppression; note the key is typed as a Set although the description says "List"
-
-    ConfigKey<Duration> START_TIMEOUT = ConfigKeys.newConfigKeyWithDefault(BrooklynConfigKeys.START_TIMEOUT, Duration.FIVE_MINUTES); // derived from BrooklynConfigKeys.START_TIMEOUT, with Duration.FIVE_MINUTES supplied as the default
-    
-    ConfigKey<String> LISTEN_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.listenAddressSensor", "sensor name from which to take the listen address; default (null) is a smart lookup");
-    ConfigKey<String> BROADCAST_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.broadcastAddressSensor", "sensor name from which to take the broadcast address; default (null) is a smart lookup");
-    ConfigKey<String> RPC_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.rpcAddressSensor", "sensor name from which to take the RPC address; default (null) is 0.0.0.0");
-
-    Effector<String> EXECUTE_SCRIPT = CassandraDatacenter.EXECUTE_SCRIPT; // aliases the effector constant declared on CassandraDatacenter rather than defining a new one
-
-    /* Accessors used from template */
-    
-    String getMajorMinorVersion();
-    Integer getGossipPort();
-    Integer getSslGossipPort();
-    Integer getThriftPort();
-    Integer getNativeTransportPort();
-    String getClusterName();
-    String getListenAddress();
-    String getBroadcastAddress();
-    String getRpcAddress();
-    String getSeeds();
-    
-    String getPrivateIp();
-    String getPublicIp();
-    
-    /**
-     * In range 0 to (2^127)-1; or null if not yet set or known.
-     * Returns the first token if more than one token.
-     * @deprecated since 0.7; see {@link #getTokens()}
-     */
-    @Deprecated
-    BigInteger getToken();
-
-    int getNumTokensPerNode();
-
-    Set<BigInteger> getTokens();
-
-    /**
-     * string value of token (with no commas, which freemarker introduces!) or blank if none
-     * @deprecated since 0.7; use {@link #getTokensAsString()}
-     */
-    @Deprecated
-    String getTokenAsString();
-
-    /** string value of comma-separated tokens; or blank if none */
-    String getTokensAsString();
-
-    /* For configuration */
-    
-    void setToken(String token); // NOTE(review): takes a String although TOKEN/getToken() use BigInteger, and is not marked @Deprecated like the other single-token members - confirm intent
-    
-    /* Using Cassandra */
-    
-    String executeScript(String commands);
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
deleted file mode 100644
index 62850bd..0000000
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.nosql.cassandra;
-
-import brooklyn.entity.java.JavaSoftwareProcessDriver;
-import brooklyn.util.task.system.ProcessTaskWrapper;
-
-public interface CassandraNodeDriver extends JavaSoftwareProcessDriver {
-    // Cassandra-specific operations a node driver adds on top of JavaSoftwareProcessDriver; no implementation appears in this file.
-    Integer getGossipPort();
-
-    Integer getSslGossipPort();
-
-    Integer getThriftPort();
-
-    Integer getNativeTransportPort();
-
-    String getClusterName();
-    /** NOTE(review): presumably the value of the "cassandra.config.templateUrl" config key - confirm in the implementation. */
-    String getCassandraConfigTemplateUrl();
-    /** NOTE(review): presumably the value of the "cassandra.config.fileName" config key - confirm in the implementation. */
-    String getCassandraConfigFileName();
-    /** NOTE(review): what counts as "clustered" is decided by the implementation and is not visible here. */
-    boolean isClustered();
-    /** Hands back a task wrapper for the given commands. NOTE(review): the Integer is presumably the process exit code - confirm. */
-    ProcessTaskWrapper<Integer> executeScriptAsync(String commands);
-
-    /** returns the address that the given hostname resolves to at the target */
-    String getResolvedAddress(String hostname);
-
-}


Mime
View raw message