Return-Path: X-Original-To: apmail-brooklyn-commits-archive@minotaur.apache.org Delivered-To: apmail-brooklyn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6A3FD1891D for ; Sat, 15 Aug 2015 13:33:16 +0000 (UTC) Received: (qmail 3836 invoked by uid 500); 15 Aug 2015 13:33:16 -0000 Delivered-To: apmail-brooklyn-commits-archive@brooklyn.apache.org Received: (qmail 3812 invoked by uid 500); 15 Aug 2015 13:33:16 -0000 Mailing-List: contact commits-help@brooklyn.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.incubator.apache.org Delivered-To: mailing list commits@brooklyn.incubator.apache.org Received: (qmail 3803 invoked by uid 99); 15 Aug 2015 13:33:16 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 15 Aug 2015 13:33:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id B113B1AA209 for ; Sat, 15 Aug 2015 13:33:15 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.774 X-Spam-Level: * X-Spam-Status: No, score=1.774 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id jI5eUMaFf5dr for ; Sat, 15 Aug 2015 13:33:05 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 3C8A642B2B for ; Sat, 15 Aug 2015 13:33:04 +0000 (UTC) Received: (qmail 3506 invoked by uid 99); 15 Aug 2015 13:33:03 -0000 Received: 
from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 15 Aug 2015 13:33:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 92BCBE01F5; Sat, 15 Aug 2015 13:33:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hadrian@apache.org To: commits@brooklyn.incubator.apache.org Date: Sat, 15 Aug 2015 13:33:14 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [12/33] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/management http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalEntityManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalEntityManager.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalEntityManager.java new file mode 100644 index 0000000..53d31eb --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalEntityManager.java @@ -0,0 +1,818 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.internal; + +import static com.google.common.base.Preconditions.checkNotNull; +import groovy.util.ObservableList; + +import java.lang.reflect.Proxy; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.entity.Application; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.entity.proxying.EntityTypeRegistry; +import org.apache.brooklyn.api.management.AccessController; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.policy.Enricher; +import org.apache.brooklyn.api.policy.EnricherSpec; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.api.policy.PolicySpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.BrooklynLogging; +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.basic.EntityPredicates; +import brooklyn.entity.proxying.BasicEntityTypeRegistry; +import brooklyn.entity.proxying.EntityProxy; +import brooklyn.entity.proxying.EntityProxyImpl; +import brooklyn.entity.proxying.InternalEntityFactory; +import brooklyn.entity.proxying.InternalPolicyFactory; +import brooklyn.entity.trait.Startable; +import brooklyn.internal.storage.BrooklynStorage; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.collections.SetFromLiveMap; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.task.Tasks; 
+import brooklyn.util.time.CountdownTimer; +import brooklyn.util.time.Duration; + +import com.google.common.annotations.Beta; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class LocalEntityManager implements EntityManagerInternal { + + private static final Logger log = LoggerFactory.getLogger(LocalEntityManager.class); + + private final LocalManagementContext managementContext; + private final BasicEntityTypeRegistry entityTypeRegistry; + private final InternalEntityFactory entityFactory; + private final InternalPolicyFactory policyFactory; + + /** Entities that have been created, but have not yet begun to be managed */ + protected final Map preRegisteredEntitiesById = Collections.synchronizedMap(new WeakHashMap()); + + /** Entities that are in the process of being managed, but where management is not yet complete */ + protected final Map preManagedEntitiesById = Collections.synchronizedMap(new WeakHashMap()); + + /** Proxies of the managed entities */ + protected final ConcurrentMap entityProxiesById = Maps.newConcurrentMap(); + + /** Real managed entities */ + protected final Map entitiesById = Maps.newLinkedHashMap(); + + /** Management mode for each entity */ + protected final Map entityModesById = Collections.synchronizedMap(Maps.newLinkedHashMap()); + + /** Proxies of the managed entities */ + protected final ObservableList entities = new ObservableList(); + + /** Proxies of the managed entities that are applications */ + protected final Set applications = Sets.newConcurrentHashSet(); + + private final BrooklynStorage storage; + private final Map entityTypes; + 
private final Set applicationIds;
+
+    /**
+     * Creates an entity manager bound to the given management context.
+     * <p>
+     * Builds the entity type registry, the policy factory and the entity factory, and looks up
+     * the "entities" and "applications" maps in the context's storage.
+     *
+     * @param managementContext the owning management context; must not be null
+     */
+    public LocalEntityManager(LocalManagementContext managementContext) {
+        this.managementContext = checkNotNull(managementContext, "managementContext");
+        this.storage = managementContext.getStorage();
+        this.entityTypeRegistry = new BasicEntityTypeRegistry();
+        this.policyFactory = new InternalPolicyFactory(managementContext);
+        this.entityFactory = new InternalEntityFactory(managementContext, entityTypeRegistry, policyFactory);
+
+        entityTypes = storage.getMap("entities");
+        applicationIds = SetFromLiveMap.create(storage.getMap("applications"));
+    }
+
+    /**
+     * Returns the factory used to instantiate entities.
+     *
+     * @throws IllegalStateException if {@code isRunning()} reports false
+     */
+    public InternalEntityFactory getEntityFactory() {
+        if (!isRunning()) throw new IllegalStateException("Management context no longer running");
+        return entityFactory;
+    }
+
+    /**
+     * Returns the factory used to instantiate policies and enrichers.
+     *
+     * @throws IllegalStateException if {@code isRunning()} reports false
+     */
+    public InternalPolicyFactory getPolicyFactory() {
+        if (!isRunning()) throw new IllegalStateException("Management context no longer running");
+        return policyFactory;
+    }
+
+    /**
+     * Returns the registry of entity types held by this manager.
+     *
+     * @throws IllegalStateException if {@code isRunning()} reports false
+     */
+    @Override
+    public EntityTypeRegistry getEntityTypeRegistry() {
+        if (!isRunning()) throw new IllegalStateException("Management context no longer running");
+        return entityTypeRegistry;
+    }
+
+    /**
+     * Creates an entity from the given spec and returns its proxy, not the underlying instance.
+     * <p>
+     * Any failure (including a missing proxy) is logged at WARN and rethrown via
+     * {@code Exceptions.propagate}.
+     *
+     * @param spec the specification to instantiate
+     * @return the proxy of the newly created entity
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public T createEntity(EntitySpec spec) {
+        try {
+            T entity = entityFactory.createEntity(spec);
+            // callers are handed the proxy rather than the real instance
+            Entity proxy = ((AbstractEntity)entity).getProxy();
+            return (T) checkNotNull(proxy, "proxy for entity %s, spec %s", entity, spec);
+        } catch (Throwable e) {
+            log.warn("Failed to create entity using spec "+spec+" (rethrowing)", e);
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    /**
+     * Convenience overload: builds a spec from the config and type, then delegates to
+     * {@code createEntity(EntitySpec)}.
+     */
+    @Override
+    public T createEntity(Map config, Class type) {
+        return createEntity(EntitySpec.create(config, type));
+    }
+
+    /**
+     * Creates a policy from the given spec using the policy factory.
+     * Failures are logged at WARN and rethrown via {@code Exceptions.propagate}.
+     */
+    @Override
+    public T createPolicy(PolicySpec spec) {
+        try {
+            return policyFactory.createPolicy(spec);
+        } catch (Throwable e) {
+            log.warn("Failed to create policy using spec "+spec+" (rethrowing)", e);
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    /**
+     * Creates an enricher from the given spec using the policy factory.
+     * Failures are logged at WARN and rethrown via {@code Exceptions.propagate}.
+     */
+    @Override
+    public T createEnricher(EnricherSpec spec) {
+        try {
+            return policyFactory.createEnricher(spec);
+        } catch (Throwable e) {
+            log.warn("Failed to create enricher using spec "+spec+" (rethrowing)", e);
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    /** Returns an immutable snapshot of the proxies of all managed entities. */
+    @Override
+    public Collection getEntities() {
+        return ImmutableList.copyOf(entityProxiesById.values());
+    }
+
+    /** Returns an immutable snapshot of the ids of all managed entities. */
+    @Override
+    public Collection getEntityIds() {
+        return ImmutableList.copyOf(entityProxiesById.keySet());
+    }
+
+    /** Returns an immutable snapshot of the managed entity proxies whose application id matches the given application. */
+    @Override
+    public Collection getEntitiesInApplication(Application application) {
+        Predicate predicate = EntityPredicates.applicationIdEqualTo(application.getId());
+        return ImmutableList.copyOf(Iterables.filter(entityProxiesById.values(), predicate));
+    }
+
+    /** Returns an immutable snapshot of the managed entity proxies accepted by the filter. */
+    @Override
+    public Collection findEntities(Predicate filter) {
+        return ImmutableList.copyOf(Iterables.filter(entityProxiesById.values(), filter));
+    }
+
+    /** Returns an immutable snapshot of the managed entity proxies that belong to the application and are accepted by the filter. */
+    @Override
+    public Collection findEntitiesInApplication(Application application, Predicate filter) {
+        Predicate predicate = Predicates.and(EntityPredicates.applicationIdEqualTo(application.getId()), filter);
+        return ImmutableList.copyOf(Iterables.filter(entityProxiesById.values(), predicate));
+    }
+
+    /**
+     * Like {@code getEntitiesInApplication}, but also considers entities that are only
+     * pre-registered or pre-managed. Each match is passed through {@code Entities.proxy}
+     * and the result is returned as an immutable set.
+     */
+    @Override
+    public Iterable getAllEntitiesInApplication(Application application) {
+        Predicate predicate = EntityPredicates.applicationIdEqualTo(application.getId());
+        Iterable allentities = Iterables.concat(preRegisteredEntitiesById.values(), preManagedEntitiesById.values(), entityProxiesById.values());
+        Iterable result = Iterables.filter(allentities, predicate);
+        return ImmutableSet.copyOf(Iterables.transform(result, new Function() {
+            @Override public Entity apply(Entity input) {
+                return Entities.proxy(input);
+            }}));
+    }
+
+    /** Returns the proxy of the managed entity with the given id, or null if no such entity is managed here. */
+    @Override
+    public Entity getEntity(String id) {
+        return entityProxiesById.get(id);
+    }
+
+    /** Returns an immutable snapshot of the managed application proxies. */
+    Collection getApplications() {
+        return ImmutableList.copyOf(applications);
+    }
+
+    /** Returns true only while {@code isRunning()} holds and a proxy is registered under the entity's id. */
+    @Override
+    public boolean isManaged(Entity e) {
+        return (isRunning() && getEntity(e.getId()) != null);
+    }
+
+    /** Whether an entity with this id has been created and recorded but has not yet begun to be managed. */
+    boolean isPreRegistered(Entity e) {
+        return preRegisteredEntitiesById.containsKey(e.getId());
+    }
+
+    /**
+     * Records a newly created entity as pre-registered. A repeated call for an id that is
+     * already pre-registered is ignored after logging a warning; the exception passed to the
+     * logger exists only to capture the caller's stack trace.
+     */
+    void prePreManage(Entity entity) {
+        if (isPreRegistered(entity)) {
+            log.warn(""+this+" redundant call to pre-pre-manage entity "+entity+"; skipping",
+                    new Exception("source of duplicate pre-pre-manage of "+entity));
+            return;
+        }
+        preRegisteredEntitiesById.put(entity.getId(), entity);
+    }
+
+    /** Returns the management transition mode last recorded for the id, or null if none has been recorded. */
+    @Override
+    public ManagementTransitionMode getLastManagementTransitionMode(String itemId) {
+        return entityModesById.get(itemId);
+    }
+
+    /** Records the management transition mode for the entity, replacing any earlier value. */
+    @Override
+    public void setManagementTransitionMode(Entity item, ManagementTransitionMode mode) {
+        entityModesById.put(item.getId(), mode);
+    }
+
+    // TODO synchronization issues here. We guard with isManaged(), but if another thread is executing
+    // concurrently then the managed'ness could be set after our check but before we do
+    // onManagementStarting etc. However, we can't just synchronize because we're calling alien code
+    // (the user might override entity.onManagementStarting etc).
+    //
+    // TODO We need to do some check about isPreManaged - i.e. is there another thread (or is this a
+    // re-entrant call) where the entity is not yet fully managed (i.e. isManaged==false) but we're in
+    // the middle of managing it.
+ // + // TODO Also see LocalLocationManager.manage(Entity), if fixing things here + @Override + public void manage(Entity e) { + if (isManaged(e)) { + log.warn(""+this+" redundant call to start management of entity (and descendants of) "+e+"; skipping", + new Exception("source of duplicate management of "+e)); + return; + } + manageRecursive(e, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.NONEXISTENT, BrooklynObjectManagementMode.MANAGED_PRIMARY)); + } + + @Override + public void manageRebindedRoot(Entity item) { + ManagementTransitionMode mode = getLastManagementTransitionMode(item.getId()); + Preconditions.checkNotNull(mode, "Mode not set for rebinding %s", item); + manageRecursive(item, mode); + } + + protected void checkManagementAllowed(Entity item) { + AccessController.Response access = managementContext.getAccessController().canManageEntity(item); + if (!access.isAllowed()) { + throw new IllegalStateException("Access controller forbids management of "+item+": "+access.getMsg()); + } + } + + /* TODO we sloppily use "recursive" to ensure ordering of parent-first in many places + * (which may not be necessary but seems like a good idea), + * and also to collect many entities when doing a big rebind, + * ensuring all have #manageNonRecursive called before calling #onManagementStarted. + * + * it would be better to have a manageAll(Map items) + * method which did that in two phases, allowing us to selectively rebind, + * esp when we come to want supporting different modes and different brooklyn nodes. + * + * the impl of manageAll could sort them with parents before children, + * (and manageRecursive could simply populate a map and delegate to manageAll). + * + * manageRebindRoot would then go, and the (few) callers would construct the map. 
+ * + * similarly we might want an unmanageAll(), + * although possibly all unmanagement should be recursive, if we assume an entity's ancestors are always at least proxied + * (and the non-recursive RO path here could maybe be dropped) + */ + + /** Applies management lifecycle callbacks (onManagementStarting, for all beforehand, then onManagementStopped, for all after) */ + protected void manageRecursive(Entity e, final ManagementTransitionMode initialMode) { + checkManagementAllowed(e); + + final List allEntities = Lists.newArrayList(); + Predicate manageEntity = new Predicate() { public boolean apply(EntityInternal it) { + ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); + if (mode==null) { + setManagementTransitionMode(it, mode = initialMode); + } + + Boolean isReadOnlyFromEntity = it.getManagementSupport().isReadOnlyRaw(); + if (isReadOnlyFromEntity==null) { + if (mode.isReadOnly()) { + // should have been marked by rebinder + log.warn("Read-only entity "+it+" not marked as such on call to manage; marking and continuing"); + } + it.getManagementSupport().setReadOnly(mode.isReadOnly()); + } else { + if (!isReadOnlyFromEntity.equals(mode.isReadOnly())) { + log.warn("Read-only status at entity "+it+" ("+isReadOnlyFromEntity+") not consistent with management mode "+mode); + } + } + + if (it.getManagementSupport().isDeployed()) { + if (mode.wasNotLoaded()) { + // silently bail out + return false; + } else { + if (mode.wasPrimary() && mode.isPrimary()) { + // active partial rebind; continue + } else if (mode.wasReadOnly() && mode.isReadOnly()) { + // reload in RO mode + } else { + // on initial non-RO rebind, should not have any deployed instances + log.warn("Already deployed "+it+" when managing "+mode+"/"+initialMode+"; ignoring this and all descendants"); + return false; + } + } + } + + // check RO status is consistent + boolean isNowReadOnly = Boolean.TRUE.equals( ((EntityInternal)it).getManagementSupport().isReadOnly() ); + if 
(mode.isReadOnly()!=isNowReadOnly) { + throw new IllegalStateException("Read-only status mismatch for "+it+": "+mode+" / RO="+isNowReadOnly); + } + + allEntities.add(it); + preManageNonRecursive(it, mode); + it.getManagementSupport().onManagementStarting( new ManagementTransitionInfo(managementContext, mode) ); + return manageNonRecursive(it, mode); + } }; + boolean isRecursive = true; + if (initialMode.wasPrimary() && initialMode.isPrimary()) { + // already managed, so this shouldn't be recursive + // (in ActivePartialRebind we cheat, calling in to this method then skipping recursion). + // it also falls through to here when doing a redundant promotion, + // in that case we *should* be recursive; determine by checking whether a child exists and is preregistered. + // the TODO above removing manageRebindRoot in favour of explicit mgmt list would clean this up a lot! + Entity aChild = Iterables.getFirst(e.getChildren(), null); + if (aChild!=null && isPreRegistered(aChild)) { + log.debug("Managing "+e+" in mode "+initialMode+", doing this recursively because a child is preregistered"); + } else { + log.debug("Managing "+e+" but skipping recursion, as mode is "+initialMode); + isRecursive = false; + } + } + if (!isRecursive) { + manageEntity.apply( (EntityInternal)e ); + } else { + recursively(e, manageEntity); + } + + for (EntityInternal it : allEntities) { + if (!it.getManagementSupport().isFullyManaged()) { + ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); + ManagementTransitionInfo info = new ManagementTransitionInfo(managementContext, mode); + + it.getManagementSupport().onManagementStarted(info); + managementContext.getRebindManager().getChangeListener().onManaged(it); + } + } + } + + @Override + public void unmanage(final Entity e) { + unmanage(e, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.MANAGED_PRIMARY, BrooklynObjectManagementMode.NONEXISTENT)); + } + + public void unmanage(final Entity e, final 
ManagementTransitionMode mode) { + unmanage(e, mode, false); + } + + private void unmanage(final Entity e, ManagementTransitionMode mode, boolean hasBeenReplaced) { + if (shouldSkipUnmanagement(e)) return; + final ManagementTransitionInfo info = new ManagementTransitionInfo(managementContext, mode); + + if (hasBeenReplaced) { + // we are unmanaging an old instance after having replaced it + // don't unmanage or even clear its fields, because there might be references to it + + if (mode.wasReadOnly()) { + // if coming *from* read only; nothing needed + } else { + if (!mode.wasPrimary()) { + log.warn("Unexpected mode "+mode+" for unmanage-replace "+e+" (applying anyway)"); + } + // migrating away or in-place active partial rebind: + ((EntityInternal)e).getManagementSupport().onManagementStopping(info); + stopTasks(e); + ((EntityInternal)e).getManagementSupport().onManagementStopped(info); + } + // do not remove from maps below, bail out now + return; + + } else if (mode.wasReadOnly() && mode.isNoLongerLoaded()) { + // we are unmanaging an instance (secondary); either stopping here or primary destroyed elsewhere + ((EntityInternal)e).getManagementSupport().onManagementStopping(info); + unmanageNonRecursive(e); + stopTasks(e); + ((EntityInternal)e).getManagementSupport().onManagementStopped(info); + managementContext.getRebindManager().getChangeListener().onUnmanaged(e); + if (managementContext.getGarbageCollector() != null) managementContext.getGarbageCollector().onUnmanaged(e); + + } else if (mode.wasPrimary() && mode.isNoLongerLoaded()) { + // unmanaging a primary; currently this is done recursively + + /* TODO tidy up when it is recursive and when it isn't; if something is being unloaded or destroyed, + * that probably *is* recursive, but the old mode might be different if in some cases things are read-only. + * or maybe nothing needs to be recursive, we just make sure the callers (e.g. 
HighAvailabilityModeImpl.clearManagedItems) + * call in a good order + * + * see notes above about recursive/manage/All/unmanageAll + */ + + // Need to store all child entities as onManagementStopping removes a child from the parent entity + final List allEntities = Lists.newArrayList(); + recursively(e, new Predicate() { public boolean apply(EntityInternal it) { + if (shouldSkipUnmanagement(it)) return false; + allEntities.add(it); + it.getManagementSupport().onManagementStopping(info); + return true; + } }); + + for (EntityInternal it : allEntities) { + if (shouldSkipUnmanagement(it)) continue; + unmanageNonRecursive(it); + stopTasks(it); + } + for (EntityInternal it : allEntities) { + it.getManagementSupport().onManagementStopped(info); + managementContext.getRebindManager().getChangeListener().onUnmanaged(it); + if (managementContext.getGarbageCollector() != null) managementContext.getGarbageCollector().onUnmanaged(e); + } + + } else { + log.warn("Invalid mode for unmanage: "+mode+" on "+e+" (ignoring)"); + } + + preRegisteredEntitiesById.remove(e.getId()); + preManagedEntitiesById.remove(e.getId()); + entityProxiesById.remove(e.getId()); + entitiesById.remove(e.getId()); + entityModesById.remove(e.getId()); + } + + private void stopTasks(Entity entity) { + stopTasks(entity, null); + } + + /** stops all tasks (apart from any current one or its descendants) on this entity, + * optionally -- if a timeout is given -- waiting for completion and warning on incomplete tasks */ + @Beta + public void stopTasks(Entity entity, @Nullable Duration timeout) { + CountdownTimer timeleft = timeout==null ? 
null : timeout.countdownTimer(); + // try forcibly interrupting tasks on managed entities + Collection exceptions = MutableSet.of(); + try { + Set> tasksCancelled = MutableSet.of(); + for (Task t: managementContext.getExecutionContext(entity).getTasks()) { + if (entity.equals(BrooklynTaskTags.getContextEntity(Tasks.current())) && hasTaskAsAncestor(t, Tasks.current())) { + // don't cancel if we are running inside a task on the target entity and + // the task being considered is one we have submitted -- e.g. on "stop" don't cancel ourselves! + // but if our current task is from another entity we probably do want to cancel them (we are probably invoking unmanage) + continue; + } + + if (!t.isDone()) { + try { + log.debug("Cancelling "+t+" on "+entity); + tasksCancelled.add(t); + t.cancel(true); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + log.debug("Error cancelling "+t+" on "+entity+" (will warn when all tasks are cancelled): "+e, e); + exceptions.add(e); + } + } + } + + if (timeleft!=null) { + Set> tasksIncomplete = MutableSet.of(); + // go through all tasks, not just cancelled ones, in case there are previously cancelled ones which are not complete + for (Task t: managementContext.getExecutionContext(entity).getTasks()) { + if (hasTaskAsAncestor(t, Tasks.current())) + continue; + if (!Tasks.blockUntilInternalTasksEnded(t, timeleft.getDurationRemaining())) { + tasksIncomplete.add(t); + } + } + if (!tasksIncomplete.isEmpty()) { + log.warn("Incomplete tasks when stopping "+entity+": "+tasksIncomplete); + } + if (log.isTraceEnabled()) + log.trace("Cancelled "+tasksCancelled+" tasks for "+entity+", with "+ + timeleft.getDurationRemaining()+" remaining (of "+timeout+"): "+tasksCancelled); + } else { + if (log.isTraceEnabled()) + log.trace("Cancelled "+tasksCancelled+" tasks for "+entity+": "+tasksCancelled); + } + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + log.warn("Error inspecting tasks to cancel on unmanagement: "+e, e); + } + if 
(!exceptions.isEmpty()) + log.warn("Error when cancelling tasks for "+entity+" on unmanagement: "+Exceptions.create(exceptions)); + } + + private boolean hasTaskAsAncestor(Task t, Task potentialAncestor) { + if (t==null || potentialAncestor==null) return false; + if (t.equals(potentialAncestor)) return true; + return hasTaskAsAncestor(t.getSubmittedByTask(), potentialAncestor); + } + + /** + * activates management when effector invoked, warning unless context is acceptable + * (currently only acceptable context is "start") + */ + void manageIfNecessary(Entity entity, Object context) { + if (!isRunning()) { + return; // TODO Still a race for terminate being called, and then isManaged below returning false + } else if (((EntityInternal)entity).getManagementSupport().wasDeployed()) { + return; + } else if (isManaged(entity)) { + return; + } else if (isPreManaged(entity)) { + return; + } else if (Boolean.TRUE.equals(((EntityInternal)entity).getManagementSupport().isReadOnly())) { + return; + } else { + Entity rootUnmanaged = entity; + while (true) { + Entity candidateUnmanagedParent = rootUnmanaged.getParent(); + if (candidateUnmanagedParent == null || isManaged(candidateUnmanagedParent) || isPreManaged(candidateUnmanagedParent)) + break; + rootUnmanaged = candidateUnmanagedParent; + } + if (context == Startable.START.getName()) + log.info("Activating local management for {} on start", rootUnmanaged); + else + log.warn("Activating local management for {} due to effector invocation on {}: {}", new Object[]{rootUnmanaged, entity, context}); + manage(rootUnmanaged); + } + } + + private void recursively(Entity e, Predicate action) { + Entity otherPreregistered = preRegisteredEntitiesById.get(e.getId()); + if (otherPreregistered!=null) { + // if something has been pre-registered, prefer it + // (e.g. 
if we recursing through children, we might have a proxy from previous iteration; + // the most recent will have been pre-registered) + e = otherPreregistered; + } + + boolean success = action.apply( (EntityInternal)e ); + if (!success) { + return; // Don't manage children if action false/unnecessary for parent + } + for (Entity child : e.getChildren()) { + recursively(child, action); + } + } + + /** + * Whether the entity is in the process of being managed. + */ + private synchronized boolean isPreManaged(Entity e) { + return preManagedEntitiesById.containsKey(e.getId()); + } + + /** + * Should ensure that the entity is now known about, but should not be accessible from other entities yet. + * + * Records that the given entity is about to be managed (used for answering {@link isPreManaged(Entity)}. + * Note that refs to the given entity are stored in a a weak hashmap so if the subsequent management + * attempt fails then this reference to the entity will eventually be discarded (if no-one else holds + * a reference). + */ + private synchronized boolean preManageNonRecursive(Entity e, ManagementTransitionMode mode) { + Entity realE = toRealEntity(e); + + Object old = preManagedEntitiesById.put(e.getId(), realE); + preRegisteredEntitiesById.remove(e.getId()); + + if (old!=null && mode.wasNotLoaded()) { + if (old.equals(e)) { + log.warn("{} redundant call to pre-start management of entity {}, mode {}; ignoring", new Object[] { this, e, mode }); + } else { + throw new IllegalStateException("call to pre-manage entity "+e+" ("+mode+") but different entity "+old+" already known under that id at "+this); + } + return false; + } else { + if (log.isTraceEnabled()) log.trace("{} pre-start management of entity {}, mode {}", + new Object[] { this, e, mode }); + return true; + } + } + + /** + * Should ensure that the entity is now managed somewhere, and known about in all the lists. 
+     * Returns true if the entity has now become managed; false if it was already managed (anything else throws exception)
+     *
+     * @param e the entity to register as managed
+     * @param mode the management transition being applied; its "was not loaded" state decides
+     *        whether an existing registration under the same id is an error
+     * @return true if the entity was registered; false if an equal entity was already
+     *         registered under this id while coming from a not-loaded state
+     * @throws IllegalStateException if, coming from a not-loaded state, a different entity or
+     *         an existing proxy is already known under this id
+     */
+    private synchronized boolean manageNonRecursive(Entity e, ManagementTransitionMode mode) {
+        Entity old = entitiesById.get(e.getId());
+        
+        if (old!=null && mode.wasNotLoaded()) {
+            if (old.equals(e)) {
+                log.warn("{} redundant call to start management of entity {}; ignoring", this, e);
+            } else {
+                throw new IllegalStateException("call to manage entity "+e+" ("+mode+") but different entity "+old+" already known under that id at "+this);
+            }
+            return false;
+        }
+
+        BrooklynLogging.log(log, BrooklynLogging.levelDebugOrTraceIfReadOnly(e),
+            "{} starting management of entity {}", this, e);
+        Entity realE = toRealEntity(e);
+        
+        Entity oldProxy = entityProxiesById.get(e.getId());
+        Entity proxyE;
+        if (oldProxy!=null) {
+            if (mode.wasNotLoaded()) {
+                throw new IllegalStateException("call to manage entity "+e+" from unloaded state ("+mode+") but already had proxy "+oldProxy+" already known under that id at "+this);
+            }
+            // make the old proxy point at this new delegate
+            // (some other tricks done in the call below)
+            ((EntityProxyImpl)(Proxy.getInvocationHandler(oldProxy))).resetDelegate(oldProxy, oldProxy, realE);
+            // keep handing out the existing proxy so references already held elsewhere stay valid
+            proxyE = oldProxy;
+        } else {
+            proxyE = toProxyEntityIfAvailable(e);
+        }
+        entityProxiesById.put(e.getId(), proxyE);
+        entityTypes.put(e.getId(), realE.getClass().getName());
+        entitiesById.put(e.getId(), realE);
+        
+        // management of this entity is no longer "in progress"
+        preManagedEntitiesById.remove(e.getId());
+        // only parentless applications are tracked in the applications collections
+        if ((e instanceof Application) && (e.getParent()==null)) {
+            applications.add((Application)proxyE);
+            applicationIds.add(e.getId());
+        }
+        if (!entities.contains(proxyE)) 
+            entities.add(proxyE);
+        
+        if (old!=null && old!=e) {
+            // passing the transition info will ensure the right shutdown steps invoked for old instance
+            unmanage(old, mode, true);
+        }
+        
+        return true;
+    }
+
+    /**
+     * Should ensure that the entity is no longer managed anywhere, remove
from all lists. + * Returns true if the entity has been removed from management; false if it was not previously managed (anything else throws exception) + */ + private boolean unmanageNonRecursive(Entity e) { + /* + * When method is synchronized, hit deadlock: + * 1. thread called unmanage() on a member of a group, so we got the lock and called group.removeMember; + * this tries to synchronize on AbstractGroupImpl.members + * 2. another thread was doing AbstractGroupImpl.addMember, which is synchronized on AbstractGroupImpl.members; + * it tries to call Entities.manage(child) which calls LocalEntityManager.getEntity(), which is + * synchronized on this. + * + * We MUST NOT call alien code from within the management framework while holding locks. + * The AbstractGroup.removeMember is effectively alien because a user could override it, and because + * it is entity specific. + * + * TODO Does getting then removing from groups risk this entity being added to other groups while + * this is happening? Should abstractEntity.onManagementStopped or some such remove the entity + * from its groups?
+ */ + + if (!getLastManagementTransitionMode(e.getId()).isReadOnly()) { + e.clearParent(); + Collection groups = e.getGroups(); + for (Group group : groups) { + if (!Entities.isNoLongerManaged(group)) group.removeMember(e); + } + if (e instanceof Group) { + Collection members = ((Group)e).getMembers(); + for (Entity member : members) { + if (!Entities.isNoLongerManaged(member)) member.removeGroup((Group)e); + } + } + } else { + log.debug("No relations being updated on unmanage of read only {}", e); + } + + synchronized (this) { + Entity proxyE = toProxyEntityIfAvailable(e); + if (e instanceof Application) { + applications.remove(proxyE); + applicationIds.remove(e.getId()); + } + + entities.remove(proxyE); + entityProxiesById.remove(e.getId()); + entityModesById.remove(e.getId()); + Object old = entitiesById.remove(e.getId()); + + entityTypes.remove(e.getId()); + if (old==null) { + log.warn("{} call to stop management of unknown entity (already unmanaged?) {}; ignoring", this, e); + return false; + } else if (!old.equals(e)) { + // shouldn't happen... 
+ log.error("{} call to stop management of entity {} removed different entity {}", new Object[] { this, e, old }); + return true; + } else { + if (log.isDebugEnabled()) log.debug("{} stopped management of entity {}", this, e); + return true; + } + } + } + + void addEntitySetListener(CollectionChangeListener listener) { + //must notify listener in a different thread to avoid deadlock (issue #378) + AsyncCollectionChangeAdapter wrappedListener = new AsyncCollectionChangeAdapter(managementContext.getExecutionManager(), listener); + entities.addPropertyChangeListener(new GroovyObservablesPropertyChangeToCollectionChangeAdapter(wrappedListener)); + } + + void removeEntitySetListener(CollectionChangeListener listener) { + AsyncCollectionChangeAdapter wrappedListener = new AsyncCollectionChangeAdapter(managementContext.getExecutionManager(), listener); + entities.removePropertyChangeListener(new GroovyObservablesPropertyChangeToCollectionChangeAdapter(wrappedListener)); + } + + private boolean shouldSkipUnmanagement(Entity e) { + if (e==null) { + log.warn(""+this+" call to unmanage null entity; skipping", + new IllegalStateException("source of null unmanagement call to "+this)); + return true; + } + if (!isManaged(e)) { + log.warn("{} call to stop management of unknown entity (already unmanaged?) {}; skipping, and all descendants", this, e); + return true; + } + return false; + } + + private Entity toProxyEntityIfAvailable(Entity e) { + checkNotNull(e, "entity"); + + if (e instanceof EntityProxy) { + return e; + } else if (e instanceof AbstractEntity) { + Entity result = ((AbstractEntity)e).getProxy(); + return (result == null) ? 
e : result; + } else { + // If we don't already know about the proxy, then use the real thing; presumably it's + // the legacy way of creating the entity so didn't get a preManage() call + + return e; + } + } + + private Entity toRealEntity(Entity e) { + checkNotNull(e, "entity"); + + if (e instanceof AbstractEntity) { + return e; + } else { + Entity result = toRealEntityOrNull(e.getId()); + if (result == null) { + throw new IllegalStateException("No concrete entity known for entity "+e+" ("+e.getId()+", "+e.getEntityType().getName()+")"); + } + return result; + } + } + + public boolean isKnownEntityId(String id) { + return entitiesById.containsKey(id) || preManagedEntitiesById.containsKey(id) || preRegisteredEntitiesById.containsKey(id); + } + + private Entity toRealEntityOrNull(String id) { + Entity result; + // prefer the preRegistered and preManaged entities, during hot proxying, they should be newer; otherwise fall back to the managed entity (null if the id is unknown everywhere) + result = preRegisteredEntitiesById.get(id); + if (result==null) + result = preManagedEntitiesById.get(id); + if (result==null) + result = entitiesById.get(id); + return result; + } + + private boolean isRunning() { + return managementContext.isRunning(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalLocationManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalLocationManager.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalLocationManager.java new file mode 100644 index 0000000..54708f5 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalLocationManager.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.internal; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.Closeable; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.location.LocationSpec; +import org.apache.brooklyn.api.location.ProvisioningLocation; +import org.apache.brooklyn.api.management.AccessController; +import org.apache.brooklyn.core.management.entitlement.Entitlements; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.BrooklynLogging; +import brooklyn.config.BrooklynLogging.LoggingLevel; +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.proxying.InternalLocationFactory; +import brooklyn.internal.storage.BrooklynStorage; + +import org.apache.brooklyn.location.basic.AbstractLocation; +import org.apache.brooklyn.location.basic.LocationInternal; + +import brooklyn.util.config.ConfigBag; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.exceptions.RuntimeInterruptedException; +import brooklyn.util.stream.Streams; +import brooklyn.util.task.Tasks; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import 
com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; + +public class LocalLocationManager implements LocationManagerInternal { + + @Beta /* expect to remove when API returns LocationSpec or similar */ + public static final ConfigKey CREATE_UNMANAGED = ConfigKeys.newBooleanConfigKey("brooklyn.internal.location.createUnmanaged", + "If set on a location or spec, causes the manager to create it in an unmanaged state (for peeking)", false); + + private static final Logger log = LoggerFactory.getLogger(LocalLocationManager.class); + + private final LocalManagementContext managementContext; + private final InternalLocationFactory locationFactory; + + protected final Map locationsById = Maps.newLinkedHashMap(); + private final Map preRegisteredLocationsById = Maps.newLinkedHashMap(); + + /** Management mode for each location */ + protected final Map locationModesById = Maps.newLinkedHashMap(); + + private final BrooklynStorage storage; + private Map locationTypes; + + private static AtomicLong LOCATION_CNT = new AtomicLong(0); + + public LocalLocationManager(LocalManagementContext managementContext) { + this.managementContext = checkNotNull(managementContext, "managementContext"); + this.locationFactory = new InternalLocationFactory(managementContext); + + this.storage = managementContext.getStorage(); + locationTypes = storage.getMap("locations"); + } + + public InternalLocationFactory getLocationFactory() { + if (!isRunning()) throw new IllegalStateException("Management context no longer running"); + return locationFactory; + + } + + @Override + public T createLocation(LocationSpec spec) { + try { + boolean createUnmanaged = ConfigBag.coerceFirstNonNullKeyValue(CREATE_UNMANAGED, + spec.getConfig().get(CREATE_UNMANAGED), spec.getFlags().get(CREATE_UNMANAGED.getName())); + if (createUnmanaged) { + spec.removeConfig(CREATE_UNMANAGED); + } + + T loc = locationFactory.createLocation(spec); + if 
(!createUnmanaged) { + manage(loc); + } else { + // remove references + Location parent = loc.getParent(); + if (parent!=null) { + ((AbstractLocation)parent).removeChild(loc); + } + preRegisteredLocationsById.remove(loc.getId()); + } + + return loc; + } catch (Throwable e) { + log.warn("Failed to create location using spec "+spec+" (rethrowing)", e); + throw Exceptions.propagate(e); + } + } + + @Override + public T createLocation(Map config, Class type) { + return createLocation(LocationSpec.create(config, type)); + } + + @Override + public synchronized Collection getLocations() { + return ImmutableList.copyOf(locationsById.values()); + } + + @Override + public Collection getLocationIds() { + return ImmutableList.copyOf(locationsById.keySet()); + } + + @Override + public synchronized Location getLocation(String id) { + return locationsById.get(id); + } + + public synchronized Location getLocationEvenIfPreManaged(String id) { + Location result = locationsById.get(id); + if (result == null) { + result = preRegisteredLocationsById.get(id); + } + return result; + } + + @Override + public boolean isManaged(Location loc) { + return (isRunning() && loc != null && getLocation(loc.getId()) != null); + } + + synchronized boolean isPreRegistered(Location loc) { + return preRegisteredLocationsById.containsKey(loc.getId()); + } + + public boolean isKnownLocationId(String id) { + return preRegisteredLocationsById.containsKey(id) || locationsById.containsKey(id); + } + + synchronized void prePreManage(Location loc) { + if (isPreRegistered(loc)) { + log.warn(""+this+" redundant call to pre-pre-manage location "+loc+"; skipping", + new Exception("source of duplicate pre-pre-manage of "+loc)); + return; + } + preRegisteredLocationsById.put(loc.getId(), loc); + } + + @Override + public ManagementTransitionMode getLastManagementTransitionMode(String itemId) { + return locationModesById.get(itemId); + } + + @Override + public void setManagementTransitionMode(Location item, 
ManagementTransitionMode mode) { + locationModesById.put(item.getId(), mode); + } + + // TODO synchronization issues here: see comment in LocalEntityManager.manage(Entity) + /** management on creation */ + @Override + public Location manage(Location loc) { + if (isManaged(loc)) { + // TODO put log.warn back in if/when manage(Location) becomes private; or could even have assert. + // Can be stricter about contract. + return loc; + } + + Location parent = loc.getParent(); + if (parent != null && !managementContext.getLocationManager().isManaged(parent)) { + log.warn("Parent location "+parent+" of "+loc+" is not managed; attempting to manage it (in future this may be disallowed)"); + return manage(parent); + } else { + return manageRecursive(loc, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.NONEXISTENT, BrooklynObjectManagementMode.MANAGED_PRIMARY)); + } + } + + @Override + public void manageRebindedRoot(Location item) { + ManagementTransitionMode mode = getLastManagementTransitionMode(item.getId()); + Preconditions.checkNotNull(mode, "Mode not set for rebinding %s", item); + manageRecursive(item, mode); + } + + protected void checkManagementAllowed(Location item) { + AccessController.Response access = managementContext.getAccessController().canManageLocation(item); + if (!access.isAllowed()) { + throw new IllegalStateException("Access controller forbids management of "+item+": "+access.getMsg()); + } + } + + protected Location manageRecursive(Location loc, final ManagementTransitionMode initialMode) { + // TODO see comments in LocalEntityManager about recursive management / manageRebindRoot v manageAll + + AccessController.Response access = managementContext.getAccessController().canManageLocation(loc); + if (!access.isAllowed()) { + throw new IllegalStateException("Access controller forbids management of "+loc+": "+access.getMsg()); + } + + long count = LOCATION_CNT.incrementAndGet(); + if (log.isDebugEnabled()) { + String msg = "Managing location 
" + loc + " ("+initialMode+"), from " + Tasks.current()+" / "+Entitlements.getEntitlementContext(); + LoggingLevel level = (!initialMode.wasNotLoaded() || initialMode.isReadOnly() ? LoggingLevel.TRACE : LoggingLevel.DEBUG); + if (count % 100 == 0) { + // include trace periodically in case we get leaks or too much location management + BrooklynLogging.log(log, level, + msg, new Exception("Informational stack trace of call to manage location "+loc+" ("+count+" calls; "+getLocations().size()+" currently managed)")); + } else { + BrooklynLogging.log(log, level, msg); + } + } + + recursively(loc, new Predicate() { public boolean apply(AbstractLocation it) { + ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); + if (mode==null) { + setManagementTransitionMode(it, mode = initialMode); + } + + if (it.isManaged()) { + if (mode.wasNotLoaded()) { + // silently bail out + return false; + } else { + // on rebind, we just replace, fall through to below + } + } + + boolean result = manageNonRecursive(it, mode); + if (result) { + it.setManagementContext(managementContext); + if (mode.isPrimary()) { + it.onManagementStarted(); + if (mode.isCreating()) { + // Never record event on rebind; this isn't the location (e.g. the VM) being "created" + // so don't tell listeners that. + // TODO The location-event history should be persisted; currently it is lost on + // rebind, unless there is a listener that is persisting the state externally itself. 
+ recordLocationEvent(it, Lifecycle.CREATED); + } + } + managementContext.getRebindManager().getChangeListener().onManaged(it); + } + return result; + } }); + return loc; + } + + @Override + public void unmanage(final Location loc) { + unmanage(loc, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.MANAGED_PRIMARY, BrooklynObjectManagementMode.NONEXISTENT)); + } + + public void unmanage(final Location loc, final ManagementTransitionMode mode) { + unmanage(loc, mode, false); + } + + private void unmanage(final Location loc, final ManagementTransitionMode mode, boolean hasBeenReplaced) { + if (shouldSkipUnmanagement(loc)) return; + + if (hasBeenReplaced) { + // we are unmanaging an old instance after having replaced it; + // don't unmanage or even clear its fields, because there might be references to it + + if (mode.wasReadOnly()) { + // if coming *from* read only; nothing needed + } else { + if (!mode.wasPrimary()) { + log.warn("Unexpected mode "+mode+" for unmanage-replace "+loc+" (applying anyway)"); + } + // migrating away or in-place active partial rebind: + managementContext.getRebindManager().getChangeListener().onUnmanaged(loc); + if (managementContext.gc != null) managementContext.gc.onUnmanaged(loc); + } + // do not remove from maps below, bail out now + return; + + } else if ((mode.wasPrimary() && mode.isReadOnly()) || (mode.wasReadOnly() && mode.isNoLongerLoaded())) { + if (mode.isReadOnly() && mode.wasPrimary()) { + // TODO shouldn't this fall into "hasBeenReplaced" above? 
+ log.debug("Unmanaging on demotion: "+loc+" ("+mode+")"); + } + // we are unmanaging an instance whose primary management is elsewhere (either we were secondary, or we are being demoted) + unmanageNonRecursiveRemoveFromRecords(loc, mode); + managementContext.getRebindManager().getChangeListener().onUnmanaged(loc); + if (managementContext.gc != null) managementContext.gc.onUnmanaged(loc); + unmanageNonRecursiveClearItsFields(loc, mode); + + } else if (mode.isNoLongerLoaded()) { + // Need to store all child entities as onManagementStopping removes a child from the parent entity + + // As above, see TODO in LocalEntityManager about recursive management / unmanagement v manageAll/unmanageAll + recursively(loc, new Predicate() { public boolean apply(AbstractLocation it) { + if (shouldSkipUnmanagement(it)) return false; + boolean result = unmanageNonRecursiveRemoveFromRecords(it, mode); + if (result) { + ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); + if (mode==null) { + // ad hoc creation e.g. 
tests + log.debug("Missing transition mode for "+it+" when unmanaging; assuming primary/destroying"); + mode = ManagementTransitionMode.guessing(BrooklynObjectManagementMode.MANAGED_PRIMARY, BrooklynObjectManagementMode.NONEXISTENT); + } + if (mode.wasPrimary()) it.onManagementStopped(); + managementContext.getRebindManager().getChangeListener().onUnmanaged(it); + if (mode.isDestroying()) recordLocationEvent(it, Lifecycle.DESTROYED); + if (managementContext.gc != null) managementContext.gc.onUnmanaged(it); + } + unmanageNonRecursiveClearItsFields(loc, mode); + return result; + } }); + + } else { + log.warn("Invalid mode for unmanage: "+mode+" on "+loc+" (ignoring)"); + } + + if (loc instanceof Closeable) { + Streams.closeQuietly( (Closeable)loc ); + } + + locationsById.remove(loc.getId()); + preRegisteredLocationsById.remove(loc.getId()); + locationModesById.remove(loc.getId()); + locationTypes.remove(loc.getId()); + } + + /** + * Adds this location event to the usage record for the given location (creating the usage + * record if one does not already exist). + */ + private void recordLocationEvent(LocationInternal loc, Lifecycle state) { + try { + managementContext.getUsageManager().recordLocationEvent(loc, state); + } catch (RuntimeInterruptedException e) { + throw e; + } catch (RuntimeException e) { + log.warn("Failed to store location lifecycle event for "+loc+" (ignoring)", e); + } + } + + private void recursively(Location e, Predicate action) { + boolean success = action.apply( (AbstractLocation)e ); + if (!success) { + return; // Don't manage children if action false/unnecessary for parent + } + for (Location child : e.getChildren()) { + recursively(child, action); + } + } + + /** + * Should ensure that the location is now managed somewhere, and known about in all the lists. 
+ * Returns true if the location has now become managed; false if it was already managed (anything else throws exception) + * @param rebindPrimary true if rebinding primary, false if rebinding as copy, null if creating (not rebinding) + */ + private synchronized boolean manageNonRecursive(Location loc, ManagementTransitionMode mode) { + Location old = locationsById.put(loc.getId(), loc); + preRegisteredLocationsById.remove(loc.getId()); + + locationTypes.put(loc.getId(), loc.getClass().getName()); + + if (old!=null && mode.wasNotLoaded()) { + if (old.equals(loc)) { + log.warn("{} redundant call to start management of location {}", this, loc); + } else { + throw new IllegalStateException("call to manage location "+loc+" but different location "+old+" already known under that id at "+this); + } + return false; + } + + if (old!=null && old!=loc) { + // passing the transition info will ensure the right shutdown steps invoked for old instance + unmanage(old, mode, true); + } + + return true; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private synchronized void unmanageNonRecursiveClearItsFields(Location loc, ManagementTransitionMode mode) { + if (mode.isDestroying()) { + ((AbstractLocation)loc).setParent(null, true); + + Location parent = ((AbstractLocation)loc).getParent(); + if (parent instanceof ProvisioningLocation) { + try { + ((ProvisioningLocation)parent).release(loc); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + log.debug("Error releasing "+loc+" in its parent "+parent+": "+e); + } + } + } else { + // if not destroying, don't change the parent's children list + ((AbstractLocation)loc).setParent(null, false); + } + // clear config to help with GC; i know you're not supposed to, but this seems to help, else config bag is littered with refs to entities etc + // FIXME relies on config().getLocalBag() returning the underlying bag! 
+ ((AbstractLocation)loc).config().getLocalBag().clear(); + } + + /** + * Should ensure that the location is no longer managed anywhere, remove from all lists. + * Returns true if the location has been removed from management; false if it was not previously managed (anything else throws exception) + */ + private synchronized boolean unmanageNonRecursiveRemoveFromRecords(Location loc, ManagementTransitionMode mode) { + Object old = locationsById.remove(loc.getId()); + locationTypes.remove(loc.getId()); + locationModesById.remove(loc.getId()); + + if (old==null) { + log.warn("{} call to stop management of unknown location (already unmanaged?) {}; ignoring", this, loc); + return false; + } else if (!old.equals(loc)) { + // shouldn't happen... + log.error("{} call to stop management of location {} removed different location {}; ignoring", new Object[] { this, loc, old }); + return true; + } else { + if (log.isDebugEnabled()) log.debug("{} stopped management of location {}", this, loc); + return true; + } + } + + private boolean shouldSkipUnmanagement(Location loc) { + if (loc==null) { + log.warn(""+this+" call to unmanage null location; skipping", + new IllegalStateException("source of null unmanagement call to "+this)); + return true; + } + if (!isManaged(loc)) { + log.warn("{} call to stop management of unknown location (already unmanaged?)
{}; skipping, and all descendants", this, loc); + return true; + } + return false; + } + + private boolean isRunning() { + return managementContext.isRunning(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalManagementContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalManagementContext.java new file mode 100644 index 0000000..306dcd8 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/LocalManagementContext.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.management.internal; + +import static brooklyn.util.JavaGroovyEquivalents.elvis; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.brooklyn.api.entity.Application; +import org.apache.brooklyn.api.entity.Effector; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.management.AccessController; +import org.apache.brooklyn.api.management.ExecutionContext; +import org.apache.brooklyn.api.management.ExecutionManager; +import org.apache.brooklyn.api.management.ManagementContext; +import org.apache.brooklyn.api.management.SubscriptionManager; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskAdaptable; +import org.apache.brooklyn.core.management.entitlement.Entitlements; +import org.apache.brooklyn.core.management.ha.OsgiManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.BrooklynProperties; +import brooklyn.config.BrooklynProperties.Factory.Builder; +import brooklyn.entity.drivers.downloads.BasicDownloadsManager; +import brooklyn.entity.effector.Effectors; +import brooklyn.entity.proxying.InternalEntityFactory; +import brooklyn.entity.proxying.InternalLocationFactory; +import brooklyn.entity.proxying.InternalPolicyFactory; +import brooklyn.internal.BrooklynFeatureEnablement; +import brooklyn.internal.storage.DataGridFactory; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Maybe; +import brooklyn.util.task.BasicExecutionContext; +import brooklyn.util.task.BasicExecutionManager; +import brooklyn.util.task.DynamicTasks; +import 
brooklyn.util.task.TaskTags; +import brooklyn.util.task.Tasks; +import brooklyn.util.text.Strings; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSet; + +/** + * A local (single node) implementation of the {@link ManagementContext} API. + */ +public class LocalManagementContext extends AbstractManagementContext { + + private static final Logger log = LoggerFactory.getLogger(LocalManagementContext.class); + + private static final Set INSTANCES = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap())); + + private final Builder builder; + + private final List reloadListeners = new CopyOnWriteArrayList(); + + @VisibleForTesting + static Set getInstances() { + synchronized (INSTANCES) { + return ImmutableSet.copyOf(INSTANCES); + } + } + + // Note also called reflectively by BrooklynLeakListener + public static void logAll(Logger logger){ + for (LocalManagementContext context : getInstances()) { + logger.warn("Management Context "+context+" running, creation stacktrace:\n" + Throwables.getStackTraceAsString(context.constructionStackTrace)); + } + } + + /** terminates all (best effort); returns count of sessions closed; if exceptions thrown, returns negative number. + * semantics might change, particular in dealing with interminable mgmt contexts. 
*/ + // Note also called reflectively by BrooklynLeakListener + @Beta + public static int terminateAll() { + int closed=0,dangling=0; + for (LocalManagementContext context : getInstances()) { + try { + context.terminate(); + closed++; + }catch (Throwable t) { + Exceptions.propagateIfFatal(t); + log.warn("Failed to terminate management context", t); + dangling++; + } + } + if (dangling>0) return -dangling; + return closed; + } + + private String managementPlaneId; + private String managementNodeId; + private BasicExecutionManager execution; + private SubscriptionManager subscriptions; + private LocalEntityManager entityManager; + private final LocalLocationManager locationManager; + private final LocalAccessManager accessManager; + private final LocalUsageManager usageManager; + private OsgiManager osgiManager; + + public final Throwable constructionStackTrace = new Throwable("for construction stacktrace").fillInStackTrace(); + + private final Map brooklynAdditionalProperties; + + /** + * Creates a LocalManagement with default BrooklynProperties. + */ + public LocalManagementContext() { + this(BrooklynProperties.Factory.builderDefault()); + } + + public LocalManagementContext(BrooklynProperties brooklynProperties) { + this(brooklynProperties, (DataGridFactory)null); + } + + /** + * Creates a new LocalManagementContext. + * + * @param brooklynProperties the BrooklynProperties. + * @param datagridFactory the DataGridFactory to use. If this instance is null, it means that the system + * is going to use BrooklynProperties to figure out which instance to load or otherwise + * use a default instance. 
+ */ + @VisibleForTesting + public LocalManagementContext(BrooklynProperties brooklynProperties, DataGridFactory datagridFactory) { + this(Builder.fromProperties(brooklynProperties), datagridFactory); + } + + public LocalManagementContext(Builder builder) { + this(builder, null, null); + } + + public LocalManagementContext(Builder builder, DataGridFactory datagridFactory) { + this(builder, null, datagridFactory); + } + + public LocalManagementContext(Builder builder, Map brooklynAdditionalProperties) { + this(builder, brooklynAdditionalProperties, null); + } + + public LocalManagementContext(BrooklynProperties brooklynProperties, Map brooklynAdditionalProperties) { + this(Builder.fromProperties(brooklynProperties), brooklynAdditionalProperties, null); + } + + public LocalManagementContext(Builder builder, Map brooklynAdditionalProperties, DataGridFactory datagridFactory) { + super(builder.build(), datagridFactory); + + checkNotNull(configMap, "brooklynProperties"); + + // TODO in a persisted world the planeId may be injected + this.managementPlaneId = Strings.makeRandomId(8); + this.managementNodeId = Strings.makeRandomId(8); + this.builder = builder; + this.brooklynAdditionalProperties = brooklynAdditionalProperties; + if (brooklynAdditionalProperties != null) + configMap.addFromMap(brooklynAdditionalProperties); + + BrooklynFeatureEnablement.init(configMap); + + this.locationManager = new LocalLocationManager(this); + this.accessManager = new LocalAccessManager(); + this.usageManager = new LocalUsageManager(this); + + if (configMap.getConfig(OsgiManager.USE_OSGI)) { + this.osgiManager = new OsgiManager(this); + osgiManager.start(); + } + + INSTANCES.add(this); + log.debug("Created management context "+this); + } + + @Override + public String getManagementPlaneId() { + return managementPlaneId; + } + + @Override + public String getManagementNodeId() { + return managementNodeId; + } + + @Override + public void prePreManage(Entity entity) { + 
getEntityManager().prePreManage(entity); + } + + @Override + public void prePreManage(Location location) { + getLocationManager().prePreManage(location); + } + + @Override + public synchronized Collection getApplications() { + return getEntityManager().getApplications(); + } + + @Override + public void addEntitySetListener(CollectionChangeListener listener) { + getEntityManager().addEntitySetListener(listener); + } + + @Override + public void removeEntitySetListener(CollectionChangeListener listener) { + getEntityManager().removeEntitySetListener(listener); + } + + @Override + protected void manageIfNecessary(Entity entity, Object context) { + getEntityManager().manageIfNecessary(entity, context); + } + + @Override + public synchronized LocalEntityManager getEntityManager() { + if (!isRunning()) throw new IllegalStateException("Management context no longer running"); + + if (entityManager == null) { + entityManager = new LocalEntityManager(this); + } + return entityManager; + } + + @Override + public InternalEntityFactory getEntityFactory() { + return getEntityManager().getEntityFactory(); + } + + @Override + public InternalLocationFactory getLocationFactory() { + return getLocationManager().getLocationFactory(); + } + + @Override + public InternalPolicyFactory getPolicyFactory() { + return getEntityManager().getPolicyFactory(); + } + + @Override + public synchronized LocalLocationManager getLocationManager() { + if (!isRunning()) throw new IllegalStateException("Management context no longer running"); + return locationManager; + } + + @Override + public synchronized LocalAccessManager getAccessManager() { + if (!isRunning()) throw new IllegalStateException("Management context no longer running"); + return accessManager; + } + + @Override + public synchronized LocalUsageManager getUsageManager() { + if (!isRunning()) throw new IllegalStateException("Management context no longer running"); + return usageManager; + } + + @Override + public synchronized Maybe 
getOsgiManager() { + if (!isRunning()) throw new IllegalStateException("Management context no longer running"); + if (osgiManager==null) return Maybe.absent("OSGi not available in this instance"); + return Maybe.of(osgiManager); + } + + @Override + public synchronized AccessController getAccessController() { + return getAccessManager().getAccessController(); + } + + @Override + public synchronized SubscriptionManager getSubscriptionManager() { + if (!isRunning()) throw new IllegalStateException("Management context no longer running"); + + if (subscriptions == null) { + subscriptions = new LocalSubscriptionManager(getExecutionManager()); + } + return subscriptions; + } + + @Override + public synchronized ExecutionManager getExecutionManager() { + if (!isRunning()) throw new IllegalStateException("Management context no longer running"); + + if (execution == null) { + execution = new BasicExecutionManager(getManagementNodeId()); + gc = new BrooklynGarbageCollector(configMap, execution, getStorage()); + } + return execution; + } + + @Override + public void terminate() { + INSTANCES.remove(this); + super.terminate(); + if (osgiManager!=null) { + osgiManager.stop(); + osgiManager = null; + } + if (usageManager != null) usageManager.terminate(); + if (execution != null) execution.shutdownNow(); + if (gc != null) gc.shutdownNow(); + } + + @Override + protected void finalize() { + terminate(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public Task runAtEntity(Map flags, Entity entity, Callable c) { + manageIfNecessary(entity, elvis(Arrays.asList(flags.get("displayName"), flags.get("description"), flags, c))); + return runAtEntity(entity, Tasks.builder().dynamic(true).body(c).flags(flags).build()); + } + + protected Task runAtEntity(Entity entity, TaskAdaptable task) { + getExecutionContext(entity).submit(task); + if (DynamicTasks.getTaskQueuingContext()!=null) { + // put it in the queueing context so it appears in the GUI + // mark it inessential 
as this is being invoked from code, + // the caller will do 'get' to handle errors + TaskTags.markInessential(task); + DynamicTasks.getTaskQueuingContext().queue(task.asTask()); + } + return task.asTask(); + } + + @Override + protected Task runAtEntity(final Entity entity, final Effector eff, @SuppressWarnings("rawtypes") final Map parameters) { + manageIfNecessary(entity, eff); + // prefer to submit this from the current execution context so it sets up correct cross-context chaining + ExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext(); + if (ec == null) { + log.debug("Top-level effector invocation: {} on {}", eff, entity); + ec = getExecutionContext(entity); + } + return runAtEntity(entity, Effectors.invocation(entity, eff, parameters)); + } + + @Override + public boolean isManagedLocally(Entity e) { + return true; + } + + @Override + public String toString() { + return LocalManagementContext.class.getSimpleName()+"["+getManagementPlaneId()+"-"+getManagementNodeId()+"]"; + } + + @Override + public void reloadBrooklynProperties() { + log.info("Reloading brooklyn properties from " + builder); + if (builder.hasDelegateOriginalProperties()) + log.warn("When reloading, mgmt context "+this+" properties are fixed, so reload will be of limited utility"); + + BrooklynProperties properties = builder.build(); + configMap = properties; + if (brooklynAdditionalProperties != null) { + log.info("Reloading additional brooklyn properties from " + brooklynAdditionalProperties); + configMap.addFromMap(brooklynAdditionalProperties); + } + this.downloadsManager = BasicDownloadsManager.newDefault(configMap); + this.entitlementManager = Entitlements.newManager(this, configMap); + + clearLocationRegistry(); + + BrooklynFeatureEnablement.init(configMap); + + // Notify listeners that properties have been reloaded + for (PropertiesReloadListener listener : reloadListeners) { + listener.reloaded(); + } + } + + @VisibleForTesting + public void clearLocationRegistry() { + 
// Force reload of location registry + this.locationRegistry = null; + } + + @Override + public void addPropertiesReloadListener(PropertiesReloadListener listener) { + reloadListeners.add(checkNotNull(listener, "listener")); + } + + @Override + public void removePropertiesReloadListener(PropertiesReloadListener listener) { + reloadListeners.remove(listener); + } + + public void noteStartupComplete() { + startupComplete = true; + } +}