brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [14/33] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/management
Date Sat, 15 Aug 2015 13:33:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java b/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java
new file mode 100644
index 0000000..6a133fc
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleException;
+import org.osgi.framework.launch.Framework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.BrooklynVersion;
+
+import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle;
+import org.apache.brooklyn.api.management.ManagementContext;
+
+import brooklyn.config.BrooklynServerConfig;
+import brooklyn.config.BrooklynServerPaths;
+import brooklyn.config.ConfigKey;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.os.Os;
+import brooklyn.util.os.Os.DeletionResult;
+import brooklyn.util.osgi.Osgis;
+import brooklyn.util.osgi.Osgis.BundleFinder;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class OsgiManager {
+
+    private static final Logger log = LoggerFactory.getLogger(OsgiManager.class);
+    
+    public static final ConfigKey<Boolean> USE_OSGI = BrooklynServerConfig.USE_OSGI;
+    
+    /* see Osgis for info on starting framework etc */
+    
+    protected ManagementContext mgmt;
+    protected Framework framework;
+    protected File osgiCacheDir;
+
+    public OsgiManager(ManagementContext mgmt) {
+        this.mgmt = mgmt;
+    }
+
+    public void start() {
+        try {
+            osgiCacheDir = BrooklynServerPaths.getOsgiCacheDirCleanedIfNeeded(mgmt);
+            
+            // any extra OSGi startup args could go here
+            framework = Osgis.newFrameworkStarted(osgiCacheDir.getAbsolutePath(), false, MutableMap.of());
+            
+        } catch (Exception e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    public void stop() {
+        try {
+            if (framework!=null) {
+                framework.stop();
+                framework.waitForStop(0); // 0 means indefinite
+            }
+        } catch (BundleException e) {
+            throw Exceptions.propagate(e);
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+        if (BrooklynServerPaths.isOsgiCacheForCleaning(mgmt, osgiCacheDir)) {
+            // See exception reported in https://issues.apache.org/jira/browse/BROOKLYN-72
+            // We almost always fail to delete he OSGi temp directory due to a concurrent modification.
+            // Therefore keep trying.
+            final AtomicReference<DeletionResult> deletionResult = new AtomicReference<DeletionResult>();
+            Repeater.create("Delete OSGi cache dir")
+                    .until(new Callable<Boolean>() {
+                        public Boolean call() {
+                            deletionResult.set(Os.deleteRecursively(osgiCacheDir));
+                            return deletionResult.get().wasSuccessful();
+                        }})
+                    .limitTimeTo(Duration.ONE_SECOND)
+                    .backoffTo(Duration.millis(50))
+                    .run();
+            if (deletionResult.get().getThrowable()!=null) {
+                log.debug("Unable to delete "+osgiCacheDir+" (possibly being modified concurrently?): "+deletionResult.get().getThrowable());
+            }
+        }
+        osgiCacheDir = null;
+        framework = null;
+    }
+
+    public synchronized void registerBundle(CatalogBundle bundle) {
+        try {
+            if (checkBundleInstalledThrowIfInconsistent(bundle)) {
+                return;
+            }
+
+            Bundle b = Osgis.install(framework, bundle.getUrl());
+
+            checkCorrectlyInstalled(bundle, b);
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            throw new IllegalStateException("Bundle from "+bundle.getUrl()+" failed to install: " + e.getMessage(), e);
+        }
+    }
+
+    private void checkCorrectlyInstalled(CatalogBundle bundle, Bundle b) {
+        String nv = b.getSymbolicName()+":"+b.getVersion().toString();
+
+        if (!isBundleNameEqualOrAbsent(bundle, b)) {
+            throw new IllegalStateException("Bundle already installed as "+nv+" but user explicitly requested "+bundle);
+        }
+
+        List<Bundle> matches = Osgis.bundleFinder(framework)
+                .symbolicName(b.getSymbolicName())
+                .version(b.getVersion().toString())
+                .findAll();
+        if (matches.isEmpty()) {
+            log.error("OSGi could not find bundle "+nv+" in search after installing it from "+bundle.getUrl());
+        } else if (matches.size()==1) {
+            log.debug("Bundle from "+bundle.getUrl()+" successfully installed as " + nv + " ("+b+")");
+        } else {
+            log.warn("OSGi has multiple bundles matching "+nv+", when just installed from "+bundle.getUrl()+": "+matches+"; "
+                + "brooklyn will prefer the URL-based bundle for top-level references but any dependencies or "
+                + "import-packages will be at the mercy of OSGi. "
+                + "It is recommended to use distinct versions for different bundles, and the same URL for the same bundles.");
+        }
+    }
+
+    private boolean checkBundleInstalledThrowIfInconsistent(CatalogBundle bundle) {
+        String bundleUrl = bundle.getUrl();
+        if (bundleUrl != null) {
+            Maybe<Bundle> installedBundle = Osgis.bundleFinder(framework).requiringFromUrl(bundleUrl).find();
+            if (installedBundle.isPresent()) {
+                Bundle b = installedBundle.get();
+                String nv = b.getSymbolicName()+":"+b.getVersion().toString();
+                if (!isBundleNameEqualOrAbsent(bundle, b)) {
+                    throw new IllegalStateException("User requested bundle " + bundle + " but already installed as "+nv);
+                } else {
+                    log.trace("Bundle from "+bundleUrl+" already installed as "+nv+"; not re-registering");
+                }
+                return true;
+            }
+        } else {
+            Maybe<Bundle> installedBundle = Osgis.bundleFinder(framework).symbolicName(bundle.getSymbolicName()).version(bundle.getVersion()).find();
+            if (installedBundle.isPresent()) {
+                log.trace("Bundle "+bundle+" installed from "+installedBundle.get().getLocation());
+            } else {
+                throw new IllegalStateException("Bundle "+bundle+" not previously registered, but URL is empty.");
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public static boolean isBundleNameEqualOrAbsent(CatalogBundle bundle, Bundle b) {
+        return !bundle.isNamed() ||
+                (bundle.getSymbolicName().equals(b.getSymbolicName()) &&
+                bundle.getVersion().equals(b.getVersion().toString()));
+    }
+
+    public <T> Maybe<Class<T>> tryResolveClass(String type, CatalogBundle... catalogBundles) {
+        return tryResolveClass(type, Arrays.asList(catalogBundles));
+    }
+    public <T> Maybe<Class<T>> tryResolveClass(String type, Iterable<CatalogBundle> catalogBundles) {
+        Map<CatalogBundle,Throwable> bundleProblems = MutableMap.of();
+        Set<String> extraMessages = MutableSet.of();
+        for (CatalogBundle catalogBundle: catalogBundles) {
+            try {
+                Maybe<Bundle> bundle = findBundle(catalogBundle);
+                if (bundle.isPresent()) {
+                    Bundle b = bundle.get();
+                    Class<T> clazz;
+                    //Extension bundles don't support loadClass.
+                    //Instead load from the app classpath.
+                    if (Osgis.isExtensionBundle(b)) {
+                        @SuppressWarnings("unchecked")
+                        Class<T> c = (Class<T>)Class.forName(type);
+                        clazz = c;
+                    } else {
+                        @SuppressWarnings("unchecked")
+                        Class<T> c = (Class<T>)b.loadClass(type);
+                        clazz = c;
+                    }
+                    return Maybe.of(clazz);
+                } else {
+                    bundleProblems.put(catalogBundle, ((Maybe.Absent<?>)bundle).getException());
+                }
+                
+            } catch (Exception e) {
+                // should come from classloading now; name formatting or missing bundle errors will be caught above 
+                Exceptions.propagateIfFatal(e);
+                bundleProblems.put(catalogBundle, e);
+
+                Throwable cause = e.getCause();
+                if (cause != null && cause.getMessage().contains("Unresolved constraint in bundle")) {
+                    if (BrooklynVersion.INSTANCE.getVersionFromOsgiManifest()==null) {
+                        extraMessages.add("No brooklyn-core OSGi manifest available. OSGi will not work.");
+                    }
+                    if (BrooklynVersion.isDevelopmentEnvironment()) {
+                        extraMessages.add("Your development environment may not have created necessary files. Doing a maven build then retrying may fix the issue.");
+                    }
+                    if (!extraMessages.isEmpty()) log.warn(Strings.join(extraMessages, " "));
+                    log.warn("Unresolved constraint resolving OSGi bundle "+catalogBundle+" to load "+type+": "+cause.getMessage());
+                    if (log.isDebugEnabled()) log.debug("Trace for OSGi resolution failure", e);
+                }
+            }
+        }
+        if (bundleProblems.size()==1) {
+            Throwable error = Iterables.getOnlyElement(bundleProblems.values());
+            if (error instanceof ClassNotFoundException && error.getCause()!=null && error.getCause().getMessage()!=null) {
+                error = Exceptions.collapseIncludingAllCausalMessages(error);
+            }
+            return Maybe.absent("Unable to resolve class "+type+" in "+Iterables.getOnlyElement(bundleProblems.keySet())
+                + (extraMessages.isEmpty() ? "" : " ("+Strings.join(extraMessages, " ")+")"), error);
+        } else {
+            return Maybe.absent(Exceptions.create("Unable to resolve class "+type+": "+bundleProblems
+                + (extraMessages.isEmpty() ? "" : " ("+Strings.join(extraMessages, " ")+")"), bundleProblems.values()));
+        }
+    }
+
+    public Maybe<Bundle> findBundle(CatalogBundle catalogBundle) {
+        //Either fail at install time when the user supplied name:version is different
+        //from the one reported from the bundle
+        //or
+        //Ignore user supplied name:version when URL is supplied to be able to find the
+        //bundle even if it's with a different version.
+        //
+        //For now we just log a warning if there's a version discrepancy at install time,
+        //so prefer URL if supplied.
+        BundleFinder bundleFinder = Osgis.bundleFinder(framework);
+        if (catalogBundle.getUrl() != null) {
+            bundleFinder.requiringFromUrl(catalogBundle.getUrl());
+        } else {
+            bundleFinder.symbolicName(catalogBundle.getSymbolicName()).version(catalogBundle.getVersion());
+        }
+        return bundleFinder.find();
+    }
+
+    /**
+     * Iterates through catalogBundles until one contains a resource with the given name.
+     */
+    public URL getResource(String name, Iterable<CatalogBundle> catalogBundles) {
+        for (CatalogBundle catalogBundle: catalogBundles) {
+            try {
+                Maybe<Bundle> bundle = findBundle(catalogBundle);
+                if (bundle.isPresent()) {
+                    URL result = bundle.get().getResource(name);
+                    if (result!=null) return result;
+                }
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return An iterable of all resources matching name in catalogBundles.
+     */
+    public Iterable<URL> getResources(String name, Iterable<CatalogBundle> catalogBundles) {
+        List<URL> resources = Lists.newArrayList();
+        for (CatalogBundle catalogBundle : catalogBundles) {
+            try {
+                Maybe<Bundle> bundle = findBundle(catalogBundle);
+                if (bundle.isPresent()) {
+                    Enumeration<URL> result = bundle.get().getResources(name);
+                    resources.addAll(Collections.list(result));
+                }
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+            }
+        }
+        return resources;
+    }
+
+    public Framework getFramework() {
+        return framework;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java
new file mode 100644
index 0000000..1758846
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.basic.BrooklynObject;
+import org.apache.brooklyn.api.catalog.BrooklynCatalog;
+import org.apache.brooklyn.api.catalog.CatalogItem;
+import org.apache.brooklyn.api.entity.Effector;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.drivers.EntityDriverManager;
+import org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager;
+import org.apache.brooklyn.api.entity.rebind.RebindManager;
+import org.apache.brooklyn.api.location.LocationRegistry;
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.management.SubscriptionContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.classloading.BrooklynClassLoadingContext;
+import org.apache.brooklyn.api.management.entitlement.EntitlementManager;
+import org.apache.brooklyn.api.management.ha.HighAvailabilityManager;
+import org.apache.brooklyn.core.management.classloading.JavaBrooklynClassLoadingContext;
+import org.apache.brooklyn.core.management.entitlement.Entitlements;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+
+import brooklyn.catalog.internal.BasicBrooklynCatalog;
+import brooklyn.catalog.internal.CatalogInitialization;
+import brooklyn.catalog.internal.CatalogUtils;
+import brooklyn.config.BrooklynProperties;
+import brooklyn.config.StringConfigMap;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.drivers.BasicEntityDriverManager;
+import brooklyn.entity.drivers.downloads.BasicDownloadsManager;
+import brooklyn.entity.rebind.RebindManagerImpl;
+import brooklyn.internal.storage.BrooklynStorage;
+import brooklyn.internal.storage.DataGrid;
+import brooklyn.internal.storage.DataGridFactory;
+import brooklyn.internal.storage.impl.BrooklynStorageImpl;
+import brooklyn.internal.storage.impl.inmemory.InMemoryDataGridFactory;
+
+import org.apache.brooklyn.location.basic.BasicLocationRegistry;
+
+import brooklyn.util.GroovyJavaMethods;
+import brooklyn.util.ResourceUtils;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.task.BasicExecutionContext;
+import brooklyn.util.task.Tasks;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+public abstract class AbstractManagementContext implements ManagementContextInternal {
+    private static final Logger log = LoggerFactory.getLogger(AbstractManagementContext.class);
+
+    private static DataGridFactory loadDataGridFactory(BrooklynProperties properties) {
+        String clazzName = properties.getFirst(DataGridFactory.class.getName());
+        if(clazzName == null){
+            clazzName = InMemoryDataGridFactory.class.getName();
+        }
+
+        Class<?> clazz;
+        try{
+            //todo: which classloader should we use?
+            clazz = LocalManagementContext.class.getClassLoader().loadClass(clazzName);
+        }catch(ClassNotFoundException e){
+            throw new IllegalStateException(format("Could not load class [%s]",clazzName),e);
+        }
+
+        Object instance;
+        try {
+            instance = clazz.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalStateException(format("Could not instantiate class [%s]",clazzName),e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalStateException(format("Could not instantiate class [%s]",clazzName),e);
+        }
+
+        if(!(instance instanceof DataGridFactory)){
+            throw new IllegalStateException(format("Class [%s] not an instantiate of class [%s]",clazzName, DataGridFactory.class.getName()));
+        }
+
+        return (DataGridFactory)instance;
+    }
+
+    static {
+        ResourceUtils.addClassLoaderProvider(new Function<Object, BrooklynClassLoadingContext>() {
+            @Override
+            public BrooklynClassLoadingContext apply(@Nullable Object input) {
+                if (input instanceof EntityInternal) {
+                    EntityInternal internal = (EntityInternal)input;
+                    if (internal.getCatalogItemId() != null) {
+                        CatalogItem<?, ?> item = CatalogUtils.getCatalogItemOptionalVersion(internal.getManagementContext(), internal.getCatalogItemId());
+                        if (item != null) {
+                            return CatalogUtils.newClassLoadingContext(internal.getManagementContext(), item);
+                        } else {
+                            log.error("Can't find catalog item " + internal.getCatalogItemId() +
+                                    " used for instantiating entity " + internal +
+                                    ". Falling back to application classpath.");
+                        }
+                    }
+                    return apply(internal.getManagementSupport());
+                }
+                
+                if (input instanceof EntityManagementSupport)
+                    return apply(((EntityManagementSupport)input).getManagementContext());
+                if (input instanceof ManagementContext)
+                    return JavaBrooklynClassLoadingContext.create((ManagementContext) input);
+                return null;
+            }
+        });
+    }
+
+    private final AtomicLong totalEffectorInvocationCount = new AtomicLong();
+
+    protected BrooklynProperties configMap;
+    protected BasicLocationRegistry locationRegistry;
+    protected final BasicBrooklynCatalog catalog;
+    protected ClassLoader baseClassLoader;
+    protected Iterable<URL> baseClassPathForScanning;
+
+    private final RebindManager rebindManager;
+    private final HighAvailabilityManager highAvailabilityManager;
+    
+    protected volatile BrooklynGarbageCollector gc;
+
+    private final EntityDriverManager entityDriverManager;
+    protected DownloadResolverManager downloadsManager;
+
+    protected EntitlementManager entitlementManager;
+
+    private final BrooklynStorage storage;
+
+    private volatile boolean running = true;
+    protected boolean startupComplete = false;
+    protected final List<Throwable> errors = Collections.synchronizedList(MutableList.<Throwable>of()); 
+
+    protected Maybe<URI> uri = Maybe.absent();
+    protected CatalogInitialization catalogInitialization;
+
+    public AbstractManagementContext(BrooklynProperties brooklynProperties){
+        this(brooklynProperties, null);
+    }
+
+    public AbstractManagementContext(BrooklynProperties brooklynProperties, DataGridFactory datagridFactory) {
+        this.configMap = brooklynProperties;
+        this.entityDriverManager = new BasicEntityDriverManager();
+        this.downloadsManager = BasicDownloadsManager.newDefault(configMap);
+        if (datagridFactory == null) {
+            datagridFactory = loadDataGridFactory(brooklynProperties);
+        }
+        DataGrid datagrid = datagridFactory.newDataGrid(this);
+
+        this.catalog = new BasicBrooklynCatalog(this);
+        
+        this.storage = new BrooklynStorageImpl(datagrid);
+        this.rebindManager = new RebindManagerImpl(this); // TODO leaking "this" reference; yuck
+        this.highAvailabilityManager = new HighAvailabilityManagerImpl(this); // TODO leaking "this" reference; yuck
+        
+        this.entitlementManager = Entitlements.newManager(this, brooklynProperties);
+    }
+
+    @Override
+    public void terminate() {
+        highAvailabilityManager.stop();
+        running = false;
+        rebindManager.stop();
+        storage.terminate();
+        // Don't unmanage everything; different entities get given their events at different times 
+        // so can cause problems (e.g. a group finds out that a member is unmanaged, before the
+        // group itself has been told that it is unmanaged).
+    }
+    
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+    
+    @Override
+    public boolean isStartupComplete() {
+        return startupComplete;
+    }
+
+    @Override
+    public BrooklynStorage getStorage() {
+        return storage;
+    }
+    
+    @Override
+    public RebindManager getRebindManager() {
+        return rebindManager;
+    }
+
+    @Override
+    public HighAvailabilityManager getHighAvailabilityManager() {
+        return highAvailabilityManager;
+    }
+
+    @Override
+    public long getTotalEffectorInvocations() {
+        return totalEffectorInvocationCount.get();
+    }
+    
+    @Override
+    public ExecutionContext getExecutionContext(Entity e) {
+        // BEC is a thin wrapper around EM so fine to create a new one here; but make sure it gets the real entity
+        if (e instanceof AbstractEntity) {
+            return new BasicExecutionContext(MutableMap.of("tag", BrooklynTaskTags.tagForContextEntity(e)), getExecutionManager());
+        } else {
+            return ((EntityInternal)e).getManagementSupport().getExecutionContext();
+        }
+    }
+
+    @Override
+    public ExecutionContext getServerExecutionContext() {
+        // BEC is a thin wrapper around EM so fine to create a new one here
+        return new BasicExecutionContext(MutableMap.of("tag", BrooklynTaskTags.BROOKLYN_SERVER_TASK_TAG), getExecutionManager());
+    }
+
+    @Override
+    public SubscriptionContext getSubscriptionContext(Entity e) {
+        // BSC is a thin wrapper around SM so fine to create a new one here
+        return new BasicSubscriptionContext(getSubscriptionManager(), e);
+    }
+
+    @Override
+    public EntityDriverManager getEntityDriverManager() {
+        return entityDriverManager;
+    }
+
+    @Override
+    public DownloadResolverManager getEntityDownloadsManager() {
+        return downloadsManager;
+    }
+    
+    @Override
+    public EntitlementManager getEntitlementManager() {
+        return entitlementManager;
+    }
+    
+    protected abstract void manageIfNecessary(Entity entity, Object context);
+
+    @Override
+    public <T> Task<T> invokeEffector(final Entity entity, final Effector<T> eff, @SuppressWarnings("rawtypes") final Map parameters) {
+        return runAtEntity(entity, eff, parameters);
+    }
+    
+    protected <T> T invokeEffectorMethodLocal(Entity entity, Effector<T> eff, Object args) {
+        assert isManagedLocally(entity) : "cannot invoke effector method at "+this+" because it is not managed here";
+        totalEffectorInvocationCount.incrementAndGet();
+        Object[] transformedArgs = EffectorUtils.prepareArgsForEffector(eff, args);
+        return GroovyJavaMethods.invokeMethodOnMetaClass(entity, eff.getName(), transformedArgs);
+    }
+
+    /**
+     * Method for entity to make effector happen with correct semantics (right place, right task context),
+     * when a method is called on that entity.
+     * @throws ExecutionException 
+     */
+    @Override
+    public <T> T invokeEffectorMethodSync(final Entity entity, final Effector<T> eff, final Object args) throws ExecutionException {
+        try {
+            Task<?> current = Tasks.current();
+            if (current == null || !entity.equals(BrooklynTaskTags.getContextEntity(current)) || !isManagedLocally(entity)) {
+                manageIfNecessary(entity, eff.getName());
+                // Wrap in a task if we aren't already in a task that is tagged with this entity
+                Task<T> task = runAtEntity( EffectorUtils.getTaskFlagsForEffectorInvocation(entity, eff, 
+                            ConfigBag.newInstance().configureStringKey("args", args)),
+                        entity, 
+                        new Callable<T>() {
+                            public T call() {
+                                return invokeEffectorMethodLocal(entity, eff, args);
+                            }});
+                return task.get();
+            } else {
+                return invokeEffectorMethodLocal(entity, eff, args);
+            }
+        } catch (Exception e) {
+            // don't need to attach any message or warning because the Effector impl hierarchy does that (see calls to EffectorUtils.handleException)
+            throw new ExecutionException(e);
+        }
+    }
+
+    /**
+     * Whether the master entity record is local, and sensors and effectors can be properly accessed locally.
+     */ 
+    public abstract boolean isManagedLocally(Entity e);
+    
+    /**
+     * Causes the indicated runnable to be run at the right location for the given entity.
+     *
+     * Returns the actual task (if it is local) or a proxy task (if it is remote);
+     * if management for the entity has not yet started this may start it.
+     * 
+     * @deprecated since 0.6.0 use effectors (or support {@code runAtEntity(Entity, Effector, Map)} if something else is needed);
+     * (Callable with Map flags is too open-ended, bothersome to support, and not used much) 
+     */
+    @Deprecated
+    public abstract <T> Task<T> runAtEntity(@SuppressWarnings("rawtypes") Map flags, Entity entity, Callable<T> c);
+
+    /** Runs the given effector in the right place for the given entity.
+     * The task is immediately submitted in the background, but also recorded in the queueing context (if present)
+     * so it appears as a child, but marked inessential so it does not fail the parent task, who will ordinarily
+     * call {@link Task#get()} on the object and may do their own failure handling. 
+     */
+    protected abstract <T> Task<T> runAtEntity(final Entity entity, final Effector<T> eff, @SuppressWarnings("rawtypes") final Map parameters);
+
+    @Override
+    public StringConfigMap getConfig() {
+        return configMap;
+    }
+
+    @Override
+    public BrooklynProperties getBrooklynProperties() {
+        return configMap;
+    }
+
+    @Override
+    public synchronized LocationRegistry getLocationRegistry() {
+        if (locationRegistry==null) locationRegistry = new BasicLocationRegistry(this);
+        return locationRegistry;
+    }
+
+    @Override
+    public BrooklynCatalog getCatalog() {
+        if (!getCatalogInitialization().hasRunAnyInitialization()) {
+            // catalog init is needed; normally this will be done from start sequence,
+            // but if accessed early -- and in tests -- we will load it here
+            getCatalogInitialization().injectManagementContext(this);
+            getCatalogInitialization().populateUnofficial(catalog);
+        }
+        return catalog;
+    }
+    
+    @Override
+    public ClassLoader getCatalogClassLoader() {
+        // catalog does not have to be initialized
+        return catalog.getRootClassLoader();
+    }
+
+    /**
+     * Optional class-loader that this management context should use as its base,
+     * as the first-resort in the catalog, and for scanning (if scanning the default in the catalog).
+     * In most instances the default classloader (ManagementContext.class.getClassLoader(), assuming
+     * this was in the JARs used at boot time) is fine, and in those cases this method normally returns null.
+     * (Surefire does some weird stuff, but the default classloader is fine for loading;
+     * however it requires a custom base classpath to be set for scanning.)
+     */
+    @Override
+    public ClassLoader getBaseClassLoader() {
+        return baseClassLoader;
+    }
+    
+    /** See {@link #getBaseClassLoader()}.  Only settable once and must be invoked before catalog is loaded. */
+    public void setBaseClassLoader(ClassLoader cl) {
+        if (baseClassLoader==cl) return;
+        if (baseClassLoader!=null) throw new IllegalStateException("Cannot change base class loader (in "+this+")");
+        if (catalog!=null) throw new IllegalStateException("Cannot set base class after catalog has been loaded (in "+this+")");
+        this.baseClassLoader = cl;
+    }
+    
+    /** Optional mechanism for setting the classpath which should be scanned by the catalog, if the catalog
+     * is scanning the default classpath.  Usually it infers the right thing, but some classloaders
+     * (e.g. surefire) do funny things which the underlying org.reflections.Reflections library can't see in to.
+     * <p>
+     * This should normally be invoked early in the server startup.  Setting it after the catalog is loaded will not
+     * take effect without an explicit internal call to do so.  Once set, it can be changed prior to catalog loading
+     * but it cannot be <i>changed</i> once the catalog is loaded.
+     * <p>
+     * ClasspathHelper.forJavaClassPath() is often a good argument to pass, and is used internally in some places
+     * when no items are found on the catalog. */
+    @Override
+    public void setBaseClassPathForScanning(Iterable<URL> urls) {
+        if (Objects.equal(baseClassPathForScanning, urls)) return;
+        if (baseClassPathForScanning != null) {
+            if (catalog==null)
+                log.warn("Changing scan classpath to "+urls+" from "+baseClassPathForScanning);
+            else
+                throw new IllegalStateException("Cannot change base class path for scanning (in "+this+")");
+        }
+        this.baseClassPathForScanning = urls;
+    }
+    /** 
+     * @see #setBaseClassPathForScanning(Iterable)
+     */
+    @Override
+    public Iterable<URL> getBaseClassPathForScanning() {
+        return baseClassPathForScanning;
+    }
+
+    public BrooklynGarbageCollector getGarbageCollector() {
+        return gc;
+    }
+
+    @Override
+    public void setManagementNodeUri(URI uri) {
+        this.uri = Maybe.of(checkNotNull(uri, "uri"));
+    }
+
+    @Override
+    public Maybe<URI> getManagementNodeUri() {
+        return uri;
+    }
+    
+    private Object catalogInitMutex = new Object();
+    @Override
+    public CatalogInitialization getCatalogInitialization() {
+        synchronized (catalogInitMutex) {
+            if (catalogInitialization!=null) return catalogInitialization;
+            CatalogInitialization ci = new CatalogInitialization();
+            setCatalogInitialization(ci);
+            return ci;
+        }
+    }
+    
+    @Override
+    public void setCatalogInitialization(CatalogInitialization catalogInitialization) {
+        synchronized (catalogInitMutex) {
+            Preconditions.checkNotNull(catalogInitialization, "initialization must not be null");
+            if (this.catalogInitialization!=null && this.catalogInitialization != catalogInitialization)
+                throw new IllegalStateException("Changing catalog init from "+this.catalogInitialization+" to "+catalogInitialization+"; changes not permitted");
+            catalogInitialization.injectManagementContext(this);
+            this.catalogInitialization = catalogInitialization;
+        }
+    }
+    
+    public BrooklynObject lookup(String id) {
+        return lookup(id, BrooklynObject.class);
+    }
+    
+    @SuppressWarnings("unchecked")
+    public <T extends BrooklynObject> T lookup(String id, Class<T> type) {
+        Object result;
+        result = getEntityManager().getEntity(id);
+        if (result!=null && type.isInstance(result)) return (T)result;
+        
+        result = getLocationManager().getLocation(id);
+        if (result!=null && type.isInstance(result)) return (T)result;
+
+        // TODO policies, enrichers, feeds
+        return null;
+    }
+
+    @Override
+    public List<Throwable> errors() {
+        return errors;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java
new file mode 100644
index 0000000..bec041d
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.SubscriptionHandle;
+import org.apache.brooklyn.api.management.SubscriptionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+
+public abstract class AbstractSubscriptionManager implements SubscriptionManager {
+
+    // TODO Perhaps could use guava's SynchronizedSetMultimap? But need to check its synchronization guarantees.
+    //      That would replace the utils used for subscriptionsBySubscriber etc.
+    
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscriptionManager.class);
+
+    /** performs the actual subscription; should return the subscription parameter as the handle */
+    protected abstract <T> SubscriptionHandle subscribe(Map<String, Object> flags, Subscription<T> s);
+    /** performs the actual publishing -- ie distribution to subscriptions */
+    public abstract <T> void publish(final SensorEvent<T> event);
+
+    public static class EntitySensorToken {
+        Entity e;
+        Sensor<?> s;
+        String sName;
+        public EntitySensorToken(Entity e, Sensor<?> s) {
+            this.e = e;
+            this.s = s;
+            this.sName = (s == null) ? null : checkNotNull(s.getName(), "sensor must have non-null name: %s", s);
+        }
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(e, sName);
+        }
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (!(obj instanceof EntitySensorToken)) return false;
+            if (!Objects.equal(e, ((EntitySensorToken)obj).e)) return false;
+            if (!Objects.equal(sName, ((EntitySensorToken)obj).sName)) return false;
+            return true;
+        }
+        @Override
+        public String toString() {
+            return (e != null ? e.getId() :  "*")+":"+(s != null ? sName : "*");
+        }
+    }
+    static Object makeEntitySensorToken(Entity e, Sensor<?> s) {
+        return new EntitySensorToken(e, s);
+    }
+    static Object makeEntitySensorToken(SensorEvent<?> se) {
+        return makeEntitySensorToken(se.getSource(), se.getSensor());
+    }
+    
+    /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */
+    public final <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, listener);
+    }
+ 
+    /**
+     * This implementation handles the following flags, in addition to those described in the {@link SubscriptionManager}
+     * interface:
+     * <ul>
+     * <li>subscriberExecutionManagerTag - a tag to pass to execution manager (without setting any execution semantics / TaskPreprocessor);
+     *      if not supplied and there is a subscriber, this will be inferred from the subscriber and set up with SingleThreadedScheduler
+     * <li>eventFilter - a Predicate&lt;SensorEvent&gt; instance to filter what events are delivered
+     * </ul>
+     * 
+     * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)
+     */
+    public final <T> SubscriptionHandle subscribe(Map<String, Object> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribe(flags, new Subscription<T>(producer, sensor, listener));
+    }
+        
+    /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */
+    public final <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, listener);
+    }
+
+    /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */
+    public final  <T> SubscriptionHandle subscribeToChildren(Map<String, Object> flags, final Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        Predicate<SensorEvent<T>> eventFilter = new Predicate<SensorEvent<T>>() {
+            public boolean apply(SensorEvent<T> input) {
+                return parent != null && input.getSource() != null && parent.equals(input.getSource().getParent());
+            }
+        };
+        flags.put("eventFilter", eventFilter);
+        return subscribe(flags, null, sensor, listener);
+    }
+
+    /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */
+    public final <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, listener);
+    }
+
+    /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */
+    public final  <T> SubscriptionHandle subscribeToMembers(Map<String, Object> flags, final Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        Predicate<SensorEvent<T>> eventFilter = new Predicate<SensorEvent<T>>() {
+            public boolean apply(SensorEvent<T> input) {
+                return parent.getMembers().contains(input.getSource());
+            }
+        };
+        flags.put("eventFilter", eventFilter);
+        return subscribe(flags, null, sensor, listener);
+    }
+
+    protected <T> Object getSubscriber(Map<String, Object> flags, Subscription<T> s) {
+        return s.subscriber!=null ? s.subscriber : flags.containsKey("subscriber") ? flags.remove("subscriber") : s.listener;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java
new file mode 100644
index 0000000..5d6a057
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import org.apache.brooklyn.api.management.AccessController;
+
+import com.google.common.annotations.Beta;
+
+@Beta
+public interface AccessManager {
+
+    AccessController getAccessController();
+    
+    boolean isLocationProvisioningAllowed();
+
+    boolean isLocationManagementAllowed();
+
+    boolean isEntityManagementAllowed();
+
+    void setLocationProvisioningAllowed(boolean allowed);
+    
+    void setLocationManagementAllowed(boolean allowed);
+    
+    void setEntityManagementAllowed(boolean allowed);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java
new file mode 100644
index 0000000..038ec90
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.brooklyn.api.management.ExecutionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.task.BasicExecutionManager;
+import brooklyn.util.task.SingleThreadedScheduler;
+
+public class AsyncCollectionChangeAdapter<Item> implements CollectionChangeListener<Item> {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectionChangeAdapter.class);
+
+    private final ExecutionManager executor;
+    private final CollectionChangeListener<Item> delegate;
+    
+    public AsyncCollectionChangeAdapter(ExecutionManager executor, CollectionChangeListener<Item> delegate) {
+        this.executor = checkNotNull(executor, "executor");
+        this.delegate = checkNotNull(delegate, "delegate");
+        ((BasicExecutionManager) executor).setTaskSchedulerForTag(delegate, SingleThreadedScheduler.class);
+    }
+
+    @Override
+    public void onItemAdded(final Item item) {
+        executor.submit(MutableMap.of("tag", delegate), new Runnable() {
+            public void run() {
+                try {
+                    delegate.onItemAdded(item);
+                } catch (Throwable t) {
+                    LOG.warn("Error notifying listener of itemAdded("+item+")", t);
+                    Exceptions.propagate(t);
+                }
+            }
+        });
+    }
+    
+    @Override
+    public void onItemRemoved(final Item item) {
+        executor.submit(MutableMap.of("tag", delegate), new Runnable() {
+            public void run() {
+                try {
+                    delegate.onItemRemoved(item);
+                } catch (Throwable t) {
+                    LOG.warn("Error notifying listener of itemAdded("+item+")", t);
+                    Exceptions.propagate(t);
+                }
+            }
+        });
+    }
+
+    @Override
+    public int hashCode() {
+        return delegate.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return (other instanceof AsyncCollectionChangeAdapter) && 
+                delegate.equals(((AsyncCollectionChangeAdapter<?>) other).delegate);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java
new file mode 100644
index 0000000..04aa18f
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import static brooklyn.util.JavaGroovyEquivalents.mapOf;
+import groovy.lang.Closure;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.SubscriptionContext;
+import org.apache.brooklyn.api.management.SubscriptionHandle;
+import org.apache.brooklyn.api.management.SubscriptionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+/**
+ * A {@link SubscriptionContext} for an entity or other user of a {@link SubscriptionManager}.
+ */
+public class BasicSubscriptionContext implements SubscriptionContext {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(BasicSubscriptionContext.class);
+
+    private final SubscriptionManager manager;
+    private final Object subscriber;
+    private final Map<String,Object> flags;
+
+    public BasicSubscriptionContext(SubscriptionManager manager, Object subscriber) {
+        this(Collections.<String,Object>emptyMap(), manager, subscriber);
+    }
+    
+    public BasicSubscriptionContext(Map<String, ?> flags, SubscriptionManager manager, Object subscriber) {
+        this.manager = manager;
+        this.subscriber = subscriber;
+        this.flags = mapOf("subscriber", subscriber);
+        if (flags!=null) this.flags.putAll(flags);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, Closure c) {
+        return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, c);
+    }
+    
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, Closure c) {
+        return subscribe(newFlags, producer, sensor, toSensorEventListener(c));        
+    }
+
+    @Override
+    public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, listener);
+    }
+    
+    @Override
+    public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags);
+        if (newFlags != null) subscriptionFlags.putAll(newFlags);
+        return manager.subscribe(subscriptionFlags, producer, sensor, listener);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, Closure c) {
+        return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, c);
+    }
+    
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribeToChildren(Map<String, Object> newFlags, Entity parent, Sensor<T> sensor, Closure c) {
+        return subscribeToChildren(newFlags, parent, sensor, toSensorEventListener(c));
+    }
+
+    @Override
+    public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, listener);
+    }
+    
+    @Override
+    public <T> SubscriptionHandle subscribeToChildren(Map<String, Object> newFlags, Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags);
+        if (newFlags != null) subscriptionFlags.putAll(newFlags);
+        return manager.subscribeToChildren(subscriptionFlags, parent, sensor, listener);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, Closure c) {
+        return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, c);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribeToMembers(Map<String, Object> newFlags, Group parent, Sensor<T> sensor, Closure c) {
+        return subscribeToMembers(newFlags, parent, sensor, toSensorEventListener(c));
+    }
+    
+    @Override
+    public <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, listener);
+    }
+    
+    @Override
+    public <T> SubscriptionHandle subscribeToMembers(Map<String, Object> newFlags, Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags);
+        if (newFlags != null) subscriptionFlags.putAll(newFlags);
+        return manager.subscribeToMembers(subscriptionFlags, parent, sensor, listener);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public boolean unsubscribe(SubscriptionHandle subscriptionId) {
+        Preconditions.checkNotNull(subscriptionId, "subscriptionId must not be null");
+        Preconditions.checkArgument(Objects.equal(subscriber, ((Subscription) subscriptionId).subscriber), "The subscriptionId is for a different "+subscriber+"; expected "+((Subscription) subscriptionId).subscriber);
+        return manager.unsubscribe(subscriptionId);
+    }
+
+    /** @see SubscriptionManager#publish(SensorEvent) */
+    @Override
+    public <T> void publish(SensorEvent<T> event) {
+        manager.publish(event);
+    }
+
+    /** Return the subscriptions associated with this context */
+    @Override
+    public Set<SubscriptionHandle> getSubscriptions() {
+        return manager.getSubscriptionsForSubscriber(subscriber);
+    }
+
+    @Override
+    public int unsubscribeAll() {
+        int count = 0;
+        
+        // To avoid ConcurrentModificationException when copying subscriptions, need to synchronize on it
+        Set<SubscriptionHandle> subscriptions = getSubscriptions();
+        Collection<SubscriptionHandle> subscriptionsCopy;
+        synchronized (subscriptions) {
+            subscriptionsCopy = ImmutableList.copyOf(subscriptions);
+        }
+        
+        for (SubscriptionHandle s : subscriptionsCopy) {
+            count++; 
+            boolean result = unsubscribe(s); 
+            if (!result) LOG.warn("When unsubscribing from all of {}, unsubscribe of {} return false", subscriber, s);
+        }
+        return count;
+    }
+    
+    @SuppressWarnings("rawtypes")
+    private <T> SensorEventListener<T> toSensorEventListener(final Closure c) {
+        return new SensorEventListener<T>() {
+            @Override public void onEvent(SensorEvent<T> event) {
+                c.call(event);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java
new file mode 100644
index 0000000..c02ff81
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.BrooklynProperties;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity;
+import brooklyn.entity.basic.BrooklynTaskTags.WrappedStream;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.Entities;
+import brooklyn.internal.storage.BrooklynStorage;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.javalang.MemoryUsageTracker;
+import brooklyn.util.task.BasicExecutionManager;
+import brooklyn.util.task.ExecutionListener;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Objects;
+import com.google.common.annotations.Beta;
+import com.google.common.collect.Iterables;
+
+/**
+ * Deletes record of old tasks, to prevent space leaks and the eating up of more and more memory.
+ * 
+ * The deletion policy is configurable:
+ * <ul>
+ *   <li>Period - how frequently to look at the existing tasks to delete some, if required
+ *   <li>Max task age - the time after which a completed task will be automatically deleted
+ *       (i.e. any root task completed more than maxTaskAge ago will be deleted)
+ *   <li>Max tasks per <various categories> - the maximum number of tasks to be kept for a given tag,
+ *       split into categories based on what is seeming to be useful
+ * </ul>
+ * 
+ * The default is to check with a period of one minute, deleting tasks after 30 days, 
+ * and keeping at most 100000 tasks in the system,
+ * max 1000 tasks per entity, 50 per effector within that entity, and 50 per other non-effector tag
+ * within that entity (or global if not attached to an entity).
+ * 
+ * @author aled
+ */
+public class BrooklynGarbageCollector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BrooklynGarbageCollector.class);
+
+    public static final ConfigKey<Duration> GC_PERIOD = ConfigKeys.newDurationConfigKey(
+            "brooklyn.gc.period", "the period for checking if any tasks need to be deleted", 
+            Duration.minutes(1));
+    
+    public static final ConfigKey<Boolean> DO_SYSTEM_GC = ConfigKeys.newBooleanConfigKey(
+            "brooklyn.gc.doSystemGc", "whether to periodically call System.gc()", false);
+    
+    /** 
+     * should we check for tasks which are submitted by another but backgrounded, i.e. not a child of that task?
+     * default to yes, despite it can be some extra loops, to make sure we GC them promptly.
+     * @since 0.7.0 */
+    // work offender is {@link DynamicSequentialTask} internal job tracker, but it is marked 
+    // transient so it is destroyed prompty; there may be others, however;
+    // but OTOH it might be expensive to check for these all the time!
+    // TODO probably we can set this false (remove this and related code),
+    // and just rely on usual GC to pick up background tasks; the lifecycle of background task
+    // should normally be independent of the submitter. (DST was the exception, and marking 
+    // transient there fixes the main problem, which is when the submitter is GC'd but the submitted is not,
+    // and we don't want the submitted to show up at the root in the GUI, which it will if its
+    // submitter has been GC'd)
+    @Beta
+    public static final ConfigKey<Boolean> CHECK_SUBTASK_SUBMITTERS = ConfigKeys.newBooleanConfigKey(
+        "brooklyn.gc.checkSubtaskSubmitters", "whether for subtasks to check the submitters", true);
+
+    public static final ConfigKey<Integer> MAX_TASKS_PER_TAG = ConfigKeys.newIntegerConfigKey(
+        "brooklyn.gc.maxTasksPerTag", 
+        "the maximum number of tasks to be kept for a given tag "
+        + "within an execution context (e.g. entity); "
+        + "some broad-brush tags are excluded, and if an entity has multiple tags all tag counts must be full",
+        50);
+    
+    public static final ConfigKey<Integer> MAX_TASKS_PER_ENTITY = ConfigKeys.newIntegerConfigKey(
+        "brooklyn.gc.maxTasksPerEntity", 
+        "the maximum number of tasks to be kept for a given entity",
+        1000);
+
+    public static final ConfigKey<Integer> MAX_TASKS_GLOBAL = ConfigKeys.newIntegerConfigKey(
+        "brooklyn.gc.maxTasksGlobal", 
+        "the maximum number of tasks to be kept across the entire system",
+        100000);
+
+    public static final ConfigKey<Duration> MAX_TASK_AGE = ConfigKeys.newDurationConfigKey(
+            "brooklyn.gc.maxTaskAge", 
+            "the duration after which a completed task will be automatically deleted", 
+            Duration.days(30));
+    
+    protected final static Comparator<Task<?>> TASKS_OLDEST_FIRST_COMPARATOR = new Comparator<Task<?>>() {
+        @Override public int compare(Task<?> t1, Task<?> t2) {
+            long end1 = t1.getEndTimeUtc();
+            long end2 = t2.getEndTimeUtc();
+            return (end1 < end2) ? -1 : ((end1 == end2) ? 0 : 1);
+        }
+    };
+    
+    private final BasicExecutionManager executionManager;
+    private final BrooklynStorage storage;
+    private final BrooklynProperties brooklynProperties;
+    private final ScheduledExecutorService executor;
+    private ScheduledFuture<?> activeCollector;
+    private Map<Entity,Task<?>> unmanagedEntitiesNeedingGc = new LinkedHashMap<Entity, Task<?>>();
+    
+    private Duration gcPeriod;
+    private final boolean doSystemGc;
+    private volatile boolean running = true;
+    
+    public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExecutionManager executionManager, BrooklynStorage storage) {
+        this.executionManager = executionManager;
+        this.storage = storage;
+        this.brooklynProperties = brooklynProperties;
+
+        doSystemGc = brooklynProperties.getConfig(DO_SYSTEM_GC);
+        
+        executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+                @Override public Thread newThread(Runnable r) {
+                    return new Thread(r, "brooklyn-gc");
+                }});
+        
+        executionManager.addListener(new ExecutionListener() {
+                @Override public void onTaskDone(Task<?> task) {
+                    BrooklynGarbageCollector.this.onTaskDone(task);
+                }});
+    
+        scheduleCollector(true);
+    }
+
+    protected synchronized void scheduleCollector(boolean canInterruptCurrent) {
+        if (activeCollector != null) activeCollector.cancel(canInterruptCurrent);
+        
+        gcPeriod = brooklynProperties.getConfig(GC_PERIOD);
+        if (gcPeriod!=null) {
+            activeCollector = executor.scheduleWithFixedDelay(
+                new Runnable() {
+                    @Override public void run() {
+                        gcIteration();
+                    }
+                }, 
+                gcPeriod.toMillisecondsRoundingUp(), 
+                gcPeriod.toMillisecondsRoundingUp(), 
+                TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /** force a round of Brooklyn garbage collection */
+    public void gcIteration() {
+        try {
+            logUsage("brooklyn gc (before)");
+            gcTasks();
+            logUsage("brooklyn gc (after)");
+            
+            if (doSystemGc) {
+                // Can be very useful when tracking down OOMEs etc, where a lot of tasks are executing
+                // Empirically observed that (on OS X jvm at least) calling twice blocks - logs a significant
+                // amount of memory having been released, as though a full-gc had been run. But this is highly
+                // dependent on the JVM implementation.
+                System.gc(); System.gc();
+                logUsage("brooklyn gc (after system gc)");
+            }
+        } catch (Throwable t) {
+            Exceptions.propagateIfFatal(t);
+            LOG.warn("Error during management-context GC: "+t, t);
+            // previously we bailed on all errors, but I don't think we should do that -Alex
+        }
+    }
+
+    public void logUsage(String prefix) {
+        if (LOG.isDebugEnabled())
+            LOG.debug(prefix+" - using "+getUsageString());
+    }
+
+    public static String makeBasicUsageString() {
+        return Strings.makeSizeString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())+" / "+
+            Strings.makeSizeString(Runtime.getRuntime().totalMemory()) + " memory" +
+            " ("+Strings.makeSizeString(MemoryUsageTracker.SOFT_REFERENCES.getBytesUsed()) + " soft); "+
+            Thread.activeCount()+" threads";
+    }
+    
+    public String getUsageString() {
+        return makeBasicUsageString()+"; "+
+            "storage: " + storage.getStorageMetrics() + "; " +
+            "tasks: " +
+            executionManager.getNumActiveTasks()+" active, "+
+            executionManager.getNumIncompleteTasks()+" unfinished; "+
+            executionManager.getNumInMemoryTasks()+" remembered, "+
+            executionManager.getTotalTasksSubmitted()+" total submitted)";
+    }
+    
+    public void shutdownNow() {
+        running = false;
+        if (activeCollector != null) activeCollector.cancel(true);
+        if (executor != null) executor.shutdownNow();
+    }
+    
+    public void onUnmanaged(Entity entity) {
+        // defer task deletions until the entity is completely unmanaged
+        // (this is usually invoked during the stop sequence)
+        synchronized (unmanagedEntitiesNeedingGc) {
+            unmanagedEntitiesNeedingGc.put(entity, Tasks.current());
+        }
+    }
+    
+    public void deleteTasksForEntity(Entity entity) {
+        // remove all references to this entity from tasks
+        executionManager.deleteTag(entity);
+        executionManager.deleteTag(BrooklynTaskTags.tagForContextEntity(entity));
+        executionManager.deleteTag(BrooklynTaskTags.tagForCallerEntity(entity));
+        executionManager.deleteTag(BrooklynTaskTags.tagForTargetEntity(entity));
+    }
+    
+    public void onUnmanaged(Location loc) {
+        // No-op currently; no tasks are tracked through their location
+    }
+    
+    public void onTaskDone(Task<?> task) {
+        if (shouldDeleteTaskImmediately(task)) {
+            executionManager.deleteTask(task);
+        }
+    }
+    
+    /** @deprecated since 0.7.0, method moved internal until semantics are clarified; see also {@link #shouldDeleteTaskImmediately(Task)} */
+    @Deprecated
+    public boolean shouldDeleteTask(Task<?> task) {
+        return shouldDeleteTaskImmediately(task);
+    }
+    /** whether this task should be deleted on completion,
+     * because it is transient, or because it is submitted background without much context information */
+    protected boolean shouldDeleteTaskImmediately(Task<?> task) {
+        if (!task.isDone()) return false;
+        
+        Set<Object> tags = task.getTags();
+        if (tags.contains(ManagementContextInternal.TRANSIENT_TASK_TAG))
+            return true;
+        if (tags.contains(ManagementContextInternal.EFFECTOR_TAG) || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG))
+            return false;
+        
+        if (task.getSubmittedByTask()!=null) {
+            Task<?> parent = task.getSubmittedByTask();
+            if (executionManager.getTask(parent.getId())==null) {
+                // parent is already cleaned up
+                return true;
+            }
+            if (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task)) {
+                // it is a child, let the parent manage this task's death
+                return false;
+            }
+            Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task);
+            if (associatedEntity!=null) {
+                // this is associated to an entity; destroy only if the entity is unmanaged
+                return !Entities.isManaged(associatedEntity);
+            }
+            // if not associated to an entity, then delete immediately
+            return true;
+        }
+        
+        // e.g. scheduled tasks, sensor events, etc
+        // TODO (in future may keep some of these with another limit, based on a new TagCategory)
+        // there may also be a server association for server-side tasks which should be kept
+        // (but be careful not to keep too many subscriptions!)
+        
+        return true;
+    }
+
+    /**
+     * Deletes old tasks. The age/number of tasks to keep is controlled by fields like 
+     * {@link #maxTasksPerTag} and {@link #maxTaskAge}.
+     */
+    protected synchronized int gcTasks() {
+        // TODO Must be careful with memory usage here: have seen OOME if we get crazy lots of tasks.
+        // hopefully the use new limits, filters, and use of live lists in some places (added Sep 2014) will help.
+        // 
+        // An option is for getTasksWithTag(tag) to return an ArrayList rather than a LinkedHashSet. That
+        // is a far more memory efficient data structure (e.g. 4 bytes overhead per object rather than 
+        // 32 bytes overhead per object for HashSet).
+        //
+        // More notes on optimization is in the history of this file.
+        
+        if (!running) return 0;
+        
+        Duration newPeriod = brooklynProperties.getConfig(GC_PERIOD);
+        if (!Objects.equal(gcPeriod, newPeriod)) {
+            // caller has changed period, reschedule on next run
+            scheduleCollector(false);
+        }
+    
+        expireUnmanagedEntityTasks();
+        expireAgedTasks();
+        expireTransientTasks();
+        
+        // now look at overcapacity tags, non-entity tags first
+        
+        Set<Object> taskTags = executionManager.getTaskTags();
+        
+        int maxTasksPerEntity = brooklynProperties.getConfig(MAX_TASKS_PER_ENTITY);
+        int maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG);
+        
+        Map<Object,AtomicInteger> taskNonEntityTagsOverCapacity = MutableMap.of();
+        Map<Object,AtomicInteger> taskEntityTagsOverCapacity = MutableMap.of();
+        
+        Map<Object,AtomicInteger> taskAllTagsOverCapacity = MutableMap.of();
+        
+        for (Object tag : taskTags) {
+            if (isTagIgnoredForGc(tag)) continue;
+            
+            Set<Task<?>> tasksWithTag = executionManager.tasksWithTagLiveOrNull(tag);
+            if (tasksWithTag==null) continue;
+            AtomicInteger overA = null;
+            if (tag instanceof WrappedEntity) {
+                int over = tasksWithTag.size() - maxTasksPerEntity;
+                if (over>0) {
+                    overA = new AtomicInteger(over);
+                    taskEntityTagsOverCapacity.put(tag, overA);
+                }
+            } else {
+                int over = tasksWithTag.size() - maxTasksPerTag;
+                if (over>0) {
+                    overA = new AtomicInteger(over);
+                    taskNonEntityTagsOverCapacity.put(tag, overA);
+                }
+            }
+            if (overA!=null) {
+                taskAllTagsOverCapacity.put(tag, overA);
+            }
+        }
+        
+        int deletedCount = 0;
+        deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false);
+        deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true);
+        deletedCount += expireSubTasksWhoseSubmitterIsExpired();
+        
+        int deletedGlobally = expireIfOverCapacityGlobally();
+        deletedCount += deletedGlobally;
+        if (deletedGlobally>0) deletedCount += expireSubTasksWhoseSubmitterIsExpired();
+        
+        return deletedCount;
+    }
+
+    protected static boolean isTagIgnoredForGc(Object tag) {
+        if (tag == null) return true;
+        if (tag.equals(ManagementContextInternal.EFFECTOR_TAG)) return true;
+        if (tag.equals(ManagementContextInternal.SUB_TASK_TAG)) return true;
+        if (tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return true;
+        if (tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG)) return true;
+        if (tag instanceof WrappedStream) {
+            return true;
+        }
+        
+        return false;
+    }
+    
+    protected void expireUnmanagedEntityTasks() {
+        Iterator<Entry<Entity, Task<?>>> ei;
+        synchronized (unmanagedEntitiesNeedingGc) {
+            ei = MutableSet.copyOf(unmanagedEntitiesNeedingGc.entrySet()).iterator();
+        }
+        while (ei.hasNext()) {
+            Entry<Entity, Task<?>> ee = ei.next();
+            if (Entities.isManaged(ee.getKey())) continue;
+            if (ee.getValue()!=null && !ee.getValue().isDone()) continue;
+            deleteTasksForEntity(ee.getKey());
+            synchronized (unmanagedEntitiesNeedingGc) {
+                unmanagedEntitiesNeedingGc.remove(ee.getKey());
+            }
+        }
+    }
+    
+    protected void expireAgedTasks() {
+        Duration maxTaskAge = brooklynProperties.getConfig(MAX_TASK_AGE);
+        
+        Collection<Task<?>> allTasks = executionManager.allTasksLive();
+        Collection<Task<?>> tasksToDelete = MutableList.of();
+
+        try {
+            for (Task<?> task: allTasks) {
+                if (!task.isDone()) continue;
+                if (BrooklynTaskTags.isSubTask(task)) continue;
+
+                if (maxTaskAge.isShorterThan(Duration.sinceUtc(task.getEndTimeUtc())))
+                    tasksToDelete.add(task);
+            }
+            
+        } catch (ConcurrentModificationException e) {
+            // delete what we've found so far
+            LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e);
+        }
+        
+        for (Task<?> task: tasksToDelete) {
+            executionManager.deleteTask(task);
+        }
+    }
+    
+    protected void expireTransientTasks() {
+        Set<Task<?>> transientTasks = executionManager.getTasksWithTag(BrooklynTaskTags.TRANSIENT_TASK_TAG);
+        for (Task<?> t: transientTasks) {
+            if (!t.isDone()) continue;
+            executionManager.deleteTask(t);
+        }
+    }
+    
+    protected int expireSubTasksWhoseSubmitterIsExpired() {
+        // ideally we wouldn't have this; see comments on CHECK_SUBTASK_SUBMITTERS
+        if (!brooklynProperties.getConfig(CHECK_SUBTASK_SUBMITTERS))
+            return 0;
+        
+        Collection<Task<?>> allTasks = executionManager.allTasksLive();
+        Collection<Task<?>> tasksToDelete = MutableList.of();
+        try {
+            for (Task<?> task: allTasks) {
+                if (!task.isDone()) continue;
+                Task<?> submitter = task.getSubmittedByTask();
+                // if we've leaked, ie a subtask which is not a child task, 
+                // and the submitter is GC'd, then delete this also
+                if (submitter!=null && submitter.isDone() && executionManager.getTask(submitter.getId())==null) {
+                    tasksToDelete.add(task);
+                }
+            }
+            
+        } catch (ConcurrentModificationException e) {
+            // delete what we've found so far
+            LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e);
+        }
+        
+        for (Task<?> task: tasksToDelete) {
+            executionManager.deleteTask(task);
+        }
+        return tasksToDelete.size();
+    }
+    
+    protected enum TagCategory { 
+        ENTITY, NON_ENTITY_NORMAL;
+        
+        public boolean acceptsTag(Object tag) {
+            if (isTagIgnoredForGc(tag)) return false;
+            if (tag instanceof WrappedEntity) return this==ENTITY;
+            if (this==ENTITY) return false;
+            return true;
+        }
+    } 
+
+
+    /** expires tasks which are over-capacity in all their non-entity tag categories, returned count */
+    protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) {
+        if (emptyFilterNeeded) {
+            // previous run may have decremented counts  
+            MutableList<Object> nowOkayTags = MutableList.of(); 
+            for (Map.Entry<Object,AtomicInteger> entry: taskTagsInCategoryOverCapacity.entrySet()) {
+                if (entry.getValue().get()<=0) nowOkayTags.add(entry.getKey());
+            }
+            for (Object tag: nowOkayTags) taskTagsInCategoryOverCapacity.remove(tag);
+        }
+        
+        if (taskTagsInCategoryOverCapacity.isEmpty())
+            return 0;
+        
+        Collection<Task<?>> tasks = executionManager.allTasksLive();
+        List<Task<?>> tasksToConsiderDeleting = MutableList.of();
+        try {
+            for (Task<?> task: tasks) {
+                if (!task.isDone()) continue;
+                
+                Set<Object> tags = task.getTags();
+
+                int categoryTags = 0, tooFullCategoryTags = 0;
+                for (Object tag: tags) {
+                    if (category.acceptsTag(tag)) {
+                        categoryTags++;
+                        if (taskTagsInCategoryOverCapacity.containsKey(tag))
+                            tooFullCategoryTags++;
+                    }
+                }
+                if (tooFullCategoryTags>0) {
+                    if (categoryTags==tooFullCategoryTags) {
+                        // all buckets are full, delete this one
+                        tasksToConsiderDeleting.add(task);
+                    } else {
+                        // if any bucket is under capacity, then give grace to the other buckets in this category
+                        for (Object tag: tags) {
+                            if (category.acceptsTag(tag)) {
+                                AtomicInteger over = taskTagsInCategoryOverCapacity.get(tag);
+                                if (over!=null) {
+                                    if (over.decrementAndGet()<=0) {
+                                        // and remove it from over-capacity if so
+                                        taskTagsInCategoryOverCapacity.remove(tag);
+                                        if (taskTagsInCategoryOverCapacity.isEmpty())
+                                            return 0;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+
+        } catch (ConcurrentModificationException e) {
+            // do CME's happen with these data structures?
+            // if so, let's just delete what we've found so far
+            LOG.debug("Got CME inspecting tasks, with "+tasksToConsiderDeleting.size()+" found for deletion: "+e);
+        }
+
+        if (LOG.isDebugEnabled())
+            LOG.debug("brooklyn-gc detected "+taskTagsInCategoryOverCapacity.size()+" "+category+" "
+                    + "tags over capacity, expiring old tasks; "
+                    + tasksToConsiderDeleting.size()+" tasks under consideration; categories are: "
+                    + taskTagsInCategoryOverCapacity);
+
+        Collections.sort(tasksToConsiderDeleting, TASKS_OLDEST_FIRST_COMPARATOR);
+        // now try deleting tasks which are overcapacity for each (non-entity) tag
+        int deleted = 0;
+        for (Task<?> task: tasksToConsiderDeleting) {
+            boolean delete = true;
+            for (Object tag: task.getTags()) {
+                if (!category.acceptsTag(tag))
+                    continue;
+                if (taskTagsInCategoryOverCapacity.get(tag)==null) {
+                    // no longer over capacity in this tag
+                    delete = false;
+                    break;
+                }
+            }
+            if (delete) {
+                // delete this and update overcapacity info
+                deleted++;
+                executionManager.deleteTask(task);
+                for (Object tag: task.getTags()) {
+                    AtomicInteger counter = taskAllTagsOverCapacity.get(tag);
+                    if (counter!=null && counter.decrementAndGet()<=0)
+                        taskTagsInCategoryOverCapacity.remove(tag);
+                }
+                if (LOG.isTraceEnabled())
+                    LOG.trace("brooklyn-gc deleted "+task+", buckets now "+taskTagsInCategoryOverCapacity);
+                if (taskTagsInCategoryOverCapacity.isEmpty())
+                    break;
+            }
+        }
+
+        if (LOG.isDebugEnabled())
+            LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; "
+                    + "capacities now: " + taskTagsInCategoryOverCapacity);
+        return deleted;
+    }
+
+    protected int expireIfOverCapacityGlobally() {
+        Collection<Task<?>> tasksLive = executionManager.allTasksLive();
+        if (tasksLive.size() <= brooklynProperties.getConfig(MAX_TASKS_GLOBAL))
+            return 0;
+        LOG.debug("brooklyn-gc detected "+tasksLive.size()+" tasks in memory, over global limit, looking at deleting some");
+        
+        try {
+            tasksLive = MutableList.copyOf(tasksLive);
+        } catch (ConcurrentModificationException e) {
+            tasksLive = executionManager.getTasksWithAllTags(MutableList.of());
+        }
+
+        MutableList<Task<?>> tasks = MutableList.of();
+        for (Task<?> task: tasksLive) {
+            if (task.isDone()) {
+                tasks.add(task);
+            }
+        }
+        
+        int numToDelete = tasks.size() - brooklynProperties.getConfig(MAX_TASKS_GLOBAL);
+        if (numToDelete <= 0) {
+            LOG.debug("brooklyn-gc detected only "+tasks.size()+" completed tasks in memory, not over global limit, so not deleting any");
+            return 0;
+        }
+            
+        Collections.sort(tasks, TASKS_OLDEST_FIRST_COMPARATOR);
+        
+        int numDeleted = 0;
+        while (numDeleted < numToDelete && tasks.size()>numDeleted) {
+            executionManager.deleteTask( tasks.get(numDeleted++) );
+        }
+        if (LOG.isDebugEnabled())
+            LOG.debug("brooklyn-gc deleted "+numDeleted+" tasks as was over global limit, now have "+executionManager.allTasksLive().size());
+        return numDeleted;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java
new file mode 100644
index 0000000..49afe27
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+/** Indicates how an entity/location/adjunct is treated at a given {@link ManagementContext} */
+public enum BrooklynObjectManagementMode {
+    /** item does not exist, not in memory, nor persisted (e.g. creating for first time, or finally destroying) */
+    NONEXISTENT, 
+    /** item exists or existed elsewhere, i.e. there is persisted state, but is not loaded here */
+    UNMANAGED_PERSISTED, 
+    /** item is loaded but read-only (ie not actively managed here) */
+    LOADED_READ_ONLY, 
+    /** item is actively managed here */
+    MANAGED_PRIMARY 
+}
\ No newline at end of file



Mime
View raw message