Return-Path: X-Original-To: apmail-brooklyn-commits-archive@minotaur.apache.org Delivered-To: apmail-brooklyn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0E2BD18924 for ; Sat, 15 Aug 2015 13:33:20 +0000 (UTC) Received: (qmail 4165 invoked by uid 500); 15 Aug 2015 13:33:20 -0000 Delivered-To: apmail-brooklyn-commits-archive@brooklyn.apache.org Received: (qmail 4141 invoked by uid 500); 15 Aug 2015 13:33:19 -0000 Mailing-List: contact commits-help@brooklyn.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.incubator.apache.org Delivered-To: mailing list commits@brooklyn.incubator.apache.org Received: (qmail 4132 invoked by uid 99); 15 Aug 2015 13:33:19 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 15 Aug 2015 13:33:19 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 4F081C085D for ; Sat, 15 Aug 2015 13:33:19 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.774 X-Spam-Level: * X-Spam-Status: No, score=1.774 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id ydQiPz-hT0vD for ; Sat, 15 Aug 2015 13:33:05 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 8446142BB6 for ; Sat, 15 Aug 2015 13:33:04 +0000 (UTC) Received: (qmail 3518 invoked by uid 99); 15 Aug 2015 13:33:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 15 Aug 2015 13:33:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9C90EDFB8A; Sat, 15 Aug 2015 13:33:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hadrian@apache.org To: commits@brooklyn.incubator.apache.org Date: Sat, 15 Aug 2015 13:33:16 -0000 Message-Id: <442fe49f56984c1ebfaaba1b307e9276@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/33] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/management http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java b/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java new file mode 100644 index 0000000..6a133fc --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.ha; + +import java.io.File; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleException; +import org.osgi.framework.launch.Framework; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.BrooklynVersion; + +import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle; +import org.apache.brooklyn.api.management.ManagementContext; + +import brooklyn.config.BrooklynServerConfig; +import brooklyn.config.BrooklynServerPaths; +import brooklyn.config.ConfigKey; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Maybe; +import brooklyn.util.os.Os; +import brooklyn.util.os.Os.DeletionResult; +import brooklyn.util.osgi.Osgis; +import brooklyn.util.osgi.Osgis.BundleFinder; +import brooklyn.util.repeat.Repeater; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; + +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +public class OsgiManager { + + private static final Logger log = LoggerFactory.getLogger(OsgiManager.class); + + public static final ConfigKey USE_OSGI = BrooklynServerConfig.USE_OSGI; + + /* see Osgis for info on starting framework etc */ + + protected ManagementContext mgmt; + protected Framework framework; + protected File osgiCacheDir; + + public OsgiManager(ManagementContext mgmt) { + this.mgmt = mgmt; + } + + public void start() { + try { + osgiCacheDir = BrooklynServerPaths.getOsgiCacheDirCleanedIfNeeded(mgmt); + + // any extra OSGi startup args could go here + framework = Osgis.newFrameworkStarted(osgiCacheDir.getAbsolutePath(), false, MutableMap.of()); + + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + + public void stop() { + try { + if (framework!=null) { + framework.stop(); + framework.waitForStop(0); // 0 means indefinite + } + } catch (BundleException e) { + throw Exceptions.propagate(e); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + if (BrooklynServerPaths.isOsgiCacheForCleaning(mgmt, osgiCacheDir)) { + // See exception reported in https://issues.apache.org/jira/browse/BROOKLYN-72 + // We almost always fail to delete he OSGi temp directory due to a concurrent modification. + // Therefore keep trying. + final AtomicReference deletionResult = new AtomicReference(); + Repeater.create("Delete OSGi cache dir") + .until(new Callable() { + public Boolean call() { + deletionResult.set(Os.deleteRecursively(osgiCacheDir)); + return deletionResult.get().wasSuccessful(); + }}) + .limitTimeTo(Duration.ONE_SECOND) + .backoffTo(Duration.millis(50)) + .run(); + if (deletionResult.get().getThrowable()!=null) { + log.debug("Unable to delete "+osgiCacheDir+" (possibly being modified concurrently?): "+deletionResult.get().getThrowable()); + } + } + osgiCacheDir = null; + framework = null; + } + + public synchronized void registerBundle(CatalogBundle bundle) { + try { + if (checkBundleInstalledThrowIfInconsistent(bundle)) { + return; + } + + Bundle b = Osgis.install(framework, bundle.getUrl()); + + checkCorrectlyInstalled(bundle, b); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + throw new IllegalStateException("Bundle from "+bundle.getUrl()+" failed to install: " + e.getMessage(), e); + } + } + + private void checkCorrectlyInstalled(CatalogBundle bundle, Bundle b) { + String nv = b.getSymbolicName()+":"+b.getVersion().toString(); + + if (!isBundleNameEqualOrAbsent(bundle, b)) { + throw new IllegalStateException("Bundle already installed as "+nv+" but user explicitly requested "+bundle); + } + + List matches = Osgis.bundleFinder(framework) + .symbolicName(b.getSymbolicName()) + .version(b.getVersion().toString()) + .findAll(); + if (matches.isEmpty()) { + log.error("OSGi could not find bundle "+nv+" in search after installing it from "+bundle.getUrl()); + } else if (matches.size()==1) { + log.debug("Bundle from "+bundle.getUrl()+" successfully installed as " + nv + " ("+b+")"); + } else { + log.warn("OSGi has multiple bundles matching "+nv+", when just installed from "+bundle.getUrl()+": "+matches+"; " + + "brooklyn will prefer the URL-based bundle for top-level references but any dependencies or " + + "import-packages will be at the mercy of OSGi. " + + "It is recommended to use distinct versions for different bundles, and the same URL for the same bundles."); + } + } + + private boolean checkBundleInstalledThrowIfInconsistent(CatalogBundle bundle) { + String bundleUrl = bundle.getUrl(); + if (bundleUrl != null) { + Maybe installedBundle = Osgis.bundleFinder(framework).requiringFromUrl(bundleUrl).find(); + if (installedBundle.isPresent()) { + Bundle b = installedBundle.get(); + String nv = b.getSymbolicName()+":"+b.getVersion().toString(); + if (!isBundleNameEqualOrAbsent(bundle, b)) { + throw new IllegalStateException("User requested bundle " + bundle + " but already installed as "+nv); + } else { + log.trace("Bundle from "+bundleUrl+" already installed as "+nv+"; not re-registering"); + } + return true; + } + } else { + Maybe installedBundle = Osgis.bundleFinder(framework).symbolicName(bundle.getSymbolicName()).version(bundle.getVersion()).find(); + if (installedBundle.isPresent()) { + log.trace("Bundle "+bundle+" installed from "+installedBundle.get().getLocation()); + } else { + throw new IllegalStateException("Bundle "+bundle+" not previously registered, but URL is empty."); + } + return true; + } + return false; + } + + public static boolean isBundleNameEqualOrAbsent(CatalogBundle bundle, Bundle b) { + return !bundle.isNamed() || + (bundle.getSymbolicName().equals(b.getSymbolicName()) && + bundle.getVersion().equals(b.getVersion().toString())); + } + + public Maybe> tryResolveClass(String type, CatalogBundle... catalogBundles) { + return tryResolveClass(type, Arrays.asList(catalogBundles)); + } + public Maybe> tryResolveClass(String type, Iterable catalogBundles) { + Map bundleProblems = MutableMap.of(); + Set extraMessages = MutableSet.of(); + for (CatalogBundle catalogBundle: catalogBundles) { + try { + Maybe bundle = findBundle(catalogBundle); + if (bundle.isPresent()) { + Bundle b = bundle.get(); + Class clazz; + //Extension bundles don't support loadClass. + //Instead load from the app classpath. + if (Osgis.isExtensionBundle(b)) { + @SuppressWarnings("unchecked") + Class c = (Class)Class.forName(type); + clazz = c; + } else { + @SuppressWarnings("unchecked") + Class c = (Class)b.loadClass(type); + clazz = c; + } + return Maybe.of(clazz); + } else { + bundleProblems.put(catalogBundle, ((Maybe.Absent)bundle).getException()); + } + + } catch (Exception e) { + // should come from classloading now; name formatting or missing bundle errors will be caught above + Exceptions.propagateIfFatal(e); + bundleProblems.put(catalogBundle, e); + + Throwable cause = e.getCause(); + if (cause != null && cause.getMessage().contains("Unresolved constraint in bundle")) { + if (BrooklynVersion.INSTANCE.getVersionFromOsgiManifest()==null) { + extraMessages.add("No brooklyn-core OSGi manifest available. OSGi will not work."); + } + if (BrooklynVersion.isDevelopmentEnvironment()) { + extraMessages.add("Your development environment may not have created necessary files. Doing a maven build then retrying may fix the issue."); + } + if (!extraMessages.isEmpty()) log.warn(Strings.join(extraMessages, " ")); + log.warn("Unresolved constraint resolving OSGi bundle "+catalogBundle+" to load "+type+": "+cause.getMessage()); + if (log.isDebugEnabled()) log.debug("Trace for OSGi resolution failure", e); + } + } + } + if (bundleProblems.size()==1) { + Throwable error = Iterables.getOnlyElement(bundleProblems.values()); + if (error instanceof ClassNotFoundException && error.getCause()!=null && error.getCause().getMessage()!=null) { + error = Exceptions.collapseIncludingAllCausalMessages(error); + } + return Maybe.absent("Unable to resolve class "+type+" in "+Iterables.getOnlyElement(bundleProblems.keySet()) + + (extraMessages.isEmpty() ? "" : " ("+Strings.join(extraMessages, " ")+")"), error); + } else { + return Maybe.absent(Exceptions.create("Unable to resolve class "+type+": "+bundleProblems + + (extraMessages.isEmpty() ? "" : " ("+Strings.join(extraMessages, " ")+")"), bundleProblems.values())); + } + } + + public Maybe findBundle(CatalogBundle catalogBundle) { + //Either fail at install time when the user supplied name:version is different + //from the one reported from the bundle + //or + //Ignore user supplied name:version when URL is supplied to be able to find the + //bundle even if it's with a different version. + // + //For now we just log a warning if there's a version discrepancy at install time, + //so prefer URL if supplied. + BundleFinder bundleFinder = Osgis.bundleFinder(framework); + if (catalogBundle.getUrl() != null) { + bundleFinder.requiringFromUrl(catalogBundle.getUrl()); + } else { + bundleFinder.symbolicName(catalogBundle.getSymbolicName()).version(catalogBundle.getVersion()); + } + return bundleFinder.find(); + } + + /** + * Iterates through catalogBundles until one contains a resource with the given name. + */ + public URL getResource(String name, Iterable catalogBundles) { + for (CatalogBundle catalogBundle: catalogBundles) { + try { + Maybe bundle = findBundle(catalogBundle); + if (bundle.isPresent()) { + URL result = bundle.get().getResource(name); + if (result!=null) return result; + } + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + } + } + return null; + } + + /** + * @return An iterable of all resources matching name in catalogBundles. + */ + public Iterable getResources(String name, Iterable catalogBundles) { + List resources = Lists.newArrayList(); + for (CatalogBundle catalogBundle : catalogBundles) { + try { + Maybe bundle = findBundle(catalogBundle); + if (bundle.isPresent()) { + Enumeration result = bundle.get().getResources(name); + resources.addAll(Collections.list(result)); + } + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + } + } + return resources; + } + + public Framework getFramework() { + return framework; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java new file mode 100644 index 0000000..1758846 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.internal; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; + +import java.net.URI; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.api.basic.BrooklynObject; +import org.apache.brooklyn.api.catalog.BrooklynCatalog; +import org.apache.brooklyn.api.catalog.CatalogItem; +import org.apache.brooklyn.api.entity.Effector; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.drivers.EntityDriverManager; +import org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager; +import org.apache.brooklyn.api.entity.rebind.RebindManager; +import org.apache.brooklyn.api.location.LocationRegistry; +import org.apache.brooklyn.api.management.ExecutionContext; +import org.apache.brooklyn.api.management.ManagementContext; +import org.apache.brooklyn.api.management.SubscriptionContext; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.classloading.BrooklynClassLoadingContext; +import org.apache.brooklyn.api.management.entitlement.EntitlementManager; +import org.apache.brooklyn.api.management.ha.HighAvailabilityManager; +import org.apache.brooklyn.core.management.classloading.JavaBrooklynClassLoadingContext; +import org.apache.brooklyn.core.management.entitlement.Entitlements; +import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl; + +import brooklyn.catalog.internal.BasicBrooklynCatalog; +import brooklyn.catalog.internal.CatalogInitialization; +import brooklyn.catalog.internal.CatalogUtils; +import brooklyn.config.BrooklynProperties; +import brooklyn.config.StringConfigMap; +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.drivers.BasicEntityDriverManager; +import brooklyn.entity.drivers.downloads.BasicDownloadsManager; +import brooklyn.entity.rebind.RebindManagerImpl; +import brooklyn.internal.storage.BrooklynStorage; +import brooklyn.internal.storage.DataGrid; +import brooklyn.internal.storage.DataGridFactory; +import brooklyn.internal.storage.impl.BrooklynStorageImpl; +import brooklyn.internal.storage.impl.inmemory.InMemoryDataGridFactory; + +import org.apache.brooklyn.location.basic.BasicLocationRegistry; + +import brooklyn.util.GroovyJavaMethods; +import brooklyn.util.ResourceUtils; +import brooklyn.util.collections.MutableList; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.config.ConfigBag; +import brooklyn.util.guava.Maybe; +import brooklyn.util.task.BasicExecutionContext; +import brooklyn.util.task.Tasks; + +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +public abstract class AbstractManagementContext implements ManagementContextInternal { + private static final Logger log = LoggerFactory.getLogger(AbstractManagementContext.class); + + private static DataGridFactory loadDataGridFactory(BrooklynProperties properties) { + String clazzName = properties.getFirst(DataGridFactory.class.getName()); + if(clazzName == null){ + clazzName = InMemoryDataGridFactory.class.getName(); + } + + Class clazz; + try{ + //todo: which classloader should we use? + clazz = LocalManagementContext.class.getClassLoader().loadClass(clazzName); + }catch(ClassNotFoundException e){ + throw new IllegalStateException(format("Could not load class [%s]",clazzName),e); + } + + Object instance; + try { + instance = clazz.newInstance(); + } catch (InstantiationException e) { + throw new IllegalStateException(format("Could not instantiate class [%s]",clazzName),e); + } catch (IllegalAccessException e) { + throw new IllegalStateException(format("Could not instantiate class [%s]",clazzName),e); + } + + if(!(instance instanceof DataGridFactory)){ + throw new IllegalStateException(format("Class [%s] not an instantiate of class [%s]",clazzName, DataGridFactory.class.getName())); + } + + return (DataGridFactory)instance; + } + + static { + ResourceUtils.addClassLoaderProvider(new Function() { + @Override + public BrooklynClassLoadingContext apply(@Nullable Object input) { + if (input instanceof EntityInternal) { + EntityInternal internal = (EntityInternal)input; + if (internal.getCatalogItemId() != null) { + CatalogItem item = CatalogUtils.getCatalogItemOptionalVersion(internal.getManagementContext(), internal.getCatalogItemId()); + if (item != null) { + return CatalogUtils.newClassLoadingContext(internal.getManagementContext(), item); + } else { + log.error("Can't find catalog item " + internal.getCatalogItemId() + + " used for instantiating entity " + internal + + ". Falling back to application classpath."); + } + } + return apply(internal.getManagementSupport()); + } + + if (input instanceof EntityManagementSupport) + return apply(((EntityManagementSupport)input).getManagementContext()); + if (input instanceof ManagementContext) + return JavaBrooklynClassLoadingContext.create((ManagementContext) input); + return null; + } + }); + } + + private final AtomicLong totalEffectorInvocationCount = new AtomicLong(); + + protected BrooklynProperties configMap; + protected BasicLocationRegistry locationRegistry; + protected final BasicBrooklynCatalog catalog; + protected ClassLoader baseClassLoader; + protected Iterable baseClassPathForScanning; + + private final RebindManager rebindManager; + private final HighAvailabilityManager highAvailabilityManager; + + protected volatile BrooklynGarbageCollector gc; + + private final EntityDriverManager entityDriverManager; + protected DownloadResolverManager downloadsManager; + + protected EntitlementManager entitlementManager; + + private final BrooklynStorage storage; + + private volatile boolean running = true; + protected boolean startupComplete = false; + protected final List errors = Collections.synchronizedList(MutableList.of()); + + protected Maybe uri = Maybe.absent(); + protected CatalogInitialization catalogInitialization; + + public AbstractManagementContext(BrooklynProperties brooklynProperties){ + this(brooklynProperties, null); + } + + public AbstractManagementContext(BrooklynProperties brooklynProperties, DataGridFactory datagridFactory) { + this.configMap = brooklynProperties; + this.entityDriverManager = new BasicEntityDriverManager(); + this.downloadsManager = BasicDownloadsManager.newDefault(configMap); + if (datagridFactory == null) { + datagridFactory = loadDataGridFactory(brooklynProperties); + } + DataGrid datagrid = datagridFactory.newDataGrid(this); + + this.catalog = new BasicBrooklynCatalog(this); + + this.storage = new BrooklynStorageImpl(datagrid); + this.rebindManager = new RebindManagerImpl(this); // TODO leaking "this" reference; yuck + this.highAvailabilityManager = new HighAvailabilityManagerImpl(this); // TODO leaking "this" reference; yuck + + this.entitlementManager = Entitlements.newManager(this, brooklynProperties); + } + + @Override + public void terminate() { + highAvailabilityManager.stop(); + running = false; + rebindManager.stop(); + storage.terminate(); + // Don't unmanage everything; different entities get given their events at different times + // so can cause problems (e.g. a group finds out that a member is unmanaged, before the + // group itself has been told that it is unmanaged). + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + public boolean isStartupComplete() { + return startupComplete; + } + + @Override + public BrooklynStorage getStorage() { + return storage; + } + + @Override + public RebindManager getRebindManager() { + return rebindManager; + } + + @Override + public HighAvailabilityManager getHighAvailabilityManager() { + return highAvailabilityManager; + } + + @Override + public long getTotalEffectorInvocations() { + return totalEffectorInvocationCount.get(); + } + + @Override + public ExecutionContext getExecutionContext(Entity e) { + // BEC is a thin wrapper around EM so fine to create a new one here; but make sure it gets the real entity + if (e instanceof AbstractEntity) { + return new BasicExecutionContext(MutableMap.of("tag", BrooklynTaskTags.tagForContextEntity(e)), getExecutionManager()); + } else { + return ((EntityInternal)e).getManagementSupport().getExecutionContext(); + } + } + + @Override + public ExecutionContext getServerExecutionContext() { + // BEC is a thin wrapper around EM so fine to create a new one here + return new BasicExecutionContext(MutableMap.of("tag", BrooklynTaskTags.BROOKLYN_SERVER_TASK_TAG), getExecutionManager()); + } + + @Override + public SubscriptionContext getSubscriptionContext(Entity e) { + // BSC is a thin wrapper around SM so fine to create a new one here + return new BasicSubscriptionContext(getSubscriptionManager(), e); + } + + @Override + public EntityDriverManager getEntityDriverManager() { + return entityDriverManager; + } + + @Override + public DownloadResolverManager getEntityDownloadsManager() { + return downloadsManager; + } + + @Override + public EntitlementManager getEntitlementManager() { + return entitlementManager; + } + + protected abstract void manageIfNecessary(Entity entity, Object context); + + @Override + public Task invokeEffector(final Entity entity, final Effector eff, @SuppressWarnings("rawtypes") final Map parameters) { + return runAtEntity(entity, eff, parameters); + } + + protected T invokeEffectorMethodLocal(Entity entity, Effector eff, Object args) { + assert isManagedLocally(entity) : "cannot invoke effector method at "+this+" because it is not managed here"; + totalEffectorInvocationCount.incrementAndGet(); + Object[] transformedArgs = EffectorUtils.prepareArgsForEffector(eff, args); + return GroovyJavaMethods.invokeMethodOnMetaClass(entity, eff.getName(), transformedArgs); + } + + /** + * Method for entity to make effector happen with correct semantics (right place, right task context), + * when a method is called on that entity. + * @throws ExecutionException + */ + @Override + public T invokeEffectorMethodSync(final Entity entity, final Effector eff, final Object args) throws ExecutionException { + try { + Task current = Tasks.current(); + if (current == null || !entity.equals(BrooklynTaskTags.getContextEntity(current)) || !isManagedLocally(entity)) { + manageIfNecessary(entity, eff.getName()); + // Wrap in a task if we aren't already in a task that is tagged with this entity + Task task = runAtEntity( EffectorUtils.getTaskFlagsForEffectorInvocation(entity, eff, + ConfigBag.newInstance().configureStringKey("args", args)), + entity, + new Callable() { + public T call() { + return invokeEffectorMethodLocal(entity, eff, args); + }}); + return task.get(); + } else { + return invokeEffectorMethodLocal(entity, eff, args); + } + } catch (Exception e) { + // don't need to attach any message or warning because the Effector impl hierarchy does that (see calls to EffectorUtils.handleException) + throw new ExecutionException(e); + } + } + + /** + * Whether the master entity record is local, and sensors and effectors can be properly accessed locally. + */ + public abstract boolean isManagedLocally(Entity e); + + /** + * Causes the indicated runnable to be run at the right location for the given entity. + * + * Returns the actual task (if it is local) or a proxy task (if it is remote); + * if management for the entity has not yet started this may start it. + * + * @deprecated since 0.6.0 use effectors (or support {@code runAtEntity(Entity, Effector, Map)} if something else is needed); + * (Callable with Map flags is too open-ended, bothersome to support, and not used much) + */ + @Deprecated + public abstract Task runAtEntity(@SuppressWarnings("rawtypes") Map flags, Entity entity, Callable c); + + /** Runs the given effector in the right place for the given entity. + * The task is immediately submitted in the background, but also recorded in the queueing context (if present) + * so it appears as a child, but marked inessential so it does not fail the parent task, who will ordinarily + * call {@link Task#get()} on the object and may do their own failure handling. + */ + protected abstract Task runAtEntity(final Entity entity, final Effector eff, @SuppressWarnings("rawtypes") final Map parameters); + + @Override + public StringConfigMap getConfig() { + return configMap; + } + + @Override + public BrooklynProperties getBrooklynProperties() { + return configMap; + } + + @Override + public synchronized LocationRegistry getLocationRegistry() { + if (locationRegistry==null) locationRegistry = new BasicLocationRegistry(this); + return locationRegistry; + } + + @Override + public BrooklynCatalog getCatalog() { + if (!getCatalogInitialization().hasRunAnyInitialization()) { + // catalog init is needed; normally this will be done from start sequence, + // but if accessed early -- and in tests -- we will load it here + getCatalogInitialization().injectManagementContext(this); + getCatalogInitialization().populateUnofficial(catalog); + } + return catalog; + } + + @Override + public ClassLoader getCatalogClassLoader() { + // catalog does not have to be initialized + return catalog.getRootClassLoader(); + } + + /** + * Optional class-loader that this management context should use as its base, + * as the first-resort in the catalog, and for scanning (if scanning the default in the catalog). + * In most instances the default classloader (ManagementContext.class.getClassLoader(), assuming + * this was in the JARs used at boot time) is fine, and in those cases this method normally returns null. + * (Surefire does some weird stuff, but the default classloader is fine for loading; + * however it requires a custom base classpath to be set for scanning.) + */ + @Override + public ClassLoader getBaseClassLoader() { + return baseClassLoader; + } + + /** See {@link #getBaseClassLoader()}. Only settable once and must be invoked before catalog is loaded. */ + public void setBaseClassLoader(ClassLoader cl) { + if (baseClassLoader==cl) return; + if (baseClassLoader!=null) throw new IllegalStateException("Cannot change base class loader (in "+this+")"); + if (catalog!=null) throw new IllegalStateException("Cannot set base class after catalog has been loaded (in "+this+")"); + this.baseClassLoader = cl; + } + + /** Optional mechanism for setting the classpath which should be scanned by the catalog, if the catalog + * is scanning the default classpath. Usually it infers the right thing, but some classloaders + * (e.g. surefire) do funny things which the underlying org.reflections.Reflections library can't see in to. + *

+ * This should normally be invoked early in the server startup. Setting it after the catalog is loaded will not + * take effect without an explicit internal call to do so. Once set, it can be changed prior to catalog loading + * but it cannot be changed once the catalog is loaded. + *

+ * ClasspathHelper.forJavaClassPath() is often a good argument to pass, and is used internally in some places + * when no items are found on the catalog. */ + @Override + public void setBaseClassPathForScanning(Iterable urls) { + if (Objects.equal(baseClassPathForScanning, urls)) return; + if (baseClassPathForScanning != null) { + if (catalog==null) + log.warn("Changing scan classpath to "+urls+" from "+baseClassPathForScanning); + else + throw new IllegalStateException("Cannot change base class path for scanning (in "+this+")"); + } + this.baseClassPathForScanning = urls; + } + /** + * @see #setBaseClassPathForScanning(Iterable) + */ + @Override + public Iterable getBaseClassPathForScanning() { + return baseClassPathForScanning; + } + + public BrooklynGarbageCollector getGarbageCollector() { + return gc; + } + + @Override + public void setManagementNodeUri(URI uri) { + this.uri = Maybe.of(checkNotNull(uri, "uri")); + } + + @Override + public Maybe getManagementNodeUri() { + return uri; + } + + private Object catalogInitMutex = new Object(); + @Override + public CatalogInitialization getCatalogInitialization() { + synchronized (catalogInitMutex) { + if (catalogInitialization!=null) return catalogInitialization; + CatalogInitialization ci = new CatalogInitialization(); + setCatalogInitialization(ci); + return ci; + } + } + + @Override + public void setCatalogInitialization(CatalogInitialization catalogInitialization) { + synchronized (catalogInitMutex) { + Preconditions.checkNotNull(catalogInitialization, "initialization must not be null"); + if (this.catalogInitialization!=null && this.catalogInitialization != catalogInitialization) + throw new IllegalStateException("Changing catalog init from "+this.catalogInitialization+" to "+catalogInitialization+"; changes not permitted"); + catalogInitialization.injectManagementContext(this); + this.catalogInitialization = catalogInitialization; + } + } + + public BrooklynObject lookup(String id) { + return lookup(id, BrooklynObject.class); + } + + @SuppressWarnings("unchecked") + public T lookup(String id, Class type) { + Object result; + result = getEntityManager().getEntity(id); + if (result!=null && type.isInstance(result)) return (T)result; + + result = getLocationManager().getLocation(id); + if (result!=null && type.isInstance(result)) return (T)result; + + // TODO policies, enrichers, feeds + return null; + } + + @Override + public List errors() { + return errors; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java new file mode 100644 index 0000000..bec041d --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.internal; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collections; +import java.util.Map; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.management.SubscriptionHandle; +import org.apache.brooklyn.api.management.SubscriptionManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects; +import com.google.common.base.Predicate; + +public abstract class AbstractSubscriptionManager implements SubscriptionManager { + + // TODO Perhaps could use guava's SynchronizedSetMultimap? But need to check its synchronization guarantees. + // That would replace the utils used for subscriptionsBySubscriber etc. + + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscriptionManager.class); + + /** performs the actual subscription; should return the subscription parameter as the handle */ + protected abstract SubscriptionHandle subscribe(Map flags, Subscription s); + /** performs the actual publishing -- ie distribution to subscriptions */ + public abstract void publish(final SensorEvent event); + + public static class EntitySensorToken { + Entity e; + Sensor s; + String sName; + public EntitySensorToken(Entity e, Sensor s) { + this.e = e; + this.s = s; + this.sName = (s == null) ? null : checkNotNull(s.getName(), "sensor must have non-null name: %s", s); + } + @Override + public int hashCode() { + return Objects.hashCode(e, sName); + } + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof EntitySensorToken)) return false; + if (!Objects.equal(e, ((EntitySensorToken)obj).e)) return false; + if (!Objects.equal(sName, ((EntitySensorToken)obj).sName)) return false; + return true; + } + @Override + public String toString() { + return (e != null ? e.getId() : "*")+":"+(s != null ? sName : "*"); + } + } + static Object makeEntitySensorToken(Entity e, Sensor s) { + return new EntitySensorToken(e, s); + } + static Object makeEntitySensorToken(SensorEvent se) { + return makeEntitySensorToken(se.getSource(), se.getSensor()); + } + + /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */ + public final SubscriptionHandle subscribe(Entity producer, Sensor sensor, SensorEventListener listener) { + return subscribe(Collections.emptyMap(), producer, sensor, listener); + } + + /** + * This implementation handles the following flags, in addition to those described in the {@link SubscriptionManager} + * interface: + *

    + *
  • subscriberExecutionManagerTag - a tag to pass to execution manager (without setting any execution semantics / TaskPreprocessor); + * if not supplied and there is a subscriber, this will be inferred from the subscriber and set up with SingleThreadedScheduler + *
  • eventFilter - a Predicate<SensorEvent> instance to filter what events are delivered + *
+ * + * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) + */ + public final SubscriptionHandle subscribe(Map flags, Entity producer, Sensor sensor, SensorEventListener listener) { + return subscribe(flags, new Subscription(producer, sensor, listener)); + } + + /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */ + public final SubscriptionHandle subscribeToChildren(Entity parent, Sensor sensor, SensorEventListener listener) { + return subscribeToChildren(Collections.emptyMap(), parent, sensor, listener); + } + + /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */ + public final SubscriptionHandle subscribeToChildren(Map flags, final Entity parent, Sensor sensor, SensorEventListener listener) { + Predicate> eventFilter = new Predicate>() { + public boolean apply(SensorEvent input) { + return parent != null && input.getSource() != null && parent.equals(input.getSource().getParent()); + } + }; + flags.put("eventFilter", eventFilter); + return subscribe(flags, null, sensor, listener); + } + + /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */ + public final SubscriptionHandle subscribeToMembers(Group parent, Sensor sensor, SensorEventListener listener) { + return subscribeToMembers(Collections.emptyMap(), parent, sensor, listener); + } + + /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */ + public final SubscriptionHandle subscribeToMembers(Map flags, final Group parent, Sensor sensor, SensorEventListener listener) { + Predicate> eventFilter = new Predicate>() { + public boolean apply(SensorEvent input) { + return parent.getMembers().contains(input.getSource()); + } + }; + flags.put("eventFilter", eventFilter); + return subscribe(flags, null, sensor, listener); + } + + protected Object getSubscriber(Map flags, Subscription s) { + return s.subscriber!=null ? s.subscriber : flags.containsKey("subscriber") ? flags.remove("subscriber") : s.listener; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java new file mode 100644 index 0000000..5d6a057 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.internal; + +import org.apache.brooklyn.api.management.AccessController; + +import com.google.common.annotations.Beta; + +@Beta +public interface AccessManager { + + AccessController getAccessController(); + + boolean isLocationProvisioningAllowed(); + + boolean isLocationManagementAllowed(); + + boolean isEntityManagementAllowed(); + + void setLocationProvisioningAllowed(boolean allowed); + + void setLocationManagementAllowed(boolean allowed); + + void setEntityManagementAllowed(boolean allowed); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java new file mode 100644 index 0000000..038ec90 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.internal; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.brooklyn.api.management.ExecutionManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.task.BasicExecutionManager; +import brooklyn.util.task.SingleThreadedScheduler; + +public class AsyncCollectionChangeAdapter implements CollectionChangeListener { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectionChangeAdapter.class); + + private final ExecutionManager executor; + private final CollectionChangeListener delegate; + + public AsyncCollectionChangeAdapter(ExecutionManager executor, CollectionChangeListener delegate) { + this.executor = checkNotNull(executor, "executor"); + this.delegate = checkNotNull(delegate, "delegate"); + ((BasicExecutionManager) executor).setTaskSchedulerForTag(delegate, SingleThreadedScheduler.class); + } + + @Override + public void onItemAdded(final Item item) { + executor.submit(MutableMap.of("tag", delegate), new Runnable() { + public void run() { + try { + delegate.onItemAdded(item); + } catch (Throwable t) { + LOG.warn("Error notifying listener of itemAdded("+item+")", t); + Exceptions.propagate(t); + } + } + }); + } + + @Override + public void onItemRemoved(final Item item) { + executor.submit(MutableMap.of("tag", delegate), new Runnable() { + public void run() { + try { + delegate.onItemRemoved(item); + } catch (Throwable t) { + LOG.warn("Error notifying listener of itemAdded("+item+")", t); + Exceptions.propagate(t); + } + } + }); + } + + @Override + public int hashCode() { + return delegate.hashCode(); + } + + @Override + public boolean equals(Object other) { + return (other instanceof AsyncCollectionChangeAdapter) && + delegate.equals(((AsyncCollectionChangeAdapter) other).delegate); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java new file mode 100644 index 0000000..04aa18f --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.internal; + +import static brooklyn.util.JavaGroovyEquivalents.mapOf; +import groovy.lang.Closure; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.management.SubscriptionContext; +import org.apache.brooklyn.api.management.SubscriptionHandle; +import org.apache.brooklyn.api.management.SubscriptionManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; + +/** + * A {@link SubscriptionContext} for an entity or other user of a {@link SubscriptionManager}. + */ +public class BasicSubscriptionContext implements SubscriptionContext { + + private static final Logger LOG = LoggerFactory.getLogger(BasicSubscriptionContext.class); + + private final SubscriptionManager manager; + private final Object subscriber; + private final Map flags; + + public BasicSubscriptionContext(SubscriptionManager manager, Object subscriber) { + this(Collections.emptyMap(), manager, subscriber); + } + + public BasicSubscriptionContext(Map flags, SubscriptionManager manager, Object subscriber) { + this.manager = manager; + this.subscriber = subscriber; + this.flags = mapOf("subscriber", subscriber); + if (flags!=null) this.flags.putAll(flags); + } + + @SuppressWarnings("rawtypes") + public SubscriptionHandle subscribe(Entity producer, Sensor sensor, Closure c) { + return subscribe(Collections.emptyMap(), producer, sensor, c); + } + + @SuppressWarnings("rawtypes") + public SubscriptionHandle subscribe(Map newFlags, Entity producer, Sensor sensor, Closure c) { + return subscribe(newFlags, producer, sensor, toSensorEventListener(c)); + } + + @Override + public SubscriptionHandle subscribe(Entity producer, Sensor sensor, SensorEventListener listener) { + return subscribe(Collections.emptyMap(), producer, sensor, listener); + } + + @Override + public SubscriptionHandle subscribe(Map newFlags, Entity producer, Sensor sensor, SensorEventListener listener) { + Map subscriptionFlags = Maps.newLinkedHashMap(flags); + if (newFlags != null) subscriptionFlags.putAll(newFlags); + return manager.subscribe(subscriptionFlags, producer, sensor, listener); + } + + @SuppressWarnings("rawtypes") + public SubscriptionHandle subscribeToChildren(Entity parent, Sensor sensor, Closure c) { + return subscribeToChildren(Collections.emptyMap(), parent, sensor, c); + } + + @SuppressWarnings("rawtypes") + public SubscriptionHandle subscribeToChildren(Map newFlags, Entity parent, Sensor sensor, Closure c) { + return subscribeToChildren(newFlags, parent, sensor, toSensorEventListener(c)); + } + + @Override + public SubscriptionHandle subscribeToChildren(Entity parent, Sensor sensor, SensorEventListener listener) { + return subscribeToChildren(Collections.emptyMap(), parent, sensor, listener); + } + + @Override + public SubscriptionHandle subscribeToChildren(Map newFlags, Entity parent, Sensor sensor, SensorEventListener listener) { + Map subscriptionFlags = Maps.newLinkedHashMap(flags); + if (newFlags != null) subscriptionFlags.putAll(newFlags); + return manager.subscribeToChildren(subscriptionFlags, parent, sensor, listener); + } + + @SuppressWarnings("rawtypes") + public SubscriptionHandle subscribeToMembers(Group parent, Sensor sensor, Closure c) { + return subscribeToMembers(Collections.emptyMap(), parent, sensor, c); + } + + @SuppressWarnings("rawtypes") + public SubscriptionHandle subscribeToMembers(Map newFlags, Group parent, Sensor sensor, Closure c) { + return subscribeToMembers(newFlags, parent, sensor, toSensorEventListener(c)); + } + + @Override + public SubscriptionHandle subscribeToMembers(Group parent, Sensor sensor, SensorEventListener listener) { + return subscribeToMembers(Collections.emptyMap(), parent, sensor, listener); + } + + @Override + public SubscriptionHandle subscribeToMembers(Map newFlags, Group parent, Sensor sensor, SensorEventListener listener) { + Map subscriptionFlags = Maps.newLinkedHashMap(flags); + if (newFlags != null) subscriptionFlags.putAll(newFlags); + return manager.subscribeToMembers(subscriptionFlags, parent, sensor, listener); + } + + @SuppressWarnings("rawtypes") + @Override + public boolean unsubscribe(SubscriptionHandle subscriptionId) { + Preconditions.checkNotNull(subscriptionId, "subscriptionId must not be null"); + Preconditions.checkArgument(Objects.equal(subscriber, ((Subscription) subscriptionId).subscriber), "The subscriptionId is for a different "+subscriber+"; expected "+((Subscription) subscriptionId).subscriber); + return manager.unsubscribe(subscriptionId); + } + + /** @see SubscriptionManager#publish(SensorEvent) */ + @Override + public void publish(SensorEvent event) { + manager.publish(event); + } + + /** Return the subscriptions associated with this context */ + @Override + public Set getSubscriptions() { + return manager.getSubscriptionsForSubscriber(subscriber); + } + + @Override + public int unsubscribeAll() { + int count = 0; + + // To avoid ConcurrentModificationException when copying subscriptions, need to synchronize on it + Set subscriptions = getSubscriptions(); + Collection subscriptionsCopy; + synchronized (subscriptions) { + subscriptionsCopy = ImmutableList.copyOf(subscriptions); + } + + for (SubscriptionHandle s : subscriptionsCopy) { + count++; + boolean result = unsubscribe(s); + if (!result) LOG.warn("When unsubscribing from all of {}, unsubscribe of {} return false", subscriber, s); + } + return count; + } + + @SuppressWarnings("rawtypes") + private SensorEventListener toSensorEventListener(final Closure c) { + return new SensorEventListener() { + @Override public void onEvent(SensorEvent event) { + c.call(event); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java new file mode 100644 index 0000000..c02ff81 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.internal; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.management.HasTaskChildren; +import org.apache.brooklyn.api.management.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.BrooklynProperties; +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity; +import brooklyn.entity.basic.BrooklynTaskTags.WrappedStream; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.Entities; +import brooklyn.internal.storage.BrooklynStorage; +import brooklyn.util.collections.MutableList; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.javalang.MemoryUsageTracker; +import brooklyn.util.task.BasicExecutionManager; +import brooklyn.util.task.ExecutionListener; +import brooklyn.util.task.Tasks; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; + +import com.google.common.base.Objects; +import com.google.common.annotations.Beta; +import com.google.common.collect.Iterables; + +/** + * Deletes record of old tasks, to prevent space leaks and the eating up of more and more memory. + * + * The deletion policy is configurable: + *
    + *
  • Period - how frequently to look at the existing tasks to delete some, if required + *
  • Max task age - the time after which a completed task will be automatically deleted + * (i.e. any root task completed more than maxTaskAge ago will be deleted) + *
  • Max tasks per - the maximum number of tasks to be kept for a given tag, + * split into categories based on what is seeming to be useful + *
+ * + * The default is to check with a period of one minute, deleting tasks after 30 days, + * and keeping at most 100000 tasks in the system, + * max 1000 tasks per entity, 50 per effector within that entity, and 50 per other non-effector tag + * within that entity (or global if not attached to an entity). + * + * @author aled + */ +public class BrooklynGarbageCollector { + + private static final Logger LOG = LoggerFactory.getLogger(BrooklynGarbageCollector.class); + + public static final ConfigKey GC_PERIOD = ConfigKeys.newDurationConfigKey( + "brooklyn.gc.period", "the period for checking if any tasks need to be deleted", + Duration.minutes(1)); + + public static final ConfigKey DO_SYSTEM_GC = ConfigKeys.newBooleanConfigKey( + "brooklyn.gc.doSystemGc", "whether to periodically call System.gc()", false); + + /** + * should we check for tasks which are submitted by another but backgrounded, i.e. not a child of that task? + * default to yes, despite it can be some extra loops, to make sure we GC them promptly. + * @since 0.7.0 */ + // work offender is {@link DynamicSequentialTask} internal job tracker, but it is marked + // transient so it is destroyed prompty; there may be others, however; + // but OTOH it might be expensive to check for these all the time! + // TODO probably we can set this false (remove this and related code), + // and just rely on usual GC to pick up background tasks; the lifecycle of background task + // should normally be independent of the submitter. (DST was the exception, and marking + // transient there fixes the main problem, which is when the submitter is GC'd but the submitted is not, + // and we don't want the submitted to show up at the root in the GUI, which it will if its + // submitter has been GC'd) + @Beta + public static final ConfigKey CHECK_SUBTASK_SUBMITTERS = ConfigKeys.newBooleanConfigKey( + "brooklyn.gc.checkSubtaskSubmitters", "whether for subtasks to check the submitters", true); + + public static final ConfigKey MAX_TASKS_PER_TAG = ConfigKeys.newIntegerConfigKey( + "brooklyn.gc.maxTasksPerTag", + "the maximum number of tasks to be kept for a given tag " + + "within an execution context (e.g. entity); " + + "some broad-brush tags are excluded, and if an entity has multiple tags all tag counts must be full", + 50); + + public static final ConfigKey MAX_TASKS_PER_ENTITY = ConfigKeys.newIntegerConfigKey( + "brooklyn.gc.maxTasksPerEntity", + "the maximum number of tasks to be kept for a given entity", + 1000); + + public static final ConfigKey MAX_TASKS_GLOBAL = ConfigKeys.newIntegerConfigKey( + "brooklyn.gc.maxTasksGlobal", + "the maximum number of tasks to be kept across the entire system", + 100000); + + public static final ConfigKey MAX_TASK_AGE = ConfigKeys.newDurationConfigKey( + "brooklyn.gc.maxTaskAge", + "the duration after which a completed task will be automatically deleted", + Duration.days(30)); + + protected final static Comparator> TASKS_OLDEST_FIRST_COMPARATOR = new Comparator>() { + @Override public int compare(Task t1, Task t2) { + long end1 = t1.getEndTimeUtc(); + long end2 = t2.getEndTimeUtc(); + return (end1 < end2) ? -1 : ((end1 == end2) ? 0 : 1); + } + }; + + private final BasicExecutionManager executionManager; + private final BrooklynStorage storage; + private final BrooklynProperties brooklynProperties; + private final ScheduledExecutorService executor; + private ScheduledFuture activeCollector; + private Map> unmanagedEntitiesNeedingGc = new LinkedHashMap>(); + + private Duration gcPeriod; + private final boolean doSystemGc; + private volatile boolean running = true; + + public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExecutionManager executionManager, BrooklynStorage storage) { + this.executionManager = executionManager; + this.storage = storage; + this.brooklynProperties = brooklynProperties; + + doSystemGc = brooklynProperties.getConfig(DO_SYSTEM_GC); + + executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override public Thread newThread(Runnable r) { + return new Thread(r, "brooklyn-gc"); + }}); + + executionManager.addListener(new ExecutionListener() { + @Override public void onTaskDone(Task task) { + BrooklynGarbageCollector.this.onTaskDone(task); + }}); + + scheduleCollector(true); + } + + protected synchronized void scheduleCollector(boolean canInterruptCurrent) { + if (activeCollector != null) activeCollector.cancel(canInterruptCurrent); + + gcPeriod = brooklynProperties.getConfig(GC_PERIOD); + if (gcPeriod!=null) { + activeCollector = executor.scheduleWithFixedDelay( + new Runnable() { + @Override public void run() { + gcIteration(); + } + }, + gcPeriod.toMillisecondsRoundingUp(), + gcPeriod.toMillisecondsRoundingUp(), + TimeUnit.MILLISECONDS); + } + } + + /** force a round of Brooklyn garbage collection */ + public void gcIteration() { + try { + logUsage("brooklyn gc (before)"); + gcTasks(); + logUsage("brooklyn gc (after)"); + + if (doSystemGc) { + // Can be very useful when tracking down OOMEs etc, where a lot of tasks are executing + // Empirically observed that (on OS X jvm at least) calling twice blocks - logs a significant + // amount of memory having been released, as though a full-gc had been run. But this is highly + // dependent on the JVM implementation. + System.gc(); System.gc(); + logUsage("brooklyn gc (after system gc)"); + } + } catch (Throwable t) { + Exceptions.propagateIfFatal(t); + LOG.warn("Error during management-context GC: "+t, t); + // previously we bailed on all errors, but I don't think we should do that -Alex + } + } + + public void logUsage(String prefix) { + if (LOG.isDebugEnabled()) + LOG.debug(prefix+" - using "+getUsageString()); + } + + public static String makeBasicUsageString() { + return Strings.makeSizeString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())+" / "+ + Strings.makeSizeString(Runtime.getRuntime().totalMemory()) + " memory" + + " ("+Strings.makeSizeString(MemoryUsageTracker.SOFT_REFERENCES.getBytesUsed()) + " soft); "+ + Thread.activeCount()+" threads"; + } + + public String getUsageString() { + return makeBasicUsageString()+"; "+ + "storage: " + storage.getStorageMetrics() + "; " + + "tasks: " + + executionManager.getNumActiveTasks()+" active, "+ + executionManager.getNumIncompleteTasks()+" unfinished; "+ + executionManager.getNumInMemoryTasks()+" remembered, "+ + executionManager.getTotalTasksSubmitted()+" total submitted)"; + } + + public void shutdownNow() { + running = false; + if (activeCollector != null) activeCollector.cancel(true); + if (executor != null) executor.shutdownNow(); + } + + public void onUnmanaged(Entity entity) { + // defer task deletions until the entity is completely unmanaged + // (this is usually invoked during the stop sequence) + synchronized (unmanagedEntitiesNeedingGc) { + unmanagedEntitiesNeedingGc.put(entity, Tasks.current()); + } + } + + public void deleteTasksForEntity(Entity entity) { + // remove all references to this entity from tasks + executionManager.deleteTag(entity); + executionManager.deleteTag(BrooklynTaskTags.tagForContextEntity(entity)); + executionManager.deleteTag(BrooklynTaskTags.tagForCallerEntity(entity)); + executionManager.deleteTag(BrooklynTaskTags.tagForTargetEntity(entity)); + } + + public void onUnmanaged(Location loc) { + // No-op currently; no tasks are tracked through their location + } + + public void onTaskDone(Task task) { + if (shouldDeleteTaskImmediately(task)) { + executionManager.deleteTask(task); + } + } + + /** @deprecated since 0.7.0, method moved internal until semantics are clarified; see also {@link #shouldDeleteTaskImmediately(Task)} */ + @Deprecated + public boolean shouldDeleteTask(Task task) { + return shouldDeleteTaskImmediately(task); + } + /** whether this task should be deleted on completion, + * because it is transient, or because it is submitted background without much context information */ + protected boolean shouldDeleteTaskImmediately(Task task) { + if (!task.isDone()) return false; + + Set tags = task.getTags(); + if (tags.contains(ManagementContextInternal.TRANSIENT_TASK_TAG)) + return true; + if (tags.contains(ManagementContextInternal.EFFECTOR_TAG) || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) + return false; + + if (task.getSubmittedByTask()!=null) { + Task parent = task.getSubmittedByTask(); + if (executionManager.getTask(parent.getId())==null) { + // parent is already cleaned up + return true; + } + if (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task)) { + // it is a child, let the parent manage this task's death + return false; + } + Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task); + if (associatedEntity!=null) { + // this is associated to an entity; destroy only if the entity is unmanaged + return !Entities.isManaged(associatedEntity); + } + // if not associated to an entity, then delete immediately + return true; + } + + // e.g. scheduled tasks, sensor events, etc + // TODO (in future may keep some of these with another limit, based on a new TagCategory) + // there may also be a server association for server-side tasks which should be kept + // (but be careful not to keep too many subscriptions!) + + return true; + } + + /** + * Deletes old tasks. The age/number of tasks to keep is controlled by fields like + * {@link #maxTasksPerTag} and {@link #maxTaskAge}. + */ + protected synchronized int gcTasks() { + // TODO Must be careful with memory usage here: have seen OOME if we get crazy lots of tasks. + // hopefully the use new limits, filters, and use of live lists in some places (added Sep 2014) will help. + // + // An option is for getTasksWithTag(tag) to return an ArrayList rather than a LinkedHashSet. That + // is a far more memory efficient data structure (e.g. 4 bytes overhead per object rather than + // 32 bytes overhead per object for HashSet). + // + // More notes on optimization is in the history of this file. + + if (!running) return 0; + + Duration newPeriod = brooklynProperties.getConfig(GC_PERIOD); + if (!Objects.equal(gcPeriod, newPeriod)) { + // caller has changed period, reschedule on next run + scheduleCollector(false); + } + + expireUnmanagedEntityTasks(); + expireAgedTasks(); + expireTransientTasks(); + + // now look at overcapacity tags, non-entity tags first + + Set taskTags = executionManager.getTaskTags(); + + int maxTasksPerEntity = brooklynProperties.getConfig(MAX_TASKS_PER_ENTITY); + int maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG); + + Map taskNonEntityTagsOverCapacity = MutableMap.of(); + Map taskEntityTagsOverCapacity = MutableMap.of(); + + Map taskAllTagsOverCapacity = MutableMap.of(); + + for (Object tag : taskTags) { + if (isTagIgnoredForGc(tag)) continue; + + Set> tasksWithTag = executionManager.tasksWithTagLiveOrNull(tag); + if (tasksWithTag==null) continue; + AtomicInteger overA = null; + if (tag instanceof WrappedEntity) { + int over = tasksWithTag.size() - maxTasksPerEntity; + if (over>0) { + overA = new AtomicInteger(over); + taskEntityTagsOverCapacity.put(tag, overA); + } + } else { + int over = tasksWithTag.size() - maxTasksPerTag; + if (over>0) { + overA = new AtomicInteger(over); + taskNonEntityTagsOverCapacity.put(tag, overA); + } + } + if (overA!=null) { + taskAllTagsOverCapacity.put(tag, overA); + } + } + + int deletedCount = 0; + deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false); + deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true); + deletedCount += expireSubTasksWhoseSubmitterIsExpired(); + + int deletedGlobally = expireIfOverCapacityGlobally(); + deletedCount += deletedGlobally; + if (deletedGlobally>0) deletedCount += expireSubTasksWhoseSubmitterIsExpired(); + + return deletedCount; + } + + protected static boolean isTagIgnoredForGc(Object tag) { + if (tag == null) return true; + if (tag.equals(ManagementContextInternal.EFFECTOR_TAG)) return true; + if (tag.equals(ManagementContextInternal.SUB_TASK_TAG)) return true; + if (tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return true; + if (tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG)) return true; + if (tag instanceof WrappedStream) { + return true; + } + + return false; + } + + protected void expireUnmanagedEntityTasks() { + Iterator>> ei; + synchronized (unmanagedEntitiesNeedingGc) { + ei = MutableSet.copyOf(unmanagedEntitiesNeedingGc.entrySet()).iterator(); + } + while (ei.hasNext()) { + Entry> ee = ei.next(); + if (Entities.isManaged(ee.getKey())) continue; + if (ee.getValue()!=null && !ee.getValue().isDone()) continue; + deleteTasksForEntity(ee.getKey()); + synchronized (unmanagedEntitiesNeedingGc) { + unmanagedEntitiesNeedingGc.remove(ee.getKey()); + } + } + } + + protected void expireAgedTasks() { + Duration maxTaskAge = brooklynProperties.getConfig(MAX_TASK_AGE); + + Collection> allTasks = executionManager.allTasksLive(); + Collection> tasksToDelete = MutableList.of(); + + try { + for (Task task: allTasks) { + if (!task.isDone()) continue; + if (BrooklynTaskTags.isSubTask(task)) continue; + + if (maxTaskAge.isShorterThan(Duration.sinceUtc(task.getEndTimeUtc()))) + tasksToDelete.add(task); + } + + } catch (ConcurrentModificationException e) { + // delete what we've found so far + LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e); + } + + for (Task task: tasksToDelete) { + executionManager.deleteTask(task); + } + } + + protected void expireTransientTasks() { + Set> transientTasks = executionManager.getTasksWithTag(BrooklynTaskTags.TRANSIENT_TASK_TAG); + for (Task t: transientTasks) { + if (!t.isDone()) continue; + executionManager.deleteTask(t); + } + } + + protected int expireSubTasksWhoseSubmitterIsExpired() { + // ideally we wouldn't have this; see comments on CHECK_SUBTASK_SUBMITTERS + if (!brooklynProperties.getConfig(CHECK_SUBTASK_SUBMITTERS)) + return 0; + + Collection> allTasks = executionManager.allTasksLive(); + Collection> tasksToDelete = MutableList.of(); + try { + for (Task task: allTasks) { + if (!task.isDone()) continue; + Task submitter = task.getSubmittedByTask(); + // if we've leaked, ie a subtask which is not a child task, + // and the submitter is GC'd, then delete this also + if (submitter!=null && submitter.isDone() && executionManager.getTask(submitter.getId())==null) { + tasksToDelete.add(task); + } + } + + } catch (ConcurrentModificationException e) { + // delete what we've found so far + LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e); + } + + for (Task task: tasksToDelete) { + executionManager.deleteTask(task); + } + return tasksToDelete.size(); + } + + protected enum TagCategory { + ENTITY, NON_ENTITY_NORMAL; + + public boolean acceptsTag(Object tag) { + if (isTagIgnoredForGc(tag)) return false; + if (tag instanceof WrappedEntity) return this==ENTITY; + if (this==ENTITY) return false; + return true; + } + } + + + /** expires tasks which are over-capacity in all their non-entity tag categories, returned count */ + protected int expireOverCapacityTagsInCategory(Map taskTagsInCategoryOverCapacity, Map taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) { + if (emptyFilterNeeded) { + // previous run may have decremented counts + MutableList nowOkayTags = MutableList.of(); + for (Map.Entry entry: taskTagsInCategoryOverCapacity.entrySet()) { + if (entry.getValue().get()<=0) nowOkayTags.add(entry.getKey()); + } + for (Object tag: nowOkayTags) taskTagsInCategoryOverCapacity.remove(tag); + } + + if (taskTagsInCategoryOverCapacity.isEmpty()) + return 0; + + Collection> tasks = executionManager.allTasksLive(); + List> tasksToConsiderDeleting = MutableList.of(); + try { + for (Task task: tasks) { + if (!task.isDone()) continue; + + Set tags = task.getTags(); + + int categoryTags = 0, tooFullCategoryTags = 0; + for (Object tag: tags) { + if (category.acceptsTag(tag)) { + categoryTags++; + if (taskTagsInCategoryOverCapacity.containsKey(tag)) + tooFullCategoryTags++; + } + } + if (tooFullCategoryTags>0) { + if (categoryTags==tooFullCategoryTags) { + // all buckets are full, delete this one + tasksToConsiderDeleting.add(task); + } else { + // if any bucket is under capacity, then give grace to the other buckets in this category + for (Object tag: tags) { + if (category.acceptsTag(tag)) { + AtomicInteger over = taskTagsInCategoryOverCapacity.get(tag); + if (over!=null) { + if (over.decrementAndGet()<=0) { + // and remove it from over-capacity if so + taskTagsInCategoryOverCapacity.remove(tag); + if (taskTagsInCategoryOverCapacity.isEmpty()) + return 0; + } + } + } + } + } + } + } + + } catch (ConcurrentModificationException e) { + // do CME's happen with these data structures? + // if so, let's just delete what we've found so far + LOG.debug("Got CME inspecting tasks, with "+tasksToConsiderDeleting.size()+" found for deletion: "+e); + } + + if (LOG.isDebugEnabled()) + LOG.debug("brooklyn-gc detected "+taskTagsInCategoryOverCapacity.size()+" "+category+" " + + "tags over capacity, expiring old tasks; " + + tasksToConsiderDeleting.size()+" tasks under consideration; categories are: " + + taskTagsInCategoryOverCapacity); + + Collections.sort(tasksToConsiderDeleting, TASKS_OLDEST_FIRST_COMPARATOR); + // now try deleting tasks which are overcapacity for each (non-entity) tag + int deleted = 0; + for (Task task: tasksToConsiderDeleting) { + boolean delete = true; + for (Object tag: task.getTags()) { + if (!category.acceptsTag(tag)) + continue; + if (taskTagsInCategoryOverCapacity.get(tag)==null) { + // no longer over capacity in this tag + delete = false; + break; + } + } + if (delete) { + // delete this and update overcapacity info + deleted++; + executionManager.deleteTask(task); + for (Object tag: task.getTags()) { + AtomicInteger counter = taskAllTagsOverCapacity.get(tag); + if (counter!=null && counter.decrementAndGet()<=0) + taskTagsInCategoryOverCapacity.remove(tag); + } + if (LOG.isTraceEnabled()) + LOG.trace("brooklyn-gc deleted "+task+", buckets now "+taskTagsInCategoryOverCapacity); + if (taskTagsInCategoryOverCapacity.isEmpty()) + break; + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; " + + "capacities now: " + taskTagsInCategoryOverCapacity); + return deleted; + } + + protected int expireIfOverCapacityGlobally() { + Collection> tasksLive = executionManager.allTasksLive(); + if (tasksLive.size() <= brooklynProperties.getConfig(MAX_TASKS_GLOBAL)) + return 0; + LOG.debug("brooklyn-gc detected "+tasksLive.size()+" tasks in memory, over global limit, looking at deleting some"); + + try { + tasksLive = MutableList.copyOf(tasksLive); + } catch (ConcurrentModificationException e) { + tasksLive = executionManager.getTasksWithAllTags(MutableList.of()); + } + + MutableList> tasks = MutableList.of(); + for (Task task: tasksLive) { + if (task.isDone()) { + tasks.add(task); + } + } + + int numToDelete = tasks.size() - brooklynProperties.getConfig(MAX_TASKS_GLOBAL); + if (numToDelete <= 0) { + LOG.debug("brooklyn-gc detected only "+tasks.size()+" completed tasks in memory, not over global limit, so not deleting any"); + return 0; + } + + Collections.sort(tasks, TASKS_OLDEST_FIRST_COMPARATOR); + + int numDeleted = 0; + while (numDeleted < numToDelete && tasks.size()>numDeleted) { + executionManager.deleteTask( tasks.get(numDeleted++) ); + } + if (LOG.isDebugEnabled()) + LOG.debug("brooklyn-gc deleted "+numDeleted+" tasks as was over global limit, now have "+executionManager.allTasksLive().size()); + return numDeleted; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java new file mode 100644 index 0000000..49afe27 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.internal; + +/** Indicates how an entity/location/adjunct is treated at a given {@link ManagementContext} */ +public enum BrooklynObjectManagementMode { + /** item does not exist, not in memory, nor persisted (e.g. creating for first time, or finally destroying) */ + NONEXISTENT, + /** item exists or existed elsewhere, i.e. there is persisted state, but is not loaded here */ + UNMANAGED_PERSISTED, + /** item is loaded but read-only (ie not actively managed here) */ + LOADED_READ_ONLY, + /** item is actively managed here */ + MANAGED_PRIMARY +} \ No newline at end of file