Return-Path: X-Original-To: apmail-sling-commits-archive@www.apache.org Delivered-To: apmail-sling-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2ECDC1053E for ; Mon, 13 Jan 2014 09:13:24 +0000 (UTC) Received: (qmail 89763 invoked by uid 500); 13 Jan 2014 09:02:45 -0000 Delivered-To: apmail-sling-commits-archive@sling.apache.org Received: (qmail 89560 invoked by uid 500); 13 Jan 2014 09:01:56 -0000 Mailing-List: contact commits-help@sling.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sling.apache.org Delivered-To: mailing list commits@sling.apache.org Received: (qmail 89550 invoked by uid 99); 13 Jan 2014 09:01:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Jan 2014 09:01:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Jan 2014 09:01:48 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 112EE23889FD; Mon, 13 Jan 2014 09:01:28 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1557667 - in /sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor: HealthCheckExecutorImpl.java HealthCheckResultCache.java Date: Mon, 13 Jan 2014 09:01:27 -0000 To: commits@sling.apache.org From: cziegeler@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140113090128.112EE23889FD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cziegeler Date: Mon Jan 13 09:01:27 2014 New Revision: 1557667 URL: http://svn.apache.org/r1557667 Log: SLING-3278 : Provide a HealthCheckExecutor service. Simplify code for single reference execution, remove cached entry if service is unregistered Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java?rev=1557667&r1=1557666&r2=1557667&view=diff ============================================================================== --- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java (original) +++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java Mon Jan 13 09:01:27 2014 @@ -44,12 +44,17 @@ import org.apache.sling.commons.osgi.Pro import org.apache.sling.commons.threads.ModifiableThreadPoolConfig; import org.apache.sling.commons.threads.ThreadPool; import org.apache.sling.commons.threads.ThreadPoolManager; +import org.apache.sling.hc.api.HealthCheck; import org.apache.sling.hc.api.Result; import org.apache.sling.hc.api.execution.HealthCheckExecutionResult; import org.apache.sling.hc.api.execution.HealthCheckExecutor; import org.apache.sling.hc.util.HealthCheckFilter; import org.apache.sling.hc.util.HealthCheckMetadata; import org.osgi.framework.BundleContext; +import org.osgi.framework.Constants; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceEvent; +import org.osgi.framework.ServiceListener; import org.osgi.framework.ServiceReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +67,7 @@ import org.slf4j.LoggerFactory; @Component(label = "Apache Sling Health Check Executor", description = "Runs health checks for a given list of tags in parallel.", metatype = true, immediate = true) -public class HealthCheckExecutorImpl implements ExtendedHealthCheckExecutor { +public class HealthCheckExecutorImpl implements ExtendedHealthCheckExecutor, ServiceListener { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -93,7 +98,7 @@ public class HealthCheckExecutorImpl imp private long resultCacheTtlInMs; - private HealthCheckResultCache healthCheckResultCache = new HealthCheckResultCache(); + private final HealthCheckResultCache healthCheckResultCache = new HealthCheckResultCache(); private Map stillRunningFutures = new ConcurrentHashMap(); @@ -112,6 +117,14 @@ public class HealthCheckExecutorImpl imp hcThreadPool = threadPoolManager.create(hcThreadPoolConfig, "Health Check Thread Pool"); this.modified(properties); + + try { + this.bundleContext.addServiceListener(this, "(" + + Constants.OBJECTCLASS + "=" + HealthCheck.class.getName() + ")"); + } catch (final InvalidSyntaxException ise) { + // this should really never happen as the expression above is constant + throw new RuntimeException("Unexpected exception occured.", ise); + } } @Modified @@ -137,7 +150,17 @@ public class HealthCheckExecutorImpl imp @Deactivate protected final void deactivate() { threadPoolManager.release(hcThreadPool); + this.bundleContext.removeServiceListener(this); this.bundleContext = null; + this.healthCheckResultCache.clear(); + } + + @Override + public void serviceChanged(final ServiceEvent event) { + if ( event.getType() == ServiceEvent.UNREGISTERING ) { + final Long serviceId = (Long)event.getServiceReference().getProperty(Constants.SERVICE_ID); + this.healthCheckResultCache.removeCachedResult(serviceId); + } } /** @@ -162,11 +185,8 @@ public class HealthCheckExecutorImpl imp */ @Override public HealthCheckExecutionResult execute(final ServiceReference ref) { - final List result = this.execute(new ServiceReference[] {ref}); - if ( result.size() > 0 ) { - return result.get(0); - } - return null; + final HealthCheckMetadata metadata = this.getHealthCheckMetadata(ref); + return createResultsForDescriptor(metadata); } /** @@ -177,7 +197,7 @@ public class HealthCheckExecutorImpl imp stopWatch.start(); final List results = new ArrayList(); - final List healthCheckDescriptors = getHealthCheckDescriptors(healthCheckReferences); + final List healthCheckDescriptors = getHealthCheckMetadata(healthCheckReferences); createResultsForDescriptors(healthCheckDescriptors, results); @@ -209,20 +229,43 @@ public class HealthCheckExecutorImpl imp // everything else is executed in parallel via futures List futures = createOrReuseFutures(healthCheckDescriptors); - // wait for futures at most until timeout (but will return earlier if all futures are finsihed) + // wait for futures at most until timeout (but will return earlier if all futures are finished) waitForFuturesRespectingTimeout(futures); collectResultsFromFutures(futures, results); healthCheckResultCache.updateWith(results); } + private HealthCheckExecutionResult createResultsForDescriptor(final HealthCheckMetadata metadata) { + // -- All methods below check if they can transform a healthCheckDescriptor into a result + // -- if yes the descriptor is removed from the list and the result added + + // reuse cached results where possible + HealthCheckExecutionResult result; + + result = healthCheckResultCache.useValidCacheResults(metadata, resultCacheTtlInMs); + + if ( result == null ) { + // everything else is executed in parallel via futures + final HealthCheckFuture future = createOrReuseFuture(metadata); + + // wait for futures at most until timeout (but will return earlier if all futures are finished) + waitForFuturesRespectingTimeout(future); + result = collectResultFromFuture(future); + } + + healthCheckResultCache.updateWith(result); + + return result; + } + /** - * Create the health check descriptors + * Create the health check metadata */ - private List getHealthCheckDescriptors(final ServiceReference... healthCheckReferences) { + private List getHealthCheckMetadata(final ServiceReference... healthCheckReferences) { final List descriptors = new LinkedList(); for (final ServiceReference serviceReference : healthCheckReferences) { - final HealthCheckMetadata descriptor = new HealthCheckMetadata(serviceReference); + final HealthCheckMetadata descriptor = getHealthCheckMetadata(serviceReference); descriptors.add(descriptor); } @@ -230,28 +273,55 @@ public class HealthCheckExecutorImpl imp return descriptors; } - private List createOrReuseFutures(final List healthCheckDescriptors) { - List futuresForResultOfThisCall = new LinkedList(); + /** + * Create the health check metadata + */ + private HealthCheckMetadata getHealthCheckMetadata(final ServiceReference healthCheckReference) { + final HealthCheckMetadata descriptor = new HealthCheckMetadata(healthCheckReference); + return descriptor; + } - for (final HealthCheckMetadata healthCheckDescriptor : healthCheckDescriptors) { + private List createOrReuseFutures(final List healthCheckDescriptors) { + final List futuresForResultOfThisCall = new LinkedList(); - HealthCheckFuture stillRunningFuture = this.stillRunningFutures.get(healthCheckDescriptor); - HealthCheckFuture resultFuture; - if (stillRunningFuture != null && !stillRunningFuture.isDone()) { - logger.debug("Found a future that is still running for {}", healthCheckDescriptor); - resultFuture = stillRunningFuture; - } else { - logger.debug("Creating future for {}", healthCheckDescriptor); - resultFuture = new HealthCheckFuture(healthCheckDescriptor, bundleContext); - this.hcThreadPool.execute(resultFuture); - } + for (final HealthCheckMetadata md : healthCheckDescriptors) { - futuresForResultOfThisCall.add(resultFuture); + futuresForResultOfThisCall.add(createOrReuseFuture(md)); } return futuresForResultOfThisCall; } + private HealthCheckFuture createOrReuseFuture(final HealthCheckMetadata metadata) { + HealthCheckFuture stillRunningFuture = this.stillRunningFutures.get(metadata); + HealthCheckFuture resultFuture; + if (stillRunningFuture != null && !stillRunningFuture.isDone()) { + logger.debug("Found a future that is still running for {}", metadata); + resultFuture = stillRunningFuture; + } else { + logger.debug("Creating future for {}", metadata); + resultFuture = new HealthCheckFuture(metadata, bundleContext); + this.hcThreadPool.execute(resultFuture); + } + + return resultFuture; + } + + private void waitForFuturesRespectingTimeout(final HealthCheckFuture healthCheckFuture) { + StopWatch callExcutionTimeStopWatch = new StopWatch(); + callExcutionTimeStopWatch.start(); + boolean allFuturesDone; + do { + try { + Thread.sleep(50); + } catch (InterruptedException ie) { + logger.warn("Unexpected InterruptedException while waiting for healthCheckContributors", ie); + } + + allFuturesDone = healthCheckFuture.isDone(); + } while (!allFuturesDone && callExcutionTimeStopWatch.getTime() < this.timeoutInMs); + } + private void waitForFuturesRespectingTimeout(List futuresForResultOfThisCall) { StopWatch callExcutionTimeStopWatch = new StopWatch(); callExcutionTimeStopWatch.start(); @@ -272,49 +342,13 @@ public class HealthCheckExecutorImpl imp void collectResultsFromFutures(List futuresForResultOfThisCall, Collection results) { - Set resultsFromFutures = new HashSet(); + Set resultsFromFutures = new HashSet(); Iterator futuresIt = futuresForResultOfThisCall.iterator(); while (futuresIt.hasNext()) { - HealthCheckFuture future = futuresIt.next(); - ExecutionResult result; - if (future.isDone()) { - logger.debug("Health Check is done: {}", future.getHealthCheckMetadata()); - - try { - result = future.get(); - } catch (Exception e) { - logger.warn("Unexpected Exception during future.get(): " + e, e); - result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.HEALTH_CHECK_ERROR, - "Unexpected Exception during future.get(): " + e); - } - - // if the future came from a previous call remove it from stillRunningFutures - if (this.stillRunningFutures.containsKey(future.getHealthCheckMetadata())) { - this.stillRunningFutures.remove(future.getHealthCheckMetadata()); - } + final HealthCheckFuture future = futuresIt.next(); + final HealthCheckExecutionResult result = this.collectResultFromFuture(future); - } else { - logger.debug("Health Check timed out: {}", future.getHealthCheckMetadata()); - // Futures must not be cancelled as interrupting a health check might could cause a corrupted repository index - // (CrxRoundtripCheck) or ugly messages/stack traces in the log file - - this.stillRunningFutures.put(future.getHealthCheckMetadata(), future); - - // normally we turn the check into WARN (normal timeout), but if the threshold time for CRITICAL is reached for a certain - // future we turn the result CRITICAL - long futureElapsedTimeMs = new Date().getTime() - future.getCreatedTime().getTime(); - if (futureElapsedTimeMs < this.longRunningFutureThresholdForRedMs) { - result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.WARN, - "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs), futureElapsedTimeMs); - - } else { - result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.CRITICAL, - "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs) - + " (exceeding the configured threshold for CRITICAL: " - + msHumanReadable(this.longRunningFutureThresholdForRedMs) + ")", futureElapsedTimeMs); - } - } resultsFromFutures.add(result); futuresIt.remove(); } @@ -323,6 +357,49 @@ public class HealthCheckExecutorImpl imp results.addAll(resultsFromFutures); } + HealthCheckExecutionResult collectResultFromFuture(final HealthCheckFuture future) { + + HealthCheckExecutionResult result; + if (future.isDone()) { + logger.debug("Health Check is done: {}", future.getHealthCheckMetadata()); + + try { + result = future.get(); + } catch (Exception e) { + logger.warn("Unexpected Exception during future.get(): " + e, e); + result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.HEALTH_CHECK_ERROR, + "Unexpected Exception during future.get(): " + e); + } + + // if the future came from a previous call remove it from stillRunningFutures + if (this.stillRunningFutures.containsKey(future.getHealthCheckMetadata())) { + this.stillRunningFutures.remove(future.getHealthCheckMetadata()); + } + + } else { + logger.debug("Health Check timed out: {}", future.getHealthCheckMetadata()); + // Futures must not be cancelled as interrupting a health check might could cause a corrupted repository index + // (CrxRoundtripCheck) or ugly messages/stack traces in the log file + + this.stillRunningFutures.put(future.getHealthCheckMetadata(), future); + + // normally we turn the check into WARN (normal timeout), but if the threshold time for CRITICAL is reached for a certain + // future we turn the result CRITICAL + long futureElapsedTimeMs = new Date().getTime() - future.getCreatedTime().getTime(); + if (futureElapsedTimeMs < this.longRunningFutureThresholdForRedMs) { + result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.WARN, + "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs), futureElapsedTimeMs); + + } else { + result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.CRITICAL, + "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs) + + " (exceeding the configured threshold for CRITICAL: " + + msHumanReadable(this.longRunningFutureThresholdForRedMs) + ")", futureElapsedTimeMs); + } + } + return result; + } + static String msHumanReadable(final long millis) { double number = millis; Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java?rev=1557667&r1=1557666&r2=1557667&view=diff ============================================================================== --- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java (original) +++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java Mon Jan 13 09:01:27 2014 @@ -33,36 +33,47 @@ import org.slf4j.LoggerFactory; /** * Caches health check results. - * */ public class HealthCheckResultCache { + /** + * The logger. + */ private final Logger logger = LoggerFactory.getLogger(this.getClass()); + /** + * The map holding the cached results. + */ private final Map cache = new ConcurrentHashMap(); - @Override - public String toString() { - return "[HealthCheckResultCache size=" + cache.size() + "]"; - } - + /** + * Update the cache with the results + */ public void updateWith(final Collection results) { for (final HealthCheckExecutionResult result : results) { - final ExecutionResult executionResult = (ExecutionResult) result; - cache.put(executionResult.getServiceId(), result); + this.updateWith(result); } } + /** + * Update the cache with the result + */ + public void updateWith(HealthCheckExecutionResult result) { + final ExecutionResult executionResult = (ExecutionResult) result; + cache.put(executionResult.getServiceId(), result); + } + + /** + * Get the valid cache results + */ public void useValidCacheResults(final List metadatas, final Collection results, final long resultCacheTtlInMs) { - - final Set cachedResults = new TreeSet(); final Iterator checksIt = metadatas.iterator(); while (checksIt.hasNext()) { - final HealthCheckMetadata descriptor = checksIt.next(); - final HealthCheckExecutionResult result = get(descriptor, resultCacheTtlInMs); + final HealthCheckMetadata md = checksIt.next(); + final HealthCheckExecutionResult result = useValidCacheResults(md, resultCacheTtlInMs); if (result != null) { cachedResults.add(result); checksIt.remove(); @@ -72,6 +83,14 @@ public class HealthCheckResultCache { results.addAll(cachedResults); } + /** + * Return the cached result if it's still valid. + */ + public HealthCheckExecutionResult useValidCacheResults(final HealthCheckMetadata metadata, + final long resultCacheTtlInMs) { + return get(metadata, resultCacheTtlInMs); + } + private HealthCheckExecutionResult get(final HealthCheckMetadata metadata, final long resultCacheTtlInMs) { final Long key = metadata.getServiceId(); final HealthCheckExecutionResult cachedResult = cache.get(key); @@ -99,4 +118,23 @@ public class HealthCheckResultCache { return null; } + /** + * Clear the whole cache + */ + public void clear() { + this.cache.clear(); + } + + /** + * Remove entry from cache + */ + public void removeCachedResult(final Long serviceId) { + this.cache.remove(serviceId); + } + + @Override + public String toString() { + return "[HealthCheckResultCache size=" + cache.size() + "]"; + } + }