Return-Path: X-Original-To: apmail-brooklyn-commits-archive@minotaur.apache.org Delivered-To: apmail-brooklyn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E78F8182A0 for ; Mon, 1 Feb 2016 17:47:38 +0000 (UTC) Received: (qmail 51765 invoked by uid 500); 1 Feb 2016 17:47:38 -0000 Delivered-To: apmail-brooklyn-commits-archive@brooklyn.apache.org Received: (qmail 51696 invoked by uid 500); 1 Feb 2016 17:47:38 -0000 Mailing-List: contact commits-help@brooklyn.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.apache.org Delivered-To: mailing list commits@brooklyn.apache.org Received: (qmail 51037 invoked by uid 99); 1 Feb 2016 17:47:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Feb 2016 17:47:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2DBB8E3838; Mon, 1 Feb 2016 17:47:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: heneveld@apache.org To: commits@brooklyn.apache.org Date: Mon, 01 Feb 2016 17:47:53 -0000 Message-Id: <16d7fdd1ed824d6198f92b1debc2e087@git.apache.org> In-Reply-To: <4df94285a12940a89c0b8693de0e15c6@git.apache.org> References: <4df94285a12940a89c0b8693de0e15c6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [17/51] [abbrv] [partial] brooklyn-library git commit: move subdir from incubator up a level as it is promoted to its own repo (first non-incubator commit!) 
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java ---------------------------------------------------------------------- diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java deleted file mode 100644 index 79003c2..0000000 --- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java +++ /dev/null @@ -1,629 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.entity.nosql.cassandra; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.policy.PolicySpec; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.core.effector.EffectorBody; -import org.apache.brooklyn.core.entity.Attributes; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.entity.EntityPredicates; -import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; -import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic; -import org.apache.brooklyn.core.location.Machines; -import org.apache.brooklyn.enricher.stock.Enrichers; -import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy; -import org.apache.brooklyn.entity.group.DynamicClusterImpl; -import org.apache.brooklyn.entity.group.DynamicGroup; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.collections.MutableSet; -import org.apache.brooklyn.util.core.ResourceUtils; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.text.Strings; -import org.apache.brooklyn.util.time.Time; - -import com.google.common.base.Objects; -import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import 
com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.LinkedHashMultimap; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import com.google.common.net.HostAndPort; - -/** - * Implementation of {@link CassandraDatacenter}. - *

- * Several subtleties to note: - * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port - * (so we wait for thrift port to be contactable) - * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema - * (each up to 1m; often very close to the 1m) - */ -public class CassandraDatacenterImpl extends DynamicClusterImpl implements CassandraDatacenter { - - /* - * TODO Seed management is hard! - * - The ServiceRestarter is not doing customize(), so is not refreshing the seeds in cassandra.yaml. - * If we have two nodes that were seeds for each other and they both restart at the same time, we'll have a split brain. - */ - - private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterImpl.class); - - // Mutex for synchronizing during re-size operations - private final Object mutex = new Object[0]; - - private final Supplier> defaultSeedSupplier = new Supplier>() { - // Mutex for (re)calculating our seeds - // TODO is this very dangerous?! Calling out to SeedTracker, which calls out to alien getAttribute()/getConfig(). But I think that's ok. - // TODO might not need mutex? previous race was being caused by something else, other than concurrent calls! 
- private final Object seedMutex = new Object(); - - @Override - public Set get() { - synchronized (seedMutex) { - boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS)); - int quorumSize = getSeedQuorumSize(); - Set potentialSeeds = gatherPotentialSeeds(); - Set potentialRunningSeeds = gatherPotentialRunningSeeds(); - boolean stillWaitingForQuorum = (!hasPublishedSeeds) && (potentialSeeds.size() < quorumSize); - - if (stillWaitingForQuorum) { - if (log.isDebugEnabled()) log.debug("Not refreshed seeds of cluster {}, because still waiting for quorum (need {}; have {} potentials)", new Object[] {CassandraDatacenterImpl.class, quorumSize, potentialSeeds.size()}); - return ImmutableSet.of(); - } else if (hasPublishedSeeds) { - Set currentSeeds = getAttribute(CURRENT_SEEDS); - if (getAttribute(SERVICE_STATE_ACTUAL) == Lifecycle.STARTING) { - if (Sets.intersection(currentSeeds, potentialSeeds).isEmpty()) { - log.warn("Cluster {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraDatacenterImpl.this, currentSeeds}); - } - return currentSeeds; - } else if (potentialRunningSeeds.isEmpty()) { - // TODO Could be race where nodes have only just returned from start() and are about to - // transition to serviceUp; so don't just abandon all our seeds! 
- log.warn("Cluster {} has no running seeds (yet?); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraDatacenterImpl.this}); - return currentSeeds; - } else { - Set result = trim(quorumSize, potentialRunningSeeds); - log.debug("Cluster {} updating seeds: chosen={}; potentialRunning={}", new Object[] {CassandraDatacenterImpl.this, result, potentialRunningSeeds}); - return result; - } - } else { - Set result = trim(quorumSize, potentialSeeds); - if (log.isDebugEnabled()) log.debug("Cluster {} has reached seed quorum: seeds={}", new Object[] {CassandraDatacenterImpl.this, result}); - return result; - } - } - } - private Set trim(int num, Set contenders) { - // Prefer existing seeds wherever possible; otherwise accept any other contenders - Set currentSeeds = (getAttribute(CURRENT_SEEDS) != null) ? getAttribute(CURRENT_SEEDS) : ImmutableSet.of(); - Set result = Sets.newLinkedHashSet(); - result.addAll(Sets.intersection(currentSeeds, contenders)); - result.addAll(contenders); - return ImmutableSet.copyOf(Iterables.limit(result, num)); - } - }; - - protected SeedTracker seedTracker = new SeedTracker(); - protected TokenGenerator tokenGenerator = null; - - public CassandraDatacenterImpl() { - } - - @Override - public void init() { - super.init(); - - /* - * subscribe to hostname, and keep an accurate set of current seeds in a sensor; - * then at nodes we set the initial seeds to be the current seeds when ready (non-empty) - */ - subscriptions().subscribeToMembers(this, Attributes.HOSTNAME, new SensorEventListener() { - @Override - public void onEvent(SensorEvent event) { - seedTracker.onHostnameChanged(event.getSource(), event.getValue()); - } - }); - subscriptions().subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener() { - @Override public void onEvent(SensorEvent event) { - seedTracker.onMemberRemoved(event.getValue()); - } - }); - subscriptions().subscribeToMembers(this, Attributes.SERVICE_UP, new 
SensorEventListener() { - @Override - public void onEvent(SensorEvent event) { - seedTracker.onServiceUpChanged(event.getSource(), event.getValue()); - } - }); - subscriptions().subscribeToMembers(this, Attributes.SERVICE_STATE_ACTUAL, new SensorEventListener() { - @Override - public void onEvent(SensorEvent event) { - // trigger a recomputation also when lifecycle state changes, - // because it might not have ruled a seed as inviable when service up went true - // because service state was not yet running - seedTracker.onServiceUpChanged(event.getSource(), Lifecycle.RUNNING==event.getValue()); - } - }); - - // Track the datacenters for this cluster - subscriptions().subscribeToMembers(this, CassandraNode.DATACENTER_NAME, new SensorEventListener() { - @Override - public void onEvent(SensorEvent event) { - Entity member = event.getSource(); - String dcName = event.getValue(); - if (dcName != null) { - Multimap datacenterUsage = getAttribute(DATACENTER_USAGE); - Multimap mutableDatacenterUsage = (datacenterUsage == null) ? 
LinkedHashMultimap.create() : LinkedHashMultimap.create(datacenterUsage); - Optional oldDcName = getKeyOfVal(mutableDatacenterUsage, member); - if (!(oldDcName.isPresent() && dcName.equals(oldDcName.get()))) { - mutableDatacenterUsage.values().remove(member); - mutableDatacenterUsage.put(dcName, member); - sensors().set(DATACENTER_USAGE, mutableDatacenterUsage); - sensors().set(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet())); - } - } - } - private Optional getKeyOfVal(Multimap map, V val) { - for (Map.Entry entry : map.entries()) { - if (Objects.equal(val, entry.getValue())) { - return Optional.of(entry.getKey()); - } - } - return Optional.absent(); - } - }); - subscriptions().subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener() { - @Override public void onEvent(SensorEvent event) { - Entity entity = event.getSource(); - Multimap datacenterUsage = getAttribute(DATACENTER_USAGE); - if (datacenterUsage != null && datacenterUsage.containsValue(entity)) { - Multimap mutableDatacenterUsage = LinkedHashMultimap.create(datacenterUsage); - mutableDatacenterUsage.values().remove(entity); - sensors().set(DATACENTER_USAGE, mutableDatacenterUsage); - sensors().set(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet())); - } - } - }); - - getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody() { - @Override - public String call(ConfigBag parameters) { - return executeScript((String)parameters.getStringKey("commands")); - } - }); - } - - protected Supplier> getSeedSupplier() { - Supplier> seedSupplier = getConfig(SEED_SUPPLIER); - return (seedSupplier == null) ? 
defaultSeedSupplier : seedSupplier; - } - - protected boolean useVnodes() { - return Boolean.TRUE.equals(getConfig(USE_VNODES)); - } - - protected synchronized TokenGenerator getTokenGenerator() { - if (tokenGenerator!=null) - return tokenGenerator; - - try { - tokenGenerator = getConfig(TOKEN_GENERATOR_CLASS).newInstance(); - - BigInteger shift = getConfig(TOKEN_SHIFT); - if (shift==null) - shift = BigDecimal.valueOf(Math.random()).multiply( - new BigDecimal(tokenGenerator.range())).toBigInteger(); - tokenGenerator.setOrigin(shift); - - return tokenGenerator; - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - protected int getSeedQuorumSize() { - Integer quorumSize = getConfig(INITIAL_QUORUM_SIZE); - if (quorumSize!=null && quorumSize>0) - return quorumSize; - // default 2 is recommended, unless initial size is smaller - return Math.min(Math.max(getConfig(INITIAL_SIZE), 1), DEFAULT_SEED_QUORUM); - } - - @Override - public Set gatherPotentialSeeds() { - return seedTracker.gatherPotentialSeeds(); - } - - @Override - public Set gatherPotentialRunningSeeds() { - return seedTracker.gatherPotentialRunningSeeds(); - } - - /** - * Sets the default {@link #MEMBER_SPEC} to describe the Cassandra nodes. 
- */ - @Override - protected EntitySpec getMemberSpec() { - return getConfig(MEMBER_SPEC, EntitySpec.create(CassandraNode.class)); - } - - @Override - public String getClusterName() { - return getAttribute(CLUSTER_NAME); - } - - @Override - public Collection grow(int delta) { - if (useVnodes()) { - // nothing to do for token generator - } else { - if (getCurrentSize() == 0) { - getTokenGenerator().growingCluster(delta); - } - } - return super.grow(delta); - } - - @Override - protected Entity createNode(@Nullable Location loc, Map flags) { - Map allflags = MutableMap.copyOf(flags); - - if (flags.containsKey("token") || flags.containsKey("cassandra.token")) { - // TODO Delete in future version; was deprecated in 0.7.0; deleted config key in 0.9.0 - log.warn("Cassandra token no longer supported - use 'tokens' in "+CassandraDatacenterImpl.this); - } - if (flags.containsKey(CassandraNode.TOKENS) || flags.containsKey("tokens") || flags.containsKey("cassandra.tokens")) { - // leave token config as-is - } else if (!useVnodes()) { - BigInteger token = getTokenGenerator().newToken(); - if (token != null) { - allflags.put(CassandraNode.TOKENS, ImmutableSet.of(token)); - } - } - - if ((flags.containsKey(CassandraNode.NUM_TOKENS_PER_NODE) || flags.containsKey("numTokensPerNode"))) { - // leave num_tokens as-is - } else if (useVnodes()) { - Integer numTokensPerNode = getConfig(NUM_TOKENS_PER_NODE); - allflags.put(CassandraNode.NUM_TOKENS_PER_NODE, numTokensPerNode); - } else { - allflags.put(CassandraNode.NUM_TOKENS_PER_NODE, 1); - } - - return super.createNode(loc, allflags); - } - - @Override - protected Entity replaceMember(Entity member, Location memberLoc, Map extraFlags) { - Set oldTokens = ((CassandraNode) member).getTokens(); - Set newTokens = (oldTokens != null && oldTokens.size() > 0) ? 
getTokenGenerator().getTokensForReplacementNode(oldTokens) : null; - return super.replaceMember(member, memberLoc, MutableMap.copyOf(extraFlags).add(CassandraNode.TOKENS, newTokens)); - } - - @Override - public void start(Collection locations) { - Machines.warnIfLocalhost(locations, "CassandraCluster does not support multiple nodes on localhost, " + - "due to assumptions Cassandra makes about the use of the same port numbers used across the cluster."); - - // force this to be set - even if it is using the default - sensors().set(CLUSTER_NAME, getConfig(CLUSTER_NAME)); - - super.start(locations); - - connectSensors(); - - // TODO wait until all nodes which we think are up are consistent - // i.e. all known nodes use the same schema, as reported by - // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli"); - // once we've done that we can revert to using 2 seed nodes. - // see CassandraCluster.DEFAULT_SEED_QUORUM - // (also ensure the cluster is ready if we are about to run a creation script) - Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER)); - - String scriptUrl = getConfig(CassandraNode.CREATION_SCRIPT_URL); - if (Strings.isNonEmpty(scriptUrl)) { - executeScript(new ResourceUtils(this).getResourceAsString(scriptUrl)); - } - - update(); - } - - protected void connectSensors() { - connectEnrichers(); - - policies().add(PolicySpec.create(MemberTrackingPolicy.class) - .displayName("Cassandra Cluster Tracker") - .configure("sensorsToTrack", ImmutableSet.of(Attributes.SERVICE_UP, Attributes.HOSTNAME, CassandraNode.THRIFT_PORT)) - .configure("group", this)); - } - - public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { - @Override - protected void onEntityChange(Entity member) { - if (log.isDebugEnabled()) log.debug("Node {} updated in Cluster {}", member, this); - ((CassandraDatacenterImpl)entity).update(); - } - @Override - protected void onEntityAdded(Entity member) { - if (log.isDebugEnabled()) log.debug("Node 
{} added to Cluster {}", member, this); - ((CassandraDatacenterImpl)entity).update(); - } - @Override - protected void onEntityRemoved(Entity member) { - if (log.isDebugEnabled()) log.debug("Node {} removed from Cluster {}", member, this); - ((CassandraDatacenterImpl)entity).update(); - } - }; - - @SuppressWarnings("unchecked") - protected void connectEnrichers() { - List>> summingEnricherSetup = ImmutableList.of( - ImmutableList.of(CassandraNode.READ_ACTIVE, READ_ACTIVE), - ImmutableList.of(CassandraNode.READ_PENDING, READ_PENDING), - ImmutableList.of(CassandraNode.WRITE_ACTIVE, WRITE_ACTIVE), - ImmutableList.of(CassandraNode.WRITE_PENDING, WRITE_PENDING) - ); - - List>> averagingEnricherSetup = ImmutableList.of( - ImmutableList.of(CassandraNode.READS_PER_SECOND_LAST, READS_PER_SECOND_LAST_PER_NODE), - ImmutableList.of(CassandraNode.WRITES_PER_SECOND_LAST, WRITES_PER_SECOND_LAST_PER_NODE), - ImmutableList.of(CassandraNode.WRITES_PER_SECOND_IN_WINDOW, WRITES_PER_SECOND_IN_WINDOW_PER_NODE), - ImmutableList.of(CassandraNode.READS_PER_SECOND_IN_WINDOW, READS_PER_SECOND_IN_WINDOW_PER_NODE), - ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY, THRIFT_PORT_LATENCY_PER_NODE), - ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY_IN_WINDOW, THRIFT_PORT_LATENCY_IN_WINDOW_PER_NODE), - ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_LAST, PROCESS_CPU_TIME_FRACTION_LAST_PER_NODE), - ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_IN_WINDOW, PROCESS_CPU_TIME_FRACTION_IN_WINDOW_PER_NODE) - ); - - for (List> es : summingEnricherSetup) { - AttributeSensor t = es.get(0); - AttributeSensor total = es.get(1); - enrichers().add(Enrichers.builder() - .aggregating(t) - .publishing(total) - .fromMembers() - .computingSum() - .defaultValueForUnreportedSensors(null) - .valueToReportIfNoSensors(null) - .build()); - } - - for (List> es : averagingEnricherSetup) { - AttributeSensor t = (AttributeSensor) es.get(0); - AttributeSensor average = (AttributeSensor) es.get(1); - 
enrichers().add(Enrichers.builder() - .aggregating(t) - .publishing(average) - .fromMembers() - .computingAverage() - .defaultValueForUnreportedSensors(null) - .valueToReportIfNoSensors(null) - .build()); - - } - } - - @Override - public void stop() { - disconnectSensors(); - - super.stop(); - } - - protected void disconnectSensors() { - } - - @Override - public void update() { - synchronized (mutex) { - // Update our seeds, as necessary - seedTracker.refreshSeeds(); - - // Choose the first available cluster member to set host and port (and compute one-up) - Optional upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE)); - - if (upNode.isPresent()) { - sensors().set(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME)); - sensors().set(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT)); - - List currentNodes = getAttribute(CASSANDRA_CLUSTER_NODES); - Set oldNodes = (currentNodes != null) ? ImmutableSet.copyOf(currentNodes) : ImmutableSet.of(); - Set newNodes = MutableSet.of(); - for (Entity member : getMembers()) { - if (member instanceof CassandraNode && Boolean.TRUE.equals(member.getAttribute(SERVICE_UP))) { - String hostname = member.getAttribute(Attributes.HOSTNAME); - Integer thriftPort = member.getAttribute(CassandraNode.THRIFT_PORT); - if (hostname != null && thriftPort != null) { - newNodes.add(HostAndPort.fromParts(hostname, thriftPort).toString()); - } - } - } - if (Sets.symmetricDifference(oldNodes, newNodes).size() > 0) { - sensors().set(CASSANDRA_CLUSTER_NODES, MutableList.copyOf(newNodes)); - } - } else { - sensors().set(HOSTNAME, null); - sensors().set(THRIFT_PORT, null); - sensors().set(CASSANDRA_CLUSTER_NODES, Collections.emptyList()); - } - - ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyList(this, CASSANDRA_CLUSTER_NODES); - } - } - - /** - * For tracking our seeds. This gets fiddly! High-level logic is: - *

    - *
  • If we have never reached quorum (i.e. have never published seeds), then continue to wait for quorum; - * because entity-startup may be blocking for this. This is handled by the seedSupplier. - *
  • If we previously reached quorum (i.e. have previously published seeds), then always update; - * we never want stale/dead entities listed in our seeds. - *
  • If an existing seed looks unhealthy, then replace it. - *
  • If a new potential seed becomes available (and we're in need of more), then add it. - *
      - * - * Also note that {@link CassandraFabric} can take over, because it know about multiple sub-clusters! - * It will provide a different {@link CassandraDatacenter#SEED_SUPPLIER}. Each time we think that our seeds - * need to change, we call that. The fabric will call into {@link CassandraDatacenterImpl#gatherPotentialSeeds()} - * to find out what's available. - * - * @author aled - */ - protected class SeedTracker { - private final Map memberUpness = Maps.newLinkedHashMap(); - - public void onMemberRemoved(Entity member) { - Set seeds = getSeeds(); - boolean maybeRemove = seeds.contains(member); - memberUpness.remove(member); - - if (maybeRemove) { - refreshSeeds(); - } else { - if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} removed)", new Object[] {CassandraDatacenterImpl.this, member}); - return; - } - } - public void onHostnameChanged(Entity member, String hostname) { - Set seeds = getSeeds(); - int quorum = getSeedQuorumSize(); - boolean isViable = isViableSeed(member); - boolean maybeAdd = isViable && seeds.size() < quorum; - boolean maybeRemove = seeds.contains(member) && !isViable; - - if (maybeAdd || maybeRemove) { - refreshSeeds(); - } else { - if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed hostname {})", new Object[] {CassandraDatacenterImpl.this, member, hostname}); - return; - } - } - public void onServiceUpChanged(Entity member, Boolean serviceUp) { - Boolean oldVal = memberUpness.put(member, serviceUp); - if (Objects.equal(oldVal, serviceUp)) { - if (log.isTraceEnabled()) log.trace("Ignoring duplicate service-up in "+CassandraDatacenterImpl.this+" for "+member+", "+serviceUp); - } - Set seeds = getSeeds(); - int quorum = getSeedQuorumSize(); - boolean isViable = isViableSeed(member); - boolean maybeAdd = isViable && seeds.size() < quorum; - boolean maybeRemove = seeds.contains(member) && !isViable; - - if (log.isDebugEnabled()) - log.debug("Considering refresh 
of seeds for "+CassandraDatacenterImpl.this+" because "+member+" is now "+serviceUp+" ("+isViable+" / "+maybeAdd+" / "+maybeRemove+")"); - if (maybeAdd || maybeRemove) { - refreshSeeds(); - } else { - if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed serviceUp {})", new Object[] {CassandraDatacenterImpl.this, member, serviceUp}); - return; - } - } - protected Set getSeeds() { - Set result = getAttribute(CURRENT_SEEDS); - return (result == null) ? ImmutableSet.of() : result; - } - public void refreshSeeds() { - Set oldseeds = getAttribute(CURRENT_SEEDS); - Set newseeds = getSeedSupplier().get(); - if (Objects.equal(oldseeds, newseeds)) { - if (log.isTraceEnabled()) log.debug("Seed refresh no-op for cluster {}: still={}", new Object[] {CassandraDatacenterImpl.this, oldseeds}); - } else { - if (log.isDebugEnabled()) log.debug("Refreshing seeds of cluster {}: now={}; old={}", new Object[] {this, newseeds, oldseeds}); - sensors().set(CURRENT_SEEDS, newseeds); - if (newseeds != null && newseeds.size() > 0) { - sensors().set(HAS_PUBLISHED_SEEDS, true); - } - } - } - public Set gatherPotentialSeeds() { - Set result = Sets.newLinkedHashSet(); - for (Entity member : getMembers()) { - if (isViableSeed(member)) { - result.add(member); - } - } - if (log.isTraceEnabled()) log.trace("Viable seeds in Cluster {}: {}", new Object[] {result}); - return result; - } - public Set gatherPotentialRunningSeeds() { - Set result = Sets.newLinkedHashSet(); - for (Entity member : getMembers()) { - if (isRunningSeed(member)) { - result.add(member); - } - } - if (log.isTraceEnabled()) log.trace("Viable running seeds in Cluster {}: {}", new Object[] {result}); - return result; - } - public boolean isViableSeed(Entity member) { - // TODO would be good to reuse the better logic in ServiceFailureDetector - // (e.g. if that didn't just emit a notification but set a sensor as well?) 
- boolean managed = Entities.isManaged(member); - String hostname = member.getAttribute(Attributes.HOSTNAME); - boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP)); - Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL); - boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED); - boolean result = (hostname != null && !hasFailed); - if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, this, result, hostname, serviceUp, serviceState, hasFailed}); - return result; - } - public boolean isRunningSeed(Entity member) { - boolean viableSeed = isViableSeed(member); - boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP)); - Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL); - boolean result = viableSeed && serviceUp && serviceState == Lifecycle.RUNNING; - if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: runningSeed={}; viableSeed={}; serviceUp={}; serviceState={}", new Object[] {member, this, result, viableSeed, serviceUp, serviceState}); - return result; - } - } - - @Override - public String executeScript(String commands) { - Entity someChild = Iterables.getFirst(getMembers(), null); - if (someChild==null) - throw new IllegalStateException("No Cassandra nodes available"); - // FIXME cross-etntity method-style calls such as below do not set up a queueing context (DynamicSequentialTask) -// return ((CassandraNode)someChild).executeScript(commands); - return Entities.invokeEffector(this, someChild, CassandraNode.EXECUTE_SCRIPT, MutableMap.of("commands", commands)).getUnchecked(); - } - -} 
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java ---------------------------------------------------------------------- diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java deleted file mode 100644 index 5d9a9ca..0000000 --- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.entity.nosql.cassandra; - -import java.util.Set; - -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.ImplementedBy; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.annotation.Effector; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.effector.MethodEffector; -import org.apache.brooklyn.entity.group.DynamicFabric; - -import com.google.common.base.Function; -import com.google.common.collect.Multimap; -import com.google.common.reflect.TypeToken; - -/** - * A fabric of {@link CassandraNode}s, which forms a cluster spanning multiple locations. - *

      - * Each {@link CassandraDatacenter} child instance is actually just a part of the whole cluster. It consists of the - * nodes in that single location (which normally corresponds to a "datacenter" in Cassandra terminology). - */ -@Catalog(name="Apache Cassandra Database Fabric", description="Cassandra is a highly scalable, eventually " + - "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " + - "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg") -@ImplementedBy(CassandraFabricImpl.class) -public interface CassandraFabric extends DynamicFabric { - - ConfigKey INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey( - "fabric.initial.quorumSize", - "Initial fabric quorum size - number of initial nodes that must have been successfully started " + - "to report success (if less than 0, then use a value based on INITIAL_SIZE of clusters)", - -1); - - @SuppressWarnings("serial") - ConfigKey> DATA_CENTER_NAMER = ConfigKeys.newConfigKey(new TypeToken>(){}, - "cassandra.fabric.datacenter.namer", - "Function used to provide the cassandra.replication.datacenterName for a given location"); - - int DEFAULT_SEED_QUORUM = 5; - - AttributeSensor> DATACENTER_USAGE = CassandraDatacenter.DATACENTER_USAGE; - - AttributeSensor> DATACENTERS = CassandraDatacenter.DATACENTERS; - - AttributeSensor> CURRENT_SEEDS = CassandraDatacenter.CURRENT_SEEDS; - - AttributeSensor HAS_PUBLISHED_SEEDS = CassandraDatacenter.HAS_PUBLISHED_SEEDS; - - AttributeSensor HOSTNAME = CassandraDatacenter.HOSTNAME; - - AttributeSensor THRIFT_PORT = CassandraDatacenter.THRIFT_PORT; - - MethodEffector UPDATE = new MethodEffector(CassandraFabric.class, "update"); - - @Effector(description="Updates the cluster members") - void update(); - -} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java 
---------------------------------------------------------------------- diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java deleted file mode 100644 index 5aa108d..0000000 --- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.entity.nosql.cassandra; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.EntitySpec; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.policy.PolicySpec; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.core.entity.Attributes; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.core.entity.EntityPredicates; -import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; -import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy; -import org.apache.brooklyn.entity.group.DynamicFabricImpl; -import org.apache.brooklyn.entity.group.DynamicGroup; -import org.apache.brooklyn.util.collections.CollectionFunctionals; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.collections.MutableSet; -import org.apache.brooklyn.util.time.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.base.Objects; -import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.LinkedHashMultimap; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; - -/** - * Implementation of {@link CassandraDatacenter}. - *

      - * Serveral subtleties to note: - * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port - * (so we wait for thrift port to be contactable) - * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema - * (each up to 1m; often very close to the 1m) - */ -public class CassandraFabricImpl extends DynamicFabricImpl implements CassandraFabric { - - private static final Logger log = LoggerFactory.getLogger(CassandraFabricImpl.class); - - // Mutex for synchronizing during re-size operations - private final Object mutex = new Object[0]; - - private final Supplier> defaultSeedSupplier = new Supplier>() { - @Override public Set get() { - // TODO Remove duplication from CassandraClusterImpl.defaultSeedSupplier - Set seeds = getAttribute(CURRENT_SEEDS); - boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS)); - int quorumSize = getSeedQuorumSize(); - - // update seeds if we're not quorate; note this may not work for dynamically adding new datacenters - // as we do not take a new seed from the new datacenter - if (seeds == null || seeds.size() < quorumSize || containsDownEntity(seeds)) { - Set newseeds; - Map> potentialSeeds = MutableMap.of(); - int potentialSeedCount = 0; - for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { - Set dcPotentialSeeds = member.gatherPotentialSeeds(); - potentialSeeds.put(member, dcPotentialSeeds); - potentialSeedCount += dcPotentialSeeds.size(); - } - - if (hasPublishedSeeds) { - Set currentSeeds = getAttribute(CURRENT_SEEDS); - Lifecycle serviceState = getAttribute(SERVICE_STATE_ACTUAL); - if (serviceState == Lifecycle.STARTING) { - if (Sets.intersection(currentSeeds, ImmutableSet.copyOf(Iterables.concat(potentialSeeds.values()))).isEmpty()) { - log.warn("Fabric {} lost all its seeds while starting! 
Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraFabricImpl.this, currentSeeds}); - } - newseeds = currentSeeds; - } else if (serviceState == Lifecycle.STOPPING || serviceState == Lifecycle.STOPPED) { - if (log.isTraceEnabled()) log.trace("Fabric {} ignoring any potential seed-changes, because {}: seeds={}", new Object[] {CassandraFabricImpl.this, serviceState, currentSeeds}); - newseeds = currentSeeds; - } else if (potentialSeedCount == 0) { - // TODO Could be race where nodes have only just returned from start() and are about to - // transition to serviceUp; so don't just abandon all our seeds! - log.warn("Fabric {} has no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this}); - newseeds = currentSeeds; - } else if (!allNonEmpty(potentialSeeds.values())) { - log.warn("Fabric {} has datacenter with no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this}); - newseeds = currentSeeds; - } else { - Set result = selectSeeds(quorumSize, potentialSeeds); - if (log.isDebugEnabled() && !Objects.equal(seeds, result)) { - log.debug("Fabric {} updating seeds: chosen={}; potential={}", new Object[] {CassandraFabricImpl.this, result, potentialSeeds}); - } - newseeds = result; - } - } else if (potentialSeedCount < quorumSize) { - if (log.isDebugEnabled()) log.debug("Not setting seeds of fabric {} yet, because still waiting for quorum (need {}; have {} potentials from {} members)", new Object[] {CassandraFabricImpl.this, quorumSize, potentialSeedCount, getMembers()}); - newseeds = ImmutableSet.of(); - } else if (!allNonEmpty(potentialSeeds.values())) { - if (log.isDebugEnabled()) { - Map datacenterCounts = Maps.transformValues(potentialSeeds, CollectionFunctionals.sizeFunction()); - log.debug("Not setting seeds of fabric {} yet, because not all 
datacenters have seeds (sizes are {})", new Object[] {CassandraFabricImpl.this, datacenterCounts}); - } - newseeds = ImmutableSet.of(); - } else { - // yay, we're quorate - Set result = selectSeeds(quorumSize, potentialSeeds); - log.info("Fabric {} has reached seed quorum: seeds={}", new Object[] {CassandraFabricImpl.this, result}); - newseeds = result; - } - - if (!Objects.equal(seeds, newseeds)) { - sensors().set(CURRENT_SEEDS, newseeds); - - if (newseeds != null && newseeds.size() > 0) { - sensors().set(HAS_PUBLISHED_SEEDS, true); - - // Need to tell every datacenter that seeds are ready. - // Otherwise a datacenter might get no more changes (e.g. to nodes' hostnames etc), - // and not call seedSupplier.get() again. - for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { - member.update(); - } - } - return newseeds; - } else { - return seeds; - } - } else { - if (log.isTraceEnabled()) log.trace("Not refresheed seeds of fabric {}, because have quorum {} (of {} members), and none are down: seeds={}", - new Object[] {CassandraFabricImpl.class, quorumSize, getMembers().size(), seeds}); - return seeds; - } - } - private boolean allNonEmpty(Collection> contenders) { - for (Collection contender: contenders) - if (contender.isEmpty()) return false; - return true; - } - private Set selectSeeds(int num, Map> contenders) { - // Prefer existing seeds wherever possible; - // otherwise prefer a seed from each sub-cluster; - // otherwise accept any other contenders - Set currentSeeds = (getAttribute(CURRENT_SEEDS) != null) ? 
getAttribute(CURRENT_SEEDS) : ImmutableSet.of(); - MutableSet result = MutableSet.of(); - result.addAll(Sets.intersection(currentSeeds, ImmutableSet.copyOf(contenders.values()))); - for (CassandraDatacenter cluster : contenders.keySet()) { - Set contendersInCluster = Sets.newLinkedHashSet(contenders.get(cluster)); - if (contendersInCluster.size() > 0 && Sets.intersection(result, contendersInCluster).isEmpty()) { - result.add(Iterables.getFirst(contendersInCluster, null)); - } - } - result.addAll(Iterables.concat(contenders.values())); - return ImmutableSet.copyOf(Iterables.limit(result, num)); - } - private boolean containsDownEntity(Set seeds) { - for (Entity seed : seeds) { - if (!isViableSeed(seed)) { - return true; - } - } - return false; - } - public boolean isViableSeed(Entity member) { - // TODO remove duplication from CassandraClusterImpl.SeedTracker.isViableSeed - boolean managed = Entities.isManaged(member); - String hostname = member.getAttribute(Attributes.HOSTNAME); - boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP)); - Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL); - boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED); - boolean result = (hostname != null && !hasFailed); - if (log.isTraceEnabled()) log.trace("Node {} in Fabric {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, CassandraFabricImpl.this, result, hostname, serviceUp, serviceState, hasFailed}); - return result; - } - }; - - public CassandraFabricImpl() { - } - - @Override - public void init() { - super.init(); - - if (!config().getRaw(CassandraDatacenter.SEED_SUPPLIER).isPresentAndNonNull()) - config().set(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier()); - - // track members - policies().add(PolicySpec.create(MemberTrackingPolicy.class) - .displayName("Cassandra 
Fabric Tracker") - .configure("group", this)); - - // Track first node's startup - subscriptions().subscribeToMembers(this, CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, new SensorEventListener() { - @Override - public void onEvent(SensorEvent event) { - Long oldval = getAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC); - Long newval = event.getValue(); - if (oldval == null && newval != null) { - sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval); - for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { - ((EntityInternal)member).sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval); - } - } - } - }); - - // Track the datacenters for this cluster - subscriptions().subscribeToMembers(this, CassandraDatacenter.DATACENTER_USAGE, new SensorEventListener>() { - @Override - public void onEvent(SensorEvent> event) { - Multimap usage = calculateDatacenterUsage(); - sensors().set(DATACENTER_USAGE, usage); - sensors().set(DATACENTERS, usage.keySet()); - } - }); - subscriptions().subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener() { - @Override public void onEvent(SensorEvent event) { - Multimap usage = calculateDatacenterUsage(); - sensors().set(DATACENTER_USAGE, usage); - sensors().set(DATACENTERS, usage.keySet()); - } - }); - } - - public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { - @Override - protected void onEntityChange(Entity member) { - if (log.isDebugEnabled()) log.debug("Location {} updated in Fabric {}", member, entity); - ((CassandraFabricImpl)entity).update(); - } - @Override - protected void onEntityAdded(Entity member) { - if (log.isDebugEnabled()) log.debug("Location {} added to Fabric {}", member, entity); - ((CassandraFabricImpl)entity).update(); - } - @Override - protected void onEntityRemoved(Entity member) { - if (log.isDebugEnabled()) log.debug("Location {} removed from Fabric {}", member, entity); - 
((CassandraFabricImpl)entity).update(); - } - }; - - protected int getSeedQuorumSize() { - Integer quorumSize = getConfig(INITIAL_QUORUM_SIZE); - if (quorumSize!=null && quorumSize>0) - return quorumSize; - - int initialSizeSum = 0; - for (CassandraDatacenter cluster : Iterables.filter(getMembers(), CassandraDatacenter.class)) { - initialSizeSum += cluster.getConfig(CassandraDatacenter.INITIAL_SIZE); - } - if (initialSizeSum>5) initialSizeSum /= 2; - else if (initialSizeSum>3) initialSizeSum -= 2; - else if (initialSizeSum>2) initialSizeSum -= 1; - - return Math.min(Math.max(initialSizeSum, 1), CassandraFabric.DEFAULT_SEED_QUORUM); - } - - /** - * Sets the default {@link #MEMBER_SPEC} to describe the Cassandra sub-clusters. - */ - @Override - protected EntitySpec getMemberSpec() { - // Need to set the seedSupplier, even if the caller has overridden the CassandraCluster config - // (unless they've explicitly overridden the seedSupplier as well!) - // TODO probably don't need to anymore, as it is set on the Fabric here -- just make sure there is a default! - EntitySpec custom = getConfig(MEMBER_SPEC); - if (custom == null) { - return EntitySpec.create(CassandraDatacenter.class) - .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier()); - } else if (custom.getConfig().containsKey(CassandraDatacenter.SEED_SUPPLIER) || custom.getFlags().containsKey("seedSupplier")) { - return custom; - } else { - return EntitySpec.create(custom) - .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier()); - } - } - - @Override - protected Entity createCluster(Location location, Map flags) { - Function dataCenterNamer = getConfig(DATA_CENTER_NAMER); - if (dataCenterNamer != null) { - flags = ImmutableMap.builder() - .putAll(flags) - .put(CassandraNode.DATACENTER_NAME, dataCenterNamer.apply(location)) - .build(); - } - return super.createCluster(location, flags); - } - - /** - * Prefers one node per location, and then others from anywhere. 
- * Then trims result down to the "quorumSize". - */ - public Supplier> getSeedSupplier() { - return defaultSeedSupplier; - } - - @Override - public void start(Collection locations) { - super.start(locations); - - connectSensors(); - - // TODO wait until all nodes which we think are up are consistent - // i.e. all known nodes use the same schema, as reported by - // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli"); - // once we've done that we can revert to using 2 seed nodes. - // see CassandraCluster.DEFAULT_SEED_QUORUM - Time.sleep(getConfig(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER)); - - update(); - } - - protected void connectSensors() { - connectEnrichers(); - } - - protected void connectEnrichers() { - // TODO Aggregate across sub-clusters - - subscriptions().subscribeToMembers(this, SERVICE_UP, new SensorEventListener() { - @Override public void onEvent(SensorEvent event) { - sensors().set(SERVICE_UP, calculateServiceUp()); - } - }); - } - - @Override - public void stop() { - disconnectSensors(); - - super.stop(); - } - - protected void disconnectSensors() { - } - - protected boolean calculateServiceUp() { - Optional upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE)); - return upNode.isPresent(); - } - - protected Multimap calculateDatacenterUsage() { - Multimap result = LinkedHashMultimap.create(); - for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { - Multimap memberUsage = member.getAttribute(CassandraDatacenter.DATACENTER_USAGE); - if (memberUsage != null) result.putAll(memberUsage); - } - return result; - } - - @Override - public void update() { - synchronized (mutex) { - for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) { - member.update(); - } - - calculateServiceUp(); - - // Choose the first available location to set host and port (and compute one-up) - Optional upNode = 
Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE)); - - if (upNode.isPresent()) { - sensors().set(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME)); - sensors().set(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java ---------------------------------------------------------------------- diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java deleted file mode 100644 index fb937ae..0000000 --- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.entity.nosql.cassandra; - -import java.math.BigInteger; -import java.util.Set; - -import org.apache.brooklyn.api.catalog.Catalog; -import org.apache.brooklyn.api.effector.Effector; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.ImplementedBy; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.BasicConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.BrooklynConfigKeys; -import org.apache.brooklyn.core.location.PortRanges; -import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey; -import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey; -import org.apache.brooklyn.core.sensor.Sensors; -import org.apache.brooklyn.entity.database.DatastoreMixins; -import org.apache.brooklyn.entity.java.UsesJavaMXBeans; -import org.apache.brooklyn.entity.java.UsesJmx; -import org.apache.brooklyn.entity.software.base.SoftwareProcess; -import org.apache.brooklyn.util.core.flags.SetFromFlag; -import org.apache.brooklyn.util.time.Duration; - -import com.google.common.reflect.TypeToken; - -/** - * An {@link org.apache.brooklyn.api.entity.Entity} that represents a Cassandra node in a {@link CassandraDatacenter}. 
- */ -@Catalog(name="Apache Cassandra Node", description="Cassandra is a highly scalable, eventually " + - "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " + - "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg") -@ImplementedBy(CassandraNodeImpl.class) -public interface CassandraNode extends DatastoreMixins.DatastoreCommon, SoftwareProcess, UsesJmx, UsesJavaMXBeans, DatastoreMixins.HasDatastoreUrl, DatastoreMixins.CanExecuteScript { - - @SetFromFlag("version") - ConfigKey SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "1.2.16"); - // when this changes remember to put a copy under releng2:/var/www/developer/brooklyn/repository/ ! - // TODO experiment with supporting 2.0.x - - @SetFromFlag("downloadUrl") - BasicAttributeSensorAndConfigKey DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey( - SoftwareProcess.DOWNLOAD_URL, "${driver.mirrorUrl}/${version}/apache-cassandra-${version}-bin.tar.gz"); - - /** download mirror, if desired */ - @SetFromFlag("mirrorUrl") - ConfigKey MIRROR_URL = new BasicConfigKey(String.class, "cassandra.install.mirror.url", "URL of mirror", - "http://www.mirrorservice.org/sites/ftp.apache.org/cassandra" - // for older versions, but slower: -// "http://archive.apache.org/dist/cassandra/" - ); - - @SetFromFlag("tgzUrl") - ConfigKey TGZ_URL = new BasicConfigKey(String.class, "cassandra.install.tgzUrl", "URL of TGZ download file"); - - @SetFromFlag("clusterName") - BasicAttributeSensorAndConfigKey CLUSTER_NAME = CassandraDatacenter.CLUSTER_NAME; - - @SetFromFlag("snitchName") - ConfigKey ENDPOINT_SNITCH_NAME = CassandraDatacenter.ENDPOINT_SNITCH_NAME; - - @SetFromFlag("gossipPort") - PortAttributeSensorAndConfigKey GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.gossip.port", "Cassandra Gossip communications port", PortRanges.fromString("7000+")); - - @SetFromFlag("sslGgossipPort") - 
PortAttributeSensorAndConfigKey SSL_GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.ssl-gossip.port", "Cassandra Gossip SSL communications port", PortRanges.fromString("7001+")); - - @SetFromFlag("thriftPort") - PortAttributeSensorAndConfigKey THRIFT_PORT = new PortAttributeSensorAndConfigKey("cassandra.thrift.port", "Cassandra Thrift RPC port", PortRanges.fromString("9160+")); - - @SetFromFlag("nativePort") - PortAttributeSensorAndConfigKey NATIVE_TRANSPORT_PORT = new PortAttributeSensorAndConfigKey("cassandra.native.port", "Cassandra Native Transport port", PortRanges.fromString("9042+")); - - @SetFromFlag("rmiRegistryPort") - // cassandra nodetool and others want 7199 - not required, but useful - PortAttributeSensorAndConfigKey RMI_REGISTRY_PORT = new PortAttributeSensorAndConfigKey(UsesJmx.RMI_REGISTRY_PORT, - PortRanges.fromInteger(7199)); - - // some of the cassandra tooing (eg nodetool) use RMI, but we want JMXMP, so do both! - ConfigKey JMX_AGENT_MODE = ConfigKeys.newConfigKeyWithDefault(UsesJmx.JMX_AGENT_MODE, JmxAgentModes.JMXMP_AND_RMI); - - // TODO the multicloud-snitch has to be built manually, it is available in the brooklyn sandbox - @SetFromFlag("customSnitchJarUrl") - ConfigKey CUSTOM_SNITCH_JAR_URL = ConfigKeys.newStringConfigKey("cassandra.config.customSnitchUrl", - "URL for a jar file to be uploaded (e.g. 
\"classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-multicloud-snitch.jar\"); defaults to null which means nothing to upload", - null); - - @SetFromFlag("cassandraConfigTemplateUrl") - ConfigKey CASSANDRA_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey( - "cassandra.config.templateUrl", "A URL (in freemarker format) for a cassandra.yaml config file (in freemarker format)", - "classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-${entity.majorMinorVersion}.yaml"); - - @SetFromFlag("cassandraConfigFileName") - ConfigKey CASSANDRA_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey( - "cassandra.config.fileName", "Name for the copied config file", "cassandra.yaml"); - - @SetFromFlag("cassandraRackdcConfigTemplateUrl") - ConfigKey CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey( - "cassandra.config.rackdc.templateUrl", "Template file (in freemarker format) for the cassandra-rackdc.properties config file", - "classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-rackdc.properties"); - - @SetFromFlag("cassandraRackdcConfigFileName") - ConfigKey CASSANDRA_RACKDC_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey( - "cassandra.config.rackdc.fileName", "Name for the copied rackdc config file (used for configuring replication, when a suitable snitch is used)", "cassandra-rackdc.properties"); - - @SetFromFlag("datacenterName") - BasicAttributeSensorAndConfigKey DATACENTER_NAME = new BasicAttributeSensorAndConfigKey( - String.class, "cassandra.replication.datacenterName", "Datacenter name (used for configuring replication, when a suitable snitch is used)", - null); - - @SetFromFlag("rackName") - BasicAttributeSensorAndConfigKey RACK_NAME = new BasicAttributeSensorAndConfigKey( - String.class, "cassandra.replication.rackName", "Rack name (used for configuring replication, when a suitable snitch is used)", - null); - - ConfigKey NUM_TOKENS_PER_NODE = ConfigKeys.newIntegerConfigKey("cassandra.numTokensPerNode", - "Number 
of tokens per node; if using vnodes, should set this to a value like 256", - 1); - - @SetFromFlag("tokens") - @SuppressWarnings("serial") - BasicAttributeSensorAndConfigKey> TOKENS = new BasicAttributeSensorAndConfigKey>( - new TypeToken>() {}, "cassandra.tokens", "Cassandra Tokens"); - - @SetFromFlag("useThriftMonitoring") - ConfigKey USE_THRIFT_MONITORING = ConfigKeys.newConfigKey("thriftMonitoring.enabled", "Thrift-port monitoring enabled", Boolean.TRUE); - - AttributeSensor PEERS = Sensors.newIntegerSensor( "cassandra.peers", "Number of peers in cluster"); - - AttributeSensor LIVE_NODE_COUNT = Sensors.newIntegerSensor( "cassandra.liveNodeCount", "Number of live nodes in cluster"); - - /* Metrics for read/write performance. */ - - AttributeSensor READ_PENDING = Sensors.newLongSensor("cassandra.read.pending", "Current pending ReadStage tasks"); - AttributeSensor READ_ACTIVE = Sensors.newIntegerSensor("cassandra.read.active", "Current active ReadStage tasks"); - AttributeSensor READ_COMPLETED = Sensors.newLongSensor("cassandra.read.completed", "Total completed ReadStage tasks"); - AttributeSensor WRITE_PENDING = Sensors.newLongSensor("cassandra.write.pending", "Current pending MutationStage tasks"); - AttributeSensor WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.write.active", "Current active MutationStage tasks"); - AttributeSensor WRITE_COMPLETED = Sensors.newLongSensor("cassandra.write.completed", "Total completed MutationStage tasks"); - - AttributeSensor SERVICE_UP_JMX = Sensors.newBooleanSensor("cassandra.service.jmx.up", "Whether JMX is up for this service"); - AttributeSensor THRIFT_PORT_LATENCY = Sensors.newLongSensor("cassandra.thrift.latency", "Latency for thrift port connection (ms) or null if down"); - - AttributeSensor READS_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.reads.perSec.last", "Reads/sec (last datapoint)"); - AttributeSensor WRITES_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.write.perSec.last", "Writes/sec (last 
datapoint)"); - - AttributeSensor THRIFT_PORT_LATENCY_IN_WINDOW = Sensors.newDoubleSensor("cassandra.thrift.latency.windowed", "Latency for thrift port (ms, averaged over time window)"); - AttributeSensor READS_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.reads.perSec.windowed", "Reads/sec (over time window)"); - AttributeSensor WRITES_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.writes.perSec.windowed", "Writes/sec (over time window)"); - - @SuppressWarnings({ "rawtypes", "unchecked" }) - ConfigKey> INITIAL_SEEDS = (ConfigKey)ConfigKeys.newConfigKey(Set.class, "cassandra.cluster.seeds.initial", - "List of cluster nodes to seed this node"); - - ConfigKey START_TIMEOUT = ConfigKeys.newConfigKeyWithDefault(BrooklynConfigKeys.START_TIMEOUT, Duration.FIVE_MINUTES); - - ConfigKey LISTEN_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.listenAddressSensor", "sensor name from which to take the listen address; default (null) is a smart lookup"); - ConfigKey BROADCAST_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.broadcastAddressSensor", "sensor name from which to take the broadcast address; default (null) is a smart lookup"); - ConfigKey RPC_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.rpcAddressSensor", "sensor name from which to take the RPC address; default (null) is 0.0.0.0"); - - Effector EXECUTE_SCRIPT = CassandraDatacenter.EXECUTE_SCRIPT; - - /* Accessors used from template */ - - String getMajorMinorVersion(); - Integer getGossipPort(); - Integer getSslGossipPort(); - Integer getThriftPort(); - Integer getNativeTransportPort(); - String getClusterName(); - String getListenAddress(); - String getBroadcastAddress(); - String getRpcAddress(); - String getSeeds(); - - String getPrivateIp(); - String getPublicIp(); - - int getNumTokensPerNode(); - - /** - * Returns the set of tokens. - * Each is in the range 0 to (2^127)-1. - * Returns null if there are no tokens. 
- */ - Set getTokens(); - - /** string value of comma-separated tokens; or blank if none */ - String getTokensAsString(); - - /* For configuration */ - - void setToken(String token); - - /* Using Cassandra */ - - String executeScript(String commands); - -} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java ---------------------------------------------------------------------- diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java deleted file mode 100644 index 3893373..0000000 --- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.entity.nosql.cassandra; - -import org.apache.brooklyn.entity.java.JavaSoftwareProcessDriver; -import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; - -public interface CassandraNodeDriver extends JavaSoftwareProcessDriver { - - Integer getGossipPort(); - - Integer getSslGossipPort(); - - Integer getThriftPort(); - - Integer getNativeTransportPort(); - - String getClusterName(); - - String getCassandraConfigTemplateUrl(); - - String getCassandraConfigFileName(); - - boolean isClustered(); - - ProcessTaskWrapper executeScriptAsync(String commands); - - /** returns the address that the given hostname resolves to at the target */ - String getResolvedAddress(String hostname); - -}